文章目錄
- 1. 默認情況
- 2. TTL
- 2.1 單條訊息過期
- 2.2 佇列訊息過期
- 2.3 特殊情況
- 3. 死信佇列
- 3.1 死信交換機
- 3.2 死信佇列
- 3.3 實踐
- 4. 小結
RabbitMQ 中的訊息長期未被消費會過期嗎?用過 RabbitMQ 的小伙伴可能都有這樣的疑問,今天松哥就來和大家捋一捋這個問題,
1. 默認情況
首先我們來看看默認情況,
默認情況下,訊息是不會過期的,也就是我們平日里在訊息發送時,如果不設定任何訊息過期的相關引數,那么訊息是不會過期的,即使訊息沒被消費掉,也會一直存盤在佇列中,
這種情況具體代碼就不用我再演示了吧,松哥之前的文章凡是涉及到 RabbitMQ 的,基本上都是這樣的,
2. TTL
TTL(Time-To-Live),訊息存活的時間,即訊息的有效期,如果我們希望訊息能夠有一個存活時間,那么我們可以通過設定 TTL 來實作這一需求,如果訊息的存活時間超過了 TTL 并且還沒有被訊息,此時訊息就會變成死信,關于死信以及死信佇列,松哥后面再和大家介紹,
TTL 的設定有兩種不同的方式:
- 在宣告佇列的時候,我們可以在佇列屬性中設定訊息的有效期,這樣所有進入該佇列的訊息都會有一個相同的有效期,
- 在發送訊息的時候設定訊息的有效期,這樣不同的訊息就具有不同的有效期,
那如果兩個都設定了呢?
以時間短的為準,
當我們設定了訊息有效期后,訊息過期了就會被從佇列中洗掉了(進入到死信佇列,后文一樣,不再標注),但是兩種方式對應的洗掉時機有一些差異:
- 對于第一種方式,當訊息佇列設定過期時間的時候,那么訊息過期了就會被洗掉,因為訊息進入 RabbitMQ 后是存在一個訊息佇列中,佇列的頭部是最早要過期的訊息,所以 RabbitMQ 只需要一個定時任務,從頭部開始掃描是否有過期訊息,有的話就直接洗掉,
- 對于第二種方式,當訊息過期后并不會立馬被洗掉,而是當訊息要投遞給消費者的時候才會去洗掉,因為第二種方式,每條訊息的過期時間都不一樣,想要知道哪條訊息過期,必須要遍歷佇列中的所有訊息才能實作,當訊息比較多時這樣就比較耗費性能,因此對于第二種方式,當訊息要投遞給消費者的時候才去洗掉,
介紹完 TTL 之后,接下來我們來看看具體用法,
接下來所有代碼松哥都以 Spring Boot 中封裝的 AMPQ 為例來講解,
2.1 單條訊息過期
我們先來看單條訊息的過期時間,
首先創建一個 Spring Boot 專案,引入 Web 和 RabbitMQ 依賴,如下:

然后在 application.properties 中配置一下 RabbitMQ 的連接資訊,如下:
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
接下來稍微配置一下訊息佇列:
@Configuration
public class QueueConfig {
public static final String JAVABOY_QUEUE_DEMO = "javaboy_queue_demo";
public static final String JAVABOY_EXCHANGE_DEMO = "javaboy_exchange_demo";
public static final String HELLO_ROUTING_KEY = "hello_routing_key";
@Bean
Queue queue() {
return new Queue(JAVABOY_QUEUE_DEMO, true, false, false);
}
@Bean
DirectExchange directExchange() {
return new DirectExchange(JAVABOY_EXCHANGE_DEMO, true, false);
}
@Bean
Binding binding() {
return BindingBuilder.bind(queue())
.to(directExchange())
.with(HELLO_ROUTING_KEY);
}
}
這個配置類主要干了三件事:配置訊息佇列、配置交換機以及將兩者系結在一起,
- 首先配置一個訊息佇列,new 一個 Queue:第一個引數是訊息佇列的名字;第二個引數表示訊息是否持久化;第三個引數表示訊息佇列是否排他,一般我們都是設定為 false,即不排他;第四個引數表示如果該佇列沒有任何訂閱的消費者的話,該佇列會被自動洗掉,一般適用于臨時佇列,
- 配置一個 DirectExchange 交換機,
- 將交換機和佇列系結到一起,
這段配置應該很簡單,沒啥好解釋的,有一個排他性,松哥這里稍微多說兩句:
關于排他性,如果設定為 true,則該訊息佇列只有創建它的 Connection 才能訪問,其他的 Connection 都不能訪問該訊息佇列,如果試圖在不同的連接中重新宣告或者訪問排他性佇列,那么系統會報一個資源被鎖定的錯誤,另一方面,對于排他性佇列而言,當連接斷掉的時候,該訊息佇列也會自動洗掉(無論該佇列是否被宣告為持久性佇列都會被洗掉),
接下來提供一個訊息發送介面,如下:
@RestController
public class HelloController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/hello")
public void hello() {
Message message = MessageBuilder.withBody("hello javaboy".getBytes())
.setExpiration("10000")
.build();
rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_QUEUE_DEMO, message);
}
}
在創建 Message 物件的時候我們可以設定訊息的過期時間,這里設定訊息的過期時間為 10 秒,
這就可以啦!
接下來我們啟動專案,進行訊息發送測驗,當訊息發送成功之后,由于沒有消費者,所以這條訊息并不會被消費,打開 RabbitMQ 管理頁面,點擊到 Queues 選項卡,10s 之后,我們會發現訊息已經不見了:

很簡單吧!
單條訊息設定過期時間,就是在訊息發送的時候設定一下訊息有效期即可,
2.2 佇列訊息過期
給佇列設定訊息過期時間,方式如下:
@Bean
Queue queue() {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 10000);
return new Queue(JAVABOY_QUEUE_DEMO, true, false, false, args);
}
設定完成后,我們修改訊息的發送邏輯,如下:
@RestController
public class HelloController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/hello")
public void hello() {
Message message = MessageBuilder.withBody("hello javaboy".getBytes())
.build();
rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_QUEUE_DEMO, message);
}
}
可以看到,訊息正常發送即可,不用設定訊息過期時間,
OK,啟動專案,發送一條訊息進行測驗,查看 RabbitMQ 管理頁面,如下:

可以看到,訊息佇列的 Features 屬性為 D 和 TTL,D 表示訊息佇列中訊息持久化,TTL 則表示訊息會過期,
10s 之后重繪頁面,發現訊息數量已經恢復為 0,
這就是給訊息佇列設定訊息過期時間,一旦設定了,所有進入到該佇列的訊息都有一個過期時間了,
2.3 特殊情況
還有一種特殊情況,就是將訊息的過期時間 TTL 設定為 0,這表示如果訊息不能立馬消費則會被立即丟掉,這個特性可以部分替代 RabbitMQ3.0 以前支持的 immediate 引數,之所以所部分代替,是因為 immediate 引數在投遞失敗會有 basic.return 方法將訊息體回傳(這個功能可以利用死信佇列來實作),
具體代碼松哥就不演示了,這個應該比較容易,
3. 死信佇列
有小伙伴不禁要問,被洗掉的訊息去哪了?真的被洗掉了嗎?非也非也!這就涉及到死信佇列了,接下來我們來看看死信佇列,
3.1 死信交換機
死信交換機,Dead-Letter-Exchange 即 DLX,
死信交換機用來接收死信訊息(Dead Message)的,那什么是死信訊息呢?一般訊息變成死信訊息有如下幾種情況:
- 訊息被拒絕(Basic.Reject/Basic.Nack) ,井且設定requeue 引數為false
- 訊息過期
- 佇列達到最大長度
當訊息在一個佇列中變成了死信訊息后,此時就會被發送到 DLX,系結 DLX 的訊息佇列則稱為死信佇列,
DLX 本質上也是一個普普通通的交換機,我們可以為任意佇列指定 DLX,當該佇列中存在死信時,RabbitMQ 就會自動的將這個死信發布到 DLX 上去,進而被路由到另一個系結了 DLX 的佇列上(即死信佇列),
3.2 死信佇列
這個好理解,系結了死信交換機的佇列就是死信佇列,
3.3 實踐
我們來看一個簡單的例子,
首先我們來創建一個死信交換機,接著創建一個死信佇列,再將死信交換機和死信佇列系結到一起:
public static final String DLX_EXCHANGE_NAME = "dlx_exchange_name";
public static final String DLX_QUEUE_NAME = "dlx_queue_name";
public static final String DLX_ROUTING_KEY = "dlx_routing_key";
/**
* 配置死信交換機
*
* @return
*/
@Bean
DirectExchange dlxDirectExchange() {
return new DirectExchange(DLX_EXCHANGE_NAME, true, false);
}
/**
* 配置死信佇列
* @return
*/
@Bean
Queue dlxQueue() {
return new Queue(DLX_QUEUE_NAME);
}
/**
* 系結死信佇列和死信交換機
* @return
*/
@Bean
Binding dlxBinding() {
return BindingBuilder.bind(dlxQueue())
.to(dlxDirectExchange())
.with(DLX_ROUTING_KEY);
}
這其實跟普通的交換機,普通的訊息佇列沒啥兩樣,
接下來為訊息佇列配置死信交換機,如下:
@Bean
Queue queue() {
Map<String, Object> args = new HashMap<>();
//設定訊息過期時間
args.put("x-message-ttl", 0);
//設定死信交換機
args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
//設定死信 routing_key
args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
return new Queue(JAVABOY_QUEUE_DEMO, true, false, false, args);
}
就兩個引數:
- x-dead-letter-exchange:配置死信交換機,
- x-dead-letter-routing-key:配置死信
routing_key,
這就配置好了,
將來發送到這個訊息佇列上的訊息,如果發生了 nack、reject 或者過期等問題,就會被發送到 DLX 上,進而進入到與 DLX 系結的訊息佇列上,
死信訊息佇列的消費和普通訊息佇列的消費并無二致:
@RabbitListener(queues = QueueConfig.DLX_QUEUE_NAME)
public void dlxHandle(String msg) {
System.out.println("dlx msg = " + msg);
}
很容易吧~
4. 小結
好啦,今天就和小伙伴們聊一聊 RabbitMQ 中的訊息過期問題,感興趣的小伙伴可以去試試哦~
公眾號江南一點雨后臺回復本文標題,可以獲取本文案例下載鏈接,
參考資料:
- blog.csdn.net/u012988901/article/details/88958654
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/302280.html
標籤:其他
