摘要:這篇文章主要介紹 Spring Boot 專案使用 rocketmq-spring SDK 實作訊息收發的操作流程,同時筆者會從開發者的角度解讀 SDK 的設計邏輯,
本文分享自華為云社區《RocketMQ-Spring : 實戰與原始碼決議一網打盡》,作者:勇哥java實戰分享,
RocketMQ 是大家耳熟能詳的訊息佇列,開源專案 rocketmq-spring 可以幫助開發者在 Spring Boot 專案中快速整合 RocketMQ,
這篇文章會介紹 Spring Boot 專案使用 rocketmq-spring SDK 實作訊息收發的操作流程,同時筆者會從開發者的角度解讀 SDK 的設計邏輯,
一 SDK 簡介
專案地址:https://github.com/apache/rocketmq-spring
rocketmq-spring 的本質是一個 Spring Boot starter ,
Spring Boot 基于“約定大于配置”(Convention over configuration)這一理念來快速地開發、測驗、運行和部署 Spring 應用,并能通過簡單地與各種啟動器(如 spring-boot-web-starter)結合,讓應用直接以命令列的方式運行,不需再部署到獨立容器中,
Spring Boot starter 構造的啟動器使用起來非常方便,開發者只需要在 pom.xml 引入 starter 的依賴定義,在組態檔中撰寫約定的配置即可,
下面我們看下 rocketmq-spring-boot-starter 的配置:
1、引入依賴
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.3</version> </dependency>
2、約定配置
接下來,我們分別按照生產者和消費者的順序,詳細的講解訊息收發的操作程序,
二 生產者
首先我們添加依賴后,進行如下三個步驟:
1、組態檔中配置如下
rocketmq: name-server: 127.0.0.1:9876 producer: group: platform-sms-server-group # access-key: myaccesskey # secret-key: mysecretkey topic: sms-common-topic
生產者配置非常簡單,主要配置名字服務地址和生產者組,
2、需要發送訊息的類中注入 RcoketMQTemplate
@Autowired private RocketMQTemplate rocketMQTemplate; ? @Value("${rocketmq.topic}") private String smsTopic;
3、發送訊息,訊息體可以是自定義物件,也可以是 Message 物件
rocketMQTemplate 類包含多鐘發送訊息的方法:
- 同步發送 syncSend
- 異步發送 asyncSend
- 順序發送 syncSendOrderly
- oneway發送 sendOneWay
下面的代碼展示如何同步發送訊息,
String destination = StringUtils.isBlank(tags) ? topic : topic + ":" + tags; SendResult sendResult = rocketMQTemplate.syncSend( destination, MessageBuilder.withPayload(messageContent). setHeader(MessageConst.PROPERTY_KEYS, uniqueId). build() ); if (sendResult != null) { if (sendResult.getSendStatus() == SendStatus.SEND_OK) { // send message success ,do something } }
syncSend 方法的第一個引數是發送的目標,格式是:topic + ":" + tags ,
第二個引數是:spring-message 規范的 message 物件 ,而 MessageBuilder 是一個工具類,方法鏈式呼叫創建訊息物件,
三 消費者
1、組態檔中配置如下
rocketmq: name-server: 127.0.0.1:9876 consumer1: group: platform-sms-worker-common-group topic: sms-common-topic
2、實作訊息監聽器
@Component @RocketMQMessageListener( consumerGroup = "${rocketmq.consumer1.group}", //消費組 topic = "${rocketmq.consumer1.topic}" //主題 ) public class SmsMessageCommonConsumer implements RocketMQListener<String> { public void onMessage(String message) { System.out.println("普通短信:" + message); } }
消費者實作類也可以實作 RocketMQListener<MessageExt>, 在 onMessage 方法里通過 RocketMQ 原生訊息物件 MessageExt 獲取更詳細的訊息資料 ,
public void onMessage(MessageExt message) { try { String body = new String(message.getBody(), "UTF-8"); logger.info("普通短信:" + message); } catch (Exception e) { logger.error("common onMessage error:", e); } }
四 原始碼概覽
最新原始碼中,我們可以看到原始碼中包含四個模塊:
1、rocketmq-spring-boot-parent
該模塊是父模塊,定義專案所有依賴的 jar 包,
2、rocketmq-spring-boot
核心模塊,實作了 starter 的核心邏輯,
3、rocketmq-spring-boot-starter
SDK 模塊,簡單封裝,外部專案參考,
4、rocketmq-spring-boot-samples
示例代碼模塊,這個模塊非常重要,當用戶使用 SDK 時,可以參考示例快速開發,
五 starter 實作
我們重點分析下 rocketmq-spring-boot 模塊的核心原始碼:
spring-boot-starter 實作需要包含如下三個部分:
1、定義 Spring 自身的依賴包和 RocketMQ 的依賴包 ;
2、定義spring.factories 檔案
在 resources 包下創建 META-INF 目錄后,新建 spring.factories 檔案,并在檔案中定義自動加載類,檔案內容是:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
spring boot 會根據檔案中配置的自動化配置類來自動初始化相關的 Bean、Component 或 Service,
3、實作自動加載類
在 RocketMQAutoConfiguration 類的具體實作中,我們重點分析下生產者和消費者是如何分別啟動的,
▍生產者發送模板類:RocketMQTemplate
RocketMQAutoConfiguration 類定義了兩個默認的 Bean :

首先SpringBoot專案中組態檔中的配置值會根據屬性條件系結到 RocketMQProperties 物件 中,然后使用 RocketMQ 的原生 API 分別創建生產者 Bean 和拉取消費者 Bean , 分別將兩個 bean 設定到 RocketMQTemplate 物件中,
兩個重點需要強調:
- 發送訊息時,將 spring-message 規范下的訊息物件封裝成 RocketMQ 訊息物件
- 默認拉取消費者 litePullConsumer ,拉取消費者一般用于大資料批量處理場景 ,
RocketMQTemplate 類封裝了拉取消費者的receive方法,以方便開發者使用,
▍自定義消費者類
下圖是并發消費者的例子:
那么 rocketmq-spring 是如何自動啟動消費者呢 ?
spring 容器首先注冊了訊息監聽器后置處理器,然后呼叫 ListenerContainerConfiguration 類的 registerContainer 方法 ,
對比并發消費者的例子,我們可以看到: DefaultRocketMQListenerContainer 是對 DefaultMQPushConsumer 消費邏輯的封裝,
封裝消費訊息的邏輯,同時滿足 RocketMQListener 泛化介面支持不同引數,比如 String 、MessageExt 、自定義物件 ,
首先DefaultRocketMQListenerContainer初始化之后, 獲取 onMessage 方法的引數型別 ,
然后消費者呼叫 consumeMessage 處理訊息時,封裝了一個 handleMessage 方法 ,將原生 RocketMQ 訊息物件 MessageExt 轉換成 onMessage 方法定義的引數物件,然后呼叫 rocketMQListener 的 onMessage 方法,
上圖右側標紅的代碼也就是該方法的精髓:
rocketMQListener.onMessage(doConvertMessage(messageExt));
六 寫到最后
開源專案 rocketmq-spring 有很多值得學習的地方 ,我們可以從如下四個層面逐層進階:
1、學會如何使用 :參考 rocketmq-spring-boot-samples 模塊的示例代碼,學會如何發送和接收訊息,快速編碼;
2、模塊設計:學習專案的模塊分層 (父模塊、SDK 模塊、核心實作模塊、示例代碼模塊);
3、starter 設計思路 :定義自動組態檔 spring.factories 、設計配置屬性類 、在 RocketMQ client 的基礎上實作優雅的封裝、深入理解 RocketMQ 原始碼等;
4、舉一反三:當我們理解了 rocketmq-spring 的原始碼,我們可以嘗試模仿該專案寫一個簡單的 spring boot starter,
點擊關注,第一時間了解華為云新鮮技術~
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/551031.html
標籤:Java
上一篇:Java的final修飾符
下一篇:返回列表
