郵件功能是一個新的專案,

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>com.xxxx</groupId>
<artifactId>yeb-server</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
先看resources檔案夾,

application.yml
server:
port: 8082
spring:
#這個是在QQ郵箱申請開通的
mail:
host: smtp.qq.com
protocol: smtp
default-encoding: utf-8
password: tbvfyzswjqeqdgbj
username: 3084693478@qq.com
port: 465
rabbitmq:
username: guest
password: guest
host: localhost
port: 5672
listener:
simple:
acknowledge-mode: manual #開啟手動確認
redis:
#超時時間
timeout: 10000ms
#服務器地址
host: 192.168.10.100
#服務器埠
port: 6379
#資料庫
database: 0
#密碼
password: root
lettuce:
pool:
#最大連接數,默認8
max-active: 1024
#最大連接阻塞等待時間,默認-1
max-wait: 10000ms
#最大空閑連接
max-idle: 200
#最小空閑連接
min-idle: 5
SMTP自行了解,
mail.html
<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.theymeleaf.org">
<head>
<meta charset="UTF-8">
<title>入職歡迎郵件</title>
</head>
<body>
歡迎<span th:text="${name}"></span>加入XXXX大家庭,您的入職資訊如下:
<table border="1">
<tr>
<td>姓名</td>
<td th:text="${name}"></td>
</tr>
<tr>
<td>職位</td>
<td th:text="${posName}"></td>
</tr>
<tr>
<td>職稱</td>
<td th:text="${joblevelName}"></td>
</tr>
<tr>
<td>部門</td>
<td th:text="${departmentName}"></td>
</tr>
</table>
<p>
合作愉快!期望與公司攜手共進!
</p>
</body>
</html>
再看具體實作,

郵件功能是一個獨立的服務,因此必須有啟動類(MailApplication),
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class MailApplication {
public static void main(String[] args) {
SpringApplication.run(MailApplication.class, args);
}
@Bean
public Queue queue(){
return new Queue(MailConstants.MAIL_QUEUE_NAME);
}
}
配置exclude = {DataSourceAutoConfiguration.class},禁止 SpringBoot 自動注入資料源配置,因為Mail服務引入了yeb-server的依賴,而yeb-server里連接了資料庫,但是Mail服務里并沒有配置資料庫,因此把Mail服務的資料源配置禁用掉(默認是開啟的,所以如果SpringBoot去自動注入資料源配置時,找不到資料庫依賴或配置,會報錯),
把一個RabbitMQ的佇列放到了Spring容器中,這個queue在之后的代碼中好像并沒有出現,猜測是用于Spring框架內部呼叫了,
MailConstants類定義在yeb-server中,
public class MailConstants {
//訊息投遞中
public static final Integer DELIVERING = 0;
//訊息投遞成功
public static final Integer SUCCESS = 1;
//訊息投遞失敗
public static final Integer FAILURE = 2;
//最大重試次數
public static final Integer MAX_TRY_COUNT = 3;
//訊息超時時間
public static final Integer MSG_TIMEOUT = 1;
//佇列
public static final String MAIL_QUEUE_NAME = "mail.queue";
//交換機
public static final String MAIL_EXCHANGE_NAME = "mail.exchange";
//路由鍵
public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";
}
MailReceiver類監聽了RabbitMQ中的一個佇列(之前配置過的那個),之后再介紹,
先看RabbitMQConfig配置類,在yeb-server專案下,
@Configuration
public class RabbitMQConfig {
private static final Logger LOGGER= LoggerFactory.getLogger(RabbitMQConfig.class);
@Autowired
private CachingConnectionFactory cachingConnectionFactory;
@Autowired
private IMailLogService mailLogService;
@Bean
public RabbitTemplate rabbitTemplate(){
RabbitTemplate rabbitTemplate=new RabbitTemplate(cachingConnectionFactory);
rabbitTemplate.setConfirmCallback((data,ack,cause)->{
String msgId = data.getId();
if (ack){
LOGGER.info("{}=======>訊息發送成功", msgId);
mailLogService.update(new UpdateWrapper<MailLog>().set("status", 1).eq("msgId", msgId));
}else {
LOGGER.error("{}=======>訊息發送失敗", msgId);
}
});
rabbitTemplate.setReturnCallback((msg,repCode,repText,exchange,routingKey)->{
LOGGER.error("{}=======>訊息發送到queue時失敗", msg.getBody());
});
return rabbitTemplate;
}
@Bean
public Queue queue(){
return new Queue(MailConstants.MAIL_QUEUE_NAME);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(MailConstants.MAIL_EXCHANGE_NAME);
}
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(directExchange()).with(MailConstants.MAIL_ROUTING_KEY_NAME);
}
}
該類配置了RabbitMQ交換機和佇列(這個佇列和Mail服務里配置的佇列是一致的),配置了RabbitTemplate,setConfirmCallback和setReturnCallback,設定了回呼函式,如果訊息發送成功了,就更新資料庫t_mail_log的id為msgId的資料的狀態欄位,
配置完后就可以開始寫業務邏輯代碼了,
本專案的郵件發送功能是在添加員工的同時,給RabbitMQ發送訊息,然后Mail服務監聽到訊息后,給新員工發送郵件,
MQ的生產端可靠性投遞,架構如下:

訊息落庫,對訊息狀態進行打標,
具體流程為:發送訊息時,將當前的訊息資料存入資料庫,投遞狀態設定為訊息投遞中;開啟訊息確認回呼機制,確認成功時,更新投遞狀態為訊息投遞成功;開啟定時任務,重新投遞失敗的訊息,重試超過3次,更新投遞狀態為投遞失敗,
發送訊息是在添加新員工時,因此代碼在EmployeeService的addEmp方法中,如下:
@Override
public RespBean addEmp(Employee employee) {
//處理合同期限,注意合同期限前端沒有傳,需要后端來計算
LocalDate beginContract = employee.getBeginContract();
LocalDate endContract = employee.getEndContract();
long days = beginContract.until(endContract, ChronoUnit.DAYS);
DecimalFormat decimalFormat=new DecimalFormat("##.00");
employee.setContractTerm(Double.parseDouble(decimalFormat.format(days/365.00)));
if (employeeMapper.insert(employee)==1){
Employee emp = employeeMapper.getEmployee(employee.getId()).get(0);
//資料庫記錄發送的訊息
String msgId = UUID.randomUUID().toString();
MailLog mailLog=new MailLog();
mailLog.setMsgId(msgId);
mailLog.setEid(employee.getId());
mailLog.setStatus(0);
mailLog.setRouteKey(MailConstants.MAIL_ROUTING_KEY_NAME);
mailLog.setExchange(MailConstants.MAIL_EXCHANGE_NAME);
mailLog.setCount(0);
mailLog.setTryTime(LocalDateTime.now().plusMinutes(MailConstants.MSG_TIMEOUT));
mailLog.setCreateTime(LocalDateTime.now());
mailLog.setUpdateTime(LocalDateTime.now());
mailLogMapper.insert(mailLog);
rabbitTemplate.convertAndSend(MailConstants.MAIL_EXCHANGE_NAME, MailConstants.MAIL_ROUTING_KEY_NAME, emp, new CorrelationData(msgId));
return RespBean.success("添加成功");
}
return RespBean.error("添加失敗");
}
流程是資料庫t_employee成功添加員工后,再在t_mail_log表中添加訊息資料,并同時向RabbitMQ的指定佇列發送訊息,
訊息回呼已經在RabbitMQConfig中配置過了,是由RabbitTemplate指定的,
定時任務(MailTask類)掃描資料庫表t_mail_log,如下:
@Component
public class MailTask {
@Autowired
private IMailLogService mailLogService;
@Autowired
private IEmployeeService employeeService;
@Autowired
private RabbitTemplate rabbitTemplate;
@Scheduled(cron = "0/10 * * * * ?")
public void mailTask(){
List<MailLog> list = mailLogService.list(new QueryWrapper<MailLog>().eq("status",0).lt("tryTime", LocalDateTime.now()));
list.forEach(mailLog -> {
//如果重試次數超過3次,更新狀態為投遞失敗,不再重試
if (mailLog.getCount() >= 3){
mailLogService.update(new UpdateWrapper<MailLog>().set("status",2).eq("msgId",mailLog.getMsgId()));
}
mailLogService.update(new UpdateWrapper<MailLog>().set("count",mailLog.getCount()+1).set("updateTime",LocalDateTime.now()).set("tryTime",LocalDateTime.now().plusMinutes(MailConstants.MSG_TIMEOUT)).eq("msgId", mailLog.getMsgId()));
//重新發送訊息
Employee emp = employeeService.getEmployee(mailLog.getEid()).get(0);
rabbitTemplate.convertAndSend(MailConstants.MAIL_EXCHANGE_NAME, MailConstants.MAIL_ROUTING_KEY_NAME, emp, new CorrelationData(mailLog.getMsgId()));
});
}
}
@Scheduled(cron = “0/10 * * * * ?”)表示每隔10秒觸發一次定時任務,
定時任務流程為:取出狀態(status欄位)為0(表示還未發送出去或發送不成功)和tryTime小于(lt表示less than 小于)當前時間(表示這條訊息可以進行新一輪重試發送了)的訊息資料,遍歷每條訊息資料;如果重試次數超過3次,更新狀態為投遞失敗,不再重試;如果重試次數小于3次,更新此條訊息資料的count、updateTime和tryTime,并重新發送(發送的是emp和msgId),
發送訊息后,Mail服務監聽訊息佇列(消費端,需要解決訊息冪等性問題),如下:
@Component
public class MailReceiver {
private static final Logger LOGGER= LoggerFactory.getLogger(MailReceiver.class);
@Autowired
private JavaMailSender javaMailSender;
@Autowired
private MailProperties mailProperties;
@Autowired
private TemplateEngine templateEngine;
@Autowired
private RedisTemplate redisTemplate;
@RabbitListener(queues = MailConstants.MAIL_QUEUE_NAME)
public void handler(Message message, Channel channel){
Employee employee = (Employee) message.getPayload();
MessageHeaders headers = message.getHeaders();
//訊息序號
long tag = (long) headers.get(AmqpHeaders.DELIVERY_TAG);
String msgId = (String) headers.get("spring_returned_message_correlation");
HashOperations hashOperations = redisTemplate.opsForHash();
try {
if (hashOperations.entries("mail_log").containsKey(msgId)){
LOGGER.error("訊息已經被消費了=========>{}", msgId);
channel.basicAck(tag, false);
return;
}
MimeMessage msg = javaMailSender.createMimeMessage();
MimeMessageHelper helper=new MimeMessageHelper(msg);
helper.setFrom(mailProperties.getUsername());
helper.setTo(employee.getEmail());
helper.setSubject("入職歡迎郵件");
helper.setSentDate(new Date());
Context context=new Context();
context.setVariable("name", employee.getName());
context.setVariable("posName", employee.getPosition().getName());
context.setVariable("joblevelName", employee.getJoblevel().getName());
context.setVariable("departmentName", employee.getDepartment().getName());
String mail = templateEngine.process("mail", context);
helper.setText(mail, true);
javaMailSender.send(msg);
LOGGER.info("郵件發送成功");
//將訊息Id存入Redis
hashOperations.put("mail_log", msgId, "OK");
//手動確認訊息
channel.basicAck(tag, false);
} catch (Exception e) {
try {
channel.basicNack(tag, false, true);
} catch (IOException ex) {
LOGGER.error("郵件發送失敗=======>{}", e.getMessage());
}
LOGGER.error("郵件發送失敗=======>{}", e.getMessage());
}
}
}
監聽到的訊息message含有訊息體和訊息頭,msgId是由UUID.randomUUID().toString()得到,能保證msgId的唯一性,
流程為:從message中取出employee物件、msgId和tag(訊息序號,用來手動確認訊息);查找Redis中是否已經存在msgId,存在就說明這條訊息已經接收過了,列印日志并提交確認,不存在就發送郵件,并把msgId存入Redis,實作訊息的冪等性,

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/281223.html
標籤:其他
下一篇:Java記憶體模型(一)
