1. 生產端保障
生產端保障需要從以下幾個方面來保障
- 使用可靠的訊息發送方式
- 注意生產端重試
- 生產禁止自動創建topic
1.1 ?訊息發送保障
1.1.1同步發送
發送者向MQ執行發送訊息API時,同步等待,直到訊息服務器回傳發送結果,會在收到接收方發回回應之后才發下一個資料包的通訊方式,這種方式只有在訊息完全發送完成之后才回傳結果,此方式存在需要同步等待發送結果的時間代價,

簡單來說,同步發送就是指 producer 發送訊息后,會在接收到 broker 回應后才繼續發下一條訊息的通信方式,
使用場景
由于這種同步發送的方式確保了訊息的可靠性,同時也能及時得到訊息發送的結果,故而適合一些發送比較重要的訊息場景,比如說重要的通知郵件,
注意事項
這種方式具有內部重試機制,即在主動宣告本次訊息發送失敗之前,內部實作將重試一定次數,默認為2次( DefaultMQProducer#getRetryTimesWhenSendFailed),存在同一個訊息可
能被多次發送給broker的問題,這里需要應用的開發者自己在消費端處理冪等性問題,
1.1.2 異步發送
異步發送是指發送方發出資料后,不等接收方發回回應,接著發送下個資料包的通訊方式, MQ的異步發送,需要用戶實作異步發送回呼介面( SendCallback )

異步發送是指 producer 發出一條訊息后,不需要等待 broker 回應,就接著發送下一條訊息的通信方式,需要注意的是,不等待 broker 回應,并不意味著 broker 不回應,而是通過回呼介面來接收broker 的回應,所以要記住一點,異步發送同樣可以對訊息的回應結果進行處理,
使用場景
由于異步發送不需要等待 broker 的回應,故在一些比較注重 RT(回應時間)的場景就會比較適用,比如,在一些視頻上傳的場景
注意事項:
注意:RocketMQ內部只對同步模式做了重試,異步發送模式是沒有自動重試的,需要自己手動實作
1.1.3 單向發送
比較簡單,就是只管發,不管有沒有抵達
1.2 訊息發送總結

1.3 發送狀態
發送訊息時,將獲得包含SendStatus的SendResult,首先,我們假設Message的isWaitStoreMsgOK = true(默認為true),如果沒有拋出例外,我們將始侄訓得SEND_OK,以下是每個狀態的說明串列:
FLUSH_DISK_TIMEOUT
如果設定了 FlushDiskType=SYNC_FLUSH (默認是 ASYNC_FLUSH),并且 Broker 沒有在syncFlushTimeout(默認是 5 秒)設定的時間內完成刷盤,就會收到此狀態碼,
FLUSH_SLAVE_TIMEOUT
如果設定為 SYNC_MASTER ,并且 slave Broker 沒有在 syncFlushTimeout 設定時間內完成同步,就會收到此狀態碼,
SLAVE_NOT_AVAILABLE
如果設定為SYNC_MASTER ,并沒有配置 slave Broker,就會收到此狀態碼,
SEND_OK
這個狀態可以簡單理解為,沒有發生上面列出的三個問題狀態就是SEND_OK,需要注意的是,SEND_OK并不意味著可靠,如果想嚴格確保沒有訊息丟失,需要開啟SYNC_MASTER orSYNC_FLUSH,
1.4 ?MQ發送端重試保障
如果由于網路抖動等原因,Producer程式向Broker發送訊息時沒有成功,即發送端沒有收到Broker的ACK,導致最終Consumer無法消費訊息,此時RocketMQ會自動進行重試,
DefaultMQProducer可以設定訊息發送失敗的最大重試次數,并可以結合發送的超時時間來進行重試的處理,具體API如下:
//設定訊息發送失敗時的最大重試次數
public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
}
//同步發送訊息,并指定超時時間
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, timeout);
1.4.1 重試問題
超時重試針對網上說的超時例外會重試的說法大部分是錯誤的
是因為下面測驗代碼的超時時間設定為5毫秒 ,按照正常肯定會報超時例外,但設定1次重試和3000次的重試,雖然最終都會報下面例外,但輸出錯誤時間報顯然不應該是一個級別,但測驗發現無論設定的多少次的重試次數,報例外的時間都差不多,按道理說重試次數越多,報例外的時間跨度應該越大
測驗代碼
public class RetryProducer {
public static void main(String[] args) throws UnsupportedEncodingException,
InterruptedException, RemotingException, MQClientException, MQBrokerException {
//創建一個訊息生產者,并設定一個訊息生產者組
DefaultMQProducer producer = new
DefaultMQProducer("rocket_test_consumer_group");
//指定 NameServer 地址
producer.setNamesrvAddr("192.168.80.16:9876");
//設定重試次數(默認2次)
producer.setRetryTimesWhenSendFailed(300000);
//初始化 Producer,整個應用生命周期內只需要初始化一次
producer.start();
Message msg = new Message(
/* 訊息主題名 */
"topicTest",
/* 訊息標簽 */
"TagA",
/* 訊息內容 */
("Hello Java demo RocketMQ"
).getBytes(RemotingHelper.DEFAULT_CHARSET));
//發送訊息并回傳結果,設定超時時間 5ms 所以每次都會發送失敗
SendResult sendResult = producer.send(msg, 5);
System.out.printf("%s%n", sendResult);
// 一旦生產者實體不再被使用則將其關閉,包括清理資源,關閉網路連接等
producer.shutdown();
}
}
原因!
針對這個疑惑,需要查看原始碼,發現只有同步發送才會重試,并且超時是不重試的,
/**
* 說明 抽取部分代碼
*/
private SendResult sendDefaultImpl(Message msg, final CommunicationMode
communicationMode, final SendCallback sendCallback, final long timeout) {
//1、獲取當前時間
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev ;
//2、去服務器看下有沒有主題訊息
TopicPublishInfo topicPublishInfo =
this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
Boolean callTimeout = false;
//3、通過這里可以很明顯看出 如果不是同步發送訊息 那么訊息重試只有1次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 +
this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
//4、根據設定的重試次數,回圈再去獲取服務器主題訊息
for (times = 0; times < timesTotal; times++) {
MessageQueue mqSelected =
this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
beginTimestampPrev = System.currentTimeMillis();
long costTime = beginTimestampPrev - beginTimestampFirst;
//5、前后時間對比 如果前后時間差 大于 設定的等待時間 那么直接跳出for回圈了 這就
說明連接超時是不進行多次連接重試的
if (timeout < costTime) {
callTimeout = true;
break;
}
//6、如果超時直接報錯
if (callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call
timeout");
}
}
}
1.4.2 重試總結
通過這段原始碼很明顯可以看出以下幾點
- 如果是異步發送那么send次數只有1次
- 對于同步而言,超時例外是不會再去重試,
- 因為發生重試是在一個for 回圈里去重試,所以它是立即重試而不是隔一段時間去重試,
1.5 ?禁止自動創建topic
1.5.1 自動創建TOPIC流程
autoCreateTopicEnable 設定為true 標識開啟自動創建topic
- 訊息發送時如果根據topic沒有獲取到 路由資訊,則會根據默認的topic去獲取,獲取到路由資訊后選擇一個佇列進行發送,發送時報文會帶上默認的topic以及默認的佇列數量,
- 訊息到達broker后,broker檢測沒有topic的路由資訊,則查找默認topic的路由資訊,查到表示開啟了自動創建topic,則會根據訊息內容中的默認的佇列數量在本broker上創建topic,然后進行訊息存盤,
- broker創建topic后并不會馬上同步給namesrv,而是每30進行匯報一次,更新namesrv上的topic路由資訊,producer會每30s進行拉取一次topic的路由資訊,更新完成后就可以正常發送訊息,更新之前一直都是按照默認的topic查找路由資訊,
1.5.2 為什么不能開啟自動創建
上述 broker 中流程會有一個問題,就是在producer更新路由資訊之前的這段時間,如果訊息只發送到了broker-a,則broker-b上不會創建這個topic的路由資訊,broker互相之間不通信,當producer更新之后,獲取到的broker串列只有broker-a,就永遠不會輪詢到broker-b的佇列(因為沒有路由資訊),所以我們生產通常關閉自動創建broker,而是采用手動創建的方式,
1.6 發送端規避
注意了,這里我們發現,有可能在實際的生產程序中,我們的 RocketMQ 有幾臺服務器構成的集群,其中有可能是一個主題 TopicA 中的 4 個佇列分散在 Broker1、Broker2、Broker3 服務器上,

如果這個時候 Broker2 掛了,我們知道,但是生產者不知道(因為生產者客戶端每隔 30S 更新一次路由,但是 NamServer 與 Broker 之間的心跳檢測間隔是 10S,所以生產者最快也需要 30S 才能感知Broker2 掛了),所以發送到 queue2 的訊息會失敗,RocketMQ 發現這次訊息發送失敗后,就會將Broker2排除在訊息的選擇范圍,下次再次發送訊息時就不會發送到 Broker2,這樣做的目的就是為了提高發送訊息的成功率,
2. 消費端保障
2.1 注意冪等性
**“至少一次送達”**的訊息交付策略,和訊息重復消費是一對共生的因果關系,要做到不丟訊息就無法避免訊息重復消費,原因很簡單,試想一下這樣的場景:客戶端接收到訊息并完成了消費,在消費確認程序中發生了通訊錯誤,從Broker的角度是無法得知客戶端是在接收訊息程序中出錯還是在消費確認程序中出錯,為了確保不丟訊息,重發訊息是唯一的選擇,
有了訊息冪等消費約定的基礎,RocketMQ就能夠有針對性地采取一些性能優化措施,例如:并行消費、消費進度同步機制等,這也是RocketMQ性能優異的原因之一,
2.2 訊息消費模式(從維度劃分)
從不同的維度劃分,Consumer支持以下消費模式:
- 廣播消費模式下,訊息消費失敗不會進行重試,消費進度保存在Consumer端;
- 集群消費模式下,訊息消費失敗有機會進行重試,消費進度集中保存在Broker端,
2.2.1 集群消費
使用相同 Group ID 的訂閱者屬于同一個集群,同一個集群下的訂閱者消費邏輯必須完全一致(包括 Tag 的使用),這些訂閱者在邏輯上可以認為是一個消費節點

注意事項
- 消費端集群化部署, 每條訊息只需要被處理一次,
- 由于消費進度在服務端維護, 可靠性更高,
- 集群消費模式下,每一條訊息都只會被分發到一臺機器上處理,如果需要被集群下的每一臺機器都處理,請使用廣播模式,
- 集群消費模式下,不保證每一次失敗重投的訊息路由到同一臺機器上,因此處理訊息時不應該做任何確定性假設,
2.2.2 廣播消費
廣播消費指的是:一條訊息被多個consumer消費,即使這些consumer屬于同一個ConsumerGroup,訊息也會被ConsumerGroup中的每個Consumer都消費一次,廣播消費中ConsumerGroup概念可以認為在訊息劃分方面無意義,

注意事項

2.2.3 集群模式模擬廣播
如果業務需要使用廣播模式,也可以創建多個 Group ID,用于訂閱同一個 Topic,

注意事項
- 每條訊息都需要被多臺機器處理,每臺機器的邏輯可以相同也可以不一樣,
- 消費進度在服務端維護,可靠性高于廣播模式,
- 對于一個 Group ID 來說,可以部署一個消費端實體,也可以部署多個消費端實體,當部署多個消費端實體時,實體之間又組成了集群模式(共同分擔消費訊息),假設 Group ID 1 部署了三個消費者實體 C1、C2、C3,那么這三個實體將共同分擔服務器發送給 Group ID 1 的訊息,同時,實體之間訂閱關系必須保持一致,
2.3 訊息消費的推、拉模式
RocketMQ訊息消費本質上是基于的拉(pull)模式,consumer主動向訊息服務器broker拉取訊息,
- 推訊息模式下,消費進度的遞增是由RocketMQ內部自動維護的;
- 拉訊息模式下,消費進度的變更需要上層應用自己負責維護,RocketMQ只提供消費進度保存和查詢功能,
2.3.1 推模式(PUSH)
我們上面使用的消費者都是PUSH模式,也是最常用的消費模式,
由訊息中間件(MQ訊息服務器代理)主動地將訊息推送給消費者;采用Push方式,可以盡可能實時地將訊息發送給消費者進行消費,但是,在消費者的處理訊息的能力較弱的時候(比如,消費者端的業務系統處理一條訊息的流程比較復雜,其中的呼叫鏈路比較多導致消費時間比較久,概括起來地說就是“慢消費問題”),而MQ不斷地向消費者Push訊息,消費者端的緩沖區可能會溢位,導致例外,
實作方式,代碼上使用 DefaultMQPushConsumer
consumer把輪詢程序封裝了,并注冊MessageListener監聽器,取到訊息后,喚醒MessageListener的consumeMessage()來消費,對用戶而言,感覺訊息是被推送(push)過來的,主要用的也是這種方式,
2.3.2 拉模式(PULL)
RocketMQ的PUSH模式是由PULL模式來實作的,
由消費者客戶端主動向訊息中間件(MQ訊息服務器代理)拉取訊息;采用Pull方式,如何設定Pull訊息的頻率需要重點去考慮,舉個例子來說,可能1分鐘內連續來了1000條訊息,然后2小時內沒有新訊息產生(概括起來說就是“訊息延遲與忙等待”),如果每次Pull的時間間隔比較久,會增加訊息的延遲,即訊息到達消費者的時間加長,MQ中訊息的堆積量變大;若每次Pull的時間間隔較短,但是在一段時間內MQ中并沒有任何訊息可以消費,那么會產生很多無效的Pull請求的RPC開銷,影響MQ整體的網路性能,
2.3.3 注意事項
注意:RocketMQ 4.6.0版本后將棄用DefaultMQPullConsumer,DefaultMQPullConsumer方式需要手動管理偏移量,官方已經被廢棄,將在2022年進行洗掉

現在推薦使用DefaultLitePullConsumer,該類是官方推薦使用的手動拉取的實作類,偏移量提交由RocketMQ管理,不需要手動管理,
2.4 訊息確認機制
為了保證資料不被丟失,RocketMQ支持訊息確認機制,即ack,發送者為了保證訊息肯定消費成功,只有使用方明確表示消費成功,RocketMQ才會認為訊息消費成功,中途斷電,拋出例外等都不會認為成功——即都會重新投遞,
2.4.1 確認消費
業務實作消費回呼的時候,當且僅當此回呼函式回傳ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才會認為這批訊息(默認是1條)是消費完成的,
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New
Messages: " + msgs);
execute();
//執行真正消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
)
2.4.2 消費例外
如果這時候訊息消費失敗,例如資料庫例外,余額不足扣款失敗等一切業務認為訊息需要重試的場景,只要回傳ConsumeConcurrentlyStatus.RECONSUME_LATER ,RocketMQ就會認為這批訊息消費失敗了,
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New
Messages: " + msgs);
execute();
//執行真正消費
return ConsumeConcurrentlyStatus.RECONSUME_LATER
}
}
)
為了保證訊息是肯定被至少消費成功一次,RocketMQ會把這批訊息重發回Broker(topic不是原topic而是這個消費組的RETRY topic,可以理解為臨時存放點),在延遲的某個時間點(默認是10秒,業務可設定)后,再次投遞到這個ConsumerGroup,而如果一直這樣重復消費都持續失敗到一定次數(默認16次),就會投遞到DLQ死信佇列,應用可以監控死信佇列來做人工干預,
2.5 訊息重試機制
2.5.1 順序訊息的重試
對于順序訊息,當消費者消費訊息失敗后,訊息佇列RocketMQ會自動不斷地進行訊息重試(每次間隔時間為1秒),這時,應用會出現訊息消費被阻塞的情況,因此,建議您使用順序訊息時,務必保證應用能夠及時監控并處理消費失敗的情況,避免阻塞現象的發生,
2.5.2 無序訊息的重試
無序訊息的重試只針對集群消費方式生效;廣播方式不提供失敗重試特性,即消費失敗后,失敗訊息不再重試,繼續消費新的訊息,
2.5.3 重試次數
訊息佇列RocketMQ默認允許每條訊息最多重試16次,每次重試的間隔時間如下,

如果訊息重試16次后仍然失敗,訊息將不再投遞,如果嚴格按照上述重試時間間隔計算,某條訊息在一直消費失敗的前提下,將會在接下來的4小時46分鐘之內進行16次重試,超過這個時間范圍訊息將不再重試投遞,
2.5.4 和生產端重試區別
消費者和生產者的重試還是有區別的,主要有兩點
- 默認重試次數:Product默認是2次,而Consumer默認是16次,
= 重試時間間隔:Product是立刻重試,而Consumer是有一定時間間隔的,它照1S,5S,10S,30S,1M,2M····2H 進行重試,
注意:Product在異步情況重試失效,而對于Consumer在廣播情況下重試失效,
2.5.5 重試配置方式
消費失敗后,重試配置方式,集群消費方式下,訊息消費失敗后期望訊息重試,需要在訊息監聽器介面的實作中明確進行配置(三種方式任選一種):
方式1:回傳RECONSUME_LATER(推薦)
方式2:回傳Null
方式3:拋出例外
無需重試
集群消費方式下,訊息失敗后期望訊息不重試,需要捕獲消費邏輯中可能拋出的例外,最侄訓傳Action.CommitMessage,此后這條訊息將不會再重試,
//注冊訊息監聽器
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
ConsumeConcurrentlyContext context) {
//訊息處理邏輯拋出例外,訊息將重試,
try {
doConsumeMessage(list);
}
catch (Exception e){
//捕獲消費邏輯中的所有例外,并回傳Action.CommitMessage;
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//業務方正常消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
);
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/423265.html
標籤:其他
上一篇:【Kafka】kafka Removed ??? expired offsets in ??? milliseconds.
