📒博客首頁:崇尚學技術的科班人
🏇小肖來了
🍣今天給大家帶來的文章是《萬字 +圖片決議死信佇列和死信實戰演練》🍣
🍣有的小伙伴可能會問死信佇列有啥用?你看了這篇文章就知道了🍣
🍣希望各位小伙伴們能夠耐心的讀完這篇文章🍣
🙏博主也在學習階段,如若發現問題,請告知,非常感謝🙏
💗同時也非常感謝各位小伙伴們的支持💗
文章目錄
- 1、死信佇列
- 1.1、概念
- 1.2、死信來源
- 1.3、死信實戰
- 1.3.1、代碼架構圖
- 1.3.2、TTL過期情況
- 1.3.3、佇列達到最大長度情況
- 1.3.4、訊息被拒情況
- 2、總結
1、死信佇列
1.1、概念
- 死信:就是
無法被消費的訊息,由于特定的原因導致queue中的某些訊息無法被消費,這樣的訊息如果沒有后續的處理,就變成了死信,有死信自然就有了死信佇列, - 應用場景:保證訂單業務的訊息資料不丟失,當訊息發生例外時,將訊息投入死信佇列中,比如說:用戶在商城下單成功并點擊去支付后在指定時間未支付時自動失效,
1.2、死信來源
- 訊息
TTL過期 - 佇列達到最大長度(佇列滿了,無法再添加資料到佇列中),
- 訊息被拒絕并且
requeue = false
1.3、死信實戰
1.3.1、代碼架構圖

1.3.2、TTL過期情況
1. 消費者01
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
import java.util.HashMap;
import java.util.Map;
public class Consumer01 {
public static final String DEAD_EXCHANGE = "dead_exchange";
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String DEAD_QUEUE = "dead_queue";
public static final String NORMAL_QUEUE = "normal_queue";
/**
* 死信實戰
* 消費者01
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
// 死信交換機
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 普通交換機
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
map.put("x-dead-letter-routing-key","lisi");
map.put("x-message-ttl",10000);
// 普通佇列
channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
// 死信佇列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
// 佇列系結
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("等待接收訊息......");
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println("Consumer01控制臺接收到的訊息是:" + new String(var2.getBody(),"UTF-8"));
};
channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,var1->{});
}
}
- 最為復雜的就是
消費者01,它需要進行 死信交換機系結死信佇列、普通交換機系結普通佇列、普通佇列系結死信交換機, - 我們為了讓訊息不被消費,我們需要制造假死現象,也就是
關閉消費者01,
2. 消費者02
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
import java.util.HashMap;
import java.util.Map;
public class Consumer02 {
/**
* 消費者02
*/
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
System.out.println("等待接收訊息......");
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println("Consumer02控制臺接收到的訊息是:" + new String(var2.getBody(),"UTF-8"));
};
// 只需要面向 死信佇列消費訊息,里面的關系系結已經由C1完成
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,var1->{});
}
}
3. 生產者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;
import java.nio.charset.StandardCharsets;
public class Producer {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtil.getChannel();
// 單位是毫秒
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for(int i = 1; i < 11; i ++){
String message = "info" + i;
// 只需要向交換機里面發送訊息,里面的關系系結已經由C1完成
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
}
}
}
4. 測驗結果

- 所有的訊息在超過過期時間之后,全部轉移到了死信佇列中,

1.3.3、佇列達到最大長度情況
1. 消費者01
public class Consumer01 {
public static final String DEAD_EXCHANGE = "dead_exchange";
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String DEAD_QUEUE = "dead_queue";
public static final String NORMAL_QUEUE = "normal_queue";
/**
* 死信實戰
* 消費者01
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
// 死信交換機
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 普通交換機
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
map.put("x-dead-letter-routing-key","lisi");
map.put("x-max-length",6);
//map.put("x-message-ttl",10000);
// 普通佇列
channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
// 死信佇列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
// 佇列系結
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("等待接收訊息......");
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println("Consumer01控制臺接收到的訊息是:" + new String(var2.getBody(),"UTF-8"));
};
channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,var1->{});
}
}
- 這里我們將過期時間引數改為了
佇列最大長度 - 我們為了讓訊息不被消費和觀察到明顯現象,我們需要制造假死現象,也就是
關閉消費者01,
2. 消費者02
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
import java.util.HashMap;
import java.util.Map;
public class Consumer02 {
/**
* 消費者02
*/
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
System.out.println("等待接收訊息......");
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println("Consumer02控制臺接收到的訊息是:" + new String(var2.getBody(),"UTF-8"));
};
// 只需要面向 死信佇列消費訊息,里面的關系系結已經由C1完成
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,var1->{});
}
}
- 消費者02和TTL過期情況下的一模一樣
3. 生產者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;
import java.nio.charset.StandardCharsets;
public class Producer {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtil.getChannel();
// 單位是毫秒
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for(int i = 1; i < 11; i ++){
String message = "info" + i;
// 只需要向交換機里面發送訊息,里面的關系系結已經由C1完成
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
}
}
}
- 我們將對應的設定過期時間注釋掉
4. 測驗結果
- 如果我們啟動消費者01會報錯,那是因為我們所創建的佇列已經存在,我們需要把普通佇列洗掉,因為只有它的引數發生了改變,

- 因為我們設定了普通佇列的最大長度6,所以當超過了最大長度的訊息都會被作為死信,
1.3.4、訊息被拒情況
1. 消費者01
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
import java.util.HashMap;
import java.util.Map;
public class Consumer01 {
public static final String DEAD_EXCHANGE = "dead_exchange";
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String DEAD_QUEUE = "dead_queue";
public static final String NORMAL_QUEUE = "normal_queue";
/**
* 死信實戰
* 消費者01
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
// 死信交換機
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 普通交換機
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
map.put("x-dead-letter-routing-key","lisi");
//map.put("x-max-length",6);
//map.put("x-message-ttl",10000);
// 普通佇列
channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
// 死信佇列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
// 佇列系結
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("等待接收訊息......");
DeliverCallback deliverCallback = (var1, var2)->{
String msg = new String(var2.getBody(),"UTF-8");
if(msg.equals("info5")){
System.out.println("Consumer01控制臺接收到的訊息是:" + msg + ": 此訊息被拒" );
channel.basicReject(var2.getEnvelope().getDeliveryTag(),false);
}else{
System.out.println("Consumer01控制臺接收到的訊息是:" + msg);
channel.basicAck(var2.getEnvelope().getDeliveryTag(),false);
}
};
channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,var1->{});
}
}
- 這里我們將
佇列最大長度注釋掉 - 我們
還需要開啟手動應答,因為不開啟就不會存在訊息被拒 的問題,
2. 消費者02
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
import java.util.HashMap;
import java.util.Map;
public class Consumer02 {
/**
* 消費者02
*/
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
System.out.println("等待接收訊息......");
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println("Consumer02控制臺接收到的訊息是:" + new String(var2.getBody(),"UTF-8"));
};
// 只需要面向 死信佇列消費訊息,里面的關系系結已經由C1完成
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,var1->{});
}
}
- 消費者02和佇列達到最大長度情況下的一模一樣
3. 生產者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;
import java.nio.charset.StandardCharsets;
public class Producer {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtil.getChannel();
// 單位是毫秒
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for(int i = 1; i < 11; i ++){
String message = "info" + i;
// 只需要向交換機里面發送訊息,里面的關系系結已經由C1完成
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
}
}
}
- 生產者和佇列達到最大長度情況下的一模一樣
4. 測驗結果
- 測驗之前我們需要將佇列中的訊息消費掉,并且需要將普通佇列洗掉,


- 可見只有
"info5"被作為死信,
2、總結
- 如果覺得這篇文章對你有幫助的話,請給我一個五星好評唄,評論地址,感謝鐵汁的支持!!!

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/396136.html
標籤:其他
下一篇:R語言使用haven包的read_spss函式讀取spss格式資料、使用haven包的read_sas函式讀取SAS格式資料、使用haven包的read_dta函式讀取Stata格式資料
