主頁 > 後端開發 > spring boot集成RabbitMQ-SpringBoot(24)

spring boot集成RabbitMQ-SpringBoot(24)

2022-08-15 07:38:23 後端開發

1. 認識RabbitMQ

??1.1介紹RabbitMQ

RabbitMQ 是開源的高級訊息佇列協議(Advanced Message Queueing Protocol, AMQP) 的實作,用Erlang 語言撰寫,支持多種客戶端,

RabbitMQ是目前應用相當廣泛的訊息中間件(其他同類的訊息處理中間件有ActiveMQ、Kafka等),在企業級應用、微服務應用中,RabbitMQ擔當著十分重要的角色,例如,在業務服務模塊中解耦、異步通信、高并發限流、超時業務、資料延遲處理等都可以使用RabbitMQ,

RabbitMQ的處理流程如圖12-1所示

??

圖 12-1

??1.2 使用場景

??(1)推送通知

??“發布/訂閱”是RabbitMQ的重要功能,可以用"發布/訂閱"功能來實作通知功能,消費者 (consumer) 一直監聽RabbitMQ的資料,如果RabbitMQ有資料,則消費者會按照“先進先岀” 規則逐條進行消費,而生產者(producer)只需要將資料存入RabbitMQ,這樣既降低了不同系統之間的耦合度,也確保了訊息通知的及時性,且不影響系統的性能,

??"發布/訂閱”功能支持三種模式:一對一、一對多、廣播,這三種模式都可以根據規則選擇分發的物件,眾多消費者(consumer)可以根據規則選擇是否接收這些資料,擴展性非常強,

??(2)異步任務

??后臺系統接到任務后,將其分解成多個小任務,只要分別完成這些小任務,整個任務便可以完成,但是,如果某個小任務很費時,且延遲執行并不影響整個任務,則可以將該任務放入訊息佇列中去處理,以便加快請求回應時間,

??如果用戶注冊會員時有一項需求一發送驗證郵件或短信驗證碼以完成驗證,則可以使用 RabbitMQ的訊息佇列來實作,這樣可以及時提醒用戶操作已經成功,等待收到郵件或驗證碼,然后進行相應的確認,即完成驗證,

??(3)多平臺應用的通信

??RabbitMQ可以用于不同開發語言開發的應用間的通信(如Java開發的應用程式需要與C++ 開發的應用程式進行通信),實作企業應用集成,由于訊息佇列是無關平臺和語言的,而且語意上也不是函式呼叫,因此RabbitMQ適合作為多個應用之間的松耦合的介面,且不需要發送方和接收方同時在線,

??不同語言的軟體解耦,可以最大限度地減少程式之間的相互依賴,提高系統可用性及可擴展性, 同時還增加了訊息的可靠傳輸和事務管理功能,

RabbitMQ提供兩種事務模式:

  • AMQP事務模式,
  • Confirm事務模式,

??(4)訊息延遲

??利用RabbitMQ訊息佇列演出功能,可以實作訂單、支付過期定時取消功能,因為延遲佇列存盤延時訊息,所以,當訊息被發送以后,消費者不是立即拿到訊息,而是等待指定時間后才拿到這個訊息進行消費,

??當然,死信、計時器、定時任務也可以實作延退或定時功能,但是需要開發者去處理,

??要實作訊息佇列延遲功能,一般釆用官方提供的插件“rabbitmq_delayed_message_ exchange"來實作,但RabbitMQ版本必須是3.5.8版本以上才支持該插件,如果低于這個版本, 則可以利用“死信”來完成,

??(5)遠程程序呼叫

??在實際的應用場景中,有時需要一些同步處理,以等待服務器端將訊息處理完成后再進行下一 步處理,這相當于RPC ( Remote Procedure Call,遠程程序呼叫),RabbitMQ也支持RPC,

??1.3 特性

RabbitMQ具有以下特性,

  • 資訊確認:RabbitMQ有以下兩種應答模式,
    • 自動應答:當RabbitMQ把訊息發送到接收端,接收端從佇列接收訊息時,會自動發送應答訊息給服務器端,
    • 手動應答:需要開發人員手動呼叫方法告訴服務端已經收到,
  • 佇列持久化:佇列可以被持久化,但是否為持久化,要看持久化設定,
  • 資訊持久化:設定properties.DeliveryMode值即可,默認值為1,代表不是持久的,2代表持久化,
  • 訊息拒收:接收端可以拒收訊息,而且在發送"reject”命令時,可以選擇是否要把拒收的訊息重新放回佇列中,
  • 訊息的QoS:在接收端設定的,發送端沒有任何變化,接收端的代碼也比較簡單,只需要加上如 "channel.BasicQos(0,1, false);"的代碼即可,

如果實際場景中對個別訊息的丟失不是很敏感,則選用自動應答比較理想,

如果是一個訊息都不能丟的場景,則需要選用手動應答,在正確處理完以后才應答, 如果選擇了自動應答,那訊息重發這個功能就沒有了,

2. RabbitMQ的基本概念

??2.1 生產者、消費者和代理

RabbitMQ的角色有以下三種,

  • 生產者:訊息的創建者,負責創建和推送資料到訊息服務器,
  • 消費者:訊息的接收方,用于處理資料和確認訊息,
  • 代理:RabbitMQ本身,扮演“快遞”的角色,本身不生產訊息,

生產者和消費者并不屬于RabbitMQ,RabbitMQ只是為生產者和消費者提供發送和接收訊息的API,

??2.2 訊息佇列

Queue (佇列)是RabbitMQ的內部物件,用于存盤生產者的訊息直到發送給消費者,也是消費者接收訊息的地方,RabbitMQ中的訊息也都只能存盤在Queue中,多個消費者可以訂閱同一 個 Queue,

Queue有以下一些重要的屬性,

  • 持久性:如果啟用,則佇列將會在訊息協商器(broker)重啟前都有效,
  • 自動洗掉:如果啟用,則佇列將會在所有的消費者停止使用之后自動洗掉掉,
  • 惰性:如果沒有宣告佇列,則應用程式呼叫佇列時會導致例外,并不會主動宣告,
  • 排他性:如果啟用,則宣告它的消費者才能使用,

??2.3 交換機

??Exchange (交換機)用于接收、分配訊息,生產者先要指定一個“routing key”,然后將訊息發送到交換機,這個"routing key"需要與"Exchange Type"及"binding key"聯合使用才能最終生效,然后,交換機將訊息路由到一個或多個Queue中,或丟棄,

??在虛擬主機的訊息協商器(broker)中,每個Exchange都有睢一的名字,

??Exchange包含4種型別:direct、topic、fanout、headers,不同的型別代表系結到佇列的行為不同,

??(1)direct

??direct型別的行為是“先匹配,再投送”,在系結佇列時會設定一個routing key,只有在訊息的routing key與佇列匹配時,訊息才會被交換機投送到系結的佇列中,允許一個佇列通過一個固定的routing key (通常是佇列的名字)進行系結,Direct交換機將訊息根據其routing key屬性投遞到包含對應key屬性的系結器上,

??Direct Exchange是RabbitMQ默認的交換機模式,也是最簡單的模式,它根據routing key 全文匹配去尋找佇列,

??(2)topic

??按規則轉發訊息(最靈活),主題交換機(topic exchange )轉發訊息主要根據通配符,佇列和交換機的系結會定義一種路由模式,通配符就要在這種路由模式和路由鍵之間匹配后,交換機才能轉發訊息,

??在這種交換機模式下,路由鍵必須是一串字符,用”.“隔開,

??路由模式必須包含一個星號“*”,主要用于匹配路由鍵指定位置的一個單詞,

??topic還支持訊息的routing key,用”*“或”#“的模式進行系結,“*”匹配一個單詞,“#” 匹配0個或多個單詞,例如 “/binding key *.user.#”匹配 routing key 為“cn.user“和“us.user.db”, 但是不匹配“user.hello”

??(3)headers

??它根據應用程式訊息的特定屬性進行匹配,可以在binding key中標記訊息為可選或必選,在佇列與交換機系結時,會設定一組鍵值對規則,訊息中也包括一組鍵值對(headers屬性),當這些鍵值対中有一對,或全部匹配時,訊息被投送到對應佇列,

??(4)fanout

??訊息廣播的模式,即將訊息廣播到所有系結到它的佇列中,而不考慮routing key的值(不管路由鍵或是路由模式),如果配置了 routing key,則routing key依然會被忽略,

??2.4 系結

??RabbitMQ中通過系結(binding ),將Exchange與Queue關聯起來,這樣 RabbitMQ 就知道如何正確地將訊息路由到指定的 Queue 了,

??在系結 Exchange與 Queue時,—般會指定一個binding key,消賽者將訊息發送給Exchange 時,一般會指定一個routing key,如果 binding key 與 routing key 相匹配,則訊息將會被路由到對應的Queue中,

??系結是生產者和消費者訊息傳遞的連接,生產者發送訊息到 Exchange,消費者從Queue接收訊息,都是根據系結來執行的,

??2.5 通道

??有些應用需要與AMQP代理建立多個連接,但同時開啟多個TCP ( Transmission Control Protocol,傳輸控制協議)連接會消耗過多的系統資源,并使得防火堵的配置變得更加困難,“AMQP 0-9-1“協議用通道(channel)來處理多連接,可以把通道理解成“共享一個TCP連接的多個輕量化連接”,

??一個特定通道上的通信與其他通道上的通信是完全隔離的,因此,每個AMQP方法都需要攜帶一個通道號,這樣客戶端就可以指定此方法是為哪個通道準備的,

??2.6 訊息確認

??訊息確認(message acknowledgement )是指:當一個訊息從佇列中投遞給消費者 (consumer)后,消費者會通知一下訊息代理(broker),這個程序可以是自動的,也可以由處理訊息的應用的開發者執行,當“訊息確認”啟用時,訊息代理需要收到來自消費者的確認回執后, 才完全將訊息從佇列中洗掉,

??如果訊息無法被成功路由,或被返給發送者并被丟棄,或訊息代理執行了延期操作,則訊息會 被放入一個“死信”佇列中,此時,訊息發送者可以選擇某些引數來處理這些特殊情況,

3. RabbitMQ的六種作業模式

??3.1 簡單模式

??生產者把訊息放入佇列,消費者獲得訊息,如圖12-2所示,這個模式只有一個消費者、一個生產者、一個佇列,只需要配置主機引數,其他引數使用默認值即可通信,

??

圖 12-2

??3.2 作業佇列模式

??這種模式出現了多個消費者,如圖12-3所示,為了保證消費者之間的負載均衡和同步,需要在訊息佇列之間加上同步功能,

??作業佇列(任務佇列)背后的主要思想是:避免立即執行資源密集型任務(耗時),以便下一個任務執行時不用等待它完成,作業佇列將任務封裝為訊息并將其發送到佇列中,

??

圖 12-3

??3.3 交換機模式

??實際上,前兩種模式也使用了交換機,只是使用了采用默認設定的交換機,交換機引數是可以配置的,如果訊息配置的交換機引數和RabbitMQ佇列系結(binding )的交換機名稱相同,則轉發,否則丟棄,如圖12-4所示,

??

??圖 12-4

??3.4 Routing 轉發模式

??交換機要配置為direct型別,轉發的規則變為檢查佇列的routing key值,如果routing key 值相同,則轉發,否則丟棄,如圖12-5所示,

??

圖 12-5

??3.5 主題轉發模式

??這種模式下交換機要配置為topic型別,routing key配置失效,發送到主題交換機的資訊, 不能是任意routing key,它必須是一個單詞的串列,用逗號分隔,特點是可以模糊匹配,匹配規則為:*(星號)可以代替一個詞;#(#號)可以代替零個或更多的單詞,其模式情況如圖12-6 所示,

??

圖 12-6

??3.6 RPC 模式

??這種模式主要使用在遠程呼叫的場景下,如果一個應用程式需要另外一個應用程式來最侄訓傳運行結果,那這個程序可能是比較耗時的操作,使用RPC模式是最合適的,其模式情況如圖12-7 所示,

??

圖 12-7

6種作業模式的主要特點如下,

  • 簡單模式:只有一個生產者,一個消費者
  • 作業佇列模式:一個生產者,多個消費者,每個消費者獲取到的訊息唯一,
  • 訂閱模式:一個生產者發送的訊息會被多個消費者獲取,
  • 路由模式:發送訊息到交換機,并且要指定路由key,消費者在將佇列系結到交換機時需要指定路由key,
  • topic模式:根據主題進行匹配,此時佇列需要系結在一個模式上,“#”匹配一個詞或多個詞,”*“只匹配一個詞,

4. 認識AmqpTemplate介面

??Spring AMQP提供了操作AMQP協議的模板類AmqpTemplate,用于發送和接收訊息, 它定義發送和接收訊息等操作,還提供了 RabbitTemplate用于實作AmqpTemplate介面, 而且還提供了錯誤拋岀類AmqpException,RabbitTemplate支持訊息的確認與回傳(默認禁用)

??4.1 發送訊息

??(1)send方法

AmqpTemplate模板提供了 send方法用來發送訊息,它有以下3個多載:

  • void send(Message message) throws AmqpException
  • void send(String routingKey, Message message) throws AmqpException
  • void send(String exchange, String routingKey, Message message)throws AmqpException

??(2)convertAndSend 方法

??AmqpTemplate模板還提供了 convertAndSend方法用來發送訊息,convertAndSend 方法相當于簡化了的send方法,可以自動處理訊息的序列化,下面通過兩個功能一樣的代碼來比較兩者的區別:

@Test
void contextLoads() {
    Message message = MessageBuilder.withBody("body content".getBytes())
        .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
        .setMessageId("1")
        .setHeader("header","header")
        .build();
    amqpTemplate.send("QueueHello",message);
}
@Test
void send(){
    amqpTemplate.convertAndSend("QueueHello","body content");
}

??上面代碼和下面代碼的效果一樣,

??4.2 接收訊息

接收訊息可以有兩種方式,

  • 直接去查詢獲取訊息,即呼叫receive方法,如果該方法沒有獲得訊息,則直接回傳null, 因為receive方法不阻塞,
  • 異步接收,通過注冊一個Listener (監聽器)來實作訊息接收,接收訊息需要指定佇列 (Queue),或設定默認的佇列,

??AmqpTemplate提供的直接獲得訊息的方法是receive

??另外,AmqpTemplate也提供了直接接收POJO (代替訊息物件)的方法receiveAndConvert,并提供了各種的 Messageconverter 用來處理回傳的Object (物件),

??從 Spring-Rabbit 1.3 版本開始,AmqpTemplate 也提供了 receiveAndReply 方法來異步接收、處理及回復訊息,

??4.3 異步接收訊息

??Spring AMQP也提供了多種不同的方式來實作異步接收訊息,比如常用的通過 MessageListener (訊息監聽器)的方式來實作,

?? 從Spring-rabbit 1.4版本開始,可使用注解@RabbitListener來異步接收訊息,它更為簡便, 使用方法見以下代碼:

@Component
@RabbitListener(queues = "object")
public class ObjectReceiver {
    @RabbitHandler
    public void process(User user){
        System.out.println("Receiver object"+user);
    }
}

5. 在Spring Boot中集成RabbitMQ

??5.1 安裝RabbitMQ

??RabbitMQ是用Erlang語言開發的,所以,需要先安裝Erlang環境,再安裝RabbitMQ,

????(1)下載 Erlang 環境和 RabbitMQ

??????到Erlang官網下載Erlang環境,

??????到 RabbitMQ 官網下載 RabbitMQ,

????(2)安裝

??????下載完成后,先單擊Erlang安裝檔案進行安裝,然后單擊RabbitMQ安裝檔案進行安裝,在安裝程序中,按照提示一步一步操作即可,在RabbitMQ成功安裝后,會自動啟動服務器,

????(3)開啟網頁管理界面

??????雖然可以在命令列管理RabbitMQ,但稍微麻煩,RabbitMQ提供了可視化的網頁管理平臺, 可以使用“rabbitmq-plugms.bat enable rabbitmq_management”命令開啟網頁管理界面,

??5.2 界面化管理RabbitMQ

??(1)概覽

????在安裝配置完成后,開啟網頁管理,然后可以通過"http://localhost:15672"進行查看和管理, 輸入默認的用戶名"guest"和密碼"guest"進行登錄,RabbitMQ的后臺界面如圖12-8所示,

????

圖 12-8

??(2)管理交換機

??進入交換機管理頁面后,單擊“Add exchange (添加交換機)”按鈕,彈出添加界面,可以看到列出了 RabbitMQ 默認的4種型別,由于筆者已經添加了訊息延遲插件,所以會有 “x-delayed-message”型別,如圖 12-9 所示,

??

圖 12-9

??(3)管理管理員

訊息中間件的安全配置也是必不可少的,在RabbitMQ中,可以通過命令列創建用戶、設定密碼、系結角色,常用的命令如下,

  • rabbitmqctl.bat list_users:查看現有用戶,
  • rabbitmqctl.bat add_user username password:新増用戶,新増的用戶只有用戶名、密碼,沒有管理員、超級管理員等角色,
  • rabbitmqctl.bat set_user_tags username administrator:設定角色,角色分為 none、 management、policymaker、monitoring、administrator,
  • rabbitmqctl change_password userName newPassword:修改密碼命令,
  • rabbitmqctl.bat delete_user username:洗掉用戶命令,

還可以在開啟RabbitMQ網頁管理界面之后,用可視化界面進行操作,如圖12-10所示,其 中“Tags”是管理員型別,

??在創建用戶后,需要指定用戶訪問一個虛擬機(如圖12-11所示),并且該用戶只能訪問該虛擬機下的佇列和交換機,如果沒有指定,則默認是"No access",而不是“/"(所有),在一個 RabbitMQ服務器上可以運行多個vhost,以適應不同的業務需要,這樣做既可以滿足權限配置的要求,也可以避免不同業務之間佇列、交換機的命名沖突問題,因為不同vhost之間是隔離的,權限設定可以細化到主題,

??

圖 12-10 管理管理員

圖 12-11 設定權限

??5.3 在 Spring Boot 中配置 RabbitMQ

??(1)添加依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

??(2)配置 application.properties 檔案

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /

6. 在SpringBoot中實作RabbitMQ的四種發送/接收模式

??6.1 實體:實作發送和接收佇列

??(1)配置佇列

@Bean
public Queue queue(){
    return new Queue("Queue1");
}

??(2)創建接收者

??注意,發送者和接收者的 Queue 名稱必須一致,否則不能接收,見以下代碼:

@Component
//監聽QueueHello的訊息佇列
@RabbitListener(queues = "Queue1")
public class ReceiveA {
    //@RabbitHandler來實作具體消費
    @RabbitHandler
    public void QueueReceiver(String Queue1){
        System.out.println("Receive A:"+Queue1);
    }
}

??(3)創建發送者

@Component
public class SendA {
    @Autowired
    private AmqpTemplate rabbitTemplate;
    public void send(String content){
        System.out.println("Sender:"+content);
        //使用AmqpTemplate將訊息發送到訊息佇列中
        rabbitTemplate.convertAndSend("Queue1",content);
    }
}

??(4)測驗發送和接收情況

@Test
void QueueSend(){
    int i = 2;
    for (int j = 0; j < i; j++) {
        String msg = "Queue1 msg"+j+new Date();
        try {
            sendA.send(msg);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

運行測驗,可以看到控制臺輸岀如下結果:

Receive A:Queue1 msg0Sun Aug 14 11:16:45 CST 2022
Receive A:Queue1 msg1Sun Aug 14 11:16:45 CST 2022

上述資訊表示發送成功,且接收成功,

如果是多個接收者,則會均勻地將訊息發送到 N 個接收者中,并不是全部發送一遍, 也會和"一對多” 一樣,接收端仍然會均勻地接收到訊息,

??6.2 實作發送和接收物件

??(1)編輯配置類

package com.intehel.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

@Configuration
@Component
public class RabbitMQConfig {
    @Bean
    public Queue objectQueue(){
        return new Queue("object");
    }

}

??(2)撰寫接收類

@Component
@RabbitListener(queues = "object")
public class ObjectReceiver {
    @RabbitHandler
    public void process(User user){
        System.out.println("Receiver object"+user);
    }
}

??(3)撰寫發送類

@Component
public class ObjectSender {
    @Autowired
    private AmqpTemplate amqpTemplate;
    public void send(User user){
        System.out.println("Sender object"+user);
        this.amqpTemplate.convertAndSend("object",user);
    }
}

??(4)撰寫測驗

@Test
void objectSend(){
    try {
        User user = new User();
        user.setId(1);
        user.setMsg("username");
        objectSender.send(user);
    }catch (Exception e){
        e.printStackTrace();
    }
}

??運行測驗,可以看到控制臺輸岀如下結果:

Sender objectUser(id=1, msg=username)

Receiver objectUser(id=1, msg=username)

??6.3 實體:實作用接收器接收多個主題

??(1)配置topic

@Configuration
@Component
public class RabbitMQConfig {
    @Bean
    public Queue topicA(){
        return new Queue("topic.a");
    }
    @Bean
    public Queue topicB(){
        return new Queue("topic.b");
    }
    @Bean
    TopicExchange exchange(){
        return new TopicExchange("topicExchange");
    }
    @Bean
    Binding bingTopicA(Queue topicA,TopicExchange exchange){
        return BindingBuilder.bind(topicA).to(exchange).with("topic.a");
    }
    @Bean
    Binding bingTopicB(Queue topicB,TopicExchange exchange){
        return BindingBuilder.bind(topicB).to(exchange).with("topic.#");
    }
}

??(2)撰寫接收者A

@Component
@RabbitListener(queues = "topic.a")
public class TopicReceiveA {
    @RabbitHandler
    public void process(String message) {
        System.out.println("topicReceiveA: " + message);
    }
}

??(3)撰寫接收者B

@Component
@RabbitListener(queues = "topic.b")
public class TopicReceiveB {
    @RabbitHandler
    public void process(String message) {
        System.out.println("topicReceiveB: " + message);
    }
}

??(4)撰寫發送者

@Component
public class TopicSender {
    @Autowired
    private AmqpTemplate amqpTemplate;
    public void send(){
        String context = "topic";
        System.out.println("Sender: "+context);
        this.amqpTemplate.convertAndSend("topicExchange","topic.1",context);
    }
    public void sendToA(){
        String context = "topicToA";
        System.out.println("Sender: "+context);
        this.amqpTemplate.convertAndSend("topicExchange","topic.a",context);
    }
    public void sendToB(){
        String context = "topicToB";
        System.out.println("Sender: "+context);
        this.amqpTemplate.convertAndSend("topicExchange","topic.b",context);
    }
}

 ??(5)撰寫測驗

@Test
public void topic(){
    topicSender.send();
}
@Test
public void topicA(){
    topicSender.sendToA();
}
@Test
public void topicB(){
    topicSender.sendToB();
}

 ??6.4 實作廣播模式?

??(1)配置fanout

@Configuration
@Component
public class RabbitMQConfig {
    @Bean
    public Queue fanoutA(){
        return new Queue("fanout.A");
    }
    @Bean
    public Queue fanoutB(){
        return new Queue("fanout.B");
    }
    @Bean
    FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }
    @Bean
    Binding bingFanoutA(Queue fanoutA,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutA).to(fanoutExchange);
    }
    @Bean
    Binding bingFanoutB(Queue fanoutB,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutB).to(fanoutExchange);
    }
}

??(2)撰寫發送者

@Component
public class FanoutSender {
    @Autowired
    private AmqpTemplate amqpTemplate;
    public void send(){
        String context = "Fanout";
        System.out.println("Sender: " + context);
        this.amqpTemplate.convertAndSend("fanoutExchange", "",context);
    }
}

??(3)撰寫接收者A

@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiveA {
    @RabbitHandler
    public void process(String message){
        System.out.println("fanout ReceiveA: " + message);
    }
}

??(4)撰寫接收者B

@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiveB {
    @RabbitHandler
    public void process(String message){
        System.out.println("fanout ReceiveB: " + message);
    }
}

??(5)撰寫測驗

@SpringBootTest
public class FanoutSendControllerTest {
    @Autowired
    private FanoutSender sender;
    public void fanoutSend(){
        sender.send();
    }
}

??運行測驗,可以看到控制臺輸出如下結果:

??fanout ReceiveB: Fanout
??fanout ReceiveA: Fanout

??6.5 實體:實作訊息佇列延遲功能

??要實作這個功能,一般使用RabbitMQ的訊息佇列延遲功能,即采用官方提供的插件 "rabbitmq_delayed_message_exchange”來實作,但 RabbitMQ 版本必須是 3.5.8 以上才支持該插件,否則得用其“死信”功能,

??(1)安裝延遲插件

??用rabbitmq-plugins list命令可以查看安裝的插件,如果沒有,則直接訪問官網進行下載,下載完成后,將其解壓到RabbitMQ的plugins目錄,

??然后執行下面的命令進行安裝:

??rabbitmq-plugins enable rabbitmq_delayed_message_exchange

??(2)配置交換機

@Bean
public Queue queueDelay(){
    return new Queue("delay_queue_1");
}
@Bean
public CustomExchange delayExchange(){
    Map<String,Object> args = new HashMap<String,Object>();
    args.put("x-delayed-type","direct");
    return new CustomExchange("delayed_exchange","x-delayed-message",true,false,args);
}
@Bean
Binding bingDelayB(Queue queueDelay,CustomExchange delayExchange){
    return BindingBuilder.bind(queueDelay).to(delayExchange).with("delay_queue_1").noargs();
}

??這里要使用 CustomExchange,而不是 DirectExchange,CustomExchange 的型別必須是 x-delayed-message

??(3)實作訊息發送

??這里設定訊息延遲5s

@Service
public class CustomSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void sendMsg(String queueName, String msg){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("訊息發送時間:"+sdf.format(new Date()));
        rabbitTemplate.convertAndSend("delayed_exchange", queueName, msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //訊息延遲5s
                message.getMessageProperties().setHeader("x-delay",5000);
                return message;
            }
        });
    }
}

??(4)實作訊息接收

@Component
public class CustomReceiver {
    @RabbitListener(queues = "delay_queue_1")
    public void receive(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(sdf.format(new Date()));
        System.out.println("Received: 執行取消訂單" + msg);
    }
}

??(5)測驗發送延遲訊息

@SpringBootTest
public class FanoutSendControllerTest {
    @Autowired
    private CustomSender customSender;
    @Test
    public void send(){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        customSender.sendMsg("delay_queue_1","支付超時,取消訂單通知!");
    }
}

??運行測驗,可以看到控制臺輸岀如下結果:

??訊息發送時間:2022-08-14 14:48:41

??2022-08-14 14:48:46
??Received: 執行取消訂單支付超時,取消訂單通知!

??至此,訊息佇列延遲功能成功實作,在rabbitmq_delayed_message_exchange插件產生之前,我們大都是使用“死信”功能來達到延遲佇列的效果,

??“死信”在創建Queue(佇列)時,要宣告“死信”佇列,佇列里的訊息到一定時間沒被消費, 就會變成死信轉發到死信相應的Exchange或Queue中,

??延退訊息是Exchange到Queue或其他Exchange的延遲,但如果訊息延遲到期了,或訊息不能被分配給其他的Exchange或Queue,則訊息會被丟棄,

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/501834.html

標籤:其他

上一篇:const char* , char const* 和char * const之間有區別嗎?

下一篇:長篇圖解java反射機制及其應用場景

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more