
說到訊息佇列,首先映入腦海的就是RocketMQ、Kafka等,訊息佇列在各個領域都發揮了很大的作用,但是,在一些場景下,傳統的訊息佇列無法滿足需求,比如以下場景:
訊息重復概率比較高時,需要對重復訊息進行合并處理避免浪費有限的資源,減少消費延遲;
需要根據業務自定義優先級進行訊息處理,高優先級的訊息比低優先級的訊息先處理;
訊息需要定時消費的場景,訊息只有在設定的消費時間到了之后立馬被消費,
本文將介紹一種基于Redis實作的訊息佇列(Redis message queue, RMQ),RMQ可以作為傳統訊息佇列的互補選擇,在傳統訊息佇列沒有涉及的場景中使用RMQ,
功能介紹
RMQ設計為一個二方庫,可以幫助用戶基于Redis快速實作訊息佇列的功能,RMQ訊息佇列具有訊息合并、區分優先級、支持定時訊息等特性,RMQ訊息佇列可以用于異步解耦、削峰填谷,支持億級資料堆積,RMQ訊息佇列目前支持三種型別的訊息,分別是RangeMergeMessage(區間重復合并訊息)、PriorityMessage(優先級訊息)、FixedTimeMessage(任意定時訊息),
? 區間重復合并訊息
RangeMergeMessage支持區間重復訊息合并,發送訊息時需要設定時間區間,訊息延遲該時間區間長度后被消費,在該時間區間內如果發送重復的訊息,重復訊息將會被合并,如果訊息在Redis服務端發生堆積,重復到來的訊息依然會被合并處理,
該型別訊息適用于訊息重復率較高且希望重復訊息合并處理的場景,對重復訊息進行合并可以減少下游消費系統的壓力,減少不必要的資源消耗,將有限的資源最大化的利用,提升消費效率,
? 優先級訊息
PriorityMessage支持給訊息設定任意等級的優先級,優先級高的訊息會被優先消費,相同優先級的訊息被隨機消費,如果訊息在Redis服務端發生堆積,重復的訊息將被合并處理,合并后訊息的優先級等于最后存盤的訊息的優先級,
該型別訊息適用于希望重復訊息合并處理且需要設定優先級的場景,下游消費者資源有限時,合并重復訊息且優先處理優先級高的訊息將可以合理利用有限的資源,
? 任意定時訊息
FixedTimeMessage支持給訊息設定任意消費時間,只有消費時間到了之后訊息才被消費,消費時間可精確到秒,訊息到期后沒有及時被消費時,消費者將按照時間由遠及近進行消費,如果訊息在Redis服務端發生堆積,重復的訊息將被合并處理,合并后訊息的消費時間等于最后存盤的訊息的消費時間,
該型別訊息適用于希望重復訊息合并處理且需要定時消費的場景,定時訊息應用場景非常豐富,比如定時打標去標、活動結束后清理動作、訂單超時關閉等,
? 并發消費控制
使用傳統訊息中間件進行集群消費的時候,為了避免并發處理同一元資料導致不一致問題,通常需要對元資料加分布式鎖,頻繁的鎖沖突會導致消費效率低下,加分布式鎖的最終目的其實就是保障屬于同一元資料的訊息被串行消費,加分布式鎖并不是最好的方案,最好的方案應該是從根上解決并發問題,讓屬于同一元資料的訊息串行消費,
RMQ訊息佇列具有并發消費控制能力,屬于同一元資料的訊息只會被分配給全域唯一一個執行緒進行消費,因此屬于同一元資料的訊息將被串行消費,使用方如果需要該能力,除了需要提供Redis,還需要提供ZooKeeper,
? 重試次數控制
RMQ訊息佇列支持失敗重試消費16次,業務回傳消費失敗后,訊息會被回滾并等待重試消費,重試16次后訊息進入死信佇列,訊息不再被消費,除非人工干預,
技術原理
? 總體框架

RMQ訊息佇列由三部分組成,分別為ZooKeeper、RMQ二方庫、Redis,ZooKeeper負責維護集群worker的資訊,將topic的所有slot分配給全域的woker,Redis負責存盤訊息,采用Sorted Set結構存盤,Store Queue是訊息存放的佇列,Prepare Queue是采用二階段消費方式正在消費的訊息存放佇列,Dead Queue是死信佇列,RMQ二方庫由RmqClient、Consumer、Producer三部分組成,RmqClient負責RMQ的啟動作業,包括上報TopicDef、Worker給ZooKeeper,分配Slot給Worker,掃描業務定義的MessageListener Bean,Producer負責根據不用訊息型別將訊息按照指定的方式存盤到Redis,Consumer負責根據不用訊息型別按照指定方式從Redis彈出訊息并呼叫業務的MessageListener,
? 訊息存盤

Topic的設計
Topic的定義有三部分組成,topic表示主題名稱,slotAmount表示訊息存盤劃分的槽數量,topicType表示訊息的型別,主題名稱是一個Topic的唯一標示,相同主題名稱Topic的slotAmount和topicType一定是一樣的,
訊息存盤采用Redis的Sorted Set結構,為了支持大量訊息的堆積,需要把訊息分散存盤到很多個槽中,slotAmount表示該Topic訊息存盤共使用的槽數量,槽數量一定需要是2的n次冪,在訊息存盤的時候,采用對指定資料或者訊息體哈希求余得到槽位置,
StoreQueue的設計
上圖中topic劃分了8個槽位,編號0-7,如果發送方指定了訊息的slotBasis,則計算slotBasis的CRC32值,CRC32值對槽數量進行取模得到槽序號,SlotKey設計為#{topic}_#{index}(也即Redis的鍵),其中#{}表示占位符,
發送方需要保證相同內容的訊息的slotBasis相同,如果沒有指定slotBasis則采用訊息內容計算SlotKey,這樣內容相同的訊息體就會落在同一個Sorted Set里面,所以內容相同的訊息會進行合并,
Redis的Sorted Set中的資料按照分數排序,實作不同型別的訊息的關鍵就在于如何利用分數、如何添加訊息到Sorted Set、如何從Sorted Set中彈出訊息,優先級訊息將優先級作為分數,消費時每次彈出分數最大的訊息,任意定時訊息將時間戳作為分數,消費時每次彈出分數大于當前時間戳的一個訊息,
區間重復合并訊息將時間戳作為分數,添加訊息時將(當前時間戳+時間區間)作為分數,消費時每次彈出分數大于當前時間戳的一個訊息,
PrepareQueue的設計
為了保障RMQ訊息佇列的可用性,做到每條訊息至少消費一次,消費者不是直接pop有序集合中的元素,而是將元素從StoreQueue移動到PrepareQueue并回傳訊息給消費者,等消費成功后再從PrepareQueue從洗掉,或者消費失敗后從PreapreQueue重新移動到StoreQueue,這便是根據二階段提交的思想實作的二階段消費,
在后面將會詳細介紹二階段消費的實作思路,這里重點介紹下PrepareQueue的存盤設計,StoreQueue中每一個Slot對應PrepareQueue中的Slot,PrepareQueue的SlotKey設計為prepare{#{topic}#{index}},PrepareQueue采用Sorted Set作為存盤,訊息移動到PrepareQueue時刻對應的(秒級時間戳*1000+重試次數)作為分數,字串存盤的是訊息體內容,這里分數的設計與重試次數的設計密切相關,所以在重試次數設計章節詳細介紹,
PrepareQueue的SlotKey設計中需要注意的一點,由于訊息從StoreQueue移動到PrepareQueue是通過Lua腳本操作的,因此需要保證Lua腳本操作的Slot在同一個Redis節點上,如何保證PrepareQueue的SlotKey和對應的StoreQueue的SlotKey被hash到同一個Redis槽中呢,Redis的hash tag功能可以指定SlotKey中只有某一部分參與計算hash,這一部分采用{}包括,因此PrepareQueue的SlotKey中采用{}包括了StoreQueue的SlotKey,
DeadQueue的設計
訊息重試消費16次后,訊息將進入DeadQueue,DeadQueue的SlotKey設計為prepare{#{topic}#{index}},這里同樣采用hash tag功能保證DeadQueue的SlotKey與對應StoreQueue的SlotKey存盤在同一Redis節點,
? 生產者
生產者的任務就是將訊息添加到Redis的Sorted Set中,首先,需要計算出訊息添加到Redis的SlotKey,如果發送方指定了訊息的slotBasis(否則采用content代替),則計算slotBasis的CRC32值,CRC32值對槽數量進行取模得到槽序號,SlotKey設計為#{topic}_#{index},其中#{}表示占位符,然后,不同型別的訊息有不同的添加方式,因此分布講述三種型別訊息的添加程序,
區間重復合并訊息
發送該訊息時需要設定timeRange,timeRange必須大于0,單位為毫秒,表示訊息將延遲timeRange毫秒后被消費,期間到來的重復訊息將被合并,合并后的訊息依然維持原來的消費時間,
因此在存盤該型別訊息的時候,采用(當前時間戳+timeRange)作為分數,添加訊息采用Lua腳本執行,保證操作的原子性,Lua腳本首先采用zscore命令檢查訊息是否已經存在,如果已經存在則直接回傳,如果不存在則執行zadd命令添加,
優先級訊息
發送該訊息時需要設定priority,priority必須大于16,表示訊息的優先級,數值越大表示優先級越高,因此在存盤該型別訊息的時候,采用priority作為分數,采用zadd命令直接添加,
任意定時訊息
發送該型別訊息時需要設定fixedTime,fixedTime必須大于當前時間,表示消費時間戳,當前時間大于該消費時間戳的時候,訊息才會被消費,因此在存盤該型別訊息的時候,采用fixedTime作為分數,采用命令zadd直接添加,
? 消費者
二階段消費方式
三種消費模式
一般訊息佇列存在三種消費模式,分別是:最多消費一次、至少消費一次、只消費一次,最多消費一次模式訊息可能丟失,一般不怎么使用,至少消費一次模式訊息不會丟失,但是可能存在重復消費,比較常用,只消費一次模式訊息被精確只消費一次,實作較困難,一般需要業務記錄冪等ID來實作,RMQ實作了至少消費一次的模式,那么如何保證訊息至少被消費一次呢?
至少消費一次模式實作的難點
從最簡單的消費模式——最多消費一次說起,消費者端只需要從訊息佇列服務中取出訊息就行,即執行Redis的zpopmax命令,不倫消費者是否接收到該訊息并成功消費,訊息佇列服務都認為訊息消費成功,最多一次消費模式導致訊息丟失的因素可能有:網路丟包導致消費者沒有接收到訊息,消費者接收到訊息但在消費的時候宕機了,消費者接收到訊息但消費失敗,針對消費失敗導致訊息丟失的情況比較好解決,只需要把消費失敗的訊息重新放入訊息佇列服務就行,但是網路丟包和消費系統例外導致的訊息丟失問題不好解決,
可能有人會想到,我們不把元素從有序集合中pop出來,我們先查詢優先級最高的元素,然后消費,再洗掉消費成功的元素,但是這樣訊息服務佇列就變成了同步阻塞佇列,性能會很差,
至少消費一次模式的實作
至少消費一次的問題比較類似銀行轉賬問題,A向B賬戶轉賬100元,如何保障A賬戶扣減100同時B賬戶增加100,因此我們可以想到二階段提交的思想,第一個準備階段,A、B分別進行資源凍結并持久化undo和redo日志,A、B分別告訴協調者已經準備好;第二個提交階段,協調者告訴A、B進行提交,A、B分別提交事務,RMQ基于二階段提交的思想來實作至少消費一次的模式,
RMQ存盤設計中PrepareQueue的作用就是用來凍結資源并記錄事務日志,消費者端即是參與者也是協調者,第一個準備階段,消費者端通過執行Lua腳本從StoreQueue中Pop訊息并存盤到PrepareQueue,同時訊息傳輸到消費者端,消費者端消費該訊息;第二個提交階段,消費者端根據消費結果是否成功協調訊息佇列服務是提交還是回滾,如果消費成功則提交事務,該訊息從PrepareQueue中洗掉,如果消費失敗則回滾事務,消費者端將該訊息從PrepareQueue移動到StoreQueue,如果因為各種例外導致PrepareQueue中訊息滯留超時,超時后將自動執行回滾操作,二階段消費的流程圖如下所示,

實作方案的例外情況分析
我們來分析下采用二階段消費方案可能存在的例外情況,從以下分析來看二階段消費方案可以保障訊息至少被消費一次,
網路丟包導致消費者沒有接收到訊息,這時訊息已經記錄到PrepareQueue,如果到了超時時間,訊息被回滾放回StoreQueue,等待下次被消費,訊息不丟失,
消費者接收到了訊息,但是消費者還沒來得及消費完成系統就宕機了,訊息消費超時到了后,訊息會被重新放入StoreQueue,等待下次被消費,訊息不丟失,
消費者接收到了訊息并消費成功,消費者端在協調事務提交的時候宕機了,訊息消費超時到了后,訊息會被重新放入StoreQueue,等待下次被消費,訊息被重復消費,
消費者接收到了訊息但消費失敗,消費者端在協調事務提交的時候宕機了,訊息消費超時到了后,訊息會被重新放入StoreQueue,等待下次被消費,訊息不丟失,
消費者接收到了訊息并消費成功,但是由于fullgc等原因使消費時間太長,PrepareQueue中的訊息由于超時已經回滾到StoreQueue,等待下次被消費,訊息被重復消費,
重試次數控制的實作
采用二階段消費方式,需要將訊息在StoreQueue和PrepareQueue之間移動,如何實作重試次數控制呢,其關鍵在StoreQueue和PrepareQueue的分數設計,
PrepareQueue的分數需要與時間相關,正常情況下,消費者不管消費失敗還是消費成功,都會從PrepareQueue洗掉訊息,當消費者系統發生例外或者宕機的時候,訊息就無法從PrepareQueue中洗掉,我們也不知道消費者是否消費成功,為保障訊息至少被消費一次,我們需要做到超時回滾,因此分數需要與消費時間相關,當PrepareQueue中的訊息發生超時的時候,將訊息從PrepareQueue移動到StoreQueue,
因此PrepareQueue的分數設計為:秒級時間戳*1000+重試次數,不同型別的訊息首次存盤到StoreQueue中的分數表示的含義不盡相同,區間重復合并訊息和任意定時訊息存盤時的分數表示消費時間戳,優先級訊息存盤時的分數表示優先級,如果訊息消費失敗,訊息從PrepareQueue回滾到StoreQueue,所有型別的訊息存盤時的分數都表示剩余重試次數,剩余重試次數從16次不斷降低最后為0,訊息進入死信佇列,訊息在StoreQueue和PrepareQueue之間移動流程如下:

Pop訊息
不同型別的訊息在消費的時候Pop訊息的方式不一樣,因此接下來分別講述三種型別訊息的Pop方式,
區間重復合并訊息
該訊息存盤的分數設計為消費時間戳,當前時間大于訊息的消費時間戳時,該訊息應該被消費,因此采用Redis命令ZRANGEBYSCORE彈出分數小于當前時間戳的一條訊息,
優先級訊息
該訊息存盤的分數設計為優先級,優先級越高分數越大,因此采用Redis命令ZPOPMAX彈出分數最大的一條訊息,
任意定時訊息
該訊息存盤的分數設計為消費時間戳,當前時間大于訊息的消費時間戳時,該訊息應該被消費,因此采用Redis命令ZRANGEBYSCORE彈出分數小于當前時間戳的一條訊息,
相關應用
? 主圖價格表達專案

在主圖價格表達中需要實作一個功能,商品價格發生變化時將商品價格列印在商品主圖上面,那么需要在價格發生變動的時候觸發合成一張帶價格的圖片,每一次觸發合圖時計算價格都是獲取當前最新的價格,上游價格變化的因素很多,變化很頻繁,下游合圖消耗GPU資源較大,處理容量較低,因此需要盡可能合并觸發合圖訊息,減輕下游處理壓力,于是使用了RMQ作為訊息佇列來進行削峰填谷、訊息合并,不僅如此,還可以根據商家等級劃分觸發合圖訊息的等級,使KA商家能夠優先得到處理,縮短價格變化的延遲,
在線上實際環境中,集群共130臺機器,RMQ訊息佇列的發送訊息能力和消費訊息能力均可以達到5w tps,而且這并不是峰值,理論上可以達到10w tps,
? 在線資料圈選引擎
在線資料圈選引擎需要處理各種來源的大量動態資料,需要將一段時間區間內的訊息合并處理,減少處理壓力,并且在對同一元資料進行并發處理需要加分布式鎖,鎖沖突導致消費效率下降,RMQ的區間重復合并訊息和并發消費控制能力可以幫助解決這些問題,目前,在線資料圈選引擎已經采用了RMQ訊息佇列作為核心組件,RMQ訊息佇列發揮了很大的作用,
總結
本文提出了一種可實作的基于Redis的訊息佇列,充分利用Sorted Set結構設計了訊息合并、優先級、定時等特性,與傳統訊息佇列形成互補,彌補傳統訊息佇列這方面特性的缺失,為了實作高可用,本文在二階段提交的思想上進行改進設計了二階段消費方式,保障訊息至少被消費一次,
未來將基于Redis的特性打造更多獨特的功能,與傳統訊息中間件形成互補,在消費控制方面會增加流量自動調控能力,根據訊息型別調控消費速度,減少因為某種型別訊息消費瓶頸導致整體消費性能下降,
? 拓展閱讀




作者|默達
編輯|橙子君
出品|阿里巴巴新零售淘系技術


轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/282153.html
標籤:其他
下一篇:RISC-V 能打 50 年!不必期待 RISC-VI —— 對話 RISC-V CTO Mark Himelstein
