接上一篇:RocketMQ入門到入土(一)新手也能看懂的原理和實戰!
一、事務訊息的由來
1、案例
參考官方的購物案例:
小明購買一個100元的東西,賬戶扣款100元的同時需要保證在下游的積分系統給小明這個賬號增加100積分,賬號系統和積分系統是兩個獨立是系統,一個要減少100元,一個要增加100積分,如下圖:

2、問題
-
賬號服務扣款成功了,通知積分系統也成功了,但是積分增加的時候失敗了,資料不一致了,
-
賬號服務扣款成功了,但是通知積分系統失敗了,所以積分不會增加,資料不一致了,
3、方案
RocketMQ針對第一個問題解決方案是:如果消費失敗了,是會自動重試的,如果重試幾次后還是消費失敗,那么這種情況就需要人工解決了,比如放到死信佇列里然后手動查原因進行處理等,
RocketMQ針對第二個問題解決方案是:如果你扣款成功了,但是往mq寫訊息的時候失敗了,那么RocketMQ會進行回滾訊息的操作,這時候我們也能回滾我們扣款的操作,
二、事務訊息的原理
1、原理圖解

2、詳細程序
1.Producer發送半訊息(Half Message)到broker,
我真想吐槽一句為啥叫半訊息,難以理解,其實這就是prepare message,預發送訊息,
-
Half Message發送成功后開始執行本地事務,
-
如果本地事務執行成功的話則回傳commit,如果執行失敗則回傳rollback,(這個是在事務訊息的回呼方法里由開發者自己決定commit or rollback)
Producer發送上一步的commit還是rollback到broker,這里有兩種情況:
1.如果broker收到了commit/rollback訊息 :
-
如果收到了commit,則broker認為整個事務是沒問題的,執行成功的,那么會下發訊息給Consumer端消費,
-
如果收到了rollback,則broker認為本地事務執行失敗了,broker將會洗掉Half Message,不下發給Consumer端,
2.如果broker未收到訊息(如果執行本地事務突然宕機了,相當本地事務執行結果回傳unknow,則和broker未收到確認訊息的情況一樣處理,):
-
broker會定時回查本地事務的執行結果:如果回查結果是本地事務已經執行則回傳commit,若未執行,則回傳rollback,
-
Producer端回查的結果發送給Broker,Broker接收到的如果是commit,則broker視為整個事務執行成功,如果是rollback,則broker視為本地事務執行失敗,broker洗掉Half Message,不下發給consumer,如果broker未接收到回查的結果(或者查到的是unknow),則broker會定時進行重復回查,以確保查到最終的事務結果,重復回查的時間間隔和次數都可配,
三、事務訊息實作流程
1、實作流程

簡單來看就是:事務訊息是個監聽器,有回呼函式,回呼函式里我們進行業務邏輯的操作,比如給賬戶-100元,然后發訊息到積分的mq里,這時候如果賬戶-100成功了,且發送到mq成功了,則設定訊息狀態為commit,這時候broker會將這個半訊息發送到真正的topic中,一開始發送他是存到半訊息佇列里的,并沒存在真實topic的佇列里,只有確認commit后才會轉移,
2、補救方案
如果事務因為中斷,或是其他的網路原因,導致無法立即回應的,RocketMQ當做UNKNOW處理,RocketMQ事務訊息還提供了一個補救方案:定時查詢事務訊息的事務狀態,這也是一個回呼函式,這里面可以做補償,補償邏輯開發者自己寫,成功的話自己回傳commit就完事了,
四、事務訊息代碼實體
1、代碼
package com.chentongwei.mq.rocketmq; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import java.util.Date; /** * Description: * * @author TongWei.Chen 2020-06-21 11:32:58 */ public class ProducerTransaction2 { public static void main(String[] args) throws Exception { TransactionMQProducer producer = new TransactionMQProducer("my-transaction-producer"); producer.setNamesrvAddr("124.57.180.156:9876"); // 回呼 producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object arg) { LocalTransactionState state = null; //msg-4回傳COMMIT_MESSAGE if(message.getKeys().equals("msg-1")){ state = LocalTransactionState.COMMIT_MESSAGE; } //msg-5回傳ROLLBACK_MESSAGE else if(message.getKeys().equals("msg-2")){ state = LocalTransactionState.ROLLBACK_MESSAGE; }else{ //這里回傳unknown的目的是模擬執行本地事務突然宕機的情況(或者本地執行成功發送確認訊息失敗的場景) state = LocalTransactionState.UNKNOW; } System.out.println(message.getKeys() + ",state:" + state); return state; } /** * 事務訊息的回查方法 */ @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { if (null != messageExt.getKeys()) { switch (messageExt.getKeys()) { case "msg-3": System.out.println("msg-3 unknow"); return LocalTransactionState.UNKNOW; case "msg-4": System.out.println("msg-4 COMMIT_MESSAGE"); return LocalTransactionState.COMMIT_MESSAGE; case "msg-5": //查詢到本地事務執行失敗,需要回滾訊息, System.out.println("msg-5 ROLLBACK_MESSAGE"); return LocalTransactionState.ROLLBACK_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } }); producer.start(); //模擬發送5條訊息 for (int i = 1; i < 6; i++) { try { Message msg = new Message("transactionTopic", null, "msg-" + i, ("測驗,這是事務訊息! " + i).getBytes()); producer.sendMessageInTransaction(msg, null); } catch (Exception e) { e.printStackTrace(); } } } }
2、結果
msg-1,state:COMMIT_MESSAGE
msg-2,state:ROLLBACK_MESSAGE
msg-3,state:UNKNOW
msg-4,state:UNKNOW
msg-5,state:UNKNOW
msg-3 unknow
msg-3 unknow
msg-5 ROLLBACK_MESSAGE
msg-4 COMMIT_MESSAGE
msg-3 unknow
msg-3 unknow
msg-3 unknow
msg-3 unknow
3、管控臺

4、結果分析
-
只有msg-1和msg-4發送成功了,msg-4在msg-1前面是因為msg-1先成功的,msg-4是回查才成功的,按時間倒序來的,
-
先來輸出五個結果,對應五條訊息
msg-1,state:COMMIT_MESSAGE
msg-2,state:ROLLBACK_MESSAGE
msg-3,state:UNKNOW
msg-4,state:UNKNOW
msg-5,state:UNKNOW
-
然后進入了回查,msg-3還是unknow,msg-5回滾了,msg-4提交了事務,所以這時候msg-4在管控臺里能看到了,
-
過了一段時間再次回查msg-3,發現還是unknow,所以一直回查,
回查的時間間隔和次數都是可配的,默認是回查15次還失敗的話就會把這個訊息丟掉了,
五、疑問
疑問:Spring事務、常規的分布式事務不行嗎?Rocketmq的事務是否多此一舉了呢?
MQ用于解耦,之前是分布式事務直接操作了賬號系統和積分系統,但是他兩就是強耦合的存在,如果中間插了個mq,賬號系統操作完發訊息到mq,這時候只要保證發送成功就提交,發送失敗則回滾,這步怎么保證,就是靠事務了,而且用RocketMQ做分布式事務的也蠻多的,
六、順序訊息解釋
1、概述
RocketMQ的訊息是存盤到Topic的queue里面的,queue本身是FIFO(First Int First Out)先進先出佇列,所以單個queue是可以保證有序性的,
但問題是1個topic有N個queue,作者這么設計的好處也很明顯,天然支持集群和負載均衡的特性,將海量資料均勻分配到各個queue上,你發了10條訊息到同一個topic上,這10條訊息會自動分散在topic下的所有queue中,所以消費的時候不一定是先消費哪個queue,后消費哪個queue,這就導致了無序消費,
2、圖解

3、再次分析
一個Producer發送了m1、m2、m3、m4四條訊息到topic上,topic有四個佇列,由于自帶的負載均衡策略,四個佇列上分別存盤了一條訊息,queue1上存盤的m1,queue2上存盤的m2,queue3上存盤的m3,queue4上存盤的m4,Consumer消費的時候是多執行緒消費,所以他無法保證先消費哪個佇列或者哪個訊息,比如發送的時候順序是m1,m2,m3,m4,但是消費的時候由于Consumer內部是多執行緒消費的,所以可能先消費了queue4佇列上的m4,然后才是m1,這就導致了無序,
七、順序訊息解決方案
1、方案一
很簡單,問題產生的關鍵在于多個佇列都有訊息,我消費的時候又不知道哪個佇列的訊息是最新的,那么思路就有了,發訊息的時候你要想保證有序性的話,就都給我發到一個queue上,然后消費的時候因為只有那一個queue上有訊息且queue是FIFO,先進先出,所以正常消費就完了,
很完美,而且RocketMQ也給我們提供了這種發訊息的時候選擇queue的api(MessageQueueSelector),直接上代碼,
2、代碼一
2.1、生產者
import java.util.List; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; /** * 訊息發送者 */ public class Producer5 { public static void main(String[] args)throws Exception { DefaultMQProducer producer = new DefaultMQProducer("my-order-producer"); producer.setNamesrvAddr("124.57.180.156:9876"); producer.start(); for (int i = 0; i < 5; i++) { Message message = new Message("orderTopic", ("hello!" + i).getBytes()); producer.send( // 要發的那條訊息 message, // queue 選擇器 ,向 topic中的哪個queue去寫訊息 new MessageQueueSelector() { // 手動 選擇一個queue @Override public MessageQueue select( // 當前topic 里面包含的所有queue List<MessageQueue> mqs, // 具體要發的那條訊息 Message msg, // 對應到 send() 里的 args,也就是2000前面的那個0 // 實際業務中可以把0換成實際業務系統的主鍵,比如訂單號啥的,然后這里做hash進行選擇queue等,能做的事情很多,我這里做演示就用第一個queue,所以不用arg, Object arg) { // 向固定的一個queue里寫訊息,比如這里就是向第一個queue里寫訊息 MessageQueue queue = mqs.get(0); // 選好的queue return queue; } }, // 自定義引數:0 // 2000代表2000毫秒超時時間 0, 2000); } } }
2.2、消費者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * Description: * * @author TongWei.Chen 2020-06-22 11:17:47 */ public class ConsumerOrder { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer"); consumer.setNamesrvAddr("124.57.180.156:9876"); consumer.subscribe("orderTopic", "*"); consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody()) + " Thread:" + Thread.currentThread().getName() + " queueid:" + msg.getQueueId()); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer start..."); } }
2.3、輸出結果
Consumer start...
hello!0 Thread:ConsumeMessageThread_1 queueid:0
hello!1 Thread:ConsumeMessageThread_1 queueid:0
hello!2 Thread:ConsumeMessageThread_1 queueid:0
hello!3 Thread:ConsumeMessageThread_1 queueid:0
hello!4 Thread:ConsumeMessageThread_1 queueid:0
很完美,有序輸出!
3、情況二
比如你新需求:把未支付的訂單都放到queue1里,已支付的訂單都放到queue2里,支付例外的訂單都放到queue3里,然后你消費的時候要保證每個queue是有序的,不能消費queue1一條直接跑到queue2去了,要逐個queue去消費,
這時候思路是發訊息的時候利用自定義引數arg,訊息體里肯定包含支付狀態,判斷是未支付的則選擇queue1,以此類推,這樣就保證了每個queue里只包含同等狀態的訊息,那么消費者目前是多執行緒消費的,肯定亂序,三個queue隨機消費,解決方案更簡單,直接將消費端的執行緒數改為1個,這樣佇列是FIFO,他就逐個消費了,RocketMQ也為我們提供了這樣的api,如下兩句:
// 最大執行緒數1 consumer.setConsumeThreadMax(1); // 最小執行緒數 consumer.setConsumeThreadMin(1);
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/154279.html
標籤:Java
