主頁 > 軟體設計 > 服務訊息驅動:Stream

服務訊息驅動:Stream

2021-02-08 14:30:14 軟體設計

目錄

  • 第一章 Stream介紹
    • 1.1、什么是Stream
    • 1.2、為啥用Stream
  • 第二章 Stream重要概念
    • 2.1、基本流程
    • 2.2、常用注解
    • 2.3、其他術語
  • 第三章 Stream入門案例
    • 3.1、專案準備與啟動
    • 3.2、創建訊息生產者
    • 3.3、創建訊息消費者
  • 第四章 Stream自定義管道名
    • 4.1、分析stream原始碼
    • 4.2、修改訊息生產者
    • 4.3、修改訊息消費者
  • 第五章 Stream分組與持久化
    • 5.1、創建訊息消費者
    • 5.2、奇怪問題的演示
    • 5.3、如何解決這問題
    • 5.4、訊息分組后演示
    • 5.5、訊息持久化演示
  • 第六章 Stream訊息磁區處理
    • 6.1、修改訊息生產者
    • 6.2、奇怪問題的演示
    • 6.3、如何解決這問題
    • 6.4、訊息磁區后演示


配套資料,免費下載
鏈接:https://pan.baidu.com/s/1la_3-HW-UvliDRJzfBcP_w
提取碼:lxfx
復制這段內容后打開百度網盤手機App,操作更方便哦

第一章 Stream介紹

1.1、什么是Stream

Spring Cloud Stream用官方的話來說就是:他是一個構建訊息驅動微服務的框架,他能為一些訊息中間件供應商的產品提供了個性化的自動化配置實作,參考了發布/訂閱、消費組、磁區這三個核心概念,目前支持的產品主要是:RabbitMQ和Kafka,

Spring Cloud Stream用一句話來概括就是:屏蔽訊息中間件底層的差異,用于統一訊息的編程模型,降低切換訊息中間件的成本,

官方檔案手冊:https://docs.spring.io/spring-cloud-stream/docs/3.0.10.RELEASE/reference/html/

1.2、為啥用Stream

在微服務的開發程序中,可能會經常用到訊息中間件,通過訊息中間件在服務與服務之間傳遞訊息,不管你使用的是哪款訊息中間件,比如RabbitMQ還是Kafka,那么訊息中間件和服務之間都有一點耦合性,這個耦合性就是指,如果我原來使用的RabbitMQ,現在要替換為Kafka,那么我們的微服務都需要修改,變動會比較大,因為這兩款訊息中間件有一些區別,如果我們使用Spring Cloud Stream來整合我們的訊息中間件,那么這樣就可以降低微服務和訊息中間件的耦合性,做到輕松在不同訊息中間件間切換,當然目前Spring Cloud Stream只支持RabbitMQ和Kafka,

按照官方的定義,Spring Cloud Stream 是一個構建訊息驅動微服務的框架,Spring Cloud Stream解決了開發人員無感知的使用訊息中間件的問題,因為Spring Cloud Stream對訊息中間件的進一步封裝,可以做到代碼層面對訊息中間件的無感知,甚至于動態的切換中間件(RabbitMQ切換為Kafka),使得微服務開發的高度解耦,服務可以關注更多自己的業務流程,

第二章 Stream重要概念

2.1、基本流程

應用程式通過input(相當于消費者consumer)、output(相當于生產者producer/提供者provider)來與Spring Cloud Stream中Binder互動,而Binder負責與訊息中間件互動,因此,我們只需關注如何與Binder互動即可,而無需關注與具體訊息中間件的互動,

Binder

Binder是應用與訊息中間件之間的封裝,目前實作了Kafka和RabbitMQ的Binder,通過Binder可以很方便的連接中間件,可以動態的改變訊息型別(對應于Kafka的topic,RabbitMQ的exchange),這些都可以通過組態檔來實作,

Channel

通道,是佇列Queue的一種抽象,在訊息通訊系統中就是實作存盤和轉發的媒介,通過對Channel對佇列進行配置,

Source

Source用于實作了訊息發布,由服務內的Spring完成,

Sink

Sink用于監聽進入Channel的訊息并將其反序列化,

2.2、常用注解

注解說明
@Input該注解標識輸入通道,通過該輸入通道接收訊息進入應用程式
@Output該注解標識輸出通道,發布的訊息將通過該通道離開應用程式
@StreamListener監聽佇列,用于消費者的佇列的訊息接收
@EnableBinding將信道 channel 和 exchange 系結在一起

2.3、其他術語

發布/訂閱

簡單的講就是一種生產者,消費者模式,發布者是生產,將輸出發布到資料中心,訂閱者是消費者,訂閱自己感興趣的資料,當有資料到達資料中心時,就把資料發送給對應的訂閱者,

消費組

直觀的理解就是一群消費者一起處理訊息,需要注意的是:每個發送到消費組的資料,僅由消費組中的一個消費者處理,而不同消費組可以處理相同的資料,

磁區

類比于消費組,磁區是將資料磁區,舉例:某應用有多個實體,都系結到同一個資料中心,也就是不同實體都將資料發布到同一個資料中心,磁區就是將資料中心的資料再細分成不同的區,為什么需要磁區?因為即使是同一個應用,不同實體發布的資料型別可能不同,也希望這些資料由不同的消費者處理,這就需要消費者可以僅訂閱一個資料中心的部分資料,這就需要磁區了,

第三章 Stream入門案例

3.1、專案準備與啟動

我們接下來的所有操作均是在Sleuth+Zipkin最后完成的工程上進行操作,相關代碼請到配套資料中尋找,

注意:這一章節我們需要使用RabbitMQ,而RabbitMQ的學習與搭建與啟動,請參考:https://caochenlei.blog.csdn.net/article/details/112549952

我們需要啟動注冊中心,啟動順序如下:

  1. eureka-server7001
  2. eureka-server7002

我們接下來還需要啟動RabbitMQ,并進入控制臺進行相應的查看,查看地址:http://localhost:15672,登錄賬戶:guest,登錄密碼:guest

3.2、創建訊息生產者

(1)在父專案spring-cloud-study下創建一個子專案stream-rabbitmq-provider2001

(2)在子專案stream-rabbitmq-provider2001pom.xml中匯入如下依賴:

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
</dependencies>

(3)在子專案stream-rabbitmq-provider2001中新建組態檔application.yaml并寫入如下配置:

server:
  port: 2001

eureka:
  instance:
    #是否使用 ip 地址注冊
    prefer-ip-address: true
    #該實體注冊到服務中心的唯一ID
    instance-id: ${spring.cloud.client.ip-address}:${server.port}
  client:
    #是否將自己注冊到注冊中心,默認為 true
    register-with-eureka: true
    #表示 Eureka Client 間隔多久去服務器拉取注冊資訊,默認為 30 秒
    registry-fetch-interval-seconds: 10
    #設定服務注冊中心地址
    service-url:
      defaultZone: http://root:123456@eureka-server7001.com:7001/eureka/,http://root:123456@eureka-server7002.com:7002/eureka/

spring:
  application:
    name: stream-rabbitmq-provider2001
  #Stream核心配置如下:
  cloud:
    stream:
      binders:
        rabbit:                             #系結訊息中間件的名稱,自定義
          type: rabbit                      #系結訊息中間件的型別,固定值
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings:
        #訊息生產者的設定選項
        output:
          destination: springCloudStream    #表示要使用的exchange名稱定義
          content-type: application/json    #表示要傳送的訊息型別,文本則設定"text/plain"
          binder: rabbit                    #表示要系結到哪一個訊息中間件上,看binders配置

(4)在子專案stream-rabbitmq-provider2001中新建啟動類com.caochenlei.StreamRabbitmqProvider2001Application并寫入如下代碼:

@SpringBootApplication
public class StreamRabbitmqProvider2001Application {
    public static void main(String[] args) {
        SpringApplication.run(StreamRabbitmqProvider2001Application.class, args);
    }
}

(5)創建訊息生產者com.caochenlei.message.ProviderSender

@EnableBinding(Source.class)
public class ProviderSender {
    @Autowired//定義訊息發送管道,名字只能叫output
    private MessageChannel output;

    public void send() {
        String uuid = UUID.randomUUID().toString();
        //核心代碼:構建一個訊息型別,然后使用發送管道發送出去
        output.send(MessageBuilder.withPayload(uuid).build());
        System.out.println("send:" + uuid);
    }
}

(6)創建一個控制器com.caochenlei.controller.ProviderController

@RestController
public class ProviderController {
    @Autowired
    private ProviderSender sender;

    @RequestMapping("/send")
    public void send() {
        sender.send();
    }
}

(7)啟動stream-rabbitmq-provider2001,然后打開eureka注冊中心查看注冊,查看地址:http://localhost:7001,登錄賬號:root,登錄密碼:123456

(8)我們發送一個訊息試試,請在瀏覽器地址輸入:http://localhost:2001/send

(9)我們查看一下RabbitMQ的控制臺,請在瀏覽器地址輸入:http://localhost:15672/#/exchanges,登錄賬號:guest,登錄密碼:guest

3.3、創建訊息消費者

(1)在父專案spring-cloud-study下創建一個子專案stream-rabbitmq-consumer2002

(2)在子專案stream-rabbitmq-consumer2002pom.xml中匯入如下依賴:

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
</dependencies>

(3)在子專案stream-rabbitmq-consumer2002中新建組態檔application.yaml并寫入如下配置:

server:
  port: 2002

eureka:
  instance:
    #是否使用 ip 地址注冊
    prefer-ip-address: true
    #該實體注冊到服務中心的唯一ID
    instance-id: ${spring.cloud.client.ip-address}:${server.port}
  client:
    #是否將自己注冊到注冊中心,默認為 true
    register-with-eureka: true
    #表示 Eureka Client 間隔多久去服務器拉取注冊資訊,默認為 30 秒
    registry-fetch-interval-seconds: 10
    #設定服務注冊中心地址
    service-url:
      defaultZone: http://root:123456@eureka-server7001.com:7001/eureka/,http://root:123456@eureka-server7002.com:7002/eureka/

spring:
  application:
    #集群環境下名稱應該保持一致,這樣才能使用Eureka的負載均衡
    name: stream-rabbitmq-consumer
  #Stream核心配置如下:
  cloud:
    stream:
      binders:
        rabbit:                             #系結訊息中間件的名稱,自定義
          type: rabbit                      #系結訊息中間件的型別,固定值
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings:
        #訊息消費者的設定選項
        input:
          destination: springCloudStream    #表示要使用的exchange名稱定義
          content-type: application/json    #表示要接收的訊息型別,文本則設定"text/plain"
          binder: rabbit                    #表示要系結到哪一個訊息中間件上,看binders配置

(4)在子專案stream-rabbitmq-consumer2002中新建啟動類com.caochenlei.StreamRabbitmqConsumer2002Application并寫入如下代碼:

@SpringBootApplication
public class StreamRabbitmqConsumer2002Application {
    public static void main(String[] args) {
        SpringApplication.run(StreamRabbitmqConsumer2002Application.class, args);
    }
}

(5)創建訊息消費者com.caochenlei.message.ConsumerReceiver

@EnableBinding(Sink.class)
public class ConsumerReceiver {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void receive(Message message) {
        System.out.println(serverPort + " receive:" + message.getPayload());
    }
}

(6)啟動stream-rabbitmq-consumer2002,然后打開eureka注冊中心查看注冊,查看地址:http://localhost:7001,登錄賬號:root,登錄密碼:123456

(7)我們發送一個訊息試試,請在瀏覽器地址輸入:http://localhost:2001/send,然后查看stream-rabbitmq-consumer2002控制臺

第四章 Stream自定義管道名

4.1、分析stream原始碼

在前面的案例中,我們已經實作了一個基礎的Spring Cloud Stream訊息傳遞處理操作,但在操作之中使用的是系統提供的 Source (output)、Sink(input),接下來我們來看一下自定義管道名稱是怎么實作的,在此之前,我們需要先分析原始碼,

Source.class

public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}

從這里我們不難看出,默認的發送管道名就是output,因此我們在使用發送管道發送訊息時,名稱必須叫output

@Autowired//定義訊息發送管道,名字只能叫output
private MessageChannel output;

Sink.class

public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}

從這里我們不難看出,默認的接收管道名就是input,因此我們在使用接收管道接收訊息時,名稱必須叫input

@StreamListener(Sink.INPUT)
public void receive(Message message) {
    System.out.println(serverPort + " receive:" + message.getPayload());
}

我們要是想要實作自己定義的管道名,我們可以照貓畫虎,分別寫兩個介面,就照抄原始碼,改改默認名稱,然后在使用的時候換上咱們自己的就行了,

4.2、修改訊息生產者

(1)自定義一個Source(com.caochenlei.message.MySource)

public interface MySource {
    String OUTPUT = "myOutput";

    @Output("myOutput")
    MessageChannel output();
}

(2)替換默認Source為咱們自定義的MySource

@EnableBinding(MySource.class)
public class ProviderSender {
    @Autowired//使用自定義的管道名myOutput
    private MessageChannel myOutput;

    public void send() {
        String uuid = UUID.randomUUID().toString();
        //核心代碼:構建一個訊息型別,然后使用發送管道發送出去
        myOutput.send(MessageBuilder.withPayload(uuid).build());
        System.out.println("send:" + uuid);
    }
}

(3)修改組態檔application.yaml系結管道的名稱

#區域配置,請對應修改,其他地方保持不變
      bindings:
        #訊息生產者的設定選項
        myOutput: #修改管道的名稱,不是output了
          destination: springCloudStream    #表示要使用的exchange名稱定義
          content-type: application/json    #表示要傳送的訊息型別,文本則設定"text/plain"
          binder: rabbit                    #表示要系結到哪一個訊息中間件上,看binders配置

(4)重新啟動stream-rabbitmq-provider2001

4.3、修改訊息消費者

(1)自定義一個Sink(com.caochenlei.message.MySink)

public interface MySink {
    String INPUT = "myInput";

    @Input("myInput")
    SubscribableChannel input();
}

(2)替換默認Sink為咱們自定義的MySink

@EnableBinding(MySink.class)
public class ConsumerReceiver {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(MySink.INPUT)
    public void receive(Message message) {
        System.out.println(serverPort + " receive:" + message.getPayload());
    }
}

(3)修改組態檔application.yaml系結管道的名稱

#區域配置,請對應修改,其他地方保持不變
      bindings:
        #訊息消費者的設定選項
        myInput: #修改管道的名稱,不是input了
          destination: springCloudStream    #表示要使用的exchange名稱定義
          content-type: application/json    #表示要接收的訊息型別,文本則設定"text/plain"
          binder: rabbit                    #表示要系結到哪一個訊息中間件上,看binders配置

(4)重新啟動stream-rabbitmq-consumer2002

(5)我們發送一個訊息試試,請在瀏覽器地址輸入:http://localhost:2001/send,然后查看stream-rabbitmq-consumer2002控制臺

StreamRabbitmqProvider2001Application :2001

StreamRabbitmqConsumer2002Application :2002

第五章 Stream分組與持久化

5.1、創建訊息消費者

根據stream-rabbitmq-consumer2002重新創建一份除埠號和啟動類名上的埠號不一樣其余均一樣的stream-rabbitmq-consumer2003工程,然后啟動,

5.2、奇怪問題的演示

我們發送一個訊息試試,請在瀏覽器地址輸入:http://localhost:2001/send,然后查看控制臺

StreamRabbitmqProvider2001Application :2001

StreamRabbitmqConsumer2002Application :2002

StreamRabbitmqConsumer2003Application :2003

我們發現這兩個消費者都執行了,那可能有人會說,這有啥問題,是消費者就應該執行啊,場景不一樣,這個問題的嚴重程度就不一樣,

比如:你現在開發了一套電商系統,有一套商品詳情頁需要生成靜態商品詳情頁,這個生成靜態頁面的服務部署了很多機器,當這個商品發布的時候,你就應該讓每一臺機器上都生成一份該商品的商品詳情頁,這么一看確實沒問題,

但是你再想啊,如果你現在有一套訂單系統,也是部署了很多機器,當用戶一下單支付的時候,所有部署了訂單的機器都跑了一遍,那這個用戶得多支付多少錢啊,這個時候,問題就嚴重了,那我們怎么能夠無論有多少個消費者,只能讓其中的一個執行呢?

5.3、如何解決這問題

這個問題,Stream也幫我們想到了,因此,他就參考了一種分組機制,分組后會擁有以下的特性:

  1. 同一個組中,無論有多少消費者,最終都只會有一個消費者執行,
  2. 有分組就表示該訊息可以進行持久化,不分組的話,消費者要先啟動起來,然后再用生產者發送訊息,這樣才可以接收到訊息,否則發送的訊息就丟失了,生產者先發了訊息,消費者后面才啟動的話是接收不到訊息的;分組后,如果消費端的微服務宕機或重啟,該佇列資訊依然會被保留在RabbitMQ中,后續依然可以進行消費,

默認的情況下,Stream就已經為每一個訊息消費者都分配了一個組,這個組名是隨機匿名的,查看地址:http://localhost:15672/#/queues

那么,問題有隨之而來了,如何修改默認分組,怎么修改,請往下學習,

5.4、訊息分組后演示

(1)修改stream-rabbitmq-consumer2002application.yaml,只需要加入一句,修改完畢,然后重啟

#區域配置,請對應修改,其他地方保持不變
      bindings:
        #訊息消費者的設定選項
        myInput: #修改管道的名稱,不是input了
          destination: springCloudStream    #表示要使用的exchange名稱定義
          content-type: application/json    #表示要接收的訊息型別,文本則設定"text/plain"
          binder: rabbit                    #表示要系結到哪一個訊息中間件上,看binders配置
          group: orderGroup                 #表示自定義分組,同一組內的所有消費者有競爭關系,且具備訊息持久化

(2)修改stream-rabbitmq-consumer2003application.yaml,只需要加入一句,修改完畢,然后重啟

#區域配置,請對應修改,其他地方保持不變
      bindings:
        #訊息消費者的設定選項
        myInput: #修改管道的名稱,不是input了
          destination: springCloudStream    #表示要使用的exchange名稱定義
          content-type: application/json    #表示要接收的訊息型別,文本則設定"text/plain"
          binder: rabbit                    #表示要系結到哪一個訊息中間件上,看binders配置
          group: orderGroup                 #表示自定義分組,同一組內的所有消費者有競爭關系,且具備訊息持久化

(3)我們看看是不是真的分組了,瀏覽器地址輸入:http://localhost:15672/#/queues

(4)我們發送一個訊息試試,請在瀏覽器地址輸入:http://localhost:2001/send,然后查看控制臺

StreamRabbitmqProvider2001Application :2001

StreamRabbitmqConsumer2002Application :2002

StreamRabbitmqConsumer2003Application :2003

這樣問題就得到了解決,

5.5、訊息持久化演示

(1)關閉stream-rabbitmq-consumer2002stream-rabbitmq-consumer2003

(2)我們發送一個訊息試試,請在瀏覽器地址輸入:http://localhost:2001/send

(3)隨便啟動一個訊息消費者,這里啟動stream-rabbitmq-consumer2003,然后查看控制臺

持久化也得到了保證,

第六章 Stream訊息磁區處理

6.1、修改訊息生產者

(1)修改stream-rabbitmq-provider2001com.caochenlei.message.ProviderSender

@EnableBinding(MySource.class)
public class ProviderSender {
    @Autowired//使用自定義的管道名myOutput
    private MessageChannel myOutput;

//    public void send() {
//        String uuid = UUID.randomUUID().toString();
//        //核心代碼:構建一個訊息型別,然后使用發送管道發送出去
//        myOutput.send(MessageBuilder.withPayload(uuid).build());
//        System.out.println("send:" + uuid);
//    }

    public void send() {
        String uuid = UUID.randomUUID().toString();
        for (int i = 1; i < 8; i++) {
            //核心代碼:構建一個訊息型別,然后使用發送管道發送出去
            Map<String, Object> payload = new HashMap<>();
            payload.put("orderId", uuid);
            payload.put("data", i);
            myOutput.send(MessageBuilder.withPayload(payload).build());
            System.out.println("send:" + uuid + ",data:" + i);
        }
    }
}

(2)重啟stream-rabbitmq-provider2001stream-rabbitmq-consumer2002

6.2、奇怪問題的演示

我們發送一個訊息試試,請在瀏覽器地址輸入:http://localhost:2001/send,然后查看控制臺

StreamRabbitmqProvider2001Application :2001

StreamRabbitmqConsumer2002Application :2002

StreamRabbitmqConsumer2003Application :2003

我們通過觀察,發現上邊運行的很正常,因為是隨機競爭,只有一個消費者進行消費,因此就會產生上邊這種情況,

假設,我現在一個用戶購買了800件商品,發送一個訂單,資料量很大,每次都會導致發送超時,因此,我們就想了一個辦法,將一個資料量很大的訂單,進行拆分,比如,100件為1組,我們通過回圈,發送8次,這樣,每一次發送的資料量就大大減少,這樣就保證了成功率,到消費端再根據訂單的id來對訂單進行重新組裝,保證訂單的一致,但是由此會產生另外一個問題,什么問題呢,就是每發送一次都會有一個消費者來消費,一個完整的訂單的分組就會被多個消費者消費,使得這一個訂單分布到了不同的消費者上,這樣我就沒有辦法通過訂單id拿到所有的資料進行重新組裝了,

我現在就想解決這個問題,消費的時候,你當前這個組競爭,隨便誰都可以,但是,只要你搶到了我這個訂單的消費權,這個訂單的資料全部交給你來處理,其他人就不能消費了,這樣就保證了在競爭的情況下,一組資料經過多次發送,消費端保證只有一個消費者對該資料進行消費,

6.3、如何解決這問題

針對上述的問題,Stream又提供了磁區的功能,使用磁區功能就能解決,

(1)修改stream-rabbitmq-provider2001application.yaml,修改完成,重新啟動stream-rabbitmq-provider2001

#區域配置,請對應修改,其他地方保持不變
      bindings:
        #訊息生產者的設定選項
        myOutput: #修改管道的名稱,不是output了
          destination: springCloudStream    #表示要使用的exchange名稱定義
          content-type: application/json    #表示要傳送的訊息型別,文本則設定"text/plain"
          binder: rabbit                    #表示要系結到哪一個訊息中間件上,看binders配置
          #訊息生產者磁區設定
          producer:
            partitionCount: 2                       #指定參與訊息磁區的消費端節點數量為2個
            partitionKeyExpression: payload.orderId #通過該引數指定了磁區鍵的運算式規則

(2)修改stream-rabbitmq-consumer2002application.yaml,修改完成,重新啟動stream-rabbitmq-consumer2002

#區域配置,請對應修改,其他地方保持不變
      bindings:
        #訊息消費者的設定選項
        myInput: #修改管道的名稱,不是input了
          destination: springCloudStream    #表示要使用的exchange名稱定義
          content-type: application/json    #表示要接收的訊息型別,文本則設定"text/plain"
          binder: rabbit                    #表示要系結到哪一個訊息中間件上,看binders配置
          group: orderGroup                 #表示自定義分組,同一組內的所有消費者有競爭關系,且具備訊息持久化
          #訊息消費者磁區設定
          consumer:
            partitioned: true               #開啟消費者磁區功能
      #spring.cloud.stream.instanceCount,指定了當前消費者的總實體數量
      instance-count: 2
      #spring.cloud.stream.instanceIndex,設定當前實體的索引號為0索引
      instance-index: 0

(3)修改stream-rabbitmq-consumer2003application.yaml,修改完成,重新啟動stream-rabbitmq-consumer2003

#區域配置,請對應修改,其他地方保持不變
      bindings:
        #訊息消費者的設定選項
        myInput: #修改管道的名稱,不是input了
          destination: springCloudStream    #表示要使用的exchange名稱定義
          content-type: application/json    #表示要接收的訊息型別,文本則設定"text/plain"
          binder: rabbit                    #表示要系結到哪一個訊息中間件上,看binders配置
          group: orderGroup                 #表示自定義分組,同一組內的所有消費者有競爭關系,且具備訊息持久化
          #訊息消費者磁區設定
          consumer:
            partitioned: true               #開啟消費者磁區功能
      #spring.cloud.stream.instanceCount,指定了當前消費者的總實體數量
      instance-count: 2
      #spring.cloud.stream.instanceIndex,設定當前實體的索引號為1索引
      instance-index: 1

(4)我們可以看看他是怎么給你區別不同的磁區的,瀏覽器地址輸入:http://localhost:15672/#/queues

6.4、訊息磁區后演示

我們發送一個訊息試試,請在瀏覽器地址輸入:http://localhost:2001/send,然后查看控制臺

StreamRabbitmqProvider2001Application :2001

StreamRabbitmqConsumer2002Application :2002

StreamRabbitmqConsumer2003Application :2003

好了,問題解決了,總結一下,訊息磁區就是保證當生產者將訊息資料發送給多個消費者實體時,保證同一訊息資料始終是由同一個消費者實體接收和處理,

至于磁區的選擇,則是基于以下公式:key.hashCode() % partitionCount,我們上邊的keypayload.orderId

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/257836.html

標籤:其他

上一篇:原碼, 反碼, 補碼 詳解

下一篇:新年將至, 程式員如何以代碼送出新春祝福

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 面試突擊第一季,第二季,第三季

    第一季必考 https://www.bilibili.com/video/BV1FE411y79Y?from=search&seid=15921726601957489746 第二季分布式 https://www.bilibili.com/video/BV13f4y127ee/?spm_id_fro ......

    uj5u.com 2020-09-10 05:35:24 more
  • 第三單元作業總結

    1.前言 這應該是本學期最后一次寫作業總結了吧。總體來說,對作業的節奏也差不多掌握了,作業做起來的效率也更高了。雖然和之前的作業一樣,作業中都要用到新的知識,但是相比之前,更加懂得了如何利用工具以及資料。雖然之間卡過殼,但總體而言,這幾次作業還算完成的比較好。 2.作業程序總結 相比前兩個單元,此單 ......

    uj5u.com 2020-09-10 05:35:41 more
  • 北航OO(2020)第四單元博客作業暨課程總結博客

    北航OO(2020)第四單元博客作業暨課程總結博客 本單元作業的架構設計 在本單元中,由于UML圖具有比較清晰的樹形結構,因此我對其中需要進行查詢操作的元素進行了包裝,在樹的父節點中存盤所有孩子的參考。考慮到性能問題,我采用了快取機制,一次查詢后盡可能快取已經遍歷過的資訊,以減少遍歷次數。 本單元我 ......

    uj5u.com 2020-09-10 05:35:48 more
  • BUAA_OO_第四單元

    一、UML決議器設計 ? 先看下題目:第四單元實作一個基于JDK 8帶有效性檢查的UML(Unified Modeling Language)類圖,順序圖,狀態圖分析器 MyUmlInteraction,實際上我們要建立一個有向圖模型,UML中的物件(元素)可能與同級元素連接,也可與低級元素相連形成 ......

    uj5u.com 2020-09-10 05:35:54 more
  • 6.1邏輯運算子

    邏輯運算子 1. && 短路與 運算式1 && 運算式2 01.運算式1為true并且運算式2也為true 整體回傳為true 02.運算式1為false,將不會執行運算式2 整體回傳為false 03.只要有一個運算式為false 整體回傳為false 2. || 短路或 運算式1 || 運算式2 ......

    uj5u.com 2020-09-10 05:35:56 more
  • BUAAOO 第四單元 & 課程總結

    1. 第四單元:StarUml檔案決議 本單元采用了圖模型決議UML。 UML檔案可以抽象為圖、子圖、邊的邏輯結構。 在實作中,圖的節點包括類、介面、屬性,子圖包括狀態圖、順序圖等。 采用了三次遍歷UML元素的方法建圖,第一遍遍歷建點,第二、三次遍歷設定屬性、連邊,實作圖物件的初始化。這里借鑒了一些 ......

    uj5u.com 2020-09-10 05:36:06 more
  • 談談我對C# 多型的理解

    面向物件三要素:封裝、繼承、多型。 封裝和繼承,這兩個比較好理解,但要理解多型的話,可就稍微有點難度了。今天,我們就來講講多型的理解。 我們應該經常會看到面試題目:請談談對多型的理解。 其實呢,多型非常簡單,就一句話:呼叫同一種方法產生了不同的結果。 具體實作方式有三種。 一、多載 多載很簡單。 p ......

    uj5u.com 2020-09-10 05:36:09 more
  • Python 資料驅動工具:DDT

    背景 python 的unittest 沒有自帶資料驅動功能。 所以如果使用unittest,同時又想使用資料驅動,那么就可以使用DDT來完成。 DDT是 “Data-Driven Tests”的縮寫。 資料:http://ddt.readthedocs.io/en/latest/ 使用方法 dd. ......

    uj5u.com 2020-09-10 05:36:13 more
  • Python里面的xlrd模塊詳解

    那我就一下面積個問題對xlrd模塊進行學習一下: 1.什么是xlrd模塊? 2.為什么使用xlrd模塊? 3.怎樣使用xlrd模塊? 1.什么是xlrd模塊? ?python操作excel主要用到xlrd和xlwt這兩個庫,即xlrd是讀excel,xlwt是寫excel的庫。 今天就先來說一下xl ......

    uj5u.com 2020-09-10 05:36:28 more
  • 當我們創建HashMap時,底層到底做了什么?

    jdk1.7中的底層實作程序(底層基于陣列+鏈表) 在我們new HashMap()時,底層創建了默認長度為16的一維陣列Entry[ ] table。當我們呼叫map.put(key1,value1)方法向HashMap里添加資料的時候: 首先,呼叫key1所在類的hashCode()計算key1 ......

    uj5u.com 2020-09-10 05:36:38 more
最新发布
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:20:47 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:20:25 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:20:17 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:20:10 more
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:19:44 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:19:07 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:18:57 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:18:49 more
  • 05單件模式

    #經典的單件模式 public class Singleton { private static Singleton uniqueInstance; //一個靜態變數持有Singleton類的唯一實體。 // 其他有用的實體變數寫在這里 //構造器宣告為私有,只有Singleton可以實體化這個類! ......

    uj5u.com 2023-04-19 08:42:51 more
  • 【架構與設計】常見微服務分層架構的區別和落地實踐

    軟體工程的方方面面都遵循一個最基本的道理:沒有銀彈,架構分層模型更是如此,每一種都有各自優缺點,所以請根據不同的業務場景,并遵循簡單、可演進這兩個重要的架構原則選擇合適的架構分層模型即可。 ......

    uj5u.com 2023-04-19 08:42:41 more