主頁 > 後端開發 > 昨晚12點,女朋友突然問我:你會RabbitMQ嗎?我竟然愣住了。

昨晚12點,女朋友突然問我:你會RabbitMQ嗎?我竟然愣住了。

2021-02-05 06:18:47 後端開發

01為什么要用訊息佇列?

1.1 同步呼叫和異步呼叫

在說起訊息佇列之前,必須要先說一下同步呼叫和異步呼叫,

同步呼叫:A服務去呼叫B服務,需要一直等著B服務,直到B服務執行完畢并把執行結果回傳給A之后,A才能繼續往下執行,

舉個例子:過年回到家,老媽對你說:“你也不小了,該談女朋友了,隔壁王阿姨給你......,”“媽!我談的有!"

老媽嘴角微微上揚:“那她現在有空嗎?讓媽給你把把關,”

你被逼之下跟女朋友開視頻說:“那個我媽在我旁邊,她想跟你說說話,”

你女朋友一下子慌了,立馬拿起眉筆、口紅、遮瑕對你說:“你先別掛,等我2分鐘,我稍微化一下妝,”

你就一直等著她,等她化好妝之后你把手機給了你老媽,所以同步呼叫的核心就是:等待,

異步呼叫:A服務去呼叫B服務,不用一直等待B服務的執行結果,也就是說在B服務執行的同時A服務可以接著執行下面的程式,

舉個例子:上午10點鐘,辦公室里,正在上班的你給你女朋友發微信說:“親愛的,等你不忙了給我發一張你的照片吧,我想你了,”然后你接著作業了,

等到下午2點你女朋友給你發了一張她的美顏照,你點開看了看,迷的顛三倒四,所以異步呼叫的核心就是:只用通知對方一下,不用等待,通知完我這邊該干嘛干嘛!

上面所說的異步呼叫就是用訊息佇列去實作,

1.2 為什么要用訊息佇列?

場景一:用戶注冊

現在很多網站都需要給注冊的用戶發送注冊短信或者激活郵箱,如果使用同步呼叫的話用戶只有注冊成功后才能給用戶發送短信和郵箱鏈接,這樣花費的時間就會很長,

有了訊息佇列之后我們只需要將用戶注冊的資訊寫入到訊息佇列里面,接來下該干嘛干嘛,

發送郵箱和發送短信的服務隨時從訊息佇列里面取出該用戶的資訊,然后再去發送短信和郵箱鏈接,這樣花費的時間就會大大減少,

場景二:修改商品

在微服務專案中,有時候資料量太多的話就需要分庫分表,例如下圖中商品表分別存盤在A資料庫和B資料庫中,

有一天我們去呼叫修改商品的服務去修改A資料庫中的商品資訊,由于我們還需要呼叫搜索商品的服務查詢商品資訊,所以修改完A庫中的商品資訊后必須保證B庫中的商品資訊和A庫一樣,

如果采用同步呼叫的方式,在修改完A庫的商品資訊之后需要等待B庫的商品資訊修改完,這樣耗時過長,

有了訊息佇列之后我們修改完A庫的商品資訊之后只需要將要修改的商品資訊寫入訊息佇列中,接下來該干什么干什么,

搜索商品的服務從訊息佇列中讀取要修改的商品資訊,然后同步B庫中的商品資訊,這樣就大大地縮短回應時間,

02 RabbitMQ介紹

2.1 什么是MQ

MQ(Message Quene) : 江湖人稱訊息佇列,小名又叫訊息中間件,訊息佇列基于生產者和消費者模型,生產者不斷向訊息佇列中發送訊息,消費者不斷從佇列中獲取訊息,

因為訊息的生產和消費都是異步的,而且沒有業務邏輯的侵入,所以可以輕松的實作系統間解耦,

2.2 MQ有哪些

當今市面上有很多訊息中間件,ActiveMQ、RabbitMQ、Kafka以及阿里巴巴自研的訊息中間件RocketMQ等,

2.3 不同MQ特點

  • RabbitMQ 穩定可靠,支持多協議,有訊息確認,基于erlang語言,

  • Kafka高吞吐,高性能,快速持久化,無訊息確認,無訊息遺漏,可能會有有重復訊息,依賴于zookeeper,成本高,

  • ActiveMQ不夠靈活輕巧,對佇列較多情況支持不好,

  • RocketMQ性能好,高吞吐,高可用性,支持大規模分布式,協議支持單一,

2.4 RabbitMQ

基于AMQP協議,erlang語言開發,是部署最廣泛的開源訊息中間件,是最受歡迎的開源訊息中間件之一,

AMQP:即Advanced Message Queuing Protocol, 一個提供統一訊息服務的應用層標準高級訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中間件設計,

RabbitMQ主要特性:

  • 保證可靠性:使用一些機制來保證可靠性,如持久化、傳輸確認、發布確認

  • 可伸縮性:支持訊息集群,多臺RabbitMQ服務器可以組成一個集群

  • 高可用性:RabbitMQ集群中的某個節點出現問題時佇列任然可用

  • 支持多種協議

  • 支持多語言客戶端

  • 提供良好的管理界面

  • 提供跟蹤機制:如果訊息出現例外,可以通過跟蹤機制分析例外原因

  • 提供插件機制:可通過插件進行多方面擴展

03 RabbitMQ安裝及配置

3.1 docker安裝RabbitMQ

3.1.1 獲取RabbitMQ鏡像

指定版本,該版本包含了RabbitMQ的后臺圖形化頁面

docker pull rabbitmq:management

3.1.2 運行RabbitMQ鏡像

方式一:默認guest 用戶,密碼也是 guest

docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management

方式二:設定用戶名和密碼

docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management

3.2 本地安裝RabbitMQ

3.2.1 因為RabbitMQ是用erlang語言開發的,所以安裝之前先洗掉erlang包

yum remove erlang*

3.2.2 將RabbitMQ安裝包上傳到linux服務器上

erlang-23.2.1-1.el7.x86_64.rpm
rabbitmq-server-3.8.9-1.el7.noarch.rpm

3.2.3 安裝Erlang依賴包

rpm -ivh erlang-23.2.1-1.el7.x86_64.rpm

3.2.4 安裝RabbitMQ安裝包(需要聯網)

yum install -y rabbitmq-server-3.8.9-1.el7.noarch.rpm

注意:安裝完成后組態檔在:/usr/share/doc/rabbitmq-server-3.8.9/rabbitmq.config.example目錄中,需要 將組態檔復制到/etc/rabbitmq/目錄中,并修改名稱為rabbitmq.config

3.2.5 復制組態檔

cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example  /etc/rabbitmq/rabbitmq.config

3.2.6 查看組態檔

ls /etc/rabbitmq/rabbitmq.config

3.2.7 修改組態檔

vim /etc/rabbitmq/rabbitmq.config 

將上圖中框著的部分修改為下圖:

3.2.8 啟動rabbitmq中的插件管理

rabbitmq-plugins enable rabbitmq_management

3.2.9 查看服務狀態

systemctl status rabbitmq-server

rabbitmq常用命令
systemctl start rabbitmq-server
systemctl restart rabbitmq-server
systemctl stop rabbitmq-server

3.2.10 如果是買的服務器,記得安全組開放15672和5672埠

3.2.11 訪問RabbitMQ的后臺圖形化管理界面

  1. 瀏覽器地址欄輸入:http://ip:15672

  1. 登錄管理界面

username:guest
password:guest

3.3 Admin用戶和虛擬主機管理

3.3.1 添加用戶

上面的Tags選項,其實是指定用戶的角色,超級管理員(administrator):可登陸管理控制臺,可查看所有的資訊,并且可以對用戶,策略(policy)進行操作,

3.3.2 創建虛擬主機

虛擬主機:為了讓各個用戶可以互不干擾的作業,RabbitMQ添加了虛擬主機(Virtual Hosts)的概念,

其實就是一個獨立的訪問路徑,不同用戶使用不同路徑,各自有自己的佇列、交換機,互相不會影響,

3.3.3 系結虛擬主機和用戶

創建好虛擬主機,我們還要給用戶添加訪問權限,點擊添加好的虛擬主機,進入虛擬機設定界面,

04 RabbitMQ的4種訊息模式

4.1 簡單模式

說白了就是一個生產者發送訊息,一個消費者接受訊息,一對一的關系,

在上圖的模型中,有以下概念:

producer:生產者,訊息發送者
consumer:消費者:訊息的接受者
queue:訊息佇列,圖中紅色部分,類似一個倉庫,可以快取訊息;生產者向其中投遞訊息,消費者從其中取出訊息,

4.2 作業模式

說白了就是一個生產者發送訊息,多個消費者接受訊息,只要其中的一個消費者搶先接收到了訊息,其他的就接收不到了,一對多的關系,

4.3 廣播模式

這里引入了交換機(Exchange)的概念,交換機系結所有的佇列,也就是說訊息生產者會先把訊息發送給交換機,然后交換機把訊息發送到與它系結的所有佇列里面,消費者從它所系結的佇列里面獲取訊息,

在廣播模式下,訊息發送流程是這樣的:

  • 可以有多個消費者

  • 每個消費者有自己的queue(佇列)

  • 每個佇列都要系結到Exchange(交換機)

  • 生產者發送的訊息,只能發送到交換機,交換機來決定要發給哪個佇列,生產者無法決定

  • 交換機把訊息發送給系結過的所有佇列

  • 佇列的消費者都能拿到訊息,實作一條訊息被多個消費者消費

4.4 路由模式

4.4.1 Routing之訂閱模型-Direct(直連)

舉個例子:訊息生產者發送訊息時給了交換機一個紅桃A,訊息生產者對交換機說:”這條訊息只能給有紅桃A的佇列“,交換機發現佇列一手里是黑桃K,佇列二手里是紅桃A,所以它將這條訊息給了佇列二,

在路由-直連模式中,一條訊息,會被所有訂閱的佇列都消費,但是在某些場景下,我們希望不同的訊息被不同的佇列消費,這時就要用到Direct型別的Exchange,

在Direct模型下:

  • 佇列與交換機的系結,不能是任意系結了,而是要指定一個RoutingKey(路由key)

  • 訊息的發送方在向Exchange發送訊息時,也必須指定訊息的 RoutingKey,

  • Exchange不再把訊息交給每一個系結的佇列,而是根據訊息的Routing Key進行判斷,只有佇列的Routingkey與訊息的 Routing key完全一致,才會接收到訊息

4.4.2 Routing 之訂閱模型-Topic

舉個例子:訊息生產者發送訊息時給了交換機一個暗號:hello.mq,訊息生產者對交換機說:”這條訊息只能給暗號以hello開頭的佇列“,交換機發現它與佇列一的暗號是hello.java,與佇列二的暗號是news.today,所以它將這條訊息給了佇列一,

Topic型別的交換機與Direct相比,都是可以根據RoutingKey把訊息路由到不同的佇列,只不過Topic型別Exchange可以讓佇列在系結Routing key 的時候使用通配符!這種模型Routingkey 一般都是由一個或多個單詞組成,多個單詞之間以”.”分割,例如:b.hello

05 Maven 應用整合 RabbitMQ

5.1 創建 SpringBoot 專案,引入依賴

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.7.6.RELEASE</version>
    </dependency>
</dependencies>

5.2 創建 RabbitMQ 的連接引數工具類

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
    public static Connection getConnection() throws Exception {
        //定義連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //ip地址
        factory.setHost("##.##.##.##");
        //埠
        factory.setPort(5672);
        //虛擬主機
        factory.setVirtualHost("myhost");
        //賬戶
        factory.setUsername("root");
        //密碼
        factory.setPassword("########");
        Connection connection = factory.newConnection();
        return connection;
    }
}

5.3 第一種:簡單模式

訊息生產者

public class Producer {
    public static void main(String[] args) throws Exception {
        // 獲取RabbitMQ的連接
        Connection connection = ConnectionUtil.getConnection();
        // 從連接中創建通道
        Channel channel = connection.createChannel();
        // 創建佇列,如果存在就不創建,不存在就創建
        // 引數1 佇列名, 引數2 durable:資料是否持久化 ,引數3 exclusive:是否排外的,記住false就行
        // 引數4 autoDelete:是否自動洗掉,消費者消費完訊息之后是否洗掉這個佇列
        // 引數5 arguments: 其他引數
        channel.queueDeclare("queue", false, false, false, null);
        // 寫到佇列中的訊息內容
        String message = "你好啊,mq!";
        // 引數1 交換機,此處沒有
        // 引數2 發送到哪個佇列
        // 引數3 屬性
        // 引數4 內容
        channel.basicPublish("", "queue", null, message.getBytes());
        //關閉通道和連接
        channel.close();
        connection.close();
    }
}

訊息消費者

public class Consumer {
    public static void main(String[] args) throws Exception {
        //獲取RabbitMq的連接
        Connection connection = ConnectionUtil.getConnection();
        //創建一個通道
        Channel channel = connection.createChannel();
        //第一個引數:要從哪個佇列獲取訊息
        channel.basicConsume("queue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("簡單模式獲取訊息:"+new String(body));
            }
        });
    }
}

測驗結果:

5.4 第二種:作業模式

訊息生產者

public class Producer {
    public static void main(String[] args) throws Exception {
        // 獲取RabbitMQ的連接
        Connection connection = ConnectionUtil.getConnection();
        // 從連接中創建通道
        Channel channel = connection.createChannel();
        // 創建佇列,如果存在就不創建,不存在就創建
        // 引數1 佇列名, 引數2 durable:資料是否持久化 ,引數3 exclusive:是否排外的,記住false就行
        // 引數4 autoDelete:是否自動洗掉,消費者消費完訊息之后是否洗掉這個佇列
        // 引數5 arguments: 其他引數
        channel.queueDeclare("queue", false, false, false, null);
        // 寫到佇列中的訊息內容
        String message = "你好啊,mq";
        // 引數1 交換機,此處無
        // 引數2 發送到哪個佇列
        // 引數3 屬性
        // 引數4 內容
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("", "queue", null, (message+i).getBytes());
        }
        //關閉通道和連接
        channel.close();
        connection.close();
    }
}

消費者01

public class ConsumerOne {
    public static void main(String[] args) throws Exception {
        //創建一個RabbitMq的連接
        Connection connection = ConnectionUtil.getConnection();
        //創建一個通道
        Channel channel = connection.createChannel();
        channel.basicConsume("queue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者01:"+new String(body));
            }
        });
    }
}

消費者02

public class ConsumerTwo {
    public static void main(String[] args) throws Exception {
        //創建一個RabbitMq的連接
        Connection connection = ConnectionUtil.getConnection();
        //創建一個通道
        Channel channel = connection.createChannel();
        channel.basicConsume("queue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者02:"+new String(body));
            }
        });
    }
}

測驗結果:

消費者01

消費者02

5.5 第三種:廣播模式

訊息生產者

public class Producer {
    public static void main(String[] args) throws Exception {
        // 獲取RabbitMQ的連接
        Connection connection = ConnectionUtil.getConnection();
        // 從連接中創建通道
        Channel channel = connection.createChannel();
        // 創建佇列,如果存在就不創建,不存在就創建
        // 引數1 佇列名, 引數2 durable:資料是否持久化 ,引數3 exclusive:是否排外的,記住false就行
        // 引數4 autoDelete:是否自動洗掉,消費者消費完訊息之后是否洗掉這個佇列
        // 引數5 arguments: 其他引數
        channel.queueDeclare("queue01", false, false, false, null);
        channel.queueDeclare("queue02", false, false, false, null);
        //創建交換機,如果存在就不創建,并指定交換機的型別是FANOUT即廣播模式
        channel.exchangeDeclare("fanout-exchange", BuiltinExchangeType.FANOUT);
        //系結交換機與佇列,第一個引數是佇列,第二個引數是交換機,第三個引數是路由key,這里不指定key
        channel.queueBind("queue01", "fanout-exchange", "");
        channel.queueBind("queue02", "fanout-exchange", "");
        // 訊息內容
        String message = "這是一條廣播訊息";
        // 引數1 交換機
        // 引數2 發送到哪個佇列,因為指定了交換機,所以這里佇列名為空
        // 引數3 屬性
        // 引數4 內容
        channel.basicPublish("fanout-exchange", "", null, message.getBytes());
        //關閉通道和連接
        channel.close();
        connection.close();
    }
}

消費者01

public class ConsumerOne {
    public static void main(String[] args) throws Exception {
        //創建一個新的RabbitMq連接
        Connection connection = ConnectionUtil.getConnection();
        //創建一個通道
        Channel channel = connection.createChannel();
        //第一個引數:要從哪個佇列獲取訊息
        channel.basicConsume("queue01",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者01:"+new String(body));
            }
        });
    }
}

消費者02

public class ConsumerTwo {
    public static void main(String[] args) throws Exception {
        //創建一個新的RabbitMq連接
        Connection connection = ConnectionUtil.getConnection();
        //創建一個通道
        Channel channel = connection.createChannel();
        //第一個引數:要從哪個佇列獲取訊息
        channel.basicConsume("queue02",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者02:"+new String(body));
            }
        });
    }
}

測驗結果

5.6 第四種 路由模式

1)路由模式之Direct(直連)
訊息生產者

public class Producer {
    public static void main(String[] args) throws Exception {
        // 獲取RabbitMQ的連接
        Connection connection = ConnectionUtil.getConnection();
        // 從連接中創建通道
        Channel channel = connection.createChannel();
        // 創建佇列,如果存在就不創建,不存在就創建
        // 引數1 佇列名, 引數2 durable:資料是否持久化 ,引數3 exclusive:是否排外的,記住false就行
        // 引數4 autoDelete:是否自動洗掉,消費者消費完訊息之后是否洗掉這個佇列
        // 引數5 arguments: 其他引數
        channel.queueDeclare("queue03", false, false, false, null);
        channel.queueDeclare("queue04", false, false, false, null);
        //創建交換機,如果存在就不創建,并指定交換機的型別是DIRECT模式
        channel.exchangeDeclare("direct-exchange", BuiltinExchangeType.DIRECT);
        //系結交換機與佇列,第一個引數是佇列,第二個引數是交換機,第三個引數是路由key,這里指定路由key是a
        channel.queueBind("queue03", "direct-exchange", "a");
        //系結交換機與佇列,第一個引數是佇列,第二個引數是交換機,第三個引數是路由key,這里指定路由key是b
        channel.queueBind("queue04", "direct-exchange", "b");
        //訊息
        String message = "這是一條key為a的訊息";
        // 引數1 交換機
        // 引數2 路由key
        // 引數3 屬性
        // 引數4 內容
        channel.basicPublish("direct-exchange", "a", null, message.getBytes());
        //關閉通道和連接
        channel.close();
        connection.close();
    }
}

消費者03

public class ConsumerThree {
    public static void main(String[] args) throws Exception {
        //創建一個新的RabbitMQ連接
        Connection connection = ConnectionUtil.getConnection();
        //創建一個通道
        Channel channel = connection.createChannel();
        //第一個引數:要從哪個佇列獲取訊息
        channel.basicConsume("queue03",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者03:"+new String(body));
            }
        });
    }
}

消費者04

public class ConsumerFour {
    public static void main(String[] args) throws Exception {
        //創建一個新的RabbitMQ連接
        Connection connection = ConnectionUtil.getConnection();
        //創建一個通道
        Channel channel = connection.createChannel();
        //第一個引數:要從哪個佇列獲取訊息
        channel.basicConsume("queue04",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者04:"+new String(body));
            }
        });
    }
}

測驗結果
只有消費者03收到了訊息

2)路由模式之-Topic
訊息生產者

public class Producer {
    public static void main(String[] args) throws Exception {
        // 獲取RabbitMQ的連接
        Connection connection = ConnectionUtil.getConnection();
        // 從連接中創建通道
        Channel channel = connection.createChannel();
        // 創建佇列,如果存在就不創建,不存在就創建
        // 引數1 佇列名, 引數2 durable:資料是否持久化 ,引數3 exclusive:是否排外的,記住false就行
        // 引數4 autoDelete:是否自動洗掉,消費者消費完訊息之后是否洗掉這個佇列
        // 引數5 arguments: 其他引數
        channel.queueDeclare("queue05", false, false, false, null);
        channel.queueDeclare("queue06", false, false, false, null);
        //創建交換機,如果存在就不創建,并指定交換機的型別是TOPIC模式
        channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
        //系結交換機與佇列,第一個引數是佇列,第二個引數是交換機,第三個引數是路由key,這里指定路由key是a.*
        //*是通配符,意思只要key滿足a開頭,.后面是什么都可以
        channel.queueBind("queue05", "topic-exchange", "a.*");
        //系結交換機與佇列,第一個引數是佇列,第二個引數是交換機,第三個引數是路由key,這里指定路由key是b.*
        //*是通配符,意思只要key滿足b開頭,.后面是什么都可以
        channel.queueBind("queue06", "topic-exchange", "b.*");
        //   channel.queueDeclare("queue", false, false, false, null);
        // 訊息內容
        String message = "這是一條key為a.hello的訊息";
        // 引數1 交換機,此處無
        // 引數2 路由key 
        // 引數3 屬性
        // 引數4 內容
        channel.basicPublish("topic-exchange", "a.hello", null, message.getBytes());
        //關閉通道和連接
        channel.close();
        connection.close();
            }
}

訊息消費者05

public class ConsumerFive {
    public static void main(String[] args) throws Exception {
        //創建一個新的RabbitMQ連接
        Connection connection = ConnectionUtil.getConnection();
        //創建一個通道
        Channel channel = connection.createChannel();
        //第一個引數:要從哪個佇列獲取訊息
        channel.basicConsume("queue05",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者05:"+new String(body));
            }
        });
    }
}

訊息消費者06

public class ConsumerSix {
    public static void main(String[] args) throws Exception {
        //創建一個新的RabbitMQ連接
        Connection connection = ConnectionUtil.getConnection();
        //創建一個通道
        Channel channel = connection.createChannel();
        //第一個引數:要從哪個佇列獲取訊息
        channel.basicConsume("queue06",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者06:"+new String(body));
            }
        });
    }
}

測驗結果

06 SpringBoot 整合 RabbitMQ

6.1 創建 SpringBoot 專案,引入依賴

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>

6.2 配置組態檔

spring:
  application:
    name: mq-springboot
  rabbitmq:
    host: ##.##.##.##
    port: 5672
    username: root
    password: #####
    virtual-host: myhost

6.3 第一種:簡單模式

訊息生產者:

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void sendMsg(){
        rabbitTemplate.convertAndSend("quenue","你好mq");
    }

訊息消費者

@Component
public class SingleCunstomer {
    //監聽的佇列 
    @RabbitListener(queues = "queue")
    public void receive(String message){
        System.out.println("訊息:" + message);
    }
}

6.4 第二種:作業模式

訊息生產者

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void sendMsg(){
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("quenue","你好mq!");
        }
    }

訊息消費者

@Component
public class WorkCunstomer {
    @RabbitListener(queues = "queue")
    public void customerOne(String message){
        System.out.println("消費者一:" + message);
    }
    @RabbitListener(queues = "queue")
    public void customerTwo(String message){
        System.out.println("消費者二:" + message);
    }
}

6.5 第三種:廣播模式

訊息生產者

@Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void sendMsg() {
        //引數1 交換機 引數2 路由key 引數三 訊息
        rabbitTemplate.convertAndSend("fanout-exchange","","這是一條廣播訊息");
    }

訊息消費者

@Component
public class FanoutCunstomer {
    @RabbitListener(queues = "queue01")
    public void customerOne(String message){
        System.out.println("消費者一:" + message);
    }
    @RabbitListener(queues = "queue02")
    public void customerTwo(String message){
        System.out.println("消費者二:" + message);
    }
}

6.6 第4種:路由模式

1)Direct(直連)模式
訊息生產者

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void sendMsg() {
        //引數1 交換機 引數2 路由key 引數三 訊息
        rabbitTemplate.convertAndSend("direct-exchange","a","這是一條廣播訊息");
    }

訊息消費者

@Component
public class DirectCunstomer {
    //監聽的佇列 queue03
    @RabbitListener(queues = "queue03")
    //監聽的佇列 queue04
    public void customerOne(String message){
        System.out.println("消費者一:" + message);
    }
    @RabbitListener(queues = "queue04")
    public void customerTwo(String message){
        System.out.println("消費者二:" + message);
    }
}

2)Topic模式
訊息生產者

 @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void sendMsg() {
        //引數1 交換機 引數2 路由key 引數三 訊息
        rabbitTemplate.convertAndSend("topic-exchange","a.hello","這是一條廣播訊息");
    }

訊息消費者

@Component
public class TopicCunstomer {
    //監聽的佇列 queue05
    @RabbitListener(queues = "queue05")
    public void customerOne(String message){
        System.out.println("消費者一:" + message);
    }
    //監聽的佇列 queue06
    @RabbitListener(queues = "queue06")
    public void customerTwo(String message){
        System.out.println("消費者二:" + message);
    }
}

6.7 SpringBoot 應用中通過配置完成佇列的創建

@Configuration
public class RabbitMQConfiguration {

    //創建佇列
    @Bean
    public Queue queue1(){
        Queue queue9 = new Queue("queue1");
        return queue9;
    }
    @Bean
    public Queue queue2(){
        Queue queue2 = new Queue("queue2");
        //設定佇列屬性
        return queue2;
    }

    //創建廣播模式交換機
    @Bean
    public FanoutExchange ex1(){
        return new FanoutExchange("ex1");
    }

    //創建路由模式-direct交換機
    @Bean
    public DirectExchange ex2(){
        return new DirectExchange("ex2");
    }

    //系結佇列
    @Bean
    public Binding bindingQueue1(Queue queue1, DirectExchange ex2){
        return BindingBuilder.bind(queue1).to(ex2).with("a1");
    }
    @Bean
    public Binding bindingQueue2(Queue queue2, DirectExchange ex2){
        return BindingBuilder.bind(queue2).to(ex2).with("a2");
    }
}

6.8 使用RabbitMQ發送-接收物件

訊息生產者:

@Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void sendMsg() {
        User user = new User();
        user.setId(1).setAge(16).setUsername("張飛");
        rabbitTemplate.convertAndSend("queue",user);
    }

訊息消費者

public class SingleCunstomer {
    //監聽的佇列
    @RabbitListener(queues = "queue")
    public void receive(User user){
        System.out.println("物件:" + user);
    }
}

07 RabbitMQ 訊息確認機制

所謂訊息確認機制就是訊息生產者有沒有將訊息發出去?生產者有沒有將訊息發給交換機,交換機有沒有將訊息發到佇列里面?訊息消費者是否成功的從佇列里面獲取到了訊息?

就像你在網上買東西,商家有沒有將快遞發到你家小區樓下的快遞驛站?你有沒有成功的從快遞驛站拿到你的快遞?

所以RabbitMQ的訊息確認機制包括訊息發送端的確認機制和訊息消費端的確認機制,

訊息發送端:

- confirm機制:訊息生產者是否成功的將訊息發送到交換機,

- return機制:交換機是否成功的將訊息發送到佇列,

訊息消費端:訊息消費者是否成功的從佇列獲取到了訊息,

7.1 SpringBoot配置訊息確認

訊息發送端訊息確認配置

# 訊息發送到交換器確認
spring.rabbitmq.publisher-confirm-type=correlated
# 訊息發送到佇列確認
spring.rabbitmq.publisher-returns=true

7.2 訊息發送到交換機監聽類

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
//訊息發送到交換機監聽類
public class SendConfirmCallback implements RabbitTemplate.ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("訊息成功發送到交換機! correlationData:{}", correlationData);
        } else {
            log.info("訊息發送到交換機失敗! correlationData:{}", correlationData);
        }
    }
}

7.3 訊息未路由到佇列監聽類

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
//訊息未路由到佇列監聽類
@Slf4j
@Component
public class SendReturnCallback implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("Fail... message:{},從交換機exchange:{},以路由鍵routingKey:{}," + "未找到匹配佇列,replyCode:{},replyText:{}",
                message, exchange, routingKey, replyCode, replyText);
    }

}

7.4 重新注入RabbitTemplate,并設定兩個監聽類

@Configuration
public class RabbitMQConfig {

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback( new SendConfirmCallback());
        rabbitTemplate.setReturnCallback(new SendReturnCallback());
        return rabbitTemplate;
    }

}

7.5 消費端確認

添加配置

# 消費者訊息確認--手動 ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

消費者代碼

@Component
@RabbitListener(queues = RabbitMQConfig.TASK_QUEUE_NAME)
public class Receiver {
    
    @RabbitHandler
    public void process(String content, Channel channel, Message message) {
        try {
            // 業務處理成功后呼叫,訊息會被確認消費
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            // 業務處理失敗后呼叫
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

08 RabbitMQ 死信佇列實作訊息延遲

8.1 什么是延遲佇列

延遲佇列存盤的物件肯定是對應的延時訊息,所謂”延時訊息”是指當訊息被發送以后,并不想讓消費者立即拿到訊息,而是等待指定時間后,消費者才拿到這個訊息進行消費,

8.2 RabbitMQ如何實作延遲佇列?

AMQP協議和RabbitMQ佇列本身沒有直接支持延遲佇列功能,但是可以通過TTL(Time To Live)特性模擬出延遲佇列的功能,

8.3 訊息的TTL(Time To Live)

訊息的TTL就是訊息的存活時間,RabbitMQ可以對佇列和訊息分別設定TTL,對佇列設定就是佇列沒有消費者連著的保留時間,也可以對每一個單獨的訊息做單獨的設定,超過了這個時間,我們認為這個訊息就死了,稱之為死信,可以通過設定訊息的expiration欄位或者x-message-ttl屬性來設定時間.

8.4 實作延遲佇列

延遲任務通過訊息的TTL來實作,我們需要建立2個佇列,一個用于發送訊息,一個用于訊息過期后的轉發目標佇列,

場景:使用延遲佇列實作訂單支付監控

8.5 代碼實作

RabbitMQConfig

@Configuration
public class RabbitMQConfig {

    //交換機
    public static final String EXCHANGE = "delay.exchange";
    //死信佇列
    public static final String DELAY_QUEUE = "delay.queue";
    //死信佇列與交換機系結的路由key
    public static final String DELAY_ROUTING_KEY = "delay.key";
    //業務佇列
    public static final String TASK_QUEUE_NAME = "task.queue";
    //業務佇列與交換機系結的路由key
    public static final String TASK_ROUTING_KEY = "task.key";

    // 宣告交換機
    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(EXCHANGE);
    }

    // 宣告死信佇列
    @Bean
    public Queue delayQueue() {
        Map<String, Object> args = new HashMap<>(2);
        //死信佇列訊息過期之后要轉發的交換機
        args.put("x-dead-letter-exchange", EXCHANGE);
        //訊息過期轉發的交換機對應的key
        args.put("x-dead-letter-routing-key", TASK_ROUTING_KEY);
        return new Queue(DELAY_QUEUE, true, false, false, args);
    }

    // 宣告死信佇列系結關系
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(delayQueue()).to(exchange()).with(DELAY_ROUTING_KEY);
    }

    // 宣告業務佇列
    @Bean
    public Queue taskQueue() {
        return new Queue(TASK_QUEUE_NAME, true);
    }

    //宣告業務佇列系結關系
    @Bean
    public Binding taskBinding() {
        return BindingBuilder.bind(taskQueue()).to(exchange()).with(TASK_ROUTING_KEY);
    }
}

訊息生產者

@Component
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //orderId 是訂單id interval是自定義過期時間 單位:秒
    public void orderDelay(String orderId,Long interval) {
        MessageProperties messageProperties = new MessageProperties();
        //設定訊息過期時間
        messageProperties.setExpiration(String.valueOf(interval));
        Message message = new Message(orderId.getBytes(), messageProperties);
        //生產者將訊息發給死信佇列,并設定訊息過期時間
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, null, message);
    }
}

訊息消費者

@Component
public class Consumer {

    @Autowired
    private OrderService orderService;

    //監聽業務佇列
    @RabbitListener(queues = RabbitMQConfig.TASK_QUEUE_NAME)
    public void receiveTask(Message message){
        String orderId = new String(message.getBody());
        log.info("過期的任務Id:{}", orderId);
        Order order = orderService.getById(orderId);
        //如果訂單支付狀態仍為未支付
        if(order.getPayState()==0){
            //設定該訂單狀態為已關閉
            order.setPayState(2);
            orderService.updateById(order);
        }
    }
}

09 RabbitMQ 的應用場景

9.1 解耦

場景說明:用戶下單之后,訂單系統要通知庫存系統修改商品數量

9.2 異步

場景說明:用戶注冊成功之后,需要發送注冊郵件及注冊短信提醒

9.3 訊息通信

場景說明:應用系統之間的通信,例如聊天室

9.4 流量削峰

場景說明:秒殺業務,大量的請求不會主動請求秒殺業務,而是存放在訊息佇列,

微信公眾號:eclipse編程,專注于編程技術分享,堅持終身學習,

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/256603.html

標籤:Java

上一篇:計算機專業:一個學期也沒學會C語言,編程真的是需要天賦嗎?

下一篇:抽象類

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more