目錄
- 第一章 Stream介紹
- 1.1、什么是Stream
- 1.2、為啥用Stream
- 第二章 Stream重要概念
- 2.1、基本流程
- 2.2、常用注解
- 2.3、其他術語
- 第三章 Stream入門案例
- 3.1、專案準備與啟動
- 3.2、創建訊息生產者
- 3.3、創建訊息消費者
- 第四章 Stream自定義管道名
- 4.1、分析stream原始碼
- 4.2、修改訊息生產者
- 4.3、修改訊息消費者
- 第五章 Stream分組與持久化
- 5.1、創建訊息消費者
- 5.2、奇怪問題的演示
- 5.3、如何解決這問題
- 5.4、訊息分組后演示
- 5.5、訊息持久化演示
- 第六章 Stream訊息磁區處理
- 6.1、修改訊息生產者
- 6.2、奇怪問題的演示
- 6.3、如何解決這問題
- 6.4、訊息磁區后演示
配套資料,免費下載
鏈接:https://pan.baidu.com/s/1la_3-HW-UvliDRJzfBcP_w
提取碼:lxfx
復制這段內容后打開百度網盤手機App,操作更方便哦
第一章 Stream介紹
1.1、什么是Stream
Spring Cloud Stream用官方的話來說就是:他是一個構建訊息驅動微服務的框架,他能為一些訊息中間件供應商的產品提供了個性化的自動化配置實作,參考了發布/訂閱、消費組、磁區這三個核心概念,目前支持的產品主要是:RabbitMQ和Kafka,
Spring Cloud Stream用一句話來概括就是:屏蔽訊息中間件底層的差異,用于統一訊息的編程模型,降低切換訊息中間件的成本,
官方檔案手冊:https://docs.spring.io/spring-cloud-stream/docs/3.0.10.RELEASE/reference/html/
1.2、為啥用Stream
在微服務的開發程序中,可能會經常用到訊息中間件,通過訊息中間件在服務與服務之間傳遞訊息,不管你使用的是哪款訊息中間件,比如RabbitMQ還是Kafka,那么訊息中間件和服務之間都有一點耦合性,這個耦合性就是指,如果我原來使用的RabbitMQ,現在要替換為Kafka,那么我們的微服務都需要修改,變動會比較大,因為這兩款訊息中間件有一些區別,如果我們使用Spring Cloud Stream來整合我們的訊息中間件,那么這樣就可以降低微服務和訊息中間件的耦合性,做到輕松在不同訊息中間件間切換,當然目前Spring Cloud Stream只支持RabbitMQ和Kafka,
按照官方的定義,Spring Cloud Stream 是一個構建訊息驅動微服務的框架,Spring Cloud Stream解決了開發人員無感知的使用訊息中間件的問題,因為Spring Cloud Stream對訊息中間件的進一步封裝,可以做到代碼層面對訊息中間件的無感知,甚至于動態的切換中間件(RabbitMQ切換為Kafka),使得微服務開發的高度解耦,服務可以關注更多自己的業務流程,
第二章 Stream重要概念
2.1、基本流程

應用程式通過input(相當于消費者consumer)、output(相當于生產者producer/提供者provider)來與Spring Cloud Stream中Binder互動,而Binder負責與訊息中間件互動,因此,我們只需關注如何與Binder互動即可,而無需關注與具體訊息中間件的互動,

Binder
Binder是應用與訊息中間件之間的封裝,目前實作了Kafka和RabbitMQ的Binder,通過Binder可以很方便的連接中間件,可以動態的改變訊息型別(對應于Kafka的topic,RabbitMQ的exchange),這些都可以通過組態檔來實作,
Channel
通道,是佇列Queue的一種抽象,在訊息通訊系統中就是實作存盤和轉發的媒介,通過對Channel對佇列進行配置,
Source
Source用于實作了訊息發布,由服務內的Spring完成,
Sink
Sink用于監聽進入Channel的訊息并將其反序列化,
2.2、常用注解
| 注解 | 說明 |
|---|---|
| @Input | 該注解標識輸入通道,通過該輸入通道接收訊息進入應用程式 |
| @Output | 該注解標識輸出通道,發布的訊息將通過該通道離開應用程式 |
| @StreamListener | 監聽佇列,用于消費者的佇列的訊息接收 |
| @EnableBinding | 將信道 channel 和 exchange 系結在一起 |
2.3、其他術語
發布/訂閱
簡單的講就是一種生產者,消費者模式,發布者是生產,將輸出發布到資料中心,訂閱者是消費者,訂閱自己感興趣的資料,當有資料到達資料中心時,就把資料發送給對應的訂閱者,
消費組
直觀的理解就是一群消費者一起處理訊息,需要注意的是:每個發送到消費組的資料,僅由消費組中的一個消費者處理,而不同消費組可以處理相同的資料,
磁區
類比于消費組,磁區是將資料磁區,舉例:某應用有多個實體,都系結到同一個資料中心,也就是不同實體都將資料發布到同一個資料中心,磁區就是將資料中心的資料再細分成不同的區,為什么需要磁區?因為即使是同一個應用,不同實體發布的資料型別可能不同,也希望這些資料由不同的消費者處理,這就需要消費者可以僅訂閱一個資料中心的部分資料,這就需要磁區了,
第三章 Stream入門案例
3.1、專案準備與啟動
我們接下來的所有操作均是在Sleuth+Zipkin最后完成的工程上進行操作,相關代碼請到配套資料中尋找,

注意:這一章節我們需要使用RabbitMQ,而RabbitMQ的學習與搭建與啟動,請參考:https://caochenlei.blog.csdn.net/article/details/112549952
我們需要啟動注冊中心,啟動順序如下:
- eureka-server7001
- eureka-server7002

我們接下來還需要啟動RabbitMQ,并進入控制臺進行相應的查看,查看地址:http://localhost:15672,登錄賬戶:guest,登錄密碼:guest


3.2、創建訊息生產者
(1)在父專案spring-cloud-study下創建一個子專案stream-rabbitmq-provider2001
(2)在子專案stream-rabbitmq-provider2001的pom.xml中匯入如下依賴:
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
(3)在子專案stream-rabbitmq-provider2001中新建組態檔application.yaml并寫入如下配置:
server:
port: 2001
eureka:
instance:
#是否使用 ip 地址注冊
prefer-ip-address: true
#該實體注冊到服務中心的唯一ID
instance-id: ${spring.cloud.client.ip-address}:${server.port}
client:
#是否將自己注冊到注冊中心,默認為 true
register-with-eureka: true
#表示 Eureka Client 間隔多久去服務器拉取注冊資訊,默認為 30 秒
registry-fetch-interval-seconds: 10
#設定服務注冊中心地址
service-url:
defaultZone: http://root:123456@eureka-server7001.com:7001/eureka/,http://root:123456@eureka-server7002.com:7002/eureka/
spring:
application:
name: stream-rabbitmq-provider2001
#Stream核心配置如下:
cloud:
stream:
binders:
rabbit: #系結訊息中間件的名稱,自定義
type: rabbit #系結訊息中間件的型別,固定值
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings:
#訊息生產者的設定選項
output:
destination: springCloudStream #表示要使用的exchange名稱定義
content-type: application/json #表示要傳送的訊息型別,文本則設定"text/plain"
binder: rabbit #表示要系結到哪一個訊息中間件上,看binders配置
(4)在子專案stream-rabbitmq-provider2001中新建啟動類com.caochenlei.StreamRabbitmqProvider2001Application并寫入如下代碼:
@SpringBootApplication
public class StreamRabbitmqProvider2001Application {
public static void main(String[] args) {
SpringApplication.run(StreamRabbitmqProvider2001Application.class, args);
}
}
(5)創建訊息生產者com.caochenlei.message.ProviderSender
@EnableBinding(Source.class)
public class ProviderSender {
@Autowired//定義訊息發送管道,名字只能叫output
private MessageChannel output;
public void send() {
String uuid = UUID.randomUUID().toString();
//核心代碼:構建一個訊息型別,然后使用發送管道發送出去
output.send(MessageBuilder.withPayload(uuid).build());
System.out.println("send:" + uuid);
}
}
(6)創建一個控制器com.caochenlei.controller.ProviderController
@RestController
public class ProviderController {
@Autowired
private ProviderSender sender;
@RequestMapping("/send")
public void send() {
sender.send();
}
}
(7)啟動stream-rabbitmq-provider2001,然后打開eureka注冊中心查看注冊,查看地址:http://localhost:7001,登錄賬號:root,登錄密碼:123456

(8)我們發送一個訊息試試,請在瀏覽器地址輸入:http://localhost:2001/send

(9)我們查看一下RabbitMQ的控制臺,請在瀏覽器地址輸入:http://localhost:15672/#/exchanges,登錄賬號:guest,登錄密碼:guest

3.3、創建訊息消費者
(1)在父專案spring-cloud-study下創建一個子專案stream-rabbitmq-consumer2002
(2)在子專案stream-rabbitmq-consumer2002的pom.xml中匯入如下依賴:
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
(3)在子專案stream-rabbitmq-consumer2002中新建組態檔application.yaml并寫入如下配置:
server:
port: 2002
eureka:
instance:
#是否使用 ip 地址注冊
prefer-ip-address: true
#該實體注冊到服務中心的唯一ID
instance-id: ${spring.cloud.client.ip-address}:${server.port}
client:
#是否將自己注冊到注冊中心,默認為 true
register-with-eureka: true
#表示 Eureka Client 間隔多久去服務器拉取注冊資訊,默認為 30 秒
registry-fetch-interval-seconds: 10
#設定服務注冊中心地址
service-url:
defaultZone: http://root:123456@eureka-server7001.com:7001/eureka/,http://root:123456@eureka-server7002.com:7002/eureka/
spring:
application:
#集群環境下名稱應該保持一致,這樣才能使用Eureka的負載均衡
name: stream-rabbitmq-consumer
#Stream核心配置如下:
cloud:
stream:
binders:
rabbit: #系結訊息中間件的名稱,自定義
type: rabbit #系結訊息中間件的型別,固定值
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings:
#訊息消費者的設定選項
input:
destination: springCloudStream #表示要使用的exchange名稱定義
content-type: application/json #表示要接收的訊息型別,文本則設定"text/plain"
binder: rabbit #表示要系結到哪一個訊息中間件上,看binders配置
(4)在子專案stream-rabbitmq-consumer2002中新建啟動類com.caochenlei.StreamRabbitmqConsumer2002Application并寫入如下代碼:
@SpringBootApplication
public class StreamRabbitmqConsumer2002Application {
public static void main(String[] args) {
SpringApplication.run(StreamRabbitmqConsumer2002Application.class, args);
}
}
(5)創建訊息消費者com.caochenlei.message.ConsumerReceiver
@EnableBinding(Sink.class)
public class ConsumerReceiver {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void receive(Message message) {
System.out.println(serverPort + " receive:" + message.getPayload());
}
}
(6)啟動stream-rabbitmq-consumer2002,然后打開eureka注冊中心查看注冊,查看地址:http://localhost:7001,登錄賬號:root,登錄密碼:123456

(7)我們發送一個訊息試試,請在瀏覽器地址輸入:http://localhost:2001/send,然后查看stream-rabbitmq-consumer2002控制臺

第四章 Stream自定義管道名
4.1、分析stream原始碼
在前面的案例中,我們已經實作了一個基礎的Spring Cloud Stream訊息傳遞處理操作,但在操作之中使用的是系統提供的 Source (output)、Sink(input),接下來我們來看一下自定義管道名稱是怎么實作的,在此之前,我們需要先分析原始碼,
Source.class
public interface Source {
String OUTPUT = "output";
@Output("output")
MessageChannel output();
}
從這里我們不難看出,默認的發送管道名就是output,因此我們在使用發送管道發送訊息時,名稱必須叫output
@Autowired//定義訊息發送管道,名字只能叫output
private MessageChannel output;
Sink.class
public interface Sink {
String INPUT = "input";
@Input("input")
SubscribableChannel input();
}
從這里我們不難看出,默認的接收管道名就是input,因此我們在使用接收管道接收訊息時,名稱必須叫input
@StreamListener(Sink.INPUT)
public void receive(Message message) {
System.out.println(serverPort + " receive:" + message.getPayload());
}
我們要是想要實作自己定義的管道名,我們可以照貓畫虎,分別寫兩個介面,就照抄原始碼,改改默認名稱,然后在使用的時候換上咱們自己的就行了,
4.2、修改訊息生產者
(1)自定義一個Source(com.caochenlei.message.MySource)
public interface MySource {
String OUTPUT = "myOutput";
@Output("myOutput")
MessageChannel output();
}
(2)替換默認Source為咱們自定義的MySource
@EnableBinding(MySource.class)
public class ProviderSender {
@Autowired//使用自定義的管道名myOutput
private MessageChannel myOutput;
public void send() {
String uuid = UUID.randomUUID().toString();
//核心代碼:構建一個訊息型別,然后使用發送管道發送出去
myOutput.send(MessageBuilder.withPayload(uuid).build());
System.out.println("send:" + uuid);
}
}
(3)修改組態檔application.yaml系結管道的名稱
#區域配置,請對應修改,其他地方保持不變
bindings:
#訊息生產者的設定選項
myOutput: #修改管道的名稱,不是output了
destination: springCloudStream #表示要使用的exchange名稱定義
content-type: application/json #表示要傳送的訊息型別,文本則設定"text/plain"
binder: rabbit #表示要系結到哪一個訊息中間件上,看binders配置
(4)重新啟動stream-rabbitmq-provider2001
4.3、修改訊息消費者
(1)自定義一個Sink(com.caochenlei.message.MySink)
public interface MySink {
String INPUT = "myInput";
@Input("myInput")
SubscribableChannel input();
}
(2)替換默認Sink為咱們自定義的MySink
@EnableBinding(MySink.class)
public class ConsumerReceiver {
@Value("${server.port}")
private String serverPort;
@StreamListener(MySink.INPUT)
public void receive(Message message) {
System.out.println(serverPort + " receive:" + message.getPayload());
}
}
(3)修改組態檔application.yaml系結管道的名稱
#區域配置,請對應修改,其他地方保持不變
bindings:
#訊息消費者的設定選項
myInput: #修改管道的名稱,不是input了
destination: springCloudStream #表示要使用的exchange名稱定義
content-type: application/json #表示要接收的訊息型別,文本則設定"text/plain"
binder: rabbit #表示要系結到哪一個訊息中間件上,看binders配置
(4)重新啟動stream-rabbitmq-consumer2002
(5)我們發送一個訊息試試,請在瀏覽器地址輸入:http://localhost:2001/send,然后查看stream-rabbitmq-consumer2002控制臺
StreamRabbitmqProvider2001Application :2001

StreamRabbitmqConsumer2002Application :2002

第五章 Stream分組與持久化
5.1、創建訊息消費者
根據stream-rabbitmq-consumer2002重新創建一份除埠號和啟動類名上的埠號不一樣其余均一樣的stream-rabbitmq-consumer2003工程,然后啟動,

5.2、奇怪問題的演示
我們發送一個訊息試試,請在瀏覽器地址輸入:http://localhost:2001/send,然后查看控制臺
StreamRabbitmqProvider2001Application :2001

StreamRabbitmqConsumer2002Application :2002

StreamRabbitmqConsumer2003Application :2003

我們發現這兩個消費者都執行了,那可能有人會說,這有啥問題,是消費者就應該執行啊,場景不一樣,這個問題的嚴重程度就不一樣,
比如:你現在開發了一套電商系統,有一套商品詳情頁需要生成靜態商品詳情頁,這個生成靜態頁面的服務部署了很多機器,當這個商品發布的時候,你就應該讓每一臺機器上都生成一份該商品的商品詳情頁,這么一看確實沒問題,
但是你再想啊,如果你現在有一套訂單系統,也是部署了很多機器,當用戶一下單支付的時候,所有部署了訂單的機器都跑了一遍,那這個用戶得多支付多少錢啊,這個時候,問題就嚴重了,那我們怎么能夠無論有多少個消費者,只能讓其中的一個執行呢?
5.3、如何解決這問題
這個問題,Stream也幫我們想到了,因此,他就參考了一種分組機制,分組后會擁有以下的特性:
- 同一個組中,無論有多少消費者,最終都只會有一個消費者執行,
- 有分組就表示該訊息可以進行持久化,不分組的話,消費者要先啟動起來,然后再用生產者發送訊息,這樣才可以接收到訊息,否則發送的訊息就丟失了,生產者先發了訊息,消費者后面才啟動的話是接收不到訊息的;分組后,如果消費端的微服務宕機或重啟,該佇列資訊依然會被保留在RabbitMQ中,后續依然可以進行消費,
默認的情況下,Stream就已經為每一個訊息消費者都分配了一個組,這個組名是隨機匿名的,查看地址:http://localhost:15672/#/queues

那么,問題有隨之而來了,如何修改默認分組,怎么修改,請往下學習,
5.4、訊息分組后演示
(1)修改stream-rabbitmq-consumer2002的application.yaml,只需要加入一句,修改完畢,然后重啟
#區域配置,請對應修改,其他地方保持不變
bindings:
#訊息消費者的設定選項
myInput: #修改管道的名稱,不是input了
destination: springCloudStream #表示要使用的exchange名稱定義
content-type: application/json #表示要接收的訊息型別,文本則設定"text/plain"
binder: rabbit #表示要系結到哪一個訊息中間件上,看binders配置
group: orderGroup #表示自定義分組,同一組內的所有消費者有競爭關系,且具備訊息持久化
(2)修改stream-rabbitmq-consumer2003的application.yaml,只需要加入一句,修改完畢,然后重啟
#區域配置,請對應修改,其他地方保持不變
bindings:
#訊息消費者的設定選項
myInput: #修改管道的名稱,不是input了
destination: springCloudStream #表示要使用的exchange名稱定義
content-type: application/json #表示要接收的訊息型別,文本則設定"text/plain"
binder: rabbit #表示要系結到哪一個訊息中間件上,看binders配置
group: orderGroup #表示自定義分組,同一組內的所有消費者有競爭關系,且具備訊息持久化
(3)我們看看是不是真的分組了,瀏覽器地址輸入:http://localhost:15672/#/queues

(4)我們發送一個訊息試試,請在瀏覽器地址輸入:http://localhost:2001/send,然后查看控制臺
StreamRabbitmqProvider2001Application :2001

StreamRabbitmqConsumer2002Application :2002

StreamRabbitmqConsumer2003Application :2003

這樣問題就得到了解決,
5.5、訊息持久化演示
(1)關閉stream-rabbitmq-consumer2002和stream-rabbitmq-consumer2003
(2)我們發送一個訊息試試,請在瀏覽器地址輸入:http://localhost:2001/send
(3)隨便啟動一個訊息消費者,這里啟動stream-rabbitmq-consumer2003,然后查看控制臺

持久化也得到了保證,
第六章 Stream訊息磁區處理
6.1、修改訊息生產者
(1)修改stream-rabbitmq-provider2001的com.caochenlei.message.ProviderSender
@EnableBinding(MySource.class)
public class ProviderSender {
@Autowired//使用自定義的管道名myOutput
private MessageChannel myOutput;
// public void send() {
// String uuid = UUID.randomUUID().toString();
// //核心代碼:構建一個訊息型別,然后使用發送管道發送出去
// myOutput.send(MessageBuilder.withPayload(uuid).build());
// System.out.println("send:" + uuid);
// }
public void send() {
String uuid = UUID.randomUUID().toString();
for (int i = 1; i < 8; i++) {
//核心代碼:構建一個訊息型別,然后使用發送管道發送出去
Map<String, Object> payload = new HashMap<>();
payload.put("orderId", uuid);
payload.put("data", i);
myOutput.send(MessageBuilder.withPayload(payload).build());
System.out.println("send:" + uuid + ",data:" + i);
}
}
}
(2)重啟stream-rabbitmq-provider2001和stream-rabbitmq-consumer2002

6.2、奇怪問題的演示
我們發送一個訊息試試,請在瀏覽器地址輸入:http://localhost:2001/send,然后查看控制臺
StreamRabbitmqProvider2001Application :2001

StreamRabbitmqConsumer2002Application :2002

StreamRabbitmqConsumer2003Application :2003

我們通過觀察,發現上邊運行的很正常,因為是隨機競爭,只有一個消費者進行消費,因此就會產生上邊這種情況,
假設,我現在一個用戶購買了800件商品,發送一個訂單,資料量很大,每次都會導致發送超時,因此,我們就想了一個辦法,將一個資料量很大的訂單,進行拆分,比如,100件為1組,我們通過回圈,發送8次,這樣,每一次發送的資料量就大大減少,這樣就保證了成功率,到消費端再根據訂單的id來對訂單進行重新組裝,保證訂單的一致,但是由此會產生另外一個問題,什么問題呢,就是每發送一次都會有一個消費者來消費,一個完整的訂單的分組就會被多個消費者消費,使得這一個訂單分布到了不同的消費者上,這樣我就沒有辦法通過訂單id拿到所有的資料進行重新組裝了,
我現在就想解決這個問題,消費的時候,你當前這個組競爭,隨便誰都可以,但是,只要你搶到了我這個訂單的消費權,這個訂單的資料全部交給你來處理,其他人就不能消費了,這樣就保證了在競爭的情況下,一組資料經過多次發送,消費端保證只有一個消費者對該資料進行消費,
6.3、如何解決這問題
針對上述的問題,Stream又提供了磁區的功能,使用磁區功能就能解決,
(1)修改stream-rabbitmq-provider2001的application.yaml,修改完成,重新啟動stream-rabbitmq-provider2001
#區域配置,請對應修改,其他地方保持不變
bindings:
#訊息生產者的設定選項
myOutput: #修改管道的名稱,不是output了
destination: springCloudStream #表示要使用的exchange名稱定義
content-type: application/json #表示要傳送的訊息型別,文本則設定"text/plain"
binder: rabbit #表示要系結到哪一個訊息中間件上,看binders配置
#訊息生產者磁區設定
producer:
partitionCount: 2 #指定參與訊息磁區的消費端節點數量為2個
partitionKeyExpression: payload.orderId #通過該引數指定了磁區鍵的運算式規則
(2)修改stream-rabbitmq-consumer2002的application.yaml,修改完成,重新啟動stream-rabbitmq-consumer2002
#區域配置,請對應修改,其他地方保持不變
bindings:
#訊息消費者的設定選項
myInput: #修改管道的名稱,不是input了
destination: springCloudStream #表示要使用的exchange名稱定義
content-type: application/json #表示要接收的訊息型別,文本則設定"text/plain"
binder: rabbit #表示要系結到哪一個訊息中間件上,看binders配置
group: orderGroup #表示自定義分組,同一組內的所有消費者有競爭關系,且具備訊息持久化
#訊息消費者磁區設定
consumer:
partitioned: true #開啟消費者磁區功能
#spring.cloud.stream.instanceCount,指定了當前消費者的總實體數量
instance-count: 2
#spring.cloud.stream.instanceIndex,設定當前實體的索引號為0索引
instance-index: 0
(3)修改stream-rabbitmq-consumer2003的application.yaml,修改完成,重新啟動stream-rabbitmq-consumer2003
#區域配置,請對應修改,其他地方保持不變
bindings:
#訊息消費者的設定選項
myInput: #修改管道的名稱,不是input了
destination: springCloudStream #表示要使用的exchange名稱定義
content-type: application/json #表示要接收的訊息型別,文本則設定"text/plain"
binder: rabbit #表示要系結到哪一個訊息中間件上,看binders配置
group: orderGroup #表示自定義分組,同一組內的所有消費者有競爭關系,且具備訊息持久化
#訊息消費者磁區設定
consumer:
partitioned: true #開啟消費者磁區功能
#spring.cloud.stream.instanceCount,指定了當前消費者的總實體數量
instance-count: 2
#spring.cloud.stream.instanceIndex,設定當前實體的索引號為1索引
instance-index: 1
(4)我們可以看看他是怎么給你區別不同的磁區的,瀏覽器地址輸入:http://localhost:15672/#/queues

6.4、訊息磁區后演示
我們發送一個訊息試試,請在瀏覽器地址輸入:http://localhost:2001/send,然后查看控制臺
StreamRabbitmqProvider2001Application :2001

StreamRabbitmqConsumer2002Application :2002

StreamRabbitmqConsumer2003Application :2003

好了,問題解決了,總結一下,訊息磁區就是保證當生產者將訊息資料發送給多個消費者實體時,保證同一訊息資料始終是由同一個消費者實體接收和處理,
至于磁區的選擇,則是基于以下公式:key.hashCode() % partitionCount,我們上邊的key是payload.orderId,
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/257836.html
標籤:其他
上一篇:原碼, 反碼, 補碼 詳解
