訊息中間件-訊息的可靠性傳遞
前言
訊息中間件的可靠性訊息傳遞,是訊息中間件領域非常重要的方案落實問題(在這之前的MQ理論,MQ選型是抽象層次更高的問題,這里不談),
并且這個問題與日常開發是存在較大的關聯的,可以這么說,凡是使用了MQ的,機會都要考慮這個問題,當然也有一些原始資料采集,日志資料收集等應用場景對此沒有過高要求,但是大多數的業務場景,對此還是有著較高要求的,比如訂單系統,支付系統,訊息系統等,你弄丟一條訊息,嘿嘿,
網上對于這方面的博客,大多從單一MQ,或者干脆就是在論述MQ,我不喜歡這樣的論述,這樣的論述太過局限,也過于拖沓,
這次,主要從理論方面論證訊息的可靠性傳遞的落實,具體技術,都是依據這些理論的,具體實作都差不多,不過為了便于大家理解,我在文中會以RabbitMq,Kafka這兩個主流MQ稍作舉例,
在日常開發中,我更傾向于在具體開發前,先整理思路,走通理論,再開始編碼,畢竟,如果連理論都走不同,還談什么編碼,
另外,我按照訊息可靠性層次逐步推進,形成相應的目錄,希望大家喜歡(因為我認為,相較網上這方面現有博客的目錄,這樣的目錄更合理,更人性化),
概述
這里簡單談一些有關訊息可靠性傳遞的理論,
訊息傳遞次數
訊息在訊息系統(生產者+MQ+消費者),其消費的次數,無非一下三種情況:
- 最多一次
- 最少一次
- 不多不少一次
訊息可靠性層次
這也代表著訊息系統的訊息可靠性的三個層次:
- 最多一次:上游服務的訊息發出了,至于下游能不能收到服務,就不管了,結果就是下游服務,可能根本就沒有接收到訊息,
- 最少一次:上游服務的訊息發出了,并通過某些機制,確保下游服務一定收到了該訊息,但是收到了幾次,就不管了,結果就是下游服務,可能多次收到同一條訊息,
- 不多不少一次:上游服務的訊息發出了,并確保下游服務一定收到了訊息,下游服務通過某些機制,確保多次收到該訊息與單次收到該訊息,對其系統狀態的影響是相同的,
方案落實
實作上述三個層次,需要逐步從三個方面考慮:
- 最多一次:會用訊息佇列即可,只要確保訊息的連通性即可
- 最少一次:通過MQ提供的確認機制,確保訊息的傳遞
- 不多不少一次:通過外部應用程式,確保訊息的單次消費與多次消費對系統狀態影響是一致的
上述三個層次,對系統的性能損耗,系統復雜度等都是逐步上升的,
當然,我們首先,需要了解這三個層次分別如何實作,
再在實際開發中,根據需要,靈活選取合適方案,
最多一次的訊息傳遞
這個方案是最簡單的,只要確保訊息系統的正確運作,以及系統的連通性即可,在正常情況下,可以保證絕大部分資料的可靠性傳遞,但是仍舊存在極小資料的丟失,并且資料的丟失會因為訊息佇列的選擇,以及訊息并發量,而受到影響,
優點
- 實作簡單,只要搭建對應的MQ服務器,寫出對應的生產者與消費者,以及相應配置,即可正常作業,
缺點
- 無法保證資料的可靠性,會存在一定的資料丟失情況,尤其是在并發量較大時
實際應用
可以應用于日志上傳這樣對訊息可靠性要求低的應用場景,
總結
如果資料量不大的情況下,推薦使用RabbitMQ,其訊息可靠性在地資料量下,是最可靠的,但是在達到萬級并發時,會存在訊息丟失,丟失的比例可以達到千分之一,
如果資料量較大的情況下,要么采用集群,要么就采用Kafk(Kafka可支持十萬級并發)
一般來說,這種訊息可靠性多見于專案初建,或類似日志采集,原始資料采集這樣的特定場景,
最少一次的訊息傳遞
這個方案開始利用MQ提供的特定機制,來提高訊息傳遞的可靠性,
優點
- 不錯的訊息可靠性,確保不會出現訊息丟失的情況
- 實作并不復雜,只需要合理使用MQ的API,設定合理引數(如重試次數)即可
缺點
- 會出現訊息重復消費的情況
- 引數的設定需要合理,如重試次數,一般設定為5次,也可根據情況,進行調整
- 資源占用的提升,如帶寬(每次訊息成功生產,消費都需要回傳一條資料進行確認)等
方案落實
該方案的實作組成,由以下三個方面構成:
- 訊息的可靠生產
- 訊息的可靠存盤
- 訊息的可靠消費
通過以上三個方面的落實,確保可訊息一定被下游服務消費,
訊息的可靠生產
訊息的可靠生產,是通過回呼確認機制,確保訊息一定被訊息服務器接收,
訊息生產,發送給訊息服務器后,訊息服務器會回傳一個確認資訊,表示資料正常接收,
如果生產者在一定時間內沒有接收到確認資訊,就會觸發重試機制,進行訊息的重發,
如RabbitMq的comfirm機制,Kafka的acks機制等,
RabbitMq的confirm機制存在三個模式:
- 普通模式:channel.waitForConfirms()
- 批量模式:channel.waitForConfirmsOrDie()
- 異步模式:channel.addConfirmListener()
這三個模式,看名稱就可以知道具體作用了,如果希望了解具體代碼落實,詳見RabbitMQ事務和Confirm發送方訊息確認——深入解讀,其中確認機制寫得較為簡潔,
至于Kafka的acks機制,同樣存在三個模式:
- acks = 0 :不需要Kafka的任何Partition確認,即確認發送成功(這個之確保訊息發送出去了,并不保證訊息服務器是否成功接收)
- acks = 1 :(默認)需要Kafka的Partition Leader確認,即被Kafka的一個Partition(Leader)接收,但是這樣依舊存在極小概率的訊息丟失,即Partition Leader獲取了對應訊息,并給了acks確認回復,但是在其他Partition同步前,Partition Leader宕機,資料丟失,那么這就造成了訊息丟失,
- acks = all :需要Kafka對應ISR中的全部Partition確認,才確認訊息發送成功(當然,這里假定Kafka是多節點集群,如果只有一個磁區,那就毫無意義了),
說到這里,簡單說一下,上述的操作可能造成訊息的重復生產,
最簡單的例子,訊息成功發送,但是對應的訊息確認資訊由于網路波動而丟失,那么生產者就會重復發送該訊息,所以訊息服務器接收到了兩條相同訊息,故產生了訊息的重復生產,
另外,上述的重試,都是存在回應時長判斷(超出1min,就認為資料丟失),以及重試次數限制(超過5次,就不進行重試,否則,大量重試資料可能會拖垮整個服務),
訊息的可靠存盤
訊息的可靠存盤,是確保訊息在訊息服務器經過,或者說堆積時不會因為宕機,網路等狀況,丟失訊息,
網上很多博客在論述訊息的可靠性傳遞時,常常把這點遺漏,因為他們理所當然地認為訊息佇列已經通過集群等實作了訊息佇列服務的可用性,故訊息的可靠性存盤也就實作了,
但是這里存在兩個問題,第一,可靠性不等于可用性,第二,訊息的可靠存盤,作為訊息可靠性傳遞的一部分,是不可缺失的,
可用性:確保服務的可用,即對應的服務,可以提供服務,
可靠性:確保服務的正確,即對應的服務,提供的是正確的服務,
區別:我瀏覽淘寶,淘寶頁面打不開,這就涉及了可用性問題(可用性計算公式:可用時間/全部時長*100%),而我瀏覽淘寶,查詢訂單,給我顯示的是別人的訂單,這就涉及了可靠性問題,
另外這里再糾正一點,可靠性并不依賴于可用性,即使我打不開淘寶頁面,我也不能說淘寶提供訂單查詢就有問題(只是如果沒有了可用性,談論可靠性是非常沒有意義的,畢竟都用不了了,誰還關心其內容是否正確呢,都看不到)
訊息佇列的可用性,是通過多個節點構成集群,避免單點故障,從而提升可用性,
訊息佇列的可靠存盤,是通過備份實作(這里不糾結備份如何確保正確)的,如RabbitMq集群的MemNode與DiskNode,又或者Kafka的replication機制等,
訊息的可靠消費
訊息的可靠消費,就是確保訊息被消費者獲取,并被成功消費,避免由于訊息丟失,或者消費者宕機而造成訊息消費不成功,最終造成訊息的丟失(因為RabbitMq服務器在認為訊息被成功消費后,將對應資料洗掉或標記為“已消費”),
至于訊息的可靠消費,核心理念還是重試,重試,再重試,不過具體的實作就八仙過海,各顯神通了,
這里分別說一下RabbitMq,Kafka,Rocket三者對于可靠消費的處理:
RabbitMq
提供ack機制,默認是auto,直接在拿到訊息時,直接ack,確保了訊息到達了消費者,但是無法解決消費者消費失敗這樣的問題,
實際開發中,為了確保訊息的可靠消費,一般會設定為munal,只有在程式正確運行后,才會呼叫對應api,表示訊息正確消費,
Kafka
由于Kafka的訊息是落地到硬碟檔案的,而且Kafka的訊息分發方式是pull的,所以訊息的拉取是通過offset機制去確認對應位置訊息的,
當然,Kafka的offset默認是自動提交的(可通過nable_auto_commit與auto_commit_interval_ms控制),
所以消費者呼叫服務失敗等原因,可以通過手動offset提交,來實作對資料的重復消費(甚至是歷史資料的消費),也就可以在消費失敗時對同一訊息進行再消費,
如果是消費者宕機等原因,由于Kafka服務器沒有收到對應的offset提交,所以認為那條訊息沒有被消費成功,故回傳的依舊是那條訊息,
RocketMq
其實RocketMq的處理有些類似Kafka確認機制+RabbitMq死信佇列的感覺,
首先,消費者從RocketMq拉取訊息,如果成功消費,就回傳確認訊息,
如果未成功消費,就嘗試重新消費,
嘗試消費一定次數后(如5次),就會將該訊息發送之RocketMq中的重試佇列,
如果遇到消費者宕機的情況,RocketMq會認為該訊息未成功消費,會被其他消費者繼續消費,
其實在RabbitMq的可靠性消費時,我們也會將多次消費失敗的資料保存下來,便于后期修復等,不過保存的方式由很多種,日志,資料庫,訊息佇列等,而RocketMq則給出了具體的落實方案,
上述的操作,可能造成訊息的重復消費,
最簡單的例子,訊息成功被消費者消費,但是消費者還沒來得及發送確認資訊,就宕機了,
訊息佇列由于沒有收到確認訊息,認為該條訊息尚未被訊息,就將該訊息交由其他消費者繼續消費,
不多不少一次的訊息傳遞
這個方案,就是通過MQ以外的應用程式,來進行擴展,最終達到訊息準確消費的目的,
那么為什么不將這個功能,囊括在MQ中呢?
個人認為有四個方面的考慮:
- 訊息中間件,應該明確其功能域,而訊息生產與訊息消費往往涉及業務,所以避免與業務的耦合,所以訊息中間件只完善了可靠存盤,
- 準確消費,往往涉及MQ以外的部分,需要其他部分的配合,就類似與XA介面一樣,這樣會帶來編碼的約束,系統的耦合性等,
- 準確消費的實作可以通過一個工具,模塊去實作,但是不該硬編碼,畢竟現有的處理方案并不一定就是最優解(尤其是在調控中心,TCC框架展現的現在),
- 性能影響,為了一個不通用的功能,會帶來訊息中間件的性能大幅下降
優勢
- 確保訊息被準確消費(不多不少一次)
缺點
- 實作復雜(生產者與消費者都需要建立對應資料庫)
- 需要建立對應規范(但是通用規范確定后,實作就會變得快速)
- 資源占用的提升,如帶寬(每次訊息成功生產,消費都需要回傳一條資料進行確認)等
存在的問題
訊息存盤部分的準確存盤,不該我們來操心,所以只闡述訊息生產與訊息消費兩個部分,
訊息的重復生產
- 訊息發給了訊息佇列服務器,訊息佇列服務器的確認資訊由于網路波動等,沒有及時到達生產者
- 訊息發送給了訊息佇列服務器,生產者在接收訊息前,宕機
- 訊息發送給了訊息佇列服務器,生產者在接收訊息后,還沒來得及進行確認邏輯,宕機
綜上來看,就是訊息發出后,到生產者訊息確認資訊的處理之間,出現各種意外,導致重復生產,
訊息的重復消費
- 訊息已經被消費,消費者還沒來得及發送確認資訊,就宕機了
- 訊息已經被消費,消費者發出確認資訊,確認資訊由于網路波動等,沒有及時到達訊息佇列服務器
- 訊息已經被消費,消費者發出確認資訊,訊息佇列服務器對應實體在接收到確認資訊前,宕機
- 訊息已經被消費,消費者發出確認資訊,訊息佇列服務器接受到了確認資訊,還沒來得及進行確認邏輯,宕機
綜上來看,就是訊息已經被消費后,到訊息佇列服務器進行確認訊息處理之間,出現各種意外,導致重復消費,
解決方案
解決方案:messageId+冪等
準確來說,解決方案的核心是冪等,而messageId是作為輔助手段的,
冪等
冪等,簡單說明一下,就是多次操作與單次操作對系統狀態的影響是一致的,
如
i = 1;
就是冪等操作,因為無論進行幾次,i的值都沒有變化,
而
i++;
則不是冪等操作,因為i的值與執行次數息息相關,
故通過冪等操作來確保同一條訊息,不被執行多次,
messageId
但是,消費者如何確定是否為同一條訊息呢?
有的訊息體存在唯一性欄位,如orderId等,但有的訊息并沒有這樣的唯一性欄位,
所以需要一個專門的欄位,來表示唯一性,并且與業務訊息解耦,這就是messageId,
既可以采用訊息體的唯一性欄位(可以是單一欄位,也可以是組合欄位),也可以通過特定方式生成對應標識(分布式系統,需要注意不同實體生產者產生相同標識的可能,詳見分布式全域唯一ID的實作),
具體的生成情況,就不在這里贅述了,
方案落實
先來一張大圖(這種事情,圖片展示最直觀了),展示一下流程:

(圖片是絕對清晰的,看不清圖片的朋友,請將圖片在新頁面打開,或下載,說實話,來到新公司,首先提升的就是畫圖能力,囧)
簡單說一下流程,大家可以對照著上圖,看一下:
生產者到訊息中間件服務器
- 生產者根據需要發送的訊息,生成對應messageId,并封裝對應message至生產者資料庫(該操作應該利用事務性,確保生產者事件處理與message保存至資料庫的原子性),同時標注message狀態為sending(發送中狀態)
- 將對應message發送給訊息佇列服務器
- 如果沒有收到生產確認資訊,則重新發送message(如果這個時候遇到生產者實體宕機,也不用擔心,因為后續會有補償程式,進行補償重發操作)
- 當收到訊息中間件服務器的訊息生產確認訊息(即確定訊息已經達到訊息中間件服務器),將資料庫中對應message的狀態修改為sended(已發送狀態)
上述中提到的補償機制,其實是類似事務中的一個操作,通過一個定時任務,定時巡檢資料庫處于sending狀態的message,并通過生產者極性發送(所以message一般都保存source,target等資訊),
之所以會有sending狀態的message,就是因為存在生產者訊息發送出去了,還沒收到生產確認資訊,結果生產者實體自己宕機的情況,
至于補償機制的定時任務,是一個非常簡單的實作,這里就不再贅述了,
訊息中間件到消費者
這里進行的操作是針對非冪等的操作,
如果是冪等操作,則可以直接進行,畢竟多次執行與單次執行對資料庫的影響是一致的,
但是注意冪等操作在部分場景下無效的問題(時間影響上),如“余額 = 1k”的操作對于資料庫而言是冪等的,但是在兩次“余額 = 1k”操作間,有一個“余額 = 2k”的操作,則會發生問題(丟失了“余額 = 2k”操作),當然,這種類似ABA問題,完全可以引入版本號,來進行解決,
綜上,還是推薦采用以下解決方法,流程較為簡單:
- 消費者獲取資料
- 消費者判斷資料庫是否有對應message
- 如果存在對應message,則放棄執行(因為這是一個重復操作)
- 如果不存在,則進行相關訊息處理,并通過事務控制,在消費者資料庫中添加message(確保訊息的處理與資料庫添加message是原子操作)
至此,訊息的準確傳遞就完成了,
總結
訊息可靠性傳遞的發展程序,也體現了人們對訊息中間件功能的一步步追求,更是體現了工程師們解決問題的思路,
很多時候,我們會遇到很多問題,甚至令人感到雜亂不堪,無從下手,這個時候,最好的辦法就是靜下心來,對它們進行劃分(按照重要程度,緊迫度,實作難度),再進行一個長期規劃,一步步來解決,往往這個時候,動動筆,在筆記上列下清單,會是一個不錯的辦法,
其中訊息的準確傳遞,涉及一些事務相關的內容,也許有人已經聯想到,訊息佇列是否可以作為分布式事務的一種手段呢?我會在之后的博客中,來闡述分布式事務這一重要主題,
如果有什么問題或想法,可以私信或@我,
愿與諸君共進步,
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/22680.html
標籤:架構設計
下一篇:微服務,為什么從前后端分離開始?
