文章目錄
- 環境準備:
- RabbitAdmin:
- SpringAMQP 宣告:
- RabbitTemplate: 訊息模板
- SimpleMessageListenerContainer : 訊息監聽容器
- MessageListenerAdapter 訊息監聽配接器:
- MessageConverter 訊息轉換器,序列化,反序列化:
- TextMessageConverter 文本訊息轉換器
- JSON格式轉換器 Jackson2JsonMessageConverter
- java物件映射 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter
- 支持java物件多映射轉換 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter
- 全域轉換器 ContentTypeDelegatingMessageConverter
RabbitMQ和SpringBoot整合使用的組件串列:
RabbitAdmin:
RabbitAdmin 可以用來宣告exchange,queue,binding,發送訊息等操作
SpringAMQP 宣告
通過@Bean注解方式宣告交換機和佇列,
RabbitTemplate 訊息模板
我們在與springAMQP整合的時候進行發送訊息的關鍵類
SimpleMessageListenerContainer
訊息監聽容器
MessageListenerAdapter
訊息監聽配接器
MessageConverter
轉換器,序列化,反序列化
環境準備:
- 新建一個SpringBoot專案
- pom檔案中添加依賴
<!-- junit測驗 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- mq依賴 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 創建配置類, 用來RabbitMQ加載配置,
package com.example.demo;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
/**
* @author wx
* @date 2021-01-16
*/
@Configuration
@ComponentScan("com.example.demo.*")
public class RabbitMQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("127.0.0.1:5672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
}
RabbitAdmin:
rabbitAdmin類可以很好的操作RabbitMQ, 在Spring中直接注入即可,
// 在RabbitMQConfig類中加入如下代碼
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
//AutoStartup 必須設定為true, 否則spring容器不會加載
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
RabbitAdmin 底層實作就是從Sping容器中獲取Exchange、Bingding、RoutingKey、以及Queue的@Bean宣告,
然后使用RabbitTemplate的execute方法執行對應的宣告,修改,洗掉等一系列RabbitMQ基礎功能操作,
添加測驗類:RabbitAdminTest
package com.example.demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.HashMap;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitAdminTest {
@Test
public void contextLoads() {
}
@Autowired
private RabbitAdmin rabbitAdmin;
@Test
public void testAdmin() throws Exception{
//宣告交換機
rabbitAdmin.declareExchange(new DirectExchange("wx.direct", false, false));
rabbitAdmin.declareExchange(new TopicExchange("wx.topic", false, false));
rabbitAdmin.declareExchange(new FanoutExchange("wx.fanout", false, false));
//宣告佇列
rabbitAdmin.declareQueue(new Queue("wx.direct.queue", false));
rabbitAdmin.declareQueue(new Queue("wx.topic.queue", false));
rabbitAdmin.declareQueue(new Queue("wx.fanout.queue", false));
//bingding 交換機和佇列, new HashMap<>()為其他arguments
rabbitAdmin.declareBinding(new Binding("wx.direct.queue",
Binding.DestinationType.QUEUE,
"wx.direct", //exchange
"direct", //routingKey
new HashMap<>()));
//bingding 交換機和佇列的另一種方式
rabbitAdmin.declareBinding(
BindingBuilder
.bind(new Queue("wx.topic.queue", false)) //直接創建佇列
.to(new TopicExchange("wx.topic", false, false)) //建立關聯關系(沒有該交換機會報錯)
.with("wx.#")); //routingKey
//bingding fanout型別交換機不需要 routingKey
rabbitAdmin.declareBinding(
BindingBuilder
.bind(new Queue("wx.fanout.queue", false))
.to(new FanoutExchange("wx.fanout", false, false)));
//清空佇列
rabbitAdmin.purgeQueue("wx.topic.queue", false);
}
}
在RabbitMQ控制臺 http://localhost:15672/ 可以看到對應的exchange和queue,以及系結關系,

SpringAMQP 宣告:
在RabbitMQ基礎API里面宣告一個Exchange, 宣告一個系結,一個佇列,
通過注解去宣告,跟用RabbitMQ寫法類似,
新建一個 RabbitMQBinding 配置類, 類上加注解 @Configuration,
package com.example.demo;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
/**
* @author wx
* @date 2021-01-16
*/
@Configuration
public class RabbitMQBinding {
/**
* 針對消費者配置
* 1. 設定交換機型別
* 2. 將佇列系結到交換機
FanoutExchange: 將訊息分發到所有系結佇列,無routingkey的概念
HeaderExchange: 通過添加屬性key-value匹配
DirectExchange: 按照routingkey分發到指定佇列
TopicExchange: 多關鍵字匹配
*/
@Bean
public TopicExchange topic001(){
return new TopicExchange("wx.topic001", false, false);
}
/**
* 宣告佇列
*/
@Bean
public Queue queue001(){
return new Queue("wx.topic.queue001", true);
}
@Bean
public Queue queue002(){
return new Queue("wx.topic.queue002", true);
}
@Bean
public Queue queue003(){
return new Queue("wx.topic.queue003", true);
}
/**
* 建立系結關系
*/
@Bean
public Binding binding001(){
return BindingBuilder.bind(queue001()).to(topic001()).with("rabbit.*");
}
@Bean
public Binding binding002(){
return BindingBuilder.bind(queue002()).to(topic001()).with("mq.*");
}
}
在RabbitMQ控制臺 http://localhost:15672/ 可以看到對應的exchange和queue,以及系結關系,

RabbitTemplate: 訊息模板
我們在與springAMQP整合的時候進行發送訊息的關鍵類
該類豐富了發送訊息方法,包括可靠性投遞訊息方法,回呼監聽訊息介面 ConfirmCallback, 回傳值確認介面ReturnCallback等等,同樣我們需要進行注入到Spring容器中,然后直接使用,
在 RabbitMQBinding 類中增加如下代碼
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
增加 RabbitTemplateTest 類:
package com.example.demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitTemplateTest {
@Test
public void contextLoads() {
}
/**
* rabbitTemplate.convertAndSend 發送訊息
* 發送object格式的訊息時, 接收端使用string接收, 發送message格式時,用byte[]接收
*/
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage1() throws Exception{
//直接發送一個object
// rabbitTemplate.convertAndSend("wx.topic001", "mq.info", "hello mq");
rabbitTemplate.convertAndSend("wx.topic001", "rabbit.info", "hello rabbit");
rabbitTemplate.convertAndSend("wx.topic001", "mq.info", "hello mq");
}
@Test
public void testSendMessage2() throws Exception{
//通過message發送訊息
//訊息屬性, 可以使用已經存在的屬性比如setContentType, 也可以自定義屬性
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
//訊息內容
Message message = new Message("hello rabbitMQ - text/plain".getBytes(StandardCharsets.UTF_8), messageProperties);
rabbitTemplate.convertAndSend("wx.topic001", "rabbit.info", message);
}
@Test
public void testSendMessage3() throws Exception{
//發送message, 并在發送時修改和增加內容
//訊息屬性, 可以使用已經存在的屬性比如setContentType, 也可以自定義屬性
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("desc", "自定義資訊描述");
messageProperties.getHeaders().put("type", "自定義訊息型別");
//訊息內容
Message message = new Message("hello rabbitMQ".getBytes(StandardCharsets.UTF_8), messageProperties);
//通過 MessagePostProcessor 增加額外的屬性
rabbitTemplate.convertAndSend("wx.topic001", "rabbit.info", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.out.println("添加額外的設定");
message.getMessageProperties().getHeaders().put("desc", "額外修改的資訊描述");
message.getMessageProperties().getHeaders().put("attr", "額外增加的屬性");
return message;
}
});
}
}
在RabbitMQ控制臺 http://localhost:15672/ 可以看到對應的queue下的訊息,

SimpleMessageListenerContainer : 訊息監聽容器
這個類非常強大,我們可以對他進行很多設定,對于消費者的配置項,這個類都可以滿足
- 監聽佇列(多個佇列),自動啟動,自動宣告功能
- 設定事務特性,事務管理器,事務屬性,事務容量(并發),是否開啟事務,回滾訊息
- 設定消費者數量,最小最大數量,批量消費
- 設定訊息確認和自動確認模式,是否重回佇列,例外捕獲handler函式,
- 設定消費者標簽生成策略,是否獨占模式,消費者屬性等
- 設定具體的監聽器,訊息轉換器等等,
simpleMessageListenerContainer可以進行動態設定,比如在運行程序中的應用可以動態的修改其消費者數量的大小,接收訊息的模式等,
很多基于RabbitMQ的自定制的一些后端管控臺在進行動態設定的時候,也是根據這一特性去實作的,所以可以看出SpringAMQP非常的強大,
在 RabbitMQBinding 類中增加如下代碼:
/**
* 宣告一個 SimpleMessageListenerContainer 用于監聽queue, 并處理訊息
*/
@Bean
public SimpleMessageListenerContainer messageListenerContainer1(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//設定監聽佇列
container.setQueues(queue001(), queue002(), queue003());
//設定消費者數量
container.setConcurrentConsumers(1);
//設定最大消費者數量
container.setMaxConcurrentConsumers(2);
//是否重回佇列
container.setDefaultRequeueRejected(false);
//簽收策略
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//消費端的標簽策略, 用于區分不同的消費者
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//處理訊息
String msg = new String(message.getBody());
System.out.println("----- consumer : " + msg);
}
});
return container;
}
– 消費者增加標簽后的效果

MessageListenerAdapter 訊息監聽配接器:
也可以通過配接器的方式處理訊息,上邊代碼改成如下:
/**
* 宣告一個 SimpleMessageListenerContainer 用于監聽queue, 并處理訊息
* 通過配接器的方式處理訊息, 處理訊息的模式方法為是 handleMessage, 可以進行修改
*/
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//設定監聽佇列
container.setQueues(queue001(), queue002(), queue003());
//設定消費者數量
container.setConcurrentConsumers(1);
//設定最大消費者數量
container.setMaxConcurrentConsumers(1);
//是否重回佇列
container.setDefaultRequeueRejected(false);
//簽收策略
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//消費端的標簽策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
//配接器方式
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
//修改執行的方法名, 默認方法名為 handleMessage
messageListenerAdapter.setDefaultListenerMethod("consumeMessage");
container.setMessageListener(messageListenerAdapter);
return container;
}
增加一個 MessageDelegate 類:
package com.example.adapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author wx
* @date 2021/2/15
*/
@Component
public class MessageDelegate {
@Autowired
private MessageDelegate messageDelegate;
public void handleMessage(byte[] messageBody){
System.err.println("handleMessage : " + new String(messageBody));
}
public void handleMessage(String messageBody){
System.err.println("handleMessage : " + messageBody);
}
public void consumeMessage(String messageBody){
System.err.println("consumeMessage string : " + messageBody);
}
public void consumeMessage(byte[] messageBody){
System.err.println("consumeMessage byte[] : ");
consumeMessage(new String(messageBody));
}
public void method1(String messageBody){
System.err.println("method1 : " + messageBody);
}
public void method2(String messageBody){
System.err.println("method2 : " + messageBody);
}
}
MessageConverter 訊息轉換器,序列化,反序列化:
TextMessageConverter 文本訊息轉換器
rabbitTemplate.convertAndSend 發送訊息時,有一個問題
- 發送object格式的訊息時, 接收端使用string接收
- 發送message格式時,用byte[]接收
這樣我們每個處理訊息的方法都需要多載,因為不知道具體應該用什么格式接收訊息,
這時可以使用 MessageConverter 來處理訊息,
上邊方法改成:
/**
* 宣告一個 SimpleMessageListenerContainer 用于監聽queue, 并處理訊息
* 通過配接器的方式處理訊息
*/
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//設定監聽佇列
container.setQueues(queue001(), queue002(), queue003());
//設定消費者數量
container.setConcurrentConsumers(1);
//設定最大消費者數量
container.setMaxConcurrentConsumers(1);
//是否重回佇列
container.setDefaultRequeueRejected(false);
//簽收策略
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//消費端的標簽策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
// //1,配接器方式1
// MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
// //修改執行的方法名, 默認方法名為 handleMessage
// messageListenerAdapter.setDefaultListenerMethod("consumeMessage");
// //指定一個轉換器, 將message的位元組陣列轉換為字串,轉換器可以不加(用方法多載解決)
// messageListenerAdapter.setMessageConverter(new TextMessageConverter());
// container.setMessageListener(messageListenerAdapter);
//2, 配接器方式2, 不同的佇列可以使用不同的方法處理資料
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
Map<String, String> queueOrTagToMethodName = new HashMap<>();
queueOrTagToMethodName.put("wx.topic.queue001", "method1");
queueOrTagToMethodName.put("wx.topic.queue002", "method2");
//指定一個轉換器, 將message的位元組陣列轉換為字串,轉換器可以不加(用方法多載解決)
messageListenerAdapter.setMessageConverter(new TextMessageConverter());
messageListenerAdapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
container.setMessageListener(messageListenerAdapter);
return container;
}
增加一個 TextMessageConverter 轉換器類:
自定義轉換器:實作MessageConverter 介面, 并且需要重寫下面兩個方法,
- toMessage:java物件轉換為Message
- fromMessage:Message物件轉換為java物件,
package com.example.convert;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
/**
* @author wx
* @date 2021/2/15
*/
public class TextMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
return new Message(o.toString().getBytes(), messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
// String contentType = message.getMessageProperties().getContentType();
// if(null != contentType && contentType.contains("text")){
// return new String(message.getBody());
// }
Object body = message.getBody();
if(body == null) return body;
/**
* rabbitTemplate.convertAndSend 發送訊息時
* 發送object格式的訊息時, 接收端使用string接收
* 發送message格式時,用byte[]接收
* 所以在接收到訊息時轉換一下格式, (判斷是否是byte[], 也可以使用其他的屬性欄位判斷, 如上通過contentType判斷)
**/
return body instanceof byte[] ? (new String((byte[]) body)) : body;
}
}
JSON格式轉換器 Jackson2JsonMessageConverter
修改 messageListenerContainer 代碼如下:
/**
* 宣告一個 SimpleMessageListenerContainer 用于監聽queue, 并處理訊息
* 通過配接器的方式處理訊息
*/
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//設定監聽佇列
container.setQueues(queue001(), queue002(), queue003(), queueImage(), queuePdf());
//設定消費者數量
container.setConcurrentConsumers(1);
//設定最大消費者數量
container.setMaxConcurrentConsumers(1);
//是否重回佇列
container.setDefaultRequeueRejected(false);
//簽收策略
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//消費端的標簽策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
//1.1 配接器方式, json格式轉換器 Jackson2JsonMessageConverter
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
//修改執行的方法名, 默認方法名為 handleMessage
messageListenerAdapter.setDefaultListenerMethod("consumeMapMessage");
//JSON格式轉換器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
messageListenerAdapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(messageListenerAdapter);
return container;
}
在 MessageDelegate 類中增加方法:
public void consumeMapMessage(Map messageBody){
System.err.println("consumeMapMessage map : " + messageBody);
}
增加junit測驗方法:
@Test
public void testSendJsonMessage() throws Exception{
//物體類
User user = new User("001", "張三", "130000000000X", "宇宙中心");
String json = new ObjectMapper().writeValueAsString(user);
System.err.println("user json : " + json);
MessageProperties messageProperties = new MessageProperties();
//必須修改ContentType 為application/json
messageProperties.setContentType("application/json");
Message message = new Message(json.getBytes(), messageProperties);
rabbitTemplate.send("wx.topic001", "rabbit.info", message);
}
User 物體類代碼:
public class User {
private String id;
private String name;
private String IDCard;
private String address;
public User() {}
public User(String id, String name, String IDCard, String address) {
this.id = id;
this.name = name;
this.IDCard = IDCard;
this.address = address;
}
public String toString(){
return "id = " + id + ", name = " + name + ", IDCard = " + IDCard + ", address = " + address;
}
//getter() and setter()
}
java物件映射 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter
修改 messageListenerContainer 代碼如下:
/**
* 宣告一個 SimpleMessageListenerContainer 用于監聽queue, 并處理訊息
* 通過配接器的方式處理訊息
*/
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//設定監聽佇列
container.setQueues(queue001(), queue002(), queue003(), queueImage(), queuePdf());
//設定消費者數量
container.setConcurrentConsumers(1);
//設定最大消費者數量
container.setMaxConcurrentConsumers(1);
//是否重回佇列
container.setDefaultRequeueRejected(false);
//簽收策略
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//消費端的標簽策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
//1.2 配接器方式, json格式 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter java物件轉換
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
//修改執行的方法名, 默認方法名為 handleMessage
messageListenerAdapter.setDefaultListenerMethod("consumeMappingMessage");
//JSON格式轉換器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
//java物件轉換
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
//springBoot2.0新特性, RabbitMQ信任package
javaTypeMapper.setTrustedPackages("*");
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
messageListenerAdapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(messageListenerAdapter);
return container;
}
在 MessageDelegate 類中增加消費者方法(接收User):
public void consumeMappingMessage(User user){
System.err.println("user info : " + user.toString());
}
增加測驗方法
@Test
public void testSendUser() throws Exception{
User user = new User("001", "張三", "130000000000X", "宇宙中心");
String json = new ObjectMapper().writeValueAsString(user);
MessageProperties messageProperties = new MessageProperties();
//修改ContentType 為application/json
messageProperties.setContentType("application/json");
//指定決議的物體類的型別
messageProperties.getHeaders().put("__TypeId__", "com.example.entity.User");
Message message = new Message(json.getBytes(), messageProperties);
rabbitTemplate.send("wx.topic001", "rabbit.info", message);
}
支持java物件多映射轉換 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter
修改 messageListenerContainer 代碼如下:
/**
* 宣告一個 SimpleMessageListenerContainer 用于監聽queue, 并處理訊息
* 通過配接器的方式處理訊息
*/
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//設定監聽佇列
container.setQueues(queue001(), queue002(), queue003(), queueImage(), queuePdf());
//設定消費者數量
container.setConcurrentConsumers(1);
//設定最大消費者數量
container.setMaxConcurrentConsumers(1);
//是否重回佇列
container.setDefaultRequeueRejected(false);
//簽收策略
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//消費端的標簽策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
//支持java物件多映射轉換 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
//修改執行的方法名, 默認方法名為 handleMessage
messageListenerAdapter.setDefaultListenerMethod("consumeMappingMessage");
//JSON格式轉換器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
//java物件轉換
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
//springBoot2.0新特性, RabbitMQ信任package
javaTypeMapper.setTrustedPackages("*");
//支持java物件多映射轉換
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("user", User.class);
idClassMapping.put("school", School.class);
javaTypeMapper.setIdClassMapping(idClassMapping);
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
//設定轉換器
messageListenerAdapter.setMessageConverter(jackson2JsonMessageConverter);
//設定配接器
container.setMessageListener(messageListenerAdapter);
return container;
}
}
全域轉換器 ContentTypeDelegatingMessageConverter
修改 messageListenerContainer 代碼如下:
/**
* 宣告一個 SimpleMessageListenerContainer 用于監聽queue, 并處理訊息
* 通過配接器的方式處理訊息
*/
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//設定監聽佇列
container.setQueues(queue001(), queue002(), queue003(), queueFile());
//設定消費者數量
container.setConcurrentConsumers(1);
//設定最大消費者數量
container.setMaxConcurrentConsumers(1);
//是否重回佇列
container.setDefaultRequeueRejected(false);
//簽收策略
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//消費端的標簽策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
//ext 全域轉換器
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
//修改執行的方法名, 默認方法名為 handleMessage
messageListenerAdapter.setDefaultListenerMethod("consumeMappingMessage");
//全域轉換器: 可以支持放入很多的小的Converter
ContentTypeDelegatingMessageConverter converter = new ContentTypeDelegatingMessageConverter();
//文本轉換器
TextMessageConverter textConvert = new TextMessageConverter();
converter.addDelegate("text", textConvert);
converter.addDelegate("html/text", textConvert);
converter.addDelegate("xml/text", textConvert);
converter.addDelegate("text/plain", textConvert);
//json轉換器
Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
converter.addDelegate("json", jsonConvert);
converter.addDelegate("application/json", jsonConvert);
//檔案流轉換器, 也可以根據不同的檔案型別(根據contentType判斷), 用不用的轉換器去做處理
FileMessageConverter fileMessageConverter = new FileMessageConverter();
converter.addDelegate("file", fileMessageConverter);
//設定轉換器
messageListenerAdapter.setMessageConverter(converter);
//設定配接器
container.setMessageListener(messageListenerAdapter);
return container;
}
增加檔案轉化器類 FileMessageConverter :
package com.example.convert;
import org.springframework.amqp.support.converter.MessageConverter;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.UUID;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
/**
* @author wx
* @date 2021/2/15
*/
public class FileMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
throw new MessageConversionException(" convert error ! ");
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
System.err.println("-----------File MessageConverter----------");
//獲取檔案擴展名
Object _extName = message.getMessageProperties().getHeaders().get("extName");
String extName = _extName == null ? "png" : _extName.toString();
byte[] body = message.getBody();
String fileName = UUID.randomUUID().toString();
String path = "/Users/wx/Desktop/test/" + fileName + "." + extName;
File file = new File(path);
try {
Files.copy(new ByteArrayInputStream(body), file.toPath());
} catch (IOException e) {
e.printStackTrace();
}
return file;
}
}
增加消費者方法:
public void consumeMappingMessage(File file){
System.out.println("file : " + file.getPath());
}
增加測驗方法:
@Test
public void testSendExtConvertMessage() throws Exception{
byte[] body = Files.readAllBytes(Paths.get("/Users/wx/Desktop/", "aaa.png"));
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("file");
messageProperties.getHeaders().put("extName", "png");
Message message = new Message(body, messageProperties);
rabbitTemplate.send("", "file_queue", message);
}
新增宣告 file_queue:
@Bean
public Queue queueFile(){
return new Queue("file_queue", true);
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/260036.html
標籤:java
上一篇:為什么工廠模式可以解耦?(一)
下一篇:設計模式-模板方法模式
