本文探討如何使用 RocketMQ Binder 完成 Spring Cloud 應用訊息的訂閱和發布,
介紹
RocketMQ 是一款開源的分布式訊息系統,基于高可用分布式集群技術,提供低延時的、高可靠的訊息發布與訂閱服務,廣泛應用于多個領域,包括異步通信解耦、企業解決方案、金融支付、電信、電子商務、快遞物流、廣告營銷、社交、即時通信、移動應用、手游、視頻、物聯網、車聯網等,
RocketMQ 是阿里巴巴在2012年開源的分布式訊息中間件,目前已經捐贈給 Apache 軟體基金會,并于2017年9月25日成為 Apache 的頂級專案,作為經歷過多次阿里巴巴雙十一這種“超級工程”的洗禮并有穩定出色表現的國產中間件,以其高性能、低延時和高可靠等特性近年來已經也被越來越多的國內企業使用,
RocketMQ特點
- 是一個佇列模型的訊息中間件,具有高性能、高可靠、高實時、分布式等特點
- Producer、Consumer、佇列都可以分布式
- Producer 向一些佇列輪流發送訊息,佇列集合稱為 Topic,Consumer 如果做廣播消費,則一個 Consumer 實體消費這個 Topic 對應的所有佇列,如果做集群消費,則多個 Consumer 實體平均消費這個 Topic 對應的佇列集合
- 能夠保證嚴格的訊息順序
- 支持拉(pull)和推(push)兩種訊息模式
- 高效的訂閱者水平擴展能力
- 實時的訊息訂閱機制
- 億級訊息堆積能力
- 支持多種訊息協議,如 JMS、OpenMessaging 等
- 較少的依賴
Spring Cloud Stream
Spring Cloud Stream 是一個構建訊息驅動微服務的框架,
Spring Cloud Stream 提供了訊息中間件配置的統一抽象,推出了 pub/sub,consumer groups,semantics,stateful partition 這些統一的模型支持,
Spring Cloud Stream 核心構件有:Binders、Bindings和Message,應用程式通過 inputs 或者 outputs 來與 binder 互動,通過我們配置來 binding ,而 binder 負責與中間件互動,Message為資料交換的統一資料規范格式,
- Binding: 包括 Input Binding 和 Output Binding,
Binding 在訊息中間件與應用程式提供的 Provider 和 Consumer 之間提供了一個橋梁,實作了開發者只需使用應用程式的 Provider 或 Consumer 生產或消費資料即可,屏蔽了開發者與底層訊息中間件的接觸,
- Binder: 跟外部訊息中間件集成的組件,用來創建 Binding,各訊息中間件都有自己的 Binder 實作,
比如 Kafka 的實作 KafkaMessageChannelBinder,RabbitMQ 的實作 RabbitMessageChannelBinder 以及 RocketMQ 的實作 RocketMQMessageChannelBinder,
- Message:是 Spring Framework 中的一個模塊,其作用就是統一訊息的編程模型,
比如訊息 Messaging 對應的模型就包括一個訊息體 Payload 和訊息頭 Header,
spring-cloud-stream 官網
Window搭建部署RocketMQ
下載
當前最新版本為4.6.0
下載出來解壓到:D:\rocketmq 目錄,目錄最好不要帶空格和太深,否則服務運行可能會報錯
啟動NameServer服務
在啟動之前需要配置系統環境,不然會報錯,
Please set the ROCKETMQ_HOME variable in your environment!
系統環境變數名:ROCKETMQ_HOME
根據你解壓的目錄配置環境變數,比如我的變數值為:D:\rocketmq
進入window命令視窗,進入D:\rocketmq\bin目錄下,執行
start mqnamesrv.cmd
如上則NameServer啟動成功,使用期間,視窗不要關閉,
啟動Broker服務
進入bin目錄下,輸入
start mqbroker.cmd -n localhost:9876
如上的 ip+port 是rocketmq的服務地址和埠,
運行如上命令,可能會報如下錯誤,找不到或無法加載主類
如果出此情況,打開bin-->runbroker.cmd,修改%CLASSPATH%成"%CLASSPATH%"
保存再次執行如上命令,執行成功后,提示boot success 代表成功,
示例
本示例實作三種訊息的發布以及訂閱接收,
創建 RocketMQ 訊息生產者
創建 ali-rocketmq-producer 工程,埠為:28081
- pom.xml添加依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloud-alibaba</artifactId>
<groupId>com.easy</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>ali-rocketmq-producer</artifactId>
<packaging>jar</packaging>
<dependencies>
<!--rocketmq依賴-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<!--web依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
- 配置 Output 的 Binding 資訊并配合
@EnableBinding注解使其生效
application.yml配置
server:
port: 28081
spring:
application:
name: ali-rocketmq-producer
cloud:
stream:
rocketmq:
binder:
# RocketMQ 服務器地址
name-server: 127.0.0.1:9876
bindings:
output1: {destination: test-topic1, content-type: application/json}
output2: {destination: test-topic2, content-type: application/json}
management:
endpoints:
web:
exposure:
include: '*'
endpoint:
health:
show-details: always
ArProduceApplication.java
@SpringBootApplication
@EnableBinding({MySource.class})
public class ArProduceApplication {
public static void main(String[] args) {
SpringApplication.run(ArProduceApplication.class, args);
}
}
- 訊息生產者服務
MySource.java
package com.easy.arProduce;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface MySource {
@Output("output1")
MessageChannel output1();
@Output("output2")
MessageChannel output2();
}
SenderService.java
package com.easy.arProduce;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;
@Service
public class SenderService {
@Autowired
private MySource source;
/**
* 發送字串
*
* @param msg
*/
public void send(String msg) {
Message message = MessageBuilder.withPayload(msg)
.build();
source.output1().send(message);
}
/**
* 發送帶tag的字串
*
* @param msg
* @param tag
*/
public void sendWithTags(String msg, String tag) {
Message message = MessageBuilder.withPayload(msg)
.setHeader(RocketMQHeaders.TAGS, tag)
.build();
source.output1().send(message);
}
/**
* 發送物件
*
* @param msg
* @param tag
* @param <T>
*/
public <T> void sendObject(T msg, String tag) {
Message message = MessageBuilder.withPayload(msg)
.setHeader(RocketMQHeaders.TAGS, tag)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build();
source.output2().send(message);
}
}
撰寫 TestController.java 控制器方便測驗
package com.easy.arProduce;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(value = "https://www.cnblogs.com/tqlin/p/test")
public class TestController {
@Autowired
SenderService senderService;
@RequestMapping(value = "https://www.cnblogs.com/send", method = RequestMethod.GET)
public String send(String msg) {
senderService.send(msg);
return "字串訊息發送成功!";
}
@RequestMapping(value = "https://www.cnblogs.com/sendWithTags", method = RequestMethod.GET)
public String sendWithTags(String msg) {
senderService.sendWithTags(msg, "tagStr");
return "帶tag字串訊息發送成功!";
}
@RequestMapping(value = "https://www.cnblogs.com/sendObject", method = RequestMethod.GET)
public String sendObject(int index) {
senderService.sendObject(new Foo(index, "foo"), "tagObj");
return "Object物件訊息發送成功!";
}
}
創建 RocketMQ 訊息消費者
創建 ali-rocketmq-consumer 工程,埠為:28082
- pom.xml添加依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloud-alibaba</artifactId>
<groupId>com.easy</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>ali-rocketmq-consumer</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
-配置 Input 的 Binding 資訊并配合 @EnableBinding 注解使其生效
application.yml配置
server:
port: 28082
spring:
application:
name: ali-rocketmq-consumer
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876 #rocketmq 服務地址
bindings:
input1: {consumer.orderly: true} #是否排序
input2: {consumer.tags: tagStr} #訂閱 帶tag值為tagStr的字串
input3: {consumer.tags: tagObj} #訂閱 帶tag值為tabObj的字串
bindings:
input1: {destination: test-topic1, content-type: text/plain, group: test-group1, consumer.maxAttempts: 1}
input2: {destination: test-topic1, content-type: application/plain, group: test-group2, consumer.maxAttempts: 1}
input3: {destination: test-topic2, content-type: application/plain, group: test-group3, consumer.maxAttempts: 1}
management:
endpoints:
web:
exposure:
include: '*'
endpoint:
health:
show-details: always
ArConsumerApplication.java
package com.easy.arConsumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
@SpringBootApplication
@EnableBinding({MySource.class})
public class ArConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ArConsumerApplication.class, args);
}
}
- 訊息消費者服務
MySource.java
package com.easy.arConsumer;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface MySource {
@Input("input1")
SubscribableChannel input1();
@Input("input2")
SubscribableChannel input2();
@Input("input3")
SubscribableChannel input3();
}
ReceiveService.java
package com.easy.arConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class ReceiveService {
@StreamListener("input1")
public void receiveInput1(String receiveMsg) {
log.info("input1 接收到了訊息:" + receiveMsg);
}
@StreamListener("input2")
public void receiveInput2(String receiveMsg) {
log.info("input2 接收到了訊息:" + receiveMsg);
}
@StreamListener("input3")
public void receiveInput3(@Payload Foo foo) {
log.info("input3 接收到了訊息:" + foo);
}
}
使用示例
示例關聯專案
本示例我們創建了兩個專案實作
-
ali-rocketmq-producer:RocketMQ 訊息服務生產者,服務名:ali-rocketmq-producer,埠:28081
-
ali-rocketmq-consumer:RocketMQ 訊息服務消費者,服務名:ali-rocketmq-producer,埠:28082
運行示例測驗
首先要啟動ali-rocketmq-producer服務及ali-rocketmq-consumer服務
- 訪問訊息服務生產者地址: http://localhost:28081/test/send?msg=yuntian
查看服務消費者控制臺,輸出
2019-12-04 15:37:47.859 INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService : input1 接收到了訊息:yuntian
2019-12-04 15:37:47.859 INFO 6356 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : consume C0A8096E200818B4AAC212CDA70E0014 cost: 1 ms
表示字串消費成功被input1消費了
- 訪問訊息服務生產者地址: http://localhost:28081/test/sendWithTags?msg=tagyuntian
查看服務消費者控制臺,輸出
2019-12-04 15:38:09.586 INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService : input2 接收到了訊息:tagyuntian
2019-12-04 15:38:09.592 INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService : input1 接收到了訊息:tagyuntian
2019-12-04 15:38:09.592 INFO 6356 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : consume C0A8096E200818B4AAC212CDFCD30015 cost: 6 ms
表示帶tag的字串成功被input2和input1消費了,因為input1也訂閱了test-topic1,并且沒有我們沒有加tag過濾,默認表示接收所有訊息,所以也能成功接收tagyuntian字串
- 訪問訊息服務生產者地址: http://localhost:28081/test/sendObject?index=1
查看服務消費者控制臺,輸出
2019-12-04 15:41:15.285 INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService : input3 接收到了訊息:Foo{id=1, bar='foo'}
表示input3成功接收到了tag帶tagObj的物件訊息了,而input1卻沒有輸出訊息,這是因為sendObject發布的訊息走的是test-topic2訊息管道,所以不會發布給input1及input2訂閱者
資料
- Spring Cloud Alibaba 示例原始碼
- 原文地址
- RocketMQ 專案
Spring Boot、Cloud 學習專案
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/97490.html
標籤:MySQL
上一篇:資料庫連接
