1、Work 訊息模型
官網解釋:在消費者之間分配任務(競爭的消費者模式),

那么它與上回說的基本訊息模型有什么區別?下面通過代碼驗證一下,
生產者(回圈50次發送訊息便于對比),
// 生產者 public class Send { private final static String QUEUE_NAME = "test_work_queue"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 宣告佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 回圈發布任務 for (int i = 0; i < 50; i++) { // 訊息內容 String message = "task .. " + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); Thread.sleep(i * 2); } // 關閉通道和連接 channel.close(); connection.close(); } }
消費者1(此消費者部署在比較垃圾的服務器,效率比較低,為了模擬效率低讓他每處理一個訊息就休息1秒)
// 消費者1 public class Recv { private final static String QUEUE_NAME = "test_work_queue"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 final Channel channel = connection.createChannel(); // 宣告佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義佇列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取訊息,并且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即訊息體 String msg = new String(body); System.out.println(" [消費者1] received : " + msg + "!"); try { // 模擬完成任務的耗時:1000ms Thread.sleep(1000); } catch (InterruptedException e) { } // 手動ACK channel.basicAck(envelope.getDeliveryTag(), false); } }; // 監聽佇列, channel.basicConsume(QUEUE_NAME, false, consumer); } }
消費者2(此消費者的服務器好,屬于后浪級別的消費者,消費能力強,不休眠,)
//消費者2 public class Recv2 { private final static String QUEUE_NAME = "test_work_queue"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 final Channel channel = connection.createChannel(); // 宣告佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義佇列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取訊息,并且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即訊息體 String msg = new String(body); System.out.println(" [消費者2] received : " + msg + "!"); // 手動ACK channel.basicAck(envelope.getDeliveryTag(), false); } }; // 監聽佇列, channel.basicConsume(QUEUE_NAME, false, consumer); } }
現在啟動兩消費者,讓它兩處于監聽狀態,然后再啟動生產者,可以明顯觀察到,兩消費者處理的訊息一樣多(都是25條)但是消費者2速度快多了,消費者1還在處理,這樣明顯浪費了機器性能,處理時間長了,正確的做法是消費者2效率高機器性能OK應該多處理訊息,能者多勞,
為了解決這個問題,我們可以將basicQos方法與 prefetchCount = 1設定一起使用,這告訴RabbitMQ一次不要給工人一個以上的訊息,換句話說,在處理并確認上一條訊息之前,不要將新訊息發送給作業人員,而是將其分派給不忙的下一個作業程式,不忙?那就多干點,
接下來改造消費者,就是在宣告佇列后面加上:
channel.basicQos(1);
運行查看,消費者1只消費了3個訊息,而消費者2 處理了47個訊息,處理時間大大減小了,

2、訂閱模式(public/subscribe)
作業佇列背后的假設是,每個任務都恰好交付給一個工人,在這一部分中,我們將做一些完全不同的事情-我們將訊息傳達給多個消費者,這種模式稱為“發布/訂閱”,

新引入一個概念 X (Exchange): 交換機,接收生產者發送的訊息,另一方面知道如何處理訊息,例如遞交給某個特別佇列、遞交給所有佇列、或是將訊息丟棄,到底如何操作,取決于Exchange的型別,RabbitMQ訊息傳遞模型的核心思想是生產者從不將任何訊息直接發送到佇列,實際上,生產者經常甚至根本不知道是否將訊息傳遞到任何佇列,相反,生產者只能將訊息發送到交換機,交流是一件非常簡單的事情,一方面,它接收來自生產者的訊息,另一方面,將它們推入佇列,交易所必須確切知道如何處理收到的訊息,是否應將其附加到特定佇列?是否應該將其附加到許多佇列中?還是應該丟棄它,規則由交換機型別定義 ,
RabbitMQ訊息傳遞模型的核心思想是生產者從不將任何訊息直接發送到佇列,實際上,生產者經常甚至根本不知道是否將訊息傳遞到任何佇列,
需要特別注意的是:交換機只負責分發訊息,不具備存盤訊息的能力,因此如果沒有任何佇列與Exchange系結,或者沒有符合路由規則的佇列,那么訊息會丟失!
Exchange型別有以下幾種:
? Fanout:廣播,將訊息交給所有系結到交換機的佇列
? Direct:定向,把訊息交給符合指定routing key 的佇列
? Topic:通配符,把訊息交給符合routing pattern(路由模式) 的佇列
3、訂閱模式之Fanout(廣播)

如圖所示,生產者只負責將訊息傳遞給交換機(X),交換機系結了兩佇列,這條訊息由交換分發給佇列,Fanout模式有以下特點
- 所有系結到交換機的佇列都可以接收訊息(純廣播的,所有消費者都能收到訊息),
代碼示例:生產者,(這里不再宣告佇列了,宣告交換機了,生產者只負責發訊息給交換機,不再與佇列互動)
public class Send { private final static String EXCHANGE_NAME = "fanout_exchange_test"; public static void main(String[] argv) throws Exception { //獲取連接 Connection connection = ConnectionUtil.getConnection(); //獲取通道 Channel channel=connection.createChannel(); //宣告交換機名稱和型別 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); String message = "hello everyone"; channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes()); System.out.println("[生產者] Sent" + message+"'"); channel.close(); connection.close(); } }
消費者1 (消費者宣告佇列,佇列系結交換機)
public class Recv { private final static String QUEUE_NAME = "fanout_exchange_queue_1"; private final static String EXCHANGE_NAME = "fanout_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 宣告佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 系結佇列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 定義佇列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取訊息,并且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即訊息體 String msg = new String(body); System.out.println(" 消費者1" + msg + "!"); } }; // 監聽佇列,自動回傳完成 channel.basicConsume(QUEUE_NAME, true, consumer); } }
消費者2
public class Recv2 { private final static String QUEUE_NAME = "fanout_exchange_queue_2"; private final static String EXCHANGE_NAME = "fanout_exchange_test"; public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel() ; channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String message = new String(body); System.out.println("消費者2"+message); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }

測驗結果如上圖,
4、訂閱模型之Direct(路由)
Direct有選擇性的分發訊息,生產者在向Exchange發送訊息時,也必須指定訊息的routing key(路由秘鑰),交換機將訊息發送給相應有該秘鑰(系結秘鑰)的佇列中去,

代碼示例:生產者
class Send { private final static String EXCHANGE_NAME = "direct_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 宣告exchange,指定型別為direct channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 訊息內容 String message = "商品新增, id = 1001"; // 發送訊息,并且指定routing key 為:insert ,代表新增商品 channel.basicPublish(EXCHANGE_NAME, "update", null, message.getBytes()); System.out.println(" [商品服務:] Sent '" + message + "'"); channel.close(); connection.close(); }
消費者1(系結update 與 delete 秘鑰)
class Recv { private final static String QUEUE_NAME = "direct_exchange_queue_1"; private final static String EXCHANGE_NAME = "direct_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 宣告佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 系結佇列到交換機,同時指定需要訂閱的routing key,假設此處需要update和delete訊息 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); // 定義佇列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取訊息,并且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即訊息體 String msg = new String(body); System.out.println(" [消費者1] received : " + msg + "!"); } }; // 監聽佇列,自動ACK channel.basicConsume(QUEUE_NAME, true, consumer); } }
消費者2(系結update 與 delete 秘鑰還有insert 秘鑰)
public class Recv2 { private final static String QUEUE_NAME = "direct_exchange_queue_2"; private final static String EXCHANGE_NAME = "direct_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 宣告佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 系結佇列到交換機,同時指定需要訂閱的routing key,訂閱 insert、update、delete channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); // 定義佇列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取訊息,并且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即訊息體 String msg = new String(body); System.out.println(" [消費者2] received : " + msg + "!"); } }; // 監聽佇列,自動ACK channel.basicConsume(QUEUE_NAME, true, consumer); } }
測驗結果

insert時消費者1 沒有接收到訊息,
5、訂閱模式之Topic(話題)
與Direct 不同的是,Topic 路由鍵可以更加的靈活,使用 # 或者 * 通配,

Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert,路由密鑰中可以包含任意多個單詞,最多255個位元組,
通配符規則如下:
- * (星號)可以代替一個單詞,
- # (哈希)可以替代零個或多個單詞,
舉例:item.# 能匹配的有 item.hello 或 item.hello.world 等等,,,,
item.* 匹配 item.hello 不能匹配 item.hello.world (后邊就跟一個單詞)
代碼示例:生產者(topic型別)
public class Send { private final static String EXCHANGE_NAME = "topic_exchange_test"; public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"topic"); String message = "insert"; channel.basicPublish(EXCHANGE_NAME,"item.insert",null,message.getBytes()); System.out.println("生產者:"+message); channel.close(); connection.close(); } }
消費者1
public class Recv { private final static String QUEUE_NAME = "topic_exchange_queue_1"; private final static String EXCHANGE_NAME = "topic_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 宣告佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 系結佇列到交換機,同時指定需要訂閱的routing key,需要 update、delete channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete"); // 定義佇列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取訊息,并且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即訊息體 String msg = new String(body); System.out.println(" [消費者1] received : " + msg + "!"); } }; // 監聽佇列,自動ACK channel.basicConsume(QUEUE_NAME, true, consumer); } }
消費者2
public class Recv2 { private final static String QUEUE_NAME = "topic_exchange_queue_2"; private final static String EXCHANGE_NAME = "topic_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 宣告佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 系結佇列到交換機,同時指定需要訂閱的routing key, channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*"); // 定義佇列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取訊息,并且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即訊息體 String msg = new String(body); System.out.println(" [消費者2] received : " + msg + "!"); } }; // 監聽佇列,自動ACK channel.basicConsume(QUEUE_NAME, true, consumer); } }
結果

通配的接收到了訊息,
至此,訊息模型介紹完了,還有持久化問題,上次說到ACK機制也是一種持久化方式,ACK機制是假設消費者掛了的前提下,那么MQ掛了訊息怎么持久化?
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/119109.html
標籤:Java
