發布確認原理
生產者將信道設定成 confirm 模式,一旦信道進入 confirm 模式, 所有在該信道上面發布的訊息都將會被指派一個唯一的 ID(從 1 開始),一旦訊息被投遞到所有匹配的佇列之后, broker就會發送一個確認給生產者(包含訊息的唯一 ID),這就使得生產者知道訊息已經正確到達目的佇列了,如果訊息和佇列是可持久化的,那么確認訊息會在將訊息寫入磁盤之后發出, broker 回傳給生產者的確認訊息中 delivery-tag 域包含了確認訊息的序列號,此外 broker 也可以設定basic.ack 的 multiple 域,表示到這個序列號之前的所有訊息都已經得到了處理,
confirm 模式最大的好處在于他是異步的,一旦發布一條訊息,生產者應用程式就可以在等信道回傳確認的同時繼續發送下一條訊息,當訊息最終得到確認之后,生產者應用便可以通過回呼方法來處理該確認訊息,如果 RabbitMQ 因為自身內部錯誤導致訊息丟失,就會發送一條 nack 訊息,生產者應用程式同樣可以在回呼方法中處理該 nack 訊息,

發布確認的策略
開啟發布確認的方法
發布確認默認是沒有開啟的,如果要開啟需要呼叫方法 confirmSelect,每當你要想使用發布確認,都需要在 channel 上呼叫該方法
Channel channel = RabbitMqUtils.getChannel();
// 生產者 信道開啟發布確認
channel.confirmSelect();

測驗方法
發布確認模式
1、單個確認模式
2、批量確認模式
3、異步確認
測驗三種確認方式的執行時間
單個確認發布
這是一種簡單的確認方式,它是一種同步確認發布的方式,也就是發布一個訊息之后只有它被確認發布,后續的訊息才能繼續發布,waitForConfirmsOrDie(long)這個方法只有在訊息被確認的時候才回傳,如果在指定時間范圍內這個訊息沒有被確認那么它將拋出例外,
這種確認方式有一個最大的缺點就是:發布速度特別的慢, 因為如果沒有確認發布的訊息就會阻塞所有后續訊息的發布,這種方式最多提供每秒不超過數百條發布訊息的吞吐量,當然對于某些應用程式來說這可能已經足夠了,
// 單個確認
public static void publishMessageOne() throws Exception {
String queueName = "one";
Channel channel = RabbitMqUtils.getChannel();
//宣告佇列
channel.queueDeclare(queueName,true,false,false,null);
// 開啟發布確認
channel.confirmSelect();
// 記錄開始時間
long begin = System.currentTimeMillis();
//批量發訊息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
//單個訊息 就馬上進行發布確認
boolean flag = channel.waitForConfirms();
if (flag){
System.out.println("訊息發送成功");
}
}
//結束時間
long end = System.currentTimeMillis();
System.out.println("發布條數:"+ MESSAGE_COUNT + "單條訊息確認,耗時:" + (end - begin) +"ms");
}
//批量發訊息的條數
public static final int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws Exception {
// 單個確認
publishMessageOne();
/**
* 測驗第一次 發布條數:1000單條訊息確認,耗時:531ms
* 測驗第二次 發布條數:1000單條訊息確認,耗時:530ms
* 測驗第三次 發布條數:1000單條訊息確認,耗時:527ms
*/
}
批量確認發布
上面那種方式非常慢,與單個等待確認訊息相比,先發布一批訊息然后一起確認可以極大地提高吞吐量,當然這種方式的缺點就是:當發生故障導致發布出現問題時, 不知道是哪個訊息出現問題了, 我們必須將整個批處理保存在記憶體中,以記錄重要的資訊而后重新發布訊息,當然這種方案仍然是同步的,也一樣阻塞訊息的發布 ,
// 批量發布確認
public static void publishMessageMultiple() throws Exception {
String queueName = "multiple";
Channel channel = RabbitMqUtils.getChannel();
//宣告佇列
channel.queueDeclare(queueName,true,false,false,null);
// 開啟發布確認
channel.confirmSelect();
// 記錄開始時間
long begin = System.currentTimeMillis();
// 批量確認訊息條數的大小
int batchSizes = 100;
//批量發訊息
for (int i = 0; i <= MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
//發送了100 條 batchSizes 確認一次
if (i % batchSizes == 0){
//全部訊息完畢 進行發布確認
boolean flag = channel.waitForConfirms( );
if (flag){
System.out.println("訊息發送成功");
}
}
}
//結束時間
long end = System.currentTimeMillis();
System.out.println("發布條數:"+ MESSAGE_COUNT + "批量訊息確認,耗時:" + (end - begin) +"ms");
}
//批量發訊息的條數
public static final int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws Exception {
// 批量確認
publishMessageMultiple();
/**
* 測驗第一次 發布條數:1000批量訊息確認,耗時:83ms
* 測驗第二次 發布條數:1000批量訊息確認,耗時:71ms
* 測驗第三次 發布條數:1000批量訊息確認,耗時:77ms
*/
}
異步確認發布
異步確認雖然編程邏輯比上兩個要復雜,但是性價比最高,無論是可靠性還是效率都沒得說,他是利用回呼函式來達到訊息可靠性傳遞的,這個中間件也是通過函式回呼來保證是否投遞成功,下面就讓我們來詳細講解異步確認是怎么實作的,

// 異步發布確認
public static void publishMessageAsync() throws Exception {
String queueName = "async";
Channel channel = RabbitMqUtils.getChannel();
//宣告佇列
channel.queueDeclare(queueName,true,false,false,null);
// 開啟發布確認
channel.confirmSelect();
// 記錄開始時間
long begin = System.currentTimeMillis();
//訊息成功回呼函式
ConfirmCallback ackCallback = (deliveryTag, multiple)->{
System.out.println("確認訊息:" + deliveryTag);
};
//訊息失敗回呼函式
ConfirmCallback nackCallback = (deliveryTag, multiple)->{
System.out.println("未確認訊息:" + deliveryTag);
};
// 異步訊息監聽器, 哪些訊息成功?失敗?
channel.addConfirmListener(ackCallback,nackCallback);
//批量發訊息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
}
//結束時間
long end = System.currentTimeMillis();
System.out.println("發布條數:"+ MESSAGE_COUNT + ",異步訊息確認,耗時:" + (end - begin) +"ms");
}
//批量發訊息的條數
public static final int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws Exception {
// 單個確認
//publishMessageOne();
/**
* 測驗第一次 發布條數:1000單條訊息確認,耗時:531ms
* 測驗第二次 發布條數:1000單條訊息確認,耗時:530ms
* 測驗第三次 發布條數:1000單條訊息確認,耗時:527ms
*/
// 批量確認
//publishMessageMultiple();
/**
* 測驗第一次 發布條數:1000批量訊息確認,耗時:83ms
* 測驗第二次 發布條數:1000批量訊息確認,耗時:71ms
* 測驗第三次 發布條數:1000批量訊息確認,耗時:77ms
*/
//異步訊息確認
publishMessageAsync();
/**
* 測驗第一次 發布條數:1000,異步訊息確認,耗時:33ms
* 測驗第二次 發布條數:1000,異步訊息確認,耗時:34ms
* 測驗第三次 發布條數:1000,異步訊息確認,耗時:32ms
*/
}
//批量發訊息的條數
public static final int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws Exception {
//異步訊息確認
publishMessageAsync();
/**
* 測驗第一次 發布條數:1000,異步訊息確認,耗時:33ms
* 測驗第二次 發布條數:1000,異步訊息確認,耗時:34ms
* 測驗第三次 發布條數:1000,異步訊息確認,耗時:32ms
*/
}
查看一下異步訊息的輸出結果

如何處理異步未確認訊息
最好的解決的解決方案就是把未確認的訊息放到一個基于記憶體的能被發布執行緒訪問的佇列,比如說用 ConcurrentLinkedQueue 這個佇列在 confirm callbacks 與發布執行緒之間進行訊息的傳遞,
// 異步發布確認
public static void publishMessageAsync() throws Exception {
String queueName = "async";
Channel channel = RabbitMqUtils.getChannel();
//宣告佇列
channel.queueDeclare(queueName,true,false,false,null);
// 開啟發布確認
channel.confirmSelect();
/**
* 執行緒安全有序的哈希表,適用于高并發的情況
* 將序號與訊息進關聯 k 序號 v訊息
* 批量洗掉訊息條數
* 支持高并發(多執行緒)
*/
ConcurrentSkipListMap<Long,String> outStandConfirm = new ConcurrentSkipListMap<>();
// 訊息成功回呼函式
ConfirmCallback ackCallback = (deliveryTag, multiple)->{
System.out.println("確認訊息:" + deliveryTag);
// 洗掉已經確認的訊息,剩下的就是未確認的訊息
if (multiple){ // 批量
ConcurrentNavigableMap<Long, String> confirmedMessage = outStandConfirm.headMap(deliveryTag);
confirmedMessage.clear();
}else { // 單條
outStandConfirm.remove(deliveryTag);
}
};
//訊息失敗回呼函式
ConfirmCallback nackCallback = (deliveryTag, multiple)->{
String nackMessage = outStandConfirm.get(deliveryTag);
System.out.println("未確認訊息是:" + nackMessage);
System.out.println("未確認訊息序號:" + deliveryTag);
};
// 異步訊息監聽器, 哪些訊息成功?失敗?
channel.addConfirmListener(ackCallback,nackCallback);
// 記錄開始時間
long begin = System.currentTimeMillis();
// 批量發訊息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
// 記錄下所有要發送的訊息記錄
outStandConfirm.put(channel.getNextPublishSeqNo(),message);
}
//結束時間
long end = System.currentTimeMillis();
System.out.println("發布條數:"+ MESSAGE_COUNT + ",異步訊息確認,耗時:" + (end - begin) +"ms");
}
結論
- 單獨發布訊息
同步等待確認, 簡單,但吞吐量非常有限, - 批量發布訊息
批量同步等待確認, 簡單,合理的吞吐量, 一旦出現問題但很難推斷出是那條訊息出現了問題, - 異步處理:
最佳性能和資源使用,在出現錯誤的情況下可以很好地控制,但是實作起來稍微難些
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/374649.html
標籤:其他
