RabbitMQ 入門篇🚪
MQ 的基本概念:
什么是 MQ ?
MQ全稱為Message Queue即訊息佇列
-
"訊息佇列"
是在訊息的傳輸程序中保存訊息的容器 -
它是典型的:生產者————消費者模型
生產者不斷向訊息佇列中生產訊息 ———————— 消費者不斷的從佇列中獲取訊息.
這樣的好處: 生產者只需要關注發訊息,消費者只需要關注收訊息,二者沒有業務邏輯的侵入,這樣就實作了生產者和消費者的解耦.
為什么要使用 MQ?
或者說MQ 有什么好處,MQ 主要可以實作三種功能:
服務解耦
-
場景:服務A產生資料, 而服務B,C,D需要這些資料
那么我們可以在A服務中直接呼叫B,C,D服務,把資料傳遞到下游服務即可;
-
但
隨著我們的應用規模不斷擴大,會有更多的服務需要A的資料
如果有幾十甚至幾百個下游服務,而且會不斷變更,再加上還要考慮下游服務出錯的情況 A服務中呼叫代碼的維護會極為困難
程式非常的耦合
-
而 ,通過 MQ訊息佇列 可以實作,對程式的
解耦
A服務只需要向訊息服務器發送訊息,而不用考慮誰需要這些資料
下游服務如果需要資料,自行從訊息服務器訂閱訊息,不再需要資料時則取消訂閱即可
流量削峰
-
場景:
我們有一個應用,平時訪問量是每秒300請求,我們用一臺服務器即可輕松應對 √
但,在高峰期, 訪問量瞬間翻了十倍, 達到每秒3000次請求, 單臺服務器無法應對
我們增加到10臺服務器,減壓
-
而,很多時候這種高壓 每天只出現一次,每次只有半小時
那么我們10臺服務器在多數時間都只分擔每秒幾十次請求,這樣就有點浪費資源了
-
使用MQ來進行流量削峰
我們可以對于這種,可能會突然產生高請求的功能,設定一個MQ
當用戶發起請求后臺并不會立刻處理,而是通過 MQ 發送一個請求,發送到佇列里面,排隊等待處理…
我們的后臺,接收者,發現佇列中有訊息,一個一個的取出,進行后臺處理…
避免了同一時刻大量的請求,而處理不過來導致 服務崩潰~
異步呼叫
-
場景:
對于有些服務之間的呼叫會有很長時間的回應,而用戶并不能接受這么時間的回應:
A 呼叫 B,B 需要花費很長時間執行,但是 A 需要知道 B 什么時候可以執行完給頁面回應…
-
外賣支付
相信大家都點過外賣,用戶支付完成之后,到真正外賣到手是一個很漫長復雜的程序~
我們不可能一直停留在頁面上進行等待~
支付后————發送支付成功的通知————再尋找外賣小哥來進行配送…
而尋找外賣小哥的程序非常耗時,高峰期,可能要等待幾十秒甚至更長,這樣就造成整條呼叫鏈路回應非常緩慢
-
MQ解決方案:
用戶下單,訂單資料可以發送到訊息佇列服務器,立刻回應客戶端
為您尋找騎手,整條鏈路的回應時間只有200毫秒左右
訊息接收方,監聽獲取每一個訂單訊息后臺緩慢的尋找外賣小哥~
AMQP 和 JMS ?
AMQP 和 JMS 是目前市面上常見的兩種 訊息佇列協議
AMQP
-
AMQP
高級訊息佇列協議!
是應用層協議的一個開放標準,為面向訊息的中間件設計,兼容 JMS
RabbitMQ 就是基于 AMQP 協議實作的
JMS
-
JMS
Java 訊息服務
JMS的客戶端之間可以通過
JMS服務
進行異步的訊息傳輸 -
JMS(Java Message Service,Java訊息服務)API是一個訊息服務的標準或者說是規范
就像JDBC一樣通過介面定義一組規范,不同的實作嘗試實作對于的驅動來完成開發...
它使分布式通信耦合度更低,訊息服務更加可靠以及異步性, ActiveMQ 就是基于 JMS 規范實作的
總結:
規范:
-
AMQP 為訊息定義了線路層(wire-level protocol)的協議
-
JMS所定義的是API規范
跨平臺
- Java 體系中,多個client均可以通過JMS進行互動,不需要應用修改代碼,但是其對跨平臺的支持較差
- AMQP天然具
有跨平臺、跨語言特性
支持訊息型別
JMS
支持TextMessage、MapMessage 等復雜的訊息型別AMQP
僅支持 byte[] 訊息型別(復雜的型別可序列化后發送
Exchange 交換機
提供的路由演算法
- AMQP可以提供多樣化的路由方式來傳遞訊息到訊息佇列 4種交換機型別,6種模式
- JMS 僅支持 佇列 和 主題/訂閱 方式兩種
常見MQ產品:
- ActiveMQ:基于JMS,
早期的MQ框架,現在已經很少使用了
- Kafka:分布式訊息系統,高吞吐量
- RabbitMQ:基于AMQP協議,erlang語言開發,穩定性好
本篇學習😶
- RocketMQ:基于JMS,阿里巴巴產品,目前交由Apache基金會
RabbitMQ
-
官方地址
-
RabbitMQ是由
erlang
語言開發,所以安裝環境需要安裝erlang
-
基于
AMQP
(Advanced Message Queue 高級訊息佇列協議
)協議實作的訊息佇列 -
它是一種應用程式之間的通信方法,訊息佇列在分布式系統開發中應用非常廣泛

RabbitMQ 的作業原理

組成部分:
Producer 訊息生產者
- 訊息生產者,即生產方客戶端,生產方客戶端將訊息,
通過信道Channel
發送到MQ
Connection 連接物件
-
Producer /Consumer 和 broker 之間的 TCP 連接
程式通過,Connection連接物件將,創建出
Channel信道
:生產者通過 信道 將訊息發送給MQ
消費者通過 信道 獲取到MQ的訊息~
-
Channel 信道:
如果每一次訪問 RabbitMQ 都建立一個 Connection,訊息量大的時候,對于性能也是巨大的;
Channel 是在 connection 內部建立的邏輯連接,為 Connection 減少了作業系統建立 TCP connection 的開銷;
細節不詳細介紹
可以理解為是一個,訊息資料傳遞的一個
通到
可以通過它,來創建配置,
生產者|消費者 與MQ通信
宣告設定系結:交換機|佇列
Broker 可以認為是 MQ
-
訊息佇列服務行程
:此行程包括兩個部分:Exchange交換機
和Queue佇列
-
Exchange交換機
是 RabbitMQ 非常重要的一個部件
一方面它接收來自生產者的訊息,另一方面它將訊息 推送到佇列中
-
Queue 佇列
RabbitMQ 內部使用的一種資料結構
佇列
佇列就像是一個“吸管” 一邊吸水一邊出水,遵循 “先進先出”原則;
生產者發訊息——交換機——轉發到佇列上
是真正訊息存盤的地方~
Consumer 訊息消費者
- 訊息消費者,即消費方客戶端,
通關信道Channel
接收MQ轉發的訊息,并進行相關的處理;
-----發送訊息-----
- 生產者通過 Connection 和Broker建立TCP連接,
- Connection 建立 Channel 通道
- 生產者通過信道,將訊息發送給Broker(MQ),由Exchange將訊息進行轉發~
佇列中去!
-----接收訊息-----
- 消費者通過 Connection 和Broker建立TCP連接
- Connection 建立 Channel 通道
- 消費者監聽指定的Queue(佇列),當有訊息到達Queue時Broker默認將訊息,通過 Channel 推送給消費者
Exchange 交換機四種型別 ?
RabbitMQ訊息傳遞模型的核心思想是:
-
生產者永遠不會將任何訊息直接發送到佇列,通常生產者甚至不知道訊息是否會被傳遞到任何佇列 生產者只能向交換機(Exchange)發送訊息
-
交換機是一個非常簡單的東西,一邊接收來自生產者的訊息,另一邊將訊息推送到佇列.
-
RabbitMQ 的交換機具有很多中型別,可以完成很多種復雜的場景操作:
交換機型別:
-
fanout: 廣播模式
發布/訂閱
,交換機給所有的佇列,發送相同的訊息; -
direct : 路由模式
routing key
交換機,根據對應的routing key
的佇列上發送訊息; -
topic: 動態路由模式,可以用過一定的規則定義
roting key
使 交換機動態的多樣性選擇佇列
* 表示一個單詞
# 表示任意數量(零個或多個)單詞
-
headers: 請求頭模式,
目前用的很少了
,就像請求頭一樣,發送訊息時候附帶頭部資料
,交換機根據訊息的頭部資訊匹配對應的佇列;
RabbitMQ環境搭建 ?
本次搭建是Linux 的 如果有朋友是Win的話可以參考這篇文章:🚀
工具準備🔨:
RabbitMQ是由erlang
語言開發,所以安裝環境需要安裝 erlang
- erlang-21.3.8.21-1.el7.x86_64.rpm
erlang環境
- rabbitmq-server-3.8.8-1.el7.noarch.rpm
rabbit安裝
官網下載,如果沒有的話也可以底部本人的網盤下載
環境搭建🏚:
本人使用的是 阿里云服務器
沒有的話也可以使用虛擬機… 事先使用連接工具上傳了檔案
本人喜歡把工具都安裝在 /usr/wsm
目錄下:
[root@iZj6ciuzx7luldnazt4iswZ ~]# cd /
[root@iZj6ciuzx7luldnazt4iswZ /]# ls
bin dev home lib lost+found mnt patch root sbin sys usr www
boot etc install.sh lib64 media opt proc run srv tmp var
[root@iZj6ciuzx7luldnazt4iswZ /]# cd usr
[root@iZj6ciuzx7luldnazt4iswZ usr]# ls
bin etc games include lib lib64 libexec local sbin share src tmp
[root@iZj6ciuzx7luldnazt4iswZ usr]# mkdir wsm
[root@iZj6ciuzx7luldnazt4iswZ usr]# ls
bin etc games include lib lib64 libexec local sbin share src tmp wsm
[root@iZj6ciuzx7luldnazt4iswZ usr]# cd wsm
[root@iZj6ciuzx7luldnazt4iswZ wsm]# ls
erlang-21.3-1.el7.x86_64.rpm rabbitmq-server-3.8.8-1.el7.noarch.rpm #上傳的兩個檔案
解壓安裝:
# 解壓安裝 erlang
rpm -ivh erlang-21.3.8.21-1.el7.x86_64.rpm
# 云下載一個 初始化一些配置, 程序比較慢請耐心等待~, 在這之后才可以進行 安裝 RabbitMQ
yum install socat -y
# 解壓安裝 rabbitmq
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
ok ,安裝完畢了解一些 RabbitMQ 命令:
# 啟動服務
systemctl start rabbitmq-server
# 查看服務狀態
systemctl status rabbitmq-server
# 開機自啟動
systemctl enable rabbitmq-server
# 停止服務
systemctl stop rabbitmq-server
# 重啟服務
systemctl restart rabbitmq-server
注意:這里只是把RabbitMQ 服務
給搭建好了,為了方便操作我們還需要安裝一個web控制面板
# 安裝web控制面板
rabbitmq-plugins enable rabbitmq_management
# 安裝完畢以后,重啟服務即可
systemctl restart rabbitmq-server
# 訪問 http://服務器ip:15672 ,用默認賬號密碼(guest)登錄,出現權限問題
# 默認情況只能在 localhost 本機下訪問,所以需要添加一個遠程登錄的用戶
# 創建賬號和密碼: admin 123456
rabbitmqctl add_user admin 123456
# 設定用戶角色,用戶級別: administrator monitoring policymaker managment
rabbitmqctl set_user_tags admin administrator
# 為用戶添加資源權限
# set_permissions [-p <vhostpath>] <user> <conf> <write> <read> # 添加配置、寫、讀權限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
##### 擴展一些命令:#####
關閉應用的命令為: rabbitmqctl stop_app
清除的命令為: rabbitmqctl reset
重新啟動命令為: rabbitmqctl start_app
- 如果是阿里云的服務器
別忘記開啟埠 還有 關閉防火墻~
用戶級別:
- administrator:可以登錄控制臺、查看所有資訊、可以對 rabbitmq 進行管理
- monitoring:監控者 登錄控制臺,查看所有資訊
- policymaker:策略制定者 登錄控制臺,指定策略
- managment:普通管理員 登錄控制臺
主要埠介紹:阿里云建議將這些都打開~
-
4369 – erlang發現口
-
5672 – client端通信口
-
15672 – 管理界面ui埠
-
25672 – server間內部通信口
測驗是否可以訪問:
訪問頁面:
-
Overview
概覽 RabbitMQ 的整體情況,也可以查看
集群各個節點的資訊 情況
MQ 各個埠映射資訊
-
Connection
該 選項專欄 下是MQ 與各個
生產者
消費者
連接情況. -
Channels
這里展示,各個
通道 與 連接的關系
-
Exchanage
展示所有的 交換機
-
Queue
展示所有的 佇列
-
Admin
這里管理著,MQ 所有的操作用戶~
RabbitMQ 管理頁面:
Overview
Connections
- Name 連接名
點擊連接名, 還可以查看詳細的資訊~
- User name 當前連接登錄MQ 的用戶
- State 當前連接的狀態,
running 運行
idle 空閑
- SSL|TLS 是否使用的是
SSL|TLS協議
- Peotocol
AMQP 0-9-1
指的是AMQP 的協議版本號 - Channels 當前連接創建通道的 通道總數
- From client 每秒發出的訊息數
- To client 每秒接收的訊息數
Channels
記錄各個連接的信道:
一個連接IP 可以有多個信道
多個通道通過多執行緒實作,不相互干擾 我們在 信道中創建:佇列 交換機 ...
生產者的通道一般使用完之后會立馬關閉,消費者是一直監聽的…
-
Channel 通道名稱
-
User Name 該通道,創建者 用戶名
-
Model 通道的
確認模式
C confirm模式
T 表示事務
-
State 通道當前的狀態
running 運行
idie 空閑
-
Unconfirmed 待確認的訊息數
-
Prefetch
預先載入
Prefetch 表示每個消費者最大的能承受的未確認訊息數目
簡單來說就是用來指定一個消費者一次可以從 RabbitMQ 中獲取多少條訊息并快取在消費者中,
一旦消費者的緩沖區滿了,RabbitMQ 將會停止投遞新的訊息到該消費者中直到它發出有訊息被 ack 了
消費者負責不斷處理訊息,不斷 ack,然后只要
UnAcked
數少于Prefetch * consumer
數目,RabbitMQ 就不斷將訊息投遞過去 -
Unacker 待 ack 的訊息數
-
publish 訊息生產者發送訊息的
速率
-
confirm 訊息生產者確認訊息的
速率
-
unroutable
drop
表示訊息,未被接收,且已經洗掉的訊息. -
deliver / get 訊息消費者獲取訊息的
速率
-
ack 訊息消費者 ack 訊息的速率.
MQ 的 ACK機制:100%訊息消費!
Exchange
Queue

- Name 表示訊息佇列的名稱
- Type 訊息佇列的型別…
- Features:表示訊息佇列的特性,D 表示訊息佇列持久化
- State:表示當前佇列的狀態,running 表示運行中;idle 表示空閑
- Ready:表示待消費的訊息總數
- Unacked:表示待應答的訊息總數
- Total:表示訊息總數 Ready+Unacked
- incoming:表示訊息進入的速率
- deliver/get:表示獲取訊息的速率
- ack:表示訊息應答的速
Admin
Java 集成 RabbitMQ 案例
創建一個Maven專案并使用 git 進行管理, wlog.md
檔案進行著專案日志的記錄?~
引入RabbitMQ
的依賴:
pom.xml
<dependencies>
<!-- rabbitMQ 依賴 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
</dependencies>
簡單模式 Hello Word:
如圖,顯而易見,非常簡單就是一個一發一讀
的程序…
- P:生產者,也就是要發送訊息的程式
- C:消費者:訊息的接受者,會一直等待訊息到來,
- queue:訊息佇列,圖中紅色部分,可以快取訊息;生產者向其中投遞訊息,消費者從其中取出訊息
發送者
Producer.Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/** 訊息生產者 **/
public class Producer {
// 定義佇列名稱
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
//創建一個連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.243.109.199");
factory.setUsername("admin");
factory.setPassword("123456");
//channel 實作了自動 close 介面 自動關閉 不需要顯示關閉
//創建連接物件
Connection connection = factory.newConnection();
//根據連接物件,獲取信道
Channel channel = connection.createChannel();
/**設定訊息佇列的屬性!
* queue :佇列名稱
* durable :是否持久化 如果持久化,mq重啟后佇列資料還在! (佇列是在虛擬路徑上的...)
* exclusive :佇列是否獨占此連接,佇列只允許在該連接中訪問,如果connection連接關閉佇列則自動洗掉,如果將此引數設定true可用于臨時佇列的創建
* autoDelete :佇列不再使用時是否自動洗掉此佇列,如果將此引數和exclusive引數設定為true就可以實作臨時佇列(佇列不用了就自動洗掉)
* arguments :佇列引數 null,可以設定一個佇列的擴展引數,需要時候使用!比如:可設定存活時間
* */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**發送訊息,引數:
* exchange :指定的交換機,不指定就會有默認的....
* routingKey :路由key,交換機根據路由key來將訊息轉發到指定的佇列,如果使用默認交換機routingKey設定為佇列的名稱
* props :訊息包含的屬性: 后面介紹,可以是一個一個物件... 訊息持久化配置...
* body :發送的訊息,AMQP以位元組方式傳輸...
* */
channel.basicPublish("", QUEUE_NAME, null, "Hello Word你好世界".getBytes());
System.out.println("訊息發送完畢");
}
}
消費者
Consumer.Java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/** 訊息消費者 **/
public class Consumer {
// 定義佇列名稱
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.243.109.199");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("等待接收訊息.........");
//收到訊息后用來處理訊息的回呼物件
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println(message);
};
//取消消費的一個回呼介面 如在消費的時候佇列被洗掉掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("訊息消費被中斷");
};
/**
* 消費者消費訊息 - 接受訊息
* queue 消費哪個佇列
* autoAck 消費成功之后是否要自動應答 true 代表自動應答 false 手動應答,要通過編程實作回復驗證,這就是Unacked 為回傳ack的資料
* deliverCallback 消費方法,當消費者接收到訊息要執行的方法, 引數是一個函式式介面可以使用 lambda運算式~
* cancelCallback 訊息被取消時的回呼
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
-
建議先啟動
消費者
在啟動發送者
可以看到,消費者啟動之后,在等待 發生者 發送訊息,發送者啟動發送訊息,
消費者控制臺會立刻接收到訊息!
-
MQ 發送者一般情況下都不會直接忘佇列發訊息
這種情況下MQ 都會有一個默認的交換機~
作業模式 Work Queues
作業模式 相當于 簡單模式的 升級版!
- 多個消費者,對應一個發送者,發送者 產生的訊息存在佇列種,佇列會以
復雜均衡形式
輪詢的發送給多個消費者
一般應用于:發送方事務簡單,接收方事務復雜…
美團外賣:用戶下單——后臺內部要聯系商家 騎手 生產訂單 處理...
抽取作業類:
因為上面示例我們知道,創建交換機|佇列 需要Channel信道
交換機 佇列是創建在信道里面的
- 而每次創建交換機的時候,都要創建一次
Connection
Channel
- 于是我們可以將它抽離出一個工具類
MQChannelUtil.Java
MQChannelUtil.Java
- com.wsm目錄下創建一個
util包
專門用來存盤工具類🛠
import com.rabbitmq.client.Channel; //匯入MQ的包~
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/** RabbitMQ 連接配置類: **/
public class MQChannelUtil {
//得到一個連接的 channel
public static Channel getChannel() throws Exception {
//創建一個連接工廠, 設定連接: IP 埠 用戶 密碼
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("42.192.149.71");
// factory.setPort("設定對應的埠,默認就是: 5672");
factory.setUsername("admin");
factory.setPassword("123456");
//創建連接物件 信道物件
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
發送者
Producer.Java
import com.rabbitmq.client.Channel;
import com.wsm.Util.MQChannelUtil;
import java.util.Scanner;
/** 訊息生產者 **/
public class Producer {
// 定義佇列名稱
private final static String QUEUE_NAME = "Word";
public static void main(String[] args) throws Exception {
// 工具類創建一個信道
Channel channel = MQChannelUtil.getChannel();
// Java控制臺測驗法訊息:
Scanner scanner = new Scanner(System.in);
//創建交換機
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 回圈多次發布訊息:
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("訊息發送完畢");
}
}
}
消費者1
Consumer1.Java
import com.rabbitmq.client.*;
import com.wsm.Util.MQChannelUtil;
/** 訊息消費者 **/
public class Consumer1 {
// 定義佇列名稱
private final static String QUEUE_NAME = "Word";
public static void main(String[] args) throws Exception {
// 工具類創建一個信道
Channel channel = MQChannelUtil.getChannel();
//收到訊息后用來處理訊息的回呼物件
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println(message);
};
//取消消費的一個回呼介面 如在消費的時候佇列被洗掉掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("訊息消費被中斷");
};
/** 消費者消費訊息 - 接受訊息: 注意引數兩個回呼函式~ */
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
消費者2
Consumer2.Java
和消費者1 一模一樣換一個名字,兩個消費者監聽一個佇列 進行資料處理....
結果測驗:
訊息被輪詢消費
:
- 通程序式執行發現生產者總共發送 4 個訊息,消費者 1 和消費者 2 分別分得兩個訊息,并且是按照有序的一個接收一次訊息
- MQ 發送訊息,一般情況下只會被一個
消費者執行消費
消費者執行之后, 佇列就會將訊息洗掉,(ACK機制...
- 后面可以通過,交換機模式完成,一個訊息被多個消費者消費…
訊息確認接識訓制 ACK
訊息一旦被消費者接收,佇列中的訊息就會被洗掉
RabbitMQ怎么知道訊息被接收了呢?
-
如果消費者領取訊息后,還沒執行操作就掛掉了呢?或者拋出了例外?訊息消費失敗!
-
但是RabbitMQ無從得知,這樣訊息就丟失了!
因此,RabbitMQ有一個ACK機制
- 當消費者獲取訊息后,會向RabbitMQ發送回執ACK 告知訊息已經被接收,不過這種回執ACK分兩種情況:
- 自動ACK: 訊息一旦被接收,消費者自動發送ACK
- 手動ACK: 訊息接收后,不會發送ACK,需要手動呼叫
自動ACK
-
RabbitMQ
默認此種模式:
訊息發送后立即被認為已經傳送成功!
消費者 接收到訊息,就向佇列發送ack,佇列立刻就洗掉訊息
-
這種模式需要在高吞吐量和資料傳輸安全性方面做權衡 僅適用在消費者可以高效并以 某種速率能夠處理這些訊息的情況下使用
手動ACK
-
訊息接收后,不會發送ACK,需要手動代碼進行呼叫
待消費者 執行完畢之后,在通過代碼向 佇列發送ack,佇列接收到ack 之后會將訊息洗掉!
-
channel.basicAck(long deliveryTag,boolean multiple); 用于肯定確認
RabbitMQ 已知道該訊息并且成功的處理訊息,可以將其丟棄了
引數1 long型別,表示處理的訊息標識,MQ沒發送一個訊息都一個對于該訊息的唯一標識…
就像序列化 序列號一樣,用于網路傳輸..
引數2 boolean型別,表示是否支持批量處理
-
channel.basicNack(deliveryTag, false, true); 用于否定確認, 消費者 訊息執行程序中失敗,或服務器掛機…
引數1
同上,訊息的唯一標識
引數2
表示是否支持批量處理
引數3
requeue true則重新入佇列 false丟棄或者進入死信佇列
-
channel.basicReject(deliveryTag, true); 用于否定確認
引數1
同上
引數2
requeue true則重新入佇列 false丟棄或者進入死信佇列
與 Channel.basicNack 相比少一個引數,不可以進行批量處理…
Multiple 批量訊息處理:
-
true 代表批量應答處理
比如,現在佇列上存在
1 2 3 4
四個訊息,都發送給了消費者,而消費者逐一處理,4
結束了.不管是否
ACK|NACK
都直接將,其它的1 2 3
都以相同的,方式進行 批量處理!好處:在MQ 服務,穩定的時候,支持大量的訊息處理速度…
缺點,容易造成資料丟失💀...
-
flase
建議使用,不批量應答
就是, 一次只處理當前訊息的
ACK|NACK
訊息自動重新入隊
消費者設定了手動ACK 之后....
如果消費者由于某些原因失去連接 其通道已關閉,連接已關倍訓 TCP 連接丟失
導致訊息未發送 ACK 確認
- 消費者監聽 佇列訊息,消費者開始處理,但是處理程序中,消費者突然與MQ 連接斷開
消費者服務掛了
- MQ 正常情況下會與 消費者建立連接,當消費者突然斷開,一段時間沒有回傳,訊息處理的 ack,MQ就會當作消費者出現故障. 將訊息重新交給其它消費者處理!心跳機制?
生產者
Producer.Java
import com.rabbitmq.client.Channel;
import com.wsm.Util.MQChannelUtil;
import java.util.Scanner;
/** 訊息生產者 **/
public class Producer {
// 定義佇列名稱
private final static String QUEUE_NAME = "ack_test";
public static void main(String[] args) throws Exception {
// 工具類創建一個信道
Channel channel = MQChannelUtil.getChannel();
// Java控制臺測驗法訊息:
Scanner scanner = new Scanner(System.in);
// 創建佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 回圈多次發布訊息:
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("訊息發送完畢");
}
}
}
消費者1
Consumer1.Java
- basicConsume(); 消費者監聽訊息方法,第二個引數:
true自動ack
false手動ack
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wsm.Util.MQChannelUtil;
/** 訊息消費者 **/
public class Consumer2 {
// 定義佇列名稱
private final static String QUEUE_NAME = "ack_test";
public static void main(String[] args) throws Exception {
// 工具類創建一個信道
Channel channel = MQChannelUtil.getChannel();
//收到訊息后用來處理訊息的回呼物件
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 為了方便查看處理效果,我們將消費者 執行緒休眠一段時間 模擬處理資料;
try {
Thread.sleep(10000); // 毫秒 *1000;
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody());
System.out.println(message);
/** 接收處理完畢訊息之后給MQ 回復ack
* 引數1 訊息的唯一標識Tag
* 引數2 是否支持批量回復,一般建議false 保證資料安全!
* **/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
//取消消費的一個回呼介面 如在消費的時候佇列被洗掉掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("訊息消費被中斷");
};
/** 消費者消費訊息 - 接受訊息: 注意引數兩個回呼函式~ */
/** 引數二 設定 false 手動應答 **/
channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
}
}
消費者2
Consumer2.Java
和 消費者1 類似,為了方便測驗更改了,執行緒休眠時間 30s
結果測驗:
測驗1
-
正常的發送訊息,
AA BB CC DD
-
發現啊,消費者1 消費者2 還是輪詢的進行消費資料,只不過消費者2 比較慢…
測驗2
-
正常的發送訊息,
AA BB CC DD
-
當消費者1 接收到訊息后,消費者2 也正在接收訊息,因為有
Thread.sleep(30000)
所以會有很長時間的處理,此時關閉消費者2
MQ 與 消費者2 斷開連接,消費者2 也沒有發送ACK 所以消費者2的訊息將會
重回佇列...又交給了 消費者1來進行消費!
-
當然,實際開發中因為
消費1 | 2 都存在處理了 BB
為了確保資料安全,還要進行冪等的處理!
RabbitMQ 持久化 簡單
正常情況下,RabbitMQ 只是一個訊息中間件 一邊接收者生產者訊息
等待消費者監聽處理...
- 訊息不會持久化保存在
佇列中
- 如果:突然某天 RabbitMQ 突然掛機,那么就會造成資料的丟失:
佇列中為處理的訊息...
生產者最新發送的訊息...
...
- 為了,保證資料的安全,我們需要將
佇列
訊息
都進行持久化處理,防止資料丟失~
佇列持久化
然佇列持久化非常簡單, 只需要一個配置即可:
在宣告佇列的時候,就通過引數就可以完成對佇列的持久化,注意:佇列持久化 并不是 訊息持久化 佇列每次重啟都會恢復但是內部的訊息 還需要另外的配置!
// 讓佇列持久化
boolean durable = true; // false 不持久化 true 持久化
// 宣告佇列
channel.queueDeclare("佇列名", durable, false, false, null); // 設定true 之后,每次MQ重啟的時候,該佇列都會自動重新在虛擬路徑上自動加載...
訊息持久化
需要在 生產者
發送訊息的時候添加一個配置 MessageProperties.PERSISTENT_TEXT_PLAIN
,告訴MQ 這個訊息很重要,要進行持久化保存!
// 發送者發送訊息的時候,帶上 MessageProperties.PERSISTENT_TEXT_PLAIN 告訴,MQ訊息要進行持久化;
channel.basicPublish("交換機", "佇列名", MessageProperties.PERSISTENT_TEXT_PLAIN, "要發送的訊息,位元組傳輸");
-
當然這里,
訊息仍然存在丟失問題
當訊息,剛發到MQ 中,還沒有準備,
存盤磁盤,訊息還在快取的一個"間隔點"
MQ 突然掛了… 也會影響到訊息的 持久化;但這里對于,普通的存盤已經綽綽有余了…
不公平 分發
前提是,設定訊息手動ACK
在最開始的時候我們學習到 RabbitMQ 分發訊息采用的輪訓分發
-
但是在某種場景下這種策略并不是很好:
比方說有兩個消費者在處理任務,
消費者 1 處理任務的速度非常快
消費者 2 處理速度卻很慢
這個時候依然采用默認的
輪詢分發
勢必不太合理… -
為了避免這種情況,MQ 支持,我們切換不同的
分發模式
-
不公平 分發
我們可以通過 信道設定:
channel.basicQos(1);
意思就是如果這個任務我還沒有處理完或者我還沒有應答你,你先別分配給我,我目前只能處理一個任務.
rabbitmq 就會把該任務分配給沒有那么忙的那個空閑消費者,如果沒有空閑消費者,訊息就會堆積在佇列中去~
預取值 分發
預取值 分發
- 就是說,我們在設定
訊息者 連接 佇列
的時候,可以給 信道設定 預取值.
其實預取值,和 不公平分發 很類似
-
都是設定: 消費者 連接 信道 時候設定 ``channel.basicQos(?);` 消費者最高訊息堆積數;
-
``channel.basicQos(0);` 默認0 輪詢模式
-
``channel.basicQos(1);` 不公平分發模式
消費者1
消費者2
都設定,channel.basicQos(1);
消費者最高允許訊息堆積數.兩個消費者每次只能從,佇列中拿一個訊息進行消費,完了就立刻在從 佇列中,在拿,這樣做的快的消費者,自然就處理的訊息多了!不公平分發
-
channel.basicQos(>1);
值大于>1
,預取值 分發消費者1
消費者2
分別跟據服務的性能設定,channel.basicQos(?);
消費者最高允許訊息堆積數.假設: 消費者1
basicQos(2)
消費者2basicQos(5)
這樣,假設佇列中有訊息:
1 2 3 4 5 6 7
,當最初消費者1 | 2
都空閑時候…消費者1獲取1
消費者2獲取2
消費者1獲取3
消費者2獲取4
消費者2獲取5
消費者2獲取6
....
消費者2 允許訊息在信道中最大的堆積數 5當然也有可能會出現,消費者1 處理很快,消費者2 很慢,消費者1處理完1 3,消費者2還在處理2 4567,那后面的 8 9 10 都給1處理…
預取值 分發,就是預計這個消費者,性能高低,設定消費者 允許最高堆積?個訊息等待這個處理!
RabbitMQ - 發布確認confirm
confirm
發布確認機制:
生產者將信道設定成 confirm 模式
-
一旦信道進入
confirm
模式,所有在該信道上面發布的訊息都將會被指派一個唯一的 ID從1開始
-
發送者 —— 訊息 —— 佇列,上后
MQ
broker
就會發送一個確認給生產者
生產者就知道訊息已經正確到達目的佇列了. -
如果
訊息佇列 和 訊息
進行了持久化設定
那么確認訊息會在將訊息寫入磁盤之后發出,broker回傳給生產者的確認訊息已經,發送到佇列上!
發布確認策略:
RabbiMQ 默認是沒有開啟 comfirm 發布確認機制
- 如果要開啟需要呼叫方法 confirmSelect,每當你要想使用發布確認,都需要在 channel 上呼叫該方法
// 開啟發布確認 channel.confirmSelect();
單個發布確認:
它是一種同步確認發布的方式,也就是發布一個訊息之后只有它被確認發布,后續的訊息才能繼續發布
-
這種確認方式有一個最大的缺點就是:發布速度特別的慢,
一次只能發一個!
因為如果沒有確認發布的訊息就會阻塞所有后續訊息的發布,這種方式最多提供每秒不超過數百條發布訊息的吞吐量;
-
優點:保證訊息的100%發送,缺點:對于訊息的發送實在太慢,對于大量資料不適合!
-
實作:
1. 在宣告佇列之后,開啟發布確認,
channel.confirmSelect();
2. 通過信道
channel.waitForConfirms();
來判斷當前訊息是否發送成功!這個方法只有在訊息被確認 的時候才回傳,如果在指定時間范圍內這個訊息沒有被確認那么它將拋出例外.
單個發布確認,只有一個訊息發送 才能發送下一個訊息
Producer.Java
/** 單個發布確認 **/
public static void singleConfirm() throws Exception {
Channel channel = MQChannelUtil.getChannel();
// 佇列宣告
channel.queueDeclare("singleConfirm", false, false, false, null);
// 開啟發布確認
channel.confirmSelect();
// 為了方便計算各個 發布確認策略 耗時: 開始-結束放一個系統時間獲取毫秒數;
long begin = System.currentTimeMillis();
// 發送1000 個訊息....
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", "singleConfirm", null, message.getBytes());
// 服務端回傳 false 或超時時間內未回傳,生產者可以訊息重發
boolean flag = channel.waitForConfirms();
if (flag) {
System.out.println("訊息發送成功");
}
// else{ /** 訊息重發處理... **/ }
}
// 1000個訊息結束...
long end = System.currentTimeMillis();
System.out.println("發布" + MESSAGE_COUNT + "個單獨確認訊息,耗時" + (end - begin) + "ms");
}
本次執行耗時:67042毫秒
批量發布確認:
相比 單個發布確認
, 發送確認機制,實在是太慢了! 每次只能保證一個訊息的發送成功,實在是太慢了!
批量發布確認:
-
其實和
單個發布確認
實作方式一樣1. 開啟發布確認
channel.confirmSelect();
2. 獲取批量的訊息,是否全部發送到MQ
channel.waitForConfirms();
-
waitForConfirms(); 方法()
單個發布 和 批量發布其實都一樣,都是呼叫
waitForConfirms()
方法,查看當前訊息是否都到達MQ不同的是,
單個發布每次發一條都驗證
批量是在一定數量進行驗證
waitForConfirms();
方法會使,當前 發生者執行緒進行阻塞,等待MQ 回傳資料,MQ回傳 上一次呼叫waitForConfirms() 到現在呼叫waitForConfirms() 所有發送的訊息是否抵達MQ
全部抵達true 則false -
因此:批量發布確認,相當于 單個發布確認,一個批量執行,
大大節省了程序中冗余的一些步驟性能...
當然,如果其中有一個訊息沒有發送到MQ 它并不能確定是那一個 訊息 沒有抵達MQ
Producer.Java
/** 批量發布確認 **/
public static void batchConfirm() throws Exception {
Channel channel = MQChannelUtil.getChannel();
// 佇列宣告
channel.queueDeclare("batchConfirm", false, false, false, null);
// 開啟發布確認
channel.confirmSelect();
// 為了方便計算各個 發布確認策略 耗時: 開始-結束放一個系統時間獲取毫秒數;
long begin = System.currentTimeMillis();
// 批量確認訊息大小,當發送的訊息數到 100 執行 waitForConfirms(); 詢問MQ 當前所有的訊息有沒有抵達~
int batchSize = 100;
// 未確認訊息個數, 每次發送訊息 ++ 用于判斷是否改批量驗證訊息發送;
int outstandingMessageCount = 0;
for (int i = 0; i < MESSAGE_COUNT; i++) {
// 發送的訊息,并發送~
String message = i + "";
channel.basicPublish("", "batchConfirm", null, message.getBytes());
// 每次發送一個訊息進行計算當前是第幾個,為批量驗證的訊息;
outstandingMessageCount++;
// 判斷當前的 100 個訊息有沒有都發送的MQ上!
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms();
outstandingMessageCount = 0;
}
}
//為了確保還有剩余沒有確認訊息 再次確認
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}
long end = System.currentTimeMillis();
System.out.println("發布" + MESSAGE_COUNT + "個批量確認訊息,耗時" + (end - begin) + "ms");
}
本次執行耗時:1149毫秒
快速了很多...
異步發布確認:
異步發布確認 相對于上面兩個比較復雜,但: 性價比 效率
具有顯著的提升
- 它是采用回呼的方式來完成,訊息傳遞的可靠性.
實作原理:
-
① 發送者 只需要關注 發訊息
**發送者 每次 給MQ 發送訊息的時候,會默認的給每個訊息帶上一個
唯一的ID標識
**后面我們就可以通過這個標識來,確定是那一個訊息發送 成功|失敗
-
② 發送者,方法體中寫一個 異步確認
監聽器
addConfirmListener(ConfirmCallback,ConfirmCallback);
方法引數支持兩個,ConfirmCallback類物件
一個表示接收訊息做的事情
另一個是未接收到訊息做的事情...
ConfirmCallback 是一個
函式式介面
, 支持 lambda運算式 和 內部類形式書寫… -
③ 發送者 一直往MQ 上發送訊息,MQ 每收到一個訊息會,呼叫發送者的
addConfirmListener(ack,nack)
方法告知發送者,訊息成功發送 | 或 未發送成功!
-
發送者,在根據:
addConfirmListener(ack,nack)
來處理,訊息成功處理,訊息失敗處理…
Producer.Java
/** 異步發布確認 **/
public static void syncConfirm() throws Exception{
Channel channel = MQChannelUtil.getChannel();
// 佇列宣告
channel.queueDeclare("syncConfirm", false, false, false, null);
/** 開啟發布確認 **/
channel.confirmSelect();
// 為了方便計算各個 發布確認策略 耗時: 開始-結束放一個系統時間獲取毫秒數;
long begin = System.currentTimeMillis();
/** 步驟一: 創建一個執行緒安全的一個哈希表,用于記錄每一個訊息發送,這樣MQ異步回傳時候可以知道具體是那一個訊息發送成功|失敗 **/
/**
* 執行緒安全有序的一個哈希表,適用于高并發的情況
* 1.輕松的將序號與訊息進行關聯: k,v 存盤結構, k訊息標識 v發送的訊息體,每次發送訊息前先存在集合中;
* 2.輕松批量洗掉條目只要給到序列號: 對于發現成功的訊息,直接從集合中移除...
* 3.支持并發訪問,Concurrent介面 是執行緒安全的; */
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
/** 步驟三: 撰寫回呼監聽器,因為: 訊息發送出錯要立刻進行監聽所以,所以創建在發送訊息之前; **/
/** ack 確認收到訊息的一個回呼 1.訊息序列號 2.true 批量確認接受小于等于當前序列號的資料 false 確認當前序列號訊息 */
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
System.out.println("訊息成功接收:"+sequenceNumber);
// ConcurrentNavigableMap方法()回傳的是小于|等于 K 的集合, true:小于等于 false:回傳小于該序列號的資料集合;
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);
// 清除該部分確認訊息 confirmed 里保存的都是,MQ 已經接收的訊息;
// 遍歷 confirmed K, 根據 K 洗掉 outstandingConfirms 的值...
// outstandingConfirms 里面保存的都是,MQ 還未確認的訊息...
}else{
//只清除當前序列號的訊息
outstandingConfirms.remove(sequenceNumber);
}
};
// nack 訊息失敗執行{} 可以寫,訊息失敗需要執行的代碼...
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
// 這里就輸出一下為被確認的訊息...
String message = outstandingConfirms.get(sequenceNumber);
System.out.println("發布的訊息"+message+"未被確認,序列號"+sequenceNumber);
};
// 發生者 等待MQ回呼訊息確認的 監聽器, 本次程式值監聽 ack成功的訊息;
channel.addConfirmListener(ackCallback, null);
/** 步驟二: 發送者一直往MQ發送訊息 **/
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "訊息" + i;
// channel.getNextPublishSeqNo() 獲取下一個訊息的序列號,通過序列號與訊息體進行一個關聯,全部都是未確認的訊息體
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
// 發送訊息;
channel.basicPublish("", "syncConfirm", null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("發布" + MESSAGE_COUNT + "個批量確認訊息,耗時" + (end - begin) + "ms");
}
耗時:96毫秒
超級快的好吧
- 發送者,只需要關注訊息的發送,MQ 會將每條訊息發布情況,回呼給發生者
- 發送者每次發訊息前,將訊息存盤在快取中,成功了就洗掉,失敗了就重新發送~
非常nice👍
RabbitMQ 交換機Exchange:
以上我們創建一個: 發送者
佇列
消費者
就可以完成通信了
而,RabbitMQ 的核心思想是: 生產者 生產的訊息不會直接發送到隊佇列上, 甚至不知道佇列的存在,通過一個交換機
-
生產者,只需要關注 往交換機上發送訊息即可!
交換機作業的內容非常簡單:
一方面它接收來自生產者的訊息
另一方面將它們推入佇列
交換機必須確切知道如何處理收到的訊息,這就的由交換機的型別來決定. -
Exchange 交換機型別:
fanout: 廣播模式
發布/訂閱
,交換機給所有的佇列,發送相同的訊息;direct : 路由模式
routing key
交換機,根據對應的routing key
的佇列上發送訊息;topic: 動態路由模式,可以用過一定的規則定義
roting key
使 交換機動態的多樣性選擇佇列
-
* 表示一個單詞
-
# 表示任意數量(零個或多個)單詞
headers: 請求頭模式,
目前用的很少了
,就像請求頭一樣,發送訊息時候附帶頭部資料
,交換機根據訊息的頭部資訊匹配對應的佇列; -
無名交換機:
-
上面的 Demo案例,我們幾乎沒有對
交換機
進行任何的操作,但是,仍然可以進行訊息發送|接收 -
因為:
channel.basicPublish("", "佇列名", null, "發送的訊息".getBytes());
對于 “” 空字串的交換機,MQ 會有默認的交換機進行操作…
臨時佇列:
-
對于有些時候,我們需要連接一個佇列, 而這個佇列,并不常用,用完即丟的情況下,可以考慮使用:
臨時佇列
-
String queueName = channel.queueDeclare().getQueue();
讓服務器 信道,給我們創建一個臨時的佇列,
隨機佇列名稱
一旦我們斷開了消費者的連接,佇列將被自動洗掉
發布訂閱模式 Publish/Subscribe 交換機型別:Fanout
Fanout 型別:
- 這種型別非常簡單,它可以將,它知道的所有的訊息,廣播到所有佇列中去. 也成為:廣播模式
- 常見場景:
某某軟體很多人關注/訂閱了一個博主,博主一更新,所有的粉絲都收到更新訊息!
Fanout 實戰:
-
定義一個生產者,交換機,生產者不停的往交換機中發訊息
-
交換機提前與 一個|多個佇列
系結
,每當有訊息來的時候,交換機會將訊息發送到所有的佇列中去…每個訊息者監聽(訂閱)一個佇列,多個訊息者可以同,監聽到相同的訊息;
生產者:
Producer.Java
/** 訊息生產者 **/
public class Producer {
// 交換機名
public static final String EXCHANGE_NAME = "wsm";
public static void main(String[] args) throws Exception {
// 創建連接物件,宣告交換機 發送訊息
Channel channel = MQChannelUtil.getChannel();
/**
* 宣告一個 exchange
* 1.exchange 的名稱
* 2.exchange 的型別, 可以是String直接寫,也可以是 列舉型別;
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
Scanner sc = new Scanner(System.in);
System.out.println("請輸入資訊");
while (sc.hasNext()) {
String message = sc.nextLine();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println("生產者發出訊息" + message);
}
}
}
消費者1:
/** 訊息消費者 **/
public class Consumer1 {
// 定義交換機名稱
public static final String EXCHANGE_NAME = "wsm";
public static void main(String[] args) throws Exception {
Channel channel = MQChannelUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
/** 生成一個臨時的佇列 佇列的名稱是隨機的 當消費者斷開和該佇列的連接時 佇列自動洗掉 */
String queueName = channel.queueDeclare().getQueue();
// 系結: 把該臨時佇列系結我們的 exchange 其中 routingkey(也稱之為 binding key)為空字串: Fanout模式 routingkey 沒作用!
channel.queueBind(queueName, EXCHANGE_NAME, "");
// 發送回呼
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("wsm 發布的最新訊息:"+message);
};
// 消費者監聽訊息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
消費者2:
因為:
-
佇列使用了MQ自動生成,所有幾乎代碼無改動一模一樣即可!
String queueName = channel.queueDeclare().getQueue(); 臨時佇列
-
生產者——發送訊息> 交換機 ——將訊息發送到對應的> 佇列 ——> 消費者(監聽佇列,處理訊息…
路由模式 Routing 交換機型別:DIRECT
DIRECT 模式:
-
和 Fanout 模式類似
-
系結
交換機/佇列
時候,需要指定 routing key一個佇列,可以設定多個 routingkey
發送者發送訊息, 會攜帶上 routingkey
-
佇列在與交換機進行系結的時候,會設定好 佇列的
routingkey
生產者 往交換機上發送訊息,交換機只會將訊息 向匹配的佇列上發送訊息,
消費者 監聽佇列訊息消費
DIRECT 實體:
-
創建一個生產者,同時發送兩個訊息,分別指定
Conkey1
Conkey2
-
創建兩個接收者,
一個監聽的佇列 系結交換機時指定 Conkey1
另一個系結交換機時 系結兩個routingkey:Conkey1 Conkey2
-
啟動:生成者 消費1 消費2 查看結果…
生產者:
Producer.Java
/** 訊息生產者 **/
public class Producer {
// 交換機名
public static final String EXCHANGE_NAME = "DIRECT";
public static void main(String[] args) throws Exception {
// 創建連接物件,宣告交換機 發送訊息
Channel channel = MQChannelUtil.getChannel();
/**
* 宣告一個 exchange
* 1.exchange 的名稱
* 2.exchange 的型別, 可以是String直接寫,也可以是 列舉型別; */
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
/** 發送訊息 **/
channel.basicPublish(EXCHANGE_NAME, "Conkey1", null, "Conkey1 發送的訊息".getBytes("UTF-8"));
channel.basicPublish(EXCHANGE_NAME, "Conkey2", null, "Conkey2 發送的訊息".getBytes("UTF-8"));
}
}
消費者1:
Consumer1.Java
/** 訊息消費者 **/
public class Consumer1 {
// 定義交換機名稱
public static final String EXCHANGE_NAME = "DIRECT";
public static void main(String[] args) throws Exception {
Channel channel = MQChannelUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
/** 生成一個臨時的佇列 佇列的名稱是隨機的 當消費者斷開和該佇列的連接時 佇列自動洗掉 */
String queueName = channel.queueDeclare().getQueue();
// 系結: 把該臨時佇列系結我們的 exchange
// 引數二 設定該佇列和交換和系結的 routingkey
channel.queueBind(queueName, EXCHANGE_NAME, "Conkey1");
// 發送回呼
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("最新的訊息是:"+message);
};
// 消費者監聽訊息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
消費者2:
比 消費者1
多添加:
// 可以設定多個key
channel.queueBind(queueName, EXCHANGE_NAME, "Conkey2");
-
可以多次呼叫
.queueBing("佇列名","交換機","routingkey");
系結 佇列交換機 指定多個 routingkey -
生成者發送訊息,時候指定訊息的
routingkey
交換機根據 key 將對應的訊息發送到 佇列上進行處理!消費者2 可以同時接收到
Conkey1
和Conkey2
發送的訊息
主題模式 Topic 交換機型別:TOPIC
該模式與Routingkey 非常型別,就相當于是一個 動態路由模式!!
-
TOPIC 就像是 DIRECT 的升級版
DIRECT 固定了
routingkey
而,TOPIC 可以動態的進行routingkey選擇
使用上更加的個性化 -
主題模式 可以根據一些特殊的符合匹配多種 Routingkey 的匹配
通配符規則:
#:匹配一個或多個詞
舉例:wsm.# 等于:wsm.1 / wsm.w.s.m / wsm.sm .后多個單詞
*:匹配不多不少恰好1個詞
舉例:wsm.* 等于:wsm.sm / wsm.m .后一個單詞
TOPIC 實體:
- 修改上面的 DIRECT
- 修改生產者的
交換機型別
routingkey
,消費者交換機型別
系結交換機時候佇列的 routingkey
生產者:
Producer.Java
改變發送訊息時候指定的 routingkey
還有交換機型別:TPOIC
//交換機型別
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
/** 發送訊息 **/
channel.basicPublish(EXCHANGE_NAME, "Conkey.one", null, "Conkey.one 發送的訊息".getBytes("UTF-8"));
channel.basicPublish(EXCHANGE_NAME, "Conkey.two.123", null, "Conkey.two.123 發送的訊息".getBytes("UTF-8"));
消費者1:
Consumer1.Java
更改交換機型別 Topic
,接收訊息,佇列 交換機系結時候,指定一下 routingkey通配符
// 引數二 設定該佇列和交換和系結的routingkey , Topic模式可以用過 通配符進行動態匹配: * 表示一個任意的單詞;
channel.queueBind(queueName, EXCHANGE_NAME, "Conkey.*");
消費者2:
// 引數二 設定該佇列和交換和系結的routingkey , Topic模式可以用過 通配符進行動態匹配: # 表示一個|多個任意的單詞;
channel.queueBind(queueName, EXCHANGE_NAME, "Conkey.#");
頭部模式 交換機型別:HEAD
使用的很少,跟 TOPIC
動態路由型別,只不過它并不是通過 routingkey 進行訊息與佇列進行匹配
-
headers型別的交換器不依賴于路由鍵的匹配規則來路由訊息,而是根據發送的訊息內容中的headers屬性進行匹配
-
生產者發送訊息的時候可以,給訊息指定一個
head頭部引數 map型別
-
交換機 與 佇列系結的時候也定義一組:
頭部資訊規則, 只有訊息頭部規則 和 佇列的頭部規則 匹配才能發送到對應的頭部上!
不常用了解即可~
生產者:
Producer.Java
/** 訊息生產者 **/
public class Producer {
// 交換機名
public static final String EXCHANGE_NAME = "HEADERS";
public static void main(String[] args) throws Exception {
// 創建連接物件,宣告交換機 發送訊息
Channel channel = MQChannelUtil.getChannel();
/**
* 宣告一個 exchange
* 1.exchange 的名稱
* 2.exchange 的型別, 可以是String直接寫,也可以是 列舉型別; */
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
//發送的訊息的訊息頭head map型別, 需要與佇列的map 規范匹配才可以發送成功! 不然發送失敗(交換機不知道往那個佇列上發送;
HashMap<String ,Object> param = new HashMap<String, Object>();
param.put("id","1");
param.put("name","wsm");
//設定Map 匹配引數!
AMQP.BasicProperties.Builder builder=new AMQP.BasicProperties.Builder();
builder.headers(param);
/** 發送訊息 **/
channel.basicPublish(EXCHANGE_NAME, "", builder.build(), "header的內容lalala~~".getBytes("UTF-8"));
}
}
消費者:
Consumer1.Java
/** 訊息消費者 **/
public class Consumer1 {
// 定義交換機名稱
public static final String EXCHANGE_NAME = "HEADERS";
public static void main(String[] args) throws Exception {
Channel channel = MQChannelUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
/** 生成一個臨時的佇列 佇列的名稱是隨機的 當消費者斷開和該佇列的連接時 佇列自動洗掉 */
String queueName = channel.queueDeclare().getQueue();
//設定佇列上的 map 引數,用于匹配請求時候的引數!
//特殊引數 x-match 值 all 或 any
//all 在發布訊息時攜帶的map 必須和系結在佇列上的所有map 完全匹配
//any 只要在發布訊息時攜帶的有一對鍵值map 滿足佇列定義的多個引數map的其中一個就能匹配上
//注意: 這里是鍵值對的完全匹配,只匹配到鍵了,值卻不一樣是不行的;
HashMap<String ,Object> param = new HashMap<String, Object>();
param.put("x-match","all");
param.put("id","1"); // 可以嘗試改變,map 資訊,生產者訊息還能發送到佇列上面~
param.put("name","wsm");
// 系結: 把該臨時佇列系結我們的 exchange
// 佇列系結時需要指定引數,注意雖然不需要路由鍵但仍舊不能寫成null,需要寫成空字串"", 引數四: map引數,規范!
channel.queueBind(queueName, EXCHANGE_NAME, "",param);
// 發送回呼
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("最新的訊息是:"+message);
System.out.println("Map傳入引數資料:"+delivery.getProperties().getHeaders());
};
// 消費者監聽訊息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
Head 匹配規則:
//消費者:佇列規則all 所有匹配即可
Map<String ,Object> param = new HashMap<String, Object>();
param.put("x-match","all");
param.put("id","1");
param.put("name","wsm");
//生產者:傳入head map;
Map<String ,Object> param = new HashMap<String, Object>();
param.put("id","2");
param.put("name","wsm");
//不匹配
//生產者
Map<String ,Object> param = new HashMap<String, Object>();
param.put("x-match","all");
param.put("id","1");
param.put("name","wsm");
//匹配
//消費者:佇列規則any 一個匹配即可
Map<String ,Object> param = new HashMap<String, Object>();
param.put("x-match","any");
param.put("id","1");
param.put("name","wsm");
//生產者:傳入head map;
Map<String ,Object> param = new HashMap<String, Object>();
param.put("id","1"); //匹配
//生產者:傳入head map;
Map<String ,Object> param = new HashMap<String, Object>();
param.put("id","2"); //不匹配 key /value 都要匹配才可以
....
死信佇列 DLX 💀
死信,顧名思義就是無法被消費的訊息
正常情況下:
- producer 將訊息投遞到 broker 或者直接到queue 里了,consumer 從 queue 取出訊息 進行消費
但
-
某些時候由于特定的原因導致 queue 中的某些訊息無法被消費
-
這樣的訊息如果沒有后續的處理,就變成了死信,有死信自然就有了死信佇列
應用場景:
-
用戶在商城下單成功并點擊去支付后在指定時間支付,
下單成功
,指定時間內訂單未支付下單失敗
-
用戶下單,向MQ 發送一條訂單的訊息,并設定訊息時間 30分鐘,到佇列
succeed 成功佇列
等待用戶確認訂單,支付訂單,訊息被消費 下單成功
如果 30 分鐘用戶沒有下單,則
succeed 成功佇列訊息
超時,為了確保訊息不丟失,將訊息發送到死信交換機 —— defeated死信佇列
消費者接收處理:下單失敗!
死信佇列:
- 相當于對于一個,特定時間|場景 需要被處理的事情,但因為某種原因沒有正常處理,的一個兜底操作…
死信佇列產生:
-
訊息 TTL 過期
TTL是Time To Live的縮寫, 也就是生存時間,
訂單超時支付 下單失敗
-
佇列達到最大長度
佇列滿了,無法再添加資料到 mq 中
-
訊息被拒絕
(basic.reject 或 basic.nack) 并且 requeue=false,
訂單被用戶取消 下單失敗
死信佇列的實作:
- 正常的
生產者 交換機 普通佇列
- 不正常的,為了保證普通佇列,訊息穩定:
當訊息出現意外, 普通佇列上配置了 DXL交換機
,訊息超時 超出佇列... 直接發送到 DLX交換機———— DLX佇列
消費TTL 過期?
生產者:
Producer.Java
/** 訊息生產者 **/
public class Producer {
// 普通交換機名
public static final String EXCHANGE_NAME = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = MQChannelUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//設定訊息的 TTL 時間 10s秒
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
//該資訊是用作演示佇列個數限制
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(EXCHANGE_NAME, "zhangsan", properties, message.getBytes());
System.out.println("生產者發送訊息:" + message);
}
}
}
消費者:
Consumer1.Java
/** 訊息消費者 **/
public class Consumer1 {
// 普通交換機名稱
private static final String NORMAL_EXCHANGE = "normal_exchange";
// 死信交換機名稱
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws Exception {
Channel channel = MQChannelUtil.getChannel();
// 宣告死信和普通交換機 型別為 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 宣告死信佇列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信佇列系結:佇列、交換機、路由鍵(routingKey)
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
// 正常佇列系結死信佇列資訊
HashMap<String, Object> params = new HashMap<>();
// 正常佇列設定死信交換機 引數 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 正常佇列設定死信 routing-key 引數 key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
// 正常佇列
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println("等待接收訊息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer01 接收到訊息" + message);
};
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
});
}
}
先啟動消費者,創建宣告好佇列之后,關閉,啟動生產者發送訊息…
DLX 消費者:
Consumer2.Java
/** 訊息消費者 **/
public class Consumer2 {
//死信交換機名稱
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws Exception {
Channel channel = MQChannelUtil.getChannel();
//宣告交換機
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//宣告佇列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收死信訊息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer02 接收到訊息" + message);
};
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
});
}
}
佇列達到最大長度
訊息生產者代碼去掉 TTL 屬性
C1 消費者修改以下代碼 (啟動之后關閉該消費者 模擬其接收不到訊息)
//設定正常佇列的長度限制,例如發10個,4個則為死信 注意:此時需要把原先佇列洗掉 因為引數改變了
params.put("x-max-length",6);
C2 消費者代碼不變(啟動 C2 消費者)
訊息被拒絕
訊息生產者代碼同上 佇列達到最大長度
生產者一致
C1 消費者代碼(啟動之后關閉該消費者 模擬其接收不到訊息) 注釋 params.put("x-max-length",6);
C2 消費者代碼不變 ,啟動消費者1關閉
然后再啟動消費者 2
SpringBoot 集成 RabbitMQ 模式開發
延遲佇列,其實就是 死信佇列
的一種,所有為了方便查看,使用SpringBoot 來進行搭建順便了解學習一些SpringBoot 集成 RabbitMQ
① 創建SpringBoot 工程 啟動類....
② 引入Maven依賴:pom.xml
<dependencies>
<!-- SpringBoot依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--RabbitMQ 依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 阿里巴巴fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<!-- lombok依賴 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--RabbitMQ 測驗依賴-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
③ 撰寫組態檔:application.yml | properties
# SpringBoot 配置RabbitMQ ip 埠 用戶 密碼;
spring.rabbitmq.host=47.243.109.199
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
延遲佇列 TTL ?
延時佇列,佇列內部是有序的,最重要的特性就體現在它的延時屬性上
延時佇列中的元素是希望 在指定時間到了以后或之前取出和處理 簡單來說,延時佇列就是用來存放需要在指定時間被處理的 元素的佇列
延遲佇列使用場景:
- 訂單在十分鐘之內未支付則自動取消
- 新創建的店鋪,如果在十天內都沒有上傳過商品,則自動發送訊息提醒
- 用戶注冊成功后,如果三天內沒有登陸則進行短信提醒
**這些場景都有一個特點,需要在某個事件發生之后或者之前的指定時間點完成某一項任務: **就幾乎等于一個 死信佇列
RabbitMQ 的 TTL
TTL 是 RabbitMQ 中一個訊息或者佇列的屬性,表明一條訊息或者該佇列中的所有訊息的最大存活時間,單位是毫秒 RabbitMQ 有兩種方式:
佇列設定TTL
在創建佇列的時候設定佇列的“x-message-ttl”屬性
Map<String, Object> args = new HashMap<>(3);
//宣告當前佇列系結的死信交換機
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//宣告當前佇列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//宣告佇列的 TTL
args.put("x-message-ttl", 10000);
//佇列系結交換機
QueueBuilder.durable(QUEUE_A).withArguments(args).build();
訊息設定TTL
是針對每條訊息設定TTL 生產者 生產訊息時候設定:
// Spring方式
// 編輯引數
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
// 發送訊息
channel.basicPublish("交換機", "佇列", properties, "訊息".getBytes());
// SpringBoot方式
// 通過 rabbitTemplate 發送訊息...
rabbitTemplate.convertAndSend("交換機", "佇列", "訊息", correlationData -> {
correlationData.getMessageProperties().setExpiration("10000");
return correlationData;
});
佇列設定TTL
代碼實作:
-
創建一個交換機 X 和死信交 換機 Y,它們的型別都是direct
-
創建兩個佇列 QA 和 QB
兩者佇列 TTL 分別設定為 10S 和 40S
,訊息超時會進入到死信交換機 —— 發送到 QD死信佇列
交換機 系結 佇列 組態檔:
因為,專案采用SpringBoot 進行管理 原先配置佇列資訊,寫在了生產者和消費者代碼中,現在可寫咋配置類中,生產者只發訊息,消費者只接受訊息
TtlQueueConfig.Java
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TtlQueueConfig {
// 普通交換機 普通佇列
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
// 死信交換機
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
// 死信佇列
public static final String DEAD_LETTER_QUEUE = "QD";
// 宣告 xExchange
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}
// 宣告 死信佇列交換機
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
//宣告佇列 A ttl 為 10s 并系結到對應的死信交換機
@Bean("queueA")
public Queue queueA() { // 導包: org.springframework.amqp.core
Map<String, Object> args = new HashMap<>(3);
//宣告當前佇列系結的死信交換機
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//宣告當前佇列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//宣告佇列的 TTL
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}
// 宣告佇列 A 系結 X 交換機
@Bean
public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
//宣告佇列 B ttl 為 40s 并系結到對應的死信交換機
@Bean("queueB")
public Queue queueB() {
Map<String, Object> args = new HashMap<>(3);
//宣告當前佇列系結的死信交換機
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//宣告當前佇列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//宣告佇列的 TTL
args.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}
//宣告佇列 B 系結 X 交換機
@Bean
public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
}
//宣告死信佇列 QD
@Bean("queueD")
public Queue queueD() {
return new Queue(DEAD_LETTER_QUEUE);
}
//宣告死信佇列 QD 系結關系
@Bean
public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange) {
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
生產者:
controller 用戶發送一個請求,服務將資料進行處理直接發送到MQ,由其它服務模塊處理…
SendMsgController .Java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {
// RabbitTemplate 對rabbitmq 的服務介面API 進行了封裝;
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
log.info("當前時間:{},發送一條資訊給兩個 TTL 佇列:{}", new Date(), message);
rabbitTemplate.convertAndSend("X", "XA", "訊息來自 ttl 為 10S 的佇列: " + message);
rabbitTemplate.convertAndSend("X", "XB", "訊息來自 ttl 為 40S 的佇列: " + message);
}
}
消費者:
DeadLetterQueueConsumer .Java
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
/**
* 消費者 - 死信佇列
* @author wsm
*/
@Slf4j
@Component
public class DeadLetterQueueConsumer {
//@RabbitListener 負責監聽具體那個佇列...
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("當前時間:{},收到死信佇列資訊{}", new Date().toString(), msg);
}
}
-
@RabbitListener 宣告在放上,處理要監聽的佇列,訊息進行處理...
-
流量器請求
http://localhost:8080/ttl/sendMsg/拉拉拉
-
間隔 10s 40s 控制臺輸出:
2022-01-23 19:12:01.380 INFO 1508 --- [nio-8080-exec-3] c.example.controller.SendMsgController : 當前時間:Sun Jan 23 19:12:01 CST 2022,發送一條資訊給兩個 TTL 佇列:嘻嘻嘻 2022-01-23 19:12:11.684 INFO 1508 --- [ntContainer#0-1] c.e.consumer.DeadLetterQueueConsumer : 當前時間:Sun Jan 23 19:12:11 CST 2022,收到死信佇列資訊訊息來自 ttl 為 10S 的佇列: 拉拉拉 2022-01-23 19:12:41.566 INFO 1508 --- [ntContainer#0-1] c.e.consumer.DeadLetterQueueConsumer : 當前時間:Sun Jan 23 19:12:41 CST 2022,收到死信佇列資訊訊息來自 ttl 為 40S 的佇列: 拉拉拉
訊息設定TTL 存在問題bug
上面通過,SpringBoot 集成了 RabbitMQ 通過 佇列設定TTL
-
如果這樣使用的話,豈不是每增加一個新的時間需求,就要新增一個佇列, 所以一般建議使用:
訊息設定TTL
-
生產者 每次發送訊息的時候,設定訊息的存活時間,這樣:
即使只有一個佇列,也可以設定不同訊息的 延遲時間
修改上面業務,添加一個 QC
普通佇列,不設定佇列 延遲時間
… 每次發送訊息給訊息設定延遲時間...
MQ 組態檔:
MsgTtlQueueConfig.Java
@Configuration
public class MsgTtlQueueConfig {
// 死信交換機
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
// 普通的佇列
public static final String QUEUE_C = "QC";
//宣告佇列 C 死信交換機
@Bean("queueC")
public Queue queueB() {
HashMap<String, Object> args = new HashMap<>(3);
//宣告當前佇列系結的死信交換機
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//宣告當前佇列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//沒有宣告 TTL 屬性
return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
}
//宣告佇列 B 系結 X 交換機
@Bean
public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
}
生產者:
SendMsgController.Java
/**
* 延時佇列優化
* @param message 訊息
* @param ttlTime 延時的毫秒
*/
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
rabbitTemplate.convertAndSend("X", "XC", message, correlationData -> {
// 發送訊息的時候,設定訊息的延遲時間...
correlationData.getMessageProperties().setExpiration(ttlTime);
return correlationData;
});
log.info("當前時間:{},發送一條時長{}毫秒 TTL 資訊給佇列 C:{}", new Date(), ttlTime, message);
}
消費者:
不需要任何更改,只需要等待接收訊息即可…
結果查看 存在bug
瀏覽器請求:
-
http://localhost:8080/ttl/sendExpirationMsg/你好1/20000
-
http://localhost:8080/ttl/sendExpirationMsg/你好1/4000
-
-
因為 RabbitMQ 只會檢查第一個訊息是否過期,如果過期則丟到死信佇列,
如果第一個訊息的延時時長很長,而第二個訊息的延時時長很短,第二個訊息并不會優先得到執行 這是RabbitMQ 的bug 好在已經存在插件可以解決該問題💡
訊息設定TTL 插件解決bug
安裝插件:rabbitmq_delayed_message_exchange
下載地址🚀
# 工具引入,插件安裝包;
[root@iZj6ciuzx7luldnazt4iswZ wsm]# ls
erlang-21.3-1.el7.x86_64.rpm rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq-server-3.8.8-1.el7.noarch.rpm
# 將插件移動到 RabbitMQ的plugins 包下: /usr/lib 是linux 默認安裝服務路徑...
[root@iZj6ciuzx7luldnazt4iswZ wsm]# cp rabbitmq_delayed_message_exchange-3.8.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
# rabbitmq 開始安裝啟動插件補丁...
[root@iZj6ciuzx7luldnazt4iswZ wsm]# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@iZj6ciuzx7luldnazt4iswZ:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@iZj6ciuzx7luldnazt4iswZ...
The following plugins have been enabled:
rabbitmq_delayed_message_exchange
started 1 plugins.
# 重啟服務:
[root@iZj6ciuzx7luldnazt4iswZ wsm]# systemctl restart rabbitmq-server
安裝成功,查看頁面中發現,交換機多了一種信的型別:x-delayed-message
測驗實作:
新增了一個佇列delayed.queue,一個自定義交換機 delayed.exchange,系結關系如下
- 正常的一組生產者消費者,設定自定義交換機型別
生產者發送訊息指定訊息 延遲,到交換機上 到達固定的時間才會發送到交換機上...
來實作訊息的延遲TTL
組態檔類代碼
在我們自定義的交換機中,這是一種新的交換型別
- 該型別訊息支持延遲投遞機制訊息傳遞后并不會立即投遞到目標佇列中
- 而是存盤在 mnesia(一個分布式資料系統)表中,當達到投遞時間時,才投遞到目標佇列中
DelayedQueueConfig.Java
@Configuration
public class DelayedQueueConfig {
// 佇列
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
// 交換機
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
// key
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
// 宣告佇列
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
// 自定義交換機 我們在這里定義的是一個延遲交換機
@Bean
public CustomExchange delayedExchange() {
HashMap<String, Object> args = new HashMap<>();
//自定義交換機的型別
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
// 系結 佇列和交換機;
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayedExchange") CustomExchange delayedExchange) {
return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
生產者
SendMsgController.Java
// 交換機 和 routingkey
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
// 請求介面:
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,
correlationData -> {
correlationData.getMessageProperties().setDelay(delayTime);
return correlationData;
});
log.info(" 當 前 時 間 : {}, 發 送 一 條 延 遲 {} 毫秒的資訊給佇列 delayed.queue:{}", new Date(), delayTime, message);
}
消費者
DelayQueueConsumer.Java
/**
* 消費者 - 基于插件的延時佇列
*
* @author wsm
*/
@Slf4j
@ComponentScan
public class DelayQueueConsumer {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message) {
String msg = new String(message.getBody());
log.info("當前時間:{},收到延時佇列的訊息:{}", new Date().toString(), msg);
}
}
結果查看
瀏覽器請求:
-
http://localhost:8080/ttl/sendDelayMsg/wsm/20000
-
http://localhost:8080/ttl/sendDelayMsg/www/4000
-
-
ok,發送的訊息,確實在 4s 20s 進行了處理
發布確認高級:
發布確認 springboot 版本
在生產環境中由于一些不明原因,導致 RabbitMQ 重啟
- 在 RabbitMQ 重啟期間生產者訊息投遞失敗, 導致訊息丟失,需要手動處理和恢復
- 于是,我們開始思考,如何才能進行 RabbitMQ 的訊息可靠投遞呢?
確認機制方案:
-
① 生產者 每次發送訊息的時候,將訊息存入快取中,
Map kv結構:k每個訊息唯一的標識 v每個訊息體
-
② 將訊息發送到 交換機
交換機接收到訊息,回傳 生產者
ack
生產者根據對于的k 洗掉快取資料交換機超時|宕機,沒有收到訊息,生產者
回呼 nack
生產者,重新發送訊息,或其他處理 -
實體說明:
-
① SpringBoot 組態檔
開啟發布確認模式
-
② 添加配置類,宣告定義:
交換機 direct型別
佇列
系結
資訊為了方便測驗,訊息沒有發送到佇列上,訊息丟失的場景,使用
direct型別:發送訊息指定 routingkey 只會發送到相同的 佇列上
,沒有匹配的佇列 訊息丟失
-
③ 撰寫訊息回呼類
生產者——發送訊息——交換機(接收到訊息,進行回呼
ack
, 長時間沒有收到也會回呼觸發nack
)但,注意
交換機將訊息發送到對應佇列上,如果,訊息沒有匹配的佇列,所以訊息還是會丟失(沒有匹配的佇列,發送;
-
④ 撰寫訊息生產者:
發送兩個訊息,一個與佇列匹配routingkey 另一個不匹配…
-
⑤ 撰寫消費者,監聽佇列進行消費…
注意:首先要開啟Rabbit MQ的發布確認模式:
# 開啟RabbitMQ 發布確認模式:
spring.rabbitmq.publisher-confirm-type=correlated
# NONE 值是禁用發布確認模式,是默認值
# CORRELATED 值是發布訊息成功到交換器后會觸發回呼方法
# SIMPLE 值經測驗有兩種效果
# 其一效果和 CORRELATED 值一樣會觸發回呼方法
# 其二在發布訊息成功后使用 rabbitTemplate 呼叫 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 節點回傳發送結果
# 根據回傳結果來判定下一步的邏輯,注意的點是 waitForConfirmsOrDie 方法如果回傳 false 則會關閉 channel 則接下來無法發送訊息到 broker
添加配置類
宣告定義:交換機 佇列 并進行系結
ConfirmConfig.Java
/** SpringBoot 訊息確認模式 **/
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
//宣告業務 Exchange
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
// 宣告確認佇列
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
// 宣告確認佇列系結關系
@Bean
public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("key1");
}
}
- 宣告
交換機confirm.exchange
佇列 confirm.queue
佇列與交換機系結 routingkey: key1
訊息回呼類
com.example.producercallack包下: MyCallBack.Java
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
/**
* 交換機不管是否收到訊息的一個回呼方法
*
* @param correlationData 訊息相關資料
* @param ack 交換機是否收到訊息, true(ack) false(nack)
* @param cause 為收到訊息的原因: 例外資訊
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("交換機已經收到 id 為:{}的訊息", id);
} else {
log.info("交換機還未收到 id 為:{}訊息,原因:{}", id, cause);
}
}
}
生產者
ProducerController.Java
@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {
@Autowired // rabbitmq 模板物件;
private RabbitTemplate rabbitTemplate;
@Autowired // 發布確認訊息,訊息回呼方法類;
private MyCallBack myCallBack;
//依賴注入 rabbitTemplate 之后再設定它的回呼物件
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(myCallBack);
}
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
/**
* 訊息回呼和退回
* @param message
*/
@GetMapping("sendMessage/{message}")
public void sendMessage(@PathVariable String message) {
//指定訊息 id 為 1
CorrelationData correlationData1 = new CorrelationData("1");
String routingKey = "key1";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData1);
log.info(routingKey + "發送訊息內容:{}", message + routingKey);
CorrelationData correlationData2 = new CorrelationData("2");
routingKey = "key2";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData2);
log.info(routingKey + "發送訊息內容:{}", message + routingKey);
}
}
- 生產者發送兩個訊息,一個訊息key1有匹配的佇列,另一個key2沒有匹配的佇列
消費者
ConfirmConsumer.Java
@Component
@Slf4j
public class ConfirmConsumer {
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
@RabbitListener(queues = CONFIRM_QUEUE_NAME)
public void receiveMsg(Message message) {
String msg = new String(message.getBody());
log.info("消費方法接受到佇列 confirm.queue 訊息:{}", msg);
}
}
結果測驗:
瀏覽器請求:http://localhost:8080/confirm/sendMessage/你好
- 圖片 cmd 輸出有一點問題…
回呼方法接收
應該是消費方法接收
訊息回退🔙:
對于上面的操作: 如果發現該訊息不可路由,那么訊息會被直接丟棄,此時生產者是不知道訊息被丟棄這個事件的
- 如何讓無法被路由的訊息幫我想辦法處理一下?
最起碼通知我一聲,我好自己處理啊
通過設定 mandatory
引數可以在當訊息傳遞程序中不可達目的地時將訊息回傳給生產者:
#訊息退回
spring.rabbitmq.publisher-returns=true
修改回呼介面:
com.example.producercallack包下: MyCallBack.Java
-
實作:
RabbitTemplate.ReturnsCallback介面
低版本可能沒有
RabbitTemplate.ReturnsCallback
請用RabbitTemplate.ReturnCallback
-
添加介面實作:
returnedMessage(ReturnedMessage returned)
當訊息無法路由的時候的回呼方法
//當訊息無法路由的時候的回呼方法
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("訊息:{},被交換機 {} 退回,原因:{},路由key:{},code:{}",
new String(returned.getMessage().getBody()), returned.getExchange(),
returned.getReplyText(), returned.getRoutingKey(),
returned.getReplyCode());
}
低版本:訊息無法路由回呼方法()
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String
exchange, String routingKey) {
log.info("訊息:{}被服務器退回,退回原因:{}, 交換機是:{}, 路由 key:{}",new String(message.getBody()),replyText, exchange, routingKey);
}
修改發送者 ProducerController
- 修改RabbitTemplate 的 init() 初始化配置:
//依賴注入 rabbitTemplate 之后再設定它的回呼物件
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(myCallBack);
/**
* true:交換機無法將訊息進行路由時,會將該訊息回傳給生產者
* false:如果發現訊息無法進行路由,則直接丟棄
*/
rabbitTemplate.setMandatory(true);
//設定回退訊息交給誰處理
rabbitTemplate.setReturnsCallback(myCallBack);
}
重啟測驗:
http://localhost:8080/confirm/sendMessage/你好
ok, 訊息成功回退,剩下的處理代碼可以自定義了…
Rabbit MQ概念:
冪等
因為訊息 ack 持久化機制存在一定的缺點
-
持久化機制保證訊息100%消費:
消費者處理訊息 突然崩潰,長時間沒有處理完,佇列中的訊息不會洗掉,而是發送給其它消費者處理,如果這個時候消費者恢復了
就有相同消費者,消費同一個資料的情況了!
-
為了解決這個問題 RabbitMQ 消費者通常都需要做
冪等性
操作 -
冪等:
無論,程式執行多少次,結果不會發送任何改變!
實作原理:
-
MQ 消費者的冪等性的解決一般使用全域 ID
或者寫個唯一標識比如時間戳 或者 UUID 或者訂單消費者消費 MQ 中的訊息也可利用 MQ 的該 id 來判斷
-
利用 redis 執行 setnx 命令,天然具有冪等性,從而實作不重復消費
redis
的setnx 也就是只有不存在key的時候才設定
每個訊息具有一個唯一的標識, 消費者第一次消費成功的時候,使用
setnx
設定,這樣無論后面多少次操作,都不在進行操作了!
優先級
使用場景:
-
通常商城專案中:
訂單催付的場景
我們的客戶在天貓下的訂單,淘寶會及時將訂單推送給我們,如果在用戶設定的時間內未付款那么就會給用戶推送一條短信提醒,很簡單的一個功能對吧
但是,tmall 商家對我們來說,肯定是要分大客戶和小客戶的對吧,比如像蘋果,小米這樣大商家一年起碼能給我們創造很大的利潤 所以理應當然,他們的訂單必須得到優先處理
如何實作:
- 控制臺頁面添加
-
佇列中代碼添加優先級
Map<String, Object> params = new HashMap(); params.put("x-max-priority", 10); channel.queueDeclare("hello", true, false, false, params);
-
訊息中代碼添加優先級
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(10).build();
注意事項:
要讓佇列實作優先級需要做的事情有如下事情
- 佇列需要設定為優先級佇列,訊息需要設定訊息的優先級
- 消費者需要等待訊息已經發送到佇列中才去消費因為,這樣才有機會對訊息進行排序
生產者:
public class PriorityProducer {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//給訊息賦予一個 priority 屬性
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(10).build();
for (int i = 1; i < 11; i++) {
String message = "info" + i;
if (i == 5) {
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
} else {
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
System.out.println("發送訊息完成:" + message);
}
}
}
消費者:
public class PriorityConsumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//設定佇列的最大優先級 最大可以設定到 255 官網推薦 1-10 如果設定太高比較吃記憶體和 CPU
Map<String, Object> params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
//推送的訊息如何進行消費的介面回呼
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println(message);
};
//取消消費的一個回呼介面 如在消費的時候佇列被洗掉掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("訊息消費被中斷");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
惰性佇列
使用場景
-
RabbitMQ 從 3.6.0 版本開始引入了惰性佇列的概念
惰性佇列會盡可能的將訊息存入磁盤中,而在消費者消費到相應的訊息時才會被加載到記憶體中,它的一個重要的設計目標是能夠支持更長的佇列,即支持更多的訊息存盤
當消費者由于各種各樣的原因(比如消費者下線、宕機亦或者是由于維護而關閉等)而致使長時間內不能消費訊息造成堆積時
-
默認情況下,當生產者將訊息發送到 RabbitMQ 的時候:
佇列中的訊息會盡可能的存盤在記憶體之中, 這樣可以更加快速的將訊息發送給消費者
持久化
在被寫入磁盤的同時也會在記憶體中駐留一份備份當RabbitMQ 需要釋放記憶體的時候,會將記憶體中的訊息換頁至磁盤中,這個操作會耗費較長的時間,也會阻塞佇列的操作,進而無法接收新的訊息.
兩種模式
佇列具備兩種模式:default默認
和 lazy
-
lazy 模式即為惰性佇列的模式,
可以通過呼叫 channel.queueDeclare 方法的時候在引數中設定
也可以通過 Policy 的方式設定,如果一個佇列同時使用這兩種方式設定的話,那么 Policy 的方式具備更高的優先級
-
在佇列宣告的時候可以通過“x-queue-mode”引數來設定佇列的模式,取值為“default”和“lazy”
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
記憶體開銷對比
在發送 1 百萬條訊息,每條訊息大概占 1KB 的情況下,普通佇列占用記憶體是 1.2GB,而惰性佇列僅僅 占用 1.5MB
常見錯誤:
佇列宣告錯誤:
channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'ack_test' in vhost '/': received 'false' but current is 'true', class-id=50, method-id=10)
有的時候對于,已經宣告的佇列,更改了其配置,需要在RabbitMQ管理頁面手動洗掉MQ的佇列,才能進行重新宣告,不然會報錯…
完:
終于寫完了…
需要代碼,安裝工具的兄弟可以下方下載: 點個👍吧!
鏈接:https://pan.baidu.com/s/1M0m0xKBtZlAs3v3FYKq6Tw
提取碼:2540
MQ
-
大神筆記:
-
https://juejin.cn/post/7051469607806173221 超級詳細筆記
https://note.oddfar.com/rabbitmq/#%E8%A7%86%E9%A2%91%E6%95%99%E7%A8%8B
RPC + MQ: https://zhuanlan.zhihu.com/p/48230422
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/420474.html
標籤:其他
上一篇:九章云極DataCanvas公司榮獲機器之心三大獎項,助力產業數智化升級
下一篇:返回列表