RabbitMQ
配置環境
安裝 erlang環境以及RabbitMQ
RabbitMQ埠號: 5672
去官網下載 https://www.rabbitmq.com
然后重啟RabbitMQ服務 RabbitMQ安裝教程
開放埠15672
這里,通過http://IP地址:15672 進行Web頁面登錄,輸入賬號密碼(默認都是guest),完成頁面訪問,至此,全部安裝結束,
匯入依賴
<!-- 集成RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置相關資訊
RabbitMQ的埠號是什么?
5672 :這是rabbitMQ的埠號;
15672 :這是那個RabbitMQ的web頁面的埠號;
spring.application.name=spirng-boot-rabbitmq
spring.rabbitmq.host=127.0.0.1 ##主機ip
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=MmHost
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring:
rabbitmq:
username: root
password: root
addresses: 127.0.0.1:5672
cache:
connection:
#Cache connection mode, with default connections and multiple channels
mode: channel
#Multiple connections, multiple channels
# mode: connection
# rabbitmq
server:
port: 8080
spring:
#給專案來個名字
application:
name: rabbitmq-consumer
#配置rabbitMq 服務器
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: root
#虛擬host 可以不設定,使用server默認host
virtual-host: MmHost
發送訊息
@Component
public class SenderTest{
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void Send() {
// 佇列名稱
String queueName = "ThisKey";
// 訊息
String message = "Hello, Spring AMQP!";
// 發送訊息
rabbitTemplate.convertAndSend(queueName, message);
}
}
rabbitTemplate.convertAndSend(key, message);
接收訊息
@Component
public class SpringRabbitMQListener {
@RabbitListener(queues = "ThisKey")
public void listenSimpleQueueMsg(String msg){
System.out.println(msg);
}
}
@Component
//指定所監聽的佇列
@RabbitListener(queues = "ThisKey")
public class SpringRabbitMQListener {
//指定用來處理接收訊息的方法
@RabbitHandler
public void listenSimpleQueueMsg(String msg){
System.out.println(msg);
}
}
注意:此處訊息被消費后,對應的ThisKey中的訊息就消失了,
原文鏈接 去的去看看
RabbitMQ-基礎使用(Spring AMQP) - 簡書 (jianshu.com)
如果使用其他交換機,則需要進行相關配置
可以看這篇文章:SpringBoot整合RabbitMQ
1、創建對應的組態檔
例如Direct交換機
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author : Mm
* @CreateTime : 2023/2/23
* @Description :
**/
@Configuration
public class DirectRabbitConfig {
//佇列 起名:TestDirectQueue
@Bean
public Queue TestDirectQueue() {
return new Queue("TestDirectQueue",true);
}
//Direct交換機 起名:TestDirectExchange
@Bean
DirectExchange TestDirectExchange() {
return new DirectExchange("TestDirectExchange");
}
//系結 將佇列和交換機系結, 并設定用于匹配鍵:TestDirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
}
如何保證訊息的可靠?
ack應答
訊息應答
概念
消費者完成一個任務可能需要一段時間,如果其中一個消費者處理一個長的任務并僅只完成了部分突然它掛掉了,會導致訊息丟失,RabbitMQ 一旦向消費者傳遞了一條訊息,便立即將該訊息標記為洗掉,在這種情況下,突然有個消費者掛掉了,我們將丟失正在處理的訊息,以及后續發送給該消費這的訊息,因為它無法接收到,
為了保證訊息在發送程序中不丟失,rabbitmq 引入訊息應答機制,訊息應答就是:消費者在接收到訊息并且處理該訊息之后,告訴 rabbitmq 它已經處理了,rabbitmq 可以把該訊息洗掉了,
各種訊息模型實體
五種交換機型別
Direct Exchange
直連型交換機,根據訊息攜帶的路由鍵將訊息投遞給對應佇列,
大致流程,有一個佇列系結到一個直連交換機上,同時賦予一個路由鍵 routing key ,
然后當一個訊息攜帶著路由值為X,這個訊息通過生產者發送給交換機時,交換機就會根據這個路由值X去尋找系結值也是X的佇列,
Fanout Exchange
扇型交換機,這個交換機沒有路由鍵概念,就算你綁了路由鍵也是無視的, 這個交換機在接收到訊息后,會直接轉發到系結到它上面的所有佇列,
Topic Exchange
主題交換機,這個交換機其實跟直連交換機流程差不多,但是它的特點就是在它的路由鍵和系結鍵之間是有規則的,
其他兩種:WorkQueue(通過監聽佇列名稱)
基本訊息佇列BasicQueue即為上方的代碼,此處不再重復,
- 1、WorkQueue(通過監聽佇列名稱)

WorkQueue.png
WorkQueue與BasicQueue不同之處,就是WorkQueue支持一對多發布訊息(不是一個訊息發給多個消費者,一個訊息只會被一個消費者消費),多個消費者可以提高訊息消費速度,當然相同之處也是訊息消費后就會從Queue中消失(后續的幾種模型都是如此),
① 模擬訊息堆積
// 佇列名稱
String queueName = "simple.queue";
// 訊息
String message = "Message_";
for (int i = 1; i <= 50; i++) {
// 發送訊息
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
② 接收訊息
此處設定兩個執行緒處理速度不同,
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消費者1接收到訊息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消費者2........接收到訊息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}
處理結果是2個消費者會均分訊息,可以修改消費方的配置,以按照實際處理能力分配,如下:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能獲取一條訊息,處理完成才能獲取下一個訊息
- 2、Fanout (扇形交換機)

Fanout.png
① 撰寫Fanout配置類
創建FanoutExchange,系結佇列Queue和交換機Exchange,
@Configuration
public class FanoutConfig {
/**
* 宣告交換機
* @return Fanout型別交換機
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("stone.fanout");
}
/**
* 第1個佇列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 系結佇列和交換機
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第2個佇列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 系結佇列和交換機
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
② 發送訊息
// 佇列名稱
String exchangeName = "stone.fanout";
// 訊息
String message = "Hello, Fanout!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
③ 接收訊息
@RabbitListener(queues = "fanout.queue1")
public void listen1FanoutQueueMsg(String msg){
System.out.println("Listener1 get :" + msg);
}
@RabbitListener(queues = "fanout.queue2")
public void listen2FanoutQueueMsg(String msg){
System.out.println("Listener2 get :" + msg);
}
不同于WorkQueue,Fanout Exchange廣播模型下,系結該交換機的消費者可以獲取到對應的訊息(即一條訊息可以通過交換機被多個消費者消費),
- 3、Direct(直連交換機)

Direct.png
① 基于注解宣告佇列和交換機
@RabbitListener的使用
Ⅰ bindings = @QueueBinding()配置系結關系;
Ⅱ value = https://www.cnblogs.com/maomao777/archive/2023/02/23/@Queue(name ="direct.queue1")配置佇列;
Ⅲ exchange = @Exchange(name = "stone.direct", type = ExchangeTypes.DIRECT)配置交換機;
Ⅳ key = {"talkshow", "musicshow"}配置訂閱,監聽的key進行匹配,
rabbitTemplate.convertAndSend(exchangeName, "xxx", message);中的xxx第二個引數進行匹配
注意:type = ExchangeTypes.DIRECT是默認型別,可以不做配置,
@RabbitListener(bindings = @QueueBinding(
value = https://www.cnblogs.com/maomao777/archive/2023/02/23/@Queue(name ="direct.queue1"),
exchange = @Exchange(name = "stone.direct", type = ExchangeTypes.DIRECT),
key = {"talkshow", "musicshow"}
))
public void listenDirectQueue1(String msg){
System.out.println("DirectQueue1 :" + msg);
}
@RabbitListener(bindings = @QueueBinding(
value = https://www.cnblogs.com/maomao777/archive/2023/02/23/@Queue(name ="direct.queue2"),
exchange = @Exchange(name = "stone.direct", type = ExchangeTypes.DIRECT),
key = {"talkshow", "news"}
))
public void listenDirectQueue2(String msg){
System.out.println("DirectQueue2 :" + msg);
}
② 發送訊息
// 交換機名稱
String exchangeName = "itcast.direct";
// 訊息
String messageNews = "烏俄沖突升級,昔日友邦冷眼旁觀!";
// 發送訊息
rabbitTemplate.convertAndSend(exchangeName, "news", messageNews);
// 訊息
String messageTalks = "蜘蛛俠3英雄無歸發布藍光預告,主演再登SN宣傳!";
// 發送訊息
rabbitTemplate.convertAndSend(exchangeName, "talkshow", messageTalks);
此時:訂閱news主題的佇列direct.queue1可以消費messageNews,訂閱talkshow主題的direct.queue1和direct.queue2均可以消費messageTalks,
- 4、Topic(主題交換機)

Topic.png
Topic型別的Exchange與Direct相比,都是可以根據RoutingKey把訊息路由到不同的佇列,只不過Topic型別Exchange可以讓佇列在系結Routing key 的時候使用通配符,通配符規則:
#:匹配一個或多個詞
*:匹配不多不少恰好1個詞
① 發送訊息
/**
* topicExchange
*/
@Test
public void testSendTopicExchange() {
// 交換機名稱
String exchangeName = "itcast.topic";
// 訊息
String message = "建設更高水平法治中國";
// 發送訊息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
② 接收訊息
@RabbitListener(bindings = @QueueBinding(
value = https://www.cnblogs.com/maomao777/archive/2023/02/23/@Queue(name ="topic.queue1"),
exchange = @Exchange(name = "stone.topic", type = ExchangeTypes.TOPIC),
key = {"China.#"}
))
public void listenTopicQueue1(String msg){
System.out.println("TopicQueue1 :" + msg);
}
@RabbitListener(bindings = @QueueBinding(
value = https://www.cnblogs.com/maomao777/archive/2023/02/23/@Queue(name ="topic.queue2"),
exchange = @Exchange(name = "stone.topic", type = ExchangeTypes.TOPIC),
key = {"#.news"}
))
public void listenTopicQueue2(String msg){
System.out.println("TopicQueue2 :" + msg);
}
此時,由于訊息Topic滿足兩個佇列的訂閱規則,所以兩個佇列都可以消費到訊息,
RabbitMQ交換機型別(引數介紹)
amqp (advanced message queuing protocol 高級訊息佇列協議) 核心概念:
server /broker 訊息服務器: 接收 / 存盤訊息, 維護交換機 / 佇列等元資訊資料
connection 連接: 客戶端連接訊息服務器
channel 網路信道: 一個 connection 可以有多個 channel
virutual host 虛擬主機: 邏輯隔離, 相當于 tomcat 的 webapps 下各個專案
exchange 交換機: 訊息會首先發送到指定交換機, 交換機再根據交換機型別和路由鍵路由到指定佇列
routing key 路由鍵: 交換機根據路由鍵路由到指定佇列, 路由鍵可以是固定的名字或者是主題交換機的 pattern, 即 # 表示 0 個或多個單詞, * 固定表示 1 個單詞
message 訊息: 包含訊息屬性 (頭資訊 / 優先級等) 和訊息體
queue 佇列 (訊息會路由保存到某個佇列中)
監聽者 (相同佇列下可以有多個監聽者消費, 一般一個訊息只會分發到某個監聽者身上)
交換機型別
Direct 直連交換機 (一對一, 只能發送到對應系結的單個佇列, 不夠靈活)
Topic 主題交換機 (支持一對多, 根據模式, 與路由鍵匹配的佇列都會收到訊息, 靈活, 推薦)
Fanout 廣播交換機 (所有與該交換機系結的佇列都會收到訊息, 即忽略路由鍵)
headers 頭交換機 (使用頭資訊匹配, 不使用路由鍵. 交換機會宣告一個 key 是 x-match, 值是 all (全部匹配) 或者 any (任一匹配) (匹配會忽略所有 x- 開頭的頭資訊) 傳過來訊息的頭資訊會與系結頭交換機的佇列的頭資訊比較)
附:
主題交換機的模式語法:
- 表示匹配 1 個單詞, 例如 mall.* 可以匹配 mall.order 不能匹配 mall.order.cancel
表示匹配 0 個或多個單詞, 例如 mall.# 可以匹配 mall.order.cancel
RabbitMQ的應用場景
- 延遲佇列,延遲訊息
- 服務與服務之間的解耦(例如一個服務進行mysql操作的時候需要另一個服務同時進行對應操作)
- 異步處理、流量削峰
1、異步處理
假設想象一下我們做一個商城專案,在用戶支付模塊中,可能會涉及到其它業務,比如:積分折扣、消費券、短信驗證等功能,我們傳統的執行步驟是逐步執行,也就是說當用戶點擊支付 ----> 積分折扣 ----> 消費券 ----> 短信驗證 ----->支付完成,用戶需要等待每個業務執行完畢才能支付成功!假設我們從點擊支付 -----> 支付成功消耗時間為100/ms,后面我們每新增一個業務就會多耗時50/ms,上述的流程大概會耗時250/ms!如果說以后業務更多的話,那么用戶支付訂單的時間會越來越長,這樣大大影響了用戶的體驗!參照下圖理解

我們使用訊息中間件進行異步處理,當用戶下單支付同時我們創建訊息佇列進行異步的處理其它業務,在我們支付模塊中最重要的是用戶支付,我們可以將一些不重要的業務放入訊息佇列執行,這樣可以大大添加我們程式運行的速度,用戶支付模塊中也大大減少了支付時間,為用戶添加了更好的體驗,其它模塊與其思想一致,就比如說用戶注冊!
2、流量削峰
假設我們有一個訂單系統,我們的訂單系統最大承受訪問量是每秒1萬次,如果說某天訪問量過大我們的系統承受不住了,會對服務器造成宕機,這樣的話我們的系統就癱瘓了,為了解決該問題我們可以使用中間件對流量進行消峰
未加入中間件之前,用戶直接訪問的是訂單系統

加入中間件之后,用戶直接訪問的是中間件,通過中間件對用戶進行消峰,好處是可以避免系統的宕機癱瘓,壞處是系統速度變慢,但是總比不能使用好

3、應用解耦
我們以商城專案為例,訂單系統耦合呼叫支付、庫存、物流系統,如果某天其中一個系統出現了例外就會造成訂單系統故障!使用中間件后訂單系統通過佇列去訪問支付、庫存、物流系統就不會造成上述的問題,因為訂單系統執行完成才會發訊息給佇列,接下來的任務就交給佇列完成,佇列會監督各個系統完成,如果完不成佇列會一直監督,直到完成為止!所以說使用中間件后不會造成一個子系統出現故障而造成整個系統故障

本文來自博客園,作者:沒有煩惱的貓貓,轉載請注明原文鏈接:https://www.cnblogs.com/maomao777/p/rabbitMQ_knowledge.html
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/544714.html
標籤:其他
