一、模式說明
Topic 型別與 Direct 相比,都是可以根據 RoutingKey 把訊息路由到不同的佇列,只不過 Topic 型別 Exchange 可以讓佇列在系結 Routingkey 的時候使用通配符!
Routingkey` 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: `item.insert
通配符規則:
- 匹配一個或多個詞
- 匹配不多不少恰好1個詞
舉例:
item.#`:能夠匹配`item.insert.abc` 或者 `item.insert
item.*`:只能匹配`item.insert


圖解:
- 紅色Queue:系結的是 usa.# ,因此凡是以 usa. 開頭的 routing key 都會被匹配到
- 黃色Queue:系結的是 #.news ,因此凡是以 .news 結尾的 routing key 都會被匹配
二、代碼
①生產者
使用topic型別的Exchange,發送訊息的routing key有3種: item.insert 、 item.update 、 item.delete :
package com.itheima.rabbitmq.topic;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 通配符Topic的交換機型別為:topic
*/
public class Producer {
//交換機名稱
static final String TOPIC_EXCHAGE = "topic_exchange";
//佇列名稱
static final String TOPIC_QUEUE_1 = "topic_queue_1";
//佇列名稱
static final String TOPIC_QUEUE_2 = "topic_queue_2";
public static void main(String[] args) throws Exception {
//創建連接
Connection connection = ConnectionUtil.getConnection();
// 創建頻道
Channel channel = connection.createChannel();
/**
* 宣告交換機
* 引數1:交換機名稱
* 引數2:交換機型別,fanout、topic、topic、headers
*/
channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
// 發送資訊
String message = "新增了商品,Topic模式;routing key 為 item.insert " ;
channel.basicPublish(TOPIC_EXCHAGE, "item.insert", null, message.getBytes());
System.out.println("已發送訊息:" + message);
// 發送資訊
message = "修改了商品,Topic模式;routing key 為 item.update" ;
channel.basicPublish(TOPIC_EXCHAGE, "item.update", null, message.getBytes());
System.out.println("已發送訊息:" + message);
// 發送資訊
message = "洗掉了商品,Topic模式;routing key 為 item.delete" ;
channel.basicPublish(TOPIC_EXCHAGE, "item.delete", null, message.getBytes());
System.out.println("已發送訊息:" + message);
// 關閉資源
channel.close();
connection.close();
}
}
②消費者1
接收兩種型別的訊息:更新商品和洗掉商品
package com.itheima.rabbitmq.topic;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 創建頻道
Channel channel = connection.createChannel();
//宣告交換機
channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
// 宣告(創建)佇列
/**
* 引數1:佇列名稱
* 引數2:是否定義持久化佇列
* 引數3:是否獨占本次連接
* 引數4:是否在不使用的時候自動洗掉佇列
* 引數5:佇列其它引數
*/
channel.queueDeclare(Producer.TOPIC_QUEUE_1, true, false, false, null);
//佇列系結交換機
channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, "item.update");
channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, "item.delete");
//創建消費者;并設定訊息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 訊息者標簽,在channel.basicConsume時候可以指定
* envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標志(收到訊息失敗后是否需要重新發送)
* properties 屬性資訊
* body 訊息
*/
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機
System.out.println("交換機為:" + envelope.getExchange());
//訊息id
System.out.println("訊息id為:" + envelope.getDeliveryTag());
//收到的訊息
System.out.println("消費者1-接收到的訊息為:" + new String(body, "utf-8"));
}
};
//監聽訊息
/**
* 引數1:佇列名稱
* 引數2:是否自動確認,設定為true為表示訊息接收到自動向mq回復接收到了,mq接收到回復會洗掉訊息,設定為false則需要手動確認
* 引數3:訊息接收到后回呼
*/
channel.basicConsume(Producer.TOPIC_QUEUE_1, true, consumer);
}
}
③消費者2
接收所有型別的訊息:新增商品,更新商品和洗掉商品,
package com.itheima.rabbitmq.topic;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 創建頻道
Channel channel = connection.createChannel();
//宣告交換機
channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
// 宣告(創建)佇列
/**
* 引數1:佇列名稱
* 引數2:是否定義持久化佇列
* 引數3:是否獨占本次連接
* 引數4:是否在不使用的時候自動洗掉佇列
* 引數5:佇列其它引數
*/
channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);
//佇列系結交換機
channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHAGE, "item.*");
//創建消費者;并設定訊息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 訊息者標簽,在channel.basicConsume時候可以指定
* envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標志(收到訊息失敗后是否需要重新發送)
* properties 屬性資訊
* body 訊息
*/
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機
System.out.println("交換機為:" + envelope.getExchange());
//訊息id
System.out.println("訊息id為:" + envelope.getDeliveryTag());
//收到的訊息
System.out.println("消費者2-接收到的訊息為:" + new String(body, "utf-8"));
}
};
//監聽訊息
/**
* 引數1:佇列名稱
* 引數2:是否自動確認,設定為true為表示訊息接收到自動向mq回復接收到了,mq接收到回復會洗掉訊息,設定為false則需要手動確認
* 引數3:訊息接收到后回呼
*/
channel.basicConsume(Producer.TOPIC_QUEUE_2, true, consumer);
}
}
三、測驗
啟動所有消費者,然后使用生產者發送訊息;在消費者對應的控制臺可以查看到生產者發送對應 routing key 對應佇列的訊息;到達按照需要接收的效果;并且這些 routing key 可以使用通配符,
在執行完測驗代碼后,其實到 RabbitMQ 的管理后臺找到 Exchanges 選項卡,點擊 topic_exchange 的交換機,可以查看到如下的系結:

總結
Topic主題模式可以實作 Publish/Subscribe發布與訂閱模式 和 Routing路由模式 的功能;只是Topic 在配置 routing key 的時候可以使用通配符,顯得更加靈活,
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/275830.html
標籤:其他
下一篇:C++實作 高精度 加減乘除
