在生產環境中,因為機器以及網路設備的不可靠,保證訊息的可靠是待解決的問題,在特定場景下訊息可能存在丟失風險
訊息發送流程
我們可以將 RabbitMQ 訊息處理的程序分為三個步驟:
- 生產階段:生產者生產訊息并且發送到訊息佇列;
- 儲存階段:訊息佇列存盤和處理訊息;
- 消費階段:訊息佇列將訊息轉發到消費者,
上述每個步驟都有可能出現訊息丟失的風險;

生產者生產訊息并且發送到訊息佇列
丟失場景
生產者將資料發送到 RabbitMQ 的時候,可能資料就在半路給搞丟了,比如:
- 網路故障,網路環境的不可靠導致訊息發送失敗,例如網路丟包、網路故障,
- 資料在網路中傳輸會經過諸多網路設備,只要其中一個網路鏈接在資料抵達前已經流量滿載,新到的資料將會阻塞一段時間段,
AMQP事務機制
使用AMQP協議的事務機制,生產者在發出訊息之后,訊息是否到達RabbitMQ服務器是默認不可知的,在生產者發送訊息之前,呼叫channel.txSelect 陳述句開啟事務
- 如果訊息發送失敗,那么呼叫channel.txRollback回滾事務,嘗試重新發送一條訊息;
- 如果訊息發送成功,那么呼叫channel.txCommit提交事務,
采用事務的缺點是增加耗時,會降低RabbitMQ的吞吐性能,
Confirm機制
RabbitMQ有一種性能改進方案,即Confirm機制
- 生產者呼叫channel.confirmSelect將通信方式設定為confirm模式;
- 生產者發送的所有訊息都會被分配一個唯一 ID;
- 當生產者發送的訊息成功投遞到佇列之后,RabbitMQ會發送一個ack給生產者,生產者即得知這條訊息已經成功發送,
- 如果 RabbitMQ 沒能處理這個訊息,會回呼你一個nack介面,告訴你這個訊息接收失敗,需要重試
我們也可以結合這個機制在記憶體里維護每個訊息 id 的狀態,如果超過一定時間還沒接收到這個訊息的回呼,那么就需要重新發送
區別
- 事務機制是同步的,你提交一個事務之后會阻塞在那里
- confirm機制是異步的,發送訊息之后就可以發送下一個訊息,然后那個訊息RabbitMQ 接收了之后會異步回呼你一個介面通知你這個訊息接收到了,
訊息佇列存盤和處理訊息
訊息存盤在 RabbitMQ 佇列中,如果佇列沒有持久化,如果服務器宕機,RabbitMQ 服務器重啟后會導致訊息丟失,
解決方案
開啟 RabbitMQ 的持久化,就是訊息寫入之后會持久化到磁盤,如果 RabbitMQ 自己宕機了,重啟之后會自動讀取之前存盤的資料;
也有可能RabbitMQ 還沒持久化,就宕機了,這種情況發生的概率比較小
持久化也可以跟生產者的confirm機制配合起來,只有訊息被持久化到磁盤之后,才給生產者發送ack,所以哪怕是出現上面這種情況,RabbitMQ 宕機了,訊息丟了,生產者收不到ack,還是回重發的,
訊息佇列持久化
-
Exchange 持久化:以 Direct 模式為例,將 durable 引數設定為 true,
-
Queue 持久化:將 durable 引數設定為 true,但是這樣只能保證持久化 Queue 的元資料,但是不會持久化 Queue 里存盤的訊息,
-
訊息持久化:發送訊息的時候將deliveryMode設定為2,將訊息設定為持久化的
-
SpringBoot中的rabbitTemplate默認設定訊息是持久化,不需要手動配置
訊息佇列將訊息轉發到消費者
消費者在收到訊息之后,還沒來得及處理訊息的消費邏輯,所在機器就宕機了,導致記憶體中的訊息丟失,
解決方案
RabbitMQ 默認采用自動 ACK 機制,在沒有處理業務邏輯之前,消費者就會告知訊息佇列已經成功收到訊息,這種方式并不能解決這種問題
- 我們可以關閉自動ACK模式,通過一個 呼叫API介面就行,消費完訊息后,再回傳ACK,這樣的話,如果你還沒處理完,就沒有ACK
- 這樣 RabbitMQ 就認為你還沒處理完,這個時候 RabbitMQ 會把這個消費分配給別的 consumer 去處理,訊息是不會丟的,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/301707.html
標籤:其他
上一篇:手撕三種分布式鎖
下一篇:[MySQL]聚合查詢
