1. 認識RabbitMQ
??1.1介紹RabbitMQ
RabbitMQ 是開源的高級訊息佇列協議(Advanced Message Queueing Protocol, AMQP) 的實作,用Erlang 語言撰寫,支持多種客戶端,
RabbitMQ是目前應用相當廣泛的訊息中間件(其他同類的訊息處理中間件有ActiveMQ、Kafka等),在企業級應用、微服務應用中,RabbitMQ擔當著十分重要的角色,例如,在業務服務模塊中解耦、異步通信、高并發限流、超時業務、資料延遲處理等都可以使用RabbitMQ,
RabbitMQ的處理流程如圖12-1所示
??
圖 12-1
??1.2 使用場景
??(1)推送通知
??“發布/訂閱”是RabbitMQ的重要功能,可以用"發布/訂閱"功能來實作通知功能,消費者 (consumer) 一直監聽RabbitMQ的資料,如果RabbitMQ有資料,則消費者會按照“先進先岀” 規則逐條進行消費,而生產者(producer)只需要將資料存入RabbitMQ,這樣既降低了不同系統之間的耦合度,也確保了訊息通知的及時性,且不影響系統的性能,
??"發布/訂閱”功能支持三種模式:一對一、一對多、廣播,這三種模式都可以根據規則選擇分發的物件,眾多消費者(consumer)可以根據規則選擇是否接收這些資料,擴展性非常強,
??(2)異步任務
??后臺系統接到任務后,將其分解成多個小任務,只要分別完成這些小任務,整個任務便可以完成,但是,如果某個小任務很費時,且延遲執行并不影響整個任務,則可以將該任務放入訊息佇列中去處理,以便加快請求回應時間,
??如果用戶注冊會員時有一項需求一發送驗證郵件或短信驗證碼以完成驗證,則可以使用 RabbitMQ的訊息佇列來實作,這樣可以及時提醒用戶操作已經成功,等待收到郵件或驗證碼,然后進行相應的確認,即完成驗證,
??(3)多平臺應用的通信
??RabbitMQ可以用于不同開發語言開發的應用間的通信(如Java開發的應用程式需要與C++ 開發的應用程式進行通信),實作企業應用集成,由于訊息佇列是無關平臺和語言的,而且語意上也不是函式呼叫,因此RabbitMQ適合作為多個應用之間的松耦合的介面,且不需要發送方和接收方同時在線,
??不同語言的軟體解耦,可以最大限度地減少程式之間的相互依賴,提高系統可用性及可擴展性, 同時還增加了訊息的可靠傳輸和事務管理功能,
RabbitMQ提供兩種事務模式:
- AMQP事務模式,
- Confirm事務模式,
??(4)訊息延遲
??利用RabbitMQ訊息佇列演出功能,可以實作訂單、支付過期定時取消功能,因為延遲佇列存盤延時訊息,所以,當訊息被發送以后,消費者不是立即拿到訊息,而是等待指定時間后才拿到這個訊息進行消費,
??當然,死信、計時器、定時任務也可以實作延退或定時功能,但是需要開發者去處理,
??要實作訊息佇列延遲功能,一般釆用官方提供的插件“rabbitmq_delayed_message_ exchange"來實作,但RabbitMQ版本必須是3.5.8版本以上才支持該插件,如果低于這個版本, 則可以利用“死信”來完成,
??(5)遠程程序呼叫
??在實際的應用場景中,有時需要一些同步處理,以等待服務器端將訊息處理完成后再進行下一 步處理,這相當于RPC ( Remote Procedure Call,遠程程序呼叫),RabbitMQ也支持RPC,
??1.3 特性
RabbitMQ具有以下特性,
- 資訊確認:RabbitMQ有以下兩種應答模式,
- 自動應答:當RabbitMQ把訊息發送到接收端,接收端從佇列接收訊息時,會自動發送應答訊息給服務器端,
- 手動應答:需要開發人員手動呼叫方法告訴服務端已經收到,
- 佇列持久化:佇列可以被持久化,但是否為持久化,要看持久化設定,
- 資訊持久化:設定properties.DeliveryMode值即可,默認值為1,代表不是持久的,2代表持久化,
- 訊息拒收:接收端可以拒收訊息,而且在發送"reject”命令時,可以選擇是否要把拒收的訊息重新放回佇列中,
- 訊息的QoS:在接收端設定的,發送端沒有任何變化,接收端的代碼也比較簡單,只需要加上如 "channel.BasicQos(0,1, false);"的代碼即可,
如果實際場景中對個別訊息的丟失不是很敏感,則選用自動應答比較理想,
如果是一個訊息都不能丟的場景,則需要選用手動應答,在正確處理完以后才應答, 如果選擇了自動應答,那訊息重發這個功能就沒有了,
2. RabbitMQ的基本概念
??2.1 生產者、消費者和代理
RabbitMQ的角色有以下三種,
- 生產者:訊息的創建者,負責創建和推送資料到訊息服務器,
- 消費者:訊息的接收方,用于處理資料和確認訊息,
- 代理:RabbitMQ本身,扮演“快遞”的角色,本身不生產訊息,
生產者和消費者并不屬于RabbitMQ,RabbitMQ只是為生產者和消費者提供發送和接收訊息的API,
??2.2 訊息佇列
Queue (佇列)是RabbitMQ的內部物件,用于存盤生產者的訊息直到發送給消費者,也是消費者接收訊息的地方,RabbitMQ中的訊息也都只能存盤在Queue中,多個消費者可以訂閱同一 個 Queue,
Queue有以下一些重要的屬性,
- 持久性:如果啟用,則佇列將會在訊息協商器(broker)重啟前都有效,
- 自動洗掉:如果啟用,則佇列將會在所有的消費者停止使用之后自動洗掉掉,
- 惰性:如果沒有宣告佇列,則應用程式呼叫佇列時會導致例外,并不會主動宣告,
- 排他性:如果啟用,則宣告它的消費者才能使用,
??2.3 交換機
??Exchange (交換機)用于接收、分配訊息,生產者先要指定一個“routing key”,然后將訊息發送到交換機,這個"routing key"需要與"Exchange Type"及"binding key"聯合使用才能最終生效,然后,交換機將訊息路由到一個或多個Queue中,或丟棄,
??在虛擬主機的訊息協商器(broker)中,每個Exchange都有睢一的名字,
??Exchange包含4種型別:direct、topic、fanout、headers,不同的型別代表系結到佇列的行為不同,
??(1)direct
??direct型別的行為是“先匹配,再投送”,在系結佇列時會設定一個routing key,只有在訊息的routing key與佇列匹配時,訊息才會被交換機投送到系結的佇列中,允許一個佇列通過一個固定的routing key (通常是佇列的名字)進行系結,Direct交換機將訊息根據其routing key屬性投遞到包含對應key屬性的系結器上,
??Direct Exchange是RabbitMQ默認的交換機模式,也是最簡單的模式,它根據routing key 全文匹配去尋找佇列,
??(2)topic
??按規則轉發訊息(最靈活),主題交換機(topic exchange )轉發訊息主要根據通配符,佇列和交換機的系結會定義一種路由模式,通配符就要在這種路由模式和路由鍵之間匹配后,交換機才能轉發訊息,
??在這種交換機模式下,路由鍵必須是一串字符,用”.“隔開,
??路由模式必須包含一個星號“*”,主要用于匹配路由鍵指定位置的一個單詞,
??topic還支持訊息的routing key,用”*“或”#“的模式進行系結,“*”匹配一個單詞,“#” 匹配0個或多個單詞,例如 “/binding key *.user.#”匹配 routing key 為“cn.user“和“us.user.db”, 但是不匹配“user.hello”
??(3)headers
??它根據應用程式訊息的特定屬性進行匹配,可以在binding key中標記訊息為可選或必選,在佇列與交換機系結時,會設定一組鍵值對規則,訊息中也包括一組鍵值對(headers屬性),當這些鍵值対中有一對,或全部匹配時,訊息被投送到對應佇列,
??(4)fanout
??訊息廣播的模式,即將訊息廣播到所有系結到它的佇列中,而不考慮routing key的值(不管路由鍵或是路由模式),如果配置了 routing key,則routing key依然會被忽略,
??2.4 系結
??RabbitMQ中通過系結(binding ),將Exchange與Queue關聯起來,這樣 RabbitMQ 就知道如何正確地將訊息路由到指定的 Queue 了,
??在系結 Exchange與 Queue時,—般會指定一個binding key,消賽者將訊息發送給Exchange 時,一般會指定一個routing key,如果 binding key 與 routing key 相匹配,則訊息將會被路由到對應的Queue中,
??系結是生產者和消費者訊息傳遞的連接,生產者發送訊息到 Exchange,消費者從Queue接收訊息,都是根據系結來執行的,
??2.5 通道
??有些應用需要與AMQP代理建立多個連接,但同時開啟多個TCP ( Transmission Control Protocol,傳輸控制協議)連接會消耗過多的系統資源,并使得防火堵的配置變得更加困難,“AMQP 0-9-1“協議用通道(channel)來處理多連接,可以把通道理解成“共享一個TCP連接的多個輕量化連接”,
??一個特定通道上的通信與其他通道上的通信是完全隔離的,因此,每個AMQP方法都需要攜帶一個通道號,這樣客戶端就可以指定此方法是為哪個通道準備的,
??2.6 訊息確認
??訊息確認(message acknowledgement )是指:當一個訊息從佇列中投遞給消費者 (consumer)后,消費者會通知一下訊息代理(broker),這個程序可以是自動的,也可以由處理訊息的應用的開發者執行,當“訊息確認”啟用時,訊息代理需要收到來自消費者的確認回執后, 才完全將訊息從佇列中洗掉,
??如果訊息無法被成功路由,或被返給發送者并被丟棄,或訊息代理執行了延期操作,則訊息會 被放入一個“死信”佇列中,此時,訊息發送者可以選擇某些引數來處理這些特殊情況,
3. RabbitMQ的六種作業模式
??3.1 簡單模式
??生產者把訊息放入佇列,消費者獲得訊息,如圖12-2所示,這個模式只有一個消費者、一個生產者、一個佇列,只需要配置主機引數,其他引數使用默認值即可通信,
??
圖 12-2
??3.2 作業佇列模式
??這種模式出現了多個消費者,如圖12-3所示,為了保證消費者之間的負載均衡和同步,需要在訊息佇列之間加上同步功能,
??作業佇列(任務佇列)背后的主要思想是:避免立即執行資源密集型任務(耗時),以便下一個任務執行時不用等待它完成,作業佇列將任務封裝為訊息并將其發送到佇列中,
??
圖 12-3
??3.3 交換機模式
??實際上,前兩種模式也使用了交換機,只是使用了采用默認設定的交換機,交換機引數是可以配置的,如果訊息配置的交換機引數和RabbitMQ佇列系結(binding )的交換機名稱相同,則轉發,否則丟棄,如圖12-4所示,
??
??圖 12-4
??3.4 Routing 轉發模式
??交換機要配置為direct型別,轉發的規則變為檢查佇列的routing key值,如果routing key 值相同,則轉發,否則丟棄,如圖12-5所示,
??
圖 12-5
??3.5 主題轉發模式
??這種模式下交換機要配置為topic型別,routing key配置失效,發送到主題交換機的資訊, 不能是任意routing key,它必須是一個單詞的串列,用逗號分隔,特點是可以模糊匹配,匹配規則為:*(星號)可以代替一個詞;#(#號)可以代替零個或更多的單詞,其模式情況如圖12-6 所示,
??
圖 12-6
??3.6 RPC 模式
??這種模式主要使用在遠程呼叫的場景下,如果一個應用程式需要另外一個應用程式來最侄訓傳運行結果,那這個程序可能是比較耗時的操作,使用RPC模式是最合適的,其模式情況如圖12-7 所示,
??
圖 12-7
6種作業模式的主要特點如下,
- 簡單模式:只有一個生產者,一個消費者
- 作業佇列模式:一個生產者,多個消費者,每個消費者獲取到的訊息唯一,
- 訂閱模式:一個生產者發送的訊息會被多個消費者獲取,
- 路由模式:發送訊息到交換機,并且要指定路由key,消費者在將佇列系結到交換機時需要指定路由key,
- topic模式:根據主題進行匹配,此時佇列需要系結在一個模式上,“#”匹配一個詞或多個詞,”*“只匹配一個詞,
4. 認識AmqpTemplate介面
??Spring AMQP提供了操作AMQP協議的模板類AmqpTemplate,用于發送和接收訊息, 它定義發送和接收訊息等操作,還提供了 RabbitTemplate用于實作AmqpTemplate介面, 而且還提供了錯誤拋岀類AmqpException,RabbitTemplate支持訊息的確認與回傳(默認禁用)
??4.1 發送訊息
??(1)send方法
AmqpTemplate模板提供了 send方法用來發送訊息,它有以下3個多載:
- void send(Message message) throws AmqpException
- void send(String routingKey, Message message) throws AmqpException
- void send(String exchange, String routingKey, Message message)throws AmqpException
??(2)convertAndSend 方法
??AmqpTemplate模板還提供了 convertAndSend方法用來發送訊息,convertAndSend 方法相當于簡化了的send方法,可以自動處理訊息的序列化,下面通過兩個功能一樣的代碼來比較兩者的區別:
@Test
void contextLoads() {
Message message = MessageBuilder.withBody("body content".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("1")
.setHeader("header","header")
.build();
amqpTemplate.send("QueueHello",message);
}
@Test
void send(){
amqpTemplate.convertAndSend("QueueHello","body content");
}
??上面代碼和下面代碼的效果一樣,
??4.2 接收訊息
接收訊息可以有兩種方式,
- 直接去查詢獲取訊息,即呼叫receive方法,如果該方法沒有獲得訊息,則直接回傳null, 因為receive方法不阻塞,
- 異步接收,通過注冊一個Listener (監聽器)來實作訊息接收,接收訊息需要指定佇列 (Queue),或設定默認的佇列,
??AmqpTemplate提供的直接獲得訊息的方法是receive
??另外,AmqpTemplate也提供了直接接收POJO (代替訊息物件)的方法receiveAndConvert,并提供了各種的 Messageconverter 用來處理回傳的Object (物件),
??從 Spring-Rabbit 1.3 版本開始,AmqpTemplate 也提供了 receiveAndReply 方法來異步接收、處理及回復訊息,
??4.3 異步接收訊息
??Spring AMQP也提供了多種不同的方式來實作異步接收訊息,比如常用的通過 MessageListener (訊息監聽器)的方式來實作,
?? 從Spring-rabbit 1.4版本開始,可使用注解@RabbitListener來異步接收訊息,它更為簡便, 使用方法見以下代碼:
@Component
@RabbitListener(queues = "object")
public class ObjectReceiver {
@RabbitHandler
public void process(User user){
System.out.println("Receiver object"+user);
}
}
5. 在Spring Boot中集成RabbitMQ
??5.1 安裝RabbitMQ
??RabbitMQ是用Erlang語言開發的,所以,需要先安裝Erlang環境,再安裝RabbitMQ,
????(1)下載 Erlang 環境和 RabbitMQ
??????到Erlang官網下載Erlang環境,
??????到 RabbitMQ 官網下載 RabbitMQ,
????(2)安裝
??????下載完成后,先單擊Erlang安裝檔案進行安裝,然后單擊RabbitMQ安裝檔案進行安裝,在安裝程序中,按照提示一步一步操作即可,在RabbitMQ成功安裝后,會自動啟動服務器,
????(3)開啟網頁管理界面
??????雖然可以在命令列管理RabbitMQ,但稍微麻煩,RabbitMQ提供了可視化的網頁管理平臺, 可以使用“rabbitmq-plugms.bat enable rabbitmq_management”命令開啟網頁管理界面,
??5.2 界面化管理RabbitMQ
??(1)概覽
????在安裝配置完成后,開啟網頁管理,然后可以通過"http://localhost:15672"進行查看和管理, 輸入默認的用戶名"guest"和密碼"guest"進行登錄,RabbitMQ的后臺界面如圖12-8所示,
????
圖 12-8
??(2)管理交換機
??進入交換機管理頁面后,單擊“Add exchange (添加交換機)”按鈕,彈出添加界面,可以看到列出了 RabbitMQ 默認的4種型別,由于筆者已經添加了訊息延遲插件,所以會有 “x-delayed-message”型別,如圖 12-9 所示,
??
圖 12-9
??(3)管理管理員
訊息中間件的安全配置也是必不可少的,在RabbitMQ中,可以通過命令列創建用戶、設定密碼、系結角色,常用的命令如下,
- rabbitmqctl.bat list_users:查看現有用戶,
- rabbitmqctl.bat add_user username password:新増用戶,新増的用戶只有用戶名、密碼,沒有管理員、超級管理員等角色,
- rabbitmqctl.bat set_user_tags username administrator:設定角色,角色分為 none、 management、policymaker、monitoring、administrator,
- rabbitmqctl change_password userName newPassword:修改密碼命令,
- rabbitmqctl.bat delete_user username:洗掉用戶命令,
還可以在開啟RabbitMQ網頁管理界面之后,用可視化界面進行操作,如圖12-10所示,其 中“Tags”是管理員型別,
??在創建用戶后,需要指定用戶訪問一個虛擬機(如圖12-11所示),并且該用戶只能訪問該虛擬機下的佇列和交換機,如果沒有指定,則默認是"No access",而不是“/"(所有),在一個 RabbitMQ服務器上可以運行多個vhost,以適應不同的業務需要,這樣做既可以滿足權限配置的要求,也可以避免不同業務之間佇列、交換機的命名沖突問題,因為不同vhost之間是隔離的,權限設定可以細化到主題,
??
圖 12-10 管理管理員

圖 12-11 設定權限
??5.3 在 Spring Boot 中配置 RabbitMQ
??(1)添加依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
??(2)配置 application.properties 檔案
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
6. 在SpringBoot中實作RabbitMQ的四種發送/接收模式
??6.1 實體:實作發送和接收佇列
??(1)配置佇列
@Bean
public Queue queue(){
return new Queue("Queue1");
}
??(2)創建接收者
??注意,發送者和接收者的 Queue 名稱必須一致,否則不能接收,見以下代碼:
@Component
//監聽QueueHello的訊息佇列
@RabbitListener(queues = "Queue1")
public class ReceiveA {
//@RabbitHandler來實作具體消費
@RabbitHandler
public void QueueReceiver(String Queue1){
System.out.println("Receive A:"+Queue1);
}
}
??(3)創建發送者
@Component
public class SendA {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String content){
System.out.println("Sender:"+content);
//使用AmqpTemplate將訊息發送到訊息佇列中
rabbitTemplate.convertAndSend("Queue1",content);
}
}
??(4)測驗發送和接收情況
@Test
void QueueSend(){
int i = 2;
for (int j = 0; j < i; j++) {
String msg = "Queue1 msg"+j+new Date();
try {
sendA.send(msg);
}catch (Exception e){
e.printStackTrace();
}
}
}
運行測驗,可以看到控制臺輸岀如下結果:
Receive A:Queue1 msg0Sun Aug 14 11:16:45 CST 2022
Receive A:Queue1 msg1Sun Aug 14 11:16:45 CST 2022
上述資訊表示發送成功,且接收成功,
如果是多個接收者,則會均勻地將訊息發送到 N 個接收者中,并不是全部發送一遍, 也會和"一對多” 一樣,接收端仍然會均勻地接收到訊息,
??6.2 實作發送和接收物件
??(1)編輯配置類
package com.intehel.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
@Configuration
@Component
public class RabbitMQConfig {
@Bean
public Queue objectQueue(){
return new Queue("object");
}
}
??(2)撰寫接收類
@Component
@RabbitListener(queues = "object")
public class ObjectReceiver {
@RabbitHandler
public void process(User user){
System.out.println("Receiver object"+user);
}
}
??(3)撰寫發送類
@Component
public class ObjectSender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(User user){
System.out.println("Sender object"+user);
this.amqpTemplate.convertAndSend("object",user);
}
}
??(4)撰寫測驗
@Test
void objectSend(){
try {
User user = new User();
user.setId(1);
user.setMsg("username");
objectSender.send(user);
}catch (Exception e){
e.printStackTrace();
}
}
??運行測驗,可以看到控制臺輸岀如下結果:
Sender objectUser(id=1, msg=username)
Receiver objectUser(id=1, msg=username)
??6.3 實體:實作用接收器接收多個主題
??(1)配置topic
@Configuration
@Component
public class RabbitMQConfig {
@Bean
public Queue topicA(){
return new Queue("topic.a");
}
@Bean
public Queue topicB(){
return new Queue("topic.b");
}
@Bean
TopicExchange exchange(){
return new TopicExchange("topicExchange");
}
@Bean
Binding bingTopicA(Queue topicA,TopicExchange exchange){
return BindingBuilder.bind(topicA).to(exchange).with("topic.a");
}
@Bean
Binding bingTopicB(Queue topicB,TopicExchange exchange){
return BindingBuilder.bind(topicB).to(exchange).with("topic.#");
}
}
??(2)撰寫接收者A
@Component
@RabbitListener(queues = "topic.a")
public class TopicReceiveA {
@RabbitHandler
public void process(String message) {
System.out.println("topicReceiveA: " + message);
}
}
??(3)撰寫接收者B
@Component
@RabbitListener(queues = "topic.b")
public class TopicReceiveB {
@RabbitHandler
public void process(String message) {
System.out.println("topicReceiveB: " + message);
}
}
??(4)撰寫發送者
@Component
public class TopicSender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(){
String context = "topic";
System.out.println("Sender: "+context);
this.amqpTemplate.convertAndSend("topicExchange","topic.1",context);
}
public void sendToA(){
String context = "topicToA";
System.out.println("Sender: "+context);
this.amqpTemplate.convertAndSend("topicExchange","topic.a",context);
}
public void sendToB(){
String context = "topicToB";
System.out.println("Sender: "+context);
this.amqpTemplate.convertAndSend("topicExchange","topic.b",context);
}
}
??(5)撰寫測驗
@Test
public void topic(){
topicSender.send();
}
@Test
public void topicA(){
topicSender.sendToA();
}
@Test
public void topicB(){
topicSender.sendToB();
}
??6.4 實作廣播模式?
??(1)配置fanout
@Configuration
@Component
public class RabbitMQConfig {
@Bean
public Queue fanoutA(){
return new Queue("fanout.A");
}
@Bean
public Queue fanoutB(){
return new Queue("fanout.B");
}
@Bean
FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bingFanoutA(Queue fanoutA,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutA).to(fanoutExchange);
}
@Bean
Binding bingFanoutB(Queue fanoutB,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutB).to(fanoutExchange);
}
}
??(2)撰寫發送者
@Component
public class FanoutSender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(){
String context = "Fanout";
System.out.println("Sender: " + context);
this.amqpTemplate.convertAndSend("fanoutExchange", "",context);
}
}
??(3)撰寫接收者A
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiveA {
@RabbitHandler
public void process(String message){
System.out.println("fanout ReceiveA: " + message);
}
}
??(4)撰寫接收者B
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiveB {
@RabbitHandler
public void process(String message){
System.out.println("fanout ReceiveB: " + message);
}
}
??(5)撰寫測驗
@SpringBootTest
public class FanoutSendControllerTest {
@Autowired
private FanoutSender sender;
public void fanoutSend(){
sender.send();
}
}
??運行測驗,可以看到控制臺輸出如下結果:
??fanout ReceiveB: Fanout
??fanout ReceiveA: Fanout
??6.5 實體:實作訊息佇列延遲功能
??要實作這個功能,一般使用RabbitMQ的訊息佇列延遲功能,即采用官方提供的插件 "rabbitmq_delayed_message_exchange”來實作,但 RabbitMQ 版本必須是 3.5.8 以上才支持該插件,否則得用其“死信”功能,
??(1)安裝延遲插件
??用rabbitmq-plugins list命令可以查看安裝的插件,如果沒有,則直接訪問官網進行下載,下載完成后,將其解壓到RabbitMQ的plugins目錄,
??然后執行下面的命令進行安裝:
??rabbitmq-plugins enable rabbitmq_delayed_message_exchange
??(2)配置交換機
@Bean
public Queue queueDelay(){
return new Queue("delay_queue_1");
}
@Bean
public CustomExchange delayExchange(){
Map<String,Object> args = new HashMap<String,Object>();
args.put("x-delayed-type","direct");
return new CustomExchange("delayed_exchange","x-delayed-message",true,false,args);
}
@Bean
Binding bingDelayB(Queue queueDelay,CustomExchange delayExchange){
return BindingBuilder.bind(queueDelay).to(delayExchange).with("delay_queue_1").noargs();
}
??這里要使用 CustomExchange,而不是 DirectExchange,CustomExchange 的型別必須是 x-delayed-message
??(3)實作訊息發送
??這里設定訊息延遲5s
@Service
public class CustomSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(String queueName, String msg){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("訊息發送時間:"+sdf.format(new Date()));
rabbitTemplate.convertAndSend("delayed_exchange", queueName, msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//訊息延遲5s
message.getMessageProperties().setHeader("x-delay",5000);
return message;
}
});
}
}
??(4)實作訊息接收
@Component
public class CustomReceiver {
@RabbitListener(queues = "delay_queue_1")
public void receive(String msg) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(sdf.format(new Date()));
System.out.println("Received: 執行取消訂單" + msg);
}
}
??(5)測驗發送延遲訊息
@SpringBootTest
public class FanoutSendControllerTest {
@Autowired
private CustomSender customSender;
@Test
public void send(){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
customSender.sendMsg("delay_queue_1","支付超時,取消訂單通知!");
}
}
??運行測驗,可以看到控制臺輸岀如下結果:
??訊息發送時間:2022-08-14 14:48:41
??2022-08-14 14:48:46
??Received: 執行取消訂單支付超時,取消訂單通知!
??至此,訊息佇列延遲功能成功實作,在rabbitmq_delayed_message_exchange插件產生之前,我們大都是使用“死信”功能來達到延遲佇列的效果,
??“死信”在創建Queue(佇列)時,要宣告“死信”佇列,佇列里的訊息到一定時間沒被消費, 就會變成死信轉發到死信相應的Exchange或Queue中,
??延退訊息是Exchange到Queue或其他Exchange的延遲,但如果訊息延遲到期了,或訊息不能被分配給其他的Exchange或Queue,則訊息會被丟棄,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/501834.html
標籤:其他
