三、SpringAMQP
SpringAMQP是基于RabbitMQ封裝的一套模板,并且還利用SpringBoot對其實作了自動裝配,使用起來非常方便
SpringAMQP的官方地址
- https://spring.io/projects/spring-amqp
AMQP
Spring AMQP
SpringAMQP提供了三個功能
- 自動宣告佇列、交換機及其系結關系
- 基于注解的監聽模式,異步接收訊息
- 封裝了RabbitTemplate工具,用于發送訊息
3.1、Basic Queue 簡單佇列模型
在父工程引入依賴
<!--AMQP依賴,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
3.1.1、訊息發送
-
①、配置MQ地址,在publisher服務的application.yml中添加配置
-
spring: rabbitmq: host: 192.168.222.135 # 主機名 port: 5672 # 埠號 virtual-host: /coolman # 虛擬主機 username: root # 用戶名 password: root # 密碼
-
-
②、在publisher服務中撰寫測驗類,并利用RabbitTemplate實作訊息發送
-
package cn.coolman.mq.springamqptest; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest public class SimpleProducerTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void test01() { String queueName = "simpleQueue"; String message = "革命尚未完成,同志仍需努力"; rabbitTemplate.convertAndSend(queueName, message); System.out.println("發送簡單訊息:【" + message + "】完畢"); } }
-
3.1.2、訊息接收
-
①、配置MQ地址,在consumer服務的application.yml中添加配置
-
spring: rabbitmq: host: 192.168.222.135 # 主機名 port: 5672 # 埠號 virtual-host: /coolman # 虛擬主機 username: root # 用戶名 password: root # 密碼
-
-
②、在consumer服務中定義佇列
-
package cn.coolman.mq.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitQueueConfig{ /** * 定義佇列,佇列名稱為:simpleQueue * @return */ @Bean public Queue simpleQueue(){ return new Queue("simpleQueue"); } }
-
-
③、在consumer服務中創建一個SpringRabbitListener類
-
package cn.coolman.mq.listener; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class SimpleListener { /** * 方法的呼叫時機:如果佇列中一旦出現了訊息,馬上就會呼叫這個方法去處理,并且會把這個訊息傳遞給message引數 * @RabbitListener 指定監聽的佇列的名稱 * @param message */ @RabbitListener(queues = "simpleQueue") public void listener1(String message) { System.out.println("監聽器聽到的訊息:【"+ message + "】"); } }
-
3.1.3、測驗
- 啟動consumer服務,然后在publisher服務中運行測驗代碼,發送MQ訊息

3.2、Work Queue 作業佇列模型
Work Queues,也杯稱為(Task Queues),任務模型
- 簡單來說就是讓多個消費者系結到一個佇列,共同消費佇列中的訊息
當訊息處理比較耗時的時候,可能生產訊息的速度會遠遠大于消費的速度,
長此以往,訊息就會堆積越來越多,無法及時處理,
此時就可以使用work模型,多個消費者共同處理訊息,速度就能大大提高
3.2.1、訊息發送
-
這次我們回圈發送,模擬大量訊息堆積現象
-
在publisher服務中的SpringAmqpTest類中添加一個測驗方法
-
@Test public void testWorkQueue() { String queueName = "workQueue"; String message = "good news"; for (int i = 1; i < 11; i++) { rabbitTemplate.convertAndSend(queueName, message + "\t" + i); System.out.println("發送簡單訊息:【" + message + "\t" + i + "】完畢"); } }
-
3.2.2、訊息接收
-
修改RabbitQueueConfig類,將佇列名稱改為workQueue
-
package cn.coolman.mq.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitQueueConfig{ /** * 定義佇列,佇列名稱為:simpleQueue * @return */ // @Bean // public Queue simpleQueue(){ // return new Queue("simpleQueue"); // } /** * 定義佇列,佇列名稱為:workQueue * @return */ @Bean public Queue workQueue(){ return new Queue("workQueue"); } }
-
-
要模擬多個消費者系結同一個佇列,我們在consumer服務的SpringRabbitListener中添加1個新的方法,這兩個方法系結同一個佇列
-
package cn.coolman.mq.listener; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class SimpleListener { /** * 方法的呼叫時機:如果佇列中一旦出現了訊息,馬上就會呼叫這個方法去處理,并且會把這個訊息傳遞給message引數 * @RabbitListener 指定監聽的佇列的名稱 * @param message */ @RabbitListener(queues = "workQueue") public void listener01(String message) { System.out.println("監聽器01聽到的訊息:【"+ message + "】"); } @RabbitListener(queues = "workQueue") public void listener02(String message) throws InterruptedException { System.out.println("監聽器02聽到的訊息:【"+ message + "】"); Thread.sleep(10000); } } -
Listener02這個消費者sleep了10秒,模擬任務耗時
-
3.2.3、測驗
- 啟動ConsumerApplication后,執行publisher服務中剛撰寫的發送測驗方法testWorkQueue
- 可以看到,我們發布的10條訊息,居然平均分配給了兩個消費者,要知道我們其中一個消費者可是每次都睡眠了10秒鐘
- 這并沒有考慮到消費者的處理能力,顯然是有問題的
3.2.4、任務策略
能者多勞
-
在Spring中有個簡單的配置,可以解決這個問題
-
修改consumer服務的application.yml檔案,添加配置
-
spring: rabbitmq: listener: simple: prefetch: 1 # 每次只能獲取一條訊息,處理完成才能后去下一個訊息 -
再進行測驗
3.2.5、小結
- Work Queue模型的使用
- 多個消費者系結到一個佇列,同一條訊息只會被一個消費者處理
- 通過設定prefetch來控制消費者預取的訊息數量
3.3、Publish/Subscribe 發布/訂閱模型
- 發布/訂閱的模型如圖所示
- 可以看到,在發布/訂閱模型中,多了一個exchange角色,而且程序略有變化
- Publisher:生產者,也就是要發送訊息的程式,但是不再發送到佇列中,而是發給exchange(交換機)
- Exchange:交換機,一方面,接收生產者發送的訊息;另一方面,知道如何處理訊息,例如遞交給某個特別佇列、遞交給所有佇列、或者是將訊息丟棄
- 到底如何操作,取決與Exchange的型別;Exchange有以下3中型別
- Fanout:廣播,將訊息交給所有系結到交換機的佇列
- Direct:定向,把訊息交給符合指定routing key的佇列
- Topic:通配符,把訊息交給符合routing pattern(路由模式)的佇列
- Consumer:消費者,與以前一樣,訂閱佇列,沒有變化
- Queue:訊息佇列也與以前一樣,接收訊息、快取訊息
- PS:
- Exchange(交換機)只負責轉發訊息,不具備存盤訊息的能力
- 因此如果沒有任何佇列與Exchange系結,或者沒有符合路由規則的佇列,那么訊息會丟失
3.3.1、Fanout 模型
- Fanout,中文意思是扇出,一般稱為廣播
- 在廣播模式下,訊息發送流程如下所示
- 1)可以有很多個佇列
- 2)每個佇列都要系結到Exchange(交換機)
- 3)生產者發送的訊息,只能發送到交換機,交換機來決定要發給哪個佇列,生產者無法決定
- 4)交換機把訊息發送給系結過的所有佇列
- 5)訂閱佇列的消費者都能拿到訊息
①、宣告佇列和交換機
-
Spring提供了一個介面Exchange,來表示所有不同型別的交換機
-
在consumer中創建一個類,宣告佇列和交換機
-
package cn.coolman.mq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitQueueConfig{ // ========================定義發布訂閱模式的交換機與佇列,并且把佇列系結到交換機上================================= /** * 定義fanout的交換機 * @return */ @Bean // 這里的回傳值物件已經存盤到spring的容器中, key 方法名: new FanoutExchange("fanoutExchange") public FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } /** * 定義一個佇列 */ @Bean public Queue fanoutQueue01() { return new Queue("fanoutQueue01"); } /** * 定義一個佇列 */ @Bean public Queue fanoutQueue02() { return new Queue("fanoutQueue02"); } /** * 把佇列系結到交換機上 */ @Bean public Binding bindingQueueToExchange01(FanoutExchange fanoutExchange, Queue fanoutQueue01) { return BindingBuilder.bind(fanoutQueue01).to(fanoutExchange); } /** * 把佇列系結到交換機上 */ @Bean public Binding bindingQueueToExchange02(FanoutExchange fanoutExchange, Queue fanoutQueue02) { return BindingBuilder.bind(fanoutQueue02).to(fanoutExchange); } }
-
②、訊息發送
-
在publisher服務的SpringAMQPTest類中添加測驗方法
-
@Test public void testFanoutQueue() { String exchangeName = "fanoutExchange"; String message = "嘿嘿嘿嘿嘿嘿,fanout,i am coming "; rabbitTemplate.convertAndSend(exchangeName, "", message); System.out.println("發送成功!"); }
-
③、訊息接收
-
在consumer服務的SpringRabbitListener中添加兩個方法,作為消費者
-
package cn.coolman.mq.listener; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class FanoutListener { /** * 方法的呼叫時機:如果佇列中一旦出現了訊息,馬上就會呼叫這個方法去處理,并且會把這個訊息傳遞給message引數 * @RabbitListener 指定監聽的佇列的名稱 * @param message */ @RabbitListener(queues = "fanoutQueue01") public void listener01(String message) { System.out.println("監聽器01聽到的訊息:【"+ message + "】"); } @RabbitListener(queues = "fanoutQueue02") public void listener02(String message) throws InterruptedException { System.out.println("監聽器02聽到的訊息:【"+ message + "】"); } }
-
-
運行結果如下所示

- 由此可見,FanoutExchange會將訊息無條件轉發給每個系結的佇列
- 即routekey為空:
rabbitTemplate.convertAndSend(exchangeName, "", message);
④、小結
- 交換機的作用
- 接收publisher發送的訊息
- 將訊息按照規則路由到與之系結的佇列
- 不能快取訊息,路由失敗,訊息丟失
- FanoutExchange的會將訊息路由到每個系結的佇列
- 宣告佇列、交換機、系結關系的Bean是什么
- Queue
- FanoutExchange
- Binding
3.3.2、Direct 模型
- 在Fanout模式中,一條訊息,會被所有訂閱的佇列消費,但是,在某些場景下,我們希望不同的訊息被不同的佇列消費,這時候就要用到Direct型別的Exchange
- 在Direct模型中
- 佇列與交換機的系結,不能是任意系結了,而是要指定一個
RoutingKey(路由Key)- Pulisher在向Excahnge發送訊息的時候,也必須指定訊息的
RoutingKey- Exchange不再把訊息交給每一個系結的佇列,而是根據訊息的
RoutingKey在進行判斷,只有佇列的RoutingKey與訊息的RoutingKey完全一致,才會接收到訊息
①、基于注解宣告佇列和交換機
-
基于@Bean的方式宣告佇列和交換機比較麻煩,Spring還提供了基于注解的方式來宣告
-
在consumer的SpringRabbitListener中添加兩個消費者,同時基于注解來宣告佇列和交換機
-
package cn.coolman.mq.listener; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class DirectListener { @RabbitListener( bindings = @QueueBinding( exchange = @Exchange( name = "directExchange", type = ExchangeTypes.DIRECT ), value = https://www.cnblogs.com/OnlyOnYourself-lzw/p/@Queue("directQueue01"), key = "pig") ) public void listener01(String message) { System.out.println("監聽器01聽到的訊息:【"+ message + "】"); } @RabbitListener( bindings = @QueueBinding( exchange = @Exchange( name = "directExchange", type = ExchangeTypes.DIRECT ), value = https://www.cnblogs.com/OnlyOnYourself-lzw/p/@Queue("directQueue02"), key = "dog") ) public void listener02(String message) { System.out.println("監聽器02聽到的訊息:【"+ message + "】"); } }
-
②、訊息發送
-
在publisher服務的SpringAMQPTest類中添加測驗方法
-
@Test public void testDirectQueue() { String exchangeName = "directExchange"; String key1 = "pig"; String key2 = "dog"; String message = "I am a "; rabbitTemplate.convertAndSend(exchangeName, key1, message + key1); System.out.println("向directExchange交換機發送了路由key為【" + key1 + "】的訊息【" + message + key1 + "】完畢!"); rabbitTemplate.convertAndSend(exchangeName, key2, message + key2); System.out.println("向directExchange交換機發送了路由key為【" + key2 + "】的訊息【" + message + key2 + "】完畢!"); }
-
-
運行結果如下所示

- 可以明顯看到,DirectExchange是根據
routekey來分發訊息的
③、總結
- Direct交換機與Fanout交換機的差異
- Fanout交換機將訊息路由給每一個與之系結的佇列
- Direct交換機根據RoutingKey判斷路由給哪個佇列
- 如果多個佇列具有相同的RoutingKey,則與Fanout功能類似
- 基于@RabbitListener注解宣告佇列和交換機有那些常見的注解
- @Queue
- @Exchange
3.3.3、Topic 模型
- Topic模型的Exchange與Direct模型相比,都是可以根據
RoutingKey把訊息路由到不同的佇列- 只不過Topic模型的Exchange可以讓佇列在系結
RoutingKey的時候使用通配符RoutingKey一般是由一個或多個單詞組成,多個單詞之間以.號分割,例如item.insert- 通配符規則
#:匹配一個或多個詞*:匹配不多不少,恰好一個詞- 舉例
item.#:表示能夠匹配item.spu.insert或者item.spuitem.*:只能匹配item.spu- 再比如下圖所示
- Queue1:系結的是
china.#,因此凡是以china.開頭的routing key都會被匹配到,包括china.news和china.weather- Queue2:系結的是
#.news,因此凡是以.news結尾的routing key都會被匹配,包括china.news和japan.news
- 案例需求
- ①、利用@RabbitListener宣告Exchange、Queue、RoutingKey
- ②、在consumer服務中,撰寫兩個消費者方法,分別監聽
topic.queue1和topic.queue2 - ③、在publisher中撰寫測驗方法,向topicExchange發送訊息
- ?
①、訊息發送
-
@Test public void testTopicQueue() { String exchangeName = "topicExchange"; String routeKey1 = "coolman.hand"; String routeKey2 = "coolman.hand.hand.hand"; String message1 = "世紀之握!!!!"; String message2 = "究極無敵世紀之握!!!!"; rabbitTemplate.convertAndSend(exchangeName, routeKey1, message1); System.out.println("向【topicExchange】交換機發送 routeKey = 【coolman.hand】型別訊息:【" + message1 + "】"); rabbitTemplate.convertAndSend(exchangeName, routeKey2, message2); System.out.println("向【topicExchange】交換機發送 routeKey = 【coolman.hand.hand.hand】型別訊息:【" + message2 + "】"); }
②、訊息接收
-
package cn.coolman.mq.listener; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class TopicListener { @RabbitListener( bindings = @QueueBinding( exchange = @Exchange(name = "topicExchange", type = ExchangeTypes.TOPIC), value = https://www.cnblogs.com/OnlyOnYourself-lzw/p/@Queue("topic.queue1"), key = "coolman.*" ) ) public void listener01(String message) { System.out.println("【routeKey = " + "coolman.*】的監聽器【topic.queue1】接收到訊息:【" + message + "】"); } @RabbitListener( bindings = @QueueBinding( exchange = @Exchange(name = "topicExchange", type = ExchangeTypes.TOPIC), value = https://www.cnblogs.com/OnlyOnYourself-lzw/p/@Queue("topic.queue2"), key = "coolman.#" ) ) public void listener02(String message) { System.out.println("【routeKey = " + "coolman.#】的監聽器【topic.queue2】接收到訊息:【" + message + "】"); } } -
運行結果如下所示

- 顯然,
#號可以匹配一個或多個詞;*號匹配不多不少,恰好一個詞
③、小結
- Direct交換機和Topic交換機的差別
- Topic交換機接收的訊息RoutingKey必須是多個單詞,以
**.**分割 - Topic交換機與佇列系結時的bingdingKey可以指定通配符
#:代表0個或多個詞*:代表1個詞
- Topic交換機接收的訊息RoutingKey必須是多個單詞,以
3.4、訊息轉換器
- 之前說過,Spring會把發送的訊息序列化為位元組發送給MQ,接收訊息的時候,還會把位元組反序列化為Java物件
- 只不過,默認情況下Spring采用的序列化方式是JDK序列化,眾所周知,JDK序列化存在下列問題
- 資料體積過大
- 有安全漏洞
- 可讀性差
- 接下來我們可以測驗一下
3.4.1、測驗默認轉換器
-
我們修改訊息發送的代碼,發送一個Map物件
-
@Test public void testDefaultConvert() { String queueName = "simple.queue"; // 準備訊息 HashMap<String, Object> message = new HashMap<>(); message.put("name", "coolman"); message.put("age", 8); message.put("power", "SSS+"); // 發送訊息 rabbitTemplate.convertAndSend(queueName, message); }
-
-
停止consumer服務
-
發送訊息后查看RabbitMQ控制臺,查看map資料
-

-
顯然,JDK序列化方式并不合適;我們希望訊息體的體積更小、可讀性更高,因此可以使用JSON方式來做序列化和反序列化
3.4.2、配置JSON轉換器
-
在publisher和consumer兩個服務中都引入依賴
-
<dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.10.5</version> </dependency>
-
-
配置訊息轉換器,在啟動類中添加一個Bean即可,如下所示
-
/** * 訊息轉換器 * @return */ @Bean // amqp的MessageConverter public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } }
-
-
再次向
simple.queue發送訊息,再在控制臺查看,如下圖所示 -
顯然,使用JSON進行序列化,可以有效減小體積和提高可讀性
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/499616.html
標籤:Java












