文章目錄
- 1 示例模式
- 2 安裝與配置 RocketMQ
- 3 運行服務
- 3.1 啟動 NameServer
- 3.2 啟動 broker
- 4 生產者
- 4.1 事務監聽器
- 4.2 事務訊息生產者
- 5 消費者
- 6 測驗
1 示例模式
RocketMQ 事務訊息示例包含一個生產者、消費者、NameServer 以及 Broker 服務,它們之間的關系如下:

RocketMQ架構上主要分為四部分[^1]:
-
Producer:訊息發布的角色,支持分布式集群方式部署,Producer通過MQ的負載均衡模塊選擇相應的Broker集群佇列進行訊息投遞,投遞的程序支持快速失敗并且低延遲,
-
Consumer:訊息消費的角色,支持分布式集群方式部署,支持以push推,pull拉兩種模式對訊息進行消費,同時也支持集群方式和廣播方式的消費,它提供實時訊息訂閱機制,可以滿足大多數用戶的需求,
-
NameServer:NameServer是一個非常簡單的Topic路由注冊中心,其角色類似Dubbo中的zookeeper,支持Broker的動態注冊與發現,主要包括兩個功能:Broker管理,NameServer接受Broker集群的注冊資訊并且保存下來作為路由資訊的基本資料,然后提供心跳檢測機制,檢查Broker是否還存活;路由資訊管理,每個NameServer將保存關于Broker集群的整個路由資訊和用于客戶端查詢的佇列資訊,然后Producer和Conumser通過NameServer就可以知道整個Broker集群的路由資訊,從而進行訊息的投遞和消費,NameServer通常也是集群的方式部署,各實體間相互不進行資訊通訊,Broker是向每一臺NameServer注冊自己的路由資訊,所以每一個NameServer實體上面都保存一份完整的路由資訊,當某個NameServer因某種原因下線了,Broker仍然可以向其它NameServer同步其路由資訊,Producer,Consumer仍然可以動態感知Broker的路由的資訊,
-
BrokerServer:Broker主要負責訊息的存盤、投遞和查詢以及服務高可用保證,
2 安裝與配置 RocketMQ
(1)下載解壓
到 RocketMQ 官網下載二進制版本包,解壓到磁盤下,
(2)JVM 版本
注意:openjdk11 不能運行(會拋出Error: Could not create the Java Virtual Machine.),jdk8 可以,
如果cmd 命令一閃而過,可以在命令代碼的末尾加入 pause,以觀察出錯日志,
(3)配置環境變數
配置 ROCKETMQ_HOME 與 NAMESEV_ADDR 環境變數,其中 ROCKETMQ_HOME 是 RocketMQ 的解壓后的路徑,NAMESEV_ADDR 是 NameServer 的訪問地址,


(4)在應用工程的 pom 中配置
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
3 運行服務
使用 powershell 分別啟動 name server與 broker 服務,
3.1 啟動 NameServer
啟動 name server:
cd C:\programs\rocketmq-4.9.2\bin
.\mqnamesrv.cmd
?
輸出:
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
3.2 啟動 broker
啟動 broker:
cd C:\programs\rocketmq-4.9.2\bin
.\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
-n 指定 NameServer 地址,autoCreateTopicEnable 為 true 表示自動創建主題,
輸出:
The broker[DENIRO-LEE, 192.168.17.1:10911] boot success. serializeType=JSON and name server is localhost:9876
4 生產者
4.1 事務監聽器
事務監聽器需要實作兩個方法,其中的 executeLocalTransaction 方法用于執行本地事務,checkLocalTransaction 方法用于 broker 回查本地事務的狀態,具體邏輯如下:
- 定義了一個原子整型計數器 transactionIndex,用于回圈事務狀態,
- 定義了一個執行緒安全 ConcurrentHashMap,名為 localTrans,用于存放事務 ID與狀態碼,
- 在 executeLocalTransaction 方法中,遞增 transactionIndex,然后除以 3,求得余數,因為事務只有三種狀態:UNKNOW、ROLLBACK_MESSAGE 和 COMMIT_MESSAGE,所以這里除數為 3,然后回傳 UNKNOW 狀態,讓 broker 回查這批訊息的事務狀態,
- 在 checkLocalTransaction 方法中,會傳入由 broker 給出的事務 ID,然后依據這個事務 ID 從 localTrans 中取出這個事務 ID 的狀態碼,接著依據不同的狀態碼,列印日志并回傳對應的事務狀態,
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String transactionId = msg.getTransactionId();
Integer status = localTrans.get(transactionId);
if (null != status) {
switch (status) {
case 0:
System.out.printf("%s%s%n", "事務ID -> "+transactionId," 回傳 UNKNOW");
return LocalTransactionState.UNKNOW;
case 1:
System.out.printf("%s%s%n", "事務ID -> "+transactionId, "回傳 COMMIT_MESSAGE");
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
System.out.printf("%s%s%n", "事務ID -> "+transactionId, "回傳 ROLLBACK_MESSAGE");
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
System.out.printf("%s%s%n", "事務ID -> "+transactionId, "回傳 UNKNOW");
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
4.2 事務訊息生產者
該生產者會產生 10 條事務訊息,具體邏輯如下:
- 創建事務監聽器 transactionListener,
- 創建事務訊息生產者 producer,
- 生產者設定 NameServer的地址,
- 創建執行緒池執行器 ThreadPoolExecutor,
- 生產者設定執行緒池執行器,
- 生產者設定事務監聽器,
- 啟動生產者,
- 創建多個標簽,
- 回圈生成 10 條事務訊息,每個訊息創建后,使用生產者以事務的方式發送該訊息,列印并休眠 10 ms,
- 執行緒休眠一段時間,用于回應 broker 的回查請求,
- 關閉生產者,
package com.deniro.rocketmq.transaction;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.example.transaction.TransactionListenerImpl;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;
/**
* 生產事務訊息的生產者
*
* @author Deniro Lee
*/
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer(
"deniro_transaction_message_group");
// 設定NameServer的地址
producer.setNamesrvAddr("localhost:9876");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("transactionMsg", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", "第" + (i+1) + "條 ->" + sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
5 消費者

消費者只能獲取事務狀態為 COMMIT_MESSAGE 的訊息,具體邏輯如下:
- 創建消費者,
- 消費者設定NameServer的地址,
- 消費者訂閱Topic,
- 消費者注冊回呼實作類來處理從broker拉取回來的訊息,回呼實作類的方法回傳訊息已經被成功消費的狀態,
- 啟動消費者,
package com.deniro.rocketmq.base;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* 消費者
*
* @author Deniro Lee
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 實體化消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("deniro_transaction_message_group");
// 設定NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
// 訂閱一個或者多個Topic,以及Tag來過濾需要消費的訊息
consumer.subscribe("transactionMsg", "*");
// 注冊回呼實作類來處理從broker拉取回來的訊息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 標記該訊息已經被成功消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動消費者實體
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
6 測驗
生產者端發送 10 條事務訊息:
第1條 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CF410000, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=1], queueOffset=432]
第2條 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CF550001, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=2], queueOffset=433]
第3條 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CF640002, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=3], queueOffset=434]
第4條 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CF740003, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=0], queueOffset=435]
第5條 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CF840004, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=1], queueOffset=436]
第6條 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CF930005, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=2], queueOffset=437]
第7條 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CFA20006, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=3], queueOffset=438]
第8條 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CFB20007, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=0], queueOffset=439]
第9條 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CFC20008, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=1], queueOffset=440]
第10條 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CFD10009, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=2], queueOffset=441]
事務監聽器對 3 取余數,依據余數的值來回傳事務狀態,
| 余數值 | 說明 |
|---|---|
| 0 | 回傳 UNKNOW,表示未知,這樣 broker 會不斷回查生產者的事務狀態(限制為 15 次,可在 broker 端配置,超過限制則丟棄該訊息并記錄日志), |
| 1 | 回傳 ROLLBACK_MESSAGE,表示事務回滾,這樣消費者端就不會收到這條訊息, |
| 2 | 回傳 COMMIT_MESSAGE,表示事務提交,這樣消費者端就會收到這條訊息,執行本地事務操作, |
生產者端事務監聽器輸出:
事務ID -> 7F000001771418B4AAC24166CF410000 回傳 UNKNOW
事務ID -> 7F000001771418B4AAC24166CF640002回傳 ROLLBACK_MESSAGE
事務ID -> 7F000001771418B4AAC24166CF550001回傳 COMMIT_MESSAGE
事務ID -> 7F000001771418B4AAC24166CF740003 回傳 UNKNOW
事務ID -> 7F000001771418B4AAC24166CF840004回傳 COMMIT_MESSAGE
事務ID -> 7F000001771418B4AAC24166CF930005回傳 ROLLBACK_MESSAGE
事務ID -> 7F000001771418B4AAC24166CFA20006 回傳 UNKNOW
事務ID -> 7F000001771418B4AAC24166CFB20007回傳 COMMIT_MESSAGE
事務ID -> 7F000001771418B4AAC24166CFD10009 回傳 UNKNOW
事務ID -> 7F000001771418B4AAC24166CFC20008回傳 ROLLBACK_MESSAGE
事務ID -> 7F000001771418B4AAC24166CF410000 回傳 UNKNOW
事務ID -> 7F000001771418B4AAC24166CF740003 回傳 UNKNOW
事務ID -> 7F000001771418B4AAC24166CFA20006 回傳 UNKNOW
事務ID -> 7F000001771418B4AAC24166CFD10009 回傳 UNKNOW
事務ID -> 7F000001771418B4AAC24166CF410000 回傳 UNKNOW
事務ID -> 7F000001771418B4AAC24166CF740003 回傳 UNKNOW
事務ID -> 7F000001771418B4AAC24166CFA20006 回傳 UNKNOW
事務ID -> 7F000001771418B4AAC24166CFD10009 回傳 UNKNOW
消費者端只會收到狀態為 COMMIT_MESSAGE 的訊息:

通過 RocketMQ 可以實作可靠訊息最終一致性,適用于執行周期長且實時性要求不高的場景,優點是避免了分布式事務中的同步阻塞的影響,并實作了兩個服務的解耦,不足是如果消費者端事務例外回滾,生產者端不會回滾,而且訊息可能會被重復消費,因此需要在消費者端進行冥等處理,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/438671.html
標籤:其他
上一篇:CentOS 7.6環境安裝kafka_2.13-3.0.0[單機版]
下一篇:Caused by: java.lang.IllegalArgumentException: Cannot grow BufferHolder by size 1752 because the siz
