我是3y,一年CRUD經驗用十年的markdown程式員???????常年被譽為優質八股文選手

今天繼續更新austin專案,如果還沒看過該系列的同學可以點開我的歷史文章回顧下,在看的程序中不要忘記了點贊喲!建議不要漏了或者跳著看,不然這篇就看不懂了,之前寫過的知識點和業務我就不再贅述啦,
今天要實作的是handler模塊的消費資料隔離,在聊這個之前,先看下之前的實作是怎么樣的,
austin-api接收到了請求之后,將請求發往Kafka,topicName為austin,而在austin-handler起了一個groupName名為austinGroup監聽austin這個topic的資料,進而實作訊息發送,
從系統架構來說,austin專案是可以發送多種型別訊息的:短信、微信小程式、郵件等等等
那如果是單個topic單個group的話,有沒有想過一個問題:如果某個發送渠道介面存在例外,超時了,此時會怎么樣?
沒錯,訊息都會堵住,因為它們消費同一個topic,用的是同一個消費者,
01、資料隔離
要破局?很簡單,多topic多group就行啦,
上面這種能解決所有問題嗎?并不,即便是同一個渠道,但不同型別的訊息發送特性是不一樣的,比如我要發push營銷訊息,有可能在某個時刻就要推送4000W的人群,
那這4000W人在短時間內完全發送出去,不太現實,這很可能意味著會影響到通知類的push訊息
還要破局?很簡單, 畢竟我們在設計訊息模板的時候就已經考慮到這點了,訊息模板有msgType欄位來標識當前的模板屬于哪種型別,那我們可以根據不同的訊息型別再劃分對應的group,
從理論上來說,我們可以為每種渠道的每種訊息型別單獨區分一個topic和group,因為topic間的資料是隔離的,不同的group間消費也是隔離的,那我們消費時肯定是資料隔離的,
不過,我目前的做法是:單topic多group,消費是隔離的,但生產的topic是共享的,我認為這樣代碼會更加清晰和易懂些,后期如果存在瓶頸了我們可以繼續改,
02、消費端設計
從上面已經定了通過單topic多group來實作資料隔離,比如,我目前定義了6個渠道(im/push/郵件/短信/小程式/微信服務號)和3種訊息型別(通知/營銷/驗證碼),那相當于起了18個消費者,
從kafka獲取得到訊息以后,我暫定規劃是走幾個步驟:訊息丟棄->去重->真正發送
從本質上看去重和發送訊息都是網路IO密集型,于是,為了提高吞吐量,我這邊決定消費Kafka后存入快取,做一層緩沖區,
做一層緩沖區可提高吞吐量,但同樣會帶來別的問題,如:當應用重啟時,緩沖區的資料還沒消費完,那是不是就會丟失?
這個我們可以后面再看看怎么把帶來的問題給搞掂(持續關注,專案優化后面多著呢),現在還是認為緩沖區的利大于弊,所以回到緩沖區上,
緩沖區給我的第一反應是實作生產者消費者模式
要實作這種模式,我初想了下挺簡單的:消費Kafka的訊息作為生產者,然后把資料扔進阻塞佇列上,開多個執行緒去消費阻塞佇列的資料就完事了,
后來又想了下,直接執行緒池不就完事了嗎?執行緒池不就是生產者和消費者的實作嗎,
于是乎,架構就變成了下圖:
03、代碼設計
在消費端首先看Receiver的代碼,該類看起來看簡單,就只有一個@KafkaListener注解修飾方法,從Kafka消費出來隨后交給pending做處理
我用的是@KafkaListener注解從Kafka拉取訊息,而沒有用低級的Kafka api,原因無他:在專案前期無需做到完美,等有瓶頸的時候再想辦法就好了,雖說如此,但我寫的時候還是給我帶來了不少的麻煩,
第一個問題:@KafkaListener是一個注解,從原始碼注釋看它的傳值只能夠用Spring EL運算式和讀取某個配置,但要知道的是,我的目的是想有多個group消費同一個topic,而我不可能說給每個group都定義一個消費的方法吧?(寫這種破代碼,我都睡不著覺)
翻了一個晚上技術博客我都沒找到方案,甚至還發了個朋友圈吐槽下有沒有人遇到過,第二天我仔細翻了下Spring的官方檔案,終于給我找到了方案,
還是官方檔案實在!
有了解決辦法了以后,那事情就好辦了,既然我是每種訊息渠道的每種訊息型別都要隔離,那我把這給列舉出來就完事啦!
我的Receiver是多例的,那么只要我遍歷這個List就好了(初始化消費者在ReceiverStart類上),
解決了用@KafkaListener注解動態傳入groupId 進而創建多個消費者了之后,
我又遇到了第二個問題:Spring有@Aysnc注解來優雅實作執行緒池的方法呼叫,我之前是沒用過@Aysnc注解的,但我看了下原理和使用姿勢,我感覺這樣挺優雅的(優雅永不過時),但是用@Aysnc是肯定要自己創建執行緒池,并且我要給每個消費者都創建自己獨有的執行緒池,而我不可能說給每個group都定義一個創建執行緒池的方法吧?(寫這種破代碼,我都睡不著覺)
這次翻了官網和各種技術博客,都沒能解決掉我的問題:在Spring環境下@Async注解上動態傳入執行緒池實體,以及創建執行緒池實體時可支持根據條件傳參,
最后只能放棄掉@Aysnc注解了,以編程的方式去實作:
下面是TaskPendingHolder的實作(無非就是給每個消費者創建對應的執行緒池),后面會考慮是否做成動態的:
而Task實作目前就比較簡單啦,直接呼叫對應的Handler進而下發訊息就好:
04、總結
代碼看似簡單,業務看似容易理解,但是要知道的是即便是很多小公司的生產專案都沒有這種設計,一把梭可真的是太常見了(功能又不是不能實作,代碼又不是不能跑,最主要的:人也不是不能跑)
這篇文章主要講述了一個思路:在消費MQ的時候,多group是可以實作資料隔離的,想要提高消費的吞吐量,可以再做一層緩沖區(前提是消費是IO密集型的)
關注我的微信公眾號【Java3y】除了技術我還會聊點日常,有些話只能悄悄說~ 【對線面試官+從零撰寫Java專案】 持續高強度更新中!求star!!原創不易!!求三連!!
原始碼Gitee鏈接:gitee.com/austin
原始碼GitHub鏈接:github.com/austin
更多的文章可往:文章的目錄導航轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/426366.html
標籤:其他
上一篇:Pywifi用法-python
