RabbitMQ(九)——Routing 之訂閱模型-Direct(直連)
Direct exchange簡介
在Fanout模式中,一條訊息,會被所有訂閱的佇列都消費,但是,在某些場景下,我們希望不同的訊息被不同的佇列消費,這時就要用到Direct型別的Exchange,
在Direct模型下:
- 佇列與交換機的系結,不能是任意系結了,而是要指定一個
RoutingKey(路由key) - 訊息的發送方在 向 Exchange發送訊息時,也必須指定訊息的
RoutingKey, - Exchange不再把訊息交給每一個系結的佇列,而是根據訊息的
Routing Key進行判斷,只有佇列的Routingkey與訊息的Routing key完全一致,才會接收到訊息
領取資料
圖解:
- P:生產者,向Exchange發送訊息,發送訊息時,會指定一個routing key,
- X:Exchange(交換機),接收生產者的訊息,然后把訊息遞交給 與routing key完全匹配的佇列
- C1:消費者,其所在佇列指定了需要routing key 為 error 的訊息
- C2:消費者,其所在佇列指定了需要routing key 為 info、error、warning 的訊息
在上面這張圖中,我們可以看到 X 系結了兩個佇列,系結型別是 direct,佇列 Q1 系結鍵為 orange,佇列 Q2 系結鍵有兩個:一個系結鍵為 black,另一個系結鍵為 green
在這種系結情況下,生產者發布訊息到 exchange 上,系結鍵為 orange 的訊息會被發布到佇列 Q1,系結鍵為 blackgreen 和的訊息會被發布到佇列 Q2,其他訊息型別的訊息將被丟棄,
領取資料
多重系結

當然如果 exchange 的系結型別是 direct,但是它系結的多個佇列的 key 如果都相同,在這種情況下雖然系結型別是 direct 但是它表現的就和 fanout 有點類似了,就跟廣播差不多,如上圖所示,
實戰


代碼
生產者
package com.study.rabbitmq.six;
import com.rabbitmq.client.Channel;
import com.study.rabbitmq.utils.RabbitMQUtils;
import java.util.Scanner;
public class DirectLogs {
//交換機名稱
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
Scanner scanner = new Scanner(System.in);
// 加群1025684353一起吹水聊天-->
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("UTF-8"));
System.out.println("生產者發出訊息:" + message);
}
}
}
消費者一:
package com.study.rabbitmq.six;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.study.rabbitmq.utils.RabbitMQUtils;
public class ReceiveLogsDirect01 {
//宣告交換機
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtils.getChannel();
//宣告一個交換機
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//宣告一個佇列
channel.queueDeclare("console",false,false,false,null);
/*
* 1、佇列名稱
* 2、交換機
* 3、Routing key
* */
channel.queueBind("console",EXCHANGE_NAME,"info");
channel.queueBind("console",EXCHANGE_NAME,"warning");
// 加群1025684353一起吹水聊天-->
//接收訊息
DeliverCallback deliverCallback = (consumerTag,message) ->{
System.out.println("ReceiveLogsDirect01控制臺列印接收到的訊息:"+ new String(message.getBody(),"UTF-8"));
};
//消費者取消訊息時回呼介面
channel.basicConsume("console",true,deliverCallback,consumerTag -> {});
}
}
領取資料
消費者二:
package com.study.rabbitmq.six;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.study.rabbitmq.utils.RabbitMQUtils;
public class ReceiveLogsDirect02 {
//宣告交換機
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtils.getChannel();
//宣告一個交換機
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//宣告一個佇列
channel.queueDeclare("disk",false,false,false,null);
/*
* 1、佇列名稱
* 2、交換機
* 3、Routing key
* */
// 加群1025684353一起吹水聊天-->
channel.queueBind("disk",EXCHANGE_NAME,"error");
//接收訊息
DeliverCallback deliverCallback = (consumerTag,message) ->{
System.out.println("ReceiveLogsDirect02控制臺列印接收到的訊息:"+ new String(message.getBody(),"UTF-8"));
};
//消費者取消訊息時回呼介面
channel.basicConsume("disk",true,deliverCallback,consumerTag -> {});
}
}
運行測驗一:



運行測驗二:

領取資料


最后,祝大家早日學有所成,拿到滿意offer,快速升職加薪,走上人生巔峰, 可以的話請給我一個三連支持一下我喲,我們下期再見
領取資料

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/294157.html
標籤:其他
上一篇:springboo專案用自定義框架來完成elasticsearch7.14.0最新版的增刪改。同時在mybatis-plus中集成elasticsearch,完成一系列自動化操作。
