Cloud Stream
主要使用的 MQ (訊息中間件)
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
如此多的訊息中間件,要都學完,人可能都沒了!
有沒有一種新的技術誕生,讓我們不再關注具體MQ的細節,我們只需要用一種適配系結的方式,自動的給我們在各種MQ內切換,(類似于Hibernate)
引出 Cloud Stream ,Cloud Stream 是什么?
屏蔽底層訊息中間件的差異,降低切換成本,統一訊息的編程模型,
簡介
官方檔案 : https://spring.io/projects/spring-cloud-stream#overview
Cloud Stream中文指導手冊 : https://m.wang1314.com/doc/webapp/topic/20971999.html
官方定義Spring Cloud Stream是一個構建訊息驅動微服務的框架,
應用程式通過inputs或者 outputs 來與Spring Cloud Stream中binder物件互動,
通過我們配置來binding(系結),而Spring Cloud Stream 的binder物件負責與訊息中間件互動,所以,我們只需要搞清楚如何與Spring Cloud Stream互動就可以方便使用訊息驅動的方式,
通過使用Spring Integration來連接訊息代理中間件以實作訊息事件驅動,
Spring Cloud Stream為一些供應商的訊息中間件產品提供了個性化的自動化配置實作,參考了發布-訂閱、消費組、磁區的三個核心概念,
目前僅支持RabbitMQ、 Kafka,
設計思想
標準 MQ

- 生產者/消費者之間靠訊息媒介傳遞資訊內容
- 訊息必須走特定的通道 - 訊息通道 Message Channel
- 訊息通道里的訊息如何被消費呢,誰負責收發處理 - 訊息通道
MessageChannel的子介面SubscribableChannel,由MessageHandler訊息處理器所訂閱,

使用原因
比如我們用到了RabbitMQ和Kafka,由于這兩個訊息中間件的架構上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions磁區,
這些中間件的差異性導致我們實際專案開發給我們造成了一定的困擾,我們如果用了兩個訊息佇列的其中一種,后面的業務需求,我想往另外一種訊息佇列進行遷移,這時候無疑就是一個災難性的,一大堆東西都要重新推倒重新做,因為它跟我們的系統耦合了,這時候Spring Cloud Stream給我們提供了—種解耦合的方式,
Stream 統一底層差異
在沒有系結器這個概念的情況下,我們的SpringBoot應用要直接與訊息中間件進行資訊互動的時候,由于各訊息中間件構建的初衷不同,它們的實作細節上會有較大的差異性通過定義系結器作為中間層,完美地實作了應用程式與訊息中間件細節之間的隔離,通過向應用程式暴露統一的Channel通道,使得應用程式不需要再考慮各種不同的訊息中間件實作,
通過定義系結器Binder作為中間層,實作了應用程式與訊息中間件細節之間的隔離,
Binder:
- INPUT對應于消費者
- OUTPUT對應于生產者
Stream 架構

Stream中的訊息通信方式遵循了發布-訂閱模式
Topic主題進行廣播
- 在RabbitMQ就是Exchange
- 在Kakfa中就是Topic

- Binder - 很方便的連接中間件,屏蔽差異,
- Channel - 通道,是佇列Queue的一種抽象,在訊息通訊系統中就是實作存盤和轉發的媒介,通過Channel對佇列進行配置,
- Source和Sink - 簡單的可理解為參照物件是Spring Cloud Stream自身,從Stream發布訊息就是輸出,接受訊息就是輸入,
使用案例
編碼API和常用注解
| 組成 | 說明 |
|---|---|
| Middleware | 中間件,目前只支持RabbitMQ和Kafka |
| Binder | Binder是應用與訊息中間件之間的封裝,目前實行了Kafka和RabbitMQ的Binder,通過Binder可以很方便的連接中間件,可以動態的改變訊息型別(對應于Kafka的topic,RabbitMQ的exchange),這些都可以通過組態檔來實作 |
| @Input | 注解標識輸入通道,通過該輸乎通道接收到的訊息進入應用程式 |
| @Output | 注解標識輸出通道,發布的訊息將通過該通道離開應用程式 |
| @StreamListener | 監聽佇列,用于消費者的佇列的訊息接收 |
| @EnableBinding | 指信道channel和exchange系結在一起 |
環境準備
準備 RabbitMQ 環境
工程中新建三個子模塊
- cloud-stream-rabbitmq-provider8801,作為生產者進行發訊息模塊
- cloud-stream-rabbitmq-consumer8802,作為訊息接收模塊
- cloud-stream-rabbitmq-consumer8803,作為訊息接收模塊
訊息驅動之生產者
新建Module:
cloud-stream-rabbitmq-provider8801
POM 檔案
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>xxx</artifactId>
<groupId>com.xx.xxx</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-stream-rabbitmq-provider8801</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--基礎配置-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
YML 檔案
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此處配置要系結的rabbitmq的服務資訊;
defaultRabbit: # 表示定義的名稱,用于于binding整合
type: rabbit # 訊息組件型別
environment: # 設定rabbitmq的相關的環境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
output: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設定訊息型別,本次為json,文本則設定“text/plain”
binder: defaultRabbit # 設定要系結的訊息服務的具體設定
eureka:
client: # 客戶端進行Eureka注冊的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(默認是30秒)
lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(默認是90秒)
instance-id: send-8801.com # 在資訊串列時顯示主機名稱
prefer-ip-address: true # 訪問的路徑變為IP地址
主啟動類
StreamMQMain8801
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class StreamMQMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8801.class,args);
}
}
業務類
- 發送訊息介面
public interface IMessageProvider {
public String send();
}
- 發送訊息介面實作類
import com.lun.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import javax.annotation.Resource;
import java.util.UUID;
@EnableBinding(Source.class) //定義訊息的推送管道
public class MessageProviderImpl implements IMessageProvider
{
@Resource
private MessageChannel output; // 訊息發送管道
@Override
public String send()
{
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("*****serial: "+serial);
return null;
}
}
- Controller
import com.lun.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class SendMessageController
{
@Resource
private IMessageProvider messageProvider;
@GetMapping(value = "/sendMessage")
public String sendMessage() {
return messageProvider.send();
}
}
測驗
訪問 - http://localhost:8801/sendMessage
- 后臺將列印serial: UUID字串
訊息驅動之消費者
新建Module:
cloud-stream-rabbitmq-consumer8802
POM 檔案
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>xxx</artifactId>
<groupId>com.xx.xxx</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-stream-rabbitmq-consumer8802</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--基礎配置-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
YML 檔案
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此處配置要系結的rabbitmq的服務資訊;
defaultRabbit: # 表示定義的名稱,用于于binding整合
type: rabbit # 訊息組件型別
environment: # 設定rabbitmq的相關的環境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
input: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設定訊息型別,本次為物件json,如果是文本則設定“text/plain”
binder: defaultRabbit # 設定要系結的訊息服務的具體設定
eureka:
client: # 客戶端進行Eureka注冊的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(默認是30秒)
lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(默認是90秒)
instance-id: receive-8802.com # 在資訊串列時顯示主機名稱
prefer-ip-address: true # 訪問的路徑變為IP地址
主啟動類
StreamMQMain8801
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class StreamMQMain8802 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8802.class,args);
}
}
業務類
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController
{
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message)
{
System.out.println("消費者1號,----->接受到的訊息: "+message.getPayload()+"\t port: "+serverPort);
}
}
測驗
啟動 StreamMQMain8801
訪問 - http://localhost:8801/sendMessage
- 8801 發送 8802 接收訊息
group 組
訊息重復消費問題
按照 8802 ,克隆出來一份運行8803 - cloud-stream-rabbitmq-consumer8803
依次啟動 RabbitMQ
- 服務注冊 - 8801
- 訊息生產 - 8801
- 訊息消費 - 8802
- 訊息消費 - 8802
運行后有兩個問題
- 有重復消費問題
- 訊息持久化問題
消費 - http://localhost:8801/sendMessage
- 目前是8802/8803同時都收到了,存在重復消費問題
- 解決:分組和持久化屬性
group(重要)
假設在如下場景中,訂單系統我們做集群部署,都會從RabbitMQ中獲取訂單資訊,那如果一個訂單同時被兩個服務獲取到,那么就會造成資料錯誤,我們得避免這種情況,這時我們就可以使用Stream中的訊息分組來解決,

注意在Stream中處于同一個group中的多個消費者是競爭關系,就能夠保證訊息只會被其中一個應用消費一次,不同組是可以全面消費的(重復消費),
group 使用
微服務應用放置于同一個group中,就能夠保證訊息只會被其中一個應用消費一次,
不同的組是可以重復消費的,同一個組內會發生競爭關系,只有其中一個可以消費,
8802/8803都變成不同組,group 兩個不同
group: A_Group、B_Group
8802 修改 YML
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此處配置要系結的rabbitmq的服務資訊;
defaultRabbit: # 表示定義的名稱,用于于binding整合
type: rabbit # 訊息組件型別
environment: # 設定rabbitmq的相關的環境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
output: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設定訊息型別,本次為json,文本則設定“text/plain”
binder: defaultRabbit # 設定要系結的訊息服務的具體設定
group: A_Group #<----------------------------------------關鍵
8803 修改相同位置的 YML(即最后一行 group 欄位)
結論:還是重復消費
解決:
8802/8803 實作輪詢分組,每次只有一個消費者,8801模塊的發的訊息只能被8802或8803其中一個接收到,這樣避免了重復消費,
8802/8803 都變成相同組,group 兩個相同 group: A_Group
- 8802修改 YML group:
A_Group - 8803修改 YML group:
A_Group
結論:同一個組的多個微服務實體,每次只會有一個拿到
訊息持久化
停止 8802/8803 并去除掉 8802 的分組g roup:
A_Group,8803的分組 group :A_Group沒有去掉,
-
8801 先發送4 條訊息到 訊息佇列
-
先啟動8802,
無分組屬性配置,后臺沒有打出來訊息 -
再啟動8803,
有分組屬性配置,后臺打出來了MQ上的訊息,(訊息持久化體現)
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/388173.html
標籤:其他
上一篇:web安全學習筆記
