pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven build descriptor for the Spring Boot + RabbitMQ demo application. -->
<modelVersion>4.0.0</modelVersion>
<!-- Inherits from the Spring Boot 2.6.2 starter parent; the dependencies and plugin below
     declare no versions of their own, so their versions must come from this parent. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.2</version>
</parent>
<packaging>jar</packaging>
<!-- Coordinates of this project. -->
<groupId>com.kaven</groupId>
<artifactId>springboot</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot</name>
<description>springboot</description>
<properties>
<!-- Java language level used for the build. -->
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- Spring MVC starter; the demo exposes a REST endpoint (ProducerController). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Lombok; the demo classes use @Getter, @Setter, @ToString and @AllArgsConstructor. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- Spring AMQP starter; provides the RabbitMQ client classes used by RabbitMQConfig and Producer. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Gson; used to serialise and deserialise User objects as JSON message bodies. -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven plugin (packaging/run support for the application). -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties:
# Connection settings for the RabbitMQ broker (bound to RabbitMQProperties via the
# "spring.rabbitmq" prefix).
spring.rabbitmq.host=192.168.1.9
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtualHost=/
# Demo-specific keys read by RabbitMQProperties: exchange, routing key and queue name.
# NOTE(review): .properties values are taken literally, so the line below sets the exchange
# name to the two-character string "" (two quote characters), not to an empty string.
# If the broker's default (nameless) exchange was intended, the value should be left blank
# - confirm the intent before changing it.
spring.rabbitmq.exchange=""
spring.rabbitmq.routingKey=kaven
spring.rabbitmq.queue=kaven
RabbitMQProperties類(RabbitMQ的引數類):
package com.kaven.springboot.rabbitmq;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
 * Typed holder for the {@code spring.rabbitmq.*} settings used by this demo.
 *
 * <p>Accessors and mutators for every field are generated by Lombok
 * ({@code @Getter} / {@code @Setter}).
 */
@Getter
@Setter
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMQProperties {

    // --- broker connection ---

    /** Broker host name or IP address. */
    private String host;

    /** Broker port. */
    private int port;

    /** Virtual host to connect to. */
    private String virtualHost;

    /** User name used to authenticate with the broker. */
    private String username;

    /** Password used to authenticate with the broker. */
    private String password;

    // --- messaging topology ---

    /** Name of the exchange messages are published to. */
    private String exchange;

    /** Routing key used both for the binding and when publishing. */
    private String routingKey;

    /** Name of the queue that is declared and consumed from. */
    private String queue;
}
User類(訊息負載的實體類):
package com.kaven.springboot.rabbitmq;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
/**
 * Message payload entity.
 *
 * <p>Lombok generates the getters, setters, {@code toString()} and a constructor taking
 * all fields in declaration order ({@code username, password, code}); the declaration
 * order below therefore must not be changed.
 */
@AllArgsConstructor
@ToString
@Getter
@Setter
public class User {

    /** Login name of the user. */
    private String username;

    /** Password of the user. */
    private String password;

    /** Code value carried with the user data. */
    private String code;
}
Json2UserMessageConverter類(訊息轉換器,將json資料轉換成User物件,json資料由訊息體的byte[]生成):
package com.kaven.springboot.rabbitmq;
import com.google.gson.Gson;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@Component
public class Json2UserMessageConverter implements MessageConverter {
private static final Gson GSON = new Gson();
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
return new Message(GSON.toJson(object).getBytes(StandardCharsets.UTF_8));
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
return GSON.fromJson(new String(message.getBody(), StandardCharsets.UTF_8), User.class);
}
}
CustomizeMessageListener類(自定義訊息監聽器):
package com.kaven.springboot.rabbitmq;
import org.springframework.stereotype.Component;
/**
 * Delegate object whose {@link #customizeHandleMessage(User)} method receives converted
 * messages.
 */
@Component
public class CustomizeMessageListener {

    /**
     * Handles one received user payload by printing it to standard output.
     *
     * @param user the payload to handle
     */
    public void customizeHandleMessage(User user) {
        System.out.format("處理用戶資料: %s\n", user);
    }
}
RabbitMQConfig類(定義RabbitMQ組件的配置類):
package com.kaven.springboot.rabbitmq;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import javax.annotation.Resource;
/**
 * Configuration class that defines the RabbitMQ components of this demo: the connection
 * factory, the template used for publishing, the listener container used for consuming,
 * and the exchange / queue / binding declarations.
 */
@Configuration
public class RabbitMQConfig {

    /** Broker and topology settings. */
    @Resource
    private RabbitMQProperties properties;

    /**
     * Creates the connection factory from the configured host, port, credentials and
     * virtual host.
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory =
                new CachingConnectionFactory(properties.getHost(), properties.getPort());
        factory.setVirtualHost(properties.getVirtualHost());
        factory.setUsername(properties.getUsername());
        factory.setPassword(properties.getPassword());
        return factory;
    }

    /**
     * Creates a template bound to the given connection factory.
     *
     * <p>Original author's note: this bean must be prototype-scoped.
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        return template;
    }

    /**
     * Creates the listener container that consumes from the configured queue and hands each
     * message to {@code delegate} through a {@link MessageListenerAdapter}.
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,
                                                                         CustomizeMessageListener delegate,
                                                                         Json2UserMessageConverter messageConverter) {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);

        // Number of concurrent consumers to create (the default is 1).
        listenerContainer.setConcurrentConsumers(3);
        // Upper limit for the number of consumers (defaults to concurrentConsumers).
        // Consumers are added on demand; the limit must not be below concurrentConsumers.
        listenerContainer.setMaxConcurrentConsumers(5);

        // Queue name(s) to receive messages from; the parameter is a String varargs.
        listenerContainer.setQueueNames(properties.getQueue());

        // Acknowledgement behaviour of the container: automatic here. For manual
        // acknowledgement a ChannelAwareMessageListener can be used instead.
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);

        // How many messages the broker sends to each consumer in a single request;
        // this can usually be set fairly high to improve throughput.
        listenerContainer.setPrefetchCount(3);

        listenerContainer.setMessageListener(buildListenerAdapter(delegate, messageConverter));
        return listenerContainer;
    }

    /** Builds the adapter that converts each message and invokes the delegate's handler method. */
    private MessageListenerAdapter buildListenerAdapter(CustomizeMessageListener delegate,
                                                        Json2UserMessageConverter messageConverter) {
        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter();
        // Object that actually handles the messages.
        listenerAdapter.setDelegate(delegate);
        // Converter applied to the message before the handler method is called.
        listenerAdapter.setMessageConverter(messageConverter);
        // Name of the delegate method to invoke by default.
        listenerAdapter.setDefaultListenerMethod("customizeHandleMessage");
        return listenerAdapter;
    }

    /** Declares the direct exchange named by the configuration. */
    @Bean
    public DirectExchange exchange() {
        String exchangeName = properties.getExchange();
        return new DirectExchange(exchangeName);
    }

    /** Declares the configured queue as durable. */
    @Bean
    public Queue queue() {
        final boolean durable = true;
        return new Queue(properties.getQueue(), durable);
    }

    /** Binds the queue to the exchange using the configured routing key. */
    @Bean
    public Binding binding() {
        Queue boundQueue = queue();
        DirectExchange targetExchange = exchange();
        return BindingBuilder.bind(boundQueue).to(targetExchange).with(properties.getRoutingKey());
    }
}
Producer類(用于發布訊息):
package com.kaven.springboot.rabbitmq;
import com.google.gson.Gson;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
/**
 * Publishes {@link User} payloads to RabbitMQ as JSON messages.
 */
@Component
public class Producer {

    /** Shared Gson instance used to serialise payloads. */
    private static final Gson GSON = new Gson();

    /** Template used to publish messages. */
    private final RabbitTemplate rabbitTemplate;

    /** Supplies the exchange name and routing key to publish with. */
    @Resource
    private RabbitMQProperties properties;

    /**
     * @param rabbitTemplate the template used for publishing
     */
    @Autowired
    public Producer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Serialises {@code user} to UTF-8 JSON and sends it to the configured exchange with the
     * configured routing key, tagged with a random UUID as correlation id.
     *
     * @param user the payload to publish
     */
    public void sendMsg(User user) {
        MessageProperties headers = new MessageProperties();
        // Per-message expiration; RabbitMQ interprets the value as a TTL in milliseconds.
        headers.setExpiration("10000");

        byte[] payload = GSON.toJson(user).getBytes(StandardCharsets.UTF_8);
        Message outbound = new Message(payload, headers);

        String id = UUID.randomUUID().toString();
        rabbitTemplate.send(
                properties.getExchange(),
                properties.getRoutingKey(),
                outbound,
                new CorrelationData(id));
    }
}
ProducerController類(用于發布訊息的介面):
package com.kaven.springboot.rabbitmq;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * REST endpoint used to trigger publishing of a message.
 */
@RestController
public class ProducerController {

    /** Response body returned once the payload has been handed to the producer. */
    private static final String SEND_OK = "資料發送成功";

    /** Component that performs the actual publishing. */
    @Resource
    private Producer producer;

    /**
     * Handles {@code GET /send}: prints the bound {@link User} to standard output, passes it
     * to the producer and returns a fixed confirmation text.
     *
     * @param user the payload bound from the request
     * @return the confirmation text
     */
    @GetMapping("/send")
    public String send(User user) {
        System.out.println(user);
        producer.sendMsg(user);
        return SEND_OK;
    }
}
啟動類:
package com.kaven.springboot;
import com.kaven.springboot.rabbitmq.RabbitMQProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
/**
 * Application entry point. Also enables scanning for {@code @ConfigurationProperties}
 * classes in the package that contains {@link RabbitMQProperties}.
 */
@SpringBootApplication
@ConfigurationPropertiesScan(basePackageClasses = RabbitMQProperties.class)
public class SpringbootApplication {

    /**
     * Boots the Spring application with this class as the primary source.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(SpringbootApplication.class, args);
    }
}
啟動應用,使用Postman請求介面(例如 GET /send?username=kaven&password=itkaven&code=908767)。

控制臺輸出:
User(username=kaven, password=itkaven, code=908767)
處理用戶資料: User(username=kaven, password=itkaven, code=908767)
輸出符合預期。訊息轉換器的作用是:當SimpleMessageListenerContainer並發消費時,將獲取的訊息轉換成想要的資料型別(先將訊息體中的byte[]轉換成json,再將json轉換成User物件;這裡不討論Json2UserMessageConverter類名是否合適),比如這裡的User物件。之後訊息監聽器接收的就是轉換後的資料型別,即直接處理User物件,符合單一職責原則。相應地,生產者發布的資料也應該先從User物件轉換成json,最後轉換成byte[]型別。MessageConverter訊息轉換器就介紹到這裡,如果博主有說錯的地方或者大家有不同的見解,歡迎大家評論補充。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/425049.html
標籤:其他
