MQ相關概念
什么是mq
MQ(message queue),從字面意思上看,本質是個佇列,FIFO 先入先出,只不過佇列中存放的內容是 message 而已,還是一種跨行程的通信機制,用于上下游傳遞訊息,在互聯網架構中,MQ 是一種非常常 見的上下游“邏輯解耦+物理解耦”的訊息通信服務,使用了 MQ 之后,訊息發送上游只需要依賴 MQ,不 用依賴其他服務,
為什么要用mq
- 流量削峰
- 應用解耦
- 異步處理
RabbitMq
RabbitMq的概念
RabbitMQ 是一個訊息中間件:它接受并轉發訊息,
四大核心概念
生產者
產生資料發送訊息的程式是生產者
交換機
交換機是 RabbitMQ 非常重要的一個部件,一方面它接收來自生產者的訊息,另一方面它將訊息 推送到佇列中,交換機必須確切知道如何處理它接收到的訊息,是將這些訊息推送到特定佇列還是推 送到多個佇列,亦或者是把訊息丟棄,這個得有交換機型別決定
佇列
佇列是 RabbitMQ 內部使用的一種資料結構,盡管訊息流經 RabbitMQ 和應用程式,但它們只能存 儲在佇列中,佇列僅受主機的記憶體和磁盤限制的約束,本質上是一個大的訊息緩沖區,許多生產者可 以將訊息發送到一個佇列,許多消費者可以嘗試從一個佇列接收資料,這就是我們使用佇列的方式
消費者
消費與接收具有相似的含義,消費者大多時候是一個等待接收訊息的程式,請注意生產者,消費 者和訊息中間件很多時候并不在同一機器上,同一個應用程式既可以是生產者又是可以是消費者,
安裝并開啟rabbitmq的web界面
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
訊息應答
概念
消費者完成一個任務可能需要一段時間,如果其中一個消費者處理一個長的任務并且只完成了部分突然就會掛掉,會發生什么情況,rabbitmq一旦向消費者傳遞了一條訊息,便立即將該訊息標記為洗掉,在這種情況下,突然有個消費者掛掉了,我們將丟失正在處理的訊息,以及后續發送給該消費者的資訊,因為他無法接收到
為了保證訊息在發送程序中不丟失,rabbitmq引入了訊息應答機制,訊息應答就是:
消費者在接收到訊息并且處理該訊息之后,告訴rabbitmq它已經處理了,rabbitmq可以把該訊息洗掉了
自動應答
訊息發送后立即被認為已經傳送成功,這種模式需要在高吞吐量和資料傳輸安全性方面做權衡,因為這種模式如果訊息在接受到之前,消費者那邊出現連接或者channel關閉,那么訊息就丟失了,當然另一方面這種模式消費者那邊可以傳遞過載的訊息,沒有對傳遞的訊息數量進行限制,當然這樣有可能使得消費者由于接收太多來不及處理的訊息,導致這些訊息的積壓,最終使得記憶體耗盡,最終這些消費者執行緒被作業系統殺死,所以這種模式僅適用在消費者可以高效并以某種速率能夠處理這些訊息的情況下使用
手動應答
- Channel.basicAck(勇于肯定確認) rabbitmq已經知道該訊息并且成功的處理訊息,可以將其丟棄了
- Channel.basicNack(用于否定確認)
- channel.basicReject(用于否定確認) 與chanel.basicnack相比較少了一個引數 不處理該訊息了 直接拒絕,可以將其丟棄了
multiple的解釋
手動應答的好處是可以批量應答并且減少網路擁堵
channel.basicAck(deliveryTag,true)后邊的引數即為multiple
multiple的true和false代表不同的含義
true代表批量應答channel上未應答的訊息
比如說channel上有傳送tag的訊息,5,6,7,8 當前tag是8 那么此時5-8這些還未應答的訊息都會被確認收到訊息應答
false同上邊相比
只會應答tag=8的訊息5,6,7這三個訊息依然不會被確認收到訊息應答
盡量使用非全應答的訊息,能夠最大程度避免訊息丟失
訊息自動重新入隊
如果消費者由于某些原因失去連接(其通道已經關閉,連接已經關倍訓者tcp連接丟失),導致訊息未發送ack確認,rabbitmq將了解到訊息未完全處理,并將其重新排隊,如果此時其他消費者可以處理,它將很快將其重新分發給另一個消費者,這樣,即使某個消費者偶爾死亡,也可以確保不會丟失任何訊息

c1處理訊息一,但由于c1斷開連接,c1并沒有向mq發送ack,此時訊息一不能夠被mq洗掉,會重新入隊,c2消費者可以處理訊息一,重新交由c2處理
訊息手動應答代碼
默認訊息采用的是自動應答,所以我們想要實作訊息消費程序中不丟失,需要把自動應答改為手動應答
Channel getchannel = utils.getchannel();
getchannel.queueDeclare("hello",false,false,false,null);
boolean autoAck=false;
getchannel.basicConsume("hello", autoAck, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
String string = new String(delivery.getBody(),"utf-8");
System.err.println(string);
getchannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}, s -> { });
訊息生產者
Channel getchannel = utils.getchannel();
getchannel.queueDeclare("hello",false,false,false,null);
getchannel.basicPublish("","hello",false,null,new String("肥肥是只豬").getBytes());
System.out.println("success");
rabbitmq 持久化
概念
通過手動應答可以保證處理任務時任務不丟失,但是如何保證當mq服務停掉之后訊息生產者發送過來的訊息不丟失,默認情況下mq由于某種原因崩潰時,它忽視佇列和訊息,除非告知它不要這么做,確保訊息不會丟失需要做兩件事:
我們需要將佇列和訊息都標記為持久化
佇列如何實作持久化
如果要實作佇列持久化,需要在宣告佇列的時候把durable引數設定為持久化
bolean durable=true
//訊息佇列持久化
channel.queueDeclare(queue_name,durable,false,false,null);
但是需要注意的是如果之前宣告的佇列不是持久化的,需要把原先的佇列先洗掉,或者重新創建一個持久化的佇列,不然就會出錯
訊息實作持久化
只需在訊息進行發布的時候,添加引數即可
channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,new String("肥肥是只豬").getBytes());
將訊息標記為持久化并不能完全保證不會丟失訊息,盡管它告訴mq將訊息保存到磁盤,但是這里依然存在當訊息準備存盤在磁盤的時候,但是還沒有存盤完,訊息還在快取的一個間隔點,此時并沒有完全的寫入磁盤,持久性保證不強,但是對于我們的簡單任務佇列而言,這已經綽綽有余了,
不公平分發
最開始的時候我們學習到 RabbitMQ 分發訊息采用的輪訓分發,但是在某種場景下這種策略并不是 很好,比方說有兩個消費者在處理任務,其中有個消費者 1 處理任務的速度非常快,而另外一個消費者 2 處理速度卻很慢,這個時候我們還是采用輪訓分發的化就會到這處理速度快的這個消費者很大一部分時間 處于空閑狀態,而處理慢的那個消費者一直在干活,這種分配方式在這種情況下其實就不太好,但是 RabbitMQ 并不知道這種情況它依然很公平的進行分發, 為了避免這種情況,我們可以設定引數
channel.basicQos(1);

預取值
? 本身訊息的發送就是異步發送的,所以在任何時候,channel 上肯定不止只有一個訊息另外來自消費 者的手動確認本質上也是異步的,因此這里就存在一個未確認的訊息緩沖區,因此希望開發人員能限制此 緩沖區的大小,以避免緩沖區里面無限制的未確認訊息問題,這個時候就可以通過使用 basic.qos 方法設 置“預取計數”值來完成的,該值定義通道上允許的未確認訊息的最大數量,一旦數量達到配置的數量, RabbitMQ 將停止在通道上傳遞更多訊息,除非至少有一個未處理的訊息被確認,例如,假設在通道上有 未確認的訊息 5、6、7,8,并且通道的預取計數設定為 4,此時 RabbitMQ 將不會在該通道上再傳遞任何 訊息,除非至少有一個未應答的訊息被 ack,比方說 tag=6 這個訊息剛剛被確認 ACK,RabbitMQ 將會感知 這個情況到并再發送一條訊息,訊息應答和 QoS 預取值對用戶吞吐量有重大影響,通常,增加預取將提高 向消費者傳遞訊息的速度,雖然自動應答傳輸訊息速率是最佳的,但是,在這種情況下已傳遞但尚未處理 的訊息的數量也會增加,從而增加了消費者的 RAM 消耗(隨機存取存盤器)應該小心使用具有無限預處理 的自動確認模式或手動確認模式,消費者消費了大量的訊息如果沒有確認的話,會導致消費者連接節點的 記憶體消耗變大,所以找到合適的預取值是一個反復試驗的程序,不同的負載該值取值也不同 100 到 300 范 圍內的值通常可提供最佳的吞吐量,并且不會給消費者帶來太大的風險,預取值為 1 是最保守的,當然這 將使吞吐量變得很低,特別是消費者連接延遲很嚴重的情況下,特別是在消費者連接等待時間較長的環境 中,對于大多數應用來說,稍微高一點的值將是最佳的,
發布確認
發布確認原理

在設定了佇列持久化和訊息持久化之后,并不能完全保證訊息的不丟失,因為訊息持久化時存在一個訊息快取的間隔點,此時就需要發布確認來保證生產者在發布訊息到相應佇列時不會出現訊息丟失
單一發布
簡單的同步確認發布的方式,也就是發布一個訊息之后只有他被確認發布,后續的訊息才能繼續發布
這種確認方式有一個最大的缺點就是:發布速度特別慢,因為如果沒有確認發布的訊息就會阻塞所有后續訊息的發布,這種方式最多提供每秒不超過百條訊息的吞吐量,當然對于某些應用程式來說這可能已經足夠了
Channel channel = utils.getchannel();
//開啟發布確認
channel.confirmSelect();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
long start=System.currentTimeMillis();
for (int i = 0; i < MESSAGE_ACCOUNT; i++) {
channel.basicPublish("",QUEUE_NAME,null,(i+"").getBytes());
boolean b = channel.waitForConfirms();
if(b){
System.out.println("訊息發布成功");
}
}
long end=System.currentTimeMillis();
System.out.println("總共花費時間"+(end-start));//31362
批量發布
與單個確認訊息相比,先發布一批訊息然后一起確認可以極大的提高吞吐量,當然這種方式的缺點就是:當發生故障導致發布出現問題的時,不知道那個訊息出現問題了,我們必須將整個批處理保存在記憶體中,以記錄重要的訊息而后重新發布訊息
package com.example.mqfeifei.test;
import com.rabbitmq.client.Channel;
public class MultPartConfirm {
public static void main(String[] args) throws Exception {
Channel channel = utils.getchannel();
channel.confirmSelect();
channel.queueDeclare("cxf",false,false,false,null);
long start=System.currentTimeMillis();
for (int i = 0; i < 1000 ; i++) {
channel.basicPublish("","cxf",null,(i+"").getBytes());
if(i%100==0) channel.waitForConfirms();
}
long end=System.currentTimeMillis();
System.out.println("總共耗費的時間為"+(end-start));//5827
}
}
異步發布
異步確認邏輯雖然比上邊兩個都要復雜,但是性價比最高,無論是可靠性還是效率都沒得說,他是利用回呼函式來達到訊息可靠性傳遞的,這個中間件也是通過函式回呼來保證是否投遞成功

Channel channel = utils.getchannel();
channel.confirmSelect();
channel.queueDeclare("aysnc",false,false,false,null);
//添加確認的回呼函式
/**
* 第一個引數為成功的回呼函式
* 第二個引數為失敗的回呼函式
* 當然這里也可以進行確認失敗的訊息的處理
* 1.將所有發送的訊息存到一個執行緒安全的map中 key為在channel中的id value為訊息的message value
* 2.在成功的回呼函式中,將該成功的回呼函式移除
* 3. 最后執行緒安全的map 剩余的即為所有的未能被broker確認的訊息,統一進行處理
*/
channel.addConfirmListener((deliveryTag, multiple) -> System.out.println(deliveryTag+"已經被broker確認"), (deliveryTag, multiple) -> { });
long start=System.currentTimeMillis();
for (int i = 0; i < 1000 ; i++) {
channel.basicPublish("","aysnc",null,(i+"").getBytes());
}
long end=System.currentTimeMillis();
System.err.println("總共消耗的時間為"+(end-start)+"feifeifeifeifeifeifeifeifeifeifeifeifeifeifei");//31ms
三種發布確認速度對比
單獨發布訊息
同步等待確認,簡單,但吞吐量非常有限
批量發布訊息
批量同步等待確認,簡單,合理的吞吐量,一旦出現問題但很難推斷出是哪條訊息出現了問題
異步處理
最佳性能和資源使用,在出現錯誤的情況下可以很好的控制,但是實作起來稍微難些
交換機
rabbitmq訊息傳遞模型的核心思想是:生產者生產的訊息從不會直接發送到佇列,實際上,通常生產者甚至不知道這些訊息傳遞到了哪些佇列中
相反生產者只能將訊息發送到交換機,交換機作業的內容非常簡單,一方面他接受來自生產者的訊息,另一方面將他們推入佇列,交換機必須確切知道如何處理收到的訊息,是應該吧這些訊息放到特定佇列還是說應該丟棄他們,這就由交換機的型別類決定

fanout
類似于廣播的模式,它是將收到的所有訊息廣播到它知道的所有佇列中,當然系統中也是有默認的exchange型別的

Direct exchange
什么是bindings,系結是交換機和佇列之間的橋梁關系,也可以這么理解:佇列只對它系結的交換機的訊息感興趣,系結引數:rouingkey 來表示也可稱為binding key,創建系結可以使用代碼:
channel.queueBind(queueName,exchangename,"routingKey")
系結之后的意義尤其交換型別決定
Fanout這種交換型別并不能給我們帶來很大的靈活性-它只能進行無意識的廣播,在這里我們將使用direct這種型別來進行替換,這種型別的作業方式是,訊息只去到它系結的routingkey佇列中去

消費者代碼如下:
public static void main(String[] args) throws IOException {
Channel channel = utils.getchannel();
channel.exchangeDeclare("feifei_direct","direct");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"feifei_direct","fei");
channel.basicConsume(queue, true, (consumerTag, message) -> {
String msg=new String(message.getBody());
System.out.println(msg);
}, consumerTag -> {});
}
生產者代碼如下
public static void main(String[] args) throws Exception{
Channel channel = utils.getchannel();
channel.exchangeDeclare("feifei_direct","direct");
channel.basicPublish("feifei_direct","xiang",null,"菲菲啊啊啊啊".getBytes("utf-8"));
}
多重系結

如果exchange的系結型別是direct,但是它系結的多個佇列的key如果都相同,這種情況下雖然系結型別是direct但是它表現得就和fanout就有點類似了,就跟廣播差不多
Topic exchange
發送到型別是topic交換機的訊息routing_key不能隨意寫,必須滿足一定的條件,它必須是一個單詞串列,以點號分割開,這些單詞可以是任意單詞,當然這個單詞串列最多不能超過255個位元組
*:可以替代一個單詞
#:可以替代零個或者多個單詞
示例代碼如下:
消費者端
public static void main(String[] args) throws Exception {
Channel channel = utils.getchannel();
String queue = channel.queueDeclare().getQueue();
channel.exchangeDeclare("feifei_topic","topic");
channel.queueBind(queue,"feifei_topic","cxf.*");
channel.basicConsume(queue, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
String msg=new String(message.getBody());
System.out.println(msg);
}
}, consumerTag -> {});
}
生產者端
public static void main(String[] args) throws Exception{
Channel channel = utils.getchannel();
channel.exchangeDeclare("feifei_topic","topic");
channel.basicPublish("feifei_topic","cxf.name",null,"cxf".getBytes());
}
死信佇列
概念
先從概念解釋上搞清楚這個定義,死信,顧名思義就是無法被消費的訊息,字面意思可以這樣理 解,一般來說,producer 將訊息投遞到 broker 或者直接到 queue 里了,consumer 從 queue 取出訊息 進行消費,但某些時候由于特定的原因導致 queue 中的某些訊息無法被消費,這樣的訊息如果沒有 后續的處理,就變成了死信,有死信自然就有了死信佇列, 應用場景:為了保證訂單業務的訊息資料不丟失,需要使用到 RabbitMQ 的死信佇列機制,當訊息 消費發生例外時,將訊息投入死信佇列中.還有比如說: 用戶在商城下單成功并點擊去支付后在指定時 間未支付時自動失效
死信的來源
訊息 TTL 過期
佇列達到最大長度(佇列滿了,無法再添加資料到 mq 中)
訊息被拒絕(basic.reject 或 basic.nack)并且 requeue=false

下邊示例下訊息訊息ttl過期的代碼
c1的代碼
public static void main(String[] args) throws Exception{
/**
* 正常佇列的系結和死信佇列的系結都在這里創建好
* 同時,正常佇列和死信佇列的關系也在這里定義好了
*/
Channel channel = utils.getchannel();
channel.exchangeDeclare("normal","direct");
channel.exchangeDeclare("sixin","direct");
Map<String,Object> arg=new HashMap<>();
//設定訊息在佇列中過期時間,但不建議在消費者定義,生產者定義更為靈活
// arg.put("x-message-ttl",10000);
arg.put("x-dead-letter-exchange","sixin");
arg.put("x-dead-letter-routing-key","fei");
channel.queueDeclare("normal_queue",false,false,false,arg);
channel.queueDeclare("sixin_queue",false,false,false,null);
channel.queueBind("normal_queue","normal","xiang");
channel.queueBind("sixin_queue","sixin","fei");
}
producer的代碼
public static void main(String[] args) throws Exception{
Channel channel = utils.getchannel();
channel.exchangeDeclare("normal","direct");
AMQP.BasicProperties pro=new AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 0; i <10 ; i++) {
channel.basicPublish("normal","xiang",pro,"肥肥".getBytes("utf-8"));
}
}
延遲佇列
先來回顧下上邊提到的死信佇列,它的形成原因可能是
- ttl過期
- 超過最大佇列長度
- 訊息被拒絕
此時如果對應的消費者一直不存在,生產者對訊息設定了過期時間,在不考慮,超過最大佇列長度和訊息被拒絕,那么每條訊息被消費的最小時間即為設定的過期時間,即隨后所有的訊息在過期后都會被,死信佇列對應的消費者消費
這即為延遲佇列
延遲佇列概念
延時佇列,佇列內都是有序的,最重要的特性就體現在它的延時屬性上,延時佇列中的元素是希望在指定時間到了以后或者之前取出和處理,簡單來說,延時佇列就是用來存放需要在指定時間被處理的元素的佇列
使用場景
以下圖為例子

拿購票為例子,先進性座位的選取,在最后在進行付款,但付款肯定是有時效的,比如說要在30分鐘內進行付款
- 生成訂單后,將其記錄到延遲佇列中,生產者端將ttl設定為30分鐘
- 延遲佇列的消費者檢查資料庫中,對應訂單的付款欄位是否已經付款,未付款的話將座位的狀態再次更改為可售
- 若用戶在30分鐘內已經付款則更新該訂單的資料庫表
- 若在30分鐘內已經付款,延遲佇列的訊息在消費的時候也會在查表時,判斷出已經進行了付款,即不用再進行操作
時延佇列的優化,時延佇列如果只是在和交換機系結的佇列中進行設定,此時使用時只需要將對應的時延的訊息按照routingkey轉發到不同的時延佇列也同樣可以實作,時延佇列,但是同樣的問題也就來了,如果我們需要的時延改變了,就得創建一個新的佇列,設定我們需要的時延,此時即可以設定一個沒有過期時間的佇列,在訊息的生產者端進行訊息的生產

/*
按照上圖架構圖構建的mq
*/
@Configuration
public class MqConfig {
@Bean("queue1")
Queue queue1(){
return QueueBuilder.nonDurable("10sde")
.ttl(10000)
.deadLetterExchange("direct_low")
.deadLetterRoutingKey("sila")
.build();
}
@Bean("queue2")
Queue queue2(){
return QueueBuilder.nonDurable("30sde")
.ttl(30000)
.deadLetterExchange("direct_low")
.deadLetterRoutingKey("sila")
.build();
}
@Bean("queue3")
Queue queue3(){
return QueueBuilder.nonDurable("sixin")
.build();
}
@Bean("queue4")
Queue queue4(){
return QueueBuilder.nonDurable("0sde")
.deadLetterRoutingKey("sila")
.deadLetterExchange("direct_low")
.build();
}
@Bean("direct_top")
DirectExchange directExchange(){
return new DirectExchange("direct_top");
}
@Bean("direct_low")
DirectExchange directExchange1(){
return new DirectExchange("direct_low");
}
@Bean
Binding binding0(DirectExchange direct_top,Queue queue4){
return BindingBuilder.bind(queue4).to(direct_top).with("0");
}
@Bean
Binding binding1(Queue queue1,DirectExchange direct_top){
return BindingBuilder.bind(queue1).to(direct_top).with("10");
}
@Bean
Binding binding2(Queue queue2,DirectExchange direct_top){
return BindingBuilder.bind(queue2).to(direct_top).with("30");
}
@Bean
Binding binding3(Queue queue3,DirectExchange direct_low){
return BindingBuilder.bind(queue3).to(direct_low).with("sila");
}
}
訊息的生產者端進行訊息過期時間的設定
@RestController
public class test {
@Autowired
AmqpTemplate amqpTemplate;
@RequestMapping("/test")
public void test(){
amqpTemplate.convertAndSend("direct_top","0","菲菲菲", CorrelationData->{
CorrelationData.getMessageProperties().setExpiration("20000");
return CorrelationData;
});
}
}
看起來似乎沒有什么問題,但是 最開始的時候,就介紹過如果使用在訊息屬性上設定ttl的方式,訊息可能不會按時“死亡”,因為rabbbitmq只會檢查第一個訊息是否過期,如果過期則丟到死信佇列,如果第一個訊息的延時時長很長,而第二個訊息的延時 時長很短,第二個訊息并不會優先得到執行
這里就需要基于插件的延遲佇列
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/355272.html
標籤:其他
下一篇:flink運行架構詳解
