什么是訊息佇列?
-
訊息(Message):傳輸的資料,
-
佇列(Queue):佇列是一種先進先出的資料結構,
-
訊息佇列從字面的含義來看就是一個存放訊息的容器,
-
訊息佇列可以簡單理解為:把要傳輸的資料放在佇列中,
-
把資料放到訊息佇列叫做生產者
-
從訊息佇列里邊取資料叫做消費者
訊息佇列是一種異步的服務間通信方式,是分布式系統中重要的組件,主要解決應用耦合,異步訊息,流量削鋒等問題,實作高性能,高可用,可伸縮和最終一致性架構,使用較多的訊息佇列有RocketMQ、RabbitMQ、Kafka等,
為什么使用訊息佇列?
使用訊息佇列有三個好處:
解耦
耦合性:后臺各個系統相互依賴,如果一個系統掛掉了,其他也會導致無法運行
于是訊息佇列就進行解耦,加入了訊息佇列之后,不同的后臺只需要將自己的資料寫進訊息佇列即可,一個系統掛掉了,他在訊息佇列中的資料依舊存在,不用擔心出現整體無法運行的情況
異步
異步提速:比如說我們原本有3個后臺系統要向前端輸出資料,每個后臺都需要300ms,還要加上訪問資料庫的時間,如果一個用戶訪問的后臺較多,那么訪問的時間也會變得很久,用戶體驗較差
但如果使用了訊息佇列的話,不管要訪問多少個后臺資料,所有的后臺只需要把資料都壓進訊息佇列里面就可行了,如何用戶再根據自己的需求從訊息佇列拿,大大減少所需時間
削峰
如果我們有一段時間的請求量非常大,就好比雙11的時候,我們的后臺只能接受1000個,但一下發過來3000個,這時候后臺扛不住,就會崩潰
但是如果使用了訊息佇列,訊息佇列會把加載不了的資訊丟到訊息佇列里面去,等后臺持續性的加載,這樣就不會出現系統崩潰的問題,頂多也只是慢一點
訊息佇列有什么優點和缺點?
優點就是上面所說的三個: 解耦,異步,削峰
缺點:
系統的可用性降低:我系統引入的外部依賴越多,訊息佇列就越容易掛掉,不加訊息佇列之前,我們雖然有耦合的問題,2個系統可能互相關聯,一方掛掉導致其他一個也無法正常使用,但是如果加了訊息佇列,訊息佇列一旦掛掉,那么所有的系統都會掛掉
系統復雜性變高:硬生生加個 MQ 進來,你怎么保證訊息沒有重復消費?怎么[處理訊息丟失的情況]?怎么保證訊息傳遞的順序性?頭大頭大,問題一大堆,痛苦不已
Kafka ,ActiveMQ ,RabbitMQ ,RocketMQ 都有什么優點和缺點?

如何保證訊息佇列的高可用?
先了解Rabbit的三種作業模式
rabbitmq有3種模式,但集群模式是2種
-
單一模式:即單機情況不做集群,就單獨運行一個rabbitmq而已,
-
普通模式:默認模式,以兩個節點(rabbit01、rabbit02)為例來進行說明,對于Queue來說,訊息物體只存在于其中一個節點rabbit01(或者rabbit02),rabbit01和rabbit02兩個節點僅有相同的元資料,即佇列的結構,當訊息進入rabbit01節點的Queue后,consumer從rabbit02節點消費時,RabbitMQ會臨時在rabbit01、rabbit02間進行訊息傳輸,把A中的訊息物體取出并經過B發送給consumer,所以consumer應盡量連接每一個節點,從中取訊息,即對于同一個邏輯佇列,要在多個節點建立物理Queue,否則無論consumer連rabbit01或rabbit02,出口總在rabbit01,會產生瓶頸,當rabbit01節點故障后,rabbit02節點無法取到rabbit01節點中還未消費的訊息物體,如果做了訊息持久化,那么得等rabbit01節點恢復,然后才可被消費;如果沒有持久化的話,就會產生訊息丟失的現象,
-
普通模式的優點:提高消費的吞吐量
-
鏡像模式:把需要的佇列做成鏡像佇列,存在與多個節點屬于[RabbitMQ的HA方案]該模式解決了普通模式中的問題,其實質和普通模式不同之處在于,訊息物體會主動在鏡像節點間同步,而不是在客戶端取資料時臨時拉取,該模式帶來的副作用也很明顯,除了降低系統性能外,如果鏡像佇列數量過多,加之大量的訊息進入,集群內部的網路帶寬將會被這種同步通訊大大消耗掉,所以在對可靠性要求較高的場合中適用,
-
鏡像集群的特點:
-
性能開銷非常大,因為要同步訊息到對應的節點,這個會造成網路之間的資料量的頻繁互動,對于網路帶寬的消耗和壓力都是比較重的
沒有擴展可言,rabbitMQ是集群,不是分布式的,所以當某個Queue負載過重,我們并不能通過新增節點來緩解壓力,因為所以節點上的資料都是相同的,這樣就沒辦法進行擴展了
如何保證訊息的可靠性傳輸?(如何處理訊息丟失的問題)

生產者沒有成功把訊息發送給訊息佇列
a、丟失的原因:因為網路傳輸的不穩定性,當生產者在向MQ發送訊息的程序中,MQ沒有成功接收到訊息,但是生產者卻以為MQ成功接收到了訊息,不會再次重復發送該訊息,從而導致訊息的丟失,
b、解決辦法: 有兩個解決辦法:事務機制和confirm機制,最常用的是confirm機制,
事務機制:
RabbitMQ 提供了事務功能,生產者發送資料之前開啟 RabbitMQ 事務channel.txSelect,然后發送訊息,如果訊息沒有成功被 RabbitMQ 接收到,那么生產者會收到例外報錯,此時就可以回滾事務channel.txRollback,然后重試發送訊息;如果收到了訊息,那么可以提交事務channel.txCommit
confirm機制:
RabbitMQ可以開啟 confirm 模式,在生產者那里設定開啟 confirm 模式之后,生產者每次寫的訊息都會分配一個唯一的 id,如果訊息成功寫入 RabbitMQ 中,RabbitMQ 會給生產者回傳一個 ack 訊息,告訴你說這個訊息 ok 了,如果 RabbitMQ 沒能處理這個訊息,會回呼你的一個 nack 介面,告訴你這個訊息接收失敗,生產者可以發送,而且你可以結合這個機制自己在記憶體里維護每個訊息 id 的狀態,如果超過一定時間還沒接收到這個訊息的回呼,那么可以重發,
注意:RabbitMQ的事務機制是同步的,很耗型能,會降低RabbitMQ的吞吐量,confirm機制是異步的,生成者發送完一個訊息之后,不需要等待RabbitMQ的回呼,就可以發送下一個訊息,當RabbitMQ成功接收到訊息之后會自動異步的回呼生產者的一個介面回傳成功與否的訊息,
RabbitMQ接收到訊息之后丟失了訊息
a、丟失的原因:RabbitMQ接收到生產者發送過來的訊息,是存在記憶體中的,如果沒有被消費完,此時RabbitMQ宕機了,那么再次啟動的時候,原來記憶體中的那些訊息都丟失了,
b、解決辦法:開啟RabbitMQ的持久化,當生產者把訊息成功寫入RabbitMQ之后,RabbitMQ就把訊息持久化到磁盤,結合上面的說到的confirm機制,只有當訊息成功持久化磁盤之后,才會回呼生產者的介面回傳ack訊息,否則都算失敗,生產者會重新發送,存入磁盤的訊息不會丟失,就算RabbitMQ掛掉了,重啟之后,他會讀取磁盤中的訊息,不會導致訊息的丟失,
c、持久化的配置:
-
第一點是創建 queue 的時候將其設定為持久化,這樣就可以保證 RabbitMQ 持久化 queue 的元資料,但是它是不會持久化 queue 里的資料的,
-
第二個是發送訊息的時候將訊息的
deliveryMode設定為 2,就是將訊息設定為持久化的,此時 RabbitMQ 就會將訊息持久化到磁盤上去,
注意:持久化要起作用必須同時設定這兩個持久化才行,RabbitMQ 哪怕是掛了,再次重啟,也會從磁盤上重啟恢復 queue,恢復這個 queue 里的資料,
消費者弄丟訊息
a、丟失的原因:如果RabbitMQ成功的把訊息發送給了消費者,那么RabbitMQ的ack機制會自動的回傳成功,表明發送訊息成功,下次就不會發送這個訊息,但如果就在此時,消費者還沒處理完該訊息,然后宕機了,那么這個訊息就丟失了,
b、解決的辦法:簡單來說,就是必須關閉 RabbitMQ 的自動 ack,可以通過一個 api 來呼叫就行,然后每次在自己代碼里確保處理完的時候,再在程式里 ack 一把,這樣的話,如果你還沒處理完,不就沒有 ack了?那 RabbitMQ 就認為你還沒處理完,這個時候 RabbitMQ 會把這個消費分配給別的 consumer 去處理,訊息是不會丟的,
如何保證訊息不被重復消費? (如何保證訊息消費的冪等性)
先說為什么會重復消費:正常情況下,消費者在消費訊息的時候,消費完畢后,會發送一個確認訊息給訊息佇列,訊息佇列就知道該訊息被消費了,就會將該訊息從訊息佇列中洗掉;但是因為網路傳輸等等故障,確認資訊沒有傳送到訊息佇列,導致訊息佇列不知道自己已經消費過該訊息了,再次將訊息分發給其他的消費者,
解決思路是:保證訊息的唯一性,就算是多次傳輸,不要讓訊息的多次消費帶來影響;保證訊息等冪性;
-
在訊息生產時,MQ內部針對每條生產者發送的訊息生成一個inner-msg-id,作為去重和冪等的依據(訊息投遞失敗并重傳),避免重復的訊息進入佇列;
-
在訊息消費時,要求訊息體中必須要有一個bizId(對于同一業務全域唯一,如支付ID、訂單ID、帖子ID等)作為去重和冪等的依據,避免同一條訊息被重復消費,
這個問題針對業務場景來答分以下幾點:
-
-
如果訊息是做資料庫的insert操作,給這個訊息做一個唯一主鍵,那么就算出現重復消費的情況,就會導致主鍵沖突,避免資料庫出現臟資料,
-
如果訊息是做redis的set的操作,不用解決,因為無論set幾次結果都是一樣的,set操作本來就算冪等操作,
-
如果以上兩種情況還不行,可以準備一個第三方介質,來做消費記錄,以redis為例,給訊息分配一個全域id,只要消費過該訊息,將<id,message>以K-V形式寫入redis,那消費者開始消費前,先去redis中查詢有沒消費記錄即可,
-
如何保證訊息的順序性?
mysql的binlog同步,你再mysql里增刪改3條binlog,接著這三條binlog發送到MQ里面,到消費出來依次執行,起碼要保證人家是按照順序來的吧,不然本來是增加、修改、洗掉,你愣是給更改了順序,換成了洗掉、修改、增加,這就亂了,
搞3個Queue,每個消費者就消費其中的一個Queue,把需要保證順序的資料發到1個Queue里去
如何解決訊息佇列的延遲以及過期失效的問題?
過期失效就是TTL,如果訊息在Queue中積壓超過一定的時間就會被RabbitMQ給清理掉,這個資料就沒了,這就不是資料積壓MQ中了,而是大量的資料會直接搞丟,
在這種情況下,增加consume消費積壓就不起作用了,此時,只能將丟失的那批資料,寫個臨時的程式,一點一點查出來,然后再灌入MQ中,把白天丟失的資料補回來,
訊息佇列滿了之后該如何解決?
如果訊息積壓在 mq 里,你很長時間都沒有處理掉,此時導致 mq 都快寫滿了,咋辦?這個還有別的辦法嗎?沒有,誰讓你第一個方案執行的太慢了,你臨時寫程式,接入資料來消費,消費一個丟棄一個,都不要了,快速消費掉所有的訊息,然后走第二個方案,到了晚上再補資料吧,
有幾百萬訊息持續積壓幾小時,說說該怎么解決?
在日常作業中使用RabbitMQ偶爾會遇不可預料的情況導致的訊息積壓,一般出現訊息積壓基本上分為幾種情況:
-
消費者消費訊息的速度趕不上生產速度,這總問題主要是業務邏輯沒設計好消費者和生產者之間的平衡,需要改業務流程或邏輯已保證消費度跟上生產訊息的速,譬如增加消費者的數量等,
-
消費者出現例外,導致一直無法接收新的訊息,這種問題需要排查消費的邏輯是不是又問題,需要優化程式,
幾千萬條資料在MQ里,積壓了七八個小時,這個時候就是恢復consumer的問題,讓它恢復消費速度,然后傻傻地等待幾個小時消費完畢,這個肯定不能再面試的時候說,1個消費者1秒時1000條,1秒3個消費者是3000條,1分鐘是18萬條,1個小時是1000多萬條,如果積壓了上萬條資料,即使消費者恢復了,也大概需要1個多小時才能恢復過來,
原來3個消費者1個小時,現在30個消費者,需要10分鐘搞定,
一般情況下,這個時候只能做臨時擴容了,具體操作步驟和思路如下:
① 先修改consumer的問題,確保其恢復消費速度,然后將現有consumer都停掉,
② 新建1個topic,partition是原來的10倍,臨時建立好原來10倍或者20倍的Queue,
③ 然后寫一個臨時的分發資料的consumer程式,這個程式部署上去,消費積壓的資料,消費之后,不做耗時的處理,直接均勻輪訓寫入臨時建立好的10倍數量的Queue,
④ 接著征用10倍的機器來部署consume,每一批consumer消費1個臨時的queue,
⑤ 這種做法,相當于將queue資源和consume資源擴大10倍,以10倍的速度來消費資料,
⑥ 等快速消費完積壓資料之后,恢復原來的部署架構,重新用原先的consumer來消費訊息,
如果讓你寫一個訊息佇列,該如何進行架構設計?說說你的思路?
1、首先MQ得支持可伸縮性吧,就是需要的時候增加吞吐量和容量?
2、其次,需要考慮一下MQ的資料是不是要持久化到磁盤
3、再次,考慮一下MQ的可用性,
4、最后,考慮一下能不能支持資料零丟失
首先這個mq得支持可伸縮性吧,就是需要的時候快速擴容,就可以增加吞吐量和容量,那怎么搞?設計個分布式的系統唄,參照一下kafka的設計理念,broker -> topic -> partition,每個partition放一個機器,就存一部分資料,如果現在資源不夠了,簡單啊,給topic增加partition,然后做資料遷移,增加機器,不就可以存放更多資料,提供更高的吞吐量了? 其次你得考慮一下這個mq的資料要不要落地磁盤吧?那肯定要了,落磁盤,才能保證別行程掛了資料就丟了,那落磁盤的時候怎么落啊?順序寫,這樣就沒有磁盤隨機讀寫的尋址開銷,磁盤順序讀寫的性能是很高的,這就是kafka的思路, (3)其次你考慮一下你的mq的可用性啊?這個事兒,具體參考我們之前可用性那個環節講解的kafka的高可用保障機制,多副本 -> leader & follower -> broker掛了重新選舉leader即可對外服務, 能不能支持資料0丟失啊?可以的,參考我們之前說的那個kafka資料零丟失方案
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/340686.html
標籤:其他
