作者 王協 騰訊互動娛樂事業群,移動游戲運營開發
延遲佇列是我們日常開發程序中,經常接觸并需要使用到的一種技術方案,前些時間在開發業務需求時,我也遇到了一個需要使用到延遲訊息佇列的需求場景,因此我也在網上調研了一系列不同的延遲佇列的實作方案,在此進行了一個總結并且給大家進行分享,
本文大綱:

Part1 延遲佇列定義
首先,佇列這種資料結構相信大家都不陌生,它是一種先進先出的資料結構,普通佇列中的元素是有序的,先進入佇列中的元素會被優先取出進行消費,
延時佇列相比于普通佇列最大的區別就體現在其延時的屬性上,普通佇列的元素是先進先出,按入隊順序進行處理,而延時佇列中的元素在入隊時會指定一個延遲時間,表示其希望能夠在經過該指定時間后處理,從某種意義上來講,延遲佇列的結構并不像一個佇列,而更像是一種以時間為權重的有序堆結構,
Part2 應用場景
我在開發業務需求時遇到的使用場景是這樣的,用戶可以在小程式中訂閱不同的微信或者QQ的模板訊息,產品同學可以在小程式的管理端新建訊息推送計劃,當到達指定的時間節點的時候給所有訂閱模板訊息的用戶進行訊息推送,
如果僅僅是服務單一的小程式,那也許起個定時任務,或者甚至人工的定時去執行能夠最便捷最快速的去完成這項需求,但我們希望能夠抽象出一個訊息訂閱的模塊服務出來給所有業務使用,這時候就需要一種通用的系統的解決方案,這時候便需要使用到延遲佇列了,
除了上述我所遇到的這樣的典型的需求以外,延遲佇列的應用場景其實也非常的廣泛,比如說以下的場景:
-
新建的訂單,如果用戶在15分鐘內未支付,則自動取消,
-
公司的會議預定系統,在會議預定成功后,會在會議開始前半小時通知所有預定該會議的用戶,
-
安全工單超過24小時未處理,則自動拉企業微信群提醒相關責任人,
-
用戶下單外賣以后,距離超時時間還有10分鐘時提醒外賣小哥即將超時,
對于資料量比較少并且時效性要求不那么高的場景,一種比較簡單的方式是輪詢資料庫,比如每秒輪詢一下資料庫中所有資料,處理所有到期的資料,比如如果我是公司內部的會議預定系統的開發者,我可能就會采用這種方案,因為整個系統的資料量必然不會很大并且會議開始前提前30分鐘提醒與提前29分鐘提醒的差別并不大,
但是如果需要處理的資料量比較大實時性要求比較高,比如淘寶每天的所有新建訂單15分鐘內未支付的自動超時,數量級高達百萬甚至千萬,這時候如果你還敢輪詢資料庫怕是要被你老板打死,不被老板打死估計也要被運維同學打死,
這種場景下,就需要使用到我們今天的主角 —— 延遲佇列了,延遲佇列為我們提供了一種高效的處理大量需要延遲消費訊息的解決方案,那么話不多說,下面我們就來看一下幾種常見的延遲佇列的解決方案以及它們各自的優缺點,
Part3 實作方案
Redis ZSet
我們知道Redis有一個有序集合的資料結構ZSet,ZSet中每個元素都有一個對應Score,ZSet中所有元素是按照其Score進行排序的,
那么我們可以通過以下這幾個操作使用Redis的ZSet來實作一個延遲佇列:
-
入隊操作:ZADD KEY timestamp task, 我們將需要處理的任務,按其需要延遲處理時間作為Score加入到ZSet中,Redis的ZAdd的時間復雜度是O(logN),N是ZSet中元素個數,因此我們能相對比較高效的進行入隊操作,
-
起一個行程定時(比如每隔一秒)通過ZREANGEBYSCORE方法查詢ZSet中Score最小的元素,具體操作為:ZRANGEBYSCORE KEY -inf +inf limit 0 1 WITHSCORES,查詢結果有兩種情況:
a. 查詢出的分數小于等于當前時間戳,說明到這個任務需要執行的時間了,則去異步處理該任務;
b. 查詢出的分數大于當前時間戳,由于剛剛的查詢操作取出來的是分數最小的元素,所以說明ZSet中所有的任務都還沒有到需要執行的時間,則休眠一秒后繼續查詢;
同樣的,ZRANGEBYSCORE操作的時間復雜度為O(logN + M),其中N為ZSet中元素個數,M為查詢的元素個數,因此我們定時查詢操作也是比較高效的,
這里從網上搬運了一套Redis實作延遲佇列的后端架構,其在原來Redis的ZSet實作上進行了一系列的優化,使得整個系統更穩定、更健壯,能夠應對高并發場景,并且具有更好的可擴展性,是一個挺不錯的架構設計,其整體架構圖如下:

其核心設計思路:
- 將延遲的訊息任務通過hash演算法路由至不同的Redis Key上,這樣做有兩大好處:
a. 避免了當一個KEY在存盤了較多的延時訊息后,入隊操作以及查詢操作速度變慢的問題(兩個操作的時間復雜度均為O(logN)),
b. 系統具有了更好的橫向可擴展性,當資料量激增時,我們可以通過增加Redis Key的數量來快速的擴展整個系統,來抗住資料量的增長,
-
每個Redis Key都對應建立一個處理行程,稱為Event行程,通過上述步驟2中所述的ZRANGEBYSCORE方法輪詢Key,查詢是否有待處理的延遲訊息,
-
所有的Event行程只負責分發訊息,具體的業務邏輯通過一個額外的訊息佇列異步處理,這么做的好處也是顯而易見的:
a. 一方面,Event行程只負責分發訊息,那么其處理訊息的速度就會非常快,就不太會出現因為業務邏輯復雜而導致訊息堆積的情況,
b. 另一方面,采用一個額外的訊息佇列后,訊息處理的可擴展性也會更好,我們可以通過增加消費者行程數量來擴展整個系統的訊息處理能力,
- Event行程采用Zookeeper選主單行程部署的方式,避免Event行程宕機后,Redis Key中訊息堆積的情況,一旦Zookeeper的leader主機宕機,Zookeeper會自動選擇新的leader主機來處理Redis Key中的訊息,
從上述的討論中我們可以看到,通過Redis Zset實作延遲佇列是一種理解起來較為直觀,可以快速落地的方案,并且我們可以依賴Redis自身的持久化來實作持久化,使用Redis集群來支持高并發和高可用,是一種不錯的延遲佇列的實作方案,
RabbitMQ
RabbitMQ本身并不直接提供對延遲佇列的支持,我們依靠RabbitMQ的TTL以及死信佇列功能,來實作延遲佇列的效果,那就讓我們首先來了解一下,RabbitMQ的死信佇列以及TTL功能,
死信佇列
死信佇列實際上是一種RabbitMQ的訊息處理機制,當RabbmitMQ在生產和消費訊息的時候,訊息遇到如下的情況,就會變成“死信”:
-
訊息被拒絕basic.reject/ basic.nack 并且不再重新投遞 requeue=false
-
訊息超時未消費,也就是TTL過期了
-
訊息佇列到達最大長度
訊息一旦變成一條死信,便會被重新投遞到死信交換機(Dead-Letter-Exchange),然后死信交換機根據系結規則轉發到對應的死信佇列上,監聽該佇列就可以讓訊息被重新消費,
訊息生存時間TTL
TTL(Time-To-Live)是RabbitMQ的一種高級特性,表示了一條訊息的最大生存時間,單位為毫秒,如果一條訊息在TTL設定的時間內沒有被消費,那么它就會變成一條死信,進入我們上面所說的死信佇列,
有兩種不同的方式可以設定訊息的TTL屬性,一種方式是直接在創建佇列的時候設定整個佇列的TTL過期時間,所有進入佇列的訊息,都被設定成了統一的過期時間,一旦訊息過期,馬上就會被丟棄,進入死信佇列,參考代碼如下:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
在延遲佇列的延遲時間為固定值的時候,比較適合使用這種方式,
另一種方式是針對單條訊息設定,參考代碼如下,該訊息被設定了6秒的過期時間:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg content".getBytes());
如果需要不同的訊息設定不同的延遲時間,上面針對佇列的TTL設定便無法滿足我們的需求,需要使用這種針對單個訊息的TTL設定,
不過需要注意的是,使用這種方式設定的TTL,訊息可能不會按時死亡,因為RabbitMQ只會檢查第一個訊息是否過期,比如這種情況,第一個訊息設定了20s的TTL,第二個訊息設定了10s的TTL,那么RabbitMQ會等到第一個訊息過期之后,才會讓第二個訊息過期,
解決這個問題的方法也很簡單,只需要安裝RabbitMQ的一個插件即可:https://www.rabbitmq.com/community-plugins.html ,安裝好這個插件后,所有的訊息就都能按照被設定的TTL過期了,
RabbitMQ實作延遲佇列
好了,介紹完RabbitMQ的死信佇列以及TTL這兩種特性之后,我們離實作延遲佇列就只差一步之遙了,
聰明的讀者可能已經發現了,TTL不就是延遲佇列中訊息要延遲的時間么?如果我們把需要延遲的訊息,將TTL設定為其延遲時間,投遞到RabbitMQ的普通佇列中,一直不去消費它,那么經過TTL的時間后,訊息就會自動被投遞到死信佇列,這時候我們使用消費者行程實時地去消費死信佇列中的訊息,不就實作了延遲佇列的效果,
從下圖可以直觀的看出使用RabbitMQ實作延遲佇列的整體流程:

使用RabbitMQ來實作延遲佇列,我們可以很好的利用一些RabbitMQ的特性,比如訊息可靠發送、訊息可靠投遞、死信佇列來保障訊息至少被消費一次以及未被正確處理的訊息不會被丟棄,另外,通過RabbitMQ集群的特性,可以很好的解決單點故障問題,不會因為單個節點掛掉導致延遲佇列不可用或者訊息丟失,
TimeWheel
TimeWheel時間輪演算法,是一種實作延遲佇列的巧妙且高效的演算法,被應用在Netty,Zookeeper,Kafka等各種框架中,
時間輪

如上圖所示,時間輪是一個存盤延遲訊息的環形佇列,其底層采用陣列實作,可以高效回圈遍歷,這個環形佇列中的每個元素對應一個延遲任務串列,這個串列是一個雙向環形鏈表,鏈表中每一項都代表一個需要執行的延遲任務,
時間輪會有表盤指標,表示時間輪當前所指時間,隨著時間推移,該指標會不斷前進,并處理對應位置上的延遲任務串列,
添加延遲任務
由于時間輪的大小固定,并且時間輪中每個元素都是一個雙向環形鏈表,我們可以在O(1) 的時間復雜度下向時間輪中添加延遲任務,
如下圖,例如我們有一個這樣的時間輪,在表盤指標指向當前時間為2時,我們需要新添加一個延遲3秒的任務,我們可以快速計算出延遲任務在時間輪中所對應的位置為5,并添加到位置5上任務串列尾部,

多層時間輪
到現在為止一切都非常棒,但是細心的同學可能發現了,上面的時間輪的大小是固定的,只有12秒,如果此時我們有一個需要延遲200秒的任務,我們應該怎么處理呢?直接擴充整個時間輪的大小嗎?這顯然不可取,因為這樣做的話我們就需要維護一個非常非常大的時間輪,記憶體是不可接受的,而且底層陣列大了之后尋址效率也會降低,影響性能,
為此,Kafka引入了多層時間輪的概念,其實多層時間輪的概念和我們的機械表上時針、分針、秒針的概念非常類似,當僅使用秒針無法表示當前時間時,就使用分針結合秒針一起表示,同樣的,當任務的到期時間超過了當前時間輪所表示的時間范圍時,就會嘗試添加到上層時間輪中,如下圖所示:

第一層時間輪整個時間輪所表示時間范圍是0-12秒,第二層時間輪每格能表示的時間范圍是整個第一層時間輪所表示的范圍也就是12秒,所以整個第二層時間輪能表示的時間范圍即12*12=144秒,依次類推第三層時間輪能表示的范圍是1728秒,第四層為20736秒等等,
比如現在我們需要添加一個延時為200秒的延遲訊息,我們發現其已經超過了第一層時間輪能表示的時間范圍,我們就需要繼續往上層時間輪看,將其添加在第二層時間輪 200/12 = 17的位置,然后我們發現17也超過了第二次時間輪的表示范圍,那么我們就需要繼續往上層看,將其添加在第三層時間輪的 17/12 = 2 的位置,
Kafka中時間輪演算法添加延遲任務以及推動時間輪滾動的核心流程如下,其中Bucket即時間輪中的延遲任務佇列,并且Kafka引入的DelayQueue解決了多數Bucket為空導致的時間輪滾動效率低下的問題:

使用時間輪實作的延遲佇列,能夠支持大量任務的高效觸發,并且在Kafka的時間輪演算法的實作方案中,還引入了DelayQueue,使用DelayQueue來推送時間輪滾動,而延遲任務的添加與洗掉操作都放在時間輪中,這樣的設計大幅提升了整個延遲佇列的執行效率,
Part4 總結
延遲佇列在我們日常開發中應用非常廣泛,本文介紹了三種不同的實作延遲佇列的方案,三種方案各自有各自的特點,例如Redis的實作方案理解起來最為簡單,能夠快速落地,但Redis畢竟是基于記憶體的,雖然有資料持久化方案,但還是有資料丟失的可能性,而RabbitMQ的實作方案,由于RabbitMQ本身的訊息可靠發送、訊息可靠投遞、死信佇列等特性,可以保障訊息至少被消費一次以及未被正確處理的訊息不會被丟棄,讓訊息的可靠性有了保障,最后Kafka的時間輪演算法,個人覺得是三種實作方案中最難理解但也不失為一種非常巧妙實作方案,最后,希望以上這些內容,能幫助大家在實作自己的延遲佇列時提供一點思路,
本文由博客一文多發平臺 OpenWrite 發布!
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/187664.html
標籤:其他
