文章目錄
- 1、發布確認高級
- 1.1、發布確認SpringBoot版本
- 1.1.1、確認機制方案
- 1.1.2、代碼架構圖
- 1.1.3、組態檔
- 1.1.4、配置類
- 1.1.5、回呼介面
- 1.1.6、生產者
- 1.1.7、消費者
- 1.1.8、測驗結果
- 1.2、回退訊息
- 1.2.1、Mandatory引數
- 1.2.2、組態檔
- 1.2.3、生產者代碼
- 1.2.4、回呼介面代碼
- 1.2.5、測驗結果
- 1.3、備份交換機
- 1.3.1、代碼架構圖
- 1.3.2、配置類代碼
- 1.3.3、消費者代碼
- 1.3.4、測驗結果
1、發布確認高級
1. 存在的問題
再生產環境中由于一些不明原因導致rabbitmq重啟,在RabbitMQ重啟期間生產者訊息投遞失敗,會導致訊息丟失,
1.1、發布確認SpringBoot版本
1.1.1、確認機制方案

- 當訊息不能正常被接收的時候,我們需要將訊息存放在快取中,
1.1.2、代碼架構圖

1.1.3、組態檔
spring.rabbitmq.host=192.168.123.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-confirm-type=correlated
NONE:禁用發布確認模式,是默認值,CORRELATED:發布訊息成功到交換機會觸發回呼方方法,CORRELATED:就是發布一個就確認一個,
1.1.4、配置類
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
public static final String CONFIRM_ROUTING_KEY = "key1";
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
@Bean
public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange,
@Qualifier("confirmQueue") Queue confirmQueue){
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}
}
1.1.5、回呼介面
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* 回呼介面
*/
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
/**
* 交換機接受失敗后進行回呼
* 1. 保存訊息的ID及相關訊息
* 2. 是否接收成功
* 3. 接受失敗的原因
* @param correlationData
* @param b
* @param s
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData != null ? correlationData.getId() : "";
if(b == true){
log.info("交換機已經收到id為:{}的訊息",id);
}else{
log.info("交換機還未收到id為:{}訊息,由于原因:{}",id,s);
}
}
}
1.1.6、生產者
import com.xiao.springbootrabbitmq.utils.MyCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.PostConstruct;
@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable String message){
CorrelationData correlationData1 = new CorrelationData("1");
String routingKey1 = "key1";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey1,message + routingKey1,correlationData1);
CorrelationData correlationData2 = new CorrelationData("2");
String routingKey2 = "key2";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey2,message + routingKey2,correlationData2);
log.info("發送得內容是:{}",message);
}
}
1.1.7、消費者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class ConfirmConsumer {
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
@RabbitListener(queues = CONFIRM_QUEUE_NAME)
public void receiveMessage(Message message){
String msg = new String(message.getBody());
log.info("接收到佇列" + CONFIRM_QUEUE_NAME + "訊息:{}",msg);
}
}
1.1.8、測驗結果
1. 第一種情況

ID為1的訊息正常送達,ID為2的訊息由于RoutingKey的錯誤,導致不能正常被消費,但是交換機還是正常收到了訊息,所以此時由于交換機正常接收之后的原因丟失的訊息不能正常被接收,
2. 第二種情況

- 我們再上一種情況下修改了
ID為1的訊息的交換機的名稱,所以此時回呼函式會進行回答由于什么原因導致交換機無法接收成功訊息,
1.2、回退訊息
1.2.1、Mandatory引數
- 在僅開啟了生產者確認機制的情況下,交換機接收到訊息后,會直接給訊息生產者發送確認訊息,如果發現該訊息不可路由(就是訊息被交換機成功接收后,無法到達佇列),那么訊息會直接被丟棄,此時生產者是不知道訊息被丟棄這個事件的,
- 通過設定該引數可以在訊息傳遞程序中不可達目的地時將訊息回傳給生產者,
1.2.2、組態檔
spring.rabbitmq.publisher-returns=true
- 需要在組態檔種開啟回傳回呼
1.2.3、生產者代碼
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.PostConstruct;
@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable String message){
CorrelationData correlationData1 = new CorrelationData("1");
String routingKey1 = "key1";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey1,message + routingKey1,correlationData1);
log.info("發送得內容是:{}",message + routingKey1);
CorrelationData correlationData2 = new CorrelationData("2");
String routingKey2 = "key2";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey2,message + routingKey2,correlationData2);
log.info("發送得內容是:{}",message + routingKey2);
}
}
1.2.4、回呼介面代碼
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* 回呼介面
*/
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
/**
* 交換機接受失敗后進行回呼
* 1. 保存訊息的ID及相關訊息
* 2. 是否接收成功
* 3. 接受失敗的原因
* @param correlationData
* @param b
* @param s
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData != null ? correlationData.getId() : "";
if(b == true){
log.info("交換機已經收到id為:{}的訊息",id);
}else{
log.info("交換機還未收到id為:{}訊息,由于原因:{}",id,s);
}
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
Message message = returnedMessage.getMessage();
String exchange = returnedMessage.getExchange();
String routingKey = returnedMessage.getRoutingKey();
String replyText = returnedMessage.getReplyText();
log.error("訊息{},被交換機{}退回,回退原因:{},路由Key:{}",new String(message.getBody()),exchange,replyText,routingKey);
}
}
1.2.5、測驗結果
- 其他類的代碼與上一小節案例相同

ID為2的訊息由于RoutingKey不可路由,但是還是被回呼函式處理了,
1.3、備份交換機
1.3.1、代碼架構圖

- 這里我們新增了備份交換機、備份佇列、報警佇列,它們系結關系如圖所示,如果確認交換機成功接收的訊息無法路由到相應的佇列,就會被確認交換機發送給備份交換機,
1.3.2、配置類代碼
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
public static final String BACKUP_QUEUE_NAME = "backup_queue";
public static final String WARNING_QUEUE_NAME = "warning_queue";
public static final String CONFIRM_ROUTING_KEY = "key1";
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)
.withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build();
}
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
@Bean("backupExchange")
public FanoutExchange backupExchange(){
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
@Bean("backupQueue")
public Queue backupQueue(){
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
@Bean("warningQueue")
public Queue warningQueue(){
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}
@Bean
public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange,
@Qualifier("confirmQueue") Queue confirmQueue){
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}
@Bean
public Binding queueBindingExchange1(@Qualifier("backupExchange") FanoutExchange backupExchange,
@Qualifier("backupQueue") Queue backupQueue){
return BindingBuilder.bind(backupQueue).to(backupExchange);
}
@Bean
public Binding queueBindingExchange2(@Qualifier("backupExchange") FanoutExchange backupExchange,
@Qualifier("warningQueue") Queue warningQueue){
return BindingBuilder.bind(warningQueue).to(backupExchange);
}
}
1.3.3、消費者代碼
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class WarningConsumer {
public static final String WARNING_QUEUE_NAME = "warning_queue";
@RabbitListener(queues = WARNING_QUEUE_NAME)
public void receiveMessage(Message message){
String msg = new String(message.getBody());
log.info("報警發現不可路由的訊息內容為:{}",msg);
}
}
1.3.4、測驗結果

- mandatory引數與備份交換機可以一起使用的時候,如果兩者同時開啟,備份交換機優先級高,
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/402478.html
標籤:其他
