訊息佇列
-
訊息佇列(Message Queue,簡稱MQ),從字面意思上看,本質是個佇列,FIFO先入先出,只不過佇列中存放的內容是message而已
-
常見的訊息佇列
-
RabbitMq ActiveMq ZeroMq kafka等;
-
-
為什么使用RabbitMq?
-
RabbitMQ是一個實作了AMQP(Advanced Message Queuing Protocol)高級訊息佇列協議的訊息佇列服務,用Erlang語言的,
-
可靠性,RabbitMQ的持久化支持,保證了訊息的穩定性;
-
高并發,RabbitMQ使用了Erlang開發語言,Erlang是為電話交換機開發的語言,天生自帶高并發光環,和高可用特性;
-
集群部署簡單,正是應為Erlang使得RabbitMQ集群部署變的超級簡單;
-
社區活躍度高,根據網上資料來看,RabbitMQ也是首選;
-
-
作業機制
-
生產者:訊息的創建者,負責創建和推送資料到訊息服務器;
消費者:訊息的接收方,用于處理資料和確認訊息;
代理:就是RabbitMQ本身,用于扮演“快遞”的角色,本身不生產訊息,只是扮演“快遞”的角色,
-
原理流程:就是生產者發送訊息到虛擬主機,虛擬主機把訊息交給指定的交換機,交換機按照規則扔給訊息佇列進行存盤,訊息佇列等待消費者來消費,
-
訊息持久化(三個條件)
-
投遞訊息的時候durable設定為true,訊息持久化;
-
訊息已經到達持久化交換器上;
-
訊息已經到達持久化的佇列;
-
-
持久化作業原理
-
Rabbit會將你的持久化訊息寫入磁盤上的持久化日志檔案,等訊息被消費之后,Rabbit會把這條訊息標識為等待垃圾回收,
-
-
持久化缺點:
-
訊息持久化的優點顯而易見,但缺點也很明顯,那就是性能,因為要寫入硬碟要比寫入記憶體性能較低很多,從而降低了服務器的吞吐量,盡管使用SSD硬碟可以使事情得到緩解,但他仍然吸干了Rabbit的性能,當訊息成千上萬條要寫入磁盤的時候,性能是很低的,
-
-
MQ用途
-
同步變異步訊息
-
用戶下單完成后,發送郵件和短信通知, 運用訊息佇列之后,用戶下單完之后,下單資訊寫入資料庫,再寫入訊息佇列,發送郵件和發送短信各自去訊息佇列進行讀取,節省時間,提高效率,
-
-
應用解耦
-
場景:用戶下單后,訂單系統需要多渠道通知用戶, a、下單服務系統:用戶使用下單服務后,將下單資訊寫入資料庫,下單成功, b、短信服務系統:用戶下單后,將短信資訊寫入訊息佇列,以發送短信資訊通知用戶交易資訊, c、郵件服務系統:用戶下單后,將郵件資訊寫入訊息佇列,以發送郵件資訊通知用戶交易資訊, 這樣,如果微信通知不能正常使用,也不影響用戶下單,用戶下單后,只用把下單通知資訊寫入訊息佇列,不用關心后續操作,實作了訂單系統和通知系統的解耦,
-
-
流量削峰
-
一般在秒殺或者團購活動中使用, 場景:秒殺活動,一般會因為流量過大,導致流量暴增,應用掛掉,針對這個問題,一般需要在應用前端加入訊息佇列, a、可以控制活動的人數 b、可以緩解短時間內高流量壓垮應用 用戶的請求,服務器接收后,首先寫入訊息佇列,如果訊息佇列的數量大于最大的數量,則直接拋棄用戶請求或者跳轉錯誤頁面,
-
訊息流程
-
*生產者(producer)*把訊息發送給交換機,當你創建交換機的時候,你需要指定型別,交換機的型別接下來會講到,
-
*交換機(exchange)*接收訊息并且負責對訊息進行路由,根據交換機的型別,訊息的多個屬性會被使用,例如路由鍵,
-
*系結(binding)*需要從交換機到佇列的這種方式來進行創建,在這個例子里,我們可以看到交換機有到兩個不同佇列的系結,交換機根據訊息的屬性來把訊息分發到不同的佇列上,
-
*訊息(message)*訊息會一直留在佇列里直到被消費,
-
*消費者(consumer)*處理訊息,
RabbitMQ核心概念
-
生產者(Producer):發送訊息的應用,
消費者(Consumer):接收訊息的應用,
佇列(Queue):存盤訊息的快取,
訊息(Message):又生產者通過RabbitMQ發送給消費者的資訊,
連接(Connection):連接RabbitMQ和應用服務器的TCP連接,
通道(Channel):連接里的一個虛擬通道,當你通過訊息佇列發送或者接收訊息時,這個操作都是通過通道進行的,
交換機(Exchange):從生產者那里接收訊息,并根據交換型別分發到對應的訊息列隊里,要實作訊息的接收,一個佇列必須系結一個交換機,
系結(Binding):系結是佇列和交換機的一個鏈接,
RabbitMQ幾種集群模式
-
單機模式
-
普通集群模式
-
鏡像集群模式
保證冪等性
-
假設你有個系統,消費一條往資料庫里插入一條,要是你一個訊息重復兩次,你不就插入了兩條,這資料不就錯了?但是你要是消費到第二次的時候,自己判斷一下已經消費過了,直接扔了,不就保留了一條資料?
-
一條資料重復出現兩次,資料庫里就只有一條資料,這就保證了系統的冪等性冪等性,我通俗點說,就一個資料,或者一個請求,給你重復來多次,你得確保對應的資料是不會改變的,不能出錯,
-
解決
-
比如你拿個資料要寫庫,你先根據主鍵查一下,如果這資料都有了,你就別插入了,update一下好吧
-
通過redis,那沒問題了,反正每次都是set,天然冪等性
-
比如你不是上面兩個場景,那做的稍微復雜一點,你需要讓生產者發送每條資料的時候,里面加一個全域唯一的id,類似訂單id之類的東西,然后你這里消費到了之后,先根據這個id去比如redis里查一下,之前消費過嗎?如果沒有消費過,你就處理,然后這個id寫redis,如果消費過了,那你就別處理了,保證別重復處理相同的訊息即可;
-
還有比如基于資料庫的唯一鍵來保證重復資料不會重復插入多條,我們之前線上系統就有這個問題,就是拿到資料的時候,每次重啟可能會有重復,因為kafka消費者還沒來得及提交offset,重復資料拿到了以后我們插入的時候,因為有唯一鍵約束了,所以重復資料只會插入報錯,不會導致資料庫中出現臟資料;
-
-
訊息丟失
-
生產者將資料發送到rabbitmq的時候,可能資料就在半路給搞丟了,因為網路啥的問題,都有可能
-
生產者端丟失資料解決:
-
rabbitmq事務機制(同步的):生產者發送資料之前開啟rabbitmq事務(channel.txSelect),然后發送訊息,如果訊息沒有成功被rabbitmq接收到,那么生產者會收到例外報錯,此時就可以回滾事務(channel.txRollback),然后重試發送訊息;如果收到了訊息,那么可以提交事務(channel.txCommit)
-
開啟confirm模式(異步的),在生產者那里設定開啟confirm模式之后,你每次寫的訊息都會分配一個唯一的id,然后如果寫入了rabbitmq中,rabbitmq會給你回傳一個ack訊息,告訴你說這個訊息ok了,如果rabbitmq沒能處理這個訊息,會回呼你一個nack介面,告訴你這個訊息接收失敗,你可以重試,而且你可以結合這個機制自己在記憶體里維護每個訊息id的狀態,如果超過一定時間還沒接收到這個訊息的回呼,那么你可以重發
-
-
rabbitmq弄丟了資料 就是rabbitmq自己弄丟了資料,這個你必須開啟rabbitmq的持久化,就是訊息寫入之后會持久化到磁盤,哪怕是rabbitmq自己掛了,恢復之后會自動讀取之前存盤的資料,一般資料不會丟,除非極其罕見的是,rabbitmq還沒持久化,自己就掛了,可能導致少量資料會丟失的,但是這個概率較小,
-
步驟一:一個是創建queue的時候將其設定為持久化的,這樣就可以保證rabbitmq持久化queue的元資料,但是不會持久化queue里的資料;
-
第二個是發送訊息的時候將訊息的deliveryMode設定為2,就是將訊息設定為持久化的,此時rabbitmq就會將訊息持久化到磁盤上去,必須要同時設定這兩個持久化才行,rabbitmq哪怕是掛了,再次重啟,也會從磁盤上重啟恢復queue,恢復這個queue里的資料,而且持久化可以跟生產者那邊的confirm機制配合起來,只有訊息被持久化到磁盤之后,才會通知生產者ack了,所以哪怕是在持久化到磁盤之前,rabbitmq掛了,資料丟了,生產者收不到ack,你也是可以自己重發的,哪怕是你給rabbitmq開啟了持久化機制,也有一種可能,就是這個訊息寫到了rabbitmq中,但是還沒來得及持久化到磁盤上,結果不巧,此時rabbitmq掛了,就會導致記憶體里的一點點資料會丟失,
-
-
消費者端丟失資料
-
rabbitmq如果丟失了資料,主要是因為你消費的時候,剛消費到,還沒處理,結果行程掛了,比如重啟了,那么就尷尬了,rabbitmq認為你都消費了,這資料就丟了,這個時候得用rabbitmq提供的ack機制,簡單來說,就是你關閉rabbitmq自動ack,可以通過一個api來呼叫就行,然后每次你自己代碼里確保處理完的時候,再程式里ack一把,這樣的話,如果你還沒處理完,不就沒有ack?那rabbitmq就認為你還沒處理完,這個時候rabbitmq會把這個消費分配給別的consumer去處理,訊息是不會丟的,
-
訊息順序性
-
rabbitmq保證資料的順序性
-
如果存在多個消費者,那么就讓每個消費者對應一個queue,然后把要發送 的資料全都放到一個queue,這樣就能保證所有的資料只到達一個消費者從而保證每個資料到達資料庫都是順序的,
-
如何處理訊息佇列積壓問題
-
積壓分三個方面
-
broker producer consumer
-
-
broker
-
處理業務能力極強 性能高 還可以水平拓展 不用擔心
-
-
producer端優化:
-
一般先執行自己端的業務邏輯 才發訊息 檢查是不是自己的業務邏輯耗時太多 (設定批量發送以及批量發送的大小可以改善)
-
發生訊息積壓后 producer端服務降級 關閉一些非核心業務 減少訊息的產生
-
-
consumer端的優化
-
主要原因是:消費者的消費能力跟不上生產者的額生產能力
-
擴容方案:利用臨時消費者 消費原來佇列中的訊息 讓消費者不做任何耗時的動作 將訊息均勻寫入創建的佇列中 將更多consumer部署到更多的服務器來消費新創建的佇列上的訊息;
-
等待積壓的訊息被消耗到正常水平 撤掉擴容服務器;
-
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/291818.html
標籤:其他
上一篇:優化了破網站的搜索功能
