一、MQ的基本概念
1、MQ概述

2、MQ優勢
1、應用解耦


2、異步提速


3、削峰填谷



優勢小結

3、MQ劣勢

4、常見的MQ產品

5、RabbitMQ簡介

2007年,Rabbit 技術公司基于 AMQP 標準開發的 RabbitMQ 1.0 發布,RabbitMQ 采用 Erlang 語言開發,Erlang 語言由 Ericson 設計,專門為開發高并發和分布式系統的一種語言,在電信領域使用廣泛,
RabbitMQ 基礎架構如下圖

6、MQ中的相關概念介紹
- Broker:接收和分發訊息的應用,RabbitMQ Server就是 Message Broker
- Virtual host:出于多租戶和安全因素設計的,把 AMQP 的基本組件劃分到一個虛擬的分組中,類似于網路中的 namespace 概念,當多個不同的用戶使用同一個 RabbitMQ server 提供的服務時,可以劃分出多個vhost,每個用戶在自己的 vhost 創建 exchange/queue 等
- Connection:publisher/consumer 和 broker 之間的 TCP 連接
- Channel:如果每一次訪問 RabbitMQ 都建立一個 Connection,在訊息量大的時候建立 TCP Connection的開銷將是巨大的,效率也較低,Channel 是在 connection 內部建立的邏輯連接,如果應用程式支持多執行緒,通常每個thread創建單獨的 channel 進行通訊,AMQP method 包含了channel id 幫助客戶端和message broker 識別 channel,所以 channel 之間是完全隔離的,Channel 作為輕量級的 Connection
- Exchange:message 到達 broker 的第一站,根據分發規則,匹配查詢表中的 routing key,分發訊息到queue 中去,常用的型別有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
- Queue:訊息最終被送到這里等待 consumer 取走
- Binding:exchange 和 queue 之間的虛擬連接,binding 中可以包含 routing key,Binding 資訊被保存到 exchange 中的查詢表中,用于 message 的分發依據
7、RabbitMQ中的6種作業模式
RabbitMQ 提供了 6 種作業模式:簡單模式、work queues、Publish/Subscribe 發布與訂閱模式、Routing 路由模式、Topics 主題模式、RPC 遠程呼叫模式(遠程呼叫,不太算 MQ;暫不作介紹),

二、RabbitMQ 的作業模式
1、作業模式介紹
1、Work Queues作業佇列模式



2、發布訂閱模式(Pub/Sub 訂閱模式 )



3、Routing 路由模式


4、Topics 通配符模式



2、作業模式總結

3、RabbitMQ訊息確認機制


注意,Confirm只表示訊息被送到Broker中,至于消費者是否會消費掉Broker中的訊息,是無法確定的,就像是信件被送到了郵局,到底能不能送到收件人手中,還不確定,
當然,我們也可以通過ACK機制來完成消費者端的消費確認,代碼如下:
channel.basicConsume(RabbitConstant.QUEUE_SINA, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消費業務即ACK確認消費
System.out.println("【新浪天氣】收到氣象訊息:" + new String(body));
// 確認消費
channel.basicAck(envelope.getDeliveryTag(), false);
}
});

三、代碼demo
依賴:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.3.0</version>
</dependency>
1、簡單模式
生產者代碼
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.131.171");
connectionFactory.setPort(5672); // 默認埠是5672
connectionFactory.setUsername("jihu");
connectionFactory.setPassword("jihu");
connectionFactory.setVirtualHost("/jihu");
// 建立TCP長連接
Connection connection = connectionFactory.newConnection();
// 創建通信通道,相當于TCP中的虛擬連接
Channel channel = connection.createChannel();
// 創建佇列,宣告并創建一個佇列,如果佇列已經存在,則使用這個佇列
// 引數1:佇列名稱ID
// 引數2:是否持久化,false對應不持久化資料,MQ停掉資料就會丟失
// 引數3:是否佇列私有化,false則代表所有消費者都可以訪問,true代表只有第一次擁有它的消費者才能一直使用
// 引數4:是否自動洗掉 false代表連接停掉后不自動洗掉這個佇列
// 引數5:其他的額外引數
channel.queueDeclare("helloworld", true, false, false, null);
String message = "jihu666";
// 引數1:交換機,簡單模式不需要指定交換機,會有默認的交換機
// 引數2:佇列名稱
// 引數3:額外的設定屬性
// 引數4:訊息的位元組陣列
channel.basicPublish("", "helloworld", null, message.getBytes());
channel.close();
connection.close();
System.out.println("==== 資料發送成功 =====");
}
}
消費者代碼
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.131.171");
connectionFactory.setPort(5672); // 默認埠是5672
connectionFactory.setUsername("jihu");
connectionFactory.setPassword("jihu");
connectionFactory.setVirtualHost("/jihu");
// 建立TCP長連接
Connection connection = connectionFactory.newConnection();
// 創建通信通道,相當于TCP中的虛擬連接
Channel channel = connection.createChannel();
// 創建佇列,宣告并創建一個佇列,如果佇列已經存在,則使用這個佇列
// 引數1:佇列名稱ID
// 引數2:是否持久化,false對應不持久化資料,MQ停掉資料就會丟失
// 引數3:是否佇列私有化,false則代表所有消費者都可以訪問,true代表只有第一次擁有它的消費者才能一直使用
// 引數4:是否自動洗掉 false代表連接停掉后不自動洗掉這個佇列
// 引數5:其他的額外引數
channel.queueDeclare("helloworld", true, false, false, null);
// 從MQ服務器中獲取訊息
// 引數1:佇列名稱
// 引數2:是否自動確認收到訊息,false代表手動編程來確認訊息,沒這事MQ的推薦做法
// 引數2:
channel.basicConsume("helloworld", false, new Receiver(channel));
// 消費者的長連接不能關閉,因為需要持續監聽佇列獲取訊息
}
}
class Receiver extends DefaultConsumer {
private Channel channel;
// 重寫建構式
public Receiver(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
long deliveryTag = envelope.getDeliveryTag();
System.out.println("[消費者] 接收到的訊息:" + msg + ". 訊息的tagId: " + deliveryTag);
// 簽收訊息, false代表只確認當前訊息,設定為true代表簽收該消費者所有未簽收的訊息
channel.basicAck(deliveryTag, false);
}
}
測驗,先啟動消費者,然后我們查看rabbitmq頁面:

我們再啟動生成者:


從運行log來看,已經成功實作了訊息的發送和消費,
我們關閉消費者之后,再啟動生產者發送一條新訊息,然后來看看overview頁面:


可以看到,此時的ready數量為1,代表已經收到了一條訊息,
這個頁面的重繪我們可以手動重繪,也可以在右下角設定重繪頻率:

2、workqueue模式(模擬12306短信服務)
為了復用代碼,我們建立一個工廠類用來獲取連接:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitUntils {
private static ConnectionFactory connectionFactory = new ConnectionFactory();
static {
connectionFactory.setHost("192.168.131.171");
connectionFactory.setPort(5672); // 默認埠是5672
connectionFactory.setUsername("jihu");
connectionFactory.setPassword("jihu");
connectionFactory.setVirtualHost("/jihu");
}
public static Connection getConnection() {
Connection connection = null;
try {
return connectionFactory.newConnection();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
我們來模擬一個12306的短信發送功能:

我們先來新建三個短信發送者來消費訊息:
import com.jihu.rabbitmq.constant.RabbitConstant;
import com.jihu.rabbitmq.utils.RabbitUntils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class SMSSender1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUntils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
// 如果不寫channel.basicQos(1),則MQ自動會采用輪訓演算法將請求平均發送給所有消費者
// basicQos:MQ不再對消費者一次發送多個請求,而是消費者處理完一個訊息之后(確認消費后),再從佇列中獲取新的訊息
// channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String sms = new String(body);
System.out.println("SMSSender1-短信發送成功:" + sms);
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
public class SMS {
String name;
String mobile;
String content;
.....
}


然后構建生產者,訂單:
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jihu.rabbitmq.constant.RabbitConstant;
import com.jihu.rabbitmq.utils.RabbitUntils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class OrderSystem {
private static ObjectMapper objectMapper = new ObjectMapper();
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitUntils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
for (int i = 1; i <= 100; i++) {
SMS sms = new SMS("乘客+" + i, "1390000000" + i, "您的車票已預定成功!");
byte[] bytes = objectMapper.writeValueAsBytes(sms);
channel.basicPublish("", RabbitConstant.QUEUE_SMS, null, bytes);
}
System.out.println("資料發送成功!");
channel.close();
connection.close();
}
}

從log可以看到,訊息已經被三個訊息發送器都消費完了,
默認情況下,RabbitMQ使用的是輪訓演算法,
其實我們可以設定讓消費者處理完一個訊息被確認消費后,再獲取一個新的訊息,
// 如果不寫channel.basicQos(1),則MQ自動會采用輪訓演算法將請求平均發送給所有消費者
// basicQos:MQ不再對消費者一次發送多個請求,而是消費者處理完一個訊息之后(確認消費后),再從佇列中獲取新的訊息
channel.basicQos(1);
這樣處理之后,訊息并不是平均的分發給消費者們,而是消費者性能越好處理越快的,消費的訊息獲取的則越多,反之亦然,
我們可以來模擬一下這個場景,給SMSSender1休眠10ms,SMSSender2休眠100ms,SMSSender3休眠1200ms:
channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String sms = new String(body);
System.out.println("SMSSender1-短信發送成功:" + sms);
try {
TimeUnit.MICROSECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
這樣的話,相當于服務器性能SMSSender1 > SMSSender2 > SMSSender3.
啟動一下測驗,發現SMSSender3消費的訊息最少:

3、Publish / Subscribe 發布訂閱模式(模擬天氣預報)

注意:使用到的交換機必須提前宣告,否則消費者消費的時候會報錯!這和佇列不一樣,佇列默認會自動添加!

注意交換機的型別,此時我們需要的是廣播模式的交換機:

我們來創建一個廣播型別的交換機:

然后我們先來創建氣象局,可以發送天氣訊息:
public class WeatherBureau {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitUntils.getConnection();
String input = new Scanner(System.in).next();
Channel channel = connection.createChannel();
// 引數1:交換機名稱
// 引數2:佇列名稱, 佇列會和交換機系結,我們在消費端處理即可
// 引數3:額外引數
// 引數4:傳輸資料
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER, "", null, input.getBytes());
channel.close();
connection.close();
}
}
然后我們創建新浪天氣和百度天氣:
public class Sina {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUntils.getConnection();
final Channel channel = connection.createChannel();
// 宣告佇列資訊
channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false,
false, null);
// 佇列系結交換機
// 引數1:佇列名
// 引數1:交換機名
// 引數1:路由key,在發布訂閱模式中用不到
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");
// 消費完當前訊息后再獲取下一條訊息
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消費業務即ACK確認消費
System.out.println("【新浪天氣】收到氣象訊息:" + new String(body));
// 確認消費
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
百度天氣只是佇列名稱和新浪不一樣,這兩個佇列都是被系結在weather交換機上,用來將這個交換機上的訊息拉取到自己獨立的佇列中去消費,
建好之后我們啟動它們,在rabbitmq界面可以看到新建的佇列及交換機的系結關系:

接著我們來發布天氣訊息:



可以看到,此時已經實作了廣播模式,
4、Routing 路由模式(模擬接收特定地區和日期的天氣)

我們首先來創建一個direct型別的交換機:

注意:一個佇列可以系結多個路由key!
然后我們來創建氣象局,此時氣象局值給固定路由的佇列發訊息:
public class WeatherBureau {
public static void main(String[] args) throws IOException, TimeoutException {
// 天氣資訊, key作為路由
Map<String, String> area = new LinkedHashMap<>();
area.put("china.hunan.changsha.20211127", "中國湖南長沙20201127天氣資料");
area.put("china.hubei.wuhan.20211127", "中國湖北武漢20201127天氣資料");
area.put("china.hunan.zhuzhou.20211127", "中國湖南株洲20201127天氣資料");
area.put("us.cal.lsj.20211127", "美國加州洛杉磯20201127天氣資料");
area.put("china.hebei.shijiazhuang.20211128", "中國河北石家莊20201128天氣資料");
area.put("china.hubei.wuhan.20211128", "中國湖北武漢20201128天氣資料");
area.put("china.henan.zhengzhou.20211128", "中國河南鄭州20201128天氣資料");
area.put("us.cal.lsj.20211128", "美國加州洛杉磯20211128天氣資料");
Connection connection = RabbitUntils.getConnection();
Channel channel = connection.createChannel();
// 回圈發布氣象局的天氣訊息
area.forEach((key, value) -> {
try {
// map的key作為訊息的routing key
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, key, null, value.getBytes());
} catch (IOException e) {
e.printStackTrace();
}
});
System.out.println("訊息發送完畢!");
channel.close();
connection.close();
}
}
然后讓新浪只接受特定地區特定日期的天氣:
public class Sina {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUntils.getConnection();
final Channel channel = connection.createChannel();
// 宣告佇列資訊
channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false,
false, null);
// ==== 一個佇列可以系結多個路由key =====
// 指定交換機和佇列以及路由key的關系
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20211127");
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20211127");
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20211128");
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.zhuzhou.20211127");
// 消費完當前訊息后再獲取下一條訊息
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消費業務即ACK確認消費
System.out.println("【新浪天氣】收到氣象訊息:" + new String(body));
// 確認消費
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
public class Baidu {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUntils.getConnection();
final Channel channel = connection.createChannel();
// 宣告佇列資訊
channel.queueDeclare(RabbitConstant.QUEUE_BAI_DU, false, false,
false, null);
channel.queueBind(RabbitConstant.QUEUE_BAI_DU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.changsha.20211127");
channel.queueBind(RabbitConstant.QUEUE_BAI_DU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hebei.shijiazhuang.20211128");
// 消費完當前訊息后再獲取下一條訊息
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BAI_DU, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消費業務即ACK確認消費
System.out.println("【百度天氣】收到氣象訊息:" + new String(body));
// 確認消費
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
然后我們分別啟動新浪和百度,最后啟動氣象局類:


此時來查看rabbitmq頁面上的交換機系結:

5、Topics 通配符模式(模擬接收中國地區的天氣資訊)

我們首先來新建一個交換機:

創建氣象局:
public class WeatherBureau {
public static void main(String[] args) throws IOException, TimeoutException {
// 天氣資訊, key作為路由
Map<String, String> area = new LinkedHashMap<>();
area.put("china.hunan.changsha.20211127", "中國湖南長沙20201127天氣資料");
area.put("china.hubei.wuhan.20211127", "中國湖北武漢20201127天氣資料");
area.put("china.hunan.zhuzhou.20211127", "中國湖南株洲20201127天氣資料");
area.put("us.cal.lsj.20211127", "美國加州洛杉磯20201127天氣資料");
area.put("china.hebei.shijiazhuang.20211128", "中國河北石家莊20201128天氣資料");
area.put("china.hubei.wuhan.20211128", "中國湖北武漢20201128天氣資料");
area.put("china.henan.zhengzhou.20211128", "中國河南鄭州20201128天氣資料");
area.put("us.cal.lsj.20211128", "美國加州洛杉磯20211128天氣資料");
Connection connection = RabbitUntils.getConnection();
Channel channel = connection.createChannel();
// 回圈發布氣象局的天氣訊息
area.forEach((key, value) -> {
try {
// map的key作為訊息的routing key
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, key, null, value.getBytes());
} catch (IOException e) {
e.printStackTrace();
}
});
System.out.println("訊息發送完畢!");
channel.close();
connection.close();
}
}
接下來我們規定,新浪只被允許接收中國地區的訊息:
public class Sina {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUntils.getConnection();
final Channel channel = connection.createChannel();
// 宣告佇列資訊
channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false,
false, null);
// 指定交換機和佇列以及路由key的關系
// 交換機的型別在創建的時候就設定好了
// #表示匹配多個,*表示匹配一個
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.#");
// 消費完當前訊息后再獲取下一條訊息
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消費業務即ACK確認消費
System.out.println("【新浪天氣】收到氣象訊息:" + new String(body));
// 確認消費
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}

百度只被允許接收20211127這一天的訊息:
public class Baidu {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUntils.getConnection();
final Channel channel = connection.createChannel();
// 宣告佇列資訊
channel.queueDeclare(RabbitConstant.QUEUE_BAI_DU, false, false,
false, null);
channel.queueBind(RabbitConstant.QUEUE_BAI_DU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20211127");
// 消費完當前訊息后再獲取下一條訊息
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BAI_DU, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消費業務即ACK確認消費
System.out.println("【百度天氣】收到氣象訊息:" + new String(body));
// 確認消費
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}

6、訊息確認機制


1、Confirm測驗
public class WeatherBureau {
public static void main(String[] args) throws IOException, TimeoutException {
// 天氣資訊, key作為路由
Map<String, String> area = new LinkedHashMap<>();
area.put("china.hunan.changsha.20211127", "中國湖南長沙20201127天氣資料");
area.put("china.hubei.wuhan.20211127", "中國湖北武漢20201127天氣資料");
area.put("china.hunan.zhuzhou.20211127", "中國湖南株洲20201127天氣資料");
area.put("us.cal.lsj.20211127", "美國加州洛杉磯20201127天氣資料");
area.put("china.hebei.shijiazhuang.20211128", "中國河北石家莊20201128天氣資料");
area.put("china.hubei.wuhan.20211128", "中國湖北武漢20201128天氣資料");
area.put("china.henan.zhengzhou.20211128", "中國河南鄭州20201128天氣資料");
area.put("us.cal.lsj.20211128", "美國加州洛杉磯20211128天氣資料");
Connection connection = RabbitUntils.getConnection();
Channel channel = connection.createChannel();
// 開啟confirm監聽模式
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// 第二個引數代表接收的引數是否為批量接收
System.out.println("訊息已經被Broker接收. Tag: " + 1);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("訊息已被Brokder拒收. Tag: " + 1);
}
});
// 回圈發布氣象局的天氣訊息
area.forEach((key, value) -> {
try {
// map的key作為訊息的routing key
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, key, null, value.getBytes());
} catch (IOException e) {
e.printStackTrace();
}
});
System.out.println("訊息發送完畢!");
// === 注意,此時開啟了Confirm確認機制,不能直接關閉Channel!關閉后無法完成監聽!
// channel.close();
// connection.close();
}
}

注意,此時的Confirm確認代表訊息已經成功發送到Broker中去了,即再等待訊息者完成消費,如果Broker中的佇列滿了或者因為其他原因可能導致產生Return情況,即訊息被Broker退還給生產者,

2、Return測驗
public class WeatherBureauReturn {
public static void main(String[] args) throws IOException, TimeoutException {
// 天氣資訊, key作為路由
Map<String, String> area = new LinkedHashMap<>();
area.put("china.hunan.changsha.20211127", "中國湖南長沙20201127天氣資料");
area.put("china.hubei.wuhan.20211127", "中國湖北武漢20201127天氣資料");
area.put("china.hunan.zhuzhou.20211127", "中國湖南株洲20201127天氣資料");
area.put("us.cal.lsj.20211127", "美國加州洛杉磯20201127天氣資料");
area.put("china.hebei.shijiazhuang.20211128", "中國河北石家莊20201128天氣資料");
area.put("china.hubei.wuhan.20211128", "中國湖北武漢20201128天氣資料");
area.put("china.henan.zhengzhou.20211128", "中國河南鄭州20201128天氣資料");
area.put("us.cal.lsj.20211128", "美國加州洛杉磯20211128天氣資料");
Connection connection = RabbitUntils.getConnection();
Channel channel = connection.createChannel();
// 開啟confirm監聽模式
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// 第二個引數代表接收的引數是否為批量接收
System.out.println("訊息已經被Broker接收. Tag: " + deliveryTag);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("訊息已被Brokder拒收. Tag: " + deliveryTag);
}
});
// 開啟Return監聽模式
channel.addReturnListener(new ReturnCallback() {
@Override
public void handle(Return returnMessage) {
System.err.println("=================================");
System.err.println("Return編碼:" + returnMessage.getReplyCode() + "-Return描述:"
+ returnMessage.getReplyText());
System.err.println("交換機:" + returnMessage.getExchange() + "-路由key:"
+ returnMessage.getRoutingKey());
System.err.println("Return主題:" + new String(returnMessage.getBody()));
System.err.println("=================================");
}
});
// 回圈發布氣象局的天氣訊息
area.forEach((key, value) -> {
try {
// map的key作為訊息的routing key
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, key, null, value.getBytes());
} catch (IOException e) {
e.printStackTrace();
}
});
System.out.println("訊息發送完畢!");
// === 注意,此時開啟了Confirm確認機制,不能直接關閉Channel!關閉后無法完成監聽!
// channel.close();
// connection.close();
}
}
然后我們創建新浪接收,因為新浪只接受中國地區的天氣資訊,所以我們發送的其他天氣訊息要被退回:
注意,如果要得到回傳結果:basicPublish方法的第三個引數mandatory true代表如果訊息無法正常投遞則return回生產者,如果false,則直接將訊息放棄,
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, key, true, null, value.getBytes());
public class WeatherBureauReturn {
public static void main(String[] args) throws IOException, TimeoutException {
// 天氣資訊, key作為路由
Map<String, String> area = new LinkedHashMap<>();
area.put("china.hunan.changsha.20211127", "中國湖南長沙20201127天氣資料");
area.put("china.hubei.wuhan.20211127", "中國湖北武漢20201127天氣資料");
area.put("china.hunan.zhuzhou.20211127", "中國湖南株洲20201127天氣資料");
area.put("us.cal.lsj.20211127", "美國加州洛杉磯20201127天氣資料");
area.put("china.hebei.shijiazhuang.20211128", "中國河北石家莊20201128天氣資料");
area.put("china.hubei.wuhan.20211128", "中國湖北武漢20201128天氣資料");
area.put("china.henan.zhengzhou.20211128", "中國河南鄭州20201128天氣資料");
area.put("us.cal.lsj.20211128", "美國加州洛杉磯20211128天氣資料");
Connection connection = RabbitUntils.getConnection();
Channel channel = connection.createChannel();
// 開啟confirm監聽模式
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// 第二個引數代表接收的引數是否為批量接收
System.out.println("訊息已經被Broker接收. Tag: " + deliveryTag);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("訊息已被Brokder拒收. Tag: " + deliveryTag);
}
});
// 開啟Return監聽模式
channel.addReturnListener(new ReturnCallback() {
@Override
public void handle(Return returnMessage) {
System.err.println("=================================");
System.err.println("Return編碼:" + returnMessage.getReplyCode() + "-Return描述:"
+ returnMessage.getReplyText());
System.err.println("交換機:" + returnMessage.getExchange() + "-路由key:"
+ returnMessage.getRoutingKey());
System.err.println("Return主題:" + new String(returnMessage.getBody()));
System.err.println("=================================");
}
});
// 回圈發布氣象局的天氣訊息
area.forEach((key, value) -> {
try {
// map的key作為訊息的routing key
// 注意:第三個引數為:mandatory true代表如果訊息無法正常投遞則return回生產者,如果false,則直接將訊息放棄,
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, key, true,null, value.getBytes());
} catch (IOException e) {
e.printStackTrace();
}
});
System.out.println("訊息發送完畢!");
// === 注意,此時開啟了Confirm確認機制,不能直接關閉Channel!關閉后無法完成監聽!
// channel.close();
// connection.close();
}
}
我們先來啟動新浪,然后再啟動氣象發布:

此時將不滿足條件的訊息直接回傳給了生產者,當然我們可以設定成為直接丟棄等策略,注意,此時的到達和退回指的是訊息被發動到Broker容器中的佇列中,不是消費者已經確認了消費!
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/290606.html
標籤:其他
