
基本知識
為什么使用訊息中間件
解耦

在有多個系統組成的應用中,常常出現A系統會影響B、C、D資料的情況,通常做法是在A中呼叫其他系統的介面,這樣系統之間的依賴太大,如果后續添加新的系統,就需要在A中添加相應的邏輯,這樣做耦合程度太大,不利于維護,

加入MQ后,A系統中不用添加其他系統的呼叫,只需要發送訊息,其他系統監聽訊息,在自己系統中處理,新增或者洗掉也不需要改動A系統的代碼,只需要在自己中取消該型別的訊息監聽就行,
異步
很多時候涉及多服務之間呼叫的情況,客戶端發起請求,A中回去呼叫B、C、D的介面,最后再將執行結果回傳到客戶端,這樣一個流程中A介面的執行時間,收到其他服務的影響,是他們執行時間的總和,如果A不關心B、C、D他們的執行情況,就可以使用MQ,A發送訊息后直接回傳,從而提升介面的回應時間,
削峰

當系統面臨大量請求時,會對資料庫造成很大壓力,引入MQ后,你可以根據資料庫的實際處理能力,每次從MQ中拿一定數量的資料處理,處理完從中取,

生產者/消費者
RabbitMQ 的安裝
普通安裝
直接去官網下載安裝包,
https://www.rabbitmq.com/

docker安裝
// 拉去鏡像
docker pull rabbitmq
// 啟動容器
docker run -it --name rabbitmq \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 \
-d rabbitmq
// 進入容器開啟管理界面
docker exec -it rabbitmq sh
//開啟管理界面
rabbitmq-plugins enable rabbitmq_management
通過訪問http://localhost:15672/ 即可看到管理界面

在Springboot集成RabbitMQ
引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置佇列和交換機
@Configuration
public class RabbitmqConfig {
@Bean
public Queue msgQueue(){
/**
* name: 佇列名稱
* durable 是否持久化
* exclusive 是否是排他佇列 只有創建者可以使用
* autoDelete 宣告此佇列為臨時佇列,最后一個消費者使用完自動洗掉
*/
return new Queue("MSG_MQ", true, false, false);
}
@Bean
public DirectExchange msgExchange(){
return new DirectExchange("MSG_ECHANGE", true, false);
}
@Bean
public Binding mailBinding(){
return BindingBuilder
.bind(mailQueue())
.to(msgExchange())
.with("MSG_ROUTING");
}
}
生產者發送訊息
@RequestMapping("/v1/demo")
@RestController
public class DemoController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sed_queue")
public void sendMsg(){
rabbitTemplate.convertAndSend("MSG_ECHANGE", "MSG_ROUTING", "你好慘:"+ System.currentTimeMillis());
}
}
消費者接受訊息
使用 @RabbitListener 去監聽訊息佇列,佇列中有訊息了就去消費
@Component
public class RabbitListner {
@RabbitListener(queuesToDeclare = @Queue("MSG_MQ"))
public void handleMsg(String msg){
System.out.println("msg-"+ msg);
}
}
queues 和queuesToDeclare 不同點 :使用queuesToDeclare時,服務啟動時回去MQ中檢測監聽的佇列是否存在,沒有這個佇列會就會去創建
RabbitMQ的組成
- Broker:訊息佇列服務行程,此行程包括兩個部分:Exchange和Queue,
- Exchange:訊息佇列交換機,按一定的規則將訊息路由轉發到某個佇列,
- Queue:訊息佇列,存盤訊息的佇列,
- Producer:訊息生產者,生產方客戶端將訊息同交換機路由發送到佇列中,
- Consumer:訊息消費者,消費佇列中存盤的訊息,
四種交換機
DirectExchange:直連交換機,需要系結一個佇列,同時需要指定routeKey值,類似于點對點發送,上面demo中使用的就是DirectExchangeFanoutExchange: 將有需要的佇列與此交換機系結后,一個發送到交換機上的訊息會被轉發到與這個交換機相連的所有佇列上,這種模式類似于發布訂閱,
@Bean
public Queue faQueue1(){
/**
* name: 佇列名稱
* durable 是否持久化
* exclusive 是否是排他佇列 只有創建者可以使用
* autoDelete 宣告此佇列為臨時佇列,最后一個消費者使用完自動洗掉
*/
return new Queue("fa.queue1", true, false, false);
}
@Bean
public Queue faQueue2(){
/**
* name: 佇列名稱
* durable 是否持久化
* exclusive 是否是排他佇列 只有創建者可以使用
* autoDelete 宣告此佇列為臨時佇列,最后一個消費者使用完自動洗掉
*/
return new Queue("fa.queue2", true, false, false);
}
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanout.exchange", true, false);
}
@Bean
public Binding bindingFanoutExchange(){
return BindingBuilder
.bind(faQueue1())
.to(fanoutExchange());
}
@Bean
public Binding bindingFanoutExchange1(){
return BindingBuilder
.bind(faQueue2())
.to(fanoutExchange());
}
@RabbitListener(queuesToDeclare = @Queue("fa.queue1"))
public void faQueue1(String msg){
System.out.println("faQueue1-"+msg);
}
@RabbitListener(queuesToDeclare = @Queue("fa.queue2"))
public void faQueue2(String msg){
System.out.println("faQueue2-"+msg);
}
@GetMapping("/sed_fanout")
public void sendFanoutMsg(){
rabbitTemplate.convertAndSend("fanout.exchange", null, "fanoutExchange:"+ System.currentTimeMillis());
}
TopicExchange: 主題交換機又可以叫通配符交換機,這種交換機通過通配符去匹配,然后路由到對應的佇列,通配符#和*分別代表匹配多個和一個,
@Bean
public Queue topicQueue1(){
/**
* name: 佇列名稱
* durable 是否持久化
* exclusive 是否是排他佇列 只有創建者可以使用
* autoDelete 宣告此佇列為臨時佇列,最后一個消費者使用完自動洗掉
*/
return new Queue("topic.queue1", true, false, false);
}
@Bean
public Queue topicQueue2(){
/**
* name: 佇列名稱
* durable 是否持久化
* exclusive 是否是排他佇列 只有創建者可以使用
* autoDelete 宣告此佇列為臨時佇列,最后一個消費者使用完自動洗掉
*/
return new Queue("topic.queue2", true, false, false);
}
@Bean
public TopicExchange topicExchange1(){
return new TopicExchange("topic.exchange1", true, false);
}
@Bean
public Binding topicBinding1(){
return BindingBuilder
.bind(topicQueue1())
.to(topicExchange1())
.with("top.*");
}
@Bean
public Binding topicBinding2(){
return BindingBuilder
.bind(topicQueue2())
.to(topicExchange1())
.with("top.#");
}
@RabbitListener(queuesToDeclare = @Queue("topic.queue1"))
public void topicQueue1(String msg){
System.out.println("topicQueue1-"+msg);
}
@RabbitListener(queuesToDeclare = @Queue("topic.queue2"))
public void topicQueue2(String msg){
System.out.println("topicQueue2-"+msg);
}
@GetMapping("/sed_topic")
public void sendFanoutMsg(String key){
rabbitTemplate.convertAndSend("topic.exchange1", key, "TopicExchange:"+ System.currentTimeMillis());
}
HeadersExchange: 這種交換機用的相對沒這么多,它跟上面三種有點區別,它的路由不是用routingKey進行路由匹配,而是在匹配請求頭中所帶的鍵值進行路由,這種交換機用的不多,
@Bean
public Queue headQueue(){
/**
* name: 佇列名稱
* durable 是否持久化
* exclusive 是否是排他佇列 只有創建者可以使用
* autoDelete 宣告此佇列為臨時佇列,最后一個消費者使用完自動洗掉
*/
return new Queue("head.queue1", true, false, false);
}
@Bean
public Queue headQueue1(){
/**
* name: 佇列名稱
* durable 是否持久化
* exclusive 是否是排他佇列 只有創建者可以使用
* autoDelete 宣告此佇列為臨時佇列,最后一個消費者使用完自動洗掉
*/
return new Queue("head.queue2", true, false, false);
}
@Bean
public HeadersExchange headersExchange(){
return new HeadersExchange("head.exchange", true, false);
}
@Bean
public Binding headBinding(){
Map<String, Object> headers = new HashMap<>();
headers.put("abk", "asd");
return BindingBuilder
.bind(headQueue())
.to(headersExchange())
.whereAll(headers)
.match();
}
@Bean
public Binding headBinding1(){
Map<String, Object> headers = new HashMap<>();
headers.put("abk", "ack");
return BindingBuilder
.bind(headQueue1())
.to(headersExchange())
.whereAll(headers)
.match();
}
@RabbitListener(queuesToDeclare = @Queue("head.queue1"))
public void headQueue1(String msg){
System.out.println("headQueue1-"+msg);
}
@RabbitListener(queuesToDeclare = @Queue("head.queue2"))
public void headQueue2(String msg){
System.out.println("headQueue2-"+msg);
}
@GetMapping("/sed_head_msg")
public void sendHeaderMsg1(@RequestParam String msg,
@RequestBody Map<String, Object> map){
MessageProperties messageProperties = new MessageProperties();
//訊息持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setContentType("UTF-8");
//添加訊息
messageProperties.getHeaders().putAll(map);
Message message = new Message(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("head.exchange", null, message);
}
用postman呼叫請求

后臺列印出

說明匹配到了head.queue2
同理,設定head的值


訊息可靠性

圖中顯示的是一條訊息傳遞的整個程序,我們大致可以分析出那些環節會導致訊息不可靠或者說訊息丟失,
- 生產者發送訊息到MQ程序中,MQ掛了,這時會出現訊息丟失,
- 生產者發送訊息到MQ但是沒有持久化佇列,消費者還沒消費,MQ掛了,訊息會丟失,
- 消費者消費了訊息,但是出現報錯或者程式掛了,這時訊息也丟失了,
針對于以上的是三種情況,Rabbit為我們提供了對應的解決方案:持久化、confirm機制、ACK事務機制,
訊息持久化
配置Exchange持久化和Queue持久化,
在創建Queue 和Exchange時設定 durable 為true

你也可以使用默認值,默認為true

交換機同樣如此

訊息確認機制

在生產者發送訊息到MQ這段程序中,MQ掛了,導致訊息丟失,Rabbit提供confirm和returnMessage方法去處理訊息丟失,
springboot 添加配置
## 新版中使用 publisher-confirm-type 有三個引數
# none(禁用)
# correlated(觸發confirm回呼)
# simple(具有correlated的功能 同時可以使rabbitTemplate呼叫waitForConfirms或waitForConfirmsOrDie)
# 舊版中 publisher-confirms 默認 false
spring.rabbitmq.publisher-confirm-type=simple
# 訊息沒有匹配到佇列 觸發returnMessage 回呼
spring.rabbitmq.publisher-returns= true
# publisher-returns 和 mandatory 同時使用時優先使用 mandatory
spring.rabbitmq.template.mandatory= true
實作 RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback
@Component
public class RabbitCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String error) {
if(ack){
System.out.println("訊息發送成功");
} else {
System.out.println("訊息發送失敗");
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("replyCode:").append(replyCode).append(",")
.append("replyText:").append(replyText).append(",")
.append("exchange:").append(exchange).append(",")
.append("routingKey:").append(routingKey).append(",");
}
}

沒有匹配到路由觸發returnMessage

找到交換機觸發confirm
沒有找到交換機和佇列

ACK 事務機制
訊息確認機制解決了訊息發送MQ這個程序中的問題,ACK則是解決消費者處理程序中訊息丟失的問題,

消費者接受訊息,在處理程序中出現失敗手動拒簽,重新放回佇列等待再次消費,消費成功后手動簽收,
配置手動模式
### 開啟手動模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
## 最小消費者數量
spring.rabbitmq.listener.simple.concurrency=1
## 最大消費者數量
spring.rabbitmq.listener.simple.max-concurrency=1
改造消費者
@RabbitListener(queuesToDeclare = @Queue("MSG_MQ"))
public void handleMsg(String msg, Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
if("success".equals(msg)){
channel.basicAck(deliveryTag, false);
} else if("reply".equals(msg)) {
// basicReject 和 basicNack的區別 basicReject不支持批量 basicNack不支持
// channel.basicReject(deliveryTag, true);
channel.basicNack(deliveryTag, false, true);
} else {
channel.basicNack(deliveryTag, false, false);
}
}
basicAck : 成功確認訊息
- deliveryTag: 訊息的index
- mutiple: 是否批量確認, 為true時,一次ack所有小于deliveryTag的訊息

basicReject: 失敗拒絕
- deliveryTag: 訊息的index
- requeue: 是否重新放入佇列

basicNack: 失敗拒絕 - deliveryTag: 訊息的index
- mutiple:批量拒絕,一次性拒絕所有小于deliveryTag的訊息
- requeue: 是否重新放入佇列

ack帶來的問題
- nack死回圈
reply 的訊息重新放入佇列后,程式還是處理不了,會出現死回圈,不斷地消費,放入佇列,知道問題解決,
我的想法是用資料庫去保存訊息資訊,然后通過定時任務再去處理或者通過界面反饋通知 - double ack
開啟自動ACK的時候又在代碼中手動處理,導致一個訊息觸發兩次ack,有一次ack會失敗, - 性能消耗
手動ack模式會比自動模式慢是10倍左右,很多時候使用默認的就行了, - 手動ack,不及時回復會導致佇列例外
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/390597.html
標籤:其他
上一篇:02HDFS分布式文系統
