摘要:RocketMQ 支持兩種訊息模式:集群消費( Clustering )和廣播消費( Broadcasting ),
本文分享自華為云社區《一文講透RocketMQ消費者是如何負載均衡的》,作者:勇哥java實戰分享,
RocketMQ 支持兩種訊息模式:集群消費( Clustering )和廣播消費( Broadcasting ),
集群消費:同一 Topic 下的一條訊息只會被同一消費組中的一個消費者消費,也就是說,訊息被負載均衡到了同一個消費組的多個消費者實體上,
廣播消費:當使用廣播消費模式時,每條訊息推送給集群內所有的消費者,保證訊息至少被每個消費者消費一次,
我們重點講解下集群消費的消費流程 ,因為集群消費是使用最普遍的消費模式,理解了集群消費,廣播消費也就能順理成章的掌握了,
集群消費示例代碼里,啟動消費者,我們需要配置三個核心屬性:消費組名、訂閱主題、訊息監聽器,最后呼叫 start 方法啟動,
消費者啟動后,我們可以將整個流程簡化成:
消費端的負載均衡是指將 Broker 端中多個佇列按照某種演算法分配給同一個消費組中的不同消費者,
負載均衡是每個客戶端獨立進行計算,那么何時觸發呢 ?
- 消費端啟動時,立即進行負載均衡;
- 消費端定時任務每隔 20 秒觸發負載均衡;
- 消費者上下線,Broker 端通知消費者觸發負載均衡,
負載均衡流程如下:
1、發送心跳
消費者啟動后,它就會通過定時任務不斷地向 RocketMQ 集群中的所有 Broker 實體發送心跳包(訊息消費分組名稱、訂閱關系集合、訊息通信模式和客戶端實體編號等資訊),
Broker 端在收到消費者的心跳訊息后,會將它維護在 ConsumerManager 的本地快取變數 consumerTable,同時并將封裝后的客戶端網路通道資訊保存在本地快取變數 channelInfoTable 中,為之后做消費端的負載均衡提供可以依據的元資料資訊,
2、啟動負載均衡服務
下圖展示了按照主題負載均衡的代碼片段:
負載均衡服務會根據消費模式為”廣播模式”還是“集群模式”做不同的邏輯處理,這里主要來看下集群模式下的主要處理流程:
(1) 獲取該主題下的訊息消費佇列集合;
(2) 查詢 Broker 端獲取該消費組下消費者 Id 串列;
(3) 先對 Topic 下的訊息消費佇列、消費者 Id 排序,然后用訊息佇列分配策略演算法(默認為:訊息佇列的平均分配演算法),計算出待拉取的訊息佇列;
這里的平均分配演算法,類似于分頁的演算法,將所有 MessageQueue 排好序類似于記錄,將所有消費端排好序類似頁數,并求出每一頁需要包含的平均 size 和每個頁面記錄的范圍 range ,最后遍歷整個 range 而計算出當前消費端應該分配到的記錄,
(4) 分配到的訊息佇列集合與 processQueueTable 做一個過濾比對操作
消費者實體內 ,processQueueTable 物件存盤著當前負載均衡的佇列 ,以及該佇列的消費快照,
標紅的部分表示與分配到的訊息佇列集合互不包含,則需要將這些紅色佇列 Dropped 屬性為 true , 然后從 processQueueTable 物件中移除,
綠色的部分表示與分配到的訊息佇列集合的交集,processQueueTable 物件中已經存在該佇列,
黃色的部分表示這些佇列需要添加到 processQueueTable 物件中,創建這些佇列的消費快照,最后創建拉取訊息請求串列,并將請求分發到訊息拉取服務,進入拉取訊息環節,
點擊關注,第一時間了解華為云新鮮技術~
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/551729.html
標籤:其他
上一篇:到底什么是小程式插件?
下一篇:返回列表
