一、前言
訊息佇列,簡單來講,就是通過佇列來存盤與傳遞訊息,具有解耦服務、削峰填谷、異步處理等優勢,
由于業務發展的需要,引入訊息佇列后,確實能利用以上的優勢,但同時提高了系統的復雜性,降低了可用性,
也會帶來各種各樣的問題,例如訊息丟失、亂序與重復消費等,今天就簡單講講如何保證訊息不丟失,
如果佇列只是去傳遞一些日志型的訊息,那丟失也無所謂,但如果傳遞的是一些核心業務型別的訊息,那就要保證訊息不能丟失,
訊息從生產到消費,要經歷三個階段,分別是生產、佇列轉發與消費,每個環節都可能丟失訊息,

以下以RabbitMQ為例,來說明各個階段會產生的問題以及解決方式,在說明之前,先回顧一下RabbitMQ的一個基本架構圖

以上的架構圖來自于RabbitMQ架構圖和簡介以及交換器模式
二、生產端投遞到佇列程序中可能丟失訊息
1、生產端發送訊息時,由于網路閃斷原因,訊息未到達mq
這種情況可以立即進行重試,但是一般也會失敗,因為網路閃斷的特性就是間歇性,較短時間內的重試大概率會失敗,
這個時候,需要我們對發送失敗的訊息做出補償,
2、生產端發送訊息成功,mq也接收到了訊息,剛準備處理時,mq宕機,
生產端無法感知訊息是否已經正確到達交換機上,無法采取下一步的動作,例如是洗掉訊息,還是重新投遞,這就需要mq在適當的時機對生產端進行通知,
mq提供了兩種方式
(1)事務機制,屬于同步方式,訊息發送完之后會阻塞等待mq回應,在此期間無法發送下一條訊息,嚴重降低吞吐量與性能,
(2)confirm確認機制,屬于異步方式,訊息發送完之后不需要阻塞等待,當訊息達到指定的佇列后,mq將會主動回傳一個ack,代表訊息入隊成功,
因此,這里將會對當前channel開啟confirm機制,來顯示地告知訊息的處理進度,
3、mq接收到訊息,還沒落盤就ack
當mq接收到訊息后,需要在落盤后通知生產端,如果不落盤就確認的話,mq一旦宕機,訊息就會丟失,而生產端根本察覺不到,
想要在落盤后通知生產端,開啟佇列的confirm機制,即mq會對落盤后的訊息進行異步ack,
4、mq落盤后ack,但由于網路閃斷,生產端未收到ack
同第1點,需要作出補償機制,
綜合以上3點,需要在網路閃斷時,作出相應的補償機制,
可以先利用本地訊息表(mysql或者redis)記錄訊息狀態,發送并落盤成功后,立即洗掉該訊息記錄,
對于那些處理失敗的訊息,再使用定時任務進行重新發送即可,
初步的設計方案如圖:

1、生產端首先將業務資料以及訊息資料入庫,需要在同一個事務中,訊息資料入庫失敗,則整體回滾,
2、假設生產端此時將a,b,c三條資料入庫,他們的狀態都為發送中,
3、mq收到了a訊息,mq落盤后回傳ack,生產端接收到了ack后,將訊息庫中的a洗掉(當然你可以將其狀態置為發送成功,看業務需要了)
4、mq接著收到了b訊息,但回傳ack時,由于網路閃斷一直未能讓生產端接收到,此時定時任務會根據預設的超時時間掃描到發送超時或mq處理超時的訊息,對其進行重試,重試成功后,生產端對其進行洗掉,
5、c訊息就沒有這么好運,由于其他原因,比如路由鍵設定錯誤、佇列被誤洗掉等,始終無法路由到對應的佇列中,導致重試一直失敗,在達到最大次數后,將會進行報警通知,后續由人工處理,
三、佇列本身可能丟失訊息
1、訊息達到mq,但mq中出現內部錯誤,無法處理該訊息
由于我們已經開啟了confirm機制,這個時候mq會回傳nack,代表處理失敗,
對于這種問題,由以上的補償方案可以解決,只要mq不回傳ack,生產端就不洗掉訊息,
2、訊息還沒來得及刷盤,mq就宕機了,重啟后,訊息丟失,
開啟交換機、佇列與訊息的持久化,三者缺一不可,訊息刷盤后,再批量異步回傳ack,
3、開啟持久化后,但是硬碟壞了,無法恢復資料,
鏡像部署mq,訊息在所有或部分副本中寫完再回傳ack,
mq有以下三種部署方案:
單節點部署,訊息只存在與當前節點,硬碟壞了,那訊息真的就無法恢復了,
集群部署
(1)默認的集群部署,但訊息只會存在與當前節點中,并不會同步到其他節點,其他節點也僅只會同步該節點的佇列結構,
(2)鏡像部署,訊息會同步到其他節點上,可以設定同步的節點個數,但吞吐量會下降,
四、消費端可能丟失訊息
1、消費端采用自動ack機制,還沒有處理完畢,消費端宕機,
改為手動ack,當訊息正確處理完成后,再通知mq,消費端處理訊息例外后,回傳nack,這樣mq會把這條訊息投遞到另外一個消費端上,
2、消費端處理完訊息后,回傳ack時發生網路閃斷,mq未收到ack,
mq會將超時未ack的訊息重新放回佇列,
五、注意點
1、mq的ack回傳是批量異步的方式,生產端對ack的監聽也是異步的
生產端生產一條訊息后,mq接收到該訊息,先進行落庫,再進行ack回傳,生產端收到ack后,再去洗掉訊息記錄,
如果上述程序是一個同步程序的話,那整個吞吐量以及性能可太低了,
所以mq為了提高效率,會等到訊息在記憶體中達到一定數量的時候,統一進行落盤,再回傳ack,(這種模式和Redis中NO策略下的AOF持久化,以及Mysql中的redolog刷盤很類似),
當然生產端也不是傻乎乎地一直等待,而是往mq中投遞一個訊息后,設定對當前佇列或者channel的一個監聽器,在異步的回呼方法中進行ack與nack的處理,
2、在重試的補償機制下,消費端需要保證冪等,
在生產端長時間未收到ack或者nack的情況下,定時任務會該訊息進行重試,因此會往佇列中投遞重復的訊息,這時候就需要消費端保證冪等性,
在消費端拿到一個訊息時,可以將訊息中的業務引陣列合成為一個key,利用資料庫唯一索引或者redis來判斷是否之前是否執行過,
六、總結
如果需要保證訊息在整條鏈路中不丟失,那就需要生產端、mq自身與消費端共同去保障,
生產端:對生產的訊息進行狀態標記,開啟confirm機制,依據mq的回應來更新訊息狀態,使用定時任務重新投遞超時的訊息,多次投遞失敗進行報警,
mq自身:開啟持久化,并在落盤后再進行ack,如果是鏡像部署模式,需要在同步到多個副本之后再進行ack,
消費端:開啟手動ack模式,在業務處理完成后再進行ack,并且需要保證冪等,
通過以上的處理,理論上不存在訊息丟失的情況,但是系統的吞吐量以及性能有所下降,
在實際開發中,需要考慮訊息丟失的影響程度,來做出對可靠性以及性能之間的權衡,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/301913.html
標籤:其他
上一篇:史上最強zookeeper詳解
