👏👏👏
哈嘍!大家好,我是【學無止境小奇】,一位熱愛分享各種技術的博主!😍😍😍
?【學無止境小奇】的創作宗旨:每一條命令都親自執行過,每一行代碼都實際運行過,每一種方法都真實實踐過,每一篇文章都良心制作過,???
?【學無止境小奇】的博客中所有涉及命令、代碼的地方,除了提供圖片供大家參考,另外會在圖片下方提供一份純文本格式的命令或者代碼方便大家粘貼復制直接執行命令或者運行代碼,🤝🤝🤝
?如果你對技術有著濃厚的興趣,歡迎關注【學無止境小奇】,歡迎大家和我一起交流,😘😘😘
??????感謝各位朋友接下來的閱讀??????
文章目錄
- 一、JAVA使用RabbitMQ解決生產端訊息投遞可靠性,消費端冪等性問題
- 1、生產端訊息投遞可靠性
- 1.1、訊息落庫
- 1.2、定時任務
- 2、消費端冪等性問題
- 2.1、redis解決
- 3、總結
一、JAVA使用RabbitMQ解決生產端訊息投遞可靠性,消費端冪等性問題
1、生產端訊息投遞可靠性
1.1、訊息落庫
思路:
1.將訊息落庫:
我們發送一個訊息沒辦法知道我們發的訊息消費端是否接收到,假如消費端沒有接收到那么我們需要觸發補償機制來重新發送一個訊息,這個時候我們為了解決這個問題就需要將訊息落庫,每次將準備發送的訊息存入到資料庫中,并設定一個狀態為待發送,
等消費端接收到訊息并給我們反饋后,我們將資料庫中的訊息狀態改為已完成,
訊息庫

發送訊息之前先將訊息落庫

如果訊息發送成功則將資料庫狀態改為發送完成,如果沒有成功則將重試次數+1,我們一般重試3次還是失敗就會將狀態改為發送失敗,

package com.xiaoqi.server.config;/**
* @ProjectName: yeb
* @Package: com.xiaoqi.server.config
* @ClassName: RabbitMQConfig
* @Author: LiShiQi
* @Description: ${description}
* @Date: 2022/2/24 16:16
* @Version: 1.0
*/
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.xiaoqi.server.pojo.MailConstants;
import com.xiaoqi.server.pojo.MailLog;
import com.xiaoqi.server.service.IMailLogService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Description
* @Author LiShiQi
* @Date 2022/2/24 16:16
* @Version 1.0
*/
@Configuration
public class RabbitMQConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQConfig.class);
@Autowired
private CachingConnectionFactory cachingConnectionFactory;
@Autowired
private IMailLogService mailLogService;
@Bean
public RabbitTemplate rabbitTemplate(){
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
/**
* 訊息確認回呼,確認訊息是否到達broker
* data:訊息唯一標識
* ack:確認結果
* cause:失敗原因
*/
rabbitTemplate.setConfirmCallback((data,ack,cause) ->{
String msgId = data.getId();
if(ack){
LOGGER.info("{}=============>訊息發送成功",msgId);
mailLogService.update(new UpdateWrapper<MailLog>().set("status",1).eq("msgId",msgId));
}else{
LOGGER.error("{}=============>訊息發送失敗",msgId);
}
});
/**
* 訊息失敗回呼,比如router不到queue時回呼
* msg:訊息主題
* repCode:回應碼
* repText:相應描述
* exchange;交換機
* routingkey:路由鍵
*/
rabbitTemplate.setReturnCallback((msg,repCode,repText,exchange,routingkey) ->{
LOGGER.error("{}=============>訊息發送queue時失敗",msg.getBody());
});
return rabbitTemplate;
}
@Bean
public Queue queue(){
return new Queue(MailConstants.MAIL_QUEUE_NAME);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(MailConstants.MAIL_EXCHANGE_NAME);
}
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(directExchange()).with(MailConstants.MAIL_ROUTING_KEY_NAME);
}
}
1.2、定時任務
前面我們訊息已經落庫了,這個時候我們就弄一個定時任務去掃描我們的訊息表中,把狀態為待發送的訊息任務重新發送一次,如果還失敗則重試次數欄位+1,等重試次數到達3次,不再重試,

package com.xiaoqi.server.task;/**
* @ProjectName: yeb
* @Package: com.xiaoqi.server.task
* @ClassName: MailTask
* @Author: LiShiQi
* @Description: ${description}
* @Date: 2022/2/24 18:28
* @Version: 1.0
*/
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.xiaoqi.server.pojo.Employee;
import com.xiaoqi.server.pojo.MailConstants;
import com.xiaoqi.server.pojo.MailLog;
import com.xiaoqi.server.service.IEmployeeService;
import com.xiaoqi.server.service.IMailLogService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.List;
/**
* @Description
* @Author LiShiQi
* @Date 2022/2/24 18:28
* @Version 1.0
*/
@Component
public class MailTask {
@Autowired
private IMailLogService mailLogService;
@Autowired
private IEmployeeService employeeService;
@Autowired
private RabbitTemplate rabbitTemplate;
@Scheduled(cron = "0/10 * * * * ?")
public void mailTask(){
List<MailLog> list = mailLogService.list(new QueryWrapper<MailLog>()
.eq("status", 0)
.lt("tryTime", LocalDateTime.now()));
list.forEach(mailLog -> {
//如果重試次數超過3次,更新狀態為投遞失敗,不再重試
if(3 < mailLog.getCount()) {
mailLogService.update(new UpdateWrapper<MailLog>()
.set("status", 2)
.eq("msgId", mailLog.getMsgId()));
}
mailLogService.update(new UpdateWrapper<MailLog>()
.set("count",mailLog.getCount()+1)
.set("updateTime", LocalDateTime.now())
.set("tryTime", LocalDateTime.now().plusMinutes(MailConstants.MSG_TIMEOUT))
.eq("msgId",mailLog.getMsgId()));
Employee emp = employeeService.getEmployee(mailLog.getEid()).get(0);
//發送訊息
rabbitTemplate.convertAndSend(MailConstants.MAIL_EXCHANGE_NAME,MailConstants.MAIL_ROUTING_KEY_NAME,
emp,new CorrelationData(mailLog.getMsgId()));
});
}
}
2、消費端冪等性問題
首先冪等性問題就是一次和多次結果是一樣的,也就是說有可能一個訊息因為某些原因(例如在定時任務掃描資料庫的時候掃描到狀態為待發送,但是這個時候其實已經正在發送,這個時候定時任務又發送了一次)生產端可能給消費端發送了兩次訊息,這個時候我們消費端只需要消費一次就可以了,因為如果是電商業務,不可能下一筆訂單扣兩筆錢吧,所以這里我們用redis來實作,
2.1、redis解決
大概思路: 我們消費每一個訊息的時候將這個訊息的訊息id放入redis中,如果接收到的訊息id在redis中,證明我們已經消費過了就不在進行消費了,

package com.xiaoqi.mail;/**
* @ProjectName: yeb
* @Package: com.xiaoqi.mail
* @ClassName: MailReceiver
* @Author: LiShiQi
* @Description: ${description}
* @Date: 2022/2/24 12:43
* @Version: 1.0
*/
import com.rabbitmq.client.Channel;
import com.xiaoqi.server.pojo.Employee;
import com.xiaoqi.server.pojo.MailConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.mail.MailProperties;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;
import org.thymeleaf.TemplateEngine;
import org.thymeleaf.context.Context;
import javax.mail.internet.MimeMessage;
import java.io.IOException;
import java.util.Date;
/**
* @Description
* @Author LiShiQi
* @Date 2022/2/24 12:43
* @Version 1.0
*/
@Component
public class MailReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(MailReceiver.class);
//郵件發送
@Autowired
private JavaMailSender javaMailSender;
//郵件配置
@Autowired
private MailProperties mailProperties;
//引擎
@Autowired
private TemplateEngine templateEngine;
@Autowired
private RedisTemplate redisTemplate;
//監聽
@RabbitListener(queues = MailConstants.MAIL_QUEUE_NAME)
public void handler(Message message, Channel channel){
Employee employee = (Employee)message.getPayload();
MessageHeaders headers = message.getHeaders();
//訊息序號
long tag = (long) headers.get(AmqpHeaders.DELIVERY_TAG);
String msgId = (String) headers.get("spring_returned_message_correlation");
HashOperations hashOperations = redisTemplate.opsForHash();
try {
if(hashOperations.entries("mail_log").containsKey(msgId)){
LOGGER.error("訊息已經被消費===============>{}",msgId);
/**
* 手動確認訊息
* tag:訊息序號
* multiple:是否確認多條
*/
channel.basicAck(tag,false);
return;
}
//創建訊息
MimeMessage msg = javaMailSender.createMimeMessage();
MimeMessageHelper helper = new MimeMessageHelper(msg);
//發件人
helper.setFrom(mailProperties.getUsername());
//收件人
helper.setTo(employee.getEmail());
//主題
helper.setSubject("入職歡迎郵件");
//發送日期
helper.setSentDate(new Date());
//郵件內容
Context context = new Context();
context.setVariable("name",employee.getName());
context.setVariable("posName",employee.getPosition().getName());
context.setVariable("joblevelName",employee.getJoblevel().getName());
context.setVariable("departmentName",employee.getDepartment().getName());
String mail = templateEngine.process("mail", context);
//引數為true就是html
helper.setText(mail,true);
//發送郵件
javaMailSender.send(msg);
LOGGER.info("郵件發送成功");
//將訊息id存入redis
hashOperations.put("mail_log",msgId,"OK");
//手動確認訊息
channel.basicAck(tag,false);
} catch (Exception e) {
/**
* 手動確認訊息
* tag:訊息序號
* multiple:是否確認多條
* requeue:是否退回到佇列
*/
try {
channel.basicNack(tag,false,true);
} catch (IOException e1) {
LOGGER.error("郵件發送失敗=========>{}",e.getMessage());
}
LOGGER.error("郵件發送失敗=========>{}",e.getMessage());
}
}
}
3、總結
以上就是解決MQ訊息佇列的可靠性問題,因為在引入訊息佇列解決某些問題的同時我們隨之而來了一些其他問題,這個時候我們就要考慮怎么解決這些其他問題,以上的解決方案只是眾多方案中的其中一種,還有其他方案也可以解決這些問題,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/433347.html
標籤:其他
