RabbitMQ
一、MQ
1.同步呼叫的優缺點
同步呼叫的優點:
- 時效性較強,可以立即得到結果
同步呼叫的問題:
- 耦合度高
- 性能和吞吐能力下降
- 有額外的資源消耗
- 有級聯失敗問題
2.異步呼叫
異步呼叫常見實作就是事件驅動模式

好處:
-
吞吐量提升:無需等待訂閱者處理完成,回應更快速
-
故障隔離:服務沒有直接呼叫,不存在級聯失敗問題
-
呼叫間沒有阻塞,不會造成無效的資源占用
-
耦合度極低,每個服務都可以靈活插拔,可替換
-
流量削峰:不管發布事件的流量波動多大,都由Broker接收,訂閱者可以按照自己的速度去處理事件
缺點:
- 架構復雜了,業務沒有明顯的流程線,不好管理
- 需要依賴于Broker的可靠、安全、性能
3.MQ實作
MQ,中文是訊息佇列(MessageQueue),字面來看就是存放訊息的佇列,也就是事件驅動架構中的Broker,
幾種常見MQ的對比:
| RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
|---|---|---|---|---|
| 公司/社區 | Rabbit | Apache | 阿里 | Apache |
| 開發語言 | Erlang | Java | Java | Scala&Java |
| 協議支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定義協議 | 自定義協議 |
| 可用性 | 高 | 一般 | 高 | 高 |
| 單機吞吐量 | 一般 | 差 | 高 | 非常高 |
| 訊息延遲 | 微秒級 | 毫秒級 | 毫秒級 | 毫秒以內 |
| 訊息可靠性 | 高 | 一般 | 高 | 一般 |
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求訊息低延遲:RabbitMQ、Kafka
二、Linux安裝RabbitMQ
1.1.下載鏡像
方式一:在線拉取
docker pull rabbitmq:3-management
方式二:從本地加載
在課前資料已經提供了鏡像包:

上傳到虛擬機中后,使用命令加載鏡像即可:
docker load -i mq.tar
1.2.安裝MQ
執行下面的命令來運行MQ容器:
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
注意5672是MQ訊息通信的埠,15672是ui埠
三、RabbitMQ
1.種類
大致分為兩種
1.**無交換機的 **基本訊息佇列 和 作業訊息佇列

2.有交換機的發布訂閱,路由,主題

2.基于SpringAMQP使用
2.1基礎訊息佇列

-
先使用測驗類生成佇列
public class ConsumerTest { public static void main(String[] args) throws IOException, TimeoutException { // 1.建立連接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.設定連接引數,分別是:主機名、埠號、vhost、用戶名、密碼 factory.setHost("116.62.32.68"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword("123321"); // 1.2.建立連接 Connection connection = factory.newConnection(); // 2.創建通道Channel Channel channel = connection.createChannel(); // 3.創建佇列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.訂閱訊息 channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 5.處理訊息 String message = new String(body); System.out.println("接收到訊息:【" + message + "】"); } }); System.out.println("等待接收訊息,,,,"); } } -
引入依賴
<!--AMQP依賴,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> -
生產者撰寫yaml檔案的MQ地址,埠號,用戶名密碼
spring: rabbitmq: host: 192.168.150.101 # 主機名 port: 5672 # 埠 virtual-host: / # 虛擬主機 username: itcast # 用戶名 password: 123321 # 密碼 -
撰寫生產者代碼
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { //類似于redisTemplate @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue(){ //佇列名 String queueName = "simple.queue"; //訊息 String message = "hello,SpringAmqp"; //發送訊息到佇列 rabbitTemplate.convertAndSend(queueName,message); } } -
消費者撰寫yaml檔案的MQ地址,埠號,用戶名密碼
同上
-
撰寫消費者代碼
@Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) throws InterruptedException { System.out.println("spring 消費者接收到訊息:【" + msg + "】"); } }
2.2作業訊息佇列

消費者:
//作業佇列
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1Message(String msg) throws InterruptedException {
System.out.println("spring 消費者1接收到訊息:【" + msg + "】");
}
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue2Message(String msg) throws InterruptedException {
System.out.println("spring 消費者2接收到訊息:【" + msg + "】");
}
logging:
pattern:
dateformat: MM-dd HH:mm:ss:SSS
spring:
rabbitmq:
host: # 主機名
port: 5672 # 埠
virtual-host: / # 虛擬主機
username: itcast # 用戶名
password: 123321 # 密碼
listener:
direct:
prefetch: 1 # 訊息預期,之前把所有訊息預期導致平分訊息,現在是處理一個取一個
2.3廣播Fanout

多了一個交換機,交換機決定將生產者的訊息發送給哪個佇列,這決定就是交換機種類不同規則不同,因此分為三種,廣播、路由和主題,
廣播:只要佇列系結了交換機,就發送訊息到佇列
1.撰寫消費者:使用注解宣告佇列和交換機(同時系結消費者的佇列和交換機)
//3.廣播(系結佇列,同時系結交換機)
@RabbitListener(bindings = @QueueBinding(
value = https://www.cnblogs.com/Gao-yubo/archive/2022/11/01/@Queue(name ="fanout.queue1"),
exchange = @Exchange(name = "fanout.exange",type = ExchangeTypes.FANOUT)
))
public void listenFanoutQueue1(String message){
System.out.println("廣播方式佇列1接受訊息:"+message);
}
@RabbitListener(bindings = @QueueBinding(
value = https://www.cnblogs.com/Gao-yubo/archive/2022/11/01/@Queue(name ="fanout.queue2"),
exchange = @Exchange(name = "fanout.exange",type = ExchangeTypes.FANOUT)
))
public void listenFanoutQueue2(String message){
System.out.println("廣播方式佇列2接受訊息:"+message);
}
2.生產者
//廣播型別交換機
@Test
public void testFanoutExchangeQueue(){
//交換機名稱
String exchangeName = "fanout.exange";
//訊息
String message = "廣播訊息";
//發送,第二個引數是佇列的key,在路由型別中可以使用到
rabbitTemplate.convertAndSend(exchangeName,"",message);
}
2.4路由Route

和廣播相比,在佇列中多了一個key屬性,
交換機通過訊息的key來對應路由到相同key的佇列上,
1.消費者:
/**
4.路由
*/
@RabbitListener(bindings = @QueueBinding(
value = https://www.cnblogs.com/Gao-yubo/archive/2022/11/01/@Queue(name ="direct.queue1"),
exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),
key = {"red","blue"}
))
public void listenDirectQueue1(String message){
System.out.println("路由紅、藍佇列接受訊息:"+message);
}
@RabbitListener(bindings = @QueueBinding(
value = https://www.cnblogs.com/Gao-yubo/archive/2022/11/01/@Queue(name ="direct.queue2"),
exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),
key = {"red","green"}
))
public void listenDirectQueue2(String message){
System.out.println("路由紅、綠佇列接受訊息:"+message);
}
2.生產者
//路由型別交換機
@Test
public void testDirectExchangeQueue(){
//交換機名稱
String exchangeName = "direct.exchange";
//訊息
String message = "路由訊息";
//發送,第二個引數是佇列的key,在路由型別中可以使用到
rabbitTemplate.convertAndSend(exchangeName,"red",message);
}
2.5主題Topic

key不再是一組單詞,而是一個規則
通配符規則:
#:匹配一個或多個詞
*:匹配不多不少恰好1個詞
舉例:
item.#:能夠匹配item.spu.insert 或者 item.spu
item.*:只能匹配item.spu
1.消費者
/**
* 主題
*/
@RabbitListener(bindings = @QueueBinding(
value = https://www.cnblogs.com/Gao-yubo/archive/2022/11/01/@Queue(name ="topic.queue1"),
exchange = @Exchange(name = "topic.exchange",type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String message){
System.out.println("中國佇列接受訊息:"+message);
}
@RabbitListener(bindings = @QueueBinding(
value = https://www.cnblogs.com/Gao-yubo/archive/2022/11/01/@Queue(name ="topic.queue2"),
exchange = @Exchange(name = "topic.exchange",type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String message){
System.out.println("新聞佇列接受訊息:"+message);
}
2.生產者
//主題型別交換機
@Test
public void testTopicExchangeQueue(){
//交換機名稱
String exchangeName = "topic.exchange";
//訊息
String message = "china.lala訊息";
//發送,第二個引數是佇列的key,在路由型別中可以使用到
rabbitTemplate.convertAndSend(exchangeName,"china.news",message);
}
3.訊息轉換器
Spring會把你發送的訊息序列化為位元組發送給MQ,接收訊息的時候,還會把位元組反序列化為Java物件,
默認情況下Spring采用的序列化方式是JDK序列化,眾所周知,JDK序列化存在下列問題:
因此需要配置一個Json轉換器
步驟
- 在publisher和consumer兩個服務中都引入依賴:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
- 配置訊息轉換器,在啟動類中添加一個Bean即可:
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/524921.html
標籤:其他
上一篇:OpenGL ES EGL eglCreateContext
下一篇:有可能是學習Git命令最好的網站
