主頁 > 後端開發 > ActiveMQ、RabbitMQ、RocketMQ、Kafka四種訊息中間件分析介紹

ActiveMQ、RabbitMQ、RocketMQ、Kafka四種訊息中間件分析介紹

2022-07-24 07:37:00 後端開發

ActiveMQ、RabbitMQ、RocketMQ、Kafka四種訊息中間件分析介紹

我們從四種訊息中間件的介紹到基本使用,以及高可用,訊息重復性,訊息丟失,訊息順序性能方面進行分析介紹!

一、訊息中間件的使用場景

訊息中間件的使用場景總結就是六個字:解耦、異步、削峰

1.解耦

如果我方系統A要與三方B系統進行資料對接,推送系統人員資訊,通常我們會使用介面開發來進行,但是如果運維期間B系統進行了調整,或者推送程序中B系統網路進行了調整,又或者后續程序中我們需要推送資訊到三方C系統中,這樣的話就需要我們進行頻繁的介面開發調整,還需要考慮介面推送訊息失敗的場景,

如果我們使用訊息中間件進行訊息推送,我們只需要按照一種約定的資料結構進行資料推送,其他三方系統從訊息中間件取值消費就可以,即便是三方系統出現宕機或者其他調整,我們都可以正常進行資料推送,

總結:通過一個 MQ,Pub/Sub 發布訂閱訊息這么一個模型,A 系統就跟其它系統徹底解耦了,

2.異步

繼續我們上述的訊息推送業務,如果我們現在需要同時推送訊息到BCD三個系統中,而BCD系統接收到訊息后需要進行復雜的邏輯處理,以及讀庫寫庫處理,如果一個三方系統進行訊息處理需要1s,那我們遍歷推送完一次訊息,就需要三秒,

而如果我們使用訊息中間件,我們只需要推送到訊息中間件,然后進行介面回傳,可能只需要20ms,大大提升了用戶體驗,訊息推送后BCD系統各自進行訊息消費即可,不需要我們等待,

3.削峰

還是上述我們的應用場景,假設某一時間段內,每秒都有一條訊息推送,如果我們使用介面進行推送,BCD三個系統處理完就需要三秒,這樣會導致用戶前端體驗較差,而且系統后臺一直處于阻塞狀態,后續的訊息推送介面一直在等待,

如果我們使用訊息中間件,我們只需要將訊息推送至訊息中間件中,BCD系統對積壓的訊息進行相應的處理,

在上述高頻發的訊息時間段內,會在訊息中間中產生訊息積壓,BCD系統在上述時間段外對積壓訊息進行相應的處理即可,

二、訊息中間件的優缺點

訊息中間件的優點其實就是他的使用場景,

訊息中間件的缺點與優點也是相輔相成的,主要有以下三個

1.系統可用性降低

系統關聯的中間件越多,越容易引發宕機問題,

如上述案例中的問題,原本進行訊息推送我們只需要開發介面進行推送即可,引入訊息中間件后就需要考慮訊息中間件的高可用問題,如果訊息中間件出現宕機問題,我們所有的訊息推送都會失敗,

2.系統復雜度提高

上述案例中,如果我們使用介面進行訊息推送,我們只需要考慮介面超時以及介面推送訊息失敗的問題,但我們引入訊息中間件后,就需要考慮訊息中間件的維護,以及訊息重復消費,訊息丟失的問題,

3.一致性問題

上述案例中,如果我們使用介面進行訊息推送,推送訊息我們可以放在事務中處理,如果推送程序中出現例外,我們可以進行資料回滾,但我們引入訊息中間件后,就需要考慮訊息推送后,消費失敗的問題,以及如果我們同時推送訊息到BCD系統中,如何保證他們的事務一致性,

三、四種訊息中間件的基本介紹

特性 ActiveMQ RabbitMQ RocketMQ Kafka
單機吞吐量 萬級,比 RocketMQ、Kafka 低一個數量級 同 ActiveMQ 10 萬級,支撐高吞吐 10 萬級,高吞吐,一般配合大資料類的系統來進行實時資料計算、日志采集等場景
topic 數量對吞吐量的影響 topic 可以達到幾百/幾千的級別,吞吐量會有較小幅度的下降,這是 RocketMQ 的一大優勢,在同等機器下,可以支撐大量的 topic topic 從幾十到幾百個時候,吞吐量會大幅度下降,在同等機器下,Kafka 盡量保證 topic 數量不要過多,如果要支撐大規模的 topic,需要增加更多的機器資源
時效性 ms 級 微秒級,這是 RabbitMQ 的一大特點,延遲最低 ms 級 延遲在 ms 級以內
可用性 高,基于主從架構實作高可用 同 ActiveMQ 非常高,分布式架構 非常高,分布式,一個資料多個副本,少數機器宕機,不會丟失資料,不會導致不可用
訊息可靠性 有較低的概率丟失資料 基本不丟 經過引數優化配置,可以做到 0 丟失 同 RocketMQ
功能支持 MQ 領域的功能極其完備 基于 erlang 開發,并發能力很強,性能極好,延時很低 MQ 功能較為完善,還是分布式的,擴展性好 功能較為簡單,主要支持簡單的 MQ 功能,在大資料領域的實時計算以及日志采集被大規模使用
其他 Apache軟體基金會開發、起步較早,但沒有經過大量吞吐場景驗證,目前社區不是很活躍 開源,穩定,社區活躍度高 阿里出品,目前已交給Apache,但社區活躍度較低 Apache軟體基金會開發、開源、高通吐量,社區活躍度高

1.ActiveMQ

1.1:Activemq 是什么

Activemq 是一種開源的,實作了JMS1.1規范的,面向訊息(MOM)的中間件,為應用程式提供高效的、可擴展的、穩定的和安全的企業級訊息通信,

1.2:Activemq 的作用及原理

Activemq 的作用就是系統之間進行通信,原理就是生產者生產訊息, 把訊息發送給activemq, Activemq 接收到訊息, 然后查看有多少個消費者,

然后把訊息轉發給消費者, 此程序中生產者無需參與, 消費者接收到訊息后做相應的處理和生產者沒有任何關系,

1.3:Activemq 的通信方式

publish(發布)-subscribe(訂閱)(發布-訂閱方式)

發布/訂閱方式用于多接收客戶端的方式,作為發布訂閱的方式,可能存在多個接收客戶端,并且接收端客戶端與發送客戶端存在時間上的依賴,一個接收端只能接收他創建以后發送客戶端發送的資訊,

p2p(point-to-point)(點對點)

p2p的程序則理解起來比較簡單,它好比是兩個人打電話,這兩個人是獨享這一條通信鏈路的,一方發送訊息,另外一方接收,就這么簡單,在實際應用中因為有多個用戶對使用p2p的鏈路,相互通信的雙方是通過一個類似于佇列的方式來進行交流,和前面pub-sub的區別在于一個topic有一個發送者和多個接收者,而在p2p里一個queue只有一個發送者和一個接收者,

1.4:Activemq 的訊息持久化機制

JDBC: 持久化到資料庫
AMQ :日志檔案(已基本不用)
KahaDB : AMQ基礎上改進,默認選擇
LevelDB :谷歌K/V資料庫
在activemq.xml中查看默認的broker持久化機制,

1.5:Activemq 的訊息確認機制

(1)AUTO_ACKNOWLEDGE = 1 自動確認

(2)CLIENT_ACKNOWLEDGE = 2 客戶端手動確認

(3)DUPS_OK_ACKNOWLEDGE = 3 自動批量確認

(4)SESSION_TRANSACTED = 0 事務提交并確認

(5)INDIVIDUAL_ACKNOWLEDGE = 4 單條訊息確認

前四種是JMS API中提供的客戶端ACK_MODE,第五種是InforSuiteMQ自定義補充的一種ACK_MODE,

2.RabbitMQ

2.1:RabbitMQ是什么

RabbitMQ是一個由erlang語言撰寫的、開源的、在AMQP基礎上完整的、可復用的企業訊息系統,

2.2:RabbitMQ的作用及原理

基本概念

關鍵名稱 說明
Channel(信道) 訊息推送使用的通道
Producer(訊息的生產者) 向訊息佇列發布訊息的客戶端應用程式
Consumer(訊息的消費者) 從訊息佇列取得訊息的客戶端應用程式
Message(訊息) 訊息由訊息頭和訊息體組成
Routing Key(路由鍵) 訊息頭的一個屬性,用于標記訊息的路由規則,決定了交換機的轉發路徑
Queue(訊息佇列) 用于存盤生產者的訊息
Exchange(交換器路由器) 提供Producer到Queue之間的匹配
Binding(系結) 用于建立Exchange和Queue之間的關聯
Binding Key(系結鍵) Exchange與Queue的系結關系,用于匹配Routing Key
Broker(服務主體) RabbitMQ Server,服務器物體

2.3:RabbitMQ的通信方式

2.3.1:簡單佇列

最簡單的作業佇列,其中一個訊息生產者,一個訊息消費者,一個佇列,也稱為點對點模式

2.3.2:作業佇列模式

一個訊息生產者,一個交換器,一個訊息佇列,多個消費者,同樣也稱為點對點模式

2.3.3:發布訂閱模式

Pulish/Subscribe,無選擇接收訊息,一個訊息生產者,一個交換機(交換機型別為fanout),多個訊息佇列,多個消費者

生產者只需把訊息發送到交換機,系結這個交換機的佇列都會獲得一份一樣的資料,

2.3.4:路由模式

在發布/訂閱模式的基礎上,有選擇的接收訊息,也就是通過 routing 路由進行匹配條件是否滿足接收訊息,

2.3.5:主體模式

topics(主題)模式跟routing路由模式類似,只不過路由模式是指定固定的路由鍵 routingKey,而主題模式是可以模糊匹配路由鍵 routingKey,類似于SQL中 = 和 like 的關系,

2.3.6:RPC模式

與上面其他5種所不同之處,該模式是擁有請求/回復的,也就是有回應的,上面5種都沒有,

RPC是指遠程程序呼叫,也就是說兩臺服務器A,B,一個應用部署在A服務器上,想要呼叫B服務器上應用提供的處理業務,處理完后然后在A服務器繼續執行下去,把異步的訊息以同步的方式執行,

2.4:RabbitMQ的訊息持久化機制

Queue(訊息佇列)的持久化是通過durable=true來實作的,

Message(訊息)的持久化 ,通過設定訊息是持久化的標識,

Exchange(交換機)的持久化 ,

2.5:RabbitMQ的訊息確認機制

confirm機制:確認訊息是否成功發送到Exchange

ack機制:確認訊息是否被消費者成功消費

  • AcknowledgeMode.NONE:自動確認
  • AcknowledgeMode.AUTO:根據情況確認
  • AcknowledgeMode.MANUAL:手動確認

3.RocketMQ

3.1:RocketMQ是什么

RocketMQ是阿里開發的一款純java、分布式、佇列模型的開源訊息中間件,支持事務訊息、順序訊息、批量訊息、定時訊息、訊息回溯等,

3.2:RocketMQ的作用及原理

基本概念

關鍵名稱 說明
Producer 訊息生產者
Producer Group 生產者組
Consumer 訊息消費者
Consumer Group 消費者組
Topic Topic用于將訊息按主題做劃分,Producer將訊息發往指定的Topic,Consumer訂閱該Topic就可以收到這條訊息
Message 代表一條訊息
Tag 標簽可以被認為是對 Topic 進一步細化
Broker 負責接收并存盤訊息
Queue Topic和Queue是1對多的關系,一個Topic下可以包含多個Queue,主要用于負載均衡
Offset RocketMQ在存盤訊息時會為每個Topic下的每個Queue生成一個訊息的索引檔案,每個Queue都對應一個Offset記錄當前Queue中訊息條數,
NameServer NameServer可以看作是RocketMQ的注冊中心

3.3:RocketMQ的通信方式

RocketMQ訊息訂閱有兩種模式

一種是Push模式(MQPushConsumer),即MQServer主動向消費端推送

另外一種是Pull模式(MQPullConsumer),即消費端在需要時,主動到MQ Server拉取

但在具體實作時,Push和Pull模式本質都是采用消費端主動拉取的方式,即consumer輪詢從broker拉取訊息

集群模式和廣播模式

集群模式:默認情況下我們都是使用的集群模式,也就是說消費者組收到訊息后,只有其中的一臺機器會接收到訊息,

廣播模式:消費者組內的每臺機器都會收到這條訊息,

3.4:RocketMQ的訊息持久化機制

exchange持久化、queue持久化、message持久化

CommitLog:日志資料檔案,存盤訊息內容,所有 queue 共享,不區分 topic ,順序讀寫 ,1G 一個檔案

ConsumeQueue:邏輯 Queue,基于 topic 的 CommitLog 的索引檔案,訊息先到達 commitLog,然后異步轉發到 consumeQueue,包含 queue 在 commitLog 中的物理偏移量 offset,訊息物體內容大小和 Message Tag 的 hash 值,大于 600W 個位元組,寫滿之后重新生成,順序寫

IndexFile:基于 Key 或 時間區間的 CommitLog 的索引檔案,檔案名以創建的時間戳命名,固定的單個 indexFile 大小為 400M,可以保存 2000W 個索引

3.5:RocketMQ的訊息確認機制

confirm機制:確認訊息是否成功發送到Exchange

ack機制:確認訊息是否被消費者成功消費

4.Kafka

4.1:Kafka是什么

Kafka是由Apache軟體基金會開發的一個開源流處理平臺,由Scala和Java撰寫,Kafka是一個分布式、磁區的、多副本的、多訂閱者,基于zookeeper協調的分布式日志系統,可作為訊息中間件

4.2:Kafka的作用及原理

基本概念

關鍵名稱 說明
producer 生產者
consumer 消費者
consumer group 消費者組
broker 一臺kafka服務器就是一個broker,一個集群由多個broker組成,一個broker可以容納多個topic
topic 一個訊息佇列,生產者和消費者都是面對一個Topic
partition 每個partition時一個有序佇列,partition是topic中存盤資料和消費資料所使用的佇列所在
replica 副本,為了保證當前某個節點發生故障時,當前節點上的資料不會發生丟失
leader 每個磁區多個副本的“主”,生產者生產資料的物件,以及消費組消費者消費的物件
follower 每個磁區多個副本的“從”,實時從leader資料的同步

4.3:Kafka的通信方式

生產者發送模式

1.發后即忘(fire-and-forget):只管往Kafka中發送訊息而并不關心訊息是否正確到達

2.同步(sync):一般是在send()方法里指定一個Callback的回呼函式,Kafka在回傳回應時呼叫該函式來實作異步的發送確認,

3.異步(async):send()方法會回傳Futrue物件,通過呼叫Futrue物件的get()方法,等待直到結果回傳

消費者消費模式

1.At-most-once(最多一次):在每一條訊息commit成功之后,再進行消費處理;設定自動提交為false,接收到訊息之后,首先commit,然后再進行消費,

2.At-least-once(最少一次):在每一條訊息處理成功之后,再進行commit;設定自動提交為false;訊息處理成功之后,手動進行commit,

3.Exactly-once(正好一次):將offset作為唯一id與訊息同時處理,并且保證處理的原子性;設定自動提交為false;訊息處理成功之后再提交,

4.4:Kafka的訊息持久化機制

Kafka直接將資料寫入到日志檔案中,以追加的形式寫入

4.5:Kafka的訊息確認機制

confirm機制:確認訊息是否成功發送

ack機制:確認訊息是否被消費者成功消費

四、訊息佇列高可用

引言:系統應用MQ作為訊息中間件后,會導致系統可用性降低,所以只要你用了 MQ,高可用肯定是要考慮到的

1.ActiveMQ高可用

ActiveMQ的部署方式有三種,分別為:單節點部署(不支持高可用),Master-Slave部署方式(主從模式),Broker-Cluster部署方式(負載均衡)

1.1.單節點部署(不支持高可用)

單節點部署方式因為不支持高可用,只會在開發或者測驗環境下用到,且單節點部署方式較簡單,不進行詳述,

1.2.Master-Slave部署方式(支持高可用)

1.2.1.shared filesystem Master-Slave部署方式

主要是通過共享存盤目錄來實作master和slave的熱備,所有的ActiveMQ應用都在不斷地獲取共享目錄的控制權,哪個應用搶到了控制權,它就成為master,

多個共享存盤目錄的應用,誰先啟動,誰就可以最早取得共享目錄的控制權成為master,其他的應用就只能作為slave,

1.2.2.shared database Master-Slave方式

與shared filesystem方式類似,只是共享的存盤介質由檔案系統改成了資料庫而已,

1.2.3.Replicated LevelDB Store方式

這種主備方式是ActiveMQ5.9以后才新增的特性,使用ZooKeeper協調選擇一個node作為master,被選擇的master broker node開啟并接受客戶端連接,

其他node轉入slave模式,連接master并同步他們的存盤狀態,slave不接受客戶端連接,所有的存盤操作都將被復制到連接至Master的slaves,

如果master死了,得到了最新更新的slave被允許成為master,fialed node能夠重新加入到網路中并連接master進入slave mode,所有需要同步的disk的訊息操作都將等待存盤狀態被復制到其他法定節點的操作完成才能完成,

當一個新的master被選中,你需要至少保障一個法定node在線以能夠找到擁有最新狀態的node,這個node將會成為新的master,因此,推薦運行至少3個replica nodes,以防止一個node失敗了,服務中斷,

1.3.Broker-Cluster部署方式(不支持高可用)

前面的Master-Slave的方式雖然能解決多服務熱備的高可用問題,但無法解決負載均衡和分布式的問題,Broker-Cluster的部署方式就可以解決負載均衡的問題,

? Broker-Cluster部署方式中,各個broker通過網路互相連接,并共享queue,當broker-A上面指定的queue-A中接收到一個message處于pending狀態,而此時沒有consumer連接broker-A時,如果cluster中的broker-B上面由一個consumer在消費queue-A的訊息,那么broker-B會先通過內部網路獲取到broker-A上面的message,并通知自己的consumer來消費,

1.3.1.static Broker-Cluster部署

在activemq.xml檔案中靜態指定Broker需要建立橋連接的其他Broker

1.3.2.Dynamic Broker-Cluster部署

在activemq.xml檔案中不直接指定Broker需要建立橋連接的其他Broker,由activemq在啟動后動態查找

1.4.Master-Slave與Broker-Cluster相結合的部署方式

可以看到Master-Slave的部署方式雖然解決了高可用的問題,但不支持負載均衡,

Broker-Cluster解決了負載均衡,但當其中一個Broker突然宕掉的話,那么存在于該Broker上處于Pending狀態的message將會丟失,無法達到高可用的目的,

Master-Slave與Broker-Cluster相結合的部署方式是目前ActiveMQ比較推薦的部署方案,

2.RabbitMQ高可用

RabbitMQ的部署方式有三種,分別為:單機模式(不支持高可用),普通集群模式(不支持高可用),鏡像集群模式(支持高可用)

2.1單機模式(不支持高可用)

單節點部署方式因為不支持高可用,只會在開發或者測驗環境下用到,且單節點部署方式較簡單,不進行詳述,

2.2普通集群模式(不支持高可用)

普通集群模式,意思就是在多臺機器上啟動多個 RabbitMQ 實體,每個機器啟動一個,你創建的 queue,只會放在一個 RabbitMQ 實體上,但是每個實體都同步 queue 的元資料(元資料可以認為是 queue 的一些配置資訊,通過元資料,可以找到 queue 所在實體),你消費的時候,實際上如果連接到了另外一個實體,那么那個實體會從 queue 所在實體上拉取資料過來,

這種方式確實很麻煩,也不怎么好,沒做到所謂的分布式,就是個普通集群,因為這導致你要么消費者每次隨機連接一個實體然后拉取資料,要么固定連接那個 queue 所在實體消費資料,前者有資料拉取的開銷,后者導致單實體性能瓶頸,

而且如果那個放 queue 的實體宕機了,會導致接下來其他實體就無法從那個實體拉取,如果你開啟了訊息持久化,讓 RabbitMQ 落地存盤訊息的話,訊息不一定會丟,得等這個實體恢復了,然后才可以繼續從這個 queue 拉取資料,

2.3鏡像集群模式(支持高可用)

這種模式,才是所謂的 RabbitMQ 的高可用模式,跟普通集群模式不一樣的是,在鏡像集群模式下,你創建的 queue,無論元資料還是 queue 里的訊息都會存在于多個實體上,就是說,每個 RabbitMQ 節點都有這個 queue 的一個完整鏡像,包含 queue 的全部資料的意思,然后每次你寫訊息到 queue 的時候,都會自動把訊息同步到多個實體的 queue 上,

3.RocketMQ高可用

RocketMQ的部署方式有兩種,分別為:單節點模式(不支持高可用),多節點模式

3.1.單節點模式(不支持高可用)

單節點部署方式因為不支持高可用,只會在開發或者測驗環境下用到,且單節點部署方式較簡單,不進行詳述,

3.2.多節點模式

3.2.1.多Master模式(不支持高可用)

一個集群無 Slave,全是 Master,例如 2 個 Master 或者 3 個 Master

配置簡單,單個Master 宕機或重啟維護對應用無影響,

單臺機器宕機期間,這臺機器上未被消費的訊息在機器恢復之前不可訂閱,訊息實時性會受到受到影響,

3.2.2.多Master多Slave模式(異步復制)(支持高可用)

每個 Master 配置一個 Slave,有多對Master-Slave, HA,采用異步復制方式,主備有短暫訊息延遲,毫秒級,

即使磁盤損壞,訊息丟失的非常少,且訊息實時性不會受影響,因為Master 宕機后,消費者仍然可以從 Slave消費,此程序對應用透明,不需要人工干預,性能同多 Master 模式幾乎一樣,

Master 宕機,磁盤損壞情況,會丟失少量訊息,

3.2.3.多Master多Slave模式(同步雙寫)(支持高可用)

每個 Master 配置一個 Slave,有多對Master-Slave, HA采用同步雙寫方式,主備都寫成功,向應用回傳成功,

資料與服務都無單點, Master宕機情況下,訊息無延遲,服務可用性與資料可用性都非常高

性能比異步復制模式略低,大約低 10%左右,發送單個訊息的 RT會略高,

4.Kafka高可用

Kafka的部署方式有三種,分別為:單broke節點(不支持高可用),單機多broker模式(支持高可用),多機多broker模式(支持高可用)

4.1.單broke節點(不支持高可用)

單節點部署方式因為不支持高可用,只會在開發或者測驗環境下用到,且單節點部署方式較簡單,不進行詳述,

4.2.單機多broker模式(支持高可用)

這種部署方式其實是一種偽集群模式,單機部署多節點如果出現服務器宕機,那么所有節點都不能正常提供服務,

4.3.多機多broker模式(支持高可用)

Kafka 0.8 以前,是沒有 HA 機制的,就是任何一個 broker 宕機了,那個 broker 上的 partition 就廢了,沒法寫也沒法讀,沒有什么高可用性可言,

Kafka 0.8 以后,提供了 HA 機制,就是 replica(復制品) 副本機制,每個 partition 的資料都會同步到其它機器上,形成自己的多個 replica 副本,所有 replica 會選舉一個 leader 出來,那么生產和消費都跟這個 leader 打交道,然后其他 replica 就是 follower,寫的時候,leader 會負責把資料同步到所有 follower 上去,讀的時候就直接讀 leader 上的資料即可,只能讀寫 leader?很簡單,要是你可以隨意讀寫每個 follower,那么就要 care 資料一致性的問題,系統復雜度太高,很容易出問題,Kafka 會均勻地將一個 partition 的所有 replica 分布在不同的機器上,這樣才可以提高容錯性,

五、訊息重復消費問題

引言:為什么要考慮重復消費的問題?比如我們消費后通過消費中間件來呼叫,扣費10元,但是消費者消費訊息后還沒來得及進行確認,訊息中間件進行了重啟,那么訊息者就會進行再次扣費處理,這樣就會出問題!

ActiveMQ、RabbitMQ、RocketMQ、Kafka,都有可能會出現訊息重復消費的問題,正常,因為這問題通常不是 MQ 自己保證的,是由我們開發來保證的,

我們以Kafka為例說明一下重復消費的問題:

Kafka 實際上有個 offset 的概念,就是每個訊息寫進去,都有一個 offset,代表訊息的序號,然后 consumer 消費了資料之后,每隔一段時間(定時定期),會把自己消費過的訊息的 offset 提交一下,表示“我已經消費過了,下次我要是重啟啥的,你就讓我繼續從上次消費到的 offset 來繼續消費吧”,

但是,如果在這期間重啟系統或者直接 kill 行程了,再重啟,這會導致 consumer 有些訊息處理了,但是沒來得及提交 offset,重啟之后,少數訊息會再次消費一次,

如果消費者干的事兒是拿一條資料就往資料庫里寫一條,會導致說,你可能就把資料在資料庫里插入了 2 次,那么資料就錯啦,

重復消費問題引發后,我們就需要考慮怎么保證冪等性,

冪等性,通俗點說,就一個資料,或者一個請求,給你重復來多次,你得確保對應的資料是不會改變的,不能出錯,

保證冪等性的具體實作方式需要結合對應的業務去實作,這里提供幾個思路:

  • 如果是資料插入操作,插入前我們根據唯一鍵先進行查詢,如果已有資料那我們只進行更新就行,
  • 如果是寫 Redis,則我們無需考慮冪等性,反正每次都是 set,天然冪等性,
  • 如果是基于資料庫的唯一鍵來保證重復資料不會重復插入多條,因為有唯一鍵約束了,重復資料插入只會報錯,不會導致資料庫中出現臟資料,
  • 如果不是以上集中通用的場景,那需要我們發送訊息的時候攜帶唯一ID,消費者在消費前進行相應的查重處理,處理后在進行相應的業務操作,

六、訊息丟失問題

引言:MQ 有個基本原則,就是資料不能多一條,也不能少一條,

不能多,就是上面說的重復消費和冪等性問題,

不能少,就是說這資料別搞丟了,那這個問題你必須得考慮一下,

訊息丟失的問題需要從生產者、MQ、消費者三個方面來進行考慮,相應的解決方案也需要從這三方面出發(生產者確認機制,MQ訊息持久化、消費者確認機制),

1.ActiveMQ

1.生產者丟失訊息

生產者丟失訊息的問題可以通過訊息重投、重試機制來解決

2.ActiveMQ丟失訊息

ActiveMQ丟失訊息的問題需要通過ActiveMQ訊息持久化機制+高可用(見ActiveMQ章節)來解決,ActiveMQ的訊息持久化機制有以下幾種

JDBC: 持久化到資料庫
AMQ :日志檔案(已基本不用)
KahaDB : AMQ基礎上改進,默認選擇
LevelDB :谷歌K/V資料庫
在activemq.xml中查看默認的broker持久化機制,

3.訊息者丟失訊息

消費者丟失訊息通過ack機制來解決,訊息者進行業務處理后,再進行ack確認,避免訊息丟失,

2.RabbitMQ

1.生產者丟失訊息

生產者訊息丟失,通過confirm機制來確認訊息發送,然后進行相應的訊息重投、重試機制

2.RabbitMQ丟失訊息

RabbitMQ丟失訊息的問題需要通過RabbitMQ訊息持久化機制+高可用(見RabbitMQ章節)來解決,

RabbitMQ持久化包含:

Queue(訊息佇列)的持久化是通過durable=true來實作的,

Message(訊息)的持久化 ,通過設定訊息是持久化的標識,

Exchange(交換機)的持久化 ,

3.訊息者丟失訊息

消費者丟失訊息通過ack機制來解決,訊息者進行業務處理后,再進行ack確認,避免訊息丟失,

3.RocketMQ

1.生產者丟失訊息

生產者訊息丟失,通過confirm機制來確認訊息發送,然后進行相應的訊息重投、重試機制

2.RocketMQ丟失訊息

RocketMQ丟失訊息的問題需要通過RocketMQ訊息持久化機制+高可用(見RocketMQ章節)來解決,

RocketMQ持久化包含:exchange持久化、queue持久化、message持久化

3.訊息者丟失訊息

消費者丟失訊息通過ack機制來解決,訊息者進行業務處理后,再進行ack確認,避免訊息丟失,

4.Kafka

1.生產者丟失訊息

生產者訊息丟失,通過confirm機制來確認訊息發送,然后進行相應的訊息重投、重試機制

2.Kafka丟失訊息

Kafka直接將資料寫入到日志檔案中,以追加的形式寫入

3.訊息者丟失訊息

消費者丟失訊息通過ack機制來解決,訊息者進行業務處理后,再進行ack確認,避免訊息丟失,

總結:其實MQ訊息丟失,無非就是生產者發送時丟失,MQ傳遞時丟失,消費者消費時丟失幾種問題,我們相應的從以上三方面解決就可以,但是上述三種方式使用后,其實也不能保證100%訊息不丟失,所以往往在業務場景還會使用資料庫輔助記錄的方式,來保證訊息不丟失,但資料庫輔助記錄方式對相關性能以及使用用較大的影響,所以一般資料只需要進行上面三種方式處理,就能保證訊息基本不丟失,發生訊息丟失時我們配合日志進行相應的訊息恢復就可以,

資料庫輔助記錄:生產者發送訊息時同步發送一條訊息到資料庫中,消費者拿到訊息并完成業務處理后,從資料庫洗掉對應的記錄,

七、訊息順序性問題

引言:為什么要保證訊息的順序性?

比如現在我們有個賬號余額為5,我們充值50元,購買一件20元的商品,但因訊息不能保證順序,導致先進行扣費處理,這樣就會導致我們購買失敗,

訊息順序性消費情況,尤其在高可用(集群方式)下一定要考慮,

1.ActiveMQ

ActiveMQ因為默認是單queue 佇列,所以它模式就是保證訊息順序性消費的,

2.RabbitMQ

  • 將RabbitMQ拆分多個 queue,每個 queue 一個 consumer,保證訊息的順序性,
  • 一個 queue 但是對應一個 consumer,然后這個 consumer 內部用記憶體佇列做排隊,然后分發給底層不同的 worker 來處理,

3.RocketMQ

RocketMQ保證訊息順序性方法與Kafka大致相同,

  • 一個 topic,一個 queue,一個 consumer,內部單執行緒消費,單執行緒吞吐量太低,一般不會用這個,
  • 寫 N 個記憶體 queue,具有相同 key 的資料都到同一個記憶體 queue;然后對于 N 個執行緒,每個執行緒分別消費一個記憶體 queue 即可,這樣就能保證順序性,

4.Kafka

  • 一個 topic,一個 partition,一個 consumer,內部單執行緒消費,單執行緒吞吐量太低,一般不會用這個,
  • 寫 N 個記憶體 queue,具有相同 key 的資料都到同一個記憶體 queue;然后對于 N 個執行緒,每個執行緒分別消費一個記憶體 queue 即可,這樣就能保證順序性,

八、訊息積壓問題

引言:如何解決訊息佇列的延時以及過期失效問題?訊息佇列滿了以后該怎么處理?有幾百萬訊息持續積壓幾小時,怎么處理?

其實訊息積壓的問題,一般都是由消費端出了問題導致的,在實際業務場景中一般不會出現,但是出現問題一般都是大問題,

模擬場景:

一個消費者一秒是 1000 條,一秒 3 個消費者是 3000 條,一分鐘就是 18 萬條,由于消費者宕機導致現在MQ中積壓幾百萬資料

解決思路:

  • 先修復 consumer 的問題,確保其恢復消費速度,然后將現有 consumer 都停掉(避免重復消費),
  • 新建一個 topic,partition 是原來的 10 倍,臨時建立好原先 10 倍的 queue 數量,
  • 然后寫一個臨時的分發資料的 consumer 程式,這個程式部署上去消費積壓的資料,消費之后不做耗時的處理,直接均勻輪詢寫入臨時建立好的 10 倍數量的 queue,
  • 接著臨時征用 10 倍的機器來部署 consumer,每一批 consumer 消費一個臨時 queue 的資料,這種做法相當于是臨時將 queue 資源和 consumer 資源擴大 10 倍,以正常的 10 倍速度來消費資料,
  • 等快速消費完積壓資料之后,得恢復原先部署的架構,重新用原先的 consumer 機器來消費訊息,

mq 中的訊息過期失效了

假設你用的是 RabbitMQ,RabbtiMQ 是可以設定過期時間的,也就是 TTL,如果訊息在 queue 中積壓超過一定的時間就會被 RabbitMQ 給清理掉,這個資料就沒了,

假設 1 萬個訂單積壓在 mq 里面,沒有處理,其中 1000 個訂單都丟了,你只能手動寫程式把那 1000 個訂單給查出來,手動發到 mq 里去再補一次,

mq 都快寫滿了

如果訊息積壓在 mq 里,長時間都沒有處理掉,此時導致 mq 都快寫滿了,咋辦?

這種情況下只能是通過增加臨時Consumer將資料進行快速消費,等MQ恢復正常后再補充資料,

RocketMQ方案

對于 RocketMQ,官方針對訊息積壓問題,提供了解決方案,

  1. 提高消費并行度
    絕大部分訊息消費行為都屬于 IO 密集型,即可能是操作資料庫,或者呼叫 RPC,這類消費行為的消費速度在于后端資料庫或者外系統的吞吐量,通過增加消費并行度,可以提高總的消費吞吐量,但是并行度增加到一定程度,反而會下降,所以,應用必須要設定合理的并行度, 如下有幾種修改消費并行度的方法:

? 同一個 ConsumerGroup 下,通過增加 Consumer 實體數量來提高并行度(需要注意的是超過訂閱佇列數的 Consumer 實體無效),可以通過加機器,或者 在已有機器啟動多個行程的方式, 提高單個 Consumer 的消費并行執行緒,通過修改引數 consumeThreadMin、consumeThreadMax 實作,

  1. 批量方式消費
    某些業務流程如果支持批量方式消費,則可以很大程度上提高消費吞吐量,例如訂單扣款類應用,一次處理一個訂單耗時 1 s,一次處理 10 個訂單可能也只耗時 2 s,這樣即可大幅度提高消費的吞吐量,通過設定 consumer 的 consumeMessageBatchMaxSize 返個引數,默認是 1,即一次只消費一條訊息,例如設定為 N,那么每次消費的訊息數小于等于 N,

  2. 跳過非重要訊息
    發生訊息堆積時,如果消費速度一直追不上發送速度,如果業務對資料要求不高的話,可以選擇丟棄不重要的訊息,例如,當某個佇列的訊息數堆積到 100000 條以上,則嘗試丟棄部分或全部訊息,這樣就可以快速追上發送訊息的速度,示例代碼如下:

  3. 優化每條訊息消費程序
    舉例如下,某條訊息的消費程序如下:

九、自我實作訊息佇列思路

引言:如果讓你寫一個訊息佇列,該如何進行架構設計?

比如說訊息佇列系統,我們從以下幾個角度來考慮一下:

  • 可擴展性:就是需要的時候快速擴容,就可以增加吞吐量和容量,可以參考afka 的設計理念,broker -> topic -> partition,每個 partition 放一個機器,就存一部分資料,
  • 持久化:為了保證MQ的訊息不丟失,設計時一定要考慮訊息的持久化機制,且持久化要順序寫,這樣就沒有磁盤隨機讀寫的尋址開銷,磁盤順序讀寫的性能是很高的,這就是 kafka 的思路,
  • 高可用:保證MQ的可靠性,可以參考kafka ,多副本 -> leader & follower -> broker 掛了重新選舉 leader 即可對外服務,
  • 能不能支持資料 0 丟失啊?可以的,參考我們之前說的那個 kafka 資料零丟失方案,

十、MQ總結

其實MQ的使用,無非就是從原理,高可用,重復訊息,順序讀寫,資料丟失幾個方面開展,

上述的介紹是偏重思路方面來進行展開的,至于具體的MQ使用細節,我想你有了對應的思路去查會有一大堆,這也是我學習技術的一個思路,先掌握一個大的方向,然后沿著一個大的方向再進行相應的詳細學習,

最后,上述MQ介紹中,大部分都是有我平時開發積累所得,也有一部分是借助網路現場學習,

如有不足或錯誤,歡迎大家指出,我們共同學習進步!

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/500108.html

標籤:其他

上一篇:HttpClient如何進行RequestConfig的相關配置呢?

下一篇:ngnix+tomcat轉發、負載均衡

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more