RabbitMQ(簡介、概念、安裝和springboot整合)

一、MQ簡介
? 在計算機科學中,訊息佇列((英語:Message queue)是一種行程間通信或同一行程的不同執行緒間的通信方式,軟體的貯列用來處理一系列的輸入,通常是來自用戶,訊息佇列提供了異步的通信協議,每一個貯列中的紀錄包含詳細說明的資料,包含發生的時間,輸入設備的種類,以及特定的輸入引數,也就是說:訊息的發送者和接收者不需要同時與訊息佇列互交,訊息會保存在佇列中,直到接收者取回它,
1.1.實作
- 訊息佇列常常保存在鏈表結構中,擁有權限的行程可以向訊息佇列中寫入或讀取訊息,
- 目前,有很多訊息佇列有很多開源的實作,包括JBoss Messaging.JORAM、Apache ActiveMQ、Sun0pen Message Queue、IBM MQ、Apache Qpid和HTTPSQS,
- 當前使用較多的訊息佇列有RabbitMQ、RocketNQ、ActiveMQ、Kafka、ZeroNQ、MetaMq等,而部分資料庫如Redis、Mysql以及phxsql也可實作訊息佇列的功能,
1.2.特點
? MQ是消費者-生產者模型的一個典型的代表,一端往訊息佇列中不斷寫入訊息,而另一端則可以讀取或者訂閱佇列中的訊息,MQ和JMS類似,但不同的是JMS是SUN JAVA訊息中間件服務的一個標準和API定義,而MQ則是遵循了AMQP協議的具體實作和產品,
注意:
- AMQP,即DAdvanced MessageQueuing Protocol,一個提供統一訊息服務的應用層標準高級訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中間件設計,基于此協議的客戶端與訊息中間件可傳遞訊息,并不受客戶端/中間件不同產品,不同的開發語言等條件的限制,
- JMS,Java訊息服務(Java Message Service)應用程式介面,是一個Java平臺中關于面向訊息中間件的API,
用于在兩個應用程式之間,或分布式系統中發送訊息,進行異步通信,Java訊息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持,常見的訊息佇列,大部分都實作了JMSAPI,如
ActiveMQ ,Redis以及 RabbitMQ等,
1.3.優缺點
優點
應用耦合、異步處理、流量削鋒
-
解耦
傳統模式:

傳統模式的缺點:
系統間耦合性太強,如上圖所示,系統A在代碼中直接呼叫系統B和系統C的代碼,如果將來D系統接入,系統A還需要修改代碼,過于麻煩!
中間件模式:

中間件模式的的優點:
將訊息寫入訊息佇列,需要訊息的系統自己從訊息佇列中訂閱,從而系統A不需要做任何修改,
,異步
傳統模式:

傳統模式的缺點:
—些非必要的業務邏輯以同步的方式運行,太耗費時間,
中間件模式:

中間件模式的的優點:
將訊息寫入訊息佇列,需要訊息的系統自己從訊息佇列中訂閱,從而系統A不需要做任何修改,
-
削峰
傳統模式:

傳統模式的缺點:
并發量大的時候,所有的請求直接懟到資料庫,造成資料庫連接例外
中間件模式:

中間件模式的的優點:
系統A慢慢的按照資料庫能處理的并發量,從訊息佇列中慢慢拉取訊息,在生產中,這個短暫的高峰期積壓是允許的,
缺點
系統可用性降低、系統復雜性增加
1.4.使用場景
? 訊息佇列,是分布式系統中重要的組件,其通用的使用場景可以簡單地描述為:當不需要立即獲得結果,但是并發量又需要進行控制的時候,差不多就是需要使用訊息佇列的時候
? 在專案中,將一些無需即時回傳且耗時的操作提取出來,進行了異步處理,而這種異步處理的方式大大的節省了服務器的請求回應時間,從而提高了系統的吞吐量,
1.5.為什么使用RabbitMQ
? AMQP,即Advanced Message Queuing Protocol,高級訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中間件設計,訊息中間件主要用于組件之間的解耦,訊息的發送者無需知道訊息使用者的存在,反之亦然,
? AMQP的主要特征是面向訊息、佇列、路由(包括點對點和發布/訂閱)、可靠性、安全,
? RabbitMQ是一個開源的AMQP實作,服務器端用Erlangi語言撰寫,支持多種客戶端,如: Python、
Ruby、 .NET,Java,JMS、C,PHP,ActionScript, XMPP,STONP等,支持AJAX,用于在分布式系統中存盤轉發訊息,在易用性、擴展性、高可用性等方面表現不俗,
總結如下:
- 基于AMQP協議
- 高并發(是一個容量的概念,服務器可以接受的最大任務數量),
- 高性能(是一個速度的概念,單位時間內服務器可以處理的任務數)
- 高可用(是一個持久的概念,單位時間內服務器可以正常作業的時間比例),
- 強大的社區支持,以及很多公司都在使用
- 支持插件
- 支持多語言
二、概念
-
RabbitMQ簡介:RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue Protocol)的開源實作,
-
Message:訊息,訊息是不具名的,它由訊息頭和訊息體組成,訊息體是不透明的,而訊息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對于其他訊息的優先權)、delivery-mode(指出該訊息可能需要持久性存盤)等,
-
Publisher:訊息的生產者,也是一個向交換器發布訊息的客戶端應用程式,
-
Exchange:交換器,用來接收生產者發送的訊息并將這些訊息路由給服務器中的佇列,Exchange有4種型別:direct(默認),fanout, topic, 和headers(headers和direct交換器完全一致,但性能差很多,目前幾乎用不到),不同型別的Exchange轉發訊息的策略有所區別,
-
Queue:訊息佇列,用來保存訊息直到發送給消費者,它是訊息的容器,也是訊息的終點,一個訊息可投入一個或多個佇列,訊息一直在佇列里面,等待消費者連接到這個佇列將其取走,
-
Binding:系結,用于訊息佇列和交換器之間的關聯,一個系結就是基于路由鍵將交換器和訊息佇列連接起來的路由規則,所以可以將交換器理解成一個由系結構成的路由表,Exchange 和Queue的系結可以是多對多的關系,
-
Connection:網路連接,比如一個TCP連接,
-
Channel:信道,多路復用連接中的一條獨立的雙向資料流通道,信道是建立在真實的TCP連接內的虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布訊息、訂閱佇列還是接收訊息,這些動作都是通過信道完成,因為對于作業系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條TCP 連接,
-
Consumer:訊息的消費者,表示一個從訊息佇列中取得訊息的客戶端應用程式,
-
Virtual Host:虛擬主機,表示一批交換器、訊息佇列和相關物件,虛擬主機是共享相同的身份認證和加
密環境的獨立服務器域,每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的佇列、交換器、系結和權限機制,vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 / ,
-
Broker:表示訊息佇列服務器物體
三、安裝
- 獲取鏡像
#指定版本,該版本包含了web控制頁面
docker pull rabbitmq:management
- 運行鏡像
#方式一:默認guest 用戶,密碼也是 guest
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
#方式二:設定用戶名和密碼
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management
#方式三
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
# 4369, 25672 (Erlang發現&集群埠)
# 5672, 5671 (AMQP埠) 15672 (web管理后臺埠)
# 61613, 61614 (STOMP協議埠)
# 1883, 8883 (MQTT協議埠)
# https://www.rabbitmq.com/networking.html
- 訪問ui界面
http://localhost:15672/

四、SpringBoot整合RabbitMQ
4.1. 引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4.2. application.yml配置
spring:
rabbitmq:
host: 192.168.10.123
port: 5672
virtual-host: /
#開啟發送端確認
#publisher-confirms: true
publisher-confirm-type: correlated
# 開啟發送端訊息抵達佇列的確認
publisher-returns: true
template:
#只要抵達佇列,以異步發送優先呼叫我們這個return - confirm
mandatory: true
listener:
simple:
#手動ack訊息(手動確認訊息是否消費)
acknowledge-mode: manual
4.3. RabbitConfig配置類
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
public class MyRabbitConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
@Primary
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setMessageConverter(messageConverter());
initRabbitTemplate();
return rabbitTemplate;
}
/**
* 設定傳輸訊息格式為json
* @return
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 定制RabbitTemplate
* 1、服務收到訊息就會回呼
* 1、spring.rabbitmq.publisher-confirms: true
* 2、設定確認回呼
* 2、訊息正確抵達佇列就會進行回呼
* 1、spring.rabbitmq.publisher-returns: true
* spring.rabbitmq.template.mandatory: true
* 2、設定確認回呼ReturnCallback
*
* 3、消費端確認(保證每個訊息都被正確消費,此時才可以broker洗掉這個訊息),即手動確認
* 消費端宕機,訊息也不會丟失(channel.basicAck() )
*
*/
// @PostConstruct //MyRabbitConfig物件創建完成以后,執行這個方法
public void initRabbitTemplate() {
/**
* 1、只要訊息抵達Broker就ack=true
* correlationData:當前訊息的唯一關聯資料(這個是訊息的唯一id)
* ack:訊息是否成功收到
* cause:失敗的原因
*/
/* rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
}
});*/
//設定確認回呼
rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
});
/**
* 只要訊息沒有投遞給指定的佇列,就觸發這個失敗回呼
* message:投遞失敗的訊息詳細資訊
* replyCode:回復的狀態碼
* replyText:回復的文本內容
* exchange:當時這個訊息發給哪個交換機
* routingKey:當時這個訊息用哪個路郵鍵
*/
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
});
}
}
4.4. RabbitMQConfig(容器中創建交換機、佇列和系結)
package com.lyh.mall.order.config;
import com.lyh.mall.order.entity.OrderEntity;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.util.HashMap;
/**
* MQ交換機、普通佇列、死信佇列和系結
* 容器中的Queue、Exchange、Binding 會自動創建(在RabbitMQ中不存在的情況下)
**/
@Configuration
public class MyRabbitMQConfig {
/**
* 測驗
*/
/*@RabbitListener(queues = "order.release.order.queue")
public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
System.out.println("收到過期的訂單訊息:準備關閉訂單"+orderEntity.getOrderSn());
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}*/
/**
* 使用JSON序列化機制,進行訊息轉換
* @return
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/* 容器中的Queue、Exchange、Binding 會自動創建(在RabbitMQ中不存在的情況下) */
/**
* 死信佇列
*
* @return
*/
@Bean
public Queue orderDelayQueue() {
/*
Queue(String name, 佇列名字
boolean durable, 是否持久化
boolean exclusive, 是否排他
boolean autoDelete, 是否自動洗掉
Map<String, Object> arguments) 屬性
*/
HashMap<String, Object> arguments = new HashMap<>();
//死信路由
arguments.put("x-dead-letter-exchange", "order-event-exchange");
//死信路由鍵
arguments.put("x-dead-letter-routing-key", "order.release.order");
arguments.put("x-message-ttl", 60000); // 訊息過期時間 1分鐘
//創建佇列
Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
return queue;
}
/**
* 普通佇列
*
* @return
*/
@Bean
public Queue orderReleaseQueue() {
Queue queue = new Queue("order.release.order.queue", true, false, false);
return queue;
}
/**
* TopicExchange
* 創建交換機
* @return
*/
@Bean
public Exchange orderEventExchange() {
/*
* String name,
* boolean durable,
* boolean autoDelete,
* Map<String, Object> arguments
* */
return new TopicExchange("order-event-exchange", true, false);
}
/**
* 系結死信佇列
* @return
*/
@Bean
public Binding orderCreateBinding() {
/*
* String destination, 目的地(佇列名或者交換機名字)
* DestinationType destinationType, 目的地型別(Queue、Exhcange)
* String exchange,
* String routingKey,
* Map<String, Object> arguments
* */
return new Binding("order.delay.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.create.order",
null);
}
/**
* 系結普通佇列
* @return
*/
@Bean
public Binding orderReleaseBinding() {
return new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order",
null);
}
/**
* 訂單釋放直接和庫存釋放進行系結
* @return
*/
@Bean
public Binding orderReleaseOtherBinding() {
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.other.#",
null);
}
/**
* 商品秒殺佇列
* @return
*/
@Bean
public Queue orderSecKillOrrderQueue() {
Queue queue = new Queue("order.seckill.order.queue", true, false, false);
return queue;
}
@Bean
public Binding orderSecKillOrrderQueueBinding() {
//String destination, DestinationType destinationType, String exchange, String routingKey,
// Map<String, Object> arguments
Binding binding = new Binding(
"order.seckill.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.seckill.order",
null);
return binding;
}
}
4.5. 測驗代碼
- AmqpAdmin:管理組件
- RabbitTemplate:訊息發送處理組件
- @RabbitListener 監聽訊息的方法可以有三種引數(不分數量,順序)Object content, Message message,Channel channel
import com.lyh.mall.order.entity.OrderReturnReasonEntity;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Date;
@SpringBootTest
class MallOrderApplicationTests {
@Autowired
private AmqpAdmin amqpAdmin;
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 發送訊息
*/
@Test
void sendMessige(){
OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
reasonEntity.setId(1L);
reasonEntity.setCreateTime(new Date());
reasonEntity.setName("你好!");
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",reasonEntity);
System.out.println("發送訊息成功!!");
}
/**
* 創建交換機
*/
@Test
void createExchange() {
DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);
amqpAdmin.declareExchange(directExchange);
System.out.println("創建成功");
}
/**
* 創建佇列
*/
@Test
void createQueue() {
Queue queue = new Queue("hello-java-queue",true,false,false);
amqpAdmin.declareQueue(queue);
System.out.println("創建成功");
}
/**
* 創建系結
*/
@Test
void createBinding() {
//String destination【目的地】,
// DestinationType destinationType【目的地型別】,
// String exchange【交換機】,
// String routingKey【路由鍵】,
//@Nullable Map<String, Object> arguments【自定義引數】
Binding binding = new Binding("hello-java-queue",Binding.DestinationType.QUEUE,"hello-java-exchange","hello.java",null);
amqpAdmin.declareBinding(binding);
System.out.println("創建成功");
}
}
//service層加監聽注解獲取 訊息資料
/**
* 監聽訊息
* queues 宣告需要監聽的所有佇列
* org.springframework.amqp.core.Message
* <p>
* 引數可以寫一下型別
* 1、Message essage: 原生訊息詳細資訊,頭+體
* 2、發送的訊息的型別: OrderReturnReasonEntity content;
* 3、Channel channel:當前傳輸資料的通道
* <p>
* Queue:可以很多人都來監聽,只要收到訊息,佇列洗掉訊息,而且只能有一個收到此訊息
* 1)、訂單服務啟動多個:同一個訊息,只能有一個客戶端收到
* 2)、只有一個訊息完全處理完,方法運行結束,我們就可以接收到下一個訊息
*/
@RabbitListener(queues = {"hello-java-queue"})
//這個類的這個方法才能接受hello-java-queue訊息
//@RabbitHandler //類上加注解@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel) {
//拿到訊息體
// byte[] body = message.getBody();
//拿到訊息頭
// MessageProperties properties = message.getMessageProperties();
System.out.println("接收到訊息:" + content);
//訊息處理完 手動確認 deliveryTag在Channel內按順序自增
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("deliveryTag->" + deliveryTag);
try {
if (deliveryTag % 2 == 0) {
//確認簽收 佇列洗掉該訊息 false非批量模式
channel.basicAck(deliveryTag, false);
} else {
//拒收退貨 第三個引數 -> true:重新入隊 false:丟棄
channel.basicNack(deliveryTag, false, true);
}
} catch (IOException e) {
//網路中斷
}
}
// @RabbitHandler
//public void receiveMessage2(OrderEntity content) {
// System.out.println("接收到訊息:" + content);
//}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/434505.html
標籤:其他
上一篇:用戶行為采集平臺概述
下一篇:新能源智慧路燈充電樁可行性分析
