主頁 >  其他 > RabbitMQ 超詳細入門篇

RabbitMQ 超詳細入門篇

2022-01-25 07:23:56 其他

RabbitMQ 入門篇🚪

MQ 的基本概念:

什么是 MQ ?

MQ全稱為Message Queue即訊息佇列

  • "訊息佇列" 是在訊息的傳輸程序中保存訊息的容器

  • 它是典型的:生產者————消費者模型

    生產者不斷向訊息佇列中生產訊息 ———————— 消費者不斷的從佇列中獲取訊息.

    這樣的好處: 生產者只需要關注發訊息,消費者只需要關注收訊息,二者沒有業務邏輯的侵入,這樣就實作了生產者和消費者的解耦.

為什么要使用 MQ?

或者說MQ 有什么好處,MQ 主要可以實作三種功能:

服務解耦

  • 場景:服務A產生資料, 而服務B,C,D需要這些資料

    那么我們可以在A服務中直接呼叫B,C,D服務,把資料傳遞到下游服務即可;

  • 隨著我們的應用規模不斷擴大,會有更多的服務需要A的資料

    如果有幾十甚至幾百個下游服務,而且會不斷變更,再加上還要考慮下游服務出錯的情況 A服務中呼叫代碼的維護會極為困難 程式非常的耦合

  • ,通過 MQ訊息佇列 可以實作,對程式的 解耦

    A服務只需要向訊息服務器發送訊息,而不用考慮誰需要這些資料

    下游服務如果需要資料,自行從訊息服務器訂閱訊息,不再需要資料時則取消訂閱即可

流量削峰

  • 場景:

    我們有一個應用,平時訪問量是每秒300請求,我們用一臺服務器即可輕松應對 √

    但,在高峰期, 訪問量瞬間翻了十倍, 達到每秒3000次請求, 單臺服務器無法應對 我們增加到10臺服務器,減壓

  • 而,很多時候這種高壓 每天只出現一次,每次只有半小時

    那么我們10臺服務器在多數時間都只分擔每秒幾十次請求,這樣就有點浪費資源了

  • 使用MQ來進行流量削峰

    我們可以對于這種,可能會突然產生高請求的功能,設定一個MQ

    當用戶發起請求后臺并不會立刻處理,而是通過 MQ 發送一個請求,發送到佇列里面,排隊等待處理…

    我們的后臺,接收者,發現佇列中有訊息,一個一個的取出,進行后臺處理… 避免了同一時刻大量的請求,而處理不過來導致 服務崩潰~
    在這里插入圖片描述

異步呼叫

  • 場景:

    對于有些服務之間的呼叫會有很長時間的回應,而用戶并不能接受這么時間的回應:

    A 呼叫 B,B 需要花費很長時間執行,但是 A 需要知道 B 什么時候可以執行完給頁面回應…

  • 外賣支付

    相信大家都點過外賣,用戶支付完成之后,到真正外賣到手是一個很漫長復雜的程序~ 我們不可能一直停留在頁面上進行等待~

    支付后————發送支付成功的通知————再尋找外賣小哥來進行配送…

    而尋找外賣小哥的程序非常耗時,高峰期,可能要等待幾十秒甚至更長,這樣就造成整條呼叫鏈路回應非常緩慢

    在這里插入圖片描述

  • MQ解決方案:

    用戶下單,訂單資料可以發送到訊息佇列服務器,立刻回應客戶端 為您尋找騎手,整條鏈路的回應時間只有200毫秒左右

    訊息接收方,監聽獲取每一個訂單訊息后臺緩慢的尋找外賣小哥~

    在這里插入圖片描述

AMQP 和 JMS ?

AMQP 和 JMS 是目前市面上常見的兩種 訊息佇列協議

AMQP

  • AMQP 高級訊息佇列協議!

    是應用層協議的一個開放標準,為面向訊息的中間件設計,兼容 JMS RabbitMQ 就是基于 AMQP 協議實作的

JMS

  • JMS Java 訊息服務

    JMS的客戶端之間可以通過JMS服務進行異步的訊息傳輸

  • JMS(Java Message Service,Java訊息服務)API是一個訊息服務的標準或者說是規范

    就像JDBC一樣通過介面定義一組規范,不同的實作嘗試實作對于的驅動來完成開發...

    它使分布式通信耦合度更低,訊息服務更加可靠以及異步性, ActiveMQ 就是基于 JMS 規范實作的

總結:

規范:

  • AMQP 為訊息定義了線路層(wire-level protocol)的協議

  • JMS所定義的是API規范

跨平臺

  • Java 體系中,多個client均可以通過JMS進行互動,不需要應用修改代碼,但是其對跨平臺的支持較差
  • AMQP天然具有跨平臺、跨語言特性

支持訊息型別

  • JMS 支持TextMessage、MapMessage 等復雜的訊息型別
  • AMQP 僅支持 byte[] 訊息型別(復雜的型別可序列化后發送

Exchange 交換機提供的路由演算法

  • AMQP可以提供多樣化的路由方式來傳遞訊息到訊息佇列 4種交換機型別,6種模式
  • JMS 僅支持 佇列 和 主題/訂閱 方式兩種

常見MQ產品:

  • ActiveMQ:基于JMS,早期的MQ框架,現在已經很少使用了
  • Kafka:分布式訊息系統,高吞吐量
  • RabbitMQ:基于AMQP協議,erlang語言開發,穩定性好 本篇學習😶
  • RocketMQ:基于JMS,阿里巴巴產品,目前交由Apache基金會

RabbitMQ

  • 官方地址

  • RabbitMQ是由erlang語言開發,所以安裝環境需要安裝 erlang

  • 基于AMQP(Advanced Message Queue 高級訊息佇列協議)協議實作的訊息佇列

  • 它是一種應用程式之間的通信方法,訊息佇列在分布式系統開發中應用非常廣泛

image-20210625230930992

RabbitMQ 的作業原理

RabbitMQ-00000007

組成部分:

Producer 訊息生產者

  • 訊息生產者,即生產方客戶端,生產方客戶端將訊息,通過信道Channel發送到MQ

Connection 連接物件

  • Producer /Consumer 和 broker 之間的 TCP 連接

    程式通過,Connection連接物件將,創建出Channel信道生產者通過 信道 將訊息發送給MQ 消費者通過 信道 獲取到MQ的訊息~

  • Channel 信道:

    如果每一次訪問 RabbitMQ 都建立一個 Connection,訊息量大的時候,對于性能也是巨大的;

    Channel 是在 connection 內部建立的邏輯連接,為 Connection 減少了作業系統建立 TCP connection 的開銷; 細節不詳細介紹

    可以理解為是一個,訊息資料傳遞的一個通到

    可以通過它,來創建配置,生產者|消費者 與MQ通信 宣告設定系結:交換機|佇列

Broker 可以認為是 MQ

  • 訊息佇列服務行程此行程包括兩個部分:Exchange交換機和Queue佇列

  • Exchange交換機

    是 RabbitMQ 非常重要的一個部件

    一方面它接收來自生產者的訊息,另一方面它將訊息 推送到佇列中

  • Queue 佇列

    RabbitMQ 內部使用的一種資料結構佇列 佇列就像是一個“吸管” 一邊吸水一邊出水,遵循 “先進先出”原則;

    生產者發訊息——交換機——轉發到佇列上 是真正訊息存盤的地方~

Consumer 訊息消費者

  • 訊息消費者,即消費方客戶端,通關信道Channel接收MQ轉發的訊息,并進行相關的處理;

-----發送訊息-----

  • 生產者通過 Connection 和Broker建立TCP連接,
  • Connection 建立 Channel 通道
  • 生產者通過信道,將訊息發送給Broker(MQ),由Exchange將訊息進行轉發~ 佇列中去!

-----接收訊息-----

  • 消費者通過 Connection 和Broker建立TCP連接
  • Connection 建立 Channel 通道
  • 消費者監聽指定的Queue(佇列),當有訊息到達Queue時Broker默認將訊息,通過 Channel 推送給消費者

Exchange 交換機四種型別 ?

RabbitMQ訊息傳遞模型的核心思想是:

  • 生產者永遠不會將任何訊息直接發送到佇列,通常生產者甚至不知道訊息是否會被傳遞到任何佇列 生產者只能向交換機(Exchange)發送訊息

  • 交換機是一個非常簡單的東西,一邊接收來自生產者的訊息,另一邊將訊息推送到佇列.

  • RabbitMQ 的交換機具有很多中型別,可以完成很多種復雜的場景操作:

交換機型別:

  • fanout: 廣播模式發布/訂閱,交換機給所有的佇列,發送相同的訊息;

  • direct : 路由模式routing key 交換機,根據對應的 routing key 的佇列上發送訊息;

  • topic: 動態路由模式,可以用過一定的規則定義 roting key 使 交換機動態的多樣性選擇 佇列

    * 表示一個單詞

    # 表示任意數量(零個或多個)單詞

  • headers: 請求頭模式,目前用的很少了,就像請求頭一樣,發送訊息時候附帶頭部資料,交換機根據訊息的頭部資訊匹配對應的佇列;

RabbitMQ環境搭建 ?

本次搭建是Linux 的 如果有朋友是Win的話可以參考這篇文章:🚀

工具準備🔨:

RabbitMQ是由erlang語言開發,所以安裝環境需要安裝 erlang

  • erlang-21.3.8.21-1.el7.x86_64.rpm erlang環境
  • rabbitmq-server-3.8.8-1.el7.noarch.rpm rabbit安裝

官網下載,如果沒有的話也可以底部本人的網盤下載

環境搭建🏚:

本人使用的是 阿里云服務器 沒有的話也可以使用虛擬機… 事先使用連接工具上傳了檔案

本人喜歡把工具都安裝在 /usr/wsm 目錄下:

[root@iZj6ciuzx7luldnazt4iswZ ~]# cd /
[root@iZj6ciuzx7luldnazt4iswZ /]# ls
bin   dev  home        lib    lost+found  mnt  patch  root  sbin  sys  usr  www
boot  etc  install.sh  lib64  media       opt  proc   run   srv   tmp  var
[root@iZj6ciuzx7luldnazt4iswZ /]# cd usr
[root@iZj6ciuzx7luldnazt4iswZ usr]# ls
bin  etc  games  include  lib  lib64  libexec  local  sbin  share  src  tmp
[root@iZj6ciuzx7luldnazt4iswZ usr]# mkdir wsm
[root@iZj6ciuzx7luldnazt4iswZ usr]# ls
bin  etc  games  include  lib  lib64  libexec  local  sbin  share  src  tmp  wsm
[root@iZj6ciuzx7luldnazt4iswZ usr]# cd wsm
[root@iZj6ciuzx7luldnazt4iswZ wsm]# ls
erlang-21.3-1.el7.x86_64.rpm  rabbitmq-server-3.8.8-1.el7.noarch.rpm					#上傳的兩個檔案

解壓安裝:

# 解壓安裝 erlang
rpm -ivh erlang-21.3.8.21-1.el7.x86_64.rpm
# 云下載一個 初始化一些配置, 程序比較慢請耐心等待~, 在這之后才可以進行 安裝 RabbitMQ
yum install socat -y
# 解壓安裝 rabbitmq
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm

ok ,安裝完畢了解一些 RabbitMQ 命令:

# 啟動服務
systemctl start rabbitmq-server
# 查看服務狀態
systemctl status rabbitmq-server
# 開機自啟動
systemctl enable rabbitmq-server
# 停止服務
systemctl stop rabbitmq-server
# 重啟服務
systemctl restart rabbitmq-server

注意:這里只是把RabbitMQ 服務給搭建好了,為了方便操作我們還需要安裝一個web控制面板

# 安裝web控制面板
rabbitmq-plugins enable rabbitmq_management

# 安裝完畢以后,重啟服務即可
systemctl restart rabbitmq-server

# 訪問 http://服務器ip:15672 ,用默認賬號密碼(guest)登錄,出現權限問題
# 默認情況只能在 localhost 本機下訪問,所以需要添加一個遠程登錄的用戶
# 創建賬號和密碼: admin 123456
rabbitmqctl add_user admin 123456
# 設定用戶角色,用戶級別: administrator monitoring policymaker managment
rabbitmqctl set_user_tags admin administrator
# 為用戶添加資源權限
# set_permissions [-p <vhostpath>] <user> <conf> <write> <read>  # 添加配置、寫、讀權限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

##### 擴展一些命令:#####
關閉應用的命令為:  rabbitmqctl stop_app
清除的命令為:		 rabbitmqctl reset
重新啟動命令為:	rabbitmqctl start_app
  • 如果是阿里云的服務器 別忘記開啟埠 還有 關閉防火墻~

用戶級別:

  1. administrator:可以登錄控制臺、查看所有資訊、可以對 rabbitmq 進行管理
  2. monitoring:監控者 登錄控制臺,查看所有資訊
  3. policymaker:策略制定者 登錄控制臺,指定策略
  4. managment:普通管理員 登錄控制臺

主要埠介紹:阿里云建議將這些都打開~

  1. 4369 – erlang發現口

  2. 5672 – client端通信口

  3. 15672 – 管理界面ui埠

  4. 25672 – server間內部通信口

測驗是否可以訪問:

訪問頁面:

image-20220115041255983

  • Overview

    概覽 RabbitMQ 的整體情況,也可以查看集群各個節點的資訊 情況 MQ 各個埠映射資訊

  • Connection

    該 選項專欄 下是MQ 與各個 生產者 消費者 連接情況.

  • Channels

    這里展示,各個 通道 與 連接的關系

  • Exchanage

    展示所有的 交換機

  • Queue

    展示所有的 佇列

  • Admin

    這里管理著,MQ 所有的操作用戶~

RabbitMQ 管理頁面:

Overview

image-20220118231048282

Connections

image-20220118232637148

  • Name 連接名 點擊連接名, 還可以查看詳細的資訊~
  • User name 當前連接登錄MQ 的用戶
  • State 當前連接的狀態,running 運行 idle 空閑
  • SSL|TLS 是否使用的是 SSL|TLS協議
  • Peotocol AMQP 0-9-1 指的是AMQP 的協議版本號
  • Channels 當前連接創建通道的 通道總數
  • From client 每秒發出的訊息數
  • To client 每秒接收的訊息數

Channels

image-20220118233736878

記錄各個連接的信道:

一個連接IP 可以有多個信道 多個通道通過多執行緒實作,不相互干擾 我們在 信道中創建:佇列 交換機 ...

生產者的通道一般使用完之后會立馬關閉,消費者是一直監聽的…

  • Channel 通道名稱

  • User Name 該通道,創建者 用戶名

  • Model 通道的確認模式 C confirm模式 T 表示事務

  • State 通道當前的狀態 running 運行 idie 空閑

  • Unconfirmed 待確認的訊息數

  • Prefetch 預先載入

    Prefetch 表示每個消費者最大的能承受的未確認訊息數目

    簡單來說就是用來指定一個消費者一次可以從 RabbitMQ 中獲取多少條訊息并快取在消費者中,

    一旦消費者的緩沖區滿了,RabbitMQ 將會停止投遞新的訊息到該消費者中直到它發出有訊息被 ack 了

    消費者負責不斷處理訊息,不斷 ack,然后只要 UnAcked 數少于 Prefetch * consumer 數目,RabbitMQ 就不斷將訊息投遞過去

  • Unacker 待 ack 的訊息數

  • publish 訊息生產者發送訊息的 速率

  • confirm 訊息生產者確認訊息的 速率

  • unroutable drop 表示訊息,未被接收,且已經洗掉的訊息.

  • deliver / get 訊息消費者獲取訊息的 速率

  • ack 訊息消費者 ack 訊息的速率. MQ 的 ACK機制:100%訊息消費!

Exchange

image-20220119002047093

Queue

image-20220119002310809
  • Name 表示訊息佇列的名稱
  • Type 訊息佇列的型別…
  • Features:表示訊息佇列的特性,D 表示訊息佇列持久化
  • State:表示當前佇列的狀態,running 表示運行中;idle 表示空閑
  • Ready:表示待消費的訊息總數
  • Unacked:表示待應答的訊息總數
  • Total:表示訊息總數 Ready+Unacked
  • incoming:表示訊息進入的速率
  • deliver/get:表示獲取訊息的速率
  • ack:表示訊息應答的速

Admin

image-20220119004257754

Java 集成 RabbitMQ 案例

創建一個Maven專案并使用 git 進行管理, wlog.md檔案進行著專案日志的記錄?~

引入RabbitMQ 的依賴:

pom.xml

<dependencies>
    <!-- rabbitMQ 依賴 -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.8.0</version>
    </dependency>
</dependencies>

簡單模式 Hello Word:

RabbitMQ-00000012

如圖,顯而易見,非常簡單就是一個一發一讀 的程序…

  • P:生產者,也就是要發送訊息的程式
  • C:消費者:訊息的接受者,會一直等待訊息到來,
  • queue:訊息佇列,圖中紅色部分,可以快取訊息;生產者向其中投遞訊息,消費者從其中取出訊息

發送者

Producer.Java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/** 訊息生產者 **/
public class Producer {
    // 定義佇列名稱
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        //創建一個連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.243.109.199");
        factory.setUsername("admin");
        factory.setPassword("123456");
        //channel 實作了自動 close 介面 自動關閉 不需要顯示關閉
        //創建連接物件
        Connection connection = factory.newConnection();
        //根據連接物件,獲取信道
        Channel channel = connection.createChannel();

        /**設定訊息佇列的屬性!
         *  queue       :佇列名稱
         *  durable     :是否持久化 如果持久化,mq重啟后佇列資料還在! (佇列是在虛擬路徑上的...)
         *  exclusive   :佇列是否獨占此連接,佇列只允許在該連接中訪問,如果connection連接關閉佇列則自動洗掉,如果將此引數設定true可用于臨時佇列的創建
         *  autoDelete  :佇列不再使用時是否自動洗掉此佇列,如果將此引數和exclusive引數設定為true就可以實作臨時佇列(佇列不用了就自動洗掉)
         *  arguments   :佇列引數 null,可以設定一個佇列的擴展引數,需要時候使用!比如:可設定存活時間
         * */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /**發送訊息,引數:
         * exchange     :指定的交換機,不指定就會有默認的....
         * routingKey   :路由key,交換機根據路由key來將訊息轉發到指定的佇列,如果使用默認交換機routingKey設定為佇列的名稱
         * props        :訊息包含的屬性: 后面介紹,可以是一個一個物件... 訊息持久化配置...
         * body         :發送的訊息,AMQP以位元組方式傳輸...
         * */
        channel.basicPublish("", QUEUE_NAME, null, "Hello Word你好世界".getBytes());
        System.out.println("訊息發送完畢");
    }
}

消費者

Consumer.Java

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/** 訊息消費者 **/
public class Consumer {
    // 定義佇列名稱
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.243.109.199");
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        System.out.println("等待接收訊息.........");
        //收到訊息后用來處理訊息的回呼物件
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println(message);
        };

        //取消消費的一個回呼介面 如在消費的時候佇列被洗掉掉了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("訊息消費被中斷");
        };
       	/**
         * 消費者消費訊息 - 接受訊息
         * queue            消費哪個佇列
         * autoAck          消費成功之后是否要自動應答 true 代表自動應答 false 手動應答,要通過編程實作回復驗證,這就是Unacked 為回傳ack的資料
         * deliverCallback  消費方法,當消費者接收到訊息要執行的方法, 引數是一個函式式介面可以使用 lambda運算式~
         * cancelCallback   訊息被取消時的回呼
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}
  • 建議先啟動 消費者 在啟動 發送者

    可以看到,消費者啟動之后,在等待 發生者 發送訊息,發送者啟動發送訊息,消費者控制臺會立刻接收到訊息!

  • MQ 發送者一般情況下都不會直接忘佇列發訊息 這種情況下MQ 都會有一個默認的交換機~

作業模式 Work Queues

在這里插入圖片描述

作業模式 相當于 簡單模式的 升級版!

  • 多個消費者,對應一個發送者,發送者 產生的訊息存在佇列種,佇列會以復雜均衡形式 輪詢的發送給多個消費者

一般應用于:發送方事務簡單,接收方事務復雜…

  • 美團外賣:用戶下單——后臺內部要聯系商家 騎手 生產訂單 處理...

抽取作業類:

因為上面示例我們知道,創建交換機|佇列 需要Channel信道 交換機 佇列是創建在信道里面的

  • 而每次創建交換機的時候,都要創建一次 Connection Channel
  • 于是我們可以將它抽離出一個工具類 MQChannelUtil.Java

MQChannelUtil.Java

  • com.wsm目錄下創建一個 util包專門用來存盤工具類🛠
import com.rabbitmq.client.Channel; //匯入MQ的包~
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/** RabbitMQ 連接配置類: **/
public class MQChannelUtil {
    //得到一個連接的 channel
    public static Channel getChannel() throws Exception {
        //創建一個連接工廠, 設定連接: IP 埠 用戶 密碼
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("42.192.149.71");
//      factory.setPort("設定對應的埠,默認就是: 5672");
        factory.setUsername("admin");
        factory.setPassword("123456");
        //創建連接物件 信道物件
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

發送者

Producer.Java

import com.rabbitmq.client.Channel;
import com.wsm.Util.MQChannelUtil;
import java.util.Scanner;

/** 訊息生產者 **/
public class Producer {
    // 定義佇列名稱
    private final static String QUEUE_NAME = "Word";

    public static void main(String[] args) throws Exception {
        // 工具類創建一個信道
        Channel channel = MQChannelUtil.getChannel();
        // Java控制臺測驗法訊息:
        Scanner scanner = new Scanner(System.in);
        //創建交換機
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 回圈多次發布訊息:
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("訊息發送完畢");
        }
    }
}

消費者1

Consumer1.Java

import com.rabbitmq.client.*;
import com.wsm.Util.MQChannelUtil;

/** 訊息消費者 **/
public class Consumer1 {
    // 定義佇列名稱
    private final static String QUEUE_NAME = "Word";

    public static void main(String[] args) throws Exception {
        // 工具類創建一個信道
        Channel channel = MQChannelUtil.getChannel();

        //收到訊息后用來處理訊息的回呼物件
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println(message);
        };

        //取消消費的一個回呼介面 如在消費的時候佇列被洗掉掉了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("訊息消費被中斷");
        };

        /** 消費者消費訊息 - 接受訊息: 注意引數兩個回呼函式~ */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

消費者2

Consumer2.Java 和消費者1 一模一樣換一個名字,兩個消費者監聽一個佇列 進行資料處理....

結果測驗:

訊息被輪詢消費

365e3222ea94df441dc80517f8ef4e1

  • 通程序式執行發現生產者總共發送 4 個訊息,消費者 1 和消費者 2 分別分得兩個訊息,并且是按照有序的一個接收一次訊息
  • MQ 發送訊息,一般情況下只會被一個 消費者執行消費 消費者執行之后, 佇列就會將訊息洗掉,(ACK機制...
  • 后面可以通過,交換機模式完成,一個訊息被多個消費者消費…

訊息確認接識訓制 ACK

訊息一旦被消費者接收,佇列中的訊息就會被洗掉

RabbitMQ怎么知道訊息被接收了呢?

  • 如果消費者領取訊息后,還沒執行操作就掛掉了呢?或者拋出了例外?訊息消費失敗!

  • 但是RabbitMQ無從得知,這樣訊息就丟失了!

因此,RabbitMQ有一個ACK機制

  • 當消費者獲取訊息后,會向RabbitMQ發送回執ACK 告知訊息已經被接收,不過這種回執ACK分兩種情況:
  • 自動ACK: 訊息一旦被接收,消費者自動發送ACK
  • 手動ACK: 訊息接收后,不會發送ACK,需要手動呼叫

自動ACK

  • RabbitMQ 默認此種模式:

    訊息發送后立即被認為已經傳送成功! 消費者 接收到訊息,就向佇列發送ack,佇列立刻就洗掉訊息

  • 這種模式需要在高吞吐量和資料傳輸安全性方面做權衡 僅適用在消費者可以高效并以 某種速率能夠處理這些訊息的情況下使用

手動ACK

  • 訊息接收后,不會發送ACK,需要手動代碼進行呼叫 待消費者 執行完畢之后,在通過代碼向 佇列發送ack,佇列接收到ack 之后會將訊息洗掉!

  • channel.basicAck(long deliveryTag,boolean multiple); 用于肯定確認

    RabbitMQ 已知道該訊息并且成功的處理訊息,可以將其丟棄了

    引數1 long型別,表示處理的訊息標識,MQ沒發送一個訊息都一個對于該訊息的唯一標識… 就像序列化 序列號一樣,用于網路傳輸..

    引數2 boolean型別,表示是否支持批量處理

  • channel.basicNack(deliveryTag, false, true); 用于否定確認, 消費者 訊息執行程序中失敗,或服務器掛機…

    引數1 同上,訊息的唯一標識

    引數2 表示是否支持批量處理

    引數3 requeue true則重新入佇列 false丟棄或者進入死信佇列

  • channel.basicReject(deliveryTag, true); 用于否定確認

    引數1 同上

    引數2 requeue true則重新入佇列 false丟棄或者進入死信佇列

    與 Channel.basicNack 相比少一個引數,不可以進行批量處理…

Multiple 批量訊息處理:

  • true 代表批量應答處理

    比如,現在佇列上存在 1 2 3 4 四個訊息,都發送給了消費者,而消費者逐一處理,4 結束了.

    不管是否 ACK|NACK 都直接將,其它的 1 2 3 都以相同的,方式進行 批量處理!

    好處:在MQ 服務,穩定的時候,支持大量的訊息處理速度… 缺點,容易造成資料丟失💀...

  • flase 建議使用,不批量應答

    就是, 一次只處理當前訊息的 ACK|NACK

訊息自動重新入隊

消費者設定了手動ACK 之后....

如果消費者由于某些原因失去連接 其通道已關閉,連接已關倍訓 TCP 連接丟失 導致訊息未發送 ACK 確認

  • 消費者監聽 佇列訊息,消費者開始處理,但是處理程序中,消費者突然與MQ 連接斷開 消費者服務掛了
  • MQ 正常情況下會與 消費者建立連接,當消費者突然斷開,一段時間沒有回傳,訊息處理的 ack,MQ就會當作消費者出現故障. 將訊息重新交給其它消費者處理!心跳機制?

生產者

Producer.Java

import com.rabbitmq.client.Channel;
import com.wsm.Util.MQChannelUtil;
import java.util.Scanner;

/** 訊息生產者 **/
public class Producer {
    // 定義佇列名稱
    private final static String QUEUE_NAME = "ack_test";

    public static void main(String[] args) throws Exception {
        // 工具類創建一個信道
        Channel channel = MQChannelUtil.getChannel();
        // Java控制臺測驗法訊息:
        Scanner scanner = new Scanner(System.in);
        // 創建佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 回圈多次發布訊息:
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("訊息發送完畢");
        }
    }
}

消費者1

Consumer1.Java

  • basicConsume(); 消費者監聽訊息方法,第二個引數:true自動ack false手動ack
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wsm.Util.MQChannelUtil;

/** 訊息消費者 **/
public class Consumer2 {
    // 定義佇列名稱
    private final static String QUEUE_NAME = "ack_test";

    public static void main(String[] args) throws Exception {
        // 工具類創建一個信道
        Channel channel = MQChannelUtil.getChannel();
        //收到訊息后用來處理訊息的回呼物件
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // 為了方便查看處理效果,我們將消費者 執行緒休眠一段時間 模擬處理資料;
            try {
                Thread.sleep(10000); // 毫秒 *1000;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String message = new String(delivery.getBody());
            System.out.println(message);
            /** 接收處理完畢訊息之后給MQ 回復ack
             *  引數1 訊息的唯一標識Tag
             *  引數2 是否支持批量回復,一般建議false 保證資料安全!
             * **/
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        //取消消費的一個回呼介面 如在消費的時候佇列被洗掉掉了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("訊息消費被中斷");
        };
        /** 消費者消費訊息 - 接受訊息: 注意引數兩個回呼函式~ */
        /** 引數二 設定 false 手動應答 **/
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
    }
}

消費者2

Consumer2.Java 和 消費者1 類似,為了方便測驗更改了,執行緒休眠時間 30s

結果測驗:

測驗1

  • 正常的發送訊息,AA BB CC DD

  • 發現啊,消費者1 消費者2 還是輪詢的進行消費資料,只不過消費者2 比較慢…

    10a0ae583454f8dc0440c9f9e8a2dac

測驗2

  • 正常的發送訊息,AA BB CC DD

  • 當消費者1 接收到訊息后,消費者2 也正在接收訊息,因為有 Thread.sleep(30000) 所以會有很長時間的處理,此時關閉消費者2

    MQ 與 消費者2 斷開連接,消費者2 也沒有發送ACK 所以消費者2的訊息將會 重回佇列...又交給了 消費者1來進行消費!

    747ddc18deab952ac3fe830bfc3893d

  • 當然,實際開發中因為 消費1 | 2 都存在處理了 BB 為了確保資料安全,還要進行 冪等的處理!

RabbitMQ 持久化 簡單

正常情況下,RabbitMQ 只是一個訊息中間件 一邊接收者生產者訊息 等待消費者監聽處理...

  • 訊息不會持久化保存在 佇列中
  • 如果:突然某天 RabbitMQ 突然掛機,那么就會造成資料的丟失:佇列中為處理的訊息... 生產者最新發送的訊息... ...
  • 為了,保證資料的安全,我們需要將 佇列 訊息 都進行持久化處理,防止資料丟失~

佇列持久化

然佇列持久化非常簡單, 只需要一個配置即可:

在宣告佇列的時候,就通過引數就可以完成對佇列的持久化,注意:佇列持久化 并不是 訊息持久化 佇列每次重啟都會恢復但是內部的訊息 還需要另外的配置!

// 讓佇列持久化
boolean durable = true;	// false 不持久化   true 持久化 
// 宣告佇列
channel.queueDeclare("佇列名", durable, false, false, null);	// 設定true 之后,每次MQ重啟的時候,該佇列都會自動重新在虛擬路徑上自動加載... 

訊息持久化

需要在 生產者 發送訊息的時候添加一個配置 MessageProperties.PERSISTENT_TEXT_PLAIN,告訴MQ 這個訊息很重要,要進行持久化保存!

// 發送者發送訊息的時候,帶上 MessageProperties.PERSISTENT_TEXT_PLAIN 告訴,MQ訊息要進行持久化;
channel.basicPublish("交換機", "佇列名", MessageProperties.PERSISTENT_TEXT_PLAIN, "要發送的訊息,位元組傳輸");
  • 當然這里,訊息仍然存在丟失問題

    當訊息,剛發到MQ 中,還沒有準備,存盤磁盤,訊息還在快取的一個"間隔點" MQ 突然掛了… 也會影響到訊息的 持久化;

    但這里對于,普通的存盤已經綽綽有余了…

不公平 分發

前提是,設定訊息手動ACK

在最開始的時候我們學習到 RabbitMQ 分發訊息采用的輪訓分發

  • 但是在某種場景下這種策略并不是很好:

    比方說有兩個消費者在處理任務,消費者 1 處理任務的速度非常快 消費者 2 處理速度卻很慢

    這個時候依然采用默認的 輪詢分發 勢必不太合理…

  • 為了避免這種情況,MQ 支持,我們切換不同的 分發模式

  • 不公平 分發

    我們可以通過 信道設定:channel.basicQos(1);

    意思就是如果這個任務我還沒有處理完或者我還沒有應答你,你先別分配給我,我目前只能處理一個任務.

    rabbitmq 就會把該任務分配給沒有那么忙的那個空閑消費者,如果沒有空閑消費者,訊息就會堆積在佇列中去~

預取值 分發

預取值 分發

  • 就是說,我們在設定訊息者 連接 佇列 的時候,可以給 信道設定 預取值.

其實預取值,和 不公平分發 很類似

  • 都是設定: 消費者 連接 信道 時候設定 ``channel.basicQos(?);` 消費者最高訊息堆積數;

  • ``channel.basicQos(0);` 默認0 輪詢模式

  • ``channel.basicQos(1);` 不公平分發模式

    消費者1 消費者2 都設定,channel.basicQos(1); 消費者最高允許訊息堆積數.

    兩個消費者每次只能從,佇列中拿一個訊息進行消費,完了就立刻在從 佇列中,在拿,這樣做的快的消費者,自然就處理的訊息多了!不公平分發

  • channel.basicQos(>1);大于>1預取值 分發

    消費者1 消費者2 分別跟據服務的性能設定,channel.basicQos(?); 消費者最高允許訊息堆積數.

    假設: 消費者1 basicQos(2) 消費者2 basicQos(5)

    這樣,假設佇列中有訊息:1 2 3 4 5 6 7,當最初 消費者1 | 2 都空閑時候…

    消費者1獲取1 消費者2獲取2 消費者1獲取3 消費者2獲取4 消費者2獲取5 消費者2獲取6 .... 消費者2 允許訊息在信道中最大的堆積數 5

    當然也有可能會出現,消費者1 處理很快,消費者2 很慢,消費者1處理完1 3,消費者2還在處理2 4567,那后面的 8 9 10 都給1處理…

預取值 分發,就是預計這個消費者,性能高低,設定消費者 允許最高堆積?個訊息等待這個處理!

RabbitMQ - 發布確認confirm

confirm 發布確認機制:

生產者將信道設定成 confirm 模式

  • 一旦信道進入 confirm 模式,所有在該信道上面發布的訊息都將會被指派一個唯一的 ID 從1開始

  • 發送者 —— 訊息 —— 佇列,上后

    MQ broker 就會發送一個確認給 生產者 生產者就知道訊息已經正確到達目的佇列了.

  • 如果 訊息佇列 和 訊息 進行了持久化設定

    那么確認訊息會在將訊息寫入磁盤之后發出,broker回傳給生產者的確認訊息已經,發送到佇列上!

發布確認策略:

RabbiMQ 默認是沒有開啟 comfirm 發布確認機制

  • 如果要開啟需要呼叫方法 confirmSelect,每當你要想使用發布確認,都需要在 channel 上呼叫該方法
  • // 開啟發布確認 channel.confirmSelect();

單個發布確認:

它是一種同步確認發布的方式,也就是發布一個訊息之后只有它被確認發布,后續的訊息才能繼續發布

  • 這種確認方式有一個最大的缺點就是:發布速度特別的慢,一次只能發一個!

    因為如果沒有確認發布的訊息就會阻塞所有后續訊息的發布,這種方式最多提供每秒不超過數百條發布訊息的吞吐量;

  • 優點:保證訊息的100%發送,缺點:對于訊息的發送實在太慢,對于大量資料不適合!

  • 實作:

    1. 在宣告佇列之后,開啟發布確認,channel.confirmSelect();

    2. 通過信道 channel.waitForConfirms(); 來判斷當前訊息是否發送成功!

    這個方法只有在訊息被確認 的時候才回傳,如果在指定時間范圍內這個訊息沒有被確認那么它將拋出例外. 單個發布確認,只有一個訊息發送 才能發送下一個訊息

Producer.Java

/** 單個發布確認 **/
public static void singleConfirm() throws Exception {
    Channel channel = MQChannelUtil.getChannel();
    // 佇列宣告
    channel.queueDeclare("singleConfirm", false, false, false, null);
    // 開啟發布確認
    channel.confirmSelect();

    // 為了方便計算各個 發布確認策略 耗時: 開始-結束放一個系統時間獲取毫秒數;
    long begin = System.currentTimeMillis();
    
    // 發送1000 個訊息....
    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = i + "";
        channel.basicPublish("", "singleConfirm", null, message.getBytes());

        // 服務端回傳 false 或超時時間內未回傳,生產者可以訊息重發
        boolean flag = channel.waitForConfirms();
        if (flag) {
            System.out.println("訊息發送成功");
        }
        // else{ /** 訊息重發處理... **/  }
    }
    
    // 1000個訊息結束...
    long end = System.currentTimeMillis();
    System.out.println("發布" + MESSAGE_COUNT + "個單獨確認訊息,耗時" + (end - begin) + "ms");
}

本次執行耗時:67042毫秒

批量發布確認:

相比 單個發布確認, 發送確認機制,實在是太慢了! 每次只能保證一個訊息的發送成功,實在是太慢了!

批量發布確認:

  • 其實和 單個發布確認 實作方式一樣

    1. 開啟發布確認 channel.confirmSelect();

    2. 獲取批量的訊息,是否全部發送到MQ channel.waitForConfirms();

  • waitForConfirms(); 方法()

    單個發布 和 批量發布其實都一樣,都是呼叫 waitForConfirms() 方法,查看當前訊息是否都到達MQ

    不同的是,單個發布每次發一條都驗證 批量是在一定數量進行驗證

    waitForConfirms(); 方法會使,當前 發生者執行緒進行阻塞,等待MQ 回傳資料,

    MQ回傳 上一次呼叫waitForConfirms() 到現在呼叫waitForConfirms() 所有發送的訊息是否抵達MQ 全部抵達true 則false

  • 因此:批量發布確認,相當于 單個發布確認,一個批量執行,大大節省了程序中冗余的一些步驟性能...

    當然,如果其中有一個訊息沒有發送到MQ 它并不能確定是那一個 訊息 沒有抵達MQ

Producer.Java

/** 批量發布確認 **/
public static void batchConfirm() throws Exception {
    Channel channel = MQChannelUtil.getChannel();
    // 佇列宣告
    channel.queueDeclare("batchConfirm", false, false, false, null);
    // 開啟發布確認
    channel.confirmSelect();

    // 為了方便計算各個 發布確認策略 耗時: 開始-結束放一個系統時間獲取毫秒數;
    long begin = System.currentTimeMillis();

    // 批量確認訊息大小,當發送的訊息數到 100 執行 waitForConfirms(); 詢問MQ 當前所有的訊息有沒有抵達~
    int batchSize = 100;
    // 未確認訊息個數, 每次發送訊息 ++ 用于判斷是否改批量驗證訊息發送;
    int outstandingMessageCount = 0;
    for (int i = 0; i < MESSAGE_COUNT; i++) {
        // 發送的訊息,并發送~
        String message = i + "";
        channel.basicPublish("", "batchConfirm", null, message.getBytes());
        // 每次發送一個訊息進行計算當前是第幾個,為批量驗證的訊息;
        outstandingMessageCount++;
        // 判斷當前的 100 個訊息有沒有都發送的MQ上!
        if (outstandingMessageCount == batchSize) {
            channel.waitForConfirms();
            outstandingMessageCount = 0;
        }
    }

    //為了確保還有剩余沒有確認訊息 再次確認
    if (outstandingMessageCount > 0) {
        channel.waitForConfirms();
    }

    long end = System.currentTimeMillis();
    System.out.println("發布" + MESSAGE_COUNT + "個批量確認訊息,耗時" + (end - begin) + "ms");
}

本次執行耗時:1149毫秒 快速了很多...

異步發布確認:

異步發布確認 相對于上面兩個比較復雜,但: 性價比 效率 具有顯著的提升

  • 它是采用回呼的方式來完成,訊息傳遞的可靠性.

實作原理:

  • ① 發送者 只需要關注 發訊息

    **發送者 每次 給MQ 發送訊息的時候,會默認的給每個訊息帶上一個唯一的ID標識 ** 后面我們就可以通過這個標識來,確定是那一個訊息發送 成功|失敗

  • ② 發送者,方法體中寫一個 異步確認監聽器 addConfirmListener(ConfirmCallback,ConfirmCallback);

    方法引數支持兩個,ConfirmCallback類物件 一個表示接收訊息做的事情 另一個是未接收到訊息做的事情...

    ConfirmCallback 是一個 函式式介面, 支持 lambda運算式 和 內部類形式書寫…

  • 發送者 一直往MQ 上發送訊息,MQ 每收到一個訊息會,呼叫發送者的 addConfirmListener(ack,nack) 方法

    告知發送者,訊息成功發送 | 或 未發送成功!

  • 發送者,在根據:

    addConfirmListener(ack,nack) 來處理,訊息成功處理,訊息失敗處理…

Producer.Java

/** 異步發布確認 **/
public static void syncConfirm() throws Exception{
    Channel channel = MQChannelUtil.getChannel();
    // 佇列宣告
    channel.queueDeclare("syncConfirm", false, false, false, null);
    /** 開啟發布確認 **/
    channel.confirmSelect();
    // 為了方便計算各個 發布確認策略 耗時: 開始-結束放一個系統時間獲取毫秒數;
    long begin = System.currentTimeMillis();

    /** 步驟一: 創建一個執行緒安全的一個哈希表,用于記錄每一個訊息發送,這樣MQ異步回傳時候可以知道具體是那一個訊息發送成功|失敗 **/
    /**
         * 執行緒安全有序的一個哈希表,適用于高并發的情況
         * 1.輕松的將序號與訊息進行關聯: k,v 存盤結構, k訊息標識 v發送的訊息體,每次發送訊息前先存在集合中;
         * 2.輕松批量洗掉條目只要給到序列號: 對于發現成功的訊息,直接從集合中移除...
         * 3.支持并發訪問,Concurrent介面 是執行緒安全的; */
    ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

    /** 步驟三: 撰寫回呼監聽器,因為: 訊息發送出錯要立刻進行監聽所以,所以創建在發送訊息之前; **/
    /** ack 確認收到訊息的一個回呼 1.訊息序列號 2.true 批量確認接受小于等于當前序列號的資料 false 確認當前序列號訊息 */
    ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
        if (multiple) {
            System.out.println("訊息成功接收:"+sequenceNumber);
            // ConcurrentNavigableMap方法()回傳的是小于|等于 K 的集合, true:小于等于 false:回傳小于該序列號的資料集合;
            ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);
            // 清除該部分確認訊息 confirmed 里保存的都是,MQ 已經接收的訊息;
            // 遍歷 confirmed K, 根據 K 洗掉 outstandingConfirms 的值...
            // outstandingConfirms 里面保存的都是,MQ 還未確認的訊息...
        }else{
            //只清除當前序列號的訊息
            outstandingConfirms.remove(sequenceNumber);
        }
    };
    // nack 訊息失敗執行{} 可以寫,訊息失敗需要執行的代碼...
    ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
        // 這里就輸出一下為被確認的訊息...
        String message = outstandingConfirms.get(sequenceNumber);
        System.out.println("發布的訊息"+message+"未被確認,序列號"+sequenceNumber);
    };
    // 發生者 等待MQ回呼訊息確認的 監聽器, 本次程式值監聽 ack成功的訊息;
    channel.addConfirmListener(ackCallback, null);

    /** 步驟二: 發送者一直往MQ發送訊息 **/
    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = "訊息" + i;
        // channel.getNextPublishSeqNo() 獲取下一個訊息的序列號,通過序列號與訊息體進行一個關聯,全部都是未確認的訊息體
        outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
        // 發送訊息;
        channel.basicPublish("", "syncConfirm", null, message.getBytes());
    }

    long end = System.currentTimeMillis();
    System.out.println("發布" + MESSAGE_COUNT + "個批量確認訊息,耗時" + (end - begin) + "ms");
}

耗時:96毫秒 超級快的好吧

  • 發送者,只需要關注訊息的發送,MQ 會將每條訊息發布情況,回呼給發生者
  • 發送者每次發訊息前,將訊息存盤在快取中,成功了就洗掉,失敗了就重新發送~ 非常nice👍

RabbitMQ 交換機Exchange:

以上我們創建一個: 發送者 佇列 消費者 就可以完成通信了

而,RabbitMQ 的核心思想是: 生產者 生產的訊息不會直接發送到隊佇列上, 甚至不知道佇列的存在,通過一個交換機

  • 生產者,只需要關注 往交換機上發送訊息即可!

    交換機作業的內容非常簡單:

    一方面它接收來自生產者的訊息 另一方面將它們推入佇列 交換機必須確切知道如何處理收到的訊息,這就的由交換機的型別來決定.

    RabbitMQ-00000007
  • Exchange 交換機型別:

    fanout: 廣播模式發布/訂閱,交換機給所有的佇列,發送相同的訊息;

    direct : 路由模式routing key 交換機,根據對應的 routing key 的佇列上發送訊息;

    topic: 動態路由模式,可以用過一定的規則定義 roting key 使 交換機動態的多樣性選擇 佇列

    • * 表示一個單詞

    • # 表示任意數量(零個或多個)單詞

    headers: 請求頭模式,目前用的很少了,就像請求頭一樣,發送訊息時候附帶頭部資料,交換機根據訊息的頭部資訊匹配對應的佇列;

無名交換機:

  • 上面的 Demo案例,我們幾乎沒有對 交換機 進行任何的操作,但是,仍然可以進行訊息發送|接收

  • 因為:channel.basicPublish("", "佇列名", null, "發送的訊息".getBytes()); 對于 “” 空字串的交換機,MQ 會有默認的交換機進行操作…

臨時佇列:

  • 對于有些時候,我們需要連接一個佇列, 而這個佇列,并不常用,用完即丟的情況下,可以考慮使用:臨時佇列

  • String queueName = channel.queueDeclare().getQueue();

    讓服務器 信道,給我們創建一個臨時的佇列,隨機佇列名稱 一旦我們斷開了消費者的連接,佇列將被自動洗掉

發布訂閱模式 Publish/Subscribe 交換機型別:Fanout

在這里插入圖片描述

Fanout 型別:

  • 這種型別非常簡單,它可以將,它知道的所有的訊息,廣播到所有佇列中去. 也成為:廣播模式
  • 常見場景: 某某軟體很多人關注/訂閱了一個博主,博主一更新,所有的粉絲都收到更新訊息!

Fanout 實戰:

  • 定義一個生產者,交換機,生產者不停的往交換機中發訊息

  • 交換機提前與 一個|多個佇列系結,每當有訊息來的時候,交換機會將訊息發送到所有的佇列中去… 每個訊息者監聽(訂閱)一個佇列,多個訊息者可以同,監聽到相同的訊息;

生產者:

Producer.Java

/** 訊息生產者 **/
public class Producer {
    // 交換機名
    public static final String EXCHANGE_NAME = "wsm";

    public static void main(String[] args) throws Exception {
        // 創建連接物件,宣告交換機 發送訊息
        Channel channel = MQChannelUtil.getChannel();
        /**
         * 宣告一個 exchange
         * 1.exchange 的名稱
         * 2.exchange 的型別, 可以是String直接寫,也可以是 列舉型別;
         */
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        Scanner sc = new Scanner(System.in);
        System.out.println("請輸入資訊");
        while (sc.hasNext()) {
            String message = sc.nextLine();
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println("生產者發出訊息" + message);
        }
    }
}

消費者1:

/** 訊息消費者 **/
public class Consumer1 {
    // 定義交換機名稱
    public static final String EXCHANGE_NAME = "wsm";

    public static void main(String[] args) throws Exception {
        Channel channel = MQChannelUtil.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        /** 生成一個臨時的佇列 佇列的名稱是隨機的 當消費者斷開和該佇列的連接時 佇列自動洗掉  */
        String queueName = channel.queueDeclare().getQueue();
        // 系結: 把該臨時佇列系結我們的 exchange 其中 routingkey(也稱之為 binding key)為空字串: Fanout模式 routingkey 沒作用!
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        // 發送回呼
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("wsm 發布的最新訊息:"+message);
        };
        // 消費者監聽訊息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
    }
}

消費者2:

因為:

  • 佇列使用了MQ自動生成,所有幾乎代碼無改動一模一樣即可! String queueName = channel.queueDeclare().getQueue(); 臨時佇列

  • 生產者——發送訊息> 交換機 ——將訊息發送到對應的> 佇列 ——> 消費者(監聽佇列,處理訊息…

fda52544a8cac7b991b140743f82a82

路由模式 Routing 交換機型別:DIRECT

在這里插入圖片描述

DIRECT 模式:

  • 和 Fanout 模式類似

  • 系結 交換機/佇列 時候,需要指定 routing key 一個佇列,可以設定多個 routingkey

    發送者發送訊息, 會攜帶上 routingkey

  • 佇列在與交換機進行系結的時候,會設定好 佇列的 routingkey

    生產者 往交換機上發送訊息,交換機只會將訊息 向匹配的佇列上發送訊息, 消費者 監聽佇列訊息消費

DIRECT 實體:

  • 創建一個生產者,同時發送兩個訊息,分別指定 Conkey1 Conkey2

  • 創建兩個接收者,一個監聽的佇列 系結交換機時指定 Conkey1 另一個系結交換機時 系結兩個routingkey:Conkey1 Conkey2

  • 啟動:生成者 消費1 消費2 查看結果…

生產者:

Producer.Java

/** 訊息生產者 **/
public class Producer {
    // 交換機名
    public static final String EXCHANGE_NAME = "DIRECT";

    public static void main(String[] args) throws Exception {
        // 創建連接物件,宣告交換機 發送訊息
        Channel channel = MQChannelUtil.getChannel();
        /**
         * 宣告一個 exchange
         * 1.exchange 的名稱
         * 2.exchange 的型別, 可以是String直接寫,也可以是 列舉型別; */
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        /** 發送訊息 **/
        channel.basicPublish(EXCHANGE_NAME, "Conkey1", null, "Conkey1 發送的訊息".getBytes("UTF-8"));
        channel.basicPublish(EXCHANGE_NAME, "Conkey2", null, "Conkey2 發送的訊息".getBytes("UTF-8"));
    }
}

消費者1:

Consumer1.Java

/** 訊息消費者 **/
public class Consumer1 {
    // 定義交換機名稱
    public static final String EXCHANGE_NAME = "DIRECT";

    public static void main(String[] args) throws Exception {
        Channel channel = MQChannelUtil.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        /** 生成一個臨時的佇列 佇列的名稱是隨機的 當消費者斷開和該佇列的連接時 佇列自動洗掉  */
        String queueName = channel.queueDeclare().getQueue();
        // 系結: 把該臨時佇列系結我們的 exchange
        // 引數二 設定該佇列和交換和系結的 routingkey
        channel.queueBind(queueName, EXCHANGE_NAME, "Conkey1");

        // 發送回呼
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("最新的訊息是:"+message);
        };
        // 消費者監聽訊息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
    }
}

消費者2:

消費者1 多添加:

// 可以設定多個key
channel.queueBind(queueName, EXCHANGE_NAME, "Conkey2");
  • 可以多次呼叫 .queueBing("佇列名","交換機","routingkey"); 系結 佇列交換機 指定多個 routingkey

  • 生成者發送訊息,時候指定訊息的 routingkey 交換機根據 key 將對應的訊息發送到 佇列上進行處理!

    4bd01438c28bd4a2b3659401e37a725

    消費者2 可以同時接收到 Conkey1Conkey2 發送的訊息

主題模式 Topic 交換機型別:TOPIC

該模式與Routingkey 非常型別,就相當于是一個 動態路由模式!!

在這里插入圖片描述

  • TOPIC 就像是 DIRECT 的升級版

    DIRECT 固定了 routingkey 而,TOPIC 可以動態的進行 routingkey選擇 使用上更加的個性化

  • 主題模式 可以根據一些特殊的符合匹配多種 Routingkey 的匹配

    通配符規則:

    #:匹配一個或多個詞 舉例:wsm.# 等于:wsm.1 / wsm.w.s.m / wsm.sm .后多個單詞
    *:匹配不多不少恰好1個詞 舉例:wsm.* 等于:wsm.sm / wsm.m .后一個單詞

TOPIC 實體:

  • 修改上面的 DIRECT
  • 修改生產者的 交換機型別 routingkey ,消費者 交換機型別 系結交換機時候佇列的 routingkey

生產者:

Producer.Java 改變發送訊息時候指定的 routingkey 還有交換機型別:TPOIC

//交換機型別
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
/** 發送訊息 **/
channel.basicPublish(EXCHANGE_NAME, "Conkey.one", null, "Conkey.one 發送的訊息".getBytes("UTF-8"));
channel.basicPublish(EXCHANGE_NAME, "Conkey.two.123", null, "Conkey.two.123 發送的訊息".getBytes("UTF-8"));

消費者1:

Consumer1.Java 更改交換機型別 Topic,接收訊息,佇列 交換機系結時候,指定一下 routingkey通配符

// 引數二 設定該佇列和交換和系結的routingkey , Topic模式可以用過 通配符進行動態匹配: * 表示一個任意的單詞;
channel.queueBind(queueName, EXCHANGE_NAME, "Conkey.*");

消費者2:

// 引數二 設定該佇列和交換和系結的routingkey , Topic模式可以用過 通配符進行動態匹配: # 表示一個|多個任意的單詞;
channel.queueBind(queueName, EXCHANGE_NAME, "Conkey.#");

頭部模式 交換機型別:HEAD

20210325225219627

使用的很少,跟 TOPIC動態路由型別,只不過它并不是通過 routingkey 進行訊息與佇列進行匹配

  • headers型別的交換器不依賴于路由鍵的匹配規則來路由訊息,而是根據發送的訊息內容中的headers屬性進行匹配

  • 生產者發送訊息的時候可以,給訊息指定一個 head頭部引數 map型別

  • 交換機 與 佇列系結的時候也定義一組:頭部資訊規則, 只有訊息頭部規則 和 佇列的頭部規則 匹配才能發送到對應的頭部上!

不常用了解即可~

生產者:

Producer.Java

/** 訊息生產者 **/
public class Producer {
    // 交換機名
    public static final String EXCHANGE_NAME = "HEADERS";

    public static void main(String[] args) throws Exception {
        // 創建連接物件,宣告交換機 發送訊息
        Channel channel = MQChannelUtil.getChannel();
        /**
         * 宣告一個 exchange
         * 1.exchange 的名稱
         * 2.exchange 的型別, 可以是String直接寫,也可以是 列舉型別; */
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);

        //發送的訊息的訊息頭head map型別, 需要與佇列的map 規范匹配才可以發送成功! 不然發送失敗(交換機不知道往那個佇列上發送;
        HashMap<String ,Object> param = new HashMap<String, Object>();
        param.put("id","1");
        param.put("name","wsm");
        //設定Map 匹配引數!
        AMQP.BasicProperties.Builder builder=new AMQP.BasicProperties.Builder();
        builder.headers(param);

        /** 發送訊息 **/
        channel.basicPublish(EXCHANGE_NAME, "", builder.build(), "header的內容lalala~~".getBytes("UTF-8"));
    }
}

消費者:

Consumer1.Java

/** 訊息消費者 **/
public class Consumer1 {
    // 定義交換機名稱
    public static final String EXCHANGE_NAME = "HEADERS";

    public static void main(String[] args) throws Exception {
        Channel channel = MQChannelUtil.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
        /** 生成一個臨時的佇列 佇列的名稱是隨機的 當消費者斷開和該佇列的連接時 佇列自動洗掉  */
        String queueName = channel.queueDeclare().getQueue();

        //設定佇列上的 map 引數,用于匹配請求時候的引數!
        //特殊引數 x-match 值 all 或 any
        //all  在發布訊息時攜帶的map 必須和系結在佇列上的所有map 完全匹配
        //any  只要在發布訊息時攜帶的有一對鍵值map 滿足佇列定義的多個引數map的其中一個就能匹配上
        //注意: 這里是鍵值對的完全匹配,只匹配到鍵了,值卻不一樣是不行的;
        HashMap<String ,Object> param = new HashMap<String, Object>();
        param.put("x-match","all");
        param.put("id","1");            // 可以嘗試改變,map 資訊,生產者訊息還能發送到佇列上面~
        param.put("name","wsm");

        // 系結: 把該臨時佇列系結我們的 exchange
        // 佇列系結時需要指定引數,注意雖然不需要路由鍵但仍舊不能寫成null,需要寫成空字串"", 引數四: map引數,規范!
        channel.queueBind(queueName, EXCHANGE_NAME, "",param);

        // 發送回呼
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("最新的訊息是:"+message);
            System.out.println("Map傳入引數資料:"+delivery.getProperties().getHeaders());
        };
        // 消費者監聽訊息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
    }
}

Head 匹配規則:

//消費者:佇列規則all 所有匹配即可
        Map<String ,Object> param = new HashMap<String, Object>();
        param.put("x-match","all");
        param.put("id","1");
        param.put("name","wsm");
		
		//生產者:傳入head map;  
        Map<String ,Object> param = new HashMap<String, Object>();
        param.put("id","2");
        param.put("name","wsm");
       	//不匹配
		
		//生產者 
		Map<String ,Object> param = new HashMap<String, Object>();
        param.put("x-match","all");
        param.put("id","1");
        param.put("name","wsm");
		//匹配
		
//消費者:佇列規則any 一個匹配即可
        Map<String ,Object> param = new HashMap<String, Object>();
        param.put("x-match","any");
        param.put("id","1");
        param.put("name","wsm");     
        //生產者:傳入head map;  
		Map<String ,Object> param = new HashMap<String, Object>();
        param.put("id","1");		//匹配
        
        //生產者:傳入head map;  
       	Map<String ,Object> param = new HashMap<String, Object>();
        param.put("id","2"); 		//不匹配 key /value 都要匹配才可以
        ....

死信佇列 DLX 💀

死信,顧名思義就是無法被消費的訊息

正常情況下:

  • producer 將訊息投遞到 broker 或者直接到queue 里了,consumer 從 queue 取出訊息 進行消費

  • 某些時候由于特定的原因導致 queue 中的某些訊息無法被消費

  • 這樣的訊息如果沒有后續的處理,就變成了死信,有死信自然就有了死信佇列

應用場景:

  • 用戶在商城下單成功并點擊去支付后在指定時間支付,下單成功,指定時間內訂單未支付 下單失敗

  • 用戶下單,向MQ 發送一條訂單的訊息,并設定訊息時間 30分鐘,到佇列 succeed 成功佇列 等待用戶確認訂單,支付訂單,訊息被消費 下單成功

    如果 30 分鐘用戶沒有下單,則 succeed 成功佇列訊息超時,為了確保訊息不丟失,將訊息發送到 死信交換機 —— defeated死信佇列 消費者接收處理:下單失敗!

    在這里插入圖片描述

死信佇列:

  • 相當于對于一個,特定時間|場景 需要被處理的事情,但因為某種原因沒有正常處理,的一個兜底操作…

死信佇列產生:

  • 訊息 TTL 過期

    TTL是Time To Live的縮寫, 也就是生存時間,訂單超時支付 下單失敗

  • 佇列達到最大長度

    佇列滿了,無法再添加資料到 mq 中

  • 訊息被拒絕

    (basic.reject 或 basic.nack) 并且 requeue=false,訂單被用戶取消 下單失敗

死信佇列的實作:

  • 正常的 生產者 交換機 普通佇列
  • 不正常的,為了保證普通佇列,訊息穩定: 當訊息出現意外, 普通佇列上配置了 DXL交換機訊息超時 超出佇列... 直接發送到 DLX交換機———— DLX佇列

消費TTL 過期?

生產者:

Producer.Java

/** 訊息生產者 **/
public class Producer {
    // 普通交換機名
    public static final String EXCHANGE_NAME = "normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = MQChannelUtil.getChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //設定訊息的 TTL 時間 10s秒
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
        //該資訊是用作演示佇列個數限制
        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            channel.basicPublish(EXCHANGE_NAME, "zhangsan", properties, message.getBytes());
            System.out.println("生產者發送訊息:" + message);
        }
    }
}

消費者:

Consumer1.Java

/** 訊息消費者 **/
public class Consumer1 {
    // 普通交換機名稱
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    // 死信交換機名稱
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = MQChannelUtil.getChannel();
        // 宣告死信和普通交換機 型別為 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 宣告死信佇列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //死信佇列系結:佇列、交換機、路由鍵(routingKey)
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

        // 正常佇列系結死信佇列資訊
        HashMap<String, Object> params = new HashMap<>();
        // 正常佇列設定死信交換機 引數 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 正常佇列設定死信 routing-key 引數 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");

        // 正常佇列
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");

        System.out.println("等待接收訊息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer01 接收到訊息" + message);
        };
        channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
        });
    }
}

先啟動消費者,創建宣告好佇列之后,關閉,啟動生產者發送訊息…

RabbitMQ-00000049

DLX 消費者:

Consumer2.Java

/** 訊息消費者 **/
public class Consumer2 {
    //死信交換機名稱
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = MQChannelUtil.getChannel();

        //宣告交換機
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //宣告佇列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

        System.out.println("等待接收死信訊息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer02 接收到訊息" + message);
        };
        channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
        });
    }
}

佇列達到最大長度

訊息生產者代碼去掉 TTL 屬性

image-20210628101337825

C1 消費者修改以下代碼 (啟動之后關閉該消費者 模擬其接收不到訊息)

RabbitMQ-00000051

//設定正常佇列的長度限制,例如發10個,4個則為死信 注意:此時需要把原先佇列洗掉 因為引數改變了
params.put("x-max-length",6);

C2 消費者代碼不變(啟動 C2 消費者)

RabbitMQ-00000052

訊息被拒絕

訊息生產者代碼同上 佇列達到最大長度生產者一致

C1 消費者代碼(啟動之后關閉該消費者 模擬其接收不到訊息) 注釋 params.put("x-max-length",6);

C2 消費者代碼不變 ,啟動消費者1關閉 然后再啟動消費者 2

RabbitMQ-00000054

SpringBoot 集成 RabbitMQ 模式開發

延遲佇列,其實就是 死信佇列 的一種,所有為了方便查看,使用SpringBoot 來進行搭建順便了解學習一些SpringBoot 集成 RabbitMQ

① 創建SpringBoot 工程 啟動類....

② 引入Maven依賴:pom.xml

<dependencies>
    <!-- SpringBoot依賴 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--RabbitMQ 依賴-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!-- 阿里巴巴fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
    <!-- lombok依賴 -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!--RabbitMQ 測驗依賴-->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

③ 撰寫組態檔:application.yml | properties

# SpringBoot 配置RabbitMQ ip 埠 用戶 密碼;
spring.rabbitmq.host=47.243.109.199
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456

延遲佇列 TTL ?

延時佇列,佇列內部是有序的,最重要的特性就體現在它的延時屬性上

延時佇列中的元素是希望 在指定時間到了以后或之前取出和處理 簡單來說,延時佇列就是用來存放需要在指定時間被處理的 元素的佇列

延遲佇列使用場景:

  • 訂單在十分鐘之內未支付則自動取消
  • 新創建的店鋪,如果在十天內都沒有上傳過商品,則自動發送訊息提醒
  • 用戶注冊成功后,如果三天內沒有登陸則進行短信提醒

**這些場景都有一個特點,需要在某個事件發生之后或者之前的指定時間點完成某一項任務: **就幾乎等于一個 死信佇列

RabbitMQ 的 TTL

TTL 是 RabbitMQ 中一個訊息或者佇列的屬性,表明一條訊息或者該佇列中的所有訊息的最大存活時間,單位是毫秒 RabbitMQ 有兩種方式:

佇列設定TTL

在創建佇列的時候設定佇列的“x-message-ttl”屬性

Map<String, Object> args = new HashMap<>(3);
//宣告當前佇列系結的死信交換機
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//宣告當前佇列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//宣告佇列的 TTL
args.put("x-message-ttl", 10000);
//佇列系結交換機
QueueBuilder.durable(QUEUE_A).withArguments(args).build();

訊息設定TTL

是針對每條訊息設定TTL 生產者 生產訊息時候設定:

// Spring方式
// 編輯引數
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
// 發送訊息
channel.basicPublish("交換機", "佇列", properties, "訊息".getBytes());

// SpringBoot方式
// 通過 rabbitTemplate 發送訊息...
rabbitTemplate.convertAndSend("交換機", "佇列", "訊息", correlationData -> {
    correlationData.getMessageProperties().setExpiration("10000");
    return correlationData;
});

佇列設定TTL

RabbitMQ-00000060

代碼實作:

  • 創建一個交換機 X 和死信交 換機 Y,它們的型別都是direct

  • 創建兩個佇列 QA 和 QB 兩者佇列 TTL 分別設定為 10S 和 40S訊息超時會進入到 死信交換機 —— 發送到 QD死信佇列

交換機 系結 佇列 組態檔:

因為,專案采用SpringBoot 進行管理 原先配置佇列資訊,寫在了生產者和消費者代碼中,現在可寫咋配置類中,生產者只發訊息,消費者只接受訊息

TtlQueueConfig.Java

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class TtlQueueConfig {
    // 普通交換機 普通佇列
    public static final String X_EXCHANGE = "X";
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    // 死信交換機
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    // 死信佇列
    public static final String DEAD_LETTER_QUEUE = "QD";

    // 宣告 xExchange
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    // 宣告 死信佇列交換機
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    //宣告佇列 A ttl 為 10s 并系結到對應的死信交換機
    @Bean("queueA")
    public Queue queueA() {     // 導包: org.springframework.amqp.core
        Map<String, Object> args = new HashMap<>(3);
        //宣告當前佇列系結的死信交換機
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //宣告當前佇列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //宣告佇列的 TTL
        args.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
    }

    // 宣告佇列 A 系結 X 交換機
    @Bean
    public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    //宣告佇列 B ttl 為 40s 并系結到對應的死信交換機
    @Bean("queueB")
    public Queue queueB() {
        Map<String, Object> args = new HashMap<>(3);
        //宣告當前佇列系結的死信交換機
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //宣告當前佇列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //宣告佇列的 TTL
        args.put("x-message-ttl", 40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
    }

    //宣告佇列 B 系結 X 交換機
    @Bean
    public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
    }


    //宣告死信佇列 QD
    @Bean("queueD")
    public Queue queueD() {
        return new Queue(DEAD_LETTER_QUEUE);
    }

    //宣告死信佇列 QD 系結關系
    @Bean
    public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
                                        @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

生產者:

controller 用戶發送一個請求,服務將資料進行處理直接發送到MQ,由其它服務模塊處理…

SendMsgController .Java

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;

@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {
    // RabbitTemplate 對rabbitmq 的服務介面API 進行了封裝;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("當前時間:{},發送一條資訊給兩個 TTL 佇列:{}", new Date(), message);
        rabbitTemplate.convertAndSend("X", "XA", "訊息來自 ttl 為 10S 的佇列: " + message);
        rabbitTemplate.convertAndSend("X", "XB", "訊息來自 ttl 為 40S 的佇列: " + message);
    }
}

消費者:

DeadLetterQueueConsumer .Java

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;

/**
 * 消費者 - 死信佇列
 * @author wsm
 */
@Slf4j
@Component
public class DeadLetterQueueConsumer {
    //@RabbitListener 負責監聽具體那個佇列...
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("當前時間:{},收到死信佇列資訊{}", new Date().toString(), msg);
    }
}
  • @RabbitListener 宣告在放上,處理要監聽的佇列,訊息進行處理...

  • 流量器請求 http://localhost:8080/ttl/sendMsg/拉拉拉

  • 間隔 10s 40s 控制臺輸出:

    2022-01-23 19:12:01.380  INFO 1508 --- [nio-8080-exec-3] c.example.controller.SendMsgController   : 當前時間:Sun Jan 23 19:12:01 CST 2022,發送一條資訊給兩個 TTL 佇列:嘻嘻嘻
    2022-01-23 19:12:11.684  INFO 1508 --- [ntContainer#0-1] c.e.consumer.DeadLetterQueueConsumer     : 當前時間:Sun Jan 23 19:12:11 CST 2022,收到死信佇列資訊訊息來自 ttl 為 10S 的佇列: 拉拉拉
    2022-01-23 19:12:41.566  INFO 1508 --- [ntContainer#0-1] c.e.consumer.DeadLetterQueueConsumer     : 當前時間:Sun Jan 23 19:12:41 CST 2022,收到死信佇列資訊訊息來自 ttl 為 40S 的佇列: 拉拉拉
    

訊息設定TTL 存在問題bug

上面通過,SpringBoot 集成了 RabbitMQ 通過 佇列設定TTL

  • 如果這樣使用的話,豈不是每增加一個新的時間需求,就要新增一個佇列, 所以一般建議使用:訊息設定TTL

  • 生產者 每次發送訊息的時候,設定訊息的存活時間,這樣: 即使只有一個佇列,也可以設定不同訊息的 延遲時間

RabbitMQ-00000062

修改上面業務,添加一個 QC 普通佇列,不設定佇列 延遲時間 每次發送訊息給訊息設定延遲時間...

MQ 組態檔:

MsgTtlQueueConfig.Java

@Configuration
public class MsgTtlQueueConfig {
    // 死信交換機
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    // 普通的佇列
    public static final String QUEUE_C = "QC";

    //宣告佇列 C 死信交換機
    @Bean("queueC")
    public Queue queueB() {
        HashMap<String, Object> args = new HashMap<>(3);
        //宣告當前佇列系結的死信交換機
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //宣告當前佇列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //沒有宣告 TTL 屬性
        return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
    }

    //宣告佇列 B 系結 X 交換機
    @Bean
    public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }
}

生產者:

SendMsgController.Java

/**
* 延時佇列優化
* @param message 訊息
* @param ttlTime 延時的毫秒
*/
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
    rabbitTemplate.convertAndSend("X", "XC", message, correlationData -> {
        // 發送訊息的時候,設定訊息的延遲時間...
        correlationData.getMessageProperties().setExpiration(ttlTime);
        return correlationData;
    });
    log.info("當前時間:{},發送一條時長{}毫秒 TTL 資訊給佇列 C:{}", new Date(), ttlTime, message);
}

消費者:

不需要任何更改,只需要等待接收訊息即可…

結果查看 存在bug

瀏覽器請求:

  • http://localhost:8080/ttl/sendExpirationMsg/你好1/20000

  • http://localhost:8080/ttl/sendExpirationMsg/你好1/4000

  • image-20220123201213523

  • 因為 RabbitMQ 只會檢查第一個訊息是否過期,如果過期則丟到死信佇列,

    如果第一個訊息的延時時長很長,而第二個訊息的延時時長很短,第二個訊息并不會優先得到執行 這是RabbitMQ 的bug 好在已經存在插件可以解決該問題💡

訊息設定TTL 插件解決bug

安裝插件:rabbitmq_delayed_message_exchange

下載地址🚀

# 工具引入,插件安裝包;
[root@iZj6ciuzx7luldnazt4iswZ wsm]# ls
erlang-21.3-1.el7.x86_64.rpm  rabbitmq_delayed_message_exchange-3.8.0.ez  rabbitmq-server-3.8.8-1.el7.noarch.rpm

# 將插件移動到 RabbitMQ的plugins 包下: /usr/lib 是linux 默認安裝服務路徑...
[root@iZj6ciuzx7luldnazt4iswZ wsm]# cp rabbitmq_delayed_message_exchange-3.8.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
# rabbitmq 開始安裝啟動插件補丁...
[root@iZj6ciuzx7luldnazt4iswZ wsm]# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@iZj6ciuzx7luldnazt4iswZ:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
  rabbitmq_delayed_message_exchange
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@iZj6ciuzx7luldnazt4iswZ...
The following plugins have been enabled:
  rabbitmq_delayed_message_exchange
started 1 plugins.
# 重啟服務:
[root@iZj6ciuzx7luldnazt4iswZ wsm]# systemctl restart rabbitmq-server

安裝成功,查看頁面中發現,交換機多了一種信的型別:x-delayed-message

RabbitMQ-00000065

測驗實作:

新增了一個佇列delayed.queue,一個自定義交換機 delayed.exchange,系結關系如下

RabbitMQ-00000066

  • 正常的一組生產者消費者,設定自定義交換機型別 生產者發送訊息指定訊息 延遲,到交換機上 到達固定的時間才會發送到交換機上... 來實作訊息的延遲TTL

組態檔類代碼

在我們自定義的交換機中,這是一種新的交換型別

  • 該型別訊息支持延遲投遞機制訊息傳遞后并不會立即投遞到目標佇列中
  • 而是存盤在 mnesia(一個分布式資料系統)表中,當達到投遞時間時,才投遞到目標佇列中

DelayedQueueConfig.Java

@Configuration
public class DelayedQueueConfig {
    // 佇列
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    // 交換機
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    // key
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    // 宣告佇列
    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    // 自定義交換機 我們在這里定義的是一個延遲交換機
    @Bean
    public CustomExchange delayedExchange() {
        HashMap<String, Object> args = new HashMap<>();
        //自定義交換機的型別
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    // 系結 佇列和交換機;
    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
                                       @Qualifier("delayedExchange") CustomExchange delayedExchange) {
        return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

生產者

SendMsgController.Java

// 交換機 和 routingkey
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
// 請求介面:
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
    rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,
                                  correlationData -> {
                                      correlationData.getMessageProperties().setDelay(delayTime);
                                      return correlationData;
                                  });
    log.info(" 當 前 時 間 : {}, 發 送 一 條 延 遲 {} 毫秒的資訊給佇列 delayed.queue:{}", new Date(), delayTime, message);
}

消費者

DelayQueueConsumer.Java

/**
 * 消費者 - 基于插件的延時佇列
 *
 * @author wsm
 */
@Slf4j
@ComponentScan
public class DelayQueueConsumer {

    public static final String DELAYED_QUEUE_NAME = "delayed.queue";

    @RabbitListener(queues = DELAYED_QUEUE_NAME)
    public void receiveDelayedQueue(Message message) {
        String msg = new String(message.getBody());
        log.info("當前時間:{},收到延時佇列的訊息:{}", new Date().toString(), msg);
    }
}

結果查看

瀏覽器請求:

  • http://localhost:8080/ttl/sendDelayMsg/wsm/20000

  • http://localhost:8080/ttl/sendDelayMsg/www/4000

  • image-20220123222115861

  • ok,發送的訊息,確實在 4s 20s 進行了處理

發布確認高級:

發布確認 springboot 版本

在生產環境中由于一些不明原因,導致 RabbitMQ 重啟

  • 在 RabbitMQ 重啟期間生產者訊息投遞失敗, 導致訊息丟失,需要手動處理和恢復
  • 于是,我們開始思考,如何才能進行 RabbitMQ 的訊息可靠投遞呢?

確認機制方案:

RabbitMQ-00000068

  • ① 生產者 每次發送訊息的時候,將訊息存入快取中,Map kv結構:k每個訊息唯一的標識 v每個訊息體

  • ② 將訊息發送到 交換機

    交換機接收到訊息,回傳 生產者 ack 生產者根據對于的k 洗掉快取資料

    交換機超時|宕機,沒有收到訊息,生產者 回呼 nack 生產者,重新發送訊息,或其他處理

  • RabbitMQ-00000069

實體說明:

  • ① SpringBoot 組態檔 開啟發布確認模式

  • ② 添加配置類,宣告定義:交換機 direct型別 佇列 系結資訊

    為了方便測驗,訊息沒有發送到佇列上,訊息丟失的場景,使用 direct型別:發送訊息指定 routingkey 只會發送到相同的 佇列上沒有匹配的佇列 訊息丟失

  • ③ 撰寫訊息回呼類

    生產者——發送訊息——交換機(接收到訊息,進行回呼ack, 長時間沒有收到也會回呼觸發 nack)

    但,注意 交換機將訊息發送到對應佇列上,如果,訊息沒有匹配的佇列,所以訊息還是會丟失(沒有匹配的佇列,發送;

  • ④ 撰寫訊息生產者:

    發送兩個訊息,一個與佇列匹配routingkey 另一個不匹配…

  • ⑤ 撰寫消費者,監聽佇列進行消費…

注意:首先要開啟Rabbit MQ的發布確認模式:

# 開啟RabbitMQ 發布確認模式:
spring.rabbitmq.publisher-confirm-type=correlated
# NONE 值是禁用發布確認模式,是默認值
# CORRELATED 值是發布訊息成功到交換器后會觸發回呼方法
# SIMPLE 值經測驗有兩種效果
	# 其一效果和 CORRELATED 值一樣會觸發回呼方法
	# 其二在發布訊息成功后使用 rabbitTemplate 呼叫 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 節點回傳發送結果
	# 根據回傳結果來判定下一步的邏輯,注意的點是 waitForConfirmsOrDie 方法如果回傳 false 則會關閉 channel 則接下來無法發送訊息到 broker

添加配置類

宣告定義:交換機 佇列 并進行系結

ConfirmConfig.Java

/** SpringBoot 訊息確認模式 **/
@Configuration
public class ConfirmConfig {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

    //宣告業務 Exchange
    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    // 宣告確認佇列
    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    // 宣告確認佇列系結關系
    @Bean
    public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
                                @Qualifier("confirmExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("key1");
    }
}
  • 宣告 交換機confirm.exchange 佇列 confirm.queue 佇列與交換機系結 routingkey: key1

訊息回呼類

com.example.producercallack包下: MyCallBack.Java

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
    /**
     * 交換機不管是否收到訊息的一個回呼方法
     *
     * @param correlationData 訊息相關資料
     * @param ack             交換機是否收到訊息, true(ack) false(nack)
     * @param cause           為收到訊息的原因: 例外資訊
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交換機已經收到 id 為:{}的訊息", id);
        } else {
            log.info("交換機還未收到 id 為:{}訊息,原因:{}", id, cause);
        }
    }
}

生產者

ProducerController.Java

@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {
    @Autowired  // rabbitmq 模板物件;
    private RabbitTemplate rabbitTemplate;
    @Autowired  // 發布確認訊息,訊息回呼方法類;
    private MyCallBack myCallBack;
    //依賴注入 rabbitTemplate 之后再設定它的回呼物件
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(myCallBack);
    }

    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";

    
    /**
     * 訊息回呼和退回
     * @param message
     */
    @GetMapping("sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        //指定訊息 id 為 1
        CorrelationData correlationData1 = new CorrelationData("1");
        String routingKey = "key1";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData1);
        log.info(routingKey + "發送訊息內容:{}", message + routingKey);

        CorrelationData correlationData2 = new CorrelationData("2");
        routingKey = "key2";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData2);
        log.info(routingKey + "發送訊息內容:{}", message + routingKey);
    }
}
  • 生產者發送兩個訊息,一個訊息key1有匹配的佇列,另一個key2沒有匹配的佇列

消費者

ConfirmConsumer.Java

@Component
@Slf4j
public class ConfirmConsumer {
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

    @RabbitListener(queues = CONFIRM_QUEUE_NAME)
    public void receiveMsg(Message message) {
        String msg = new String(message.getBody());
        log.info("消費方法接受到佇列 confirm.queue 訊息:{}", msg);
    }
}

結果測驗:

瀏覽器請求:http://localhost:8080/confirm/sendMessage/你好

image-20220123235531693

  • 圖片 cmd 輸出有一點問題… 回呼方法接收 應該是 消費方法接收

訊息回退🔙:

對于上面的操作: 如果發現該訊息不可路由,那么訊息會被直接丟棄,此時生產者是不知道訊息被丟棄這個事件的

  • 如何讓無法被路由的訊息幫我想辦法處理一下? 最起碼通知我一聲,我好自己處理啊

通過設定 mandatory 引數可以在當訊息傳遞程序中不可達目的地時將訊息回傳給生產者:

#訊息退回
spring.rabbitmq.publisher-returns=true

修改回呼介面:

com.example.producercallack包下: MyCallBack.Java

  • 實作:RabbitTemplate.ReturnsCallback介面

    低版本可能沒有 RabbitTemplate.ReturnsCallback 請用 RabbitTemplate.ReturnCallback

  • 添加介面實作:returnedMessage(ReturnedMessage returned) 當訊息無法路由的時候的回呼方法

//當訊息無法路由的時候的回呼方法
@Override
public void returnedMessage(ReturnedMessage returned) {

    log.error("訊息:{},被交換機 {} 退回,原因:{},路由key:{},code:{}",
              new String(returned.getMessage().getBody()), returned.getExchange(),
              returned.getReplyText(), returned.getRoutingKey(),
              returned.getReplyCode());

}

低版本:訊息無法路由回呼方法()

@Override
public void returnedMessage(Message message, int replyCode, String replyText, String
exchange, String routingKey) {
	log.info("訊息:{}被服務器退回,退回原因:{}, 交換機是:{}, 路由 key:{}",new String(message.getBody()),replyText, exchange, routingKey);
}

修改發送者 ProducerController

  • 修改RabbitTemplate 的 init() 初始化配置:
//依賴注入 rabbitTemplate 之后再設定它的回呼物件
@PostConstruct
public void init() {
    rabbitTemplate.setConfirmCallback(myCallBack);
    /**
    * true:交換機無法將訊息進行路由時,會將該訊息回傳給生產者
    * false:如果發現訊息無法進行路由,則直接丟棄
    */
    rabbitTemplate.setMandatory(true);
    //設定回退訊息交給誰處理
    rabbitTemplate.setReturnsCallback(myCallBack);
}

重啟測驗:

http://localhost:8080/confirm/sendMessage/你好

image-20220124001025173

ok, 訊息成功回退,剩下的處理代碼可以自定義了…

Rabbit MQ概念:

冪等

因為訊息 ack 持久化機制存在一定的缺點

  • 持久化機制保證訊息100%消費:

    消費者處理訊息 突然崩潰,長時間沒有處理完,佇列中的訊息不會洗掉,而是發送給其它消費者處理,如果這個時候消費者恢復了

    就有相同消費者,消費同一個資料的情況了!

  • 為了解決這個問題 RabbitMQ 消費者通常都需要做 冪等性 操作

  • 冪等:

    無論,程式執行多少次,結果不會發送任何改變!

實作原理:

  • MQ 消費者的冪等性的解決一般使用全域 ID

    或者寫個唯一標識比如時間戳 或者 UUID 或者訂單消費者消費 MQ 中的訊息也可利用 MQ 的該 id 來判斷

  • 利用 redis 執行 setnx 命令,天然具有冪等性,從而實作不重復消費

    redissetnx 也就是只有不存在key的時候才設定

    每個訊息具有一個唯一的標識, 消費者第一次消費成功的時候,使用 setnx 設定,這樣無論后面多少次操作,都不在進行操作了!

優先級

使用場景:

  • 通常商城專案中:訂單催付的場景

    我們的客戶在天貓下的訂單,淘寶會及時將訂單推送給我們,如果在用戶設定的時間內未付款那么就會給用戶推送一條短信提醒,很簡單的一個功能對吧

    但是,tmall 商家對我們來說,肯定是要分大客戶和小客戶的對吧,比如像蘋果,小米這樣大商家一年起碼能給我們創造很大的利潤 所以理應當然,他們的訂單必須得到優先處理

如何實作:

  • 控制臺頁面添加

RabbitMQ-00000076

  • 佇列中代碼添加優先級

    Map<String, Object> params = new HashMap();
    params.put("x-max-priority", 10);
    channel.queueDeclare("hello", true, false, false, params);
    
  • 訊息中代碼添加優先級

    AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(10).build();
    

注意事項:

要讓佇列實作優先級需要做的事情有如下事情

  • 佇列需要設定為優先級佇列,訊息需要設定訊息的優先級
  • 消費者需要等待訊息已經發送到佇列中才去消費因為,這樣才有機會對訊息進行排序

生產者:

public class PriorityProducer {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //給訊息賦予一個 priority 屬性
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(10).build();

        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            if (i == 5) {
                channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
            } else {
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            }
            System.out.println("發送訊息完成:" + message);
        }
    }
}

消費者:

public class PriorityConsumer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //設定佇列的最大優先級 最大可以設定到 255 官網推薦 1-10 如果設定太高比較吃記憶體和 CPU
        Map<String, Object> params = new HashMap();
        params.put("x-max-priority", 10);
        channel.queueDeclare(QUEUE_NAME, true, false, false, params);

        //推送的訊息如何進行消費的介面回呼
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println(message);
        };
        //取消消費的一個回呼介面 如在消費的時候佇列被洗掉掉了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("訊息消費被中斷");
        };

        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }

}

image-20210629163922085

惰性佇列

使用場景

  • RabbitMQ 從 3.6.0 版本開始引入了惰性佇列的概念

    惰性佇列會盡可能的將訊息存入磁盤中,而在消費者消費到相應的訊息時才會被加載到記憶體中,它的一個重要的設計目標是能夠支持更長的佇列,即支持更多的訊息存盤

    當消費者由于各種各樣的原因(比如消費者下線、宕機亦或者是由于維護而關閉等)而致使長時間內不能消費訊息造成堆積時

  • 默認情況下,當生產者將訊息發送到 RabbitMQ 的時候:

    佇列中的訊息會盡可能的存盤在記憶體之中, 這樣可以更加快速的將訊息發送給消費者 持久化 在被寫入磁盤的同時也會在記憶體中駐留一份備份

    當RabbitMQ 需要釋放記憶體的時候,會將記憶體中的訊息換頁至磁盤中,這個操作會耗費較長的時間,也會阻塞佇列的操作,進而無法接收新的訊息.

兩種模式

佇列具備兩種模式:default默認lazy

  • lazy 模式即為惰性佇列的模式

    可以通過呼叫 channel.queueDeclare 方法的時候在引數中設定

    也可以通過 Policy 的方式設定,如果一個佇列同時使用這兩種方式設定的話,那么 Policy 的方式具備更高的優先級

  • 在佇列宣告的時候可以通過“x-queue-mode”引數來設定佇列的模式,取值為“default”和“lazy”

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);

記憶體開銷對比

RabbitMQ-00000077

在發送 1 百萬條訊息,每條訊息大概占 1KB 的情況下,普通佇列占用記憶體是 1.2GB,而惰性佇列僅僅 占用 1.5MB

常見錯誤:

佇列宣告錯誤:

channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'ack_test' in vhost '/': received 'false' but current is 'true', class-id=50, method-id=10)

有的時候對于,已經宣告的佇列,更改了其配置,需要在RabbitMQ管理頁面手動洗掉MQ的佇列,才能進行重新宣告,不然會報錯…

完:

終于寫完了…

需要代碼,安裝工具的兄弟可以下方下載: 點個👍吧!

image-20220124004245782

鏈接:https://pan.baidu.com/s/1M0m0xKBtZlAs3v3FYKq6Tw
提取碼:2540

MQ

  • 大神筆記:

  • https://juejin.cn/post/7051469607806173221 超級詳細筆記

    https://note.oddfar.com/rabbitmq/#%E8%A7%86%E9%A2%91%E6%95%99%E7%A8%8B

    RPC + MQ: https://zhuanlan.zhihu.com/p/48230422

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/420474.html

標籤:其他

上一篇:九章云極DataCanvas公司榮獲機器之心三大獎項,助力產業數智化升級

下一篇:返回列表

標籤雲
其他(135977) Python(24228) JavaScript(15073) Java(14739) C(11147) 區塊鏈(8215) AI(6935) 基礎類(6313) MySQL(5230) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4207) Linux(4118) PHP(3814) C#(3716) 爪哇(3561) html(3374) C語言(3288) C++語言(3117) sql(3024) R(2776) 熊猫(2774) Java相關(2746) 数组(2739) 疑難問題(2699) 反应(2482) 單片機工控(2479) css(2105) 数据框(1968) Web開發(1951) 节点.js(1938) VBA(1919) 網絡通信(1793) 蟒蛇-3.x(1774) 數據庫相關(1767) VB基礎類(1755) .NETCore(1671) ASP.NET(1650) 開發(1646) 系統維護與使用區(1617) C++(1582) 列表(1581) 基礎和管理(1579) json(1568) JavaEE(1566) 安卓(1523) HtmlCss(1519) 專題技術討論區(1515) Windows客戶端使用(1484) 扑(1469) iOS(1432) 查询(1408) .NET技术(1404) 打字稿(1376) Unity3D(1366) VCL組件開發及應用(1353) sql-server(1287) Qt(1283) 细绳(1226) HTML(CSS)(1220)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • RabbitMQ 超詳細入門篇

    RabbitMQ 入門篇????MQ 的基本概念:什么是 MQ ?MQ全稱為Message Queue即訊息佇列"訊息佇列" 是在訊息的傳輸程序中保存訊息的容器它是典型的:生產者————消費者模型生產者不斷向訊息佇列中生產訊息 ———————— 消費者不斷的從佇列中獲取訊息.這樣的好處: 生產者只需要關注發訊息,消費者只需要關注收訊息,二者沒有業務邏輯的侵入,這樣就實作了生產者和消費者的解耦.為什么要使用 MQ?或者說MQ 有什么好處,MQ 主要可以實作三種功能:服務解耦...

    uj5u.com 2022-01-25 07:23:56 more
  • 九章云極DataCanvas公司榮獲機器之心三大獎項,助力產業數智化升級

    近日,國內領先的前沿科技媒體和產業服務平臺機器之心發布了「AI 中國」機器之心 2021 年度榜單。九章云極DataCanvas公司憑借在人工智能領域優秀的技術、豐富的AI解決方案、智能化場景的創新應用以及適合國內市場的商業模式,入選 “最佳人工智能公司 TOP 30”、“最具商業價值解決方案TOP30”、“最具創新價值落地案例TOP30”。圖片來源:機器之心作為人工智能技術及產業發展的風向標,機器之心「AI中國」年度評選自2017年設立,已連續舉辦5屆,成為國內人工智能界的權威年度獎項之一。本次榜單...

    uj5u.com 2022-01-25 07:22:09 more
  • 2021—很有意義的一年

    最大的改變2021年,對于我來說,最大的改變應該就是有在努力想要提高自己的溝通能力。我的溝通能力和表達能力對于我來說,就是一個永遠搬不動的大山,擋在我前進的路上。其實,我知道自己的弱點,也想要改變,但是就是不付諸行動,也不知咋改變這種現狀。直到離開學校,開始步入社會,進入了一個完全陌生的圈子。很慶幸的是,遇到了一個特別好的導師,他給了我很多的幫助,給我提了很多建議,不管是作業上、生活上、還是人生發展,都是很寶貴的建議。由于我的性格,我有時候不敢在群里發言,導師經常鼓勵我要多在群上發言,這樣可以克服交流的...

    uj5u.com 2022-01-25 07:21:57 more
  • Centos7下安裝Jdk1.8

    Centos7下安裝Jdk1.8官網下載jdk-8u202-linux-x64.tar.gz下載后使用檔案傳輸工具SecureFX上傳服務器/opt/software目錄。(這里是在/opt/目錄下創建了software、module檔案夾)# 解壓包存放目錄$ mkdir -p /opt/module# 壓縮包存放目錄$ mkdir -p /opt/software查看當前linux是否裝有java$ java -version如果有則卸載原裝版本java#查看和ope...

    uj5u.com 2022-01-25 07:21:48 more
  • ByteV打造3D海上風電監控平臺 ——助力風電能源可持續發展

    ByteV自主研發了3D海上風電監控平臺有著豐富的組態化可供選擇,本文將介紹如何運用ByteV豐富的2D、3D組態搭建可視化解決方案。風電廠是指采用風力發電形式的電廠。風電廠在生產的程序中,風力發電的原理是比較簡單的。風能是可再生能源,同時也屬于清潔能源。隨著我國經濟技術的不斷提高,風電廠生產產業規模不斷擴大,并且數量也大幅度增加。通過風能發電不僅可以減少能源的消耗,同時也會減少發電程序對環境的污染。海上風電是可再生能源發展的重要領域,是推動風電技術進步以及產業升級的重要力量,是促進能源結構調整的重要...

    uj5u.com 2022-01-25 07:21:24 more
  • 第一次理事會會議內容摘要2022.1.23

    北京時間2022年1月23日星期日上午,DBC新年的第一次線上會議在Discord上圓滿召開。中國、美國、日本等理事代表都參與了會議。會議主要圍繞社區提案如何更好地獲得理事會的審核與通過展開,同時對Swap老鏈上DBC的方式進行了深入討論,核心內容總結如下:1.理事會的權利與義務是審核與批復對DBC生態有利的提案,代表社區合理規劃、運用國會資金使得生態健康發展——但目前的情況是,很多提案提交reddit后,因為網路限制和語言障礙等原因,很少得到國內理事的投票與關注。討論后提出的......

    uj5u.com 2022-01-25 07:20:38 more
  • 用戶畫像資料指標體系之其他常見標簽劃分

    趙宏田 老師的 用戶畫像·方法論與工程化解決方案 一書讀后筆記主要記錄的是該書中 用戶畫像之資料指標體系 一章...

    uj5u.com 2022-01-25 07:20:28 more
  • 用戶畫像資料指標體系之風險控制維度

    趙宏田 老師的 用戶畫像·方法論與工程化解決方案 一書讀后筆記主要記錄的是該書中 用戶畫像之資料指標體系 一章...

    uj5u.com 2022-01-25 07:20:16 more
  • 基于真實電商的下單扣庫存學習理解分布式事務解決方案

    文章目錄業務背景分布式事務解決方案及缺點業務程序分析下單扣減庫存的業務難點解決方案1. 先扣庫存,后創建訂單2. 先創建訂單,后扣庫存例外資料處理1. 庫存表流水表2. 重試+回滾+報警+人工介入總結這里是weihubeats,覺得文章不錯可以關注公眾號小奏技術,文章首發。拒絕營銷號,拒絕標題黨業務背景我們這里將業務簡化為最簡單的下單扣庫存邏輯,不去關注其他分支部分來理解整個電商下單扣庫存的解決方案分布式事務解決方案及缺點最終一致性:異步解決方案,資料同步有延遲。性能高TCC(try、Co...

    uj5u.com 2022-01-25 07:20:04 more
  • 機器學習框架及評估指標詳解

    目錄機器學習的步驟train_test_split函式的詳解機器學習評估指標分類模型評估指標混淆矩陣ROC曲線利用ROC的其他評估標準Python 繪制ROC曲線求解AUC模板代碼錯誤率精度查準率、查全率P-R曲線Python繪制P-R曲線模板代碼平衡點(BEP)F1度量Python求解F1_score代碼回歸模型評估指標均方誤差MAE(平均絕對誤差)MAPE(平均絕對百分比誤差)RMSE(均方根誤差)R Square(...

    uj5u.com 2022-01-25 07:18:45 more