主頁 > 後端開發 > RabbitMQ 跟 springBoot 整合,相關組件

RabbitMQ 跟 springBoot 整合,相關組件

2021-02-16 14:22:25 後端開發

文章目錄

      • 環境準備:
      • RabbitAdmin:
      • SpringAMQP 宣告:
      • RabbitTemplate: 訊息模板
      • SimpleMessageListenerContainer : 訊息監聽容器
      • MessageListenerAdapter 訊息監聽配接器:
      • MessageConverter 訊息轉換器,序列化,反序列化:
        • TextMessageConverter 文本訊息轉換器
        • JSON格式轉換器 Jackson2JsonMessageConverter
        • java物件映射 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter
        • 支持java物件多映射轉換 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter
        • 全域轉換器 ContentTypeDelegatingMessageConverter

RabbitMQ和SpringBoot整合使用的組件串列:

RabbitAdmin:
RabbitAdmin 可以用來宣告exchange,queue,binding,發送訊息等操作

SpringAMQP 宣告
通過@Bean注解方式宣告交換機和佇列,

RabbitTemplate 訊息模板
我們在與springAMQP整合的時候進行發送訊息的關鍵類

SimpleMessageListenerContainer
訊息監聽容器

MessageListenerAdapter
訊息監聽配接器

MessageConverter
轉換器,序列化,反序列化

環境準備:

  1. 新建一個SpringBoot專案
  2. pom檔案中添加依賴
<!-- junit測驗 -->
<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-test</artifactId>
     <scope>test</scope>
 </dependency>

<!-- mq依賴 -->
 <dependency>
     <groupId>com.rabbitmq</groupId>
     <artifactId>amqp-client</artifactId>
     <version>5.4.3</version>
 </dependency>

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
  1. 創建配置類, 用來RabbitMQ加載配置,
package com.example.demo;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

/**
 * @author wx
 * @date 2021-01-16
 */
@Configuration
@ComponentScan("com.example.demo.*")
public class RabbitMQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("127.0.0.1:5672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }
    
}

RabbitAdmin:

rabbitAdmin類可以很好的操作RabbitMQ, 在Spring中直接注入即可,

// 在RabbitMQConfig類中加入如下代碼
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    //AutoStartup 必須設定為true, 否則spring容器不會加載
    rabbitAdmin.setAutoStartup(true);
    return rabbitAdmin;
}

RabbitAdmin 底層實作就是從Sping容器中獲取Exchange、Bingding、RoutingKey、以及Queue的@Bean宣告,
然后使用RabbitTemplate的execute方法執行對應的宣告,修改,洗掉等一系列RabbitMQ基礎功能操作,
添加測驗類:RabbitAdminTest

package com.example.demo;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitAdminTest {

    @Test
    public void contextLoads() {
        
    }

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Test
    public void testAdmin() throws Exception{
        //宣告交換機
        rabbitAdmin.declareExchange(new DirectExchange("wx.direct", false, false));
        rabbitAdmin.declareExchange(new TopicExchange("wx.topic", false, false));
        rabbitAdmin.declareExchange(new FanoutExchange("wx.fanout", false, false));

        //宣告佇列
        rabbitAdmin.declareQueue(new Queue("wx.direct.queue", false));
        rabbitAdmin.declareQueue(new Queue("wx.topic.queue", false));
        rabbitAdmin.declareQueue(new Queue("wx.fanout.queue", false));

        //bingding 交換機和佇列, new HashMap<>()為其他arguments
        rabbitAdmin.declareBinding(new Binding("wx.direct.queue",
            Binding.DestinationType.QUEUE,
            "wx.direct",    //exchange
            "direct",      //routingKey
            new HashMap<>()));

        //bingding 交換機和佇列的另一種方式
        rabbitAdmin.declareBinding(
            BindingBuilder
            .bind(new Queue("wx.topic.queue", false))       //直接創建佇列
            .to(new TopicExchange("wx.topic", false, false))  //建立關聯關系(沒有該交換機會報錯)
            .with("wx.#"));         //routingKey

        //bingding fanout型別交換機不需要 routingKey
        rabbitAdmin.declareBinding(
            BindingBuilder
            .bind(new Queue("wx.fanout.queue", false))
            .to(new FanoutExchange("wx.fanout", false, false)));

        //清空佇列
        rabbitAdmin.purgeQueue("wx.topic.queue", false);
    }
}

在RabbitMQ控制臺 http://localhost:15672/ 可以看到對應的exchange和queue,以及系結關系,
在這里插入圖片描述

SpringAMQP 宣告:

在RabbitMQ基礎API里面宣告一個Exchange, 宣告一個系結,一個佇列,
通過注解去宣告,跟用RabbitMQ寫法類似,
新建一個 RabbitMQBinding 配置類, 類上加注解 @Configuration

package com.example.demo;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

/**
 * @author wx
 * @date 2021-01-16
 */
@Configuration
public class RabbitMQBinding {

    /**
     * 針對消費者配置
     * 1. 設定交換機型別
     * 2. 將佇列系結到交換機
     FanoutExchange: 將訊息分發到所有系結佇列,無routingkey的概念
     HeaderExchange: 通過添加屬性key-value匹配
     DirectExchange: 按照routingkey分發到指定佇列
     TopicExchange: 多關鍵字匹配
     */
    @Bean
    public TopicExchange topic001(){
        return new TopicExchange("wx.topic001", false, false);
    }

    /**
     * 宣告佇列
     */
    @Bean
    public Queue queue001(){
        return new Queue("wx.topic.queue001", true);
    }

    @Bean
    public Queue queue002(){
        return new Queue("wx.topic.queue002", true);
    }

    @Bean
    public Queue queue003(){
        return new Queue("wx.topic.queue003", true);
    }

    /**
     * 建立系結關系
     */
    @Bean
    public Binding binding001(){
        return BindingBuilder.bind(queue001()).to(topic001()).with("rabbit.*");
    }

    @Bean
    public Binding binding002(){
        return BindingBuilder.bind(queue002()).to(topic001()).with("mq.*");
    }
}

在RabbitMQ控制臺 http://localhost:15672/ 可以看到對應的exchange和queue,以及系結關系,
在這里插入圖片描述

RabbitTemplate: 訊息模板

我們在與springAMQP整合的時候進行發送訊息的關鍵類
該類豐富了發送訊息方法,包括可靠性投遞訊息方法,回呼監聽訊息介面 ConfirmCallback, 回傳值確認介面ReturnCallback等等,同樣我們需要進行注入到Spring容器中,然后直接使用,
RabbitMQBinding 類中增加如下代碼

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    return rabbitTemplate;
}

增加 RabbitTemplateTest 類:

package com.example.demo;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitTemplateTest {

    @Test
    public void contextLoads() {
        
    }

    /**
     * rabbitTemplate.convertAndSend 發送訊息
     * 發送object格式的訊息時, 接收端使用string接收, 發送message格式時,用byte[]接收
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage1() throws Exception{
        //直接發送一個object
//        rabbitTemplate.convertAndSend("wx.topic001", "mq.info", "hello mq");
        rabbitTemplate.convertAndSend("wx.topic001", "rabbit.info", "hello rabbit");
        rabbitTemplate.convertAndSend("wx.topic001", "mq.info", "hello mq");
    }

    @Test
    public void testSendMessage2() throws Exception{
        //通過message發送訊息
        //訊息屬性, 可以使用已經存在的屬性比如setContentType, 也可以自定義屬性
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("text/plain");
         //訊息內容
        Message message = new Message("hello rabbitMQ - text/plain".getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.convertAndSend("wx.topic001", "rabbit.info", message);
    }

    @Test
    public void testSendMessage3() throws Exception{
        //發送message, 并在發送時修改和增加內容
        //訊息屬性, 可以使用已經存在的屬性比如setContentType, 也可以自定義屬性
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.getHeaders().put("desc", "自定義資訊描述");
        messageProperties.getHeaders().put("type", "自定義訊息型別");
        //訊息內容
        Message message = new Message("hello rabbitMQ".getBytes(StandardCharsets.UTF_8), messageProperties);
        //通過 MessagePostProcessor 增加額外的屬性
        rabbitTemplate.convertAndSend("wx.topic001", "rabbit.info", message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                System.out.println("添加額外的設定");
                message.getMessageProperties().getHeaders().put("desc", "額外修改的資訊描述");
                message.getMessageProperties().getHeaders().put("attr", "額外增加的屬性");
                return message;
            }
        });
    }
}

在RabbitMQ控制臺 http://localhost:15672/ 可以看到對應的queue下的訊息,
在這里插入圖片描述

SimpleMessageListenerContainer : 訊息監聽容器

這個類非常強大,我們可以對他進行很多設定,對于消費者的配置項,這個類都可以滿足

  • 監聽佇列(多個佇列),自動啟動,自動宣告功能
  • 設定事務特性,事務管理器,事務屬性,事務容量(并發),是否開啟事務,回滾訊息
  • 設定消費者數量,最小最大數量,批量消費
  • 設定訊息確認和自動確認模式,是否重回佇列,例外捕獲handler函式,
  • 設定消費者標簽生成策略,是否獨占模式,消費者屬性等
  • 設定具體的監聽器,訊息轉換器等等,

simpleMessageListenerContainer可以進行動態設定,比如在運行程序中的應用可以動態的修改其消費者數量的大小,接收訊息的模式等,
很多基于RabbitMQ的自定制的一些后端管控臺在進行動態設定的時候,也是根據這一特性去實作的,所以可以看出SpringAMQP非常的強大,
RabbitMQBinding 類中增加如下代碼:

/**
 * 宣告一個 SimpleMessageListenerContainer 用于監聽queue, 并處理訊息
 */
@Bean
public SimpleMessageListenerContainer messageListenerContainer1(ConnectionFactory connectionFactory){
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    //設定監聽佇列
    container.setQueues(queue001(), queue002(), queue003());
    //設定消費者數量
    container.setConcurrentConsumers(1);
    //設定最大消費者數量
    container.setMaxConcurrentConsumers(2);
    //是否重回佇列
    container.setDefaultRequeueRejected(false);
    //簽收策略
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    //消費端的標簽策略, 用于區分不同的消費者
    container.setConsumerTagStrategy(new ConsumerTagStrategy() {
        @Override
        public String createConsumerTag(String queue) {
            return queue + "_" + UUID.randomUUID().toString();
        }
    });
    container.setMessageListener(new ChannelAwareMessageListener() {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            //處理訊息
            String msg = new String(message.getBody());
            System.out.println("----- consumer : " + msg);
        }
    });
    return container;
}

– 消費者增加標簽后的效果
在這里插入圖片描述

MessageListenerAdapter 訊息監聽配接器:

也可以通過配接器的方式處理訊息,上邊代碼改成如下:

/**
     * 宣告一個 SimpleMessageListenerContainer 用于監聽queue, 并處理訊息
     * 通過配接器的方式處理訊息, 處理訊息的模式方法為是 handleMessage, 可以進行修改
     */
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        //設定監聽佇列
        container.setQueues(queue001(), queue002(), queue003());
        //設定消費者數量
        container.setConcurrentConsumers(1);
        //設定最大消費者數量
        container.setMaxConcurrentConsumers(1);
        //是否重回佇列
        container.setDefaultRequeueRejected(false);
        //簽收策略
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //消費端的標簽策略
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });

        //配接器方式
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
        //修改執行的方法名,  默認方法名為 handleMessage
        messageListenerAdapter.setDefaultListenerMethod("consumeMessage");
        container.setMessageListener(messageListenerAdapter);

        return container;
    }

增加一個 MessageDelegate 類:

    package com.example.adapter;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author wx
 * @date 2021/2/15
 */
@Component
public class MessageDelegate {

    @Autowired
    private MessageDelegate messageDelegate;

    public void handleMessage(byte[] messageBody){
        System.err.println("handleMessage : " + new String(messageBody));
    }

    public void handleMessage(String messageBody){
        System.err.println("handleMessage : " + messageBody);
    }

    public void consumeMessage(String messageBody){
        System.err.println("consumeMessage string : " + messageBody);
    }

    public void consumeMessage(byte[] messageBody){
        System.err.println("consumeMessage byte[] : ");
        consumeMessage(new String(messageBody));
    }

    public void method1(String messageBody){
        System.err.println("method1 : " + messageBody);
    }

    public void method2(String messageBody){
        System.err.println("method2 : " + messageBody);
    }
}

MessageConverter 訊息轉換器,序列化,反序列化:

TextMessageConverter 文本訊息轉換器

rabbitTemplate.convertAndSend 發送訊息時,有一個問題

  • 發送object格式的訊息時, 接收端使用string接收
  • 發送message格式時,用byte[]接收
    這樣我們每個處理訊息的方法都需要多載,因為不知道具體應該用什么格式接收訊息,
    這時可以使用 MessageConverter 來處理訊息,
    上邊方法改成:
/**
     * 宣告一個 SimpleMessageListenerContainer 用于監聽queue, 并處理訊息
     * 通過配接器的方式處理訊息
     */
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        //設定監聽佇列
        container.setQueues(queue001(), queue002(), queue003());
        //設定消費者數量
        container.setConcurrentConsumers(1);
        //設定最大消費者數量
        container.setMaxConcurrentConsumers(1);
        //是否重回佇列
        container.setDefaultRequeueRejected(false);
        //簽收策略
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //消費端的標簽策略
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });

//        //1,配接器方式1
//        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
//        //修改執行的方法名,  默認方法名為 handleMessage
//        messageListenerAdapter.setDefaultListenerMethod("consumeMessage");
//        //指定一個轉換器, 將message的位元組陣列轉換為字串,轉換器可以不加(用方法多載解決)
//        messageListenerAdapter.setMessageConverter(new TextMessageConverter());
//        container.setMessageListener(messageListenerAdapter);

        //2, 配接器方式2, 不同的佇列可以使用不同的方法處理資料
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
        Map<String, String> queueOrTagToMethodName = new HashMap<>();
        queueOrTagToMethodName.put("wx.topic.queue001", "method1");
        queueOrTagToMethodName.put("wx.topic.queue002", "method2");
        //指定一個轉換器, 將message的位元組陣列轉換為字串,轉換器可以不加(用方法多載解決)
        messageListenerAdapter.setMessageConverter(new TextMessageConverter());
        messageListenerAdapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
        container.setMessageListener(messageListenerAdapter);
        return container;
    }

增加一個 TextMessageConverter 轉換器類:
自定義轉換器:實作MessageConverter 介面, 并且需要重寫下面兩個方法,

  • toMessage:java物件轉換為Message
  • fromMessage:Message物件轉換為java物件,
    package com.example.convert;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;

/**
 * @author wx
 * @date 2021/2/15
 */
public class TextMessageConverter implements MessageConverter {

    @Override
    public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
        return new Message(o.toString().getBytes(), messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
//        String contentType = message.getMessageProperties().getContentType();
//        if(null != contentType && contentType.contains("text")){
//            return new String(message.getBody());
//        }
        Object body = message.getBody();
        if(body == null) return body;
        /**
         * rabbitTemplate.convertAndSend 發送訊息時
         *      發送object格式的訊息時, 接收端使用string接收
         *      發送message格式時,用byte[]接收
         * 所以在接收到訊息時轉換一下格式, (判斷是否是byte[], 也可以使用其他的屬性欄位判斷, 如上通過contentType判斷)
         **/
        return body instanceof byte[] ? (new String((byte[]) body)) : body;
    }
}

JSON格式轉換器 Jackson2JsonMessageConverter

修改 messageListenerContainer 代碼如下:

/**
     * 宣告一個 SimpleMessageListenerContainer 用于監聽queue, 并處理訊息
     * 通過配接器的方式處理訊息
     */
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        //設定監聽佇列
        container.setQueues(queue001(), queue002(), queue003(), queueImage(), queuePdf());
        //設定消費者數量
        container.setConcurrentConsumers(1);
        //設定最大消費者數量
        container.setMaxConcurrentConsumers(1);
        //是否重回佇列
        container.setDefaultRequeueRejected(false);
        //簽收策略
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //消費端的標簽策略
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });

        //1.1 配接器方式, json格式轉換器 Jackson2JsonMessageConverter
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
        //修改執行的方法名,  默認方法名為 handleMessage
        messageListenerAdapter.setDefaultListenerMethod("consumeMapMessage");
        //JSON格式轉換器
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        messageListenerAdapter.setMessageConverter(jackson2JsonMessageConverter);
        container.setMessageListener(messageListenerAdapter);
        
        return container;
    }

MessageDelegate 類中增加方法:

public void consumeMapMessage(Map messageBody){
	System.err.println("consumeMapMessage map : " + messageBody);
}

增加junit測驗方法:

@Test
public void testSendJsonMessage() throws Exception{
    //物體類
    User user = new User("001", "張三", "130000000000X", "宇宙中心");
    String json = new ObjectMapper().writeValueAsString(user);
    System.err.println("user json : " + json);

    MessageProperties messageProperties = new MessageProperties();
    //必須修改ContentType 為application/json
    messageProperties.setContentType("application/json");
    Message message = new Message(json.getBytes(), messageProperties);
    rabbitTemplate.send("wx.topic001", "rabbit.info", message);
}

User 物體類代碼:

public class User {

	private String id;
	private String name;
	private String IDCard;
	private String address;

	public User() {}

	public User(String id, String name, String IDCard, String address) {
		this.id = id;
		this.name = name;
		this.IDCard = IDCard;
		this.address = address;
	}

	public String toString(){
		return "id = " + id + ", name = " + name + ", IDCard = " + IDCard + ", address = " + address;
	}

	//getter() and  setter()

	
}

java物件映射 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter

修改 messageListenerContainer 代碼如下:

/**
     * 宣告一個 SimpleMessageListenerContainer 用于監聽queue, 并處理訊息
     * 通過配接器的方式處理訊息
     */
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        //設定監聽佇列
        container.setQueues(queue001(), queue002(), queue003(), queueImage(), queuePdf());
        //設定消費者數量
        container.setConcurrentConsumers(1);
        //設定最大消費者數量
        container.setMaxConcurrentConsumers(1);
        //是否重回佇列
        container.setDefaultRequeueRejected(false);
        //簽收策略
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //消費端的標簽策略
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });

        //1.2 配接器方式, json格式 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter    java物件轉換
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
        //修改執行的方法名,  默認方法名為 handleMessage
        messageListenerAdapter.setDefaultListenerMethod("consumeMappingMessage");
        //JSON格式轉換器
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        //java物件轉換
        DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
        //springBoot2.0新特性, RabbitMQ信任package
        javaTypeMapper.setTrustedPackages("*");
        jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
        messageListenerAdapter.setMessageConverter(jackson2JsonMessageConverter);
        container.setMessageListener(messageListenerAdapter);
        return container;
    }

MessageDelegate 類中增加消費者方法(接收User):

public void consumeMappingMessage(User user){
    System.err.println("user info : " + user.toString());
}

增加測驗方法

@Test
public void testSendUser() throws Exception{
    User user = new User("001", "張三", "130000000000X", "宇宙中心");
    String json = new ObjectMapper().writeValueAsString(user);

    MessageProperties messageProperties = new MessageProperties();
    //修改ContentType 為application/json
    messageProperties.setContentType("application/json");
    //指定決議的物體類的型別
    messageProperties.getHeaders().put("__TypeId__", "com.example.entity.User");
    Message message = new Message(json.getBytes(), messageProperties);
    rabbitTemplate.send("wx.topic001", "rabbit.info", message);
}

支持java物件多映射轉換 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter

修改 messageListenerContainer 代碼如下:

    /**
     * 宣告一個 SimpleMessageListenerContainer 用于監聽queue, 并處理訊息
     * 通過配接器的方式處理訊息
     */
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        //設定監聽佇列
        container.setQueues(queue001(), queue002(), queue003(), queueImage(), queuePdf());
        //設定消費者數量
        container.setConcurrentConsumers(1);
        //設定最大消費者數量
        container.setMaxConcurrentConsumers(1);
        //是否重回佇列
        container.setDefaultRequeueRejected(false);
        //簽收策略
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //消費端的標簽策略
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });

        //支持java物件多映射轉換 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter    
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
        //修改執行的方法名,  默認方法名為 handleMessage
        messageListenerAdapter.setDefaultListenerMethod("consumeMappingMessage");
        //JSON格式轉換器
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        //java物件轉換
        DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
        //springBoot2.0新特性, RabbitMQ信任package
        javaTypeMapper.setTrustedPackages("*");
        //支持java物件多映射轉換
        Map<String, Class<?>> idClassMapping = new HashMap<>();
        idClassMapping.put("user", User.class);
        idClassMapping.put("school", School.class);
        javaTypeMapper.setIdClassMapping(idClassMapping);
        jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
        //設定轉換器
        messageListenerAdapter.setMessageConverter(jackson2JsonMessageConverter);
        //設定配接器
        container.setMessageListener(messageListenerAdapter);

        return container;
    }

}

全域轉換器 ContentTypeDelegatingMessageConverter

修改 messageListenerContainer 代碼如下:

/**
 * 宣告一個 SimpleMessageListenerContainer 用于監聽queue, 并處理訊息
 * 通過配接器的方式處理訊息
 */
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    //設定監聽佇列
    container.setQueues(queue001(), queue002(), queue003(), queueFile());
    //設定消費者數量
    container.setConcurrentConsumers(1);
    //設定最大消費者數量
    container.setMaxConcurrentConsumers(1);
    //是否重回佇列
    container.setDefaultRequeueRejected(false);
    //簽收策略
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    //消費端的標簽策略
    container.setConsumerTagStrategy(new ConsumerTagStrategy() {
        @Override
        public String createConsumerTag(String queue) {
            return queue + "_" + UUID.randomUUID().toString();
        }
    });

    //ext 全域轉換器
    MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
    //修改執行的方法名,  默認方法名為 handleMessage
    messageListenerAdapter.setDefaultListenerMethod("consumeMappingMessage");
    //全域轉換器: 可以支持放入很多的小的Converter
    ContentTypeDelegatingMessageConverter converter = new ContentTypeDelegatingMessageConverter();
    //文本轉換器
    TextMessageConverter textConvert = new TextMessageConverter();
    converter.addDelegate("text", textConvert);
    converter.addDelegate("html/text", textConvert);
    converter.addDelegate("xml/text", textConvert);
    converter.addDelegate("text/plain", textConvert);
    //json轉換器
    Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
    converter.addDelegate("json", jsonConvert);
    converter.addDelegate("application/json", jsonConvert);
    //檔案流轉換器, 也可以根據不同的檔案型別(根據contentType判斷), 用不用的轉換器去做處理
    FileMessageConverter fileMessageConverter = new FileMessageConverter();
    converter.addDelegate("file", fileMessageConverter);
    //設定轉換器
    messageListenerAdapter.setMessageConverter(converter);
    //設定配接器
    container.setMessageListener(messageListenerAdapter);

    return container;
}

增加檔案轉化器類 FileMessageConverter

package com.example.convert;
import org.springframework.amqp.support.converter.MessageConverter;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.UUID;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;


/**
 * @author wx
 * @date 2021/2/15
 */
public class FileMessageConverter implements MessageConverter {
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        throw new MessageConversionException(" convert error ! ");
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        System.err.println("-----------File MessageConverter----------");
        //獲取檔案擴展名
        Object _extName = message.getMessageProperties().getHeaders().get("extName");
        String extName = _extName == null ? "png" : _extName.toString();

        byte[] body = message.getBody();
        String fileName = UUID.randomUUID().toString();
        String path = "/Users/wx/Desktop/test/" + fileName + "." + extName;
        File file = new File(path);
        try {
            Files.copy(new ByteArrayInputStream(body), file.toPath());
        } catch (IOException e) {
            e.printStackTrace();
        }
        return file;
    }
}

增加消費者方法:

 public void consumeMappingMessage(File file){
     System.out.println("file : " + file.getPath());
 }

增加測驗方法:

@Test
public void testSendExtConvertMessage() throws Exception{
    byte[] body = Files.readAllBytes(Paths.get("/Users/wx/Desktop/", "aaa.png"));
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType("file");
    messageProperties.getHeaders().put("extName", "png");
    Message message = new Message(body, messageProperties);
    rabbitTemplate.send("", "file_queue", message);
}

新增宣告 file_queue:

@Bean
public Queue queueFile(){
    return new Queue("file_queue", true);
}

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/260036.html

標籤:java

上一篇:為什么工廠模式可以解耦?(一)

下一篇:設計模式-模板方法模式

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more