📒博客首頁:崇尚學技術的科班人
🏇小肖來了
🍣今天給大家帶來的文章是《RabbitMQ發布確認和交換機基礎總結與實戰》🍣
🍣這是RabbitMQ的發布確認和交換機基礎總結與實戰🍣
🍣希望各位小伙伴們能夠耐心的讀完這篇文章🍣
🙏博主也在學習階段,如若發現問題,請告知,非常感謝🙏
💗同時也非常感謝各位小伙伴們的支持💗
🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣原始碼地址🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣
文章目錄
- 1、發布確認
- 1.1、發布確認的引出
- 1.2、發布確認的策略
- 1.2.1、開啟發布確認的方法
- 1.2.2、單個確認發布
- 1.2.3、批量確認發布
- 1.2.4、異步確認發布
- 1.2.5、如何處理異步未確認訊息
- 1.2.6、以上3種發布確認的速度對比
- 2、交換機
- 2.1、Exchanges
- 2.1.1、概念
- 2.1.2、型別
- 2.1.3、無名exchange
- 2.2、臨時佇列
- 2.3、系結(binding)
- 2.4、Fanout(發布訂閱模式)
- 2.4.1、介紹
- 2.4.2、實戰
- 2.5、Direct(路由模式)
- 2.5.1、介紹
- 2.5.2、實戰
- 2.6、Topic
- 2.6.1、介紹
- 2.6.2、Topic的要求
- 2.6.3、Topic的匹配案例
- 2.6.4、實戰
1、發布確認
1.1、發布確認的引出

一個訊息的持久化需要經歷的步驟:
- 設定要求佇列持久化,
- 設定要求佇列中的訊息必須持久化,
- 發布確認
- 如果缺少了發布確認的話,那么訊息在磁盤上持久化之前會發生丟失,從而不能滿足訊息持久化的目的,
1.2、發布確認的策略
1.2.1、開啟發布確認的方法
Channel channel = RabbitmqUtil.getChannel();
//開啟發布確認
channel.confirmSelect();
- 發布確認默認是沒有開啟的,如果需要開啟需要呼叫
confirmSelect,每當需要使用發布確認的時候,都需要呼叫該方法,
1.2.2、單個確認發布
- 單個確認發布是一種簡單的確認方式,它是一種同步確認發布的方式,也就是發布一個訊息之后只有它被確認發布,后續的訊息才能繼續發布,
- 該確認方式主要通過
waitForConfirms方法實作,這個方法只有在訊息被確認的時候才會回傳,如果在指定時間范圍內這個訊息沒有被確認那么它將會拋出例外, - 這種確認方式的最大的缺點就是:發布速度特別慢,
public static void ConfirmMessageIndividually() throws Exception{
Channel channel = RabbitmqUtil.getChannel();
String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
// 進行單個發布確認
boolean flag = channel.waitForConfirms();
if(flag){
System.out.println("訊息發送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("單個確認發送" + MESSAGE_COUNT + "條訊息所消耗的時間是" + (end - begin) + "ms");
}
1.2.3、批量確認發布
- 先發布一批訊息然后一起確認,
- 缺點:當發生故障導致發布出現問題時,不知道那個訊息出現了問題,我們必須將整個批處理保存在記憶體中,以記錄重要的訊息而后重新發布訊息,
public static void ConfirmMessageBatch() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitmqUtil.getChannel();
String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
long begin = System.currentTimeMillis();
// 批量處理訊息的個數
int batchSize = 100;
for (int i = 1; i <= MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
// 進行批量發布確認
if(i % batchSize == 0){
channel.waitForConfirms();
System.out.println("批量處理訊息成功");
}
}
long end = System.currentTimeMillis();
System.out.println("批量確認發送" + MESSAGE_COUNT + "條訊息所消耗的時間是" + (end - begin) + "ms");
}
1.2.4、異步確認發布
原理

- 異步確認發布是利用
回呼函式來達到訊息可靠性傳遞的,這個中間件也是通過函式回呼來保證是否投遞成功,
代碼
public static void ConfirmMessageAsync() throws Exception{
Channel channel = RabbitmqUtil.getChannel();
String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
long begin = System.currentTimeMillis();
ConfirmCallback ackCallback = (var1,var2)->{
System.out.println("已確認的訊息" + var1);
};
ConfirmCallback nackCallback = (var1,var2)->{
System.out.println("未確認的訊息" + var1);
};
/**
* 1. 確認收到訊息的回呼函式
* 2. 未確認收到訊息的回呼函式
*/
channel.addConfirmListener(ackCallback,nackCallback);
for (int i = 1; i <= MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("異步確認發送" + MESSAGE_COUNT + "條訊息所消耗的時間是" + (end - begin) + "ms");
}
1.2.5、如何處理異步未確認訊息
- 最好的解決方案就是把未確認的訊息放到一個基于記憶體的能被發布執行緒訪問的佇列,比如說
ConcurrentSkipListMap
public static void ConfirmMessageAsync() throws Exception{
Channel channel = RabbitmqUtil.getChannel();
String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap<>();
/**
* 1. 訊息標識
* 2. 是否批量處理
*/
ConfirmCallback ackCallback = (var1,var2)->{
if(var2){
ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap = map.headMap(var1);
longStringConcurrentNavigableMap.clear();
}else{
map.remove(var1);
}
String message = map.get(var1);
System.out.println("已確認的訊息是:" + message + " 已確認的訊息tag:" + var1);
};
ConfirmCallback nackCallback = (var1,var2)->{
// 未確認的訊息
String s = map.get(var1);
System.out.println(s);
System.out.println("未確認的訊息" + var1);
};
/**
* 1. 確認收到訊息的回呼函式
* 2. 未確認收到訊息的回呼函式
*/
channel.addConfirmListener(ackCallback,nackCallback);
long begin = System.currentTimeMillis();
for (int i = 1; i <= MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
// 1. 將訊息保存到一個執行緒安全地佇列中
map.put(channel.getNextPublishSeqNo(),message);
}
long end = System.currentTimeMillis();
System.out.println("異步確認發送" + MESSAGE_COUNT + "條訊息所消耗的時間是" + (end - begin) + "ms");
}
1.2.6、以上3種發布確認的速度對比
- 單個發布確認:同步等待確認,簡單,但吞吐量非常有限,
- 批量確認發布:批量同步等待確認,簡單,合理的吞吐量,一旦出現問題但很難推斷出是哪一條訊息出現了問題,
- 異步確認發布:最佳性能和資源使用,在出現錯誤的情況下可以很好地控制,但是實作起來稍微難些,
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.xiao.utils.RabbitmqUtil;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
public class ConfirmMessage {
public static final int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws Exception {
// 單個發布確認
ConfirmMessageIndividually(); // 單個確認發送1000條訊息所消耗的時間是680ms
// 批量發布確認
ConfirmMessageBatch(); //批量確認發送1000條訊息所消耗的時間是112ms
//異步發布確認
ConfirmMessageAsync(); // 異步確認發送1000條訊息所消耗的時間是41ms
// 異步確認發送1000條訊息所消耗的時間是33ms
}
public static void ConfirmMessageIndividually() throws Exception{
Channel channel = RabbitmqUtil.getChannel();
String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
// 進行單個發布確認
boolean flag = channel.waitForConfirms();
if(flag){
System.out.println("訊息發送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("單個確認發送" + MESSAGE_COUNT + "條訊息所消耗的時間是" + (end - begin) + "ms");
}
public static void ConfirmMessageBatch() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitmqUtil.getChannel();
String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
long begin = System.currentTimeMillis();
// 批量處理訊息的個數
int batchSize = 100;
for (int i = 1; i <= MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
// 進行批量發布確認
if(i % batchSize == 0){
channel.waitForConfirms();
System.out.println("批量處理訊息成功");
}
}
long end = System.currentTimeMillis();
System.out.println("批量確認發送" + MESSAGE_COUNT + "條訊息所消耗的時間是" + (end - begin) + "ms");
}
public static void ConfirmMessageAsync() throws Exception{
Channel channel = RabbitmqUtil.getChannel();
String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap<>();
/**
* 1. 訊息標識
* 2. 是否批量處理
*/
ConfirmCallback ackCallback = (var1,var2)->{
if(var2){
ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap = map.headMap(var1);
longStringConcurrentNavigableMap.clear();
}else{
map.remove(var1);
}
String message = map.get(var1);
System.out.println("已確認的訊息是:" + message + " 已確認的訊息tag:" + var1);
};
ConfirmCallback nackCallback = (var1,var2)->{
// 未確認的訊息
String s = map.get(var1);
System.out.println(s);
System.out.println("未確認的訊息" + var1);
};
/**
* 1. 確認收到訊息的回呼函式
* 2. 未確認收到訊息的回呼函式
*/
channel.addConfirmListener(ackCallback,nackCallback);
long begin = System.currentTimeMillis();
for (int i = 1; i <= MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
// 1. 將訊息保存到一個執行緒安全地佇列中
map.put(channel.getNextPublishSeqNo(),message);
}
long end = System.currentTimeMillis();
System.out.println("異步確認發送" + MESSAGE_COUNT + "條訊息所消耗的時間是" + (end - begin) + "ms");
}
}
2、交換機

- 在這一部分中,我們將做一些完全不同的事情 – 我們將訊息傳達給多個消費者,這種模式成為“發布/訂閱模式”,這里需要使用到交換機,
2.1、Exchanges
2.1.1、概念
- RabbitMQ訊息傳遞模型的核心思想是:生產者生產的訊息從不會直接發送到佇列,
- 相反,生產者只能將訊息發送到交換機,
2.1.2、型別
- 直接(direct)
- 主題(topic)
- 標題(headers)
- 扇出(fanout)
2.1.3、無名exchange
channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes("UTF-8"));
- 我們在發送訊息時,
""表示的就是默認的無名佇列, - 訊息能路由發送到佇列中其實是由
routingKey系結key指定的,
2.2、臨時佇列
臨時佇列:一個具有隨即名稱的佇列,一旦我們斷開了連接,佇列將被自動洗掉,
String queueName = channel.queueDeclare().getQueue();

2.3、系結(binding)
- 系結其實是交換機和佇列之間的橋梁,它能夠標識哪個交換機和哪個佇列進行了系結關系,

2.4、Fanout(發布訂閱模式)
2.4.1、介紹
-
它是將接收到的所有訊息廣播到它知道的所有佇列中,
-
系統有默認的的
Fanout交換機型別

2.4.2、實戰

- 交換機轉發一條訊息,其所系結的所有佇列都可以接收到訊息,
1. 兩個消費者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
public class ReceiveLogs01 {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtil.getChannel();
// 宣告交換機
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
// 宣告一個臨時佇列
String queueName = channel.queueDeclare().getQueue();
/**
* 1. 佇列名稱
* 2. 交換機名稱
* 3. RoutingKey
*/
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("等待接收訊息");
DeliverCallback deliverCallback = (var1,var2)->{
System.out.println("ReceiveLogs01控制臺接收到的訊息是:" + new String(var2.getBody(),"UTF-8"));
};
channel.basicConsume(queueName,true,deliverCallback,var1->{});
}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
public class ReceiveLogs02 {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtil.getChannel();
// 宣告交換機
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
// 宣告一個臨時佇列
String queueName = channel.queueDeclare().getQueue();
/**
* 1. 佇列名稱
* 2. 交換機名稱
* 3. RoutingKey
*/
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("等待接收訊息");
DeliverCallback deliverCallback = (var1,var2)->{
System.out.println("ReceiveLogs02控制臺接收到的訊息是:" + new String(var2.getBody(),"UTF-8"));
};
channel.basicConsume(queueName,true,deliverCallback,var1->{});
}
}
2. 一個生產者
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;
import java.util.Scanner;
public class EmitLog {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
Scanner sc = new Scanner(System.in);
while(sc.hasNext()){
String message = sc.next();
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
System.out.println("成功發送訊息:" + message);
}
}
}
3. 測驗結果
生產者

消費者


- 所以在
fanout模式下,所有佇列都可以收到訊息,
2.5、Direct(路由模式)
2.5.1、介紹

- 直接交換機和
fanout交換機的差別在于RoutingKey的系結上,它系結的的多個佇列的key一般是不同的,如果是相同的,那么它表現得就和fanout有點類似,
2.5.2、實戰
佇列和交換機的系結關系

1. 生產者
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;
import java.util.Scanner;
public class DirectLogs {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
Scanner sc = new Scanner(System.in);
while(sc.hasNext()){
String message = sc.next();
channel.basicPublish(EXCHANGE_NAME,"error",null,message.getBytes("UTF-8"));
System.out.println("成功發送訊息:" + message);
}
}
}
2. 兩個消費者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
public class ReceiveLogsDirect01 {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtil.getChannel();
// 宣告交換機
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
// 宣告佇列
channel.queueDeclare("console",false,false,false,null);
// 進行系結
channel.queueBind("console",EXCHANGE_NAME,"info");
channel.queueBind("console",EXCHANGE_NAME,"warning");
System.out.println("等待接收訊息");
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println("ReceiveLogsDirect01控制臺接收到的訊息是:" + new String(var2.getBody(),"UTF-8"));
};
channel.basicConsume("console",true,deliverCallback,var1->{});
}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
public class ReceiveLogsDirect02 {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtil.getChannel();
// 宣告交換機
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
// 宣告佇列
channel.queueDeclare("disk",false,false,false,null);
// 進行系結
channel.queueBind("disk",EXCHANGE_NAME,"error");
System.out.println("等待接收訊息");
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println("ReceiveLogsDirect02控制臺接收到的訊息是:" + new String(var2.getBody(),"UTF-8"));
};
channel.basicConsume("disk",true,deliverCallback,var1->{});
}
}
3. 測驗結果
- 當生產者進行發送訊息的時候,它會優先進行
RoutingKey的比較,然后才會發送給相應的佇列,
2.6、Topic
2.6.1、介紹
之前我們Fanout可以將所有訊息發送到所有佇列,direct可以將訊息發送到某個佇列,但我們假設我們當前有3個佇列,我們想只發送訊息到其中的兩個佇列,那么這就需要Topic,
2.6.2、Topic的要求
topic的RoutingKey不能隨意寫,它必須是一個單詞串列,以點號隔開,*(星號)可以代替一個單詞,#(井號)可以代替零個或多個單詞
2.6.3、Topic的匹配案例

quick.orange.rabbit:被佇列Q1、Q2接收到lazy.orange.elephant:被佇列Q1、Q2接收到quick.orange.fox:被佇列Q1接收到lazy.brown.fox:被佇列Q2接收到lazy.pink.rabbit:雖然滿足兩個系結但只被佇列Q2接收一次,quick.brown.fox:不匹配任何系結不會被任何佇列接收到會被丟棄,quick.orange.male.rabbit:是四個單詞不匹配任何系結會被丟棄lazy.orange.male.rabbit:是四個單詞當匹配Q2
- 當一個佇列系結鍵是
#,那么這個佇列將接收所有資料,就有點像fanout了, - 如果佇列系結鍵當中沒有
#和*出現,那么該佇列系結型別就是direct了,
2.6.4、實戰
1. 兩個消費者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
public class ReceiveLogsTopic01 {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = "Q1";
channel.queueDeclare(queueName,false,false,false,null);
channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
System.out.println("等待接受訊息.....");
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println("ReceiveLogsTopic01控制臺接收到的訊息是:" + new String(var2.getBody(),"UTF-8"));
System.out.println("接收佇列:" + queueName + " 接受的鍵:" + var2.getEnvelope().getRoutingKey());
};
channel.basicConsume(queueName,true,deliverCallback,var1->{});
}
}
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
public class ReceiveLogsTopic02 {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = "Q2";
channel.queueDeclare(queueName,false,false,false,null);
channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
System.out.println("等待接受訊息.....");
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println("ReceiveLogsTopic02控制臺接收到的訊息是:" + new String(var2.getBody(),"UTF-8"));
System.out.println("接收佇列:" + queueName + " 接受的鍵:" + var2.getEnvelope().getRoutingKey());
};
channel.basicConsume(queueName,true,deliverCallback,var1->{});
}
}
2. 生產者
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;
import java.util.HashMap;
import java.util.Map;
public class EmitLogTopic {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtil.getChannel();
Map<String,String> map = new HashMap<>();
map.put("quick.orange.rabbit","被佇列Q1、Q2接收到");
map.put("lazy.orange.elephant","被佇列Q1、Q2接收到");
map.put("quick.orange.fox","被佇列Q1接收到");
map.put("lazy.brown.fox","被佇列Q2接收到");
map.put("lazy.pink.rabbit","雖然滿足兩個系結但只被佇列Q2接收一次,");
map.put("quick.brown.fox","不匹配任何系結不會被任何佇列接收到會被丟棄,");
map.put("quick.orange.male.rabbit","是四個單詞不匹配任何系結會被丟棄");
map.put("lazy.orange.male.rabbit","是四個單詞當匹配Q2");
for(String key : map.keySet()){
String message = map.get(key);
channel.basicPublish(EXCHANGE_NAME,key,null,message.getBytes("UTF-8"));
System.out.println("發送的訊息是:" + message);
}
}
}
3. 測驗結果
1. 生產者

2. 消費者


轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/393914.html
標籤:java
