1.RabbitMQ TTL及死信佇列
1.1.TTL概述
過期時間TTL表示可以對訊息設定預期的時間,在這個時間內都可以被消費者接識訓取;過了之后訊息將自動被洗掉,RabbitMQ可以對訊息和佇列設定TTL,目前有兩種方法可以設定,
-
第一種方法是通過佇列屬性設定,佇列中所有訊息都有相同的過期時間,
-
第二種方法是對訊息進行單獨設定,每條訊息TTL可以不同,
注意:
如果上述兩種方法同時使用,則訊息的過期時間以兩者之間TTL較小的那個數值為準,訊息在佇列的生存時間一旦超過設定的TTL值,就稱為dead message被投遞到死信佇列, 消費者將無法再收到該訊息,
界面具體設定如下圖所示:

1.2.TTL簡單實作
①基于佇列屬性進行設定:
這里在springBoot-order-rabbitmq-producer專案中config目錄新建一個TTLRabbitMqConfiguration,宣告ttl交換機與ttlQueue,代碼如下:
@Configuration public class TTLRabbitMqConfiguration { //宣告交換機,不同的交換機型別不同:DirectExchange/FanoutExchange/TopicExchange/HeadersExchange @Bean public DirectExchange ttldirectOrderExchange() { return new DirectExchange("ttl_direct_exchange", true, false); } //定義佇列的過期時間 @Bean public Queue directttlQueue() { Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl", 5000); //這里一定是int型別 return new Queue("ttl.direct.queue", true, false, false, args); } @Bean public Binding ttlBingding(){ return BindingBuilder.bind(directttlQueue()).to(ttldirectOrderExchange()).with("ttl"); } }
在OrderService中進行訊息發送至訊息佇列:
@Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; // 1: 定義交換機 private String exchangeName = ""; // 2: 路由key private String routeKey = ""; //ttl--死信佇列 public void makeOrderTTLQueue(Long userId, Long productId, int num) { exchangeName = "ttl_direct_exchange"; routeKey = "ttl"; // 1: 模擬用戶下單 String orderNumer = UUID.randomUUID().toString(); // 2: 根據商品id productId 去查詢商品的庫存 // int numstore = productSerivce.getProductNum(productId); // 3:判斷庫存是否充足 // if(num > numstore ){ return "商品庫存不足..."; } // 4: 下單邏輯 // orderService.saveOrder(order); // 5: 下單成功要扣減庫存 // 6: 下單完成以后 System.out.println("用戶 " + userId + ",訂單編號是:" + orderNumer); // 發送訂單資訊給RabbitMQ fanout rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer); } }
進行測驗:
@SpringBootTest class RabbitmqApplicationTests { @Autowired private OrderService orderService; @Test void ttlQueueTest() throws InterruptedException { for (int i = 0; i < 5; i++) { Thread.sleep(1000); Long userId = 100L + i; Long productId = 10001L + i; int num = 1; orderService.makeOrderTTLQueue(userId, productId, num); } } }
可以看到訊息向佇列中發送,但是5s之后訊息會自動從佇列中移除,這就是TTL訊息過期移除,

②基于某個訊息發送時單獨設定過期時間:
這種方式不需要在佇列與交換機系結時設定Queue過期屬性,只需要宣告為普通佇列即可,
@Configuration public class TTLRabbitMqConfiguration { //宣告交換機,不同的交換機型別不同:DirectExchange/FanoutExchange/TopicExchange/HeadersExchange @Bean public DirectExchange ttldirectOrderExchange() { return new DirectExchange("ttl_direct_exchange", true, false); } //定義佇列的過期時間 --定義一個普通佇列,在外面設定過期時間 @Bean public Queue directttlMessageQueue() { return new Queue("ttl.message.direct.queue", true, false, false); } @Bean public Binding ttlMsgBingding(){ return BindingBuilder.bind(directttlMessageQueue()).to(ttldirectOrderExchange()).with("ttlmsg"); } }
在發送時進行單獨訊息過期時間屬性設定:
@Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; // 1: 定義交換機 private String exchangeName = ""; // 2: 路由key private String routeKey = ""; //ttl--死信佇列--普通佇列設定 public void makeOrderTTLMsgQueue(Long userId, Long productId, int num) { exchangeName = "ttl_direct_exchange"; routeKey = "ttlmsg"; String orderNumer = UUID.randomUUID().toString(); System.out.println("用戶 " + userId + ",訂單編號是:" + orderNumer); //給訊息設定過期時間 MessagePostProcessor postProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("5000"); //時間為5s message.getMessageProperties().setContentEncoding("UTF-8"); return message; } }; // 發送訂單資訊給RabbitMQ fanout,指定訊息的擴展資訊 rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer, postProcessor); } }
進行測驗:
@SpringBootTest class RabbitmqApplicationTests { @Autowired private OrderService orderService; @Test void ttlMsgQueueTest() throws InterruptedException { for (int i = 0; i < 3; i++) { Thread.sleep(1000); Long userId = 100L + i; Long productId = 10001L + i; int num = 1; orderService.makeOrderTTLMsgQueue(userId, productId, num); } } }
可以看到普通訊息也可以通過設定過期時間,實作在訊息佇列中進行過期移除的功能,

①與②的主要區別在于:
通過佇列設定ttl過期可以與死信佇列進行系結,后期過期之后可以加入死信佇列;而通過單獨普通訊息后期設定屬性無法加入到死信佇列中,即沒有備胎,
下面簡單地使用SpringBoot方式實作一下死信佇列,
1.3.死信佇列
DLX,全稱為Dead-Letter-Exchange , 可以稱之為死信交換機,也有人稱之為死信郵箱,當訊息在一個佇列中變成死信(dead message)之后,它能被重新發送到另一個交換機中,這個交換機就是DLX ,系結DLX的佇列就稱之為死信佇列, 訊息變成死信,可能是由于以下的原因:
-
訊息被拒絕
-
訊息過期
-
佇列達到最大長度
DLX也是一個正常的交換機,和一般的交換機沒有區別,它能在任何的佇列上被指定,實際上就是設定某一個佇列的屬性,當這個佇列中存在死信時,Rabbitmq就會自動地將這個訊息重新發布到設定的DLX上去,進而被路由到另一個佇列,即死信佇列,
要想使用死信佇列,只需要在定義佇列的時候設定佇列引數 x-dead-letter-exchange 指定交換機即可,
死信佇列的執行流程:

1.4.死信佇列簡單實作
①在config目錄下創建TTLRabbitMqConfiguration,宣告ttl交換機及佇列系結關系,同時宣告死信佇列:
這里最主要的就是按照界面引數設定了死信佇列exchange及routekey:

@Configuration public class TTLRabbitMqConfiguration { //宣告交換機,不同的交換機型別不同:DirectExchange/FanoutExchange/TopicExchange/HeadersExchange @Bean public DirectExchange ttldirectOrderExchange() { return new DirectExchange("ttl_direct_exchange", true, false); } //定義佇列的過期時間 //定義佇列的死信佇列 //死信佇列的route key @Bean public Queue directttlQueue() { Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl", 5000); //這里一定是int型別 args.put("x-dead-letter-exchange", "dead_direct_exchange"); //這里與定義好的死信交換機進行系結,死信交換機會去找死信佇列 args.put("x-dead-letter-routing-key", "dead"); //如果是fanout模式這里不需要route key args.put("x-max-length", 5); //設定每次給死信佇列中發送訊息的長度 return new Queue("ttl.direct.queue", true, false, false, args); } @Bean public Binding ttlBingding(){ return BindingBuilder.bind(directttlQueue()).to(ttldirectOrderExchange()).with("ttl"); } }
②業務層呼叫及測驗:
//ttl--死信佇列 public void makeOrderTTLQueue(Long userId, Long productId, int num) { exchangeName = "ttl_direct_exchange"; routeKey = "ttl"; String orderNumer = UUID.randomUUID().toString(); System.out.println("用戶 " + userId + ",訂單編號是:" + orderNumer); // 發送訂單資訊給RabbitMQ fanout rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer); }
測驗:
@Test void ttlQueueTest() throws InterruptedException { for (int i = 0; i < 5; i++) { Thread.sleep(1000); Long userId = 100L + i; Long productId = 10001L + i; int num = 1; orderService.makeOrderTTLQueue(userId, productId, num); } }
可以看到Queue屬性中TTL、Lim相關的設定,5s過期后都加入到了死信佇列中:

2.RabbitMQ記憶體管控
2.1.RibbitMQ持久化
持久化就把資訊寫入到磁盤的程序,

RabbitMQ的持久化佇列分為:
1:佇列持久化
2:訊息持久化
3:交換機持久化
不論是持久化的訊息還是非持久化的訊息都可以寫入到磁盤中,只不過非持久的是等記憶體不足的情況下才會被寫入到磁盤中,
2.2.RabbitMQ記憶體磁盤監控
RabbitMQ的記憶體警告
當記憶體使用超過配置的閾值或者磁盤空間剩余空間對于配置的閾值時,RabbitMQ會暫時阻塞客戶端的連接,并且停止接收從客戶端發來的訊息,以此避免服務器的崩潰,客戶端與服務端的心態檢測機制也會失效, 如下圖:

當出現blocking或blocked話說明到達了閾值和以及高負荷運行了,
RabbitMQ的記憶體控制
參考幫助檔案:https://www.rabbitmq.com/configure.html
當出現警告的時候,可以通過配置去修改和調整
①命令的方式
rabbitmqctl set_vm_memory_high_watermark <fraction>rabbitmqctl
set_vm_memory_high_watermark absolute 50MB
fraction/value 為記憶體閾值,默認情況是:0.4/2GB,代表的含義是:當RabbitMQ的記憶體超過40%時,就會產生警告并且阻塞所有生產者的連接,通過此命令修改閾值在Broker重啟以后將會失效,通過修改組態檔方式設定的閾值則不會隨著重啟而消失,但修改了組態檔一樣要重啟broker才會生效,
分析:


②組態檔方式 rabbitmq.conf
當前組態檔地址:/etc/rabbitmq/rabbitmq.conf
#默認 #vm_memory_high_watermark.relative = 0.4 # 使用relative相對值進行設定fraction,建議取值在04~0.7之間,不建議超過0.7. vm_memory_high_watermark.relative = 0.6 # 使用absolute的絕對值的方式,但是是KB,MB,GB對應的命令如下: vm_memory_high_watermark.absolute = 2GB
RabbitMQ的記憶體換頁
在某個Broker節點及記憶體阻塞生產者之前,它會嘗試將佇列中的訊息換頁到磁盤以釋放記憶體空間,持久化和非持久化的訊息都會寫入磁盤中,其中持久化的訊息本身就在磁盤中有一個副本,所以在轉移的程序中持久化的訊息會先從記憶體中清除掉,
默認情況下,記憶體到達的閾值是50%時就會換頁處理, 也就是說,在默認情況下該記憶體的閾值是0.4的情況下,當記憶體超過0.4*0.5=0.2時,會進行換頁動作,
比如有1000MB記憶體,當記憶體的使用率達到了400MB,已經達到了極限,但是因為配置的換頁記憶體0.5,這個時候會在達到極限400mb之前,會把記憶體中的200MB進行轉移到磁盤中,從而達到穩健的運行,
可以通過設定 vm_memory_high_watermark_paging_ratio 來進行調整,
vm_memory_high_watermark.relative = 0.4
vm_memory_high_watermark_paging_ratio = 0.7(設定小于1的值)
為什么設定小于1,以為你如果你設定為1的閾值,記憶體都已經達到了極限了,你在去換頁意義不是很大了,
RabbitMQ的磁盤預警
當磁盤的剩余空間低于確定的閾值時,RabbitMQ同樣會阻塞生產者,這樣可以避免因非持久化的訊息持續換頁而耗盡磁盤空間導致服務器崩潰,
默認情況下:磁盤預警為50MB的時候會進行預警,表示當前磁盤空間第50MB的時候會阻塞生產者并且停止記憶體訊息換頁到磁盤的程序, 這個閾值可以減小,但是不能完全的消除因磁盤耗盡而導致崩潰的可能性,比如在兩次磁盤空間的檢查空隙內,第一次檢查是:60MB ,第二檢查可能就是1MB,就會出現警告,
通過命令方式修改如下:
rabbitmqctl set_disk_free_limit <disk_limit> rabbitmqctl set_disk_free_limit memory_limit <fraction> disk_limit:固定單位 KB MB GB fraction :是相對閾值,建議范圍在:1.0~2.0之間,(相對于記憶體)
通過組態檔配置如下:
disk_free_limit.relative = 3.0 disk_free_limit.absolute = 50mb
本博客示例涉及代碼均已上傳至Github:
RabbitMQStudy
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/281935.html
標籤:Java
