RabbitMQ核心功能
- 一.MQ的概念與功能介紹
- 二.RabbitMQ的介紹和入門案例
- 三.RabbitMQ的作業佇列
- 四.RabbitMQ的作業模式
- 五.RabbitMQ的發布確認
- 六.RabbitMQ的死性佇列
- 七.RabbitMQ的延遲佇列
本文對RabbitMQ核心功能的介紹,沒有介紹RabbitMQ的安裝與集群,案列代碼采用原生的Java代碼和springboot兩種形式,內容參考自RabbitMQ中文檔案黑馬程式員和尚硅谷RabbitMQ教程的筆記,還有一些自己的理解,
一.MQ的概念與功能介紹
1.MQ是什么?
🐘🐘 MQ(message queue),從字面意思上看就是訊息·佇列,滿足佇列的FIFO 先入先出特點,只不過佇列中存放的內容是message,還是一種跨行程的通信機制,用于上下游傳遞訊息,在互聯網架構中,MQ 是一種非常常見的上下游“邏輯解耦+物理解耦”的訊息通信服務,使用了 MQ 之后,訊息發送上游只需要依賴 MQ,不用依賴其他服務,
什么叫上下游傳遞訊息:就好比老板給我發微信讓我加班,老板就是上游,我就是下游,我和老板之間的通信就可以稱為上下游傳遞訊息,又好比兩個分布式系統之間進行通信A系統(又稱為生產者)給B系統(消費者)發送訊息只需要依賴 MQ,不用依賴其他服務,

2.為什么要用MQ呢?
🐸🐸MQ優勢1.應用解耦:因為系統的耦合性越高,容錯性就越低,可維護性就越低,那么我們就要進行應用解耦,
假如有如下分布式系統用戶購買完商品去訂單系統下單,訂單系統去操作庫存系統修改訂單,操作支付系統扣錢,提醒物流系統發貨,如果其中有任何一個系統掛了那么跟著訂單系統也掛了,再比如又添加了一個X系統那么又要去修改訂單系統的代碼操作X系統,

解決辦法:使用MQ,萬物皆可中間加一層,沒有加一層解決不了的問題如果有那就再加一層,

此時如果用戶再去下訂單,訂單系統只需要發一條訊息給MQ就可以,MQ再去操作其他系統,訂單系統就可以發一條訊息告訴客戶下單成功,其他的事情都交給MQ解決,假如此時庫存系統又掛了,MQ可以等到庫存系統修復了再發訊息給庫存系統讓他減庫存即可,此時訂單系統與其他的系統之間耦合度便降低了,如果再添加X系統那么也還是通過MQ發送訊息給X系統,此時系統的可擴展性就提高了,
🐻🐻MQ優勢2.異步提速:同樣用上面用戶下單的例子來說明問題,

用戶購買商品通過訂單系統下單,此時訂單系統要去操作庫存系統支付系統物流系統然后再去操作自己的資料庫,一系列操作完成后再去告訴用戶下單成功,此時過去了920ms,十分影響用戶體驗,

加入MQ此時訂單系統把訂單訊息發送給MQ然后就可以去操作資料庫和回傳訊息給用戶了,其他事情都交給MQ去做,用戶點擊完下單按鈕后,只需等待25ms就能得到下單回應 (20 + 5 = 25ms),
提升用戶體驗和系統吞吐量(單位時間內處理請求的數目),
🐼🐼MQ優勢三:流量削峰或者稱為削峰填谷:A系統每秒最大處理1000個請求突然來了5000個請求,此時系統扛不住就要掛了,

解決方案:把請求發往MQ,然后A系統每秒從MQ中拉取1000個請求就行處理,使用了 MQ 之后,限制消費訊息的速度為1000,這樣一來,高峰期產生的資料勢必會被積壓在 MQ 中,高峰就被“削”掉了,但是因為訊息積壓,在高峰期過后的一段時間內,消費訊息的速度還是會維持在1000,直到消費完積壓的訊息,這就叫做“填谷”,使用MQ后,可以提高系統穩定性,

MQ的劣勢
💀💀系統可用性降低
系統引入的外部依賴越多,系統穩定性越差,一旦 MQ 宕機,就會對業務造成影響,需要考慮如何保證MQ的高可用?
💀💀 系統復雜度提高
MQ 的加入大大增加了系統的復雜度,以前系統間是同步的遠程呼叫,現在是通過 MQ 進行異步呼叫,存在一下問題:
如何保證訊息沒有被重復消費?怎么處理訊息丟失情況?怎么保證訊息傳遞的順序性?
💀💀一致性問題
A 系統處理完業務,通過 MQ 給B、C、D三個系統發訊息資料,如果 B 系統、C 系統處理成功,D 系統處理失敗,如何保證訊息資料處理的一致性?
3.MQ有優勢有劣勢,那MQ什么時候可以使用?
- 生產者不需要從消費者處獲得反饋,
- 容許短暫的不一致性,
- 確實是用了有效果,能夠有解耦、提速、削峰這些方面的收益,并且帶來的收益超過加入MQ后,管理MQ的成本,
4. 幾種常見的MQ

MQ的選擇
-
Kafka:
主要特點是基于 Pull 的模式來處理訊息消費,追求高吞吐量,一開始的目的就是用于日志收集和傳輸,適合產生大量資料的互聯網服務的資料收集業務,大型公司建議可以選用,如果有日志采集功能,肯定是首選 kafka 了, -
RocketMQ:
天生為金融互聯網領域而生,對于可靠性要求很高的場景,尤其是電商里面的訂單扣款,以及業務削峰,在大量交易涌入時,后端可能無法及時處理的情況,RoketMQ 在穩定性上可能更值得信賴,這些業務場景在阿里雙 11 已經經歷了多次考驗,如果你的業務有上述并發場景,建議可以選擇 RocketMQ, -
RabbitMQ
結合 erlang 語言本身的并發優勢,性能好時效性微秒級,社區活躍度也比較高,管理界面用起來十分方便,如果你的資料量沒有那么大,中小型公司優先選擇功能比較完備的 RabbitMQ,
二.RabbitMQ的介紹和入門案例
1.RabbitMQ的介紹
RabbitMQ 是Rabbit 技術公司基于 AMQP 標準開發的,采用 Erlang 語言開發,是一個訊息中間件它接受并轉發訊息,你可以把它當做一個快遞站點,當你要發送一個包裹時,你把你的包裹放到快遞站,快遞員最侄訓把你的快遞送到收件人那里,按照這種邏輯 RabbitMQ 是一個快遞站,一個快遞員幫你傳遞快件,RabbitMQ 與快遞站的主要區別在于,它不處理快件而是接收,存盤和轉發訊息資料,
2. AMQP是什么
AMQP(高級訊息佇列協議)是一個網路協議,它支持符合要求的客戶端應用(application)和訊息中間件代理(messaging middleware broker)之間進行通信,模型圖如下,

訊息(message)被發布者(publisher)發送給交換機(exchange),交換機常常被比喻成郵局或者郵箱,然后交換機將收到的訊息根據路由規則分發給系結的佇列(queue),最后AMQP代理會將訊息投遞給訂閱了此佇列的消費者,或者消費者按照需求自行獲取,
3.RabbitMQ的作業原理圖和相關概念解釋
因為RabbitMQ是基于 AMQP 標準開發的所以其原理圖與AMQP模型圖相差無幾,

🐴🐴1.Producer: 產生資料發送訊息的程式是生產者
🐑🐑2.Broker:接收和分發訊息的應用,RabbitMQ Server 就是 Message Broker
🐍🐍3.Virtual host:出于多租戶和安全因素設計的,把 AMQP 的基本組件劃分到一個虛擬的分組中,類似于網路中的 namespace 概念,當多個不同的用戶使用同一個 RabbitMQ server 提供的服務時,可以劃分出多個 vhost,每個用戶在自己的 vhost 創建 exchange/queue 等
🐥🐥4.Connection:publisher/consumer 和 broker 之間的 TCP 連接
🐧🐧5.Channel:如果每一次訪問 RabbitMQ 都建立一個 Connection,在訊息量大的時候建立 TCP ,Connection 的開銷將是巨大的,效率也較低,Channel 是在 connection 內部建立的邏輯連接,如果應用程式支持多執行緒,通常每個 thread 創建單獨的 channel 進行通訊,AMQP method 包含了 channel id 幫助客戶端和 message broker 識別 channel,所以 channel 之間是完全隔離的,Channel 作為輕量級的Connection 極大減少了作業系統建立 TCP connection 的開銷
🐝🐝6.Exchange:message 到達 broker 的第一站,根據分發規則,匹配查詢表中的 routing key,分發訊息到 queue 中去,交換機必須確切知道如何處理它接收到的訊息,是將這些訊息推送到特定佇列還是推送到多個佇列,亦或者是把訊息丟棄,這個由交換機型別決定
🐌🐌7.Queue:佇列是 RabbitMQ 內部使用的一種資料結構,盡管訊息流經 RabbitMQ 和應用程式,但它們只能存盤在佇列中,佇列僅受主機的記憶體和磁盤限制的約束,本質上是一個大的訊息緩沖區,許多生產者可以將訊息發送到一個佇列,許多消費者可以嘗試從一個佇列接收資料,這就是我們使用佇列的方式
🐀🐀8.Binding:exchange 和 queue 之間的虛擬連接,binding 中可以包含 routing key,Binding 資訊被保存到 exchange 中的查詢表中,用于 message 的分發依據
🐁🐁8.Consumer:消費與接收具有相似的含義,消費者大多時候是一個等待接收訊息的程式,
注意:生產者,消費者和訊息中間件很多時候并不在同一機器上,同一個應用程式既可以是生產者又是可以是消費者,
4.RabbitMQ入門程式
🐳🐳1.普通jiava程式,引入相關依賴
<dependencies>
<!--rabbitmq 依賴客戶端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作檔案流的一個依賴-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
生產者
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//創建一個連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("8.142.89.135");
factory.setUsername("guest");
factory.setPassword("guest");
//channel 實作了自動 close 介面 自動關閉 不需要顯示關閉
try(Connection connection = factory.newConnection();Channel channel =
connection.createChannel()) {
/**
* 生成一個佇列
* 1.佇列名稱
* 2.佇列里面的訊息是否持久化 默認訊息存盤在記憶體中
* 3.該佇列是否只供一個消費者進行消費 是否進行共享 true 可以多個消費者消費
* 4.是否自動洗掉 最后一個消費者端開連接以后 該佇列是否自動洗掉 true 自動洗掉
* 5.其他引數
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message="hello world";
/**
* 發送一個訊息
* 1.發送到那個交換機
* 2.路由的 key 是哪個
* 3.其他的引數資訊
* 4.發送訊息的訊息體
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("訊息發送完畢");
}
}
}
消費者
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("8.142.89.135");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("等待接收訊息....");
//推送的訊息如何進行消費的介面回呼
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String message= new String(delivery.getBody());
System.out.println(message);
};
//取消消費的一個回呼介面 如在消費的時候佇列被洗掉掉了
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("訊息消費被中斷");
};
/**
* 消費者消費訊息
* 1.消費哪個佇列
* 2.消費成功之后是否要自動應答 true 代表自動應答 false 手動應答
* 3.消費者未成功消費的回呼
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
🐬🐬2.SpringBoot入門程式
生產者消費者都引入一下依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
生產者
@Configuration
public class HelloWorldConfig {
//交換機名稱
public static final String DIRECT_EXCHANGE_NAME="direct_exchange";
//佇列名稱
public static final String QUEUE_NAME="hello_world";
//宣告交換機
@Bean("helloExchange")
public DirectExchange helloExchange(){
return new DirectExchange(DIRECT_EXCHANGE_NAME);
}
//宣告佇列
@Bean("consoleQueue")
public Queue consoleQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
//系結關系
@Bean
public Binding helloBinding(@Qualifier("helloExchange") DirectExchange exchange,@Qualifier("consoleQueue") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("hello");
}
}
@SpringBootTest
public class HelloWorldTest {
@Resource
private RabbitTemplate rabbitTemplate;
@Test
public void TestSendMsg(){
rabbitTemplate.convertAndSend(HelloWorldConfig.DIRECT_EXCHANGE_NAME,"hello","hello world");
}
}
消費者
@Component
public class HelloConsumer {
@RabbitListener(queues = "hello_world")
public void ListenerQueue(Message message){
String msg = new String(message.getBody());
System.out.println(msg);
}
}
啟動消費者的主程式控制臺下輸出:

三.RabbitMQ的作業佇列
1.作業佇列的概念
WorkQueues作業佇列(又稱任務佇列)的主要思想是避免立即執行資源密集型任務,而不得不等待它完成,
相反我們安排任務在之后執行,我們把任務封裝為訊息并將其發送到佇列,在后臺運行的作業行程將彈出任務并最終執行作業,當有多個作業執行緒時,這些作業執行緒將一起處理這些任務,
案例分析:我們啟動兩個訊息消費者執行緒,一個訊息生產者執行緒,我們來看看兩個作業執行緒是如何作業的,
兩個作業執行緒
public class Worker01 {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String receivedMessage = new String(delivery.getBody());
System.out.println("接收到訊息:"+receivedMessage);
};
CancelCallback cancelCallback=(consumerTag)->{
System.out.println(consumerTag+"消費者取消消費介面回呼邏輯");
};
System.out.println("C2 消費者啟動等待消費......");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
public class Worker02 {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String receivedMessage = new String(delivery.getBody());
System.out.println("接收到訊息:"+receivedMessage);
};
CancelCallback cancelCallback=(consumerTag)->{
System.out.println(consumerTag+"消費者取消消費介面回呼邏輯");
};
System.out.println("C1 消費者啟動等待消費......");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
生產者
public class Task01 {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
try(Channel channel=RabbitMqUtils.getChannel();) {
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//從控制臺當中接受資訊
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("發送訊息完成:"+message);
}
}
}
}

通程序式執行發現生產者總共發送 4 個訊息,消費者 1 和消費者 2 分別分得兩個訊息,并且是按照有序的一個接收一次訊息,是通過輪詢的方式發送訊息,
不公平分發機制
🐆🐆RabbitMQ默認情況下是通過輪詢的方式發送訊息,但是在某種場景下這種策略并不是很好,比方說有兩個消費者在處理任務,其中有個消費者 1 處理任務的速度非常快,而另外一個消費者 2處理速度卻很慢,這個時候我們還是采用輪訓分發的化就會到這處理速度快的這個消費者很大一部分時間處于空閑狀態,而處理慢的那個消費者一直在干活,這種分配方式在這種情況下其實就不太好,但是RabbitMQ 并不知道這種情況它依然很公平的進行分發,
為了避免這種情況,我們可以設定引數 channel.basicQos(1); 1為不公平分發,默認為0是公平分發,
訊息預取值
🐩🐩本身訊息的發送就是異步發送的,所以在任何時候,channel 上肯定不止只有一個訊息,另外來自消費者的手動確認本質上也是異步的,因此這里就存在一個未確認的訊息緩沖區,因此希望開發人員能限制此緩沖區的大小,以避免緩沖區里面無限制的未確認訊息問題,
這個時候就可以通過使用 如下方法設定“預取計數”值來完成的
void basicQos(int prefetchCount);
該值定義通道上允許的未確認訊息的最大數量,一旦數量達到配置的數量,RabbitMQ 將停止在通道上傳遞更多訊息,除非至少有一個未處理的訊息被確認,
例如,假設在通道上有未確認的訊息 5、6、7,8,并且通道的預取計數設定為 4,此時 RabbitMQ 將不會在該通道上再傳遞任何
訊息,除非至少有一個未應答的訊息被 ack,比方說 tag=6 這個訊息剛剛被確認 ACK,RabbitMQ 將會感知這個情況到并再發送一條訊息,

🐪🐪訊息應答和 QoS 預取值對用戶吞吐量有重大影響,通常,增加預取將提高向消費者傳遞訊息的速度,雖然自動應答傳輸訊息速率是最佳的,但是,在這種情況下已傳遞但尚未處理的訊息的數量也會增加,從而增加了消費者的 RAM 消耗(隨機存取存盤器)應該小心使用具有無限預處理的自動確認模式或手動確認模式,消費者消費了大量的訊息如果沒有確認的話,會導致消費者連接節點的記憶體消耗變大,所以找到合適的預取值是一個反復試驗的程序,不同的負載該值取值也不同 100 到 300 范圍內的值通常可提供最佳的吞吐量,并且不會給消費者帶來太大的風險,
預取值為 1 是最保守的,當然這將使吞吐量變得很低,特別是消費者連接延遲很嚴重的情況下,特別是在消費者連接等待時間較長的環境中,對于大多數應用來說,稍微高一點的值將是最佳的,
2.訊息應答機制
🐠🐠 問題分析 消費者完成一個任務可能需要一段時間,如果其中一個消費者處理一個長的任務并僅只完成了部分突然它掛掉了,會發生什么情況,RabbitMQ 一旦向消費者傳遞了一條訊息,便立即將該訊息標記為洗掉,在這種情況下,突然有個消費者掛掉了,我們將丟失正在處理的訊息,以及后續發送給該消費者的訊息,因為它無法接收到,
🐋🐋 為了保證訊息在發送程序中不丟失,rabbitmq 引入訊息應答機制,訊息應答就是:消費者在接收到訊息并且處理該訊息之后,告訴 rabbitmq 它已經處理了,rabbitmq 可以把該訊息洗掉了,
3.訊息自動應答模式
🐏🐏訊息發送后立即被認為已經傳送成功,這種模式需要在高吞吐量和資料傳輸安全性方面做權衡,因為這種模式如果訊息在接收到之前,消費者那邊出現連接或者 channel 關閉,那么訊息就丟失了,當然另一方面這種模式消費者那邊可以傳遞過載的訊息,沒有對傳遞的訊息數量進行限制,當然這樣有可能使得消費者這邊由于接收太多還來不及處理的訊息,導致這些訊息的積壓,最終使得記憶體耗盡,最終這些消費者執行緒被作業系統殺死,所以這種模式僅適用在消費者可以高效并以某種速率能夠處理這些訊息的情況下使用,
使用方式只需要在發送訊息時將 autoAck引數設定為true即可,
String basicConsume(String queue,
boolean autoAck, DeliverCallback deliverCallback,
CancelCallback cancelCallback)
4. 訊息手動應答模式 可以批量應答并且減少網路擁堵
在發送訊息時將 autoAck引數設定為false,并且需要指定應答策略
三種應答策略
//用于肯定確認 RabbitMQ 已知道該訊息并且成功的處理訊息,
//可以將其丟棄了 第一個引數代表的是訊息的標記,
//第二個引數是代表是否批量應答
1. void basicAck(long deliveryTag, boolean multiple);
//用于否定確認 引數1:訊息的標記
//引數2:是否批量應答
//引數3:是否重新入隊
2.void basicNack(long deliveryTag, boolean multiple, boolean requeue)
//用于否定確認不處理該訊息了直接拒絕,可以將其丟棄
// 引數含義同上
3.void basicReject(long deliveryTag, boolean requeue)
Multiple 的解釋
multiple 的 true 和 false 代表不同意思
🐐🐐true 代表批量應答 channel 上未應答的訊息

🐖🐖false 同上面相比:只會應答 tag=8 的訊息 5,6,7 這三個訊息依然不會被確認收到訊息應答

手動應答案例演示:
消費者:
public class TaskConsumerOne {
public static final String TASK_QUEUE_NAME="task";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqChannel.getChannel();
//是否自動應答
boolean autoAck=false;
//是否公平分發
//channel.basicQos(1);
channel.basicConsume(TASK_QUEUE_NAME,autoAck,(consumerTag,delivery)->{
try {
String message= new String(delivery.getBody());
//睡眠一秒鐘
TimeUnit.SECONDS.sleep(1);
System.out.println("接受到的訊息是:"+message);
//手動應答 arg1:訊息標記,arg2:是否可以批處理
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
} catch (InterruptedException e) {
e.printStackTrace();
}
},consumerTag->{
System.out.println(consumerTag+"消費者取消消費介面回呼邏輯");
});
}
}
public class TaskConsumerTwo {
public static final String TASK_QUEUE_NAME="task";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqChannel.getChannel();
//是否自動應答
boolean autoAck=false;
channel.basicConsume(TASK_QUEUE_NAME,autoAck,(consumerTag,delivery)->{
try {
String message= new String(delivery.getBody());
//睡眠一分鐘
TimeUnit.SECONDS.sleep(60);
System.out.println("接受到的訊息是:"+message);
//手動應答 arg1:訊息標記,arg2:是否可以批處理
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
} catch (InterruptedException e) {
e.printStackTrace();
}
},consumerTag->{
System.out.println(consumerTag+"消費者取消消費介面回呼邏輯");
});
}
}
生產者:
public class ProductThree {
public static final String TASK_QUEUE_NAME="task";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqChannel.getChannel();
Scanner scanner = new Scanner(System.in);
//申明一個佇列
channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
System.out.println("請輸入要發送的訊息");
while (scanner.hasNextLine()){
String message = scanner.nextLine();
channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));
System.out.println("訊息發送完畢");
}
}
}
正常情況下訊息發送方發送兩個訊息 C1 和 C2 分別接收到訊息并進行處理,

如果此時多輸入幾條訊息則會被C1消費,因為C2還在睡眠

5.訊息自動重新入隊
🐊🐊 如果消費者由于某些原因失去連接(其通道已關閉,連接已關倍訓 TCP 連接丟失),導致訊息未發送 ACK 確認,RabbitMQ 將了解到訊息未完全處理,并將對其重新排隊,如果此時其他消費者可以處理,它將很快將其重新分發給另一個消費者,這樣,即使某個消費者偶爾死亡,也可以確保不會丟失任何訊息,
比如P發送了兩個訊息一個發送給C1一個發送給C2,C1未確認資訊,此時資訊可以入隊,佇列可以把訊息發送給其他消費者消費,

6. RabbitMQ 持久化
🐆🐆如何保障當 RabbitMQ 服務停掉以后訊息生產者發送過來的訊息不丟失,默認情況下 RabbitMQ 退出或由于某種原因崩潰時,它會忽視佇列和訊息,除非告知它不要這樣做,確保訊息不會丟失需要做兩件事:我們需要將佇列和訊息都標記為持久化,
佇列如何實作持久化
之前我們創建的佇列都是非持久化的,rabbitmq 如果重啟的化,該佇列就會被洗掉掉,如果要佇列實作持久化 需要在宣告佇列的時候把 durable 引數設定為持久化
queueDeclare(String queue, boolean durable,
boolean exclusive, boolean autoDelete,
Map<String, Object> arguments)
訊息如何實作持久化
要想讓訊息實作持久化需要在訊息生產者修改代碼,MessageProperties.PERSISTENT_TEXT_PLAIN 添加這個屬性,
channel.basicPublish("",QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
🐕🐕將訊息標記為持久化并不能完全保證不會丟失訊息,盡管它告訴 RabbitMQ 將訊息保存到磁盤,但是這里依然存在當訊息剛準備存盤在磁盤的時候 但是還沒有存盤完,訊息還在快取的一個間隔點,此時并沒有真正寫入磁盤,如果此時服務宕機了,則仍然不能持久化,
四.RabbitMQ的作業模式
在了解RabbitMQ的作業模式之前我們先來了解一下交換機的概念和型別,
Exchanges 概念
🐀🐀RabbitMQ 訊息傳遞模型的核心思想是: 生產者生產的訊息從不會直接發送到佇列,實際上,通常生產者甚至都不知道這些訊息傳遞傳遞到了哪些佇列中,
相反,生產者只能將訊息發送到交換機(exchange),交換機作業的內容非常簡單,一方面它接收來自生產者的訊息,另一方面將它們推入佇列,交換機必須確切知道如何處理收到的訊息,是應該把這些訊息放到特定佇列還是說把他們到許多佇列中還是說應該丟棄它們,這就的由交換機的型別來決定,
總共有以下型別:
直接(direct), 主題(topic) ,標題(headers) , 扇出(fanout)
無名 exchange

第一個引數是交換機的名稱,空字串表示默認或無名稱交換機,
臨時佇列
系統會為我們創建一個隨機名字的臨時佇列
String queueName = channel.queueDeclare().getQueue();
系結(bindings)
🐞🐞什么是 bingding 呢,binding 其實是 exchange 和 queue 之間的橋梁,它告訴我們 exchange 和那個佇列進行了系結關系,比如說下面這張圖告訴我們的就是 X 與 Q1 和 Q2 進行了系結

多重系結

如上圖所示exchange 的系結型別是 direct,它系結的多個佇列的 key 都相同,在這種情況下雖然系結型別是 direct 但是它表現的就和 fanout 有點類似了,就跟廣播差不多,
🐨🐨1. 簡單模式 HelloWorld
一個生產者、一個消費者,不需要設定交換機(使用默認的交換機),我們的入門程式就是一個簡單模式,
🐵🐵2. 作業佇列模式 Work Queue
一個生產者、多個消費者(競爭關系),不需要設定交換機(使用默認的交換機),上面的作業佇列就是作業佇列模式,
🐎🐎 3. 發布訂閱模式 Publish/subscribe
需要設定型別為 fanout 的交換機,并且交換機和佇列進行系結,當發送訊息到交換機后,交換機會將訊息發送到系結的佇列,
Fanout介紹
Fanout 這種型別非常簡單,正如從名稱中猜到的那樣,它是將接收到的所有訊息廣播到它知道的所有佇列中,
我們根據如上圖實體來實作發布訂閱模式
//ReceiveLogs01 將接收到的訊息列印在控制臺
public class ReceiveLogs01 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
/**
* 生成一個臨時的佇列 佇列的名稱是隨機的
* 當消費者斷開和該佇列的連接時 佇列自動洗掉
*/
String queueName = channel.queueDeclare().getQueue();
//把該臨時佇列系結我們的 exchange 其中 routingkey(也稱之為 binding key)為空字串
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("等待接收訊息,把接收到的訊息列印在螢屏.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("控制臺列印接收到的訊息"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
//ReceiveLogs02 將接收到的訊息存盤在磁盤
public class ReceiveLogs02 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
/**
* 生成一個臨時的佇列 佇列的名稱是隨機的
* 當消費者斷開和該佇列的連接時 佇列自動洗掉
*/
String queueName = channel.queueDeclare().getQueue();
//把該臨時佇列系結我們的 exchange 其中 routingkey(也稱之為 binding key)為空字串
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("等待接收訊息,把接收到的訊息寫到檔案.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
File file = new File("C:\\work\\rabbitmq_info.txt");
FileUtils.writeStringToFile(file,message,"UTF-8");
System.out.println("資料寫入檔案成功");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
EmitLog 發送訊息給兩個消費者接收
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitUtils.getChannel()) {
/**
* 宣告一個 exchange
* 1.exchange 的名稱
* 2.exchange 的型別
*/
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
Scanner sc = new Scanner(System.in);
System.out.println("請輸入資訊");
while (sc.hasNext()) {
String message = sc.nextLine();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println("生產者發出訊息" + message);
}
}
}
}
🐒🐒 4. 路由模式 Routing
需要設定型別為 direct 的交換機,交換機和佇列進行系結,并且指定 routing key,當發送訊息到交換機后,交換機會根據 routing key 將訊息發送到對應的佇列,
Direct exchange 介紹
Direct exchange的作業方式是,訊息只去到它系結的routingKey 佇列中去
在這種系結情況下,生產者發布訊息到 exchange 上,系結鍵為 orange 的訊息會被發布到佇列Q1,系結鍵為 blackgreen 和的訊息會被發布到佇列 Q2,其他訊息型別的訊息將被丟棄,
案例根據如下圖示來發送訊息:
public class ReceiveLogsDirect01 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = "disk";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "error");
System.out.println("等待接收訊息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
message="接收系結鍵:"+delivery.getEnvelope().getRoutingKey()+",訊息:"+message;
File file = new File("C:\\work\\rabbitmq_info.txt");
FileUtils.writeStringToFile(file,message,"UTF-8");
System.out.println("錯誤日志已經接收");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
public class ReceiveLogsDirect02 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = "console";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "info");
channel.queueBind(queueName, EXCHANGE_NAME, "warning");
System.out.println("等待接收訊息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 接收系結鍵 :"+delivery.getEnvelope().getRoutingKey()+", 消
息:"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitUtils.getChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//創建多個 bindingKey
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("info","普通 info 資訊");
bindingKeyMap.put("warning","警告 warning 資訊");
bindingKeyMap.put("error","錯誤 error 資訊");
//debug 沒有消費這接收這個訊息 所有就丟失了
bindingKeyMap.put("debug","除錯 debug 資訊");
for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,bindingKey, null,
message.getBytes("UTF-8"));
System.out.println("生產者發出訊息:" + message);
}
}
}
}
🐮🐮 5. 通配符模式 Topic
Topic 型別與 Direct 相比,都是可以根據 RoutingKey 把訊息路由到不同的佇列,只不過 Topic 型別Exchange 可以讓佇列在系結 Routing key 的時候使用通配符!
Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert
通配符規則:# 匹配一個或多個詞,* 匹配不多不少恰好1個詞,例如:item.# 能夠匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert
需要設定型別為 topic 的交換機,交換機和佇列進行系結,并且指定通配符方式的 routing key,當發送訊息到交換機后,交換機會根據 routing key 將訊息發送到對應的佇列,

public class TopicConsumerOne {
public static final String EXCHANGE_NAME="topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqChannel.getChannel();
//創建交換機
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queue="Q1";
channel.queueDeclare(queue,false,false,false,null);
//系結佇列
channel.queueBind(queue,EXCHANGE_NAME,"*.orange.*");
System.out.println("系結完成,Q1準備接收訊息");
channel.basicConsume(queue,true,((consumerTag, message) -> {
String msg =new String(message.getBody(),"UTF-8");
System.out.println("路由key是"+message.getEnvelope().getRoutingKey()+"接受到的訊息是:"+msg);
}),consumerTag -> {});
}
}
public class TopicConsumerTwo {
public static final String EXCHANGE_NAME="topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqChannel.getChannel();
//創建交換機
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queue="Q2";
channel.queueDeclare(queue,false,false,false,null);
//系結佇列
channel.queueBind(queue,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(queue,EXCHANGE_NAME,"lazy.#");
System.out.println("系結完成,Q2準備接收訊息");
channel.basicConsume(queue,true,((consumerTag, message) -> {
String msg =new String(message.getBody(),"UTF-8");
System.out.println("路由key是"+message.getEnvelope().getRoutingKey()+"接受到的訊息是:"+msg);
}),consumerTag -> {});
}
}
public class TopicProducter {
public static final String EXCHANGE_NAME="topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqChannel.getChannel();
//創建交換機
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put(" quick.orange.rabbit","被佇列 Q1Q2 接收到");
bindingKeyMap.put(" lazy.orange.elephant","被佇列 Q1Q2 接收到");
bindingKeyMap.put("quick.orange.fox"," 被佇列 Q1 接收到");
bindingKeyMap.put("lazy.brown.fox","被佇列 Q2 接收到");
bindingKeyMap.put("lazy.pink.rabbit","雖然滿足兩個系結但只被佇列 Q2 接收一次");
bindingKeyMap.put("quick.brown.fox","不匹配任何系結不會被任何佇列接收到會被丟棄");
bindingKeyMap.put("quick.orange.male.rabbit","是四個單詞不匹配任何系結會被丟棄");
bindingKeyMap.put("lazy.orange.male.rabbit"," 是四個單詞但匹配 Q2");
//發送訊息
for (Map.Entry<String, String> entry : bindingKeyMap.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
channel.basicPublish(EXCHANGE_NAME,key,null,value.getBytes("UTF-8"));
}
System.out.println("訊息發送完畢");
}
}

五.RabbitMQ的發布確認
1.發布確認原理
🐜🐜生產者將信道設定成 confirm 模式,一旦信道進入 confirm 模式,所有在該信道上面發布的訊息都將會被指派一個唯一的 ID(從 1 開始),一旦訊息被投遞到所有匹配的佇列之后,broker就會發送一個確認給生產者(包含訊息的唯一 ID),這就使得生產者知道訊息已經正確到達目的佇列了,如果訊息和佇列是可持久化的,那么確認訊息會在將訊息寫入磁盤之后發出,broker 回傳給生產者的確認訊息中 delivery-tag 域包含了確認訊息的序列號,此外 broker 也可以設定basic.ack 的 multiple 域,表示到這個序列號之前的所有訊息都已經得到了處理,
🐃🐃confirm 模式最大的好處在于他是異步的,一旦發布一條訊息,生產者應用程式就可以在等信道回傳確認的同時繼續發送下一條訊息,當訊息最終得到確認之后,生產者應用便可以通過回呼方法來處理該確認訊息,如果 RabbitMQ 因為自身內部錯誤導致訊息丟失,就會發送一條 nack 訊息,生產者應用程式同樣可以在回呼方法中處理該 nack 訊息,
2.發布確認的策略
發布確認默認是沒有開啟的,如果要開啟需要呼叫方法confirmSelect,每當你要想使用發布確認,都需要在 channel 上呼叫該方法,
1) 單個確認發布
🐆🐆這是一種簡單的確認方式,它是一種同步確認發布的方式,也就是發布一個訊息之后只有它被確認發布,后續的訊息才能繼續發布,waitForConfirmsOrDie(long)這個方法只有在訊息被確認的時候才回傳,如果在指定時間范圍內這個訊息沒有被確認那么它將拋出例外,
🐌🐌這種確認方式有一個最大的缺點就是:發布速度特別的慢,因為如果沒有確認發布的訊息就會阻塞所有后續訊息的發布,這種方式最多提供每秒不超過數百條發布訊息的吞吐量,當然對于某些應用程式來說這可能已經足夠了,
2)批量確認發布
上面那種方式非常慢,與單個等待確認訊息相比,先發布一批訊息然后一起確認可以極大地提高吞吐量,
🐘🐘當然這種方式的缺點就是:當發生故障導致發布出現問題時,不知道是哪個訊息出現問題了,我們必須將整個批處理保存在記憶體中,以記錄重要的資訊而后重新發布訊息,當然這種方案仍然是同步的,也一樣阻塞訊息的發布,
3)異步確認發布
🐴🐴異步確認雖然編程邏輯比上兩個要復雜,但是性價比最高,無論是可靠性還是效率都沒得說,他是利用回呼函式來達到訊息可靠性傳遞的,這個中間件也是通過函式回呼來保證是否投遞成功,
三種發布確認代碼如下:
//三種發布確認訊息的耗時情況
public class ProductMessageAck {
public static final int message_number =200;
public static void main(String[] args) throws Exception {
//單個發布確認 發布200個訊息完成所花費的時間是:8257ms
//SinglePublishAck();
//批量發布確認 發布200個訊息完成所花費的時間是:227ms
// BatchPublishAck();
//異步發布確認 發布200個訊息完成所花費的時間是:45ms
AsyncPublishAck();
}
//單個發布確認
public static void SinglePublishAck() throws Exception{
//獲取信道
Channel channel=RabbitMqChannel.getChannel();
/**
* 生成一個佇列
* 1.佇列名稱
* 2.佇列里面的訊息是否持久化 默認訊息存盤在記憶體中
* 3.該佇列是否只供一個消費者進行消費 是否進行共享 true 可以多個消費者消費
* 4.是否自動洗掉 最后一個消費者端開連接以后 該佇列是否自動洗掉 true 自動洗掉
* 5.其他引數
*/
String queue_name= UUID.randomUUID().toString();
channel.queueDeclare(queue_name,false,false,false,null);
//設定發布確認
channel.confirmSelect();
//開始時間
long startTime=System.currentTimeMillis();
for (int i = 0; i < message_number; i++) {
String message=i+"";
channel.basicPublish("",queue_name,null,message.getBytes());
//服務端回傳 false 或超時時間內未回傳,生產者可以訊息重發
boolean flag = channel.waitForConfirms();
if(flag){
System.out.println("訊息發送成功");
}
}
//結束時間
long endTime=System.currentTimeMillis();
System.out.println("發布"+message_number+"個訊息完成所花費的時間是:"+(endTime-startTime)+"ms");
}
//批量發布確認
public static void BatchPublishAck() throws Exception{
//獲取信道
Channel channel=RabbitMqChannel.getChannel();
/**
* 生成一個佇列
* 1.佇列名稱
* 2.佇列里面的訊息是否持久化 默認訊息存盤在記憶體中
* 3.該佇列是否只供一個消費者進行消費 是否進行共享 true 可以多個消費者消費
* 4.是否自動洗掉 最后一個消費者端開連接以后 該佇列是否自動洗掉 true 自動洗掉
* 5.其他引數
*/
String queue_name= UUID.randomUUID().toString();
channel.queueDeclare(queue_name,false,false,false,null);
//設定發布確認
channel.confirmSelect();
//開始時間
long startTime=System.currentTimeMillis();
for (int i = 0; i < message_number; i++) {
String message=i+"";
channel.basicPublish("",queue_name,null,message.getBytes());
if(i%50==0){
//發送50條之后確認一次
channel.waitForConfirms();
}
}
//結束時間
long endTime=System.currentTimeMillis();
System.out.println("發布"+message_number+"個訊息完成所花費的時間是:"+(endTime-startTime)+"ms");
}
//異步發布確認
public static void AsyncPublishAck() throws Exception{
//獲取信道
Channel channel=RabbitMqChannel.getChannel();
/**
* 生成一個佇列
* 1.佇列名稱
* 2.佇列里面的訊息是否持久化 默認訊息存盤在記憶體中
* 3.該佇列是否只供一個消費者進行消費 是否進行共享 true 可以多個消費者消費
* 4.是否自動洗掉 最后一個消費者端開連接以后 該佇列是否自動洗掉 true 自動洗掉
* 5.其他引數
*/
String queue_name= UUID.randomUUID().toString();
channel.queueDeclare(queue_name,false,false,false,null);
//設定發布確認
channel.confirmSelect();
//開始時間
long startTime=System.currentTimeMillis();
/**
* 執行緒安全有序的一個哈希表,適用于高并發的情況
* 1.輕松的將序號與訊息進行關聯
* 2.輕松批量洗掉條目 只要給到序列號
* 3.支持并發訪問
*/
ConcurrentSkipListMap<Long, String> outstandingConfirms = new
ConcurrentSkipListMap<>();
/**
* 確認收到訊息的一個回呼
* 1.訊息序列號
* 2.true 可以確認小于等于當前序列號的訊息
* false 確認當前序列號訊息
*/
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
//回傳的是小于等于當前序列號的未確認訊息 是一個 map
ConcurrentNavigableMap<Long, String> confirmed =
outstandingConfirms.headMap(sequenceNumber, true);
//清除該部分未確認訊息
confirmed.clear();
}else{
//只清除當前序列號的訊息
outstandingConfirms.remove(sequenceNumber);
}
};
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
String message = outstandingConfirms.get(sequenceNumber);
System.out.println("發布的訊息"+message+"未被確認,序列號"+sequenceNumber);
};
/**
* 添加一個異步確認的監聽器
* 1.確認收到訊息的回呼
* 2.未收到訊息的回呼
*/
channel.addConfirmListener(ackCallback, nackCallback);
long begin = System.currentTimeMillis();
for (int i = 0; i < message_number; i++) {
String message = "訊息" + i;
/**
* channel.getNextPublishSeqNo()獲取下一個訊息的序列號
* 通過序列號與訊息體進行一個關聯
* 全部都是未確認的訊息體
*/
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queue_name, null, message.getBytes());
}
//結束時間
long endTime=System.currentTimeMillis();
System.out.println("發布"+message_number+"個訊息完成所花費的時間是:"+(endTime-startTime)+"ms");
}
}
3.發布確認 springboot 版本

代碼架構圖

在組態檔當中需要添加
spring.rabbitmq.publisher-confirm-type=correlated
? NONE
禁用發布確認模式,是默認值
? CORRELATED
發布訊息成功到交換器后會觸發回呼方法
? SIMPLE
經測驗有兩種效果,其一效果和 CORRELATED 值一樣會觸發回呼方法,
其二在發布訊息成功后使用 rabbitTemplate 呼叫 waitForConfirms 或 waitForConfirmsOrDie 方法
等待 broker 節點回傳發送結果,根據回傳結果來判定下一步的邏輯,要注意的點是
waitForConfirmsOrDie 方法如果回傳 false 則會關閉 channel,則接下來無法發送訊息到 broker
配置類
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
//宣告業務 Exchange
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
// 宣告確認佇列
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
// 宣告確認佇列系結關系
@Bean
public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("key1");
}
}
回呼介面
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
/**
* 交換機不管是否收到訊息的一個回呼方法
* CorrelationData
* 訊息相關資料
* ack
* 交換機是否收到訊息
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id=correlationData!=null?correlationData.getId():"";
if(ack){
log.info("交換機已經收到 id 為:{}的訊息",id);
}else{
log.info("交換機還未收到 id 為:{}訊息,由于原因:{}",id,cause);
}
}
}
@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MyCallBack myCallBack;
//依賴注入 rabbitTemplate 之后再設定它的回呼物件
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(myCallBack);
}
@GetMapping("sendMessage/{message}")
public void sendMessage(@PathVariable String message){
//指定訊息 id 為 1
CorrelationData correlationData1=new CorrelationData("1");
String routingKey="key1";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData1);
CorrelationData correlationData2=new CorrelationData("2");
routingKey="key2";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData2);
log.info("發送訊息內容:{}",message);
}
}
消費者
@Component
@Slf4j
public class ConfirmConsumer {
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
@RabbitListener(queues =CONFIRM_QUEUE_NAME)
public void receiveMsg(Message message){
String msg=new String(message.getBody());
log.info("接受到佇列 confirm.queue 訊息:{}",msg);
}
}

🐥🐥可以看到,發送了兩條訊息,第一條訊息的 RoutingKey 為 “key1”,第二條訊息的 RoutingKey 為"key2",兩條訊息都成功被交換機接收,也收到了交換機的確認回呼,但消費者只收到了一條訊息,因為第二條訊息的 RoutingKey 與佇列的 BindingKey 不一致,也沒有其它佇列能接收這個訊息,所有第二條訊息被直接丟棄了,
回退訊息
🐧🐧在僅開啟了生產者確認機制的情況下,交換機接收到訊息后,會直接給訊息生產者發送確認訊息,如果發現該訊息不可路由,那么訊息會被直接丟棄,此時生產者是不知道訊息被丟棄這個事件的,那么如何讓無法被路由的訊息幫我想辦法處理一下?最起碼通知我一聲,我好自己處理啊,
🐇🐇通過設定 mandatory 引數可以在當訊息傳遞程序中不可達目的地時將訊息回傳給生產者,
@Slf4j
@Component
public class MessageProducer implements RabbitTemplate.ConfirmCallback ,
RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
//rabbitTemplate 注入之后就設定該值
@PostConstruct
private void init() {
rabbitTemplate.setConfirmCallback(this);
/**
* true:
* 交換機無法將訊息進行路由時,會將該訊息回傳給生產者
* false:
* 如果發現訊息無法進行路由,則直接丟棄
*/
rabbitTemplate.setMandatory(true);
//設定回退訊息交給誰處理
rabbitTemplate.setReturnCallback(this);
}
@GetMapping("sendMessage")
public void sendMessage(String message){
//讓訊息系結一個 id 值
CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("confirm.exchange","key1",message+"key1",correlationData1)
;
log.info("發送訊息 id 為:{}內容為{}",correlationData1.getId(),message+"key1");
CorrelationData correlationData2 = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("confirm.exchange","key2",message+"key2",correlationData2)
;
log.info("發送訊息 id 為:{}內容為{}",correlationData2.getId(),message+"key2");
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("交換機收到訊息確認成功, id:{}", id);
} else {
log.error("訊息 id:{}未成功投遞到交換機,原因是:{}", id, cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String
exchange, String routingKey) {
log.info("訊息:{}被服務器退回,退回原因:{}, 交換機是:{}, 路由 key:{}",
new String(message.getBody()),replyText, exchange, routingKey);
}
}
回呼介面
@Component
@Slf4j
public class MyCallBack implements
RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
/**
* 交換機不管是否收到訊息的一個回呼方法
* CorrelationData
* 訊息相關資料
* ack
* 交換機是否收到訊息
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id=correlationData!=null?correlationData.getId():"";
if(ack){
log.info("交換機已經收到 id 為:{}的訊息",id);
}else{
log.info("交換機還未收到 id 為:{}訊息,由于原因:{}",id,cause);
}
}
//當訊息無法路由的時候的回呼方法
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String
exchange, String routingKey) {
log.error(" 消 息 {}, 被交換機 {} 退回,退回原因 :{}, 路 由 key:{}",new
String(message.getBody()),exchange,replyText,routingKey);
}
}

六.RabbitMQ的死性佇列
死信的概念
🐰🐰先從概念解釋上搞清楚這個定義,死信,顧名思義就是無法被消費的訊息,字面意思可以這樣理解,一般來說,producer 將訊息投遞到 broker 或者直接到 queue 里了,consumer 從 queue 取出訊息進行消費,但某些時候由于特定的原因導致 queue 中的某些訊息無法被消費,這樣的訊息如果沒有后續的處理,就變成了死信,有死信自然就有了死信佇列,
🐯🐯應用場景:為了保證訂單業務的訊息資料不丟失,需要使用到 RabbitMQ 的死信佇列機制,當訊息消費發生例外時,將訊息投入死信佇列中.還有比如說: 用戶在商城下單成功并點擊去支付后在指定時間未支付時自動失效,
Dead Letter Exchange(死信交換機),當訊息成為Dead message后,可以被重新發送到另一個交換機,這個交換機就是DLX,

訊息成為死信的三種情況:
- 佇列訊息長度到達限制;
- 消費者拒接消費訊息,basicNack/basicReject,并且不把訊息重新放入原目標佇列,requeue=false;
- 原佇列存在訊息過期設定,訊息到達超時時間未被消費;
死性佇列實戰架構圖

🐗🐗1.訊息 TTL 過期
佇列中第五個引數map中可以設定的引數
| 引數 | 作用 |
|---|---|
| x-message-ttl | 設定ttl |
| x-expires | 設定訊息過期時間 |
| x-max-length | 設定佇列接收訊息的長度 |
| x-max-length-bytes | 設定訊息的最大位元組長度 |
| x-overflow | |
| x-dead-letter-exchange | 設定死性佇列名 |
| x-dead-letter-routing-key | 設定死性佇列路由名 |
| x-max-priority | 設定佇列優先級 |
| x-queue-mode | 設定佇列模式 |
| x-queue-type | 設定佇列型別 |
| x-delivery-limit | delivery的最大長度限制 |
| x-single-active-consumer |
我們將C1開啟之后然后關閉再開啟C2此時訊息會發送到C2中

public class DeadQueueConsumerOne {
public static final String NORMAL_EXCHANGE="normal_exchange";
public static final String NORMAL_QUEUE="normal_queue";
public static final String DEAD_EXCHANGE="dead_exchange";
public static final String DEAD_QUEUE="dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqChannel.getChannel();
//創建交換機
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//宣告佇列
//設定引數
//正常佇列系結死信佇列資訊
Map<String, Object> params = new HashMap<>();
//正常佇列設定死信交換機 引數 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常佇列設定死信 routing-key 引數 key 是固定值
params.put("x-dead-letter-routing-key", "wf");
channel.queueDeclare(NORMAL_QUEUE,false,false,false,params);
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
//系結佇列
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"wlf");
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"wf");
//消費訊息
channel.basicConsume(NORMAL_QUEUE,true,(consumerTag, message) -> {
String msg=new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("接收到的訊息是:"+msg);
},consumerTag -> {});
}
}
public class DeadQueueConsumerTwo {
public static final String DEAD_EXCHANGE="dead_exchange";
public static final String DEAD_QUEUE="dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqChannel.getChannel();
//創建交換機
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//宣告佇列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"wf");
//消費訊息
channel.basicConsume(DEAD_QUEUE,true,(consumerTag, message) -> {
String msg=new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("死信佇列接收到的訊息是:"+msg);
},consumerTag -> {});
}
}
public class DeadQueueProduce {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception{
try(Channel channel= RabbitMqChannel.getChannel()){
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//設定訊息的 TTL 時間
AMQP.BasicProperties properties = new
AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 1; i < 11; i++) {
//發送訊息
String msg="info"+i;
channel.basicPublish(NORMAL_EXCHANGE,"wlf",properties,msg.getBytes(StandardCharsets.UTF_8));
}
System.out.println("生產者訊息發送完畢");
}
}
}
🐌🐌 2.佇列達到最大長度
將正常佇列收到的訊息設定為6,向佇列中發送10條訊息,死性佇列中收到4條,
@Configuration
public class DeadQueueConfig {
public static final String NORMAL_EXCHANGE="normal_exchange";
public static final String NORMAL_QUEUE="normal_queue";
public static final String DEAD_EXCHANGE="dead_exchange";
public static final String DEAD_QUEUE="dead_queue";
@Bean("normalExchange")
public DirectExchange normalExchange(){
return new DirectExchange(NORMAL_EXCHANGE);
}
@Bean("deadExchange")
public DirectExchange deadExchange(){
return new DirectExchange(DEAD_EXCHANGE);
}
@Bean("normalQueue")
public Queue normalQueue(
){
return QueueBuilder
.durable(NORMAL_QUEUE)
.deadLetterExchange(DEAD_EXCHANGE)
.deadLetterRoutingKey("lishi")
.maxLength(6)
.build();
}
@Bean("deadQueue")
public Queue deadQueue(){
return QueueBuilder.durable(DEAD_QUEUE).build();
}
@Bean
public Binding normalQueueBinding(
@Qualifier("normalQueue") Queue queue,
@Qualifier("normalExchange")DirectExchange exchange
){
return BindingBuilder.bind(queue).to(exchange).with("zhangsan");
}
@Bean
public Binding deadQueueBinding(
@Qualifier("deadQueue") Queue queue,
@Qualifier("deadExchange")DirectExchange exchange
){
return BindingBuilder.bind(queue).to(exchange).with("lishi");
}
}
@Test
public void TestDeadMsg(){
for (int i = 1; i < 11; i++) {
String msg="info"+i;
rabbitTemplate.convertAndSend(DeadQueueConfig.NORMAL_QUEUE,"zhangsan",msg);
}
}
@Component
@RabbitListener(queues = "normalQueue")
public class DeadConsumerTwo {
@RabbitHandler
public void ReceiveMessage(String message){
System.out.println("normalQueue---->receive message is:"+message);
}
}
@Component
@RabbitListener(queues = "deadQueue")
public class DeadConsumerTwo {
@RabbitHandler
public void ReceiveMessage(String message){
System.out.println("deadQueue---->receive message is:"+message);
}
}
🐡🐡 3.消費者拒接消費訊息:設定拒絕接受第五條訊息
public class Consumer01 {
//普通交換機名稱
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交換機名稱
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
//宣告死信和普通交換機 型別為 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//宣告死信佇列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信佇列系結死信交換機與 routingkey
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
//正常佇列系結死信佇列資訊
Map<String, Object> params = new HashMap<>();
//正常佇列設定死信交換機 引數 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常佇列設定死信 routing-key 引數 key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println("等待接收訊息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
if(message.equals("info5")){
System.out.println("Consumer01 接收到訊息" + message + "并拒絕簽收該訊息");
//requeue 設定為 false 代表拒絕重新入隊 該佇列如果配置了死信交換機將發送到死信佇列中
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
}else {
System.out.println("Consumer01 接收到訊息"+message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false;
channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {
});
}
}

七.RabbitMQ的延遲佇列
1. 延遲佇列概念
🐂🐂延時佇列,佇列內部是有序的,最重要的特性就體現在它的延時屬性上,延時佇列中的元素是希望在指定時間到了以后或之前取出和處理,簡單來說,延時佇列就是用來存放需要在指定時間被處理的元素的佇列,
2.延遲佇列相關使用場景
- 訂單在十分鐘之內未支付則自動取消
- 新創建的店鋪,如果在十天內都沒有上傳過商品,則自動發送訊息提醒,
- 用戶注冊成功后,如果三天內沒有登陸則進行短信提醒,
- 用戶發起退款,如果三天內沒有得到處理則通知相關運營人員,
- 預定會議后,需要在預定的時間點前十分鐘通知各個與會人員參加會議
3. RabbitMQ 中的 TTL
- TTL 全稱 Time To Live(存活時間/過期時間,單位是毫秒),
- RabbitMQ可以對訊息設定過期時間,也可以對整個佇列(Queue)設定過期時間,
- 如果在 TTL 設定的時間內沒有被消費,則會成為"死信",
- 如果同時配置了佇列的 TTL 和訊息的
TTL,那么較小的那個值將會被使用,有兩種方式設定 TTL,
訊息宣告TTL

佇列宣告TTL

🐆🐆如果設定了佇列的 TTL 屬性,那么一旦訊息過期,就會被佇列丟棄(如果配置了死信佇列被丟到死信佇列中),而第二種方式,訊息即使過期,也不一定會被馬上丟棄,因為訊息是否過期是在即將投遞到消費者之前判定的,如果當前佇列有嚴重的訊息積壓情況,則已過期的訊息也許還能存活較長時間;
💐💐另外,還需要注意的一點是,如果不設定 TTL,表示訊息永遠不會過期,如果將 TTL 設定為 0,則表示除非此時可以直接投遞該訊息到消費者,否則該訊息將會被丟棄,
延時佇列=TTL+死性佇列+一點點調味品
延時佇列,不就是想要訊息延遲多久被處理嗎,TTL 則剛好能讓訊息在延遲多久之后成為死信,另一方面,成為死信的訊息都會被投遞到死信佇列里,這樣只需要消費者一直消費死信佇列里的訊息就完事了,因為里面的訊息都是希望被立即處理的訊息,
佇列 TTL實戰
創建兩個佇列 QA 和 QB,兩者佇列 TTL 分別設定為 10S 和 40S,然后在創建一個交換機 X 和死信交換機 Y,它們的型別都是 direct,創建一個死信佇列 QD,它們的系結關系如下:

@Configuration
public class TTLQueueConfig {
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String DEAD_LETTER_QUEUE = "QD";
// 宣告 xExchange
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
// 宣告 xExchange
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
//宣告佇列 A ttl 為 10s 并系結到對應的死信交換機
@Bean("queueA")
public Queue queueA(){
Map<String, Object> args = new HashMap<>(3);
//宣告當前佇列系結的死信交換機
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//宣告當前佇列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//宣告佇列的 TTL
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}
// 宣告佇列 A 系結 X 交換機
@Bean
public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
//宣告佇列 B ttl 為 40s 并系結到對應的死信交換機
@Bean("queueB")
public Queue queueB(){
Map<String, Object> args = new HashMap<>(3);
//宣告當前佇列系結的死信交換機
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//宣告當前佇列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//宣告佇列的 TTL
args.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}
//宣告佇列 B 系結 X 交換機
@Bean
public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
}
//宣告死信佇列 QD
@Bean("queueD")
public Queue queueD(){
return new Queue(DEAD_LETTER_QUEUE);
}
//宣告死信佇列 QD 系結關系
@Bean
public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
@Test
void sendMsgToQD(){
String message="hello world";
rabbitTemplate.convertAndSend("X", "XA", "訊息來自 ttl 為 10S 的佇列: "+message);
rabbitTemplate.convertAndSend("X", "XB", "訊息來自 ttl 為 40S 的佇列: "+message);
}
訊息消費者代碼
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("當前時間:{},收到死信佇列資訊{}", new Date().toString(), msg);
}
}
😹😹第一條訊息在 10S 后變成了死信訊息,然后被消費者消費掉,第二條訊息在 40S 之后變成了死信訊息,然后被消費掉,這樣一個延時佇列就打造完成了,
問題分析
💀💀如果這樣使用的話,豈不是每增加一個新的時間需求,就要新增一個佇列,這里只有 10S 和 40S兩個時間選項,如果需要一個小時后處理,那么就需要增加 TTL 為一個小時的佇列,如果是預定會議室然后提前通知這樣的場景,豈不是要增加無數個佇列才能滿足需求?
優化后的架構圖:添加了一個不設定時間的佇列,可以在發送訊息時設定資訊的發送時間,這樣更加的靈活,

添加一個配置
@Configuration
public class MsgTtlQueueConfig {
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String QUEUE_C = "QC";
//宣告佇列 C 死信交換機
@Bean("queueC")
public Queue queueB(){
Map<String, Object> args = new HashMap<>(3);
//宣告當前佇列系結的死信交換機
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//宣告當前佇列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//沒有宣告 TTL 屬性
return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
}
//宣告佇列 B 系結 X 交換機
@Bean
public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
}
@Test
void contextLoads() {
String message="hello world";
String ttlTime="20000";
rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->{
correlationData.getMessageProperties().setExpiration(ttlTime);
return correlationData;
});
log.info("當前時間:{},發送一條時長{}毫秒 TTL 資訊給佇列 C:{}", new Date(),ttlTime, message);
}
控制臺輸出:
當前時間:Thu Aug 19 09:55:29 CST 2021,發送一條時長20000毫秒
TTL 資訊給佇列 C:hello world
當前時間:Thu Aug 19 09:55:49 CST 2021,收到死信佇列資訊hello world
問題分析
🙈🙈看起來似乎沒什么問題,但是在最開始的時候,就介紹過如果使用在訊息屬性上設定 TTL 的方式,訊息可能并不會按時“死亡“,因為 RabbitMQ 只會檢查第一個訊息是否過期,如果過期則丟到死信佇列,如果第一個訊息的延時時長很長,而第二個訊息的延時時長很短,第二個訊息并不會優先得到執行,
😸😸解決辦法:安裝延時佇列插件
在官網上下載 https://www.rabbitmq.com/community-plugins.html,下載
rabbitmq_delayed_message_exchange 插件,然后解壓放置到 RabbitMQ 的插件目錄,進入 RabbitMQ 的安裝目錄下的 plgins 目錄,執行下面命令讓該插件生效,然后重啟 RabbitMQ,
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在這里新增了一個隊列 delayed.queue,一個自定義交換機 delayed.exchange,系結關系如下:

配置如下
@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
//自定義交換機 我們在這里定義的是一個延遲交換機
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
//自定義交換機的型別
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false,
args);
}
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayedExchange") CustomExchange
delayedExchange) {
return
BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/295122.html
標籤:其他


