作者:Jaskey Lam
來源:https://jaskey.github.io/blog/2020/06/08/rocketmq-message-dedup/
訊息中間件是分布式系統常用的組件,無論是異步化、解耦、削峰等都有廣泛的應用價值,我們通常會認為,訊息中間件是一個可靠的組件——這里所謂的可靠是指,只要我把訊息成功投遞到了訊息中間件,訊息就不會丟失,即訊息肯定會至少保證訊息能被消費者成功消費一次,這是訊息中間件最基本的特性之一,也就是我們常說的“AT LEAST ONCE”,即訊息至少會被“成功消費一遍”,
舉個例子,一個訊息M發送到了訊息中間件,訊息投遞到了消費程式A,A接受到了訊息,然后進行消費,但在消費到一半的時候程式重啟了,這時候這個訊息并沒有標記為消費成功,這個訊息還會繼續投遞給這個消費者,直到其消費成功了,訊息中間件才會停止投遞,
然而這種可靠的特性導致,訊息可能被多次地投遞,舉個例子,還是剛剛這個例子,程式A接受到這個訊息M并完成消費邏輯之后,正想通知訊息中間件“我已經消費成功了”的時候,程式就重啟了,那么對于訊息中間件來說,這個訊息并沒有成功消費過,所以他還會繼續投遞,這時候對于應用程式A來說,看起來就是這個訊息明明消費成功了,但是訊息中間件還在重復投遞,
這在RockectMQ的場景來看,就是同一個messageId的訊息重復投遞下來了,
基于訊息的投遞可靠(訊息不丟)是優先級更高的,所以訊息不重的任務就會轉移到應用程式自我實作,這也是為什么RocketMQ的檔案里強調的,消費邏輯需要自我實作冪等,背后的邏輯其實就是:不丟和不重是矛盾的(在分布式場景下),但訊息重復是有解決方案的,而訊息丟失是很麻煩的,
簡單的訊息去重解決方案
例如:假設我們業務的訊息消費邏輯是:插入某張訂單表的資料,然后更新庫存:
insert into t_order values .....
update t_inv set count = count-1 where good_id = 'good123';
要實作訊息的冪等,我們可能會采取這樣的方案:
select * from t_order where order_no = 'order123'
if(order != null) {
return ;//訊息重復,直接回傳
}
這對于很多情況下,的確能起到不錯的效果,但是在并發場景下,還是會有問題,
并發重復訊息
假設這個消費的所有代碼加起來需要1秒,有重復的訊息在這1秒內(假設100毫秒)內到達(例如生產者快速重發,Broker重啟等),那么很可能,上面去重代碼里面會發現,資料依然是空的(因為上一條訊息還沒消費完,還沒成功更新訂單狀態),
那么就會穿透掉檢查的擋板,最后導致重復的訊息消費邏輯進入到非冪等安全的業務代碼中,從而引發重復消費的問題(如主鍵沖突拋出例外、庫存被重復扣減而沒釋放等)
并發去重的解決方案之一
要解決上面并發場景下的訊息冪等問題,一個可取的方案是開啟事務把select 改成 select for update陳述句,把記錄進行鎖定,
select * from t_order where order_no = 'THIS_ORDER_NO' for update //開啟事務 if(order.status != null) {
return ;//訊息重復,直接回傳
}
但這樣消費的邏輯會因為引入了事務包裹而導致整個訊息消費可能變長,并發度下降,
當然還有其他更高級的解決方案,例如更新訂單狀態采取樂觀鎖,更新失敗則訊息重新消費之類的,但這需要針對具體業務場景做更復雜和細致的代碼開發、庫表設計,不在本文討論的范圍,
但無論是select for update, 還是樂觀鎖這種解決方案,實際上都是基于業務表本身做去重,這無疑增加了業務開發的復雜度, 一個業務系統里面很大部分的請求處理都是依賴MQ的,如果每個消費邏輯本身都需要基于業務本身而做去重/冪等的開發的話,這是繁瑣的作業量,本文希望探索出一個通用的訊息冪等處理的方法,從而抽象出一定的工具類用以適用各個業務場景,
Exactly Once
在訊息中間件里,有一個投遞語意的概念,而這個語意里有一個叫”Exactly Once”,即訊息肯定會被成功消費,并且只會被消費一次,以下是阿里云里對Exactly Once的解釋:
Exactly-Once 是指發送到訊息系統的訊息只能被消費端處理且僅處理一次,即使生產端重試訊息發送導致某訊息重復投遞,該訊息在消費端也只被消費一次,
在我們業務訊息冪等處理的領域內,可以認為業務訊息的代碼肯定會被執行,并且只被執行一次,那么我們可以認為是Exactly Once,
但這在分布式的場景下想找一個通用的方案幾乎是不可能的,不過如果是針對基于資料庫事務的消費邏輯,實際上是可行的,
基于關系資料庫事務插入訊息表
假設我們業務的訊息消費邏輯是:更新MySQL資料庫的某張訂單表的狀態:
update t_order set status = 'SUCCESS' where order_no= 'order123';
要實作Exaclty Once即這個訊息只被消費一次(并且肯定要保證能消費一次),我們可以這樣做:在這個資料庫中增加一個訊息消費記錄表,把訊息插入到這個表,并且把原來的訂單更新和這個插入的動作放到同一個事務中一起提交,就能保證訊息只會被消費一遍了,
- 開啟事務
- 插入訊息表(處理好主鍵沖突的問題)
- 更新訂單表(原消費邏輯)
- 提交事務
說明:
- 這時候如果訊息消費成功并且事務提交了,那么訊息表就插入成功了,這時候就算RocketMQ還沒有收到消費位點的更新再次投遞,也會插入訊息失敗而視為已經消費過,后續就直接更新消費位點了,這保證我們消費代碼只會執行一次,
- 如果事務提交之前服務掛了(例如重啟),對于本地事務并沒有執行所以訂單沒有更新,訊息表也沒插入成功;而對于RocketMQ服務端來說,消費位點也沒更新,所以訊息還會繼續投遞下來,投遞下來發現這個訊息插入訊息表也是成功的,所以可以繼續消費,這保證了訊息不丟失,
事實上,阿里云ONS的EXACTLY-ONCE語意的實作上,就是類似這個方案基于資料庫的事務特性實作的,
基于這種方式,的確這是有能力拓展到不同的應用場景,因為他的實作方案與具體業務本身無關——而是依賴一個訊息表,
但是這里有它的局限性
- 訊息的消費邏輯必須是依賴于關系型資料庫事務,如果消費的消費程序中還涉及其他資料的修改,例如Redis這種不支持事務特性的資料源,則這些資料是不可回滾的,
- 資料庫的資料必須是在一個庫,跨庫無法解決
注:業務上,訊息表的設計不應該以訊息ID作為標識,而應該以業務的業務主鍵作為標識更為合理,以應對生產者的重發,阿里云上的訊息去重只是RocketMQ的messageId,在生產者因為某些原因手動重發(例如上游針對一個交易重復請求了)的場景下起不到去重/冪等的效果(因訊息id不同),
更復雜的業務場景
如上所述,這種方式Exactly Once語意的實作,實際上有很多局限性,這種局限性使得這個方案基本不具備廣泛應用的價值,并且由于基于事務,可能導致鎖表時間過長等性能問題,
例如我們以一個比較常見的一個訂單申請的訊息來舉例,可能有以下幾步(以下統稱為步驟X):
- 檢查庫存(RPC)
- 鎖庫存(RPC)
- 開啟事務,插入訂單表(MySQL)
- 呼叫某些其他下游服務(RPC)
- 更新訂單狀態
- commit 事務(MySQL)
這種情況下,我們如果采取訊息表+本地事務的實作方式,訊息消費程序中很多子程序是不支持回滾的,也就是說就算我們加了事務,實際上這背后的操作并不是原子性的,怎么說呢,就是說有可能第一條小在經歷了第二步鎖庫存的時候,服務重啟了,這時候實際上庫存是已經在另外的服務里被鎖定了,這并不能被回滾,當然訊息還會再次投遞下來,要保證訊息能至少消費一遍,換句話說,鎖庫存的這個RPC介面本身依舊要支持“冪等”,
再者,如果在這個比較耗時的長鏈條場景下加入事務的包裹,將大大的降低系統的并發,所以通常情況下,我們處理這種場景的訊息去重的方法還是會使用一開始說的業務自己實作去重邏輯的方式,如前面加select for update,或者使用樂觀鎖,
那我們有沒有方法抽取出一個公共的解決方案,能兼顧去重、通用、高性能呢?
拆解訊息執行程序
其中一個思路是把上面的幾步,拆解成幾個不同的子訊息,例如:
- 庫存系統消費A:檢查庫存并做鎖庫存,發送訊息B給訂單服務
- 訂單系統消費訊息B:插入訂單表(MySQL),發送訊息C給自己(下游系統)消費
- 下游系統消費訊息C:處理部分邏輯,發送訊息D給訂單系統
- 訂單系統消費訊息D:更新訂單狀態
注:上述步驟需要保證本地事務和訊息是一個事務的(至少是最終一致性的),這其中涉及到分布式事務訊息相關的話題,不在本文論述,
可以看到這樣的處理方法會使得每一步的操作都比較原子,而原子則意味著是小事務,小事務則意味著使用訊息表+事務的方案顯得可行,
然而,這太復雜了!這把一個本來連續的代碼邏輯割裂成多個系統多次訊息互動!那還不如業務代碼層面上加鎖實作呢,
更通用的解決方案
上面訊息表+本地事務的方案之所以有其局限性和并發的短板,究其根本是因為它依賴于關系型資料庫的事務,且必須要把事務包裹于整個訊息消費的環節,
如果我們能不依賴事務而實作訊息的去重,那么方案就能推廣到更復雜的場景例如:RPC、跨庫等,
例如,我們依舊使用訊息表,但是不依賴事務,而是針對訊息表增加消費狀態,是否可以解決問題呢?
基于訊息冪等表的非事務方案

以上是去事務化后的訊息冪等方案的流程,可以看到,此方案是無事務的,而是針對訊息表本身做了狀態的區分:消費中、消費完成,只有消費完成的訊息才會被冪等處理掉,
而對于已有消費中的訊息,后面重復的訊息會觸發延遲消費(在RocketMQ的場景下即發送到RETRY TOPIC),之所以觸發延遲消費是為了控制并發場景下,第二條訊息在第一條訊息沒完成的程序中,去控制訊息不丟(如果直接冪等,那么會丟失訊息(同一個訊息id的話),因為上一條訊息如果沒有消費完成的時候,第二條訊息你已經告訴broker成功了,那么第一條訊息這時候失敗broker也不會重新投遞了)
上面的流程不再細說,后文有github原始碼的地址,讀者可以參考原始碼的實作,這里我們回頭看看我們一開始想解決的問題是否解決了:
- 訊息已經消費成功了,第二條訊息將被直接冪等處理掉(消費成功),
- 并發場景下的訊息,依舊能滿足不會出現訊息重復,即穿透冪等擋板的問題,
- 支持上游業務生產者重發的業務重復的訊息冪等問題,
關于第一個問題已經很明顯已經解決了,在此就不討論了,
關于第二個問題是如何解決的?主要是依靠插入訊息表的這個動作做控制的,假設我們用MySQL作為訊息表的存盤媒介(設定訊息的唯一ID為主鍵),那么插入的動作只有一條訊息會成功,后面的訊息插入會由于主鍵沖突而失敗,走向延遲消費的分支,然后后面延遲消費的時候就會變成上面第一個場景的問題,
關于第三個問題,只要我們設計去重的訊息鍵讓其支持業務的主鍵(例如訂單號、請求流水號等),而不僅僅是messageId即可,所以也不是問題,
此方案是否有訊息丟失的風險?
如果細心的讀者可能會發現這里實際上是有邏輯漏洞的,問題出在上面聊到的個三問題中的第2個問題(并發場景),在并發場景下我們依賴于訊息狀態是做并發控制使得第2條訊息重復的訊息會不斷延遲消費(重試),但如果這時候第1條訊息也由于一些例外原因(例如機器重啟了、外部例外導致消費失敗)沒有成功消費成功呢?也就是說這時候延遲消費實際上每次下來看到的都是消費中的狀態,最后消費就會被視為消費失敗而被投遞到死信Topic中(RocketMQ默認可以重復消費16次),
有這種顧慮是正確的!對于此,我們解決的方法是,插入的訊息表必須要帶一個最長消費過期時間,例如10分鐘,意思是如果一個訊息處于消費中超過10分鐘,就需要從訊息表中洗掉(需要程式自行實作),所以最后這個訊息的流程會是這樣的:

更靈活的訊息表存盤媒介
我們這個方案實際上沒有事務的,只需要一個存盤的中心媒介,那么自然我們可以選擇更靈活的存盤媒介,例如Redis,使用Redis有兩個好處:
- 性能上損耗更低
- 上面我們講到的超時時間可以直接利用Redis本身的ttl實作
當然Redis存盤的資料可靠性、一致性等方面是不如MySQL的,需要用戶自己取舍,
原始碼:RocketMQDedupListener
以上方案針對RocketMQ的Java實作已經開源放到Github中,具體的使用檔案可以參考:
https://github.com/Jaskey/RocketMQDedupListener
以下僅貼一個Readme中利用Redis去重的使用樣例,用以意業務中如果使用此工具加入訊息去重冪等的是多么簡單:
//利用Redis做冪等表
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TEST-APP1");
consumer.subscribe("TEST-TOPIC", "*");
String appName = consumer.getConsumerGroup();// 大部分情況下可直接使用consumer group名
StringRedisTemplate stringRedisTemplate = null;// 這里省略獲取StringRedisTemplate的程序
DedupConfig dedupConfig = DedupConfig.enableDedupConsumeConfig(appName, stringRedisTemplate);
DedupConcurrentListener messageListener = new SampleListener(dedupConfig);
consumer.registerMessageListener(messageListener);
consumer.start();
以上代碼大部分是原始RocketMQ的必須代碼,唯一需要修改的僅僅是創建一個DedupConcurrentListener示例,在這個示例中指明你的消費邏輯和去重的業務鍵(默認是messageId),
更多使用詳情請參考Github上的說明,
這種實作是否一勞永逸?
實作到這里,似乎方案挺完美的,所有的訊息都能快速的接入去重,且與具體業務實作也完全解耦,那么這樣是否就完美的完成去重的所有任務呢?
很可惜,其實不是的,原因很簡單:因為要保證訊息至少被成功消費一遍,那么訊息就有機會消費到一半的時候失敗觸發訊息重試的可能,還是以上面的訂單流程X:
- 檢查庫存(RPC)
- 鎖庫存(RPC)
- 開啟事務,插入訂單表(MySQL)
- 呼叫某些其他下游服務(RPC)
- 更新訂單狀態
- commit 事務(MySQL)
當訊息消費到步驟3的時候,我們假設MySQL例外導致失敗了,觸發訊息重試,因為在重試前我們會洗掉冪等表的記錄,所以訊息重試的時候就會重新進入消費代碼,那么步驟1和步驟2就會重新再執行一遍,如果步驟2本身不是冪等的,那么這個業務訊息消費依舊沒有做好完整的冪等處理,
本實作方式的價值?
那么既然這個并不能完整的完成訊息冪等,還有什么價值呢?價值可就大了!雖然這不是解決訊息冪等的銀彈(事實上,軟體工程領域里基本沒有銀彈),但是他能以便捷的手段解決:
1.各種由于Broker、負載均衡等原因導致的訊息重投遞的重復問題
2.各種上游生產者導致的業務級別訊息重復問題
3.重復訊息并發消費的控制視窗問題,就算重復,重復也不可能同一時間進入消費邏輯
一些其他的訊息去重的建議
也就是說,使用這個方法能保證正常的消費邏輯場景下(無例外,無例外退出),訊息的冪等作業全部都能解決,無論是業務重復,還是rocketmq特性帶來的重復,
事實上,這已經能解決99%的訊息重復問題了,畢竟例外的場景肯定是少數的,那么如果希望例外場景下也能處理好冪等的問題,可以做以下作業降低問題率:
- 訊息消費失敗做好回滾處理,如果訊息消費失敗本身是帶回滾機制的,那么訊息重試自然就沒有副作用了,
- 消費者做好優雅退出處理,這是為了盡可能避免訊息消費到一半程式退出導致的訊息重試,
- 一些無法做到冪等的操作,至少要做到終止消費并告警,例如鎖庫存的操作,如果統一的業務流水鎖成功了一次庫存,再觸發鎖庫存,如果做不到冪等的處理,至少要做到訊息消費觸發例外(例如主鍵沖突導致消費例外等)
- 在#3做好的前提下,做好訊息的消費監控,發現訊息重試不斷失敗的時候,手動做好#1的回滾,使得下次重試消費成功,
近期熱文推薦:
1.1,000+ 道 Java面試題及答案整理(2021最新版)
2.終于靠開源專案弄到 IntelliJ IDEA 激活碼了,真香!
3.阿里 Mock 工具正式開源,干掉市面上所有 Mock 工具!
4.Spring Cloud 2020.0.0 正式發布,全新顛覆性版本!
5.《Java開發手冊(嵩山版)》最新發布,速速下載!
覺得不錯,別忘了隨手點贊+轉發哦!
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/292463.html
標籤:其他
下一篇:Java基本程式設計結構
