訊息佇列掃盲
訊息佇列顧名思義就是存放訊息的佇列,佇列我就不解釋了,別告訴我你連佇列都不知道似啥吧?
所以問題并不是訊息佇列是什么,而是 訊息佇列為什么會出現?訊息佇列能用來干什么?用它來干這些事會帶來什么好處?訊息佇列會帶來副作用嗎?
訊息佇列為什么會出現?
訊息佇列算是作為后端程式員的一個必備技能吧,因為分布式應用必定涉及到各個系統之間的通信問題,這個時候訊息佇列也應運而生了,可以說分布式的產生是訊息佇列的基礎,而分布式怕是一個很古老的概念了吧,所以訊息佇列也是一個很古老的中間件了,
訊息佇列能用來干什么?
異步
你可能會反駁我,應用之間的通信又不是只能由訊息佇列解決,好好的通信為什么中間非要插一個訊息佇列呢?我不能直接進行通信嗎?
很好??,你又提出了一個概念,同步通信,就比如現在業界使用比較多的 Dubbo 就是一個適用于各個系統之間同步通信的 RPC 框架,
我來舉個??吧,比如我們有一個購票系統,需求是用戶在購買完之后能接收到購買完成的短信,

我們省略中間的網路通信時間消耗,假如購票系統處理需要 150ms ,短信系統處理需要 200ms ,那么整個處理流程的時間消耗就是 150ms + 200ms = 350ms,
當然,乍看沒什么問題,可是仔細一想你就感覺有點問題,我用戶購票在購票系統的時候其實就已經完成了購買,而我現在通過同步呼叫非要讓整個請求拉長時間,而短息系統這玩意又不是很有必要,它僅僅是一個輔助功能增強用戶體驗感而已,我現在整個呼叫流程就有點 頭重腳輕 的感覺了,購票是一個不太耗時的流程,而我現在因為同步呼叫,非要等待發送短信這個比較耗時的操作才回傳結果,那我如果再加一個發送郵件呢?

這樣整個系統的呼叫鏈又變長了,整個時間就變成了550ms,
當我們在學生時代需要在食堂排隊的時候,我們和食堂大媽就是一個同步的模型,
我們需要告訴食堂大媽:“姐姐,給我加個雞腿,再加個酸辣土豆絲,幫我澆點汁上去,多打點飯哦” 咦~~~~~ 為了多吃點,真惡心,
然后大媽幫我們打飯配菜,我們看著大媽那顫抖的手和掉落的土豆絲不禁咽了咽口水,
最終我們從大媽手中接過飯菜然后去尋找座位了...
回想一下,我們在給大媽發送需要的資訊之后我們是 同步等待大媽給我配好飯菜 的,上面我們只是加了雞腿和土豆絲,萬一我再加一個番茄牛腩,韭菜雞蛋,這樣是不是大媽打飯配菜的流程就會變長,我們等待的時間也會相應的變長,

那后來,我們作業賺錢了有錢去飯店吃飯了,我們告訴服務員來一碗牛肉面加個荷包蛋 (傳達一個訊息) ,然后我們就可以在飯桌上安心的玩手機了 (干自己其他事情) ,等到我們的牛肉面上了我們就可以吃了,這其中我們也就傳達了一個訊息,然后我們又轉過頭干其他事情了,這其中雖然做面的時間沒有變短,但是我們只需要傳達一個訊息就可以看其他事情了,這是一個 異步 的概念,
所以,為了解決這一個問題,聰明的程式員在中間也加了個類似于服務員的中間件——訊息佇列,這個時候我們就可以把模型給改造了,

這樣,我們在將訊息存入訊息佇列之后我們就可以直接回傳了(我們告訴服務員我們要吃什么然后玩手機),所以整個耗時只是 150ms + 10ms = 160ms,
但是你需要注意的是,整個流程的時長是沒變的,就像你僅僅告訴服務員要吃什么是不會影響到做面的速度的,
解耦
回到最初同步呼叫的程序,我們寫個偽代碼簡單概括一下,

那么第二步,我們又添加了一個發送郵件,我們就得重新去修改代碼,如果我們又加一個需求:用戶購買完還需要給他加積分,這個時候我們是不是又得改代碼?

如果你覺得還行,那么我這個時候不要發郵件這個服務了呢,我是不是又得改代碼,又得重啟應用?

這樣改來改去是不是很麻煩,那么 此時我們就用一個訊息佇列在中間進行解耦 ,你需要注意的是,我們后面的發送短信、發送郵件、添加積分等一些操作都依賴于上面的 result ,這東西抽象出來就是購票的處理結果呀,比如訂單號,用戶賬號等等,也就是說我們后面的一系列服務都是需要同樣的訊息來進行處理,既然這樣,我們是不是可以通過 “廣播訊息” 來實作,
我上面所講的“廣播”并不是真正的廣播,而是接下來的系統作為消費者去 訂閱 特定的主題,比如我們這里的主題就可以叫做 訂票 ,我們購買系統作為一個生產者去生產這條訊息放入訊息佇列,然后消費者訂閱了這個主題,會從訊息佇列中拉取訊息并消費,就比如我們剛付訓的那張圖,你會發現,在生產者這邊我們只需要關注 生產訊息到指定主題中 ,而 消費者只需要關注從指定主題中拉取訊息 就行了,

如果沒有訊息佇列,每當一個新的業務接入,我們都要在主系統呼叫新介面、或者當我們取消某些業務,我們也得在主系統洗掉某些介面呼叫,有了訊息佇列,我們只需要關心訊息是否送達了佇列,至于誰希望訂閱,接下來收到訊息如何處理,是下游的事情,無疑極大地減少了開發和聯調的作業量,
削峰
我們再次回到一開始我們使用同步呼叫系統的情況,并且思考一下,如果此時有大量用戶請求購票整個系統會變成什么樣?

如果,此時有一萬的請求進入購票系統,我們知道運行我們主業務的服務器配置一般會比較好,所以這里我們假設購票系統能承受這一萬的用戶請求,那么也就意味著我們同時也會出現一萬呼叫發短信服務的請求,而對于短信系統來說并不是我們的主要業務,所以我們配備的硬體資源并不會太高,那么你覺得現在這個短信系統能承受這一萬的峰值么,且不說能不能承受,系統會不會 直接崩潰 了?
短信業務又不是我們的主業務,我們能不能 折中處理 呢?如果我們把購買完成的資訊發送到訊息佇列中,而短信系統 盡自己所能地去訊息佇列中取訊息和消費訊息 ,即使處理速度慢一點也無所謂,只要我們的系統沒有崩潰就行了,
留得江山在,還怕沒柴燒?你敢說每次發送驗證碼的時候是一發你就收到了的么?
訊息佇列能帶來什么好處?
其實上面我已經說了,異步、解耦、削峰, 哪怕你上面的都沒看懂也千萬要記住這六個字,因為他不僅是訊息佇列的精華,更是編程和架構的精華,
訊息佇列會帶來副作用嗎?
沒有哪一門技術是“銀彈”,訊息佇列也有它的副作用,
比如,本來好好的兩個系統之間的呼叫,我中間加了個訊息佇列,如果訊息佇列掛了怎么辦呢?是不是 降低了系統的可用性 ?
那這樣是不是要保證HA(高可用)?是不是要搞集群?那么我 整個系統的復雜度是不是上升了 ?
拋開上面的問題不講,萬一我發送方發送失敗了,然后執行重試,這樣就可能產生重復的訊息,
或者我消費端處理失敗了,請求重發,這樣也會產生重復的訊息,
對于一些微服務來說,消費重復訊息會帶來更大的麻煩,比如增加積分,這個時候我加了多次是不是對其他用戶不公平?
那么,又 如何解決重復消費訊息的問題 呢?
如果我們此時的訊息需要保證嚴格的順序性怎么辦呢?比如生產者生產了一系列的有序訊息(對一個id為1的記錄進行洗掉增加修改),但是我們知道在發布訂閱模型中,對于主題是無順序的,那么這個時候就會導致對于消費者消費訊息的時候沒有按照生產者的發送順序消費,比如這個時候我們消費的順序為修改洗掉增加,如果該記錄涉及到金額的話是不是會出大事情?
那么,又 如何解決訊息的順序消費問題 呢?
就拿我們上面所講的分布式系統來說,用戶購票完成之后是不是需要增加賬戶積分?在同一個系統中我們一般會使用事務來進行解決,如果用 Spring 的話我們在上面偽代碼中加入 @Transactional 注解就好了,但是在不同系統中如何保證事務呢?總不能這個系統我扣錢成功了你那積分系統積分沒加吧?或者說我這扣錢明明失敗了,你那積分系統給我加了積分,
那么,又如何 解決分布式事務問題 呢?
我們剛剛說了,訊息佇列可以進行削峰操作,那如果我的消費者如果消費很慢或者生產者生產訊息很快,這樣是不是會將訊息堆積在訊息佇列中?
那么,又如何 解決訊息堆積的問題 呢?
可用性降低,復雜度上升,又帶來一系列的重復消費,順序消費,分布式事務,訊息堆積的問題,這訊息佇列還怎么用啊???

別急,辦法總是有的,
RocketMQ是什么?

哇,你個混蛋!上面給我拋出那么多問題,你現在又講 RocketMQ ,還讓不讓人活了?!
別急別急,話說你現在清楚 MQ 的構造嗎,我還沒講呢,我們先搞明白 MQ 的內部構造,再來看看如何解決上面的一系列問題吧,不過你最好帶著問題去閱讀和了解喔,
RocketMQ 是一個 佇列模型 的訊息中間件,具有高性能、高可靠、高實時、分布式 的特點,它是一個采用 Java 語言開發的分布式的訊息系統,由阿里巴巴團隊開發,在2016年底貢獻給 Apache,成為了 Apache 的一個頂級專案, 在阿里內部,RocketMQ 很好地服務了集團大大小小上千個應用,在每年的雙十一當天,更有不可思議的萬億級訊息通過 RocketMQ 流轉,
廢話不多說,想要了解 RocketMQ 歷史的同學可以自己去搜尋資料,聽完上面的介紹,你只要知道 RocketMQ 很快、很牛、而且經歷過雙十一的實踐就行了!
佇列模型和主題模型
在談 RocketMQ 的技術架構之前,我們先來了解一下兩個名詞概念——佇列模型 和 主題模型 ,
首先我問一個問題,訊息佇列為什么要叫訊息佇列?
你可能覺得很弱智,這玩意不就是存放訊息的佇列嘛?不叫訊息佇列叫什么?
的確,早期的訊息中間件是通過 佇列 這一模型來實作的,可能是歷史原因,我們都習慣把訊息中間件成為訊息佇列,
但是,如今例如 RocketMQ 、Kafka 這些優秀的訊息中間件不僅僅是通過一個 佇列 來實作訊息存盤的,
佇列模型
就像我們理解佇列一樣,訊息中間件的佇列模型就真的只是一個佇列,,,我畫一張圖給大家理解,

在一開始我跟你提到了一個 “廣播” 的概念,也就是說如果我們此時我們需要將一個訊息發送給多個消費者(比如此時我需要將資訊發送給短信系統和郵件系統),這個時候單個佇列即不能滿足需求了,
當然你可以讓 Producer 生產訊息放入多個佇列中,然后每個佇列去對應每一個消費者,問題是可以解決,創建多個佇列并且復制多份訊息是會很影響資源和性能的,而且,這樣子就會導致生產者需要知道具體消費者個數然后去復制對應數量的訊息佇列,這就違背我們訊息中間件的 解耦 這一原則,
主題模型
那么有沒有好的方法去解決這一個問題呢?有,那就是 主題模型 或者可以稱為 發布訂閱模型 ,
感興趣的同學可以去了解一下設計模式里面的觀察者模式并且手動實作一下,我相信你會有所識訓的,
在主題模型中,訊息的生產者稱為 發布者(Publisher) ,訊息的消費者稱為 訂閱者(Subscriber) ,存放訊息的容器稱為 主題(Topic) ,
其中,發布者將訊息發送到指定主題中,訂閱者需要 提前訂閱主題 才能接受特定主題的訊息,

RocketMQ中的訊息模型
RockerMQ 中的訊息模型就是按照 主題模型 所實作的,你可能會好奇這個 主題 到底是怎么實作的呢?你上面也沒有講到呀!
其實對于主題模型的實作來說每個訊息中間件的底層設計都是不一樣的,就比如 Kafka 中的 磁區 ,RocketMQ 中的 佇列 ,RabbitMQ 中的 Exchange ,我們可以理解為 主題模型/發布訂閱模型 就是一個標準,那些中間件只不過照著這個標準去實作而已,
所以,RocketMQ 中的 主題模型 到底是如何實作的呢?首先我畫一張圖,大家嘗試著去理解一下,

我們可以看到在整個圖中有 Producer Group 、Topic 、Consumer Group 三個角色,我來分別介紹一下他們,
Producer Group生產者組: 代表某一類的生產者,比如我們有多個秒殺系統作為生產者,這多個合在一起就是一個Producer Group生產者組,它們一般生產相同的訊息,Consumer Group消費者組: 代表某一類的消費者,比如我們有多個短信系統作為消費者,這多個合在一起就是一個Consumer Group消費者組,它們一般消費相同的訊息,Topic主題: 代表一類訊息,比如訂單訊息,物流訊息等等,
你可以看到圖中生產者組中的生產者會向主題發送訊息,而 主題中存在多個佇列,生產者每次生產訊息之后是指定主題中的某個佇列發送訊息的,
每個主題中都有多個佇列(這里還不涉及到 Broker),集群消費模式下,一個消費者集群多臺機器共同消費一個 topic 的多個佇列,一個佇列只會被一個消費者消費,如果某個消費者掛掉,分組內其它消費者會接替掛掉的消費者繼續消費,就像上圖中 Consumer1 和 Consumer2 分別對應著兩個佇列,而 Consuer3 是沒有佇列對應的,所以一般來講要控制 消費者組中的消費者個數和主題中佇列個數相同 ,
當然也可以消費者個數小于佇列個數,只不過不太建議,如下圖,

每個消費組在每個佇列上維護一個消費位置 ,為什么呢?
因為我們剛付訓的僅僅是一個消費者組,我們知道在發布訂閱模式中一般會涉及到多個消費者組,而每個消費者組在每個佇列中的消費位置都是不同的,如果此時有多個消費者組,那么訊息被一個消費者組消費完之后是不會洗掉的(因為其它消費者組也需要呀),它僅僅是為每個消費者組維護一個 消費位移(offset) ,每次消費者組消費完會回傳一個成功的回應,然后佇列再把維護的消費位移加一,這樣就不會出現剛剛消費過的訊息再一次被消費了,

可能你還有一個問題,為什么一個主題中需要維護多個佇列 ?
答案是 提高并發能力 ,的確,每個主題中只存在一個佇列也是可行的,你想一下,如果每個主題中只存在一個佇列,這個佇列中也維護著每個消費者組的消費位置,這樣也可以做到 發布訂閱模式 ,如下圖,

但是,這樣我生產者是不是只能向一個佇列發送訊息?又因為需要維護消費位置所以一個佇列只能對應一個消費者組中的消費者,這樣是不是其他的 Consumer 就沒有用武之地了?從這兩個角度來講,并發度一下子就小了很多,
所以總結來說,RocketMQ 通過使用在一個 Topic 中配置多個佇列并且每個佇列維護每個消費者組的消費位置 實作了 主題模式/發布訂閱模式 ,
RocketMQ的架構圖
講完了訊息模型,我們理解起 RocketMQ 的技術架構起來就容易多了,
RocketMQ 技術架構中有四大角色 NameServer 、Broker 、Producer 、Consumer ,我來向大家分別解釋一下這四個角色是干啥的,
-
Broker: 主要負責訊息的存盤、投遞和查詢以及服務高可用保證,說白了就是訊息佇列服務器嘛,生產者生產訊息到Broker,消費者從Broker拉取訊息并消費,這里,我還得普及一下關于
Broker、Topic和 佇列的關系,上面我講解了Topic和佇列的關系——一個Topic中存在多個佇列,那么這個Topic和佇列存放在哪呢?一個
Topic分布在多個Broker上,一個Broker可以配置多個Topic,它們是多對多的關系,如果某個
Topic訊息量很大,應該給它多配置幾個佇列(上文中提到了提高并發能力),并且 盡量多分布在不同Broker上,以減輕某個Broker的壓力 ,Topic訊息量都比較均勻的情況下,如果某個broker上的佇列越多,則該broker壓力越大,
所以說我們需要配置多個Broker,
-
NameServer: 不知道你們有沒有接觸過ZooKeeper和Spring Cloud中的Eureka,它其實也是一個 注冊中心 ,主要提供兩個功能:Broker管理 和 路由資訊管理 ,說白了就是Broker會將自己的資訊注冊到NameServer中,此時NameServer就存放了很多Broker的資訊(Broker的路由表),消費者和生產者就從NameServer中獲取路由表然后照著路由表的資訊和對應的Broker進行通信(生產者和消費者定期會向NameServer去查詢相關的Broker的資訊), -
Producer: 訊息發布的角色,支持分布式集群方式部署,說白了就是生產者, -
Consumer: 訊息消費的角色,支持分布式集群方式部署,支持以push推,pull拉兩種模式對訊息進行消費,同時也支持集群方式和廣播方式的消費,它提供實時訊息訂閱機制,說白了就是消費者,
聽完了上面的解釋你可能會覺得,這玩意好簡單,不就是這樣的么?

嗯?你可能會發現一個問題,這老家伙 NameServer 干啥用的,這不多余嗎?直接 Producer 、Consumer 和 Broker 直接進行生產訊息,消費訊息不就好了么?
但是,我們上文提到過 Broker 是需要保證高可用的,如果整個系統僅僅靠著一個 Broker 來維持的話,那么這個 Broker 的壓力會不會很大?所以我們需要使用多個 Broker 來保證 負載均衡 ,
如果說,我們的消費者和生產者直接和多個 Broker 相連,那么當 Broker 修改的時候必定會牽連著每個生產者和消費者,這樣就會產生耦合問題,而 NameServer 注冊中心就是用來解決這個問題的,
如果還不是很理解的話,可以去看我介紹
Spring Cloud的那篇文章,其中介紹了Eureka注冊中心,
當然,RocketMQ 中的技術架構肯定不止前面那么簡單,因為上面圖中的四個角色都是需要做集群的,我給出一張官網的架構圖,大家嘗試理解一下,

其實和我們最開始畫的那張乞丐版的架構圖也沒什么區別,主要是一些細節上的差別,聽我細細道來??,
第一、我們的 Broker 做了集群并且還進行了主從部署 ,由于訊息分布在各個 Broker 上,一旦某個 Broker 宕機,則該Broker 上的訊息讀寫都會受到影響,所以 Rocketmq 提供了 master/slave 的結構, salve 定時從 master 同步資料(同步刷盤或者異步刷盤),如果 master 宕機,則 slave 提供消費服務,但是不能寫入訊息 (后面我還會提到哦),
第二、為了保證 HA ,我們的 NameServer 也做了集群部署,但是請注意它是 去中心化 的,也就意味著它沒有主節點,你可以很明顯地看出 NameServer 的所有節點是沒有進行 Info Replicate 的,在 RocketMQ 中是通過 單個Broker和所有NameServer保持長連接 ,并且在每隔30秒 Broker 會向所有 Nameserver 發送心跳,心跳包含了自身的 Topic 配置資訊,這個步驟就對應這上面的 Routing Info ,
第三、在生產者需要向 Broker 發送訊息的時候,需要先從 NameServer 獲取關于 Broker 的路由資訊,然后通過 輪詢 的方法去向每個佇列中生產資料以達到 負載均衡 的效果,
第四、消費者通過 NameServer 獲取所有 Broker 的路由資訊后,向 Broker 發送 Pull 請求來獲取訊息資料,Consumer 可以以兩種模式啟動—— 廣播(Broadcast)和集群(Cluster),廣播模式下,一條訊息會發送給 同一個消費組中的所有消費者 ,集群模式下訊息只會發送給一個消費者,
如何解決 順序消費、重復消費
其實,這些東西都是我在介紹訊息佇列帶來的一些副作用的時候提到的,也就是說,這些問題不僅僅掛鉤于 RocketMQ ,而是應該每個訊息中間件都需要去解決的,
在上面我介紹 RocketMQ 的技術架構的時候我已經向你展示了 它是如何保證高可用的 ,這里不涉及運維方面的搭建,如果你感興趣可以自己去官網上照著例子搭建屬于你自己的 RocketMQ 集群,
其實
Kafka的架構基本和RocketMQ類似,只是它注冊中心使用了Zookeeper、它的 磁區 就相當于RocketMQ中的 佇列 ,還有一些小細節不同會在后面提到,
順序消費
在上面的技術架構介紹中,我們已經知道了 RocketMQ 在主題上是無序的、它只有在佇列層面才是保證有序 的,
這又扯到兩個概念——普通順序 和 嚴格順序 ,
所謂普通順序是指 消費者通過 同一個消費佇列收到的訊息是有順序的 ,不同訊息佇列收到的訊息則可能是無順序的,普通順序訊息在 Broker 重啟情況下不會保證訊息順序性 (短暫時間) ,
所謂嚴格順序是指 消費者收到的 所有訊息 均是有順序的,嚴格順序訊息 即使在例外情況下也會保證訊息的順序性 ,
但是,嚴格順序看起來雖好,實作它可會付出巨大的代價,如果你使用嚴格順序模式,Broker 集群中只要有一臺機器不可用,則整個集群都不可用,你還用啥?現在主要場景也就在 binlog 同步,
一般而言,我們的 MQ 都是能容忍短暫的亂序,所以推薦使用普通順序模式,
那么,我們現在使用了 普通順序模式 ,我們從上面學習知道了在 Producer 生產訊息的時候會進行輪詢(取決你的負載均衡策略)來向同一主題的不同訊息佇列發送訊息,那么如果此時我有幾個訊息分別是同一個訂單的創建、支付、發貨,在輪詢的策略下這 三個訊息會被發送到不同佇列 ,因為在不同的佇列此時就無法使用 RocketMQ 帶來的佇列有序特性來保證訊息有序性了,

那么,怎么解決呢?
其實很簡單,我們需要處理的僅僅是將同一語意下的訊息放入同一個佇列(比如這里是同一個訂單),那我們就可以使用 Hash取模法 來保證同一個訂單在同一個佇列中就行了,
重復消費
emmm,就兩個字—— 冪等 ,在編程中一個冪等 操作的特點是其任意多次執行所產生的影響均與一次執行的影響相同,比如說,這個時候我們有一個訂單的處理積分的系統,每當來一個訊息的時候它就負責為創建這個訂單的用戶的積分加上相應的數值,可是有一次,訊息佇列發送給訂單系統 FrancisQ 的訂單資訊,其要求是給 FrancisQ 的積分加上 500,但是積分系統在收到 FrancisQ 的訂單資訊處理完成之后回傳給訊息佇列處理成功的資訊的時候出現了網路波動(當然還有很多種情況,比如Broker意外重啟等等),這潭訓應沒有發送成功,
那么,訊息佇列沒收到積分系統的回應會不會嘗試重發這個訊息?問題就來了,我再發這個訊息,萬一它又給 FrancisQ 的賬戶加上 500 積分怎么辦呢?
所以我們需要給我們的消費者實作 冪等 ,也就是對同一個訊息的處理結果,執行多少次都不變,
那么如何給業務實作冪等呢?這個還是需要結合具體的業務的,你可以使用 寫入 Redis 來保證,因為 Redis 的 key 和 value 就是天然支持冪等的,當然還有使用 資料庫插入法 ,基于資料庫的唯一鍵來保證重復資料不會被插入多條,
不過最主要的還是需要 根據特定場景使用特定的解決方案 ,你要知道你的訊息消費是否是完全不可重復消費還是可以忍受重復消費的,然后再選擇強校驗和弱校驗的方式,畢竟在 CS 領域還是很少有技術銀彈的說法,
而在整個互聯網領域,冪等不僅僅適用于訊息佇列的重復消費問題,這些實作冪等的方法,也同樣適用于,在其他場景中來解決重復請求或者重復呼叫的問題 ,比如將HTTP服務設計成冪等的,解決前端或者APP重復提交表單資料的問題 ,也可以將一個微服務設計成冪等的,解決 RPC 框架自動重試導致的 重復呼叫問題 ,
分布式事務
如何解釋分布式事務呢?事務大家都知道吧?要么都執行要么都不執行 ,在同一個系統中我們可以輕松地實作事務,但是在分布式架構中,我們有很多服務是部署在不同系統之間的,而不同服務之間又需要進行呼叫,比如此時我下訂單然后增加積分,如果保證不了分布式事務的話,就會出現A系統下了訂單,但是B系統增加積分失敗或者A系統沒有下訂單,B系統卻增加了積分,前者對用戶不友好,后者對運營商不利,這是我們都不愿意見到的,
那么,如何去解決這個問題呢?
如今比較常見的分布式事務實作有 2PC、TCC 和事務訊息(half 半訊息機制),每一種實作都有其特定的使用場景,但是也有各自的問題,都不是完美的解決方案,
在 RocketMQ 中使用的是 事務訊息加上事務反查機制 來解決分布式事務問題的,我畫了張圖,大家可以對照著圖進行理解,

在第一步發送的 half 訊息 ,它的意思是 在事務提交之前,對于消費者來說,這個訊息是不可見的 ,
那么,如何做到寫入訊息但是對用戶不可見呢?RocketMQ事務訊息的做法是:如果訊息是half訊息,將備份原訊息的主題與訊息消費佇列,然后 改變主題 為RMQ_SYS_TRANS_HALF_TOPIC,由于消費組未訂閱該主題,故消費端無法消費half型別的訊息,然后RocketMQ會開啟一個定時任務,從Topic為RMQ_SYS_TRANS_HALF_TOPIC中拉取訊息進行消費,根據生產者組獲取一個服務提供者發送回查事務狀態請求,根據事務狀態來決定是提交或回滾訊息,
你可以試想一下,如果沒有從第5步開始的 事務反查機制 ,如果出現網路波動第4步沒有發送成功,這樣就會產生 MQ 不知道是不是需要給消費者消費的問題,他就像一個無頭蒼蠅一樣,在 RocketMQ 中就是使用的上述的事務反查來解決的,而在 Kafka 中通常是直接拋出一個例外讓用戶來自行解決,
你還需要注意的是,在 MQ Server 指向系統B的操作已經和系統A不相關了,也就是說在訊息佇列中的分布式事務是——本地事務和存盤訊息到訊息佇列才是同一個事務,這樣也就產生了事務的最終一致性,因為整個程序是異步的,每個系統只要保證它自己那一部分的事務就行了,
訊息堆積問題
在上面我們提到了訊息佇列一個很重要的功能——削峰 ,那么如果這個峰值太大了導致訊息堆積在佇列中怎么辦呢?
其實這個問題可以將它廣義化,因為產生訊息堆積的根源其實就只有兩個——生產者生產太快或者消費者消費太慢,
我們可以從多個角度去思考解決這個問題,當流量到峰值的時候是因為生產者生產太快,我們可以使用一些 限流降級 的方法,當然你也可以增加多個消費者實體去水平擴展增加消費能力來匹配生產的激增,如果消費者消費過慢的話,我們可以先檢查 是否是消費者出現了大量的消費錯誤 ,或者列印一下日志查看是否是哪一個執行緒卡死,出現了鎖資源不釋放等等的問題,
當然,最快速解決訊息堆積問題的方法還是增加消費者實體,不過 同時你還需要增加每個主題的佇列數量 ,
別忘了在
RocketMQ中,一個佇列只會被一個消費者消費 ,如果你僅僅是增加消費者實體就會出現我一開始給你畫架構圖的那種情況,

回溯消費
回溯消費是指 Consumer 已經消費成功的訊息,由于業務上需求需要重新消費,在RocketMQ 中, Broker 在向Consumer 投遞成功訊息后,訊息仍然需要保留 ,并且重新消費一般是按照時間維度,例如由于 Consumer 系統故障,恢復后需要重新消費1小時前的資料,那么 Broker 要提供一種機制,可以按照時間維度來回退消費進度,RocketMQ 支持按照時間回溯消費,時間維度精確到毫秒,
這是官方檔案的解釋,我直接照搬過來就當科普了??????,
RocketMQ 的刷盤機制
上面我講了那么多的 RocketMQ 的架構和設計原理,你有沒有好奇
在 Topic 中的 佇列是以什么樣的形式存在的?
佇列中的訊息又是如何進行存盤持久化的呢?
我在上文中提到的 同步刷盤 和 異步刷盤 又是什么呢?它們會給持久化帶來什么樣的影響呢?
下面我將給你們一一解釋,
同步刷盤和異步刷盤

如上圖所示,在同步刷盤中需要等待一個刷盤成功的 ACK ,同步刷盤對 MQ 訊息可靠性來說是一種不錯的保障,但是 性能上會有較大影響 ,一般地適用于金融等特定業務場景,
而異步刷盤往往是開啟一個執行緒去異步地執行刷盤操作,訊息刷盤采用后臺異步執行緒提交的方式進行, 降低了讀寫延遲 ,提高了 MQ 的性能和吞吐量,一般適用于如發驗證碼等對于訊息保證要求不太高的業務場景,
一般地,異步刷盤只有在 Broker 意外宕機的時候會丟失部分資料,你可以設定 Broker 的引數 FlushDiskType 來調整你的刷盤策略(ASYNC_FLUSH 或者 SYNC_FLUSH),
同步復制和異步復制
上面的同步刷盤和異步刷盤是在單個結點層面的,而同步復制和異步復制主要是指的 Borker 主從模式下,主節點回傳訊息給客戶端的時候是否需要同步從節點,
- 同步復制: 也叫 “同步雙寫”,也就是說,只有訊息同步雙寫到主從結點上時才回傳寫入成功 ,
- 異步復制: 訊息寫入主節點之后就直接回傳寫入成功 ,
然而,很多事情是沒有完美的方案的,就比如我們進行訊息寫入的節點越多就更能保證訊息的可靠性,但是隨之的性能也會下降,所以需要程式員根據特定業務場景去選擇適應的主從復制方案,
那么,異步復制會不會也像異步刷盤那樣影響訊息的可靠性呢?
答案是不會的,因為兩者就是不同的概念,對于訊息可靠性是通過不同的刷盤策略保證的,而像異步同步復制策略僅僅是影響到了 可用性 ,為什么呢?其主要原因是 RocketMQ 是不支持自動主從切換的,當主節點掛掉之后,生產者就不能再給這個主節點生產訊息了,
比如這個時候采用異步復制的方式,在主節點還未發送完需要同步的訊息的時候主節點掛掉了,這個時候從節點就少了一部分訊息,但是此時生產者無法再給主節點生產訊息了,消費者可以自動切換到從節點進行消費(僅僅是消費),所以在主節點掛掉的時間只會產生主從結點短暫的訊息不一致的情況,降低了可用性,而當主節點重啟之后,從節點那部分未來得及復制的訊息還會繼續復制,
在單主從架構中,如果一個主節點掛掉了,那么也就意味著整個系統不能再生產了,那么這個可用性的問題能否解決呢?一個主從不行那就多個主從的唄,別忘了在我們最初的架構圖中,每個 Topic 是分布在不同 Broker 中的,

但是這種復制方式同樣也會帶來一個問題,那就是無法保證 嚴格順序 ,在上文中我們提到了如何保證的訊息順序性是通過將一個語意的訊息發送在同一個佇列中,使用 Topic 下的佇列來保證順序性的,如果此時我們主節點A負責的是訂單A的一系列語意訊息,然后它掛了,這樣其他節點是無法代替主節點A的,如果我們任意節點都可以存入任何訊息,那就沒有順序性可言了,
而在 RocketMQ 中采用了 Dledger 解決這個問題,他要求在寫入訊息的時候,要求至少訊息復制到半數以上的節點之后,才給客?端回傳寫?成功,并且它是?持通過選舉來動態切換主節點的,這里我就不展開說明了,讀者可以自己去了解,
也不是說
Dledger是個完美的方案,至少在Dledger選舉程序中是無法提供服務的,而且他必須要使用三個節點或以上,如果多數節點同時掛掉他也是無法保證可用性的,而且要求訊息復制板書以上節點的效率和直接異步復制還是有一定的差距的,
存盤機制
還記得上面我們一開始的三個問題嗎?到這里第三個問題已經解決了,
但是,在 Topic 中的 佇列是以什么樣的形式存在的?佇列中的訊息又是如何進行存盤持久化的呢? 還未解決,其實這里涉及到了 RocketMQ 是如何設計它的存盤結構了,我首先想大家介紹 RocketMQ 訊息存盤架構中的三大角色——CommitLog 、ConsumeQueue 和 IndexFile ,
CommitLog: 訊息主體以及元資料的存盤主體,存盤Producer端寫入的訊息主體內容,訊息內容不是定長的,單個檔案大小默認1G ,檔案名長度為20位,左邊補零,剩余為起始偏移量,比如00000000000000000000代表了第一個檔案,起始偏移量為0,檔案大小為1G=1073741824;當第一個檔案寫滿了,第二個檔案為00000000001073741824,起始偏移量為1073741824,以此類推,訊息主要是順序寫入日志檔案,當檔案滿了,寫入下一個檔案,ConsumeQueue: 訊息消費佇列,引入的目的主要是提高訊息消費的性能(我們再前面也講了),由于RocketMQ是基于主題Topic的訂閱模式,訊息消費是針對主題進行的,如果要遍歷commitlog檔案中根據Topic檢索訊息是非常低效的,Consumer即可根據ConsumeQueue來查找待消費的訊息,其中,ConsumeQueue(邏輯消費佇列)作為消費訊息的索引,保存了指定Topic下的佇列訊息在CommitLog中的起始物理偏移量offset,訊息大小size和訊息Tag的HashCode值,consumequeue檔案可以看成是基于topic的commitlog索引檔案,故consumequeue檔案夾的組織方式如下:topic/queue/file三層組織結構,具體存盤路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName},同樣consumequeue檔案采取定長設計,每一個條目共20個位元組,分別為8位元組的commitlog物理偏移量、4位元組的訊息長度、8位元組taghashcode,單個檔案由30W個條目組成,可以像陣列一樣隨機訪問每一個條目,每個ConsumeQueue檔案大小約5.72M;IndexFile:IndexFile(索引檔案)提供了一種可以通過key或時間區間來查詢訊息的方法,這里只做科普不做詳細介紹,
總結來說,整個訊息存盤的結構,最主要的就是 CommitLoq 和 ConsumeQueue ,而 ConsumeQueue 你可以大概理解為 Topic 中的佇列,

RocketMQ 采用的是 混合型的存盤結構 ,即為 Broker 單個實體下所有的佇列共用一個日志資料檔案來存盤訊息,有意思的是在同樣高并發的 Kafka 中會為每個 Topic 分配一個存盤檔案,這就有點類似于我們有一大堆書需要裝上書架,RockeMQ 是不分書的種類直接成批的塞上去的,而 Kafka 是將書本放入指定的分類區域的,
而 RocketMQ 為什么要這么做呢?原因是 提高資料的寫入效率 ,不分 Topic 意味著我們有更大的幾率獲取 成批 的訊息進行資料寫入,但也會帶來一個麻煩就是讀取訊息的時候需要遍歷整個大檔案,這是非常耗時的,
所以,在 RocketMQ 中又使用了 ConsumeQueue 作為每個佇列的索引檔案來 提升讀取訊息的效率,我們可以直接根據佇列的訊息序號,計算出索引的全域位置(索引序號*索引固定?度20),然后直接讀取這條索引,再根據索引中記錄的訊息的全域位置,找到訊息,
講到這里,你可能對 RockeMQ 的存盤架構還有些模糊,沒事,我們結合著圖來理解一下,

emmm,是不是有一點復雜??,看英文圖片和英文檔案的時候就不要慫,硬著頭皮往下看就行,
如果上面沒看懂的讀者一定要認真看下面的流程分析!
首先,在最上面的那一塊就是我剛剛講的你現在可以直接 把 ConsumerQueue 理解為 Queue,
在圖中最左邊說明了 紅色方塊 代表被寫入的訊息,虛線方塊代表等待被寫入的,左邊的生產者發送訊息會指定 Topic 、QueueId 和具體訊息內容,而在 Broker 中管你是哪門子訊息,他直接 **全部順序存盤到了 CommitLog **,而根據生產者指定的 Topic 和 QueueId 將這條訊息本身在 CommitLog 的偏移(offset),訊息本身大小,和tag的hash值存入對應的 ConsumeQueue 索引檔案中,而在每個佇列中都保存了 ConsumeOffset 即每個消費者組的消費位置(我在架構那里提到了,忘了的同學可以回去看一下),而消費者拉取訊息進行消費的時候只需要根據 ConsumeOffset 獲取下一個未被消費的訊息就行了,
上述就是我對于整個訊息存盤架構的大概理解(這里不涉及到一些細節討論,比如稀疏索引等等問題),希望對你有幫助,
因為有一個知識點因為寫嗨了忘講了,想想在哪里加也不好,所以我留給大家去思考一下吧,

為什么 CommitLog 檔案要設計成固定大小的長度呢?提醒:記憶體映射機制,
總結
總算把這篇博客寫完了,我講的你們還記得嗎???
這篇文章中我主要想大家介紹了
- 訊息佇列出現的原因
- 訊息佇列的作用(異步,解耦,削峰)
- 訊息佇列帶來的一系列問題(訊息堆積、重復消費、順序消費、分布式事務等等)
- 訊息佇列的兩種訊息模型——佇列和主題模式
- 分析了
RocketMQ的技術架構(NameServer、Broker、Producer、Comsumer) - 結合
RocketMQ回答了訊息佇列副作用的解決方案 - 介紹了
RocketMQ的存盤機制和刷盤策略,
等等,,,
作者:Francis
鏈接:RabbitMQ 入門
來源:gitee
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/236340.html
標籤:Java
上一篇:JVM的藝術—JAVA記憶體模型
下一篇:redis實用中的問題
