不了解RabbitMQ的訊息確認與訊息回傳可以參考下面兩篇博客:
- RabbitMQ:訊息確認機制
- RabbitMQ:Return訊息機制
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the RabbitMQ publisher-confirm / publisher-return demo. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Spring Boot parent POM; the dependencies and plugin below declare no versions of their own. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.2</version>
</parent>
<packaging>jar</packaging>
<groupId>com.kaven</groupId>
<artifactId>springboot</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot</name>
<description>springboot</description>
<properties>
<!-- Java language level used for the build. -->
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- Web starter: used by the REST controller that exposes the /send endpoint. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Lombok: generates the getters/setters of RabbitMQProperties. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- AMQP starter: provides the Spring AMQP / RabbitMQ client classes used by the demo. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven plugin. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties:
# Connection settings for the RabbitMQ broker (read by RabbitMQProperties).
spring.rabbitmq.host=192.168.1.9
spring.rabbitmq.port=5672
# NOTE(review): demo credentials stored in plain text; externalize them outside of a demo.
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtualHost=/
# Topology settings: exchange, routing key and queue declared by RabbitMQConfig.
# NOTE: .properties values are taken literally, so the exchange name below is the two
# quote characters "" (the console output in this article prints it that way), not an
# empty string.
spring.rabbitmq.exchange=""
spring.rabbitmq.routingKey=kaven
spring.rabbitmq.queue=kaven
RabbitMQProperties類(RabbitMQ的引數類):
package com.kaven.springboot.rabbitmq;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
 * Holder for the RabbitMQ settings bound from configuration keys under the
 * {@code spring.rabbitmq} prefix.
 *
 * <p>Getters and setters for every field are generated by Lombok.
 */
@Getter
@Setter
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMQProperties {

    // ---- broker connection ----

    /** Host of the RabbitMQ broker. */
    private String host;

    /** Port of the RabbitMQ broker. */
    private int port;

    /** Virtual host to connect to. */
    private String virtualHost;

    /** User name used to authenticate against the broker. */
    private String username;

    /** Password used to authenticate against the broker. */
    private String password;

    // ---- messaging topology ----

    /** Name of the exchange messages are published to. */
    private String exchange;

    /** Routing key used for the queue binding and for publishing. */
    private String routingKey;

    /** Name of the queue messages are consumed from. */
    private String queue;
}
RabbitMQConfirmCallbackAndReturnsCallback類(用于訊息確認與訊息回傳的回呼):
package com.kaven.springboot.rabbitmq;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/**
 * Callback that reports publisher confirms and returned (unroutable) messages on
 * standard output.
 *
 * <p>One instance serves as both the confirm callback and the returns callback.
 */
@Component
public class RabbitMQConfirmCallbackAndReturnsCallback implements RabbitTemplate.ConfirmCallback,
        RabbitTemplate.ReturnsCallback {

    /**
     * Prints whether the broker acknowledged a published message. When the message was
     * acknowledged and the correlation data carries a returned message, that returned
     * message is printed as well.
     *
     * @param correlationData correlation data supplied when publishing; may be {@code null}
     *                        (Spring AMQP declares this parameter as nullable), in which case
     *                        the id is printed as {@code null}
     * @param ack             {@code true} if the broker acknowledged the message
     * @param cause           failure reason reported for a negative acknowledgement
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // Guard against missing correlation data instead of failing with a
        // NullPointerException inside the callback.
        String id = correlationData == null ? null : correlationData.getId();

        if (!ack) {
            System.out.printf("%s 訊息發送到RabbitMQ失敗, %s\n", id, cause);
            return;
        }

        System.out.printf("%s 訊息成功發送到RabbitMQ\n", id);
        ReturnedMessage returned = correlationData == null ? null : correlationData.getReturned();
        if (returned != null) {
            returnedMessage(returned);
        }
    }

    /**
     * Prints the exchange, routing key, message, reply text and reply code of a message
     * that was returned.
     *
     * @param returned the returned message and its metadata
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        System.out.printf("%s %s %s %s %d\n",
                returned.getExchange(),
                returned.getRoutingKey(),
                returned.getMessage(),
                returned.getReplyText(),
                returned.getReplyCode()
        );
    }
}
RabbitMQConfig類(定義RabbitMQ組件的配置類):
package com.kaven.springboot.rabbitmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import javax.annotation.Resource;
/**
 * Spring configuration that declares the RabbitMQ components used by the demo:
 * connection factory, template, exchange, queue and binding.
 */
@Configuration
public class RabbitMQConfig {

    @Resource
    private RabbitMQProperties properties;

    /**
     * Builds the connection factory from the configured host, port, credentials and
     * virtual host, with publisher confirms and publisher returns switched on.
     *
     * @return the configured connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory =
                new CachingConnectionFactory(properties.getHost(), properties.getPort());
        factory.setVirtualHost(properties.getVirtualHost());
        factory.setUsername(properties.getUsername());
        factory.setPassword(properties.getPassword());

        // Enable publisher confirms (correlated) and publisher returns on the factory.
        factory.setPublisherReturns(true);
        factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        return factory;
    }

    /**
     * Creates a template wired with the confirm/returns callback.
     *
     * @param connectionFactory                 factory the template obtains connections from
     * @param confirmCallbackAndReturnsCallback callback registered for both confirms and returns
     * @return a new template instance
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    // Prototype scope; the original author's note states that this is required.
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                         RabbitMQConfirmCallbackAndReturnsCallback confirmCallbackAndReturnsCallback) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // Register the same object as confirm callback and as returns callback,
        // and set the mandatory flag on the template.
        template.setReturnsCallback(confirmCallbackAndReturnsCallback);
        template.setConfirmCallback(confirmCallbackAndReturnsCallback);
        template.setMandatory(true);
        return template;
    }

    /**
     * Declares the direct exchange named by the configured exchange property.
     *
     * @return the exchange definition
     */
    @Bean
    public DirectExchange exchange() {
        String exchangeName = properties.getExchange();
        return new DirectExchange(exchangeName);
    }

    /**
     * Declares the durable queue named by the configured queue property.
     *
     * @return the queue definition
     */
    @Bean
    public Queue queue() {
        String queueName = properties.getQueue();
        // Second argument marks the queue as durable.
        return new Queue(queueName, true);
    }

    /**
     * Binds the queue to the exchange using the configured routing key.
     *
     * @return the binding definition
     */
    @Bean
    public Binding binding() {
        Queue source = queue();
        DirectExchange target = exchange();
        return BindingBuilder.bind(source).to(target).with(properties.getRoutingKey());
    }
}
Producer類(用于發布訊息):
package com.kaven.springboot.rabbitmq;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
/**
 * Publishes text messages to RabbitMQ through a {@link RabbitTemplate}.
 */
@Component
public class Producer {

    private final RabbitTemplate rabbitTemplate;

    @Resource
    private RabbitMQProperties properties;

    /**
     * @param rabbitTemplate template used to publish messages
     */
    @Autowired
    public Producer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Publishes {@code msg} (UTF-8 encoded, expiration {@code "10000"}) to the configured
     * exchange, attaching correlation data with a random UUID as its id.
     *
     * @param msg      message text; must not be {@code null}
     * @param returned when {@code true}, the routing key is suffixed with {@code "-returned"}.
     *                 No queue is bound to that key, so the message cannot be routed and
     *                 a message return is triggered
     */
    public void sendMsg(String msg, boolean returned) {
        CorrelationData correlation = new CorrelationData(UUID.randomUUID().toString());

        MessageProperties headers = new MessageProperties();
        headers.setExpiration("10000");
        Message payload = new Message(msg.getBytes(StandardCharsets.UTF_8), headers);

        // Pick the bound routing key, or the unbound "-returned" variant on request.
        String routingKey = returned
                ? properties.getRoutingKey() + "-returned"
                : properties.getRoutingKey();

        rabbitTemplate.send(properties.getExchange(), routingKey, payload, correlation);
    }
}
Consumer類(用于消費訊息):
package com.kaven.springboot.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumes messages from the queue named by the {@code spring.rabbitmq.queue} property.
 */
@Component
public class Consumer {

    /**
     * Prints each received message to standard output.
     *
     * @param msg the received message body
     */
    @RabbitListener(queues = {"${spring.rabbitmq.queue}"})
    public void process(String msg) {
        String line = "接收訊息: " + msg;
        System.out.println(line);
    }
}
ProducerController類(用于發布訊息的介面):
package com.kaven.springboot.rabbitmq;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * REST endpoint for publishing messages through {@link Producer}.
 */
@RestController
public class ProducerController {

    @Resource
    private Producer producer;

    /**
     * Handles {@code GET /send} by handing the message to the producer.
     *
     * <p>The fixed reply is returned once the producer call returns; it is not derived
     * from the confirm or return callbacks.
     *
     * @param msg      message text to publish
     * @param returned passed through to {@link Producer#sendMsg(String, boolean)}
     * @return a fixed success text
     */
    @GetMapping("/send")
    public String send(String msg, boolean returned) {
        producer.sendMsg(msg, returned);
        final String reply = "發送訊息成功";
        return reply;
    }
}
啟動類:
package com.kaven.springboot;
import com.kaven.springboot.rabbitmq.RabbitMQProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
/**
 * Application entry point. Scans the package of {@link RabbitMQProperties} for
 * {@code @ConfigurationProperties} classes.
 */
@SpringBootApplication
@ConfigurationPropertiesScan(basePackageClasses = {RabbitMQProperties.class})
public class SpringbootApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments passed on to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(SpringbootApplication.class, args);
    }
}
啟動應用,Spring Boot會與RabbitMQ建立連接,


請求介面,

控制臺輸出:
// 訊息確認
a9f3afef-72c5-406a-8e75-e9af46a89381 訊息成功發送到RabbitMQ
接收訊息: "我不會觸發訊息回傳,但可以觸發訊息確認"
訊息成功發送到RabbitMQ,觸發了訊息確認;並且訊息可路由,因此不會觸發訊息回傳。

再次請求介面,

控制臺輸出:
// 訊息回傳
"" kaven-returned (Body:'[B@6d598368(byte[53])' MessageProperties [headers={spring_returned_message_correlation=34c045fb-7706-4aca-b775-7befa6c9be01}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, expiration=10000, priority=0, deliveryTag=0]) NO_ROUTE 312
// 訊息確認
34c045fb-7706-4aca-b775-7befa6c9be01 訊息成功發送到RabbitMQ
"" kaven-returned (Body:'[B@6d598368(byte[53])' MessageProperties [headers={spring_listener_return_correlation=2facdfdd-f4ec-4625-ae0b-6f913798eb29, spring_returned_message_correlation=34c045fb-7706-4aca-b775-7befa6c9be01}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, expiration=10000, priority=0, redelivered=false, receivedExchange="", receivedRoutingKey=kaven-returned, deliveryTag=0]) NO_ROUTE 312
訊息成功發送到RabbitMQ,觸發了訊息確認;

但訊息不可路由,因此也會觸發訊息回傳。

不觸發訊息確認的情況比較難測試(關掉RabbitMQ服務,Spring Boot會不斷嘗試重連,請求介面會回應500),這裡就不測試了,應該沒問題。RabbitMQ整合Spring Boot實作訊息確認與訊息回傳就介紹到這裡,如果博主有說錯的地方或者大家有不同的見解,歡迎大家評論補充。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/425051.html
標籤:其他
