RocketMQ 5.0:云原生“訊息、事件、流”實時資料處理平臺,覆寫云邊端一體化資料處理場景,
核心特性
- 云原生:生與云,長與云,無限彈性擴縮,K8s友好
- 高吞吐:萬億級吞吐保證,同時滿足微服務與大資料場景
- 流處理:提供輕量、高擴展、高性能和豐富功能的流計算引擎
- 金融級:金融級的穩定性,廣泛用于交易核心鏈路
- 架構極簡:零外部依賴,Shared-nothing 架構
- 生態友好:無縫對接微服務、實時計算、資料湖等周邊生態
1. 基本概念

1、訊息由生產者初始化并發送到Apache RocketMQ 服務端,
2、訊息按照到達Apache RocketMQ 服務端的順序存盤到主題的指定佇列中,
3、消費者按照指定的訂閱關系從Apache RocketMQ 服務端中獲取訊息并消費,
1.1. 訊息
訊息是 Apache RocketMQ 中的最小資料傳輸單元,生產者將業務資料的負載和拓展屬性包裝成訊息發送到 Apache RocketMQ 服務端,服務端按照相關語意將訊息投遞到消費端進行消費,
RocketMQ 訊息構成非常簡單,如下所示:
- topic:表示要發送的訊息的主題
- body:表示訊息的存盤內容
- properties:表示訊息屬性
- transactionId:會在事務訊息中使用
訊息內部屬性
| 欄位名 | 必填 | 說明 |
| 主題名稱 | 是 |
當前訊息所屬的主題的名稱,集群內全域唯一, |
| 訊息體 | 是 | 訊息體 |
| 訊息型別 | 是 |
Normal:普通訊息,訊息本身無特殊語意,訊息之間也沒有任何關聯, FIFO:順序訊息,Apache RocketMQ 通過訊息分組MessageGroup標記一組特定訊息的先后順序,可以保證訊息的投遞順序嚴格按照訊息發送時的順序, Delay:定時/延時訊息,通過指定延時時間控制訊息生產后不要立即投遞,而是在延時間隔后才對消費者可見, Transaction:事務訊息,Apache RocketMQ 支持分布式事務訊息,支持應用資料庫更新和訊息呼叫的事務一致性保障, |
| 過濾標簽Tag | 否 | 方便服務器過濾使用,消費者可通過Tag對訊息進行過濾,僅接收指定標簽的訊息,目前只支持每個訊息設定一個, |
| 索引Key | 否 | 訊息的索引鍵,可通過設定不同的Key區分訊息和快速查找訊息, |
| 定時時間?? | 否 | 定時場景下,訊息觸發延時投遞的毫秒級時間戳, |
| 消費重試次數 | 否?? | 訊息消費失敗后,Apache RocketMQ 服務端重新投遞的次數,每次重試后,重試次數加1, |
| 業務自定義屬性?? | 否 | 生產者可以自定義設定的擴展資訊, |
系統默認的訊息最大限制如下:
- 普通和順序訊息:4 MB
- 事務和定時或延時訊息:64 KB
1.2. Tag
Topic 與 Tag 都是業務上用來歸類的標識,區別在于 Topic 是一級分類,而 Tag 可以理解為是二級分類,使用 Tag 可以實作對 Topic 中的訊息進行過濾,
提示:
- Topic:訊息主題,通過 Topic 對不同的業務訊息進行分類,
- Tag:訊息標簽,用來進一步區分某個 Topic 下的訊息分類,訊息從生產者發出即帶上的屬性,
Topic 和 Tag 的關系如下圖所示:

什么時候該用 Topic,什么時候該用 Tag?
可以從以下幾個方面進行判斷:
- 訊息型別是否一致:如普通訊息、事務訊息、定時(延時)訊息、順序訊息,不同的訊息型別使用不同的 Topic,無法通過 Tag 進行區分,
- 業務是否相關聯:沒有直接關聯的訊息,如淘寶交易訊息,京東物流訊息使用不同的 Topic 進行區分;而同樣是天貓交易訊息,電器類訂單、女裝類訂單、化妝品類訂單的訊息可以用 Tag 進行區分,
- 訊息優先級是否一致:如同樣是物流訊息,盒馬必須小時內送達,天貓超市 24 小時內送達,淘寶物流則相對會慢一些,不同優先級的訊息用不同的 Topic 進行區分,
- 訊息量級是否相當:有些業務訊息雖然量小但是實時性要求高,如果跟某些萬億量級的訊息使用同一個 Topic,則有可能會因為過長的等待時間而“餓死”,此時需要將不同量級的訊息進行拆分,使用不同的 Topic,
通常情況下,不同的 Topic 之間的訊息沒有必然的聯系,而 Tag 則用來區分同一個 Topic 下相互關聯的訊息,例如全集和子集的關系、流程先后的關系,
1.3. Keys
Apache RocketMQ 每個訊息可以在業務層面的設定唯一標識碼 keys 欄位,方便將來定位訊息丟失問題, Broker 端會為每個訊息創建索引(哈希索引),應用可以通過 topic、key 來查詢這條訊息內容,以及訊息被誰消費,由于是哈希索引,請務必保證 key 盡可能唯一,這樣可以避免潛在的哈希沖突,
// 訂單Id
String orderId = "20034568923546";
message.setKeys(orderId);
1.4. 佇列
一個 Topic 可能有多個佇列,并且可能分布在不同的 Broker 上,
佇列天然具備順序性,即訊息按照進入佇列的順序寫入存盤,同一佇列間的訊息天然存在順序關系,佇列頭部為最早寫入的訊息,佇列尾部為最新寫入的訊息,訊息在佇列中的位置和訊息之間的順序通過位點(Offset)進行標記管理,

Apache RocketMQ 默認提供訊息可靠存盤機制,所有發送成功的訊息都被持久化存盤到佇列中,配合生產者和消費者客戶端的呼叫可實作至少投遞一次的可靠性語意,
Apache RocketMQ 佇列模型和Kafka的磁區(Partition)模型類似,在 Apache RocketMQ 訊息收發模型中,佇列屬于主題的一部分,雖然所有的訊息資源以主題粒度管理,但實際的操作實作是面向佇列,例如,生產者指定某個主題,向主題內發送訊息,但實際訊息發送到該主題下的某個佇列中,
Apache RocketMQ 中通過修改佇列數量,以此實作橫向的水平擴容和縮容,
一般來說一條訊息,如果沒有重復發送(比如因為服務端沒有回應而進行重試),則只會存在在 Topic 的其中一個佇列中,訊息在佇列中按照先進先出的原則存盤,每條訊息會有自己的位點,每個佇列會統計當前訊息的總條數,這個稱為最大位點 MaxOffset;佇列的起始位置對應的位置叫做起始位點 MinOffset,佇列可以提升訊息發送和消費的并發度,
注意:按照實際業務消耗設定佇列數,佇列數量的設定應遵循少用夠用原則,避免隨意增加佇列數量,
1.5. 生產者
生產者(Producer)就是訊息的發送者,Apache RocketMQ 擁有豐富的訊息型別,可以支持不同的應用場景,在不同的場景中,需要使用不同的訊息進行發送,比如在電商交易中超時未支付關閉訂單的場景,在訂單創建時會發送一條延時訊息,這條訊息將會在 30 分鐘以后投遞給消費者,消費者收到此訊息后需要判斷對應的訂單是否已完成支付,如支付未完成,則關閉訂單,如已完成支付則忽略,此時就需要用到延遲訊息;電商場景中,業務上要求同一訂單的訊息保持嚴格順序,此時就要用到順序訊息,在日志處理場景中,可以接受的比較大的發送延遲,但對吞吐量的要求很高,希望每秒能處理百萬條日志,此時可以使用批量訊息,在銀行扣款的場景中,要保持上游的扣款操作和下游的短信通知保持一致,此時就要使用事務訊息,
注意:不要在同一個主題內使用多種訊息型別
生產者通常被集成在業務系統中,將業務訊息按照要求封裝成 Apache RocketMQ 的訊息(Message)并發送至服務端,
生產者和主題的關系為多對多關系,即同一個生產者可以向多個主題發送訊息,同一個主題也可以接收多個生產者的訊息,

注意:不建議頻繁創建和銷毀生產者
Producer p = ProducerBuilder.build();
for (int i =0;i<n;i++){
Message m= MessageBuilder.build();
p.send(m);
}
p.shutdown();
1.6. 消費者與消費者組
如果多個消費者設定了相同的Consumer Group,我們認為這些消費者在同一個消費組內,同一個消費組的多個消費者必須保持消費邏輯和配置一致,共同分擔該消費組訂閱的訊息,實作消費能力的水平擴展,
在 Apache RocketMQ 有兩種消費模式,分別是:
- 集群消費模式:當使用集群消費模式時,RocketMQ 認為任意一條訊息只需要被消費組內的任意一個消費者處理即可,
- 廣播消費模式:當使用廣播消費模式時,RocketMQ 會將每條訊息推送給消費組所有的消費者,保證訊息至少被每個消費者消費一次,


負載均衡
RocketMQ的負載均衡策略與Kafka極其類似,幾乎一毛一樣
集群模式下,同一個消費組內的消費者會分擔收到的全量訊息,這里的分配策略是怎樣的?如果擴容消費者是否一定能提升消費能力?
Apache RocketMQ 提供了多種集群模式下的分配策略,包括平均分配策略、機房優先分配策略、一致性hash分配策略等,可以通過如下代碼進行設定相應負載均衡策略,
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
默認的分配策略是平均分配,這也是最常見的策略,平均分配策略下消費組內的消費者會按照類似分頁的策略均攤消費,
在平均分配的演算法下,可以通過增加消費者的數量來提高消費的并行度,比如下圖中,通過增加消費者來提高消費能力,


但也不是一味地增加消費者就能提升消費能力的,比如下圖中Topic的總佇列數小于消費者的數量時,消費者將分配不到佇列,即使消費者再多也無法提升消費能力,

1.7. 消費者分類

如上圖所示, Apache RocketMQ 的消費者處理訊息時主要經過以下階段:訊息獲取--->訊息處理--->消費狀態提交,
針對以上幾個階段,Apache RocketMQ 提供了不同的消費者型別: PushConsumer 、SimpleConsumer 和 PullConsumer,這幾種型別的消費者通過不同的實作方式和介面可滿足您在不同業務場景下的消費需求,具體差異如下:
注:在實際使用場景中,PullConsumer 僅推薦在流處理框架中集成使用,大多數訊息收發場景使用 PushConsumer 和 SimpleConsumer 就可以滿足需求,

PushConsumer
PushConsumers是一種高度封裝的消費者型別,消費訊息僅通過消費監聽器處理業務并回傳消費結果,訊息的獲取、消費狀態提交以及消費重試都通過 Apache RocketMQ 的客戶端SDK完成,
SimpleConsumer
SimpleConsumer 是一種介面原子型的消費者型別,訊息的獲取、消費狀態提交以及消費重試都是通過消費者業務邏輯主動發起呼叫完成,
補充:
rocketmq-client中定義的:
- DefaultMQProducer
- DefaultMQPushConsumer
- DefaultLitePullConsumer
rocketmq-client-java中定義的:
- Producer
- PushConsumer
- SimpleConsumer
1.8. 消費位點
訊息是按到達Apache RocketMQ 服務端的先后順序存盤在指定主題的多個佇列中,每條訊息在佇列中都有一個唯一的Long型別坐標,這個坐標被定義為訊息位點,一條訊息被某個消費者消費完成后不會立即從佇列中洗掉,Apache RocketMQ 會基于每個消費者分組記錄消費過的最新一條訊息的位點,即消費位點,

如上圖所示,在Apache RocketMQ中每個佇列都會記錄自己的最小位點、最大位點,針對于消費組,還有消費位點的概念,在集群模式下,消費位點是由客戶端提給交服務端保存的,在廣播模式下,消費位點是由客戶端自己保存的,一般情況下消費位點正常更新,不會出現訊息重復,但如果消費者發生崩潰或有新的消費者加入群組,就會觸發重平衡,重平衡完成后,每個消費者可能會分配到新的佇列,而不是之前處理的佇列,為了能繼續之前的作業,消費者需要讀取每個佇列最后一次的提交的消費位點,然后從消費位點處繼續拉取訊息,但在實際執行程序中,由于客戶端提交給服務端的消費位點并不是實時的,所以重平衡就可能會導致訊息少量重復,
1.9. 訂閱關系
一個訂閱關系指的是指定某個消費者分組對于某個主題的訂閱,
不同消費者分組對于同一個主題的訂閱相互獨立如下圖所示,消費者分組Group A和消費者分組Group B分別以不同的訂閱關系訂閱了同一個主題Topic A,這兩個訂閱關系互相獨立,可以各自定義,不受影響,

同一個消費者分組對于不同主題的訂閱也相互獨立如下圖所示,消費者分組Group A訂閱了兩個主題Topic A和Topic B,對于Group A中的消費者來說,訂閱的Topic A為一個訂閱關系,訂閱的Topic B為另外一個訂閱關系,且這兩個訂閱關系互相獨立,可以各自定義,不受影響,

2. 訊息型別
1、順序訊息(FIFO):這類訊息必須設定 message group,這種型別的訊息需要與FIFO消費者組一起使用
2、延遲訊息(DELAY):訊息被發送后不會立即對消費者可見,這種型別的訊息必須設定delivery timestamp以決定對消費者可見的時間;
3、事務訊息(TRANSACTIONAL):將一個或多個訊息的發布包裝到一個事務中,提供提交/回滾方法來決定訊息的可見性;
4、普通訊息(NORMAL):默認型別
不同的型別是互斥的,當意味著要發布的訊息不能同時是FIFO型別和DELAY型別,實際上,主題的型別決定了訊息的型別,例如,FIFO主題不允許發布其他型別的訊息,

2.1. 普通訊息
普通訊息一般應用于微服務解耦、事件驅動、資料集成等場景,這些場景大多數要求資料傳輸通道具有可靠傳輸的能力,且對訊息的處理時機、處理順序沒有特別要求,
典型場景一:微服務異步解耦

如上圖所示,以在線的電商交易場景為例,上游訂單系統將用戶下單支付這一業務事件封裝成獨立的普通訊息并發送至Apache RocketMQ服務端,下游按需從服務端訂閱訊息并按照本地消費邏輯處理下游任務,每個訊息之間都是相互獨立的,且不需要產生關聯,
典型場景二:資料集成傳輸

如上圖所示,以離線的日志收集場景為例,通過埋點組件收集前端應用的相關操作日志,并轉發到 Apache RocketMQ ,每條訊息都是一段日志資料,Apache RocketMQ 不做任何處理,只需要將日志資料可靠投遞到下游的存盤系統和分析系統即可,后續功能由后端應用完成,
2.2. 順序訊息
應用場景
在有序事件處理、撮合交易、資料實時增量同步等場景下,異構系統間需要維持強一致的狀態同步,上游的事件變更需要按照順序傳遞到下游進行處理,在這類場景下使用 Apache RocketMQ 的順序訊息可以有效保證資料傳輸的順序性,
典型場景一:撮合交易

以證券、股票交易撮合場景為例,對于出價相同的交易單,堅持按照先出價先交易的原則,下游處理訂單的系統需要嚴格按照出價順序來處理訂單,
典型場景二:資料實時增量同步

以資料庫變更增量同步場景為例,上游源端資料庫按需執行增刪改操作,將二進制操作日志作為訊息,通過 Apache RocketMQ 傳輸到下游搜索系統,下游系統按順序還原訊息資料,實作狀態資料按序重繪,如果是普通訊息則可能會導致狀態混亂,和預期操作結果不符,基于順序訊息可以實作下游狀態和上游操作結果一致,
功能原理
順序訊息是 Apache RocketMQ 提供的一種高級訊息型別,支持消費者按照發送訊息的先后順序獲取訊息,從而實作業務場景中的順序處理, 相比其他型別訊息,順序訊息在發送、存盤和投遞的處理程序中,更多強調多條訊息間的先后順序關系,
Apache RocketMQ 順序訊息的順序關系通過訊息組(MessageGroup)判定和識別,發送順序訊息時需要為每條訊息設定歸屬的訊息組,相同訊息組的多條訊息之間遵循先進先出的順序關系,不同訊息組、無訊息組的訊息之間不涉及順序性,
基于訊息組的順序判定邏輯,支持按照業務邏輯做細粒度拆分,可以在滿足業務區域順序的前提下提高系統的并行度和吞吐能力,
如何保證訊息的順序性?
Apache RocketMQ 的訊息的順序性分為兩部分,生產順序性和消費順序性,
1、生產順序性
如需保證訊息生產的順序性,則必須滿足以下條件:
- 單一生產者:訊息生產的順序性僅支持單一生產者,不同生產者分布在不同的系統,即使設定相同的訊息組,不同生產者之間產生的訊息也無法判定其先后順序,
- 串行發送:Apache RocketMQ 生產者客戶端支持多執行緒安全訪問,但如果生產者使用多執行緒并行發送,則不同執行緒間產生的訊息將無法判定其先后順序,
滿足以上條件的生產者,將順序訊息發送至 Apache RocketMQ 后,會保證設定了同一訊息組的訊息,按照發送順序存盤在同一佇列中,服務端順序存盤邏輯如下:
- 相同訊息組的訊息按照先后順序被存盤在同一個佇列,
- 不同訊息組的訊息可以混合在同一個佇列中,且不保證連續,

2、消費順序性
如需保證訊息消費的順序性,則必須滿足以下條件:
- 投遞順序:Apache RocketMQ 通過客戶端SDK和服務端通信協議保障訊息按照服務端存盤順序投遞,但業務方消費訊息時需要嚴格按照接收---處理---應答的語意處理訊息,避免因異步處理導致訊息亂序,
- 有限重試:Apache RocketMQ 順序訊息投遞僅在重試次數限定范圍內,即一條訊息如果一直重試失敗,超過最大重試次數后將不再重試,跳過這條訊息消費,不會一直阻塞后續訊息處理,對于需要嚴格保證消費順序的場景,請務設定合理的重試次數,避免引數不合理導致訊息亂序,
生產順序性和消費順序性組合
如果訊息需要嚴格按照先進先出(FIFO)的原則處理,即先發送的先消費、后發送的后消費,則必須要同時滿足生產順序性和消費順序性,
一般業務場景下,同一個生產者可能對接多個下游消費者,不一定所有的消費者業務都需要順序消費,您可以將生產順序性和消費順序性進行差異化組合,應用于不同的業務場景,例如發送順序訊息,但使用非順序的并發消費方式來提高吞吐能力,更多組合方式如下表所示:
| 生產順序 | 消費順序 | 順序性效果 |
| 設定訊息組,保證訊息順序發送, | 順序消費 | 按照訊息組粒度,嚴格保證訊息順序, 同一訊息組內的訊息的消費順序和發送順序完全一致, |
| 設定訊息組,保證訊息順序發送, | 并發消費 | 并發消費,盡可能按時間順序處理, |
| 未設定訊息組,訊息亂序發送, | 順序消費 | 按佇列存盤粒度,嚴格順序, 基于 Apache RocketMQ 本身佇列的屬性,消費順序和佇列存盤的順序一致,但不保證和發送順序一致, |
| 未設定訊息組,訊息亂序發送, | 并發消費 | 并發消費,盡可能按照時間順序處理, |
2.3. 定時/延時訊息
注:定時訊息和延時訊息本質相同,都是服務端根據訊息設定的定時時間在某一固定時刻將訊息投遞給消費者消費,
應用場景
在分布式定時調度觸發、任務超時處理等場景,需要實作精準、可靠的定時事件觸發,使用 Apache RocketMQ 的定時訊息可以簡化定時調度任務的開發邏
輯,實作高性能、可擴展、高可靠的定時觸發能力,
典型場景一:分布式定時調度

在分布式定時調度場景下,需要實作各類精度的定時任務,例如每天5點執行檔案清理,每隔2分鐘觸發一次訊息推送等需求,基于 Apache RocketMQ 的定時訊息可以封裝出多種型別的定時觸發器,
典型場景二:任務超時處理
以電商交易場景為例,訂單下單后暫未支付,此時不可以直接關閉訂單,而是需要等待一段時間后才能關閉訂單,使用 Apache RocketMQ 定時訊息可以實作超時任務的檢查觸發,
基于定時訊息的超時任務處理具備如下優勢:
- 精度高、開發門檻低:基于訊息通知方式不存在定時階梯間隔,可以輕松實作任意精度事件觸發,無需業務去重,
- 高性能可擴展:傳統的資料庫掃描方式較為復雜,需要頻繁呼叫介面掃描,容易產生性能瓶頸, Apache RocketMQ 的定時訊息具有高并發和水平擴展的能力,
功能原理
定時時間設定原則
Apache RocketMQ 定時訊息設定的定時時間是一個預期觸發的系統時間戳,延時時間也需要轉換成當前系統時間后的某一個時間戳,而不是一段延時時長,
投遞等級
Apache RocketMQ 一共支持18個等級的延遲投遞,具體時間如下:

2.4. 事務訊息

以電商交易場景為例,用戶支付訂單這一核心操作的同時會涉及到下游物流發貨、積分變更、購物車狀態清空等多個子系統的變更,當前業務的處理分支包括:
- 主分支訂單系統狀態更新:由未支付變更為支付成功,
- 物流系統狀態新增:新增待發貨物流記錄,創建訂單物流記錄,
- 積分系統狀態變更:變更用戶積分,更新用戶積分表,
- 購物車系統狀態變更:清空購物車,更新用戶購物車記錄,
使用普通訊息和訂單事務無法保證一致的原因,本質上是由于普通訊息無法像單機資料庫事務一樣,具備提交、回滾和統一協調的能力,而基于 RocketMQ 的分布式事務訊息功能,在普通訊息基礎上,支持二階段的提交能力,將二階段提交和本地事務系結,實作全域提交結果的一致性,

事務訊息發送分為兩個階段,第一階段會發送一個半事務訊息,半事務訊息是指暫不能投遞的訊息,生產者已經成功地將訊息發送到了 Broker,但是Broker 未收到生產者對該訊息的二次確認,此時該訊息被標記成“暫不能投遞”狀態,如果發送成功則執行本地事務,并根據本地事務執行成功與否,向 Broker 半事務訊息狀態(commit或者rollback),半事務訊息只有 commit 狀態才會真正向下游投遞,如果由于網路閃斷、生產者應用重啟等原因,導致某條事務訊息的二次確認丟失,Broker 端會通過掃描發現某條訊息長期處于“半事務訊息”時,需要主動向訊息生產者詢問該訊息的最終狀態(Commit或是Rollback),這樣最終保證了本地事務執行成功,下游就能收到訊息,本地事務執行失敗,下游就收不到訊息,總而保證了上下游資料的一致性,(PS:重點是兩階段提交)
事務訊息處理流程

1、生產者將訊息發送至Apache RocketMQ服務端,
2、Apache RocketMQ服務端將訊息持久化成功之后,向生產者回傳Ack確認訊息已經發送成功,此時訊息被標記為"暫不能投遞",這種狀態下的訊息即為半事務訊息,
3、生產者開始執行本地事務邏輯,
4、生產者根據本地事務執行結果向服務端提交二次確認結果(Commit或是Rollback),服務端收到確認結果后處理邏輯如下:
- 二次確認結果為Commit:服務端將半事務訊息標記為可投遞,并投遞給消費者,
- 二次確認結果為Rollback:服務端將回滾事務,不會將半事務訊息投遞給消費者,
5、在斷網或者是生產者應用重啟的特殊情況下,若服務端未收到發送者提交的二次確認結果,或服務端收到的二次確認結果為Unknown未知狀態,經過固定時間后,服務端將對訊息生產者即生產者集群中任一生產者實體發起訊息回查,
6、生產者收到訊息回查后,需要檢查對應訊息的本地事務執行的最終結果,
7、生產者根據檢查到的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟4對半事務訊息進行處理,
3. 機制
3.1. 訊息發送重試機制
Apache RocketMQ 客戶端連接服務端發起訊息發送請求時,可能會因為網路故障、服務例外等原因導致呼叫失敗,為保證訊息的可靠性, Apache RocketMQ 在客戶端SDK中內置請求重試邏輯,嘗試通過重試發送達到最終呼叫成功的效果,
同步發送和異步發送模式均支持訊息發送重試,
重試觸發條件:
- 客戶端訊息發送請求呼叫失敗或請求超時
- 網路例外造成連接失敗或請求超時
- 服務端節點處于重啟或下線等狀態造成連接失敗
- 服務端運行慢造成請求超時
- 服務端回傳失敗錯誤碼
重試流程:
生產者在初始化時設定訊息發送最大重試次數,當出現上述觸發條件的場景時,生產者客戶端會按照設定的重試次數一直重試發送訊息,直到訊息發送成功或達到最大重試次數重試結束,并在最后一次重試失敗后回傳呼叫錯誤回應,
- 同步發送:呼叫執行緒會一直阻塞,直到某次重試成功或最終重試失敗,拋出錯誤碼和例外,
- 異步發送:呼叫執行緒不會阻塞,但呼叫結果會通過例外事件或者成功事件回傳,
重試間隔
- 除服務端回傳系統流控錯誤場景,其他觸發條件觸發重試后,均會立即進行重試,無等待間隔,
- 若由于服務端回傳流控錯誤觸發重試,系統會按照指數退避策略進行延遲重試,指數退避演算法通過以下引數控制重試行為:
- INITIAL_BACKOFF: 第一次失敗重試前后需等待多久,默認值:1秒
- MULTIPLIER :指數退避因子,即退避倍率,默認值:1.6
- JITTER :隨機抖動因子,默認值:0.2
- MAX_BACKOFF :等待間隔時間上限,默認值:120秒
- MIN_CONNECT_TIMEOUT :最短重試間隔,默認值:20秒?
3.2. 訊息流控機制
訊息流控指的是系統容量或水位過高, Apache RocketMQ 服務端會通過快速失敗回傳流控錯誤來避免底層資源承受過高壓力,
觸發條件
- 存盤壓力大:消費者分組的初始消費位點為當前佇列的最大消費位點,
- 服務端請求任務排隊溢位:若消費者消費能力不足,導致佇列中有大量堆積訊息,當堆積訊息超過一定數量后會觸發訊息流控,減少下游消費系統壓力,
流控行為
當系統觸發訊息發送流控時,客戶端會收到系統限流錯誤和例外,錯誤碼資訊如下:
- reply-code:530
- reply-text:TOO_MANY_REQUESTS
3.3. 消費重試
消費者出現例外,消費某條訊息失敗時, Apache RocketMQ 會根據消費重試策略重新投遞該訊息,消費重試主要解決的是業務處理邏輯失敗導致的消費完整性問題,是一種為業務兜底的策略,不應該被用做業務流程控制,
推薦使用訊息重試場景如下:
- 業務處理失敗,且失敗原因跟當前的訊息內容相關,比如該訊息對應的事務狀態還未獲取到,預期一段時間后可執行成功,
- 消費失敗的原因不會導致連續性,即當前訊息消費失敗是一個小概率事件,不是常態化的失敗,后面的訊息大概率會消費成功,此時可以對當前訊息進行重試,避免行程阻塞,
消費重試策略
消費重試指的是,消費者在消費某條訊息失敗后,Apache RocketMQ 服務端會根據重試策略重新消費該訊息,超過一次定數后若還未消費成功,則該訊息將不再繼續重試,直接被發送到死信佇列中,
訊息重試的觸發條件
- 消費失敗,包括消費者回傳訊息失敗狀態標識或拋出非預期例外,
- 訊息處理超時,包括在PushConsumer中排隊超時,
重試策略差異

3.4. 消費進度
訊息位點(Offset)
訊息是按到達服務端的先后順序存盤在指定主題的多個佇列中,每條訊息在佇列中都有一個唯一的Long型別坐標,這個坐標被定義為訊息位點,
任意一個訊息佇列在邏輯上都是無限存盤,即訊息位點會從0到Long.MAX無限增加,通過主題、佇列和位點就可以定位任意一條訊息的位置,具體關系如下圖所示:

Apache RocketMQ 定義佇列中最早一條訊息的位點為最小訊息位點(MinOffset);最新一條訊息的位點為最大訊息位點(MaxOffset),雖然訊息佇列邏輯上是無限存盤,但由于服務端物理節點的存盤空間有限, Apache RocketMQ 會滾動洗掉佇列中存盤最早的訊息,因此,訊息的最小消費位點和最大消費位點會一直遞增變化,

消費位點(ConsumerOffset)
Apache RocketMQ 領域模型為發布訂閱模式,每個主題的佇列都可以被多個消費者分組訂閱,若某條訊息被某個消費者消費后直接被洗掉,則其他訂閱了該主題的消費者將無法消費該訊息,
因此,Apache RocketMQ 通過消費位點管理訊息的消費進度,每條訊息被某個消費者消費完成后不會立即在佇列中洗掉,Apache RocketMQ 會基于每個消費者分組維護一份消費記錄,該記錄指定消費者分組消費某一個佇列時,消費過的最新一條訊息的位點,即消費位點,
當消費者客戶端離線,又再次重新上線時,會嚴格按照服務端保存的消費進度繼續處理訊息,如果服務端保存的歷史位點資訊已過期被洗掉,此時消費位點向前移動至服務端存盤的最小位點,
注:消費位點的保存和恢復是基于 Apache RocketMQ 服務端的存盤實作,和任何消費者無關,
佇列中訊息位點MinOffset、MaxOffset和每個消費者分組的消費位點ConsumerOffset的關系如下:

ConsumerOffset≤MaxOffset:
- 當消費速度和生產速度一致,且全部訊息都處理完成時,最大訊息位點和消費位點相同,即ConsumerOffset=MaxOffset
- 當消費速度較慢小于生產速度時,佇列中會有部分訊息未消費,此時消費位點小于最大訊息位點,即ConsumerOffset<MaxOffset,兩者之差就是該佇列中堆積的訊息量
ConsumerOffset≥MinOffset:
- 正常情況下有效的消費位點ConsumerOffset必然大于等于最小訊息位點MinOffset,消費位點小于最小訊息位點時是無效的,相當于消費者要消費的訊息已經從佇列中洗掉了,是無法消費到的,此時服務端會將消費位點強制糾正到合法的訊息位點,
消費位點初始值
消費位點初始值指的是消費者分組首次啟動消費者消費訊息時服務端保存的消費位點的初始值,Apache RocketMQ 定義消費位點的初始值為消費者首次獲取訊息時,該時刻佇列中的最大訊息位點,相當于消費者將從佇列中最新的訊息開始消費,
3.5. 訊息存盤機制
Apache RocketMQ 使用存盤時長作為訊息存盤的依據,即每個節點對外承諾訊息的存盤時長,在存盤時長范圍內的訊息都會被保留,無論訊息是否被消費;超過時長限制的訊息則會被清理掉,

4. 架構
4.1. 技術架構

RocketMQ架構上主要分為四部分,如上圖所示:
- Producer:訊息發布的角色,支持分布式集群方式部署,Producer通過MQ的負載均衡模塊選擇相應的Broker集群佇列進行訊息投遞,投遞的程序支持快速失敗并且低延遲,
- Consumer:訊息消費的角色,支持分布式集群方式部署,支持以push推,pull拉兩種模式對訊息進行消費,同時也支持集群方式和廣播方式的消費,它提供實時訊息訂閱機制,可以滿足大多數用戶的需求,
- NameServer:NameServer是一個非常簡單的Topic路由注冊中心,其角色類似Dubbo中的zookeeper,支持Broker的動態注冊與發現,主要包括兩個功能:Broker管理,NameServer接受Broker集群的注冊資訊并且保存下來作為路由資訊的基本資料,然后提供心跳檢測機制,檢查Broker是否還存活;路由資訊管理,每個NameServer將保存關于Broker集群的整個路由資訊和用于客戶端查詢的佇列資訊,然后Producer和Consumer通過NameServer就可以知道整個Broker集群的路由資訊,從而進行訊息的投遞和消費,NameServer通常也是集群的方式部署,各實體間相互不進行資訊通訊,Broker是向每一臺NameServer注冊自己的路由資訊,所以每一個NameServer實體上面都保存一份完整的路由資訊,當某個NameServer因某種原因下線了,Broker仍然可以向其它NameServer同步其路由資訊,Producer和Consumer仍然可以動態感知Broker的路由的資訊,
- BrokerServer:Broker主要負責訊息的存盤、投遞和查詢以及服務高可用保證,為了實作這些功能,Broker包含了以下幾個重要子模塊,
4.2. 部署架構

RocketMQ 網路部署特點:
- NameServer是一個幾乎無狀態節點,可集群部署,節點之間無任何資訊同步,
- Broker部署相對復雜,Broker分為Master與Slave,一個Master可以對應多個Slave,但是一個Slave只能對應一個Master,Master與Slave 的對應關系通過指定相同的BrokerName,不同的BrokerId 來定義,BrokerId為0表示Master,非0表示Slave,Master也可以部署多個,每個Broker與NameServer集群中的所有節點建立長連接,定時注冊Topic資訊到所有NameServer, 注意:當前RocketMQ版本在部署架構上支持一Master多Slave,但只有BrokerId=1的從服務器才會參與訊息的讀負載,
-
Producer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer獲取Topic路由資訊,并向提供Topic 服務的Master建立長連接,且定時向Master發送心跳,Producer完全無狀態,可集群部署,
- Consumer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer獲取Topic路由資訊,并向提供Topic服務的Master、Slave建立長連接,且定時向Master、Slave發送心跳,Consumer既可以從Master訂閱訊息,也可以從Slave訂閱訊息,消費者在向Master拉取訊息時,Master服務器會根據拉取偏移量與最大偏移量的距離(判斷是否讀老訊息,產生讀I/O),以及從服務器是否可讀等因素建議下一次是從Master還是Slave拉取,
結合部署架構圖,描述集群作業流程:
- 啟動NameServer,NameServer起來后監聽埠,等待Broker、Producer、Consumer連上來,相當于一個路由控制中心,
- Broker啟動,跟所有的NameServer保持長連接,定時發送心跳包,心跳包中包含當前Broker資訊(IP+埠等)以及存盤所有Topic資訊,注冊成功后,NameServer集群中就有Topic跟Broker的映射關系,
- 收發訊息前,先創建Topic,創建Topic時需要指定該Topic要存盤在哪些Broker上,也可以在發送訊息時自動創建Topic,
- Producer發送訊息,啟動時先跟NameServer集群中的其中一臺建立長連接,并從NameServer中獲取當前發送的Topic存在哪些Broker上,輪詢從佇列串列中選擇一個佇列,然后與佇列所在的Broker建立長連接從而向Broker發訊息,
- Consumer跟Producer類似,跟其中一臺NameServer建立長連接,獲取當前訂閱Topic存在哪些Broker上,然后直接跟Broker建立連接通道,開始消費訊息,
5. 客戶端
在撰寫客戶端代碼時,首先準備一個簡單的環境,可以選用Local模式,這里不多介紹,只說一句,啟動broker的時候可以-c指定組態檔,啟動完以后通過jps查看行程

通過mqadmin命令創建并查看主題
mqadmin updateTopic -n localhost:9876 -b 172.16.52.116:10911 -t TEST_TOPIC
mqadmin topicList -n localhost:9876
具體命令引數,參見 https://rocketmq.apache.org/zh/docs/deploymentOperations/16admintool/
也可以通過RocketMQ Dashboard創建主題



5.1. rocketmq-client
引入依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.0.0</version>
</dependency>
代碼片段
public class AppTest extends TestCase {
private String producerGroupName = "MyProducerGroup";
private String consumerGroupName = "MyConsumerGroup";
/**
* 發送同步訊息
*/
@Test
public void testSyncProducer() throws Exception {
// 實體化訊息生產者Producer
DefaultMQProducer producer = new DefaultMQProducer(producerGroupName);
// 設定NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 啟動Producer實體
producer.start();
// 發送訊息
Message message = new Message("TEST_TOPIC", "A", "UserID12345", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message);
System.out.println(sendResult);
// 關閉Producer實體
producer.shutdown();
}
/**
* 發送異步訊息
*/
@Test
public void testAsyncProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(producerGroupName);
producer.setNamesrvAddr("localhost:9876");
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
Message msg = new Message("TEST_TOPIC", "B", "OrderID12346", "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收異步回傳結果的回呼
producer.send(msg, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
public void onException(Throwable e) {
e.printStackTrace();
}
});
// 等待5秒
TimeUnit.SECONDS.sleep(5);
}
/**
* 單向發送訊息
* 這種方式主要用在不特別關心發送結果的場景,例如日志發送,
*/
@Test
public void testOnewayProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(producerGroupName);
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TEST_TOPIC", "C", "OrderID12348", "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 發送單向訊息,沒有任何回傳結果
producer.sendOneway(msg);
}
/**
* 消費訊息
*/
@Test
public void testConsumer() throws Exception {
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(consumerGroupName);
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TEST_TOPIC", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.start();
while (true) {
List<MessageExt> messageExts = consumer.poll();
if (messageExts.isEmpty()) {
continue;
}
messageExts.forEach(msg -> {
System.out.println(String.format("MsgId: %s, MsgBody: %s", msg.getMsgId(), new String(msg.getBody())));
});
consumer.commitSync();
}
}
}
5.2. rocketmq-spring-boot-starter
依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
application.yml
配置項詳見 org.apache.rocketmq.spring.autoconfigure.RocketMQProperties
rocketmq:
name-server: localhost:9876
producer:
group: MyProducerGroup
send-message-timeout: 10000
consumer:
group: MyConsumerGroup
發送訊息
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author: ChengJianSheng
* @Date: 2023/1/18
*/
@RestController
@RequestMapping("/message")
public class MessageController {
private String springTopic = "SPRING_TOPIC";
private String userTopic = "USER_TOPIC";
private String orderTopic = "ORDER_TOPIC";
private String extTopic = "EXT_TOPIC";
private String reqTopic = "REQ_TOPIC";
private String objTopic = "OBJECT_TOPIC";
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/send")
public String send() {
SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "Hello World");
Message message = MessageBuilder.withPayload("Hello World!2222".getBytes()).build();
sendResult = rocketMQTemplate.syncSend(springTopic, message);
message = MessageBuilder.withPayload("Hello, World! I'm from spring message").build();
sendResult = rocketMQTemplate.syncSend(springTopic, message);
sendResult = rocketMQTemplate.syncSend(userTopic, new User("zhangsan", 20));
message = MessageBuilder.withPayload(new User("lisi", 21))
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE)
.build();
sendResult = rocketMQTemplate.syncSend(userTopic, message);
rocketMQTemplate.asyncSend(orderTopic, new Order("oid1234", "4.56"), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("async onSucess SendResult=%s %n", sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.printf("async onException Throwable=%s %n", throwable);
}
});
rocketMQTemplate.convertAndSend(extTopic + ":tag0", "I'm from tag0");
rocketMQTemplate.convertAndSend(extTopic + ":tag1", "I'm from tag1");
String replyString = rocketMQTemplate.sendAndReceive(reqTopic, "request string", String.class);
System.out.printf("receive %s %n", replyString);
User replyUser = rocketMQTemplate.sendAndReceive(objTopic, new User("wangwu", 21), User.class);
System.out.printf("receive %s %n", replyUser);
return "ok";
}
}
接收訊息
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "SPRING_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class StringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.printf("------- StringConsumer received: %s \n", message);
}
}
@Component
@RocketMQMessageListener(topic = "ORDER_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class OrderConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order message) {
System.out.printf("------- OrderConsumer received: %s [orderId : %s]\n", message, message.getOrderNo());
}
}
@Component
@RocketMQMessageListener(topic = "USER_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class UserConsumer implements RocketMQListener<User>, RocketMQPushConsumerLifecycleListener {
@Override
public void onMessage(User message) {
System.out.printf("------ UserConsumer received: %s ; age: %s ; name: %s \n", message, message.getAge(), message.getName());
}
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
}
}
@Component
@RocketMQMessageListener(topic = "REQ_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
@Override
public String onMessage(String message) {
System.out.printf("------- StringConsumerWithReplyString received: %s \n", message);
return "reply string";
}
}
@Component
@RocketMQMessageListener(topic = "OBJECT_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class ObjectConsumerWithReplyUser implements RocketMQReplyListener<User, User> {
@Override
public User onMessage(User message) {
System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", message);
return new User("tom", 8);
}
}
@Component
@RocketMQMessageListener(topic = "EXT_TOPIC", selectorExpression = "tag0||tag1", consumerGroup = "${rocketmq.consumer.group}")
public class MessageExtConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
System.out.printf("------- MessageExtConsumer received message, msgId: %s, body:%s \n", message.getMsgId(), new String(message.getBody()));
}
}
6. 檔案
https://rocketmq.apache.org/zh/
https://rocketmq.apache.org/zh/docs/deploymentOperations/15deploy/
https://github.com/apache/rocketmq/tree/rocketmq-all-5.0.0/docs/cn
https://github.com/apache/rocketmq/blob/rocketmq-all-5.0.0/docs/cn/architecture.md
https://github.com/apache/rocketmq/blob/rocketmq-all-5.0.0/docs/cn/RocketMQ_Example.md
https://github.com/apache/rocketmq-dashboard
https://github.com/apache/rocketmq-spring
https://github.com/apache/rocketmq
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/542204.html
標籤:其他
下一篇:Arrays類
