文章目錄
- 1.RocketMq架構
- 2.訊息不丟失
- 2.1 同步發送
- 2.2 異步訊息
- 2.3 刷盤機制
- 2.4 Broker 多副本和高可用
- 2.5 訊息確認
- 2.6 Consumer 重試
- 2.7 事務訊息
- 2.8 訊息索引
- 2.9 極端
1.RocketMq架構

Producer,Consumer,Brocker,Name Server
2.訊息不丟失
1.Producer發送訊息
2.Brocker保存訊息
3.Consumer 消費訊息
4.Brocker主從切換
2.1 同步發送
public void send() throws Exception {
String message = "test producer";
Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.putUserProperty("name1","value1");
SendResult sendResult = null;
DefaultMQProducer producer = new DefaultMQProducer("testGroup");
producer.setNamesrvAddr("localhost:9876");
producer.setRetryTimesWhenSendFailed(3);
try {
sendResult = producer.send(sendMessage);
} catch (Exception e) {
e.printStackTrace();
}
if (sendResult != null) {
System.out.println(sendResult.getSendStatus());
}
}
同步發送會回傳狀態碼
1.SEND_OK:訊息發送成功,需要注意的是,訊息發送到 broker 后,還有兩個操作:訊息刷盤和訊息同步到 slave 節點,默認這兩個操作都是異步的,只有把這兩個操作都改為同步,SEND_OK 這個狀態才能真正表示發送成功,
2.FLUSH_DISK_TIMEOUT:訊息發送成功但是訊息刷盤超時,
3.FLUSH_SLAVE_TIMEOUT:訊息發送成功但是訊息同步到 slave 節點時超時,
4.SLAVE_NOT_AVAILABLE:訊息發送成功但是 broker 的 slave 節點不可用,
根據狀態碼可以重復訊息,重試的數量為3
2.2 異步訊息
public void sendAsync() throws Exception {
String message = "test producer";
Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.putUserProperty("name1","value1");
DefaultMQProducer producer = new DefaultMQProducer("testGroup");
producer.setNamesrvAddr("localhost:9876");
producer.setRetryTimesWhenSendFailed(3);
producer.send(sendMessage, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable e) {
// TODO 可以在這里加入重試邏輯
}
});
}
異步發送,可以重寫回呼函式,回呼函式捕獲到 Exception 時表示發送失敗,這時可以進行重試,這里設定的重試次數是 3,
2.3 刷盤機制
- 異步刷盤:默認,訊息寫入 CommitLog 時,并不會直接寫入磁盤,而是先寫入 PageCache 快取后回傳成功,然后用后臺執行緒異步把訊息刷入磁盤,異步刷盤提高了訊息吞吐量,但是可能會有訊息丟失的情況,比如斷點導致機器停機,PageCache 中沒來得及刷盤的訊息就會丟失,
- 同步刷盤:訊息寫入記憶體后,立刻請求刷盤執行緒進行刷盤,如果訊息未在約定的時間內(默認 5 s)刷盤成功,就回傳 FLUSH_DISK_TIMEOUT,Producer 收到這個回應后,可以進行重試,同步刷盤策略保證了訊息的可靠性,同時降低了吞吐量,增加了延遲,要開啟同步刷盤,需要增加下面配置
2.4 Broker 多副本和高可用
Broker 為了保證高可用,采用一主多從的方式部署,

訊息發送到 master 節點后,slave 節點會從 master 拉取訊息保持跟 master 的一致,這個程序默認是異步的,即 master 收到訊息后,不等 slave 節點復制訊息就直接給 Producer 回傳成功,
這樣會有一個問題,如果 slave 節點還沒有完成訊息復制,這時 master 宕機了,進行主備切換后就會有訊息丟失,為了避免這個問題,可以采用 slave 節點同步復制訊息,即等 slave 節點復制訊息成功后再給 Producer 回傳發送成功,只需要增加下面的配置:
brokerRole=SYNC_MASTER
同步復制:
- slave 初始化后,跟 master 建立連接并向 master 發送自己的 offset;
- master 收到 slave 發送的 offset 后,將 offset 后面的訊息批量發送給 slave;
- slave 把收到的訊息寫入 commitLog 檔案,并給 master 發送新的 offset;
- master 收到新的 offset 后,如果 offset >= producer 發送訊息后的 offset,給 Producer 回傳 SEND_OK,
2.5 訊息確認
public void consume() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe("topic1", "tag1");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try{
System.out.printf("Receive New Messages: %s", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}catch (Exception e){
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
}
如果 Consumer 消費成功,回傳 CONSUME_SUCCESS,提交 offset 并從 Broker 拉取下一批訊息,
2.6 Consumer 重試
- 回傳 RECONSUME_LATER
- 回傳 null
- 拋出例外
Broker 收到這個回應后,會把這條訊息放入重試佇列,重新發送給 Consumer,
- Broker 默認最多重試 16 次,如果重試 16 次都失敗,就把這條訊息放入死信佇列,Consumer 可以訂閱死信佇列進行消費,
- 重試只有在集群模式(MessageModel.CLUSTERING)下生效,在廣播模式(MessageModel.BROADCASTING)下是不生效的,
- Consumer 端一定要做好冪等處理,
Consumer 給Brocker 結束重試, 這里是count = 3 的時候結束重試
int count = ((MessageExt) msgs).getReconsumeTimes();
if (count > 2) {
//TODO 把訊息寫入本地存盤
System.out.println("重試次數超過3次");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
2.7 事務訊息
RocketMq支持事務

- Producer 發送half訊息
- Brocker先把訊息寫入topic, RMQ_SYS_TRANS_HALF_TOPIC的佇列,然后回傳half訊息給producer成功
- Producer 執行本地事務,成功后給 Broker 發送 commit 命令, 或者rollback
- Broker 收到 commit 請求后把訊息狀態更改為成功并把訊息推到真正的 topic;
- Consumer拉取訊息進行消費
public class ProducerTransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
/**
* 這里執行本地事務,執行成功回傳LocalTransactionState.COMMIT_MESSAGE,執行失敗回傳
* LocalTransactionState.ROLLBACK_MESSAGE,如果回傳LocalTransactionState.UNKNOW,
* Broker會回來查詢,所以需要記錄事務執行狀態
*/
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
/**
* 這里查詢事務執行狀態,根據事務狀態回傳LocalTransactionState.COMMIT_MESSAGE或
* LocalTransactionState.ROLLBACK_MESSAGE,如果沒有查詢到回傳LocalTransactionState.UNKNOW,
* Broker會再次查詢,可以記錄查詢次數,超過次數后回傳ROLLBACK_MESSAGE
*/
return LocalTransactionState.UNKNOW;
}
}
2.8 訊息索引
RocketMQ 核心的資料檔案有 3 個:CommitLog、ConsumeQueue 和 Index, Index是一個索引檔案

查找訊息時,首先根據訊息 key 的 hashcode 計算出 Hash 槽的位置,然后讀取 Hash 槽的值計算 Index 條目的位置,從Index 條目位置讀取到訊息在 CommitLog 檔案中的 offset,從而查找到訊息,
Producer 發送訊息時,可以指定一個 key
Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.setKeys("weiyiid");
這樣可以通過 RocketMQ 提供的命令或者管理控制臺來查詢訊息是否發送成功,
2.9 極端
極端情況比如Rocketmq集群掛了, Producer發送訊息一定失敗, 可以在Producer做降級, 把發送的訊息先存盤在磁盤或者資料庫中, 然后等到Rocketmq集群恢復了再推送訊息
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/423504.html
標籤:其他
