我是3y,一年CRUD經驗用十年的markdown程式員???????常年被譽為職業八股文選手
前陣子,有個小伙伴找到問我,如果要實作延時發送,那是基于什么來做的,

我看到這個問題之后,稍微思考了下,覺得確實也是austin平臺所需要實作的功能,對于前端而言,只要讓業務方在創建模板的時候填選屏蔽型別,后端根據這個欄位增添一點點細節,這個需求就做完了,簡單!

延遲訊息如何實作?
延遲訊息就是字面上的意思:當接收到訊息之后,我需要隔一段時間進行處理(相對于立馬處理,它隔了一段時間,所以他叫延遲訊息),
在原生的Java有DelayQueue供我們去使用,在使用的時候,我們add進去的佇列的元素需要實作Delayed介面(同時該介面繼承了Comparable介面,所以我們DelayQueue是有序的)
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
從poll的原始碼上可以清晰地發現本質上就是在取數的時候判斷了下時間
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
有的人就反駁到:這不是廢話嗎?肯定要判斷時間啊,不判斷時間怎么知道我要延遲的訊息什么時候執行,
明白了這點之后,我們再來別的方案,因為在生產環境中是不太可能使用JDK原生延遲佇列的,它是沒有持久化的,重啟就會導致資料丟失,

當austin專案使用記憶體佇列去解耦處理資料已經有人提出服務器重啟的時候該怎么辦,我的解決思路就是通過優雅關閉服務器這種手段去盡量避免資料丟失,而延遲佇列這種就不能這么干了,我們等不了這么久的,
稍微想想還有什么存盤適合當佇列且有持久化機制的呢?
答案顯而易見:Redis和訊息佇列(Kafka/RocketMQ/RabbmitMQ 等)
我們先來看Redis里提供了一種資料結構叫做zset,它是可排序的集合并且Redis原生就支持持久化,有贊的延遲佇列就是基于通過zset進行設計和存盤的,整體架構如下圖:

簡單理解這張圖就是:將需要延遲的訊息放置Redis,通過Timer輪詢得到可執行的訊息,將可執行的訊息放置不同的Topic供業務方自行消費,
更多的設計思路可以參考有贊的技術原文,這里我不再贅述:https://tech.youzan.com/queuing_delay/
通過timer去輪詢zset查看是否有可執行的訊息是一種思路,也有人通過Redis的過期回呼的姿勢也能達到延遲訊息的效果(把訊息執行的時間定義為key過期的時間,當key觸發了過期回呼,那說明該訊息可執行了),

說完Redis,我們再來看看訊息佇列,在austin專案上使用訊息佇列是Kafka,而Kafka在官方是沒有提供延遲佇列這種機制的,不過RabbmitMQ和RocketMQ都有對應的機制,我們可以簡單看看窺探下它們的實作思路,
RabbmitMQ它的延遲佇列機制本質上也是通過TTL(Time To Live 訊息存活的時間)所實作的,當佇列里的元素觸發了過期時,會被送往到Dead Letter Exchanges(死信佇列中),我們可以將死信佇列的元素再次轉發,對其進行消費,從而達到延遲佇列的效果,
畢竟RabbmitMQ是專門做訊息佇列的,所以它對訊息的可靠性會比Redis更加高(訊息投遞的可靠性、至少處理一次的消費語意)

RocketMQ支持在我們投遞訊息的時候設定延遲等級
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
默認支持18個延遲等級,分別是:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
當我們設定了延遲等級的訊息之后,RocketMQ不會把訊息直接投遞到對應的topic,而是轉發到對應延遲等級的佇列中,在Broker內部會為每個延遲佇列起TimerTask來進行判斷是否有訊息到達了時間,
ScheduleMessageService#start
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
如果到期了,則將訊息重新存盤到CommitLog,轉發到真正目標的topic

對RocketMQ延遲佇列比較感興趣的,推薦看這篇文章:https://cloud.tencent.com/developer/article/1581368
實作需求
在前面提到我們可以利用JDK原生的延時佇列,又或是Redis的zset資料結構或者其過期時間機制、又或是RabbitMQ使用TTL+死信佇列機制、又或是RocketMQ的延時等級佇列機制來實作我們的需求(延時佇列)
針對此次需求,上面所講的延時佇列,我都沒用到...
austin專案引入的是Kafka,不太可能去為了延時佇列去引入第二種訊息佇列(RabbitMQ在互聯網應該用得相對較少,RocketMQ需要改動組態檔的延遲等級才能支持更豐富的延時需求),
如果基于Kafka或者Redis去二次開發延時佇列,開發成本還是有不少的,在GitHub也還沒撈到我想要的輪子,
于是,我換了一種方案:萬物皆掃表
針對這次需求(晚上發的訊息,次日早上發送),就不需要上延時佇列,因為austin已經接入了分布式定時任務框架了(對應的實作是xxl-job)
只要把晚上的接收到的訊息扔進Redis list,然后啟個定時任務(每天早上9點)輪詢該list是否有資料,如果有再重新做處理就完事了,


總結
這篇文章主要講述了如果我們要使用延時佇列,我們可以有什么方案,他們的設計是怎么樣的,在需求側上看,這個需求就是「延時佇列」的場景,但基于現狀的系統架構和開發成本考慮,我們是可以用另類(分布式定時任務框架)的方式去把需求給實作了,
很多時候,我們看到的系統很爛,技術堆疊很爛,發現好多場景都沒有用到最佳實踐而感到懊惱,在年輕的時候都想有重構的心,但實際上每引入一個中間件都是需要付出成本的,粗糙也有粗糙的好處,
只要業務能完美支持,那就是好的方案,想要搞自己想搞的技術,那就做開源,如果有一天我覺得分布式定時任務來實作此次需求不順眼了,我再花時間來重構才干掉,現在就這么實作吧( // TODO),
如果你實在是覺得看著糟心,歡迎提個pull request,這樣我就不得不把這種實作給干掉了(我對提過來的pull request都會謹慎且用心處理)
都看到這里了,點個贊一點都不過分吧?我是3y,下期見,

關注我的微信公眾號【Java3y】除了技術我還會聊點日常,有些話只能悄悄說~ 【對線面試官+從零撰寫Java專案】 持續高強度更新中!求star!!原創不易!!求三連!!
austin專案原始碼Gitee鏈接:gitee.com/austin
austin專案原始碼GitHub鏈接:github.com/austin
更多的文章可往:文章的目錄導航轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/486439.html
標籤:Java
