1、 RocketMQ安裝測驗
1.1 下載解壓
下載地址:https://rocketmq.apache.org/release-notes/
rocketmq-all-5.0.0-bin-release.zip
下載后上傳到服務器;
解壓命令# unzip rocketmq-all-5.0.0-bin-release.zip
1.2 啟動 測驗
RocketMQ默認配置是比較好的,這樣可以直接應用于生產環境,所以如果機器記憶體較小,啟動會因為記憶體不足失敗,為了避免后面啟動失敗,選擇先修改其記憶體大小,一般阿里云服務器是滿足不了默認記憶體,
手動調整JVM的配置,單位從g改為m
1.2.1 啟動nameserver
1.2.1.1 修改runbroker.sh和runserver.sh

1.2.1.2 runbroker.sh
-server -Xms256m -Xmx256m -Xmn128m

1.2.1.3 runserver.sh
-server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m

1.2.1.4 啟動nameserver
解壓目錄執行# nohup ./bin/mqnamesrv -n 1.117.75.57(自己的ip):9876 &

1.2.2 啟動broker
1.2.2.1 修改broker.conf

添加namesrvAddr 和 brokerIP1:

1.2.2.3 啟動 borker
解壓目錄執行# nohup ./bin/mqbroker -n 1.117.75.57:9876 -c ./conf/broker.conf &
1.2.2.4 查看啟動情況
jps

1.2.3 測驗
由于服務器記憶體可能比較小,建議先關閉其他應用,比如rabbitmq,docker的容器等;
還需要開啟幾個埠:9876,10909,10910,10911;
1.2.3.1 生產者
匯出環境變數# export NAMESRV_ADDR=1.117.75.57:9876
發送訊息# bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

1.2.3.2 消費者
匯出環境變數# export NAMESRV_ADDR=1.117.75.57:9876
消費訊息# bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

1.2.4 關閉命令
關閉nameserver# bin/mqshutdown namesrv
關閉broker# bin/mqshutdown broker
1.2.5 RocketMQ控制臺
1.2.5.1 下載,解壓,修改配置資訊

1.2.5.2 訪問控制臺
localhost:9696

2、RocketMQ框架原理
2.1 框架

2.2 概念
整體可以分成4個角色,分別是:NameServer,Broker,Producer,Consumer:
- Broker(郵遞員):Broker是RocketMQ的核心,負責訊息的接收,存盤,投遞等功能;
- NameServer(郵局):訊息佇列的協調者,Broker向它注冊路由資訊,同時Producer和Consumer向其獲取路由資訊Producer(寄件人)訊息的生產者,需要從NameServer獲取Broker資訊,然后與Broker建立連接,向Broker發送訊息;
- Consumer(收件人) :訊息的消費者,需要從NameServer獲取Broker資訊,然后與Broker建立連接,從Broker獲取訊息;
- Topic(地區):用來區分不同型別的訊息,發送和接收訊息前都需要先創建Topic,針對Topic來發送和接收訊息Message Queue(郵件)為了提高性能和吞吐量,引入了Message Queue,一個Topic可以設定一個或多個Message Queue,這樣訊息就可以并行往各個Message Queue發送訊息,消費者也可以并行的從多個Message Queue讀取訊息;
- Message:Message 是訊息的載體;
- Producer Group:生產者組,簡單來說就是多個發送同一類訊息的生產者稱之為一個生產者組,
- Consumer Group:消費者組,消費同一類訊息的多個 consumer 實體組成一個消費者組,
3、RocketMQ整合
3.1 rocketmq模塊 發送訊息
3.2.1.1 依賴
<!-- rocket -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
3.2.1.2 配置
# rocketmq配置
rocketmq:
#rocketMQ服務的地址
name-server: 1.117.75.57:9876
# 生產者組
producer:
group: kh96-sendsms-group
3.2.1.3 請求
@Autowired(required = false)
private RocketMQTemplate rocketMQTemplate;
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 測驗發送訊息到用戶中心,用戶中心給手機號發資訊
*/
@RequestMapping("/testRocketMQSendMsg")
public String testRocketMQSendMsg(@RequestParam String phoneNo) {
log.info("------ 使用RocketMQ,測驗給手機:{},發送訊息 -------", phoneNo);
//使用RocketMQ發送訊息
rocketMQTemplate.convertAndSend("rocketmq-send-sms-kh96", phoneNo);
return "send msg success";
}
3.2 user模塊 消費訊息
1.添加加rocketmq的依賴;
2.用戶服務,監聽發送短信的請求發送訊息:
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 用戶服務,監聽發送短信的請求發送訊息
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "rocketmq-user-sms-group", //組 隨便寫
topic = "rocketmq-send-sms-kh96" //訊息佇列,發送的時候指定的
)
public class SendSmsListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("***** 接收發送資訊的請求,給手機:{},發送訊息 ******", message);
}
}
3.3 測驗
3.3.1 發送請求

3.3.2 發送訊息模塊日志

3.3.3 接收訊息模塊日志

3.3.4 控制臺查看訊息詳情


4、發送不同型別的訊息
4.1 發送可靠同步訊息
4.1.1 請求
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 測驗roketmq-發送可靠同步訊息
*/
@RequestMapping("/testRocketMQSendMsgSync")
public String testRocketMQSendMsgSync(@RequestParam String syncMsg) {
log.info("------ 使用RocketMQ,發送可靠同步訊息{} -------", syncMsg);
//使用RocketMQ發送訊息,拿到同步結果
SendResult sendResult = rocketMQTemplate.syncSend("rocketmq-sync-msg-kh96", syncMsg);
log.info("------ 使用RocketMQ,發送可靠同步訊息結果:{} -------", sendResult);
return "send sync msg success";
}
4.1.2 發送請求

4.1.3 同步結果

4.2 發送可靠異步訊息
4.2.1 請求
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 測驗roketmq-發送可靠異步訊息
*/
@RequestMapping("/testRocketMQSendMsgAsync")
public String testRocketMQSendMsgAsync(@RequestParam String asyncMsg) {
log.info("------ 使用RocketMQ,發送可靠異步訊息:{} -------", asyncMsg);
//使用RocketMQ發送訊息
rocketMQTemplate.asyncSend("rocketmq-sync-msg-kh96",
asyncMsg,
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("------ 可靠異步發送成功回呼 ------");
}
@Override
public void onException(Throwable throwable) {
log.info("------ 可靠異步發送失敗回呼 ------");
}
});
return "send async msg success";
}
4.2.2 發送請求

4.2.3 回呼結果

4.3 發送單項訊息,只發不收結果
4.3.1 請求
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 測驗roketmq-發送單向訊息,只發不收結果
*/
@RequestMapping("/testRocketMQSendMsgOneWay")
public String testRocketMQSendMsgOneWay(@RequestParam String oneWayMsg) {
log.info("------ 使用RocketMQ,發送單向訊息給:{} -------", oneWayMsg);
//使用RocketMQ發送訊息
rocketMQTemplate.sendOneWay("rocketmq-oneWay-msg-kh96", oneWayMsg);
return "send oneWay msg success";
}
4.3.2 發送請求

4.3.3 日志查看

4.4 發送順序訊息
4.4.1 請求
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 測驗roketmq-發送順序訊息
*/
@RequestMapping("/testRocketMQSendMsgOrderly")
public String testRocketMQSendMsgOrderly(@RequestParam String orderlyMsgs) {
log.info("------ 使用RocketMQ,發送順序訊息:{} -------", orderlyMsgs);
//使用RocketMQ發送順序訊息,必須要提供一個唯一的標識di,比如用戶編號等
String userId = UUID.randomUUID().toString().replace("-", "");
//發送多條順序訊息,模擬iang訊息分割成多個符號發送
Arrays.asList(orderlyMsgs.split("")).
forEach(orderlyMsg -> rocketMQTemplate.syncSendOrderly("rocketmq-orderly-msg-kh96", orderlyMsg, userId));
return "send orderly msg success";
}
4.4.2 監聽器
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: RocketMQ 監聽順序訊息
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "rocketmq-orderly-sms-group",
topic = "rocketmq-orderly-msg-kh96",
consumeMode = ConsumeMode.ORDERLY)
public class RocketMQOrderlyMsgListener implements RocketMQListener<String> {
@Override
public void onMessage(String orderlyMsg) {
log.info("------接收順序訊息 :{} ------", orderlyMsg);
}
}
4.2.3 發送請求

4.2.4 消費訊息

4.5 發送事務訊息(重點)
4.5.1 發送事務訊息流程
4.5.1.1 流程圖

4.5.1.2 流程決議
- 正常事務發送與提交階段
- 生產者發送一個半訊息給broker(半訊息是指的暫時不能消費的訊息);
- 服務端回應;
- 開始執行本地事務;
- 根據本地事務的執行情況執行Commit或者Rollback
- 事務資訊的補償流程
- 如果broker長時間沒有收到本地事務的執行狀態,會向生產者發起一個確認會查的操作請求;
- 生產者收到確認會查請求后,檢查本地事務的執行狀態;
- 根據檢查后的結果執行Commit或者Rollback操作 補償階段主要是用于解決生產者在發送Commit或者Rollbacke操作時發生超時或失敗的情況;
4.5.2 RocketMQ事務流程關鍵
-
事務訊息在一階段對用戶不可見、
? 事務訊息相對普通訊息最大的特點就是一階段發送的訊息對用戶是不可見的,也就是說消費者不能直接消費.這里RocketMQ實作方法是原訊息的主題與訊息消費佇列,然后把主題改成RMQ_SYS_TRANS_HALF_TOPIC.這樣由于消費者沒有訂閱這個主題,所以不會消費;
-
如何處理第二階段的發送訊息?
? 在本地事務執行完成后回向Broker發送Commit或者Rollback操作,此時如果在發送訊息的時候生產者出故障了,要保證這條訊息最終被消費,broker就會向服務端發送回查請求,確認本地事務的執行狀態.當然RocketMQ并不會無休止的發送事務狀態回查請求,默認是15次,如果15次回查還是無法得知事務的狀態,RocketMQ默認回滾訊息(broker就會將這條半訊息洗掉);
4.5.3 RocketMQ事務訊息原理
-
設計思想
? 在RocketMQ事務訊息的主要流程中,一階段的訊息如何對用戶不可見,其中,事務訊息相對普通訊息最大的特點就是一階段發送的訊息對用戶是不可見的,那么,如何做到寫入訊息但是對用戶不可見呢?RocketMQ事務訊息的做法是:如果訊息是half訊息,將備份原訊息的主題與訊息消費佇列,然后改變主題為RMQ_SYS_TRANS_HALF_TOPIC,由于消費組未訂閱該主題,故消費端無法消費half型別的訊息,
-
如何實作事務回查?
? Broker會啟動一個訊息回查的定時任務,定時從事務訊息queue中讀取所有待反查的訊息,針對每個需要反查的半訊息,Broker會給對應的Producer發一個要求執行事務狀態反查的RPC請求,然后根據RPC回傳回應中的反查結果,來決定這個半訊息是需要提交還是回滾,或者后續繼續來反查,最后,提交或者回滾事務,將半訊息標記為已處理狀態【將訊息存盤在主題為:RMQ_SYS_TRANS_OP_HALF_TOPIC的主題中,代表這些訊息已經被處理(提交或回滾)】, 如果是提交事務,就把半訊息從半訊息佇列中復制到該訊息真正的topic和queue中; 如果是回滾事務,則什么都不做,
參考博客1:https://blog.csdn.net/Weixiaohuai/article/details/123733518
參考博客2:https://blog.csdn.net/qq_42877546/article/details/125404307
4.5.4 實作代碼

4.5.4.1 業務層
4.5.4.1.1 介面
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 事務訊息的業務 介面
*/
public interface RocketMQTxService {
/**
* @param : [kgcMallOrder]
* @return : void
* @author : huayu
* @date : 30/11/2022
* @description : 發送生成訂單的半事務訊息
*/
void sendCreateOrderHalfTx(KgcMallOrder kgcMallOrder);
/**
* @param : [txId, kgcMallOrder]
* @return : void
* @author : huayu
* @date : 30/11/2022
* @description : 執行本地生成訂單的事務操作
*/
void executeCreateOrderLocalTx(String txId, KgcMallOrder kgcMallOrder);
}
4.5.4.1.2 實作類
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 事務訊息的業務 處理類
*/
@Service
@Slf4j
public class RocketMQTxServiceImpl implements RocketMQTxService {
@Autowired(required = false)
private RocketMQTemplate rocketMQTemplate;
@Autowired
private KgcMallOrderRepository kgcMallOrderRepository;
@Autowired
private KgcMallTxlogrepisitory kgcMallTxlogrepisitory;
@Override
@Transactional
public void sendCreateOrderHalfTx(KgcMallOrder kgcMallOrder) {
log.info("###### 1. 開始發送生成訂單的半事務訊息 到 rocketmq服務端 ######");
//自定義事務編號
String txId = UUID.randomUUID().toString().substring(0, 8);
//發送半事務訊息,回傳發送結果
TransactionSendResult transactionSendResult =
rocketMQTemplate.sendMessageInTransaction("rocketmq-tx-msg-group", //組
"rocketmq-tx-msg-kh96", //佇列
MessageBuilder.withPayload(kgcMallOrder).setHeader("txId", txId).build(), // 訊息體
kgcMallOrder); //發送內容
log.info("###### 2. 開始發送生成訂單的半事務訊息rocketmq服務端成功,回應:{} ######", transactionSendResult);
}
@Override
@Transactional
public void executeCreateOrderLocalTx(String txId, KgcMallOrder kgcMallOrder) {
log.info("###### 3.1 本地開始執行生成訂單的事務操作 ######");
//開始插入訂單
kgcMallOrderRepository.save(kgcMallOrder);
log.info("###### 3.2 本地執行生成訂單的事務操作 成功 ######");
// 模擬本地事務處理失敗
// int a = 10 / 0;
log.info("###### 3.3開始生成用于事務回查的本地事務日志 ######");
//創建事務物件
KgcMallTxlog kgcMallTxlog = KgcMallTxlog.builder()
.id(txId)
.txDetail("本地事務日志")
.txTime(new Date())
.build();
//事務日志入庫
kgcMallTxlogrepisitory.save(kgcMallTxlog);
log.info("###### 3.4 生成用于事務回查的本地事務日志成功 ######");
}
}
4.5.4.2 監聽器
4.5.4.2.1 RocketMQExecuteLocalTxListener
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 事務訊息,本地執行事務監聽,半事務訊息發送成功后,此監聽會收到本地事務處理的通知
*/
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "rocketmq-tx-msg-group")
public class RocketMQExecuteLocalTxListener implements RocketMQLocalTransactionListener {
@Autowired
private RocketMQTxService rocketMQTxService;
@Autowired
private KgcMallTxlogrepisitory kgcMallTxlogrepisitory;
//執行本地事務
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
//呼叫本地事務執行的業務處理介面
log.info("###### 3 半事務訊息發送成功, 執行本地事務 ######");
rocketMQTxService.executeCreateOrderLocalTx((String) msg.getHeaders().get("txId"), (KgcMallOrder) arg);
//回應本地事務執行成功結果給服務端,服務端接收到此提交結果,會投遞訊息
log.info("###### 4 本地事務處理成功,回應事務處理結果給服務端 ######");
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("###### 本地事務執行例外:{} ######", e.getMessage());
}
//回應本地事務執行失敗結果給服務端,服務端接收到此回滾結果,不會投遞訊息(快取,并定期洗掉)
log.info("###### 4 本地事務處理失敗,回應事務處理結果給服務端 #######");
return RocketMQLocalTransactionState.ROLLBACK;
}
//回查本地事務
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
log.info("###### 5 未收到第4步本地事務處理結果,回查事務狀態 ######");
//在網路閃斷或者服務重啟,沒有及時通知服務斷事務處理結果,進行會查操作
//如果回查本地事務執行成功(看事物日志是否存在,如果存在就是處理成功如果不存在就是處理失敗),通知服務端投遞訊息,否則不能投遞
log.info("###### 6 檢查本地事務處理結果,回應事務處理結果給服務端 ######");
if (kgcMallTxlogrepisitory.findById((String) msg.getHeaders().get("txId")).orElse(null) == null) {
//本地事務入庫失敗,代表本地事務沒有處理成功,步投遞訊息(快取,并定期洗掉)
log.info("###### 7 檢查本地事務處理結果失敗 ######");
return RocketMQLocalTransactionState.ROLLBACK;
}
//本地事務入庫成功,代表本地事務處理成功,投遞訊息
log.info("###### 7 檢查本地事務處理結果成功 ######");
return RocketMQLocalTransactionState.COMMIT;
}
}
4.5.4.2.2 RocketMQConsumerTxMsgListener
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 事務訊息,消費監聽,如果本地事務處理成功,會收到投遞的訊息,如果失敗,收不到訊息
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "rocket-tx-msg-consumer-group",
topic = "rocket-tx-msg-kh96"
)
public class RocketMQConsumerTxMsgListener implements RocketMQListener<Object> {
@Override
public void onMessage(Object message) {
log.info("###### 8 消費端,收到生成訂單成功的事務訊息:{} ###### ", message);
}
}
4.5.4.3 控制器
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: RocketMQ 訊息佇列 測驗訊息入口
*/
@Slf4j
@RestController
public class RocketMQController {
@Autowired(required = false)
private RocketMQTemplate rocketMQTemplate;
@Autowired
private RocketMQTxService rocketMQTxService;
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 測驗發送訊息到用戶中心,用戶中心給手機號發資訊
*/
@RequestMapping("/testRocketMQSendMsg")
public String testRocketMQSendMsg(@RequestParam String phoneNo) {
log.info("------ 使用RocketMQ,測驗給手機:{},發送訊息 -------", phoneNo);
//使用RocketMQ發送訊息
rocketMQTemplate.convertAndSend("rocketmq-send-sms-kh96", phoneNo);
return "send msg success";
}
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 測驗roketmq-發送可靠同步訊息
*/
@RequestMapping("/testRocketMQSendMsgSync")
public String testRocketMQSendMsgSync(@RequestParam String syncMsg) {
log.info("------ 使用RocketMQ,發送可靠同步訊息{} -------", syncMsg);
//使用RocketMQ發送訊息,拿到同步結果
SendResult sendResult = rocketMQTemplate.syncSend("rocketmq-sync-msg-kh96", syncMsg);
log.info("------ 使用RocketMQ,發送可靠同步訊息結果:{} -------", sendResult);
return "send sync msg success";
}
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 測驗roketmq-發送可靠異步訊息
*/
@RequestMapping("/testRocketMQSendMsgAsync")
public String testRocketMQSendMsgAsync(@RequestParam String asyncMsg) {
log.info("------ 使用RocketMQ,發送可靠異步訊息:{} -------", asyncMsg);
//使用RocketMQ發送訊息
rocketMQTemplate.asyncSend("rocketmq-sync-msg-kh96",
asyncMsg,
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("------ 可靠異步發送成功回呼 ------");
}
@Override
public void onException(Throwable throwable) {
log.info("------ 可靠異步發送失敗回呼 ------");
}
});
return "send async msg success";
}
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 測驗roketmq-發送單向訊息,只發不收結果
*/
@RequestMapping("/testRocketMQSendMsgOneWay")
public String testRocketMQSendMsgOneWay(@RequestParam String oneWayMsg) {
log.info("------ 使用RocketMQ,發送單向訊息:{} -------", oneWayMsg);
//使用RocketMQ發送訊息
rocketMQTemplate.sendOneWay("rocketmq-oneWay-msg-kh96", oneWayMsg);
return "send oneWay msg success";
}
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 測驗roketmq-發送順序訊息
*/
@RequestMapping("/testRocketMQSendMsgOrderly")
public String testRocketMQSendMsgOrderly(@RequestParam String orderlyMsgs) {
log.info("------ 使用RocketMQ,發送順序訊息:{} -------", orderlyMsgs);
//使用RocketMQ發送順序訊息,必須要提供一個唯一的標識di,比如用戶編號等
String userId = UUID.randomUUID().toString().replace("-", "");
//發送多條順序訊息,模擬iang訊息分割成多個符號發送
Arrays.asList(orderlyMsgs.split("")).
forEach(orderlyMsg -> rocketMQTemplate.syncSendOrderly("rocketmq-orderly-msg-kh96", orderlyMsg, userId));
return "send orderly msg success";
}
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 測驗roketmq-發送事務訊息
*/
@RequestMapping("/testRocketMQSendMsgTx")
public String testRocketMQSendMsgTx(@RequestParam String txmsg) {
log.info("------ 使用RocketMQ,發送事務訊息:{} -------", txmsg);
//使用RocketMQ發送事務訊息,模擬生成一筆訂單
KgcMallOrder kgcMallOrder = KgcMallOrder.builder()
.userId(2)
.userName("RocketMQ_tx")
.prodId(2)
.prodName(txmsg)
.totalPrice(96.0)
.build();
//發送事務訊息
rocketMQTxService.sendCreateOrderHalfTx(kgcMallOrder);
return "send tx msg success";
}
}
4.5.5 測驗
4.5.5.1 發送事務訊息成功
4.5.5.1.1 發送請求

4.5.5.1.2 日志查看

4.5.5.2 發送事務訊息失敗(模擬例外)
4.5.5.2.1 發送請求

4.5.5.2.2 日志查看

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/539024.html
標籤:其他
上一篇:基于SpringBoot分層2開發web應用學習筆記之控制器的理解以及常見注解
下一篇:day12_內部類&API
