目錄
1、什么是SpringAMQP
2、簡單佇列模型
3、WorkQueue 作業佇列
3.1 模擬WorkQueue,實作一個佇列系結多個消費者
1、訊息發送
2、訊息接收
3、測驗
4、能者多勞
5、小結
4、發布/訂閱
4.1 Fanout(廣播)
1、在消費者中創建一個配置類,宣告佇列和交換機:
2、訊息發送
3、訊息接收
4、測驗
5、小結
4.2、Direct(路由)
1、訊息接收
2、訊息發送
3、總結
4.3 Topic(通配符)
1、訊息發送
2、訊息接收
5、訊息轉換器
5.1 測驗默認轉換器
5.2 配置JSON轉換器
1、什么是SpringAMQP
SpringAMQP是基于RabbitMQ封裝的一套模板,并且還利用SpringBoot對其實作了自動裝配,使用起來非常方便,
SpringAmqp的官方地址:Spring AMQP
2、簡單佇列模型
1、在父工程pom.xml中引入依賴
<!--AMQP依賴,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、首先在生產者端配置MQ地址,在application.yml中添加配置:

spring:
rabbitmq:
host: 192.168.58.131 #rabbitMQ的ip地址
port: 5672 #埠
username: jie #用戶名
password: 1234 #密碼
virtual-host: / #虛擬主機
2、在生產者端撰寫測驗類SpringAmqpTest,并利用RabbitTemplate實作訊息發送:
package com.jie.mq.helloworld.spring;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.HashMap;
import java.util.Map;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test() {
//佇列名稱
String queueName = "simple.queue";
//訊息
String message = "hello, spring amqp!";
//發送訊息
rabbitTemplate.convertAndSend(queueName, message);
}
}
點擊運行后可以到瀏覽器看看,
3、在消費者端配置MQ地址,在application.yml中添加配置(和在生產者端一致)
4、然后在生產者服務新建一個類,代碼如下:
package com.jie.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg){
System.out.println("spring 消費者接收到訊息 :【" + msg + "】");
}
}
5、啟動消費者,查看控制臺和瀏覽器,
3、WorkQueue 作業佇列
Work queues,也被稱為(Task queues),任務模型,簡單來說就是讓多個消費者系結到一個佇列,共同消費佇列中的訊息,
當訊息處理比較耗時的時候,可能生產訊息的速度會遠遠大于訊息的消費速度,長此以往,訊息就會堆積越來越多,無法及時處理,
此時就可以使用work 模型,多個消費者共同處理訊息處理,速度就能大大提高了
3.1 模擬WorkQueue,實作一個佇列系結多個消費者
1、訊息發送
這次我們回圈發送,模擬大量訊息堆積現象,
在生產者服務中添加一個測驗方法:
/**
* @description:向佇列中不停發送訊息,模擬訊息堆積,
* @author: jie
* @time: 2022/2/22 9:23
*/
@Test
public void test2() throws InterruptedException {
//佇列名稱
String queueName = "simple.queue";
//訊息
String message = "hello, spring amqp!--";
for (int i = 1; i < 50; i++) {
//發送訊息
rabbitTemplate.convertAndSend(queueName, message+i);
//休眠
Thread.sleep(20);
}
}
2、訊息接收
要模擬多個消費者系結同一個佇列,我們在消費者服務的中添加2個新的方法:
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("spring 消費者1接收到訊息 :【" + msg + "】"+ LocalTime.now());
//休眠
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消費者2........接收到訊息:【" + msg + "】" + LocalTime.now());
//休眠
Thread.sleep(200);
}
3、測驗
啟動消費者后,在執行生產者服務中剛剛撰寫的發送測驗方法InterruptedException,
可以看到消費者1很快完成了自己的25條訊息,消費者2卻在緩慢的處理自己的25條訊息,
也就是說訊息是平均分配給每個消費者,并沒有考慮到消費者的處理能力,這樣顯然是有問題的,
4、能者多勞
在spring中有一個簡單的配置,可以解決這個問題,我們修改消費者服務的application.yml檔案,添加配置:
重啟消費者,再重新發送訊息,

5、小結
Work模型的使用:
多個消費者系結到一個佇列,同一條訊息只會被一個消費者處理
通過設定prefetch來控制消費者預取的訊息數量
4、發布/訂閱
發布訂閱模式與之前案例的區別就是允許將同一訊息發送給多個消費者,實作方式是加入了exchange(交換機),
發布訂閱的模型如圖:
可以看到,在訂閱模型中,多了一個exchange角色,而且程序略有變化:
Publisher:生產者,也就是要發送訊息的程式,但是不再發送到佇列中,而是發給exchange(交換機)
Exchange:交換機,一方面,接收生產者發送的訊息,另一方面,知道如何處理訊息,例如遞交給某個特別佇列、遞交給所有佇列、或是將訊息丟棄,到底如何操作,取決于Exchange的型別,Exchange有以下3種型別:
Fanout:廣播,將訊息交給所有系結到交換機的佇列
Direct:路由,把訊息交給符合指定routing key 的佇列
Topic:通配符,把訊息交給符合routing pattern(路由模式) 的佇列
Consumer:消費者,與以前一樣,訂閱佇列,沒有變化
Queue:訊息佇列也與以前一樣,接收訊息、快取訊息,
Exchange(交換機)只負責轉發訊息,不具備存盤訊息的能力,因此如果沒有任何佇列與Exchange系結,或者沒有符合路由規則的佇列,那么訊息會丟失!
4.1 Fanout(廣播)
Fanout,英文翻譯是扇出,在MQ中叫廣播更合適,

在廣播模式下,訊息發送流程是這樣的:
可以有多個佇列,
每個佇列都要系結到Exchange(交換機),
生產者發送的訊息,只能發送到交換機,交換機來決定要發給哪個佇列,生產者無法決定,
交換機把訊息發送給系結過的所有佇列,
訂閱佇列的消費者都能拿到訊息,
1、在消費者中創建一個配置類,宣告佇列和交換機:
package com.jie.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @description:廣播交換機配置類
* @author: jie
* @time: 2022/2/22 11:09
*/
@Configuration
public class FanoutConfig {
/**
* @description:創建交換機itcast.fanout
* @author: jie
* @time: 2022/2/12 19:38
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
/**
* @description:創建佇列fanout.queuel
* @author: jie
* @time: 2022/2/12 19:40
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queuel");
}
/**
* @description:系結佇列fanout.queuel到交換機
* @author: jie
* @time: 2022/2/12 19:41
*/
@Bean
public Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
return BindingBuilder
.bind(fanoutQueue1)
.to(fanoutExchange);
}
// fanout.queue2
/**
* @description:創建佇列fanout.queue2
* @author: jie
* @time: 2022/2/12 19:40
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* @description:系結佇列fanout.queue2到交換機
* @author: jie
* @time: 2022/2/12 19:41
*/
@Bean
public Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
return BindingBuilder
.bind(fanoutQueue2)
.to(fanoutExchange);
}
}
運行消費者然后查看瀏覽器,

2、訊息發送
在生產者服務的測驗類中添加測驗方法:
/**
* @description:測驗交換機發送訊息
* @author: jie
* @time: 2022/2/22 11:18
*/
@Test
public void testFanoutExchange() {
//交換機名稱
String exchangeName = "itcast.fanout";
//訊息
String message = "hello,every one!";
// 發送訊息
rabbitTemplate.convertAndSend(exchangeName,"",message);
}
3、訊息接收
在消費者服務中添加兩個方法,作為消費者:
@RabbitListener(queues = "fanout.queuel")
public void listenFanoutQueue1(String msg){
System.out.println("消費者接收到fanout.queuel的訊息 :【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg){
System.out.println("消費者接收到fanout.queue2的訊息 :【" + msg + "】");
}
4、測驗
運行生產者訊息發送,啟動消費者,查看控制臺,
5、小結
交換機的作用是什么?
接收生產者發送的訊息
將訊息按照規則路由到與之系結的佇列
不能快取訊息,路由失敗,訊息丟失
FanoutExchange的會將訊息路由到每個系結的佇列
宣告佇列、交換機、系結關系的Bean是什么?
Queue(佇列)
FanoutExchange(廣播交換機)
Binding(系結)
4.2、Direct(路由)
在Fanout模式中,一條訊息,會被所有訂閱的佇列都消費,但是,在某些場景下,我們希望不同的訊息被不同的佇列消費,這時就要用到Direct型別的Exchange,
Direct Exchange 會將接收到的訊息根據規則路由到指定的Queue,因此稱為路由模式(routes),

在Direct模型下:
-
佇列與交換機的系結,不能是任意系結了,而是要指定一個
RoutingKey(路由key) -
訊息的發送方在 向 Exchange發送訊息時,也必須指定訊息的
RoutingKey, -
Exchange不再把訊息交給每一個系結的佇列,而是根據訊息的
Routing Key進行判斷,只有佇列的Routingkey與訊息的Routing key完全一致,才會接收到訊息
1、訊息接收
基于@Bean的方式宣告佇列和交換機比較麻煩,Spring還提供了基于注解方式來宣告,
在消費者r中添加兩個消費者,同時基于注解來宣告佇列和交換機:
@RabbitListener(bindings = @QueueBinding(
//佇列
value = @Queue(name = "driect.queue1"),
//交換機
exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
key = {"red","blue"}
))
public void listenDirectQueue2(String msg){
System.out.println("消費者接收到direct.queue1的訊息 :【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
//佇列
value = @Queue(name = "driect.queue2"),
//交換機
exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
key = {"red","yellow"}
))
public void listenDirectQueue1(String msg){
System.out.println("消費者接收到direct.queue2的訊息 :【" + msg + "】");
}

2、訊息發送
@Test
public void testSendDirectExchange(){
//交換機名稱
String exchangeName = "itcast.direct";
//訊息
String message = "hello,jie";
// 發送訊息
rabbitTemplate.convertAndSend(exchangeName,"blue",message);
}

3、總結
描述下Direct交換機與Fanout交換機的差異?
Fanout交換機將訊息路由給每一個與之系結的佇列
Direct交換機根據RoutingKey判斷路由給哪個佇列
如果多個佇列具有相同的RoutingKey,則與Fanout功能類似
基于@RabbitListener注解宣告佇列和交換機有哪些常見注解?
@Queue
@Exchange
4.3 Topic(通配符)
Topic型別的Exchange與Direct相比,都是可以根據RoutingKey把訊息路由到不同的佇列,只不過Topic型別Exchange可以讓佇列在系結Routing key的時候使用通配符!
Routingkey一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如:item.insert通配符規則:
#:匹配一個或多個詞
*:匹配不多不少恰好1個詞舉例:
item.#:能夠匹配item.spu.insert或者item.spu
item.*:只能匹配item.spu
1、訊息發送
在生產者服務的中添加測驗方法:
@Test
public void testSendTopicExchange(){
//交換機名稱
String exchangeName = "itcast.topic";
//訊息
String message = "2022冬奧運在中國開始了!";
// 發送訊息
rabbitTemplate.convertAndSend(exchangeName,"china.weather",message);
}
2、訊息接收
在消費者服務中添加方法:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消費者接收到topic.queue1的訊息 :【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消費者接收到topic.queue2的訊息 :【" + msg + "】");
}

5、訊息轉換器
Spring會把你發送的訊息序列化為位元組發送給MQ,接收訊息的時候,還會把位元組反序列化為Java物件,
默認情況下Spring采用的序列化方式是JDK序列化,眾所周知,JDK序列化存在下列問題:
資料體積過大
有安全漏洞
可讀性差
我們來測驗一下,
5.1 測驗默認轉換器
@Test
public void test6() {
Map<String,Object> msg = new HashMap<>();
msg.put("name","zs");
msg.put("age",24);
rabbitTemplate.convertAndSend("object.queue",msg);
}
我們修改訊息發送的代碼,發送一個Map物件:
發送訊息后查看瀏覽器:
5.2 配置JSON轉換器
顯然,JDK序列化方式并不合適,我們希望訊息體的體積更小、可讀性更高,因此可以使用JSON方式來做序列化和反序列化,
在生產者和消費者兩個服務中都引入依賴:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
配置訊息轉換器,
在啟動類中添加一個Bean即可:
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}

查看瀏覽器
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/431485.html
標籤:其他
上一篇:update關聯更新
下一篇:Flink簡單使用手冊


