RabbitMQ的使用與理解
- 前言
- 一. 創建用戶與虛擬主機
- 二. SpringBoot集成RabbitMQ
- 1. 引入依賴
- 2. 撰寫組態檔(yml格式)
- 3. 模板物件
- 三. 訊息發布模型
- 1. 直連(點對點)
- 2. work 作業模型
- 3. 發布訂閱(fanout廣播)
- 4. Routing 路由模型-direct
- 5.Topic 動態路由模型-topic
- 總結
前言
RabbitMQ安裝完之后呢,該學習怎么使用了,

RabbitMQ基于生產者于消費者模型,實作了系統間的解耦
生產者需要與rabbitmq server建立連接,每一個生產者對應一個虛擬主機,類似于MySQL中庫的概念,即一個應用(一個業務)對應一個虛擬主機,使各個應用(業務)之間互不影響,每一個虛擬主機都要跟一個用戶進行系結,這個用戶名密碼作為虛擬主機的訪問權限,所以在開發之前我們需要在web管理界面創建虛擬主機與用戶,并將二者系結,將用戶授權,
消費者也需要與rabbitmq server建立連接,從queue(訊息佇列)中消費訊息的前提是連接虛擬主機,有用戶名和密碼才能成功消費訊息,該用戶名密碼即與虛擬主機系結的用戶名密碼,
訊息不一定被生產者放到交換機,由交換機決定放到哪個queue,生產者也可以直接將訊息放入queue,這是一種點對點訊息發布模型,文章后續會介紹消費模型,
一. 創建用戶與虛擬主機
1. 在web界面創建一個虛擬主機,我就取名為rabbitmq了

此時該虛擬機的權限為guest,如果操作不失誤,按步驟走完,這里會變成你創建的用戶名

2.創建用戶,我取名為rabbitmqtest

3.把創建的虛擬主機與創建的用戶系結

然后按照下圖給用戶分配訪問虛擬主機的權限

到此,已經創建好虛擬主機與用戶,且虛擬主機與用戶已系結成功,可以進行開發了,
二. SpringBoot集成RabbitMQ
1. 引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. 撰寫組態檔(yml格式)
spring:
application:
name: spring-boot-rabbitmq
rabbitmq:
host: 192.168.132.151
port: 5672
username: rabbitmqtest # 為上述創建的用戶名
password: 123
virtual-host: /rabbitmq # 為上述創建的虛擬主機名稱
3. 模板物件
SpringBoot提供了一個模板物件RabbitTemplate,跟RestTemplate、RedisTemplate一樣,在SpringBoot啟動后便將該物件加載到ioc容器,我們使用的時候自動注入即可,
三. 訊息發布模型
1. 直連(點對點)

P:生產者,向訊息佇列發布訊息
紅色:訊息佇列,接收生產者發布的訊息
C:消費者,從訊息佇列消費訊息
1.1 開發生產者
@SpringBootApplication
@RunWith(SpringRunner.class)
public class RabbitMQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
// 點對點直連
@Test
public void test() {
// 引數1:訊息佇列名稱 引數2:訊息內容
rabbitTemplate.convertAndSend("hello", "hello rabbitmq");
}
}
1.2 開發消費者
@Component //需要讓當前類被ioc管理
// 代表監聽名為hello的訊息佇列,默認為訊息持久化,非獨占
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class HelloConsumer {
// 方法名無所謂,隨便定義
//這個注解表明當前方法為訊息消費的回呼方法,即接收到訊息執行此方法
@RabbitHandler
public void receive(String message) {
System.out.println("點對點直連消費訊息:" + message);
}
}
點對點模型會產生的問題:當消費者業務邏輯比較復雜,也就是消費訊息過于緩慢,可能產生訊息的速度遠遠大于消費者消費訊息的速度,就會導致訊息佇列的訊息大量堆積,訊息無法即使處理,如果有多個消費者,每個消費者去處理不同的訊息(必須是處理不同訊息,否則會出現訊息重復處理),效率必然會大大提高,
2. work 作業模型
為解決點對點模型的問題,引入了作業佇列模型(任務佇列),作業模型就是將多個消費者系結到同一個佇列,共同消費佇列中的訊息,訊息一旦被消費,就會消失,確保了不會重復消費訊息

P:生產者,任務的發布者
C1:消費者1,監聽紅色佇列,消費訊息,假設業務邏輯較簡單,則消費速度快
C2:消費者2,監聽紅色佇列,消費訊息,假設業務邏輯教復雜,則消費速度慢
2.1 開發生產者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testWork() {
// 利用回圈一次多發幾條訊息,讓多個消費者都有訊息可消費
for (int i = 0; i < 10; i++) {
// 引數1:訊息佇列名稱 引數2:訊息內容
rabbitTemplate.convertAndSend("work", "work rabbitmq");
}
}
2.2 開發消費者
@Component
public class WorkConsumer {
// 構建消費者1
// @RabbitListener該注解加在方法上,直接讓方法監聽佇列,消費訊息直接運行該方法
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receiveOne(String message) {
System.out.println("消費者1:" + message);
}
// 構建消費者2
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receiveTwo(String message) {
System.out.println("消費者2:" + message);
}
}
2.3 默認消費機制
默認情況是平均分配,將每一個訊息發給下一個消費者,官方明確指出這種方式叫做回圈消費,這種消費方式依賴于訊息自動確認機制,
2.4 訊息確認機制
訊息確認機制有自動確認與手動確認,自動確認指的是在消費者接收到訊息之后,佇列將立馬洗掉當前訊息,佇列不會管消費者真正的業務邏輯有沒有處理完,立馬將下一條訊息分配給下一個消費者,這也是平均分配的原理,手動確認即在消費者業務邏輯執行完成后進行確認,告訴訊息佇列我執行完了,此時訊息佇列再將訊息洗掉
2.5 默認消費機制存在問題
舉個例子,假如一個消費者在訊息佇列確認接收了5條訊息,此時訊息佇列已經將這5條訊息洗掉,但是該消費者消費訊息較慢,在消費第3條訊息的時候宕機了,不僅第三條訊息會丟失,剩下的兩條訊息也會丟失,在我們真正的業務中,并不希望有訊息丟失,
2.6 能者多勞
消費者與訊息佇列之間是通過channel通道傳輸訊息的,上述提到自動確認機制只是消費者接收到訊息rabbitmq就認為你已經處理完成,如果消費者處理訊息較慢,那一定會產生訊息堆積,堆積的訊息就會放到通道,能者多勞讓處理訊息較快的消費者多處理訊息,首先需要將通道設定只允許傳輸一條訊息,這樣不會有訊息堆積,第二需要開啟手動確認機制,消費完訊息再通知消費者洗掉當前訊息,然后消費下一條,這樣不僅能達到能者多勞,還能避免上述訊息丟失的問題,
3. 發布訂閱(fanout廣播)

由官方檔案提供的圖示可知,該模型可以有多個消費者,每個消費者都有自己的queue(臨時訊息佇列),每個訊息佇列都要系結到Exchange(交換機),生產者發送訊息只能發送給交換機,由交換機決定發送給哪個佇列,該模型下,交換機的型別為fanout,即廣播模式,也就是將訊息發給所有佇列,佇列所屬的消費者都能拿到訊息,實作一處通知,處處執行的廣播效果,
3.1 開發生產者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testFanout() {
// 引數1為交換機名稱;引數2為routingKey,路由模式才有用;引數3為訊息體
rabbitTemplate.convertAndSend("orders", "", "廣播fanout模型發送的訊息");
}
3.2 開發消費者
@Component
public class FanoutConsumer {
// 消費者1
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 不給值代表創建臨時佇列
exchange = @Exchange(value = "orders", type = "fanout") // 系結交換機,型別為廣播型別
)
})
public void receiveOne(String message) {
System.out.println("消費者1:" + message);
}
// 消費者2
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 不給值代表創建臨時佇列
exchange = @Exchange(value = "orders", type = "fanout") // 系結交換機,型別為廣播型別
)
})
public void receiveTwo(String message) {
System.out.println("消費者2:" + message);
}
}
3.3 應用場景
例如訂單模塊下單了,然后向訊息佇列發布一個訊息,商品模塊、用戶模塊都要消費該訊息,商品模塊進行庫存更新,用戶模塊進行積分更新等等…
4. Routing 路由模型-direct
發布訂閱模型中,一條訊息會被所有訂閱的佇列消費,但在實際業務場景中,我們往往有定向的消費,即有些訊息希望被一些消費者消費,有些訊息希望被另一些消費者消費,
在該模型下,生產者向交換機發送的訊息要攜帶routing key,訊息佇列與交換機系結也要指定routing key,此時交換機的模式為direct模式,交換機不再把所有訊息發向所有訊息佇列,只有訊息佇列的routing key與訊息的routing key一致時才可接收到此訊息,

- 官網給出的例子為日志相關的,C1消費者只接受error級別的日志,將其存入磁盤,C2消費者接收所有級別的日志,將其輸出到控制臺
- P為生產者,向交換機發布訊息,發送時會指定一個routing key,用于標記訊息
- X為交換機,接收生產者的訊息,然后把訊息傳給與routing key匹配的訊息佇列
- C1為消費者,其所在佇列只能接收到routing key為error的訊息,假設消費邏輯為寫入磁盤
- C2為消費者,其所在佇列能接收到routing key為info、error、waring的訊息,假設消費邏輯為列印到控制臺
4.1 開發生產者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testRoute() {
// 引數1:交換機名稱;引數2:routingKey,對訊息標記;引數3:訊息體
rabbitTemplate.convertAndSend("logs", "info", "發送routingKey為info的訊息");
}
4.2 開發消費者
@Component
public class RouteConsumer {
// 消費者1
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, //創建臨時佇列
exchange = @Exchange(value = "logs", type = "direct"), // 系結交換機,direct模式
key = {"error"} // 指定routingKey
)
})
public void receiveOne(String message) {
System.out.println("消費者1將error的訊息寫入磁盤: " + message);
}
// 消費者2
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, //創建臨時佇列
exchange = @Exchange(value = "logs", type = "direct"), // 系結交換機,direct模式
key = {"info", "error", "warning"} // 指定routingKey
)
})
public void receiveTwo(String message) {
System.out.println("消費者2將info,error,warning的日志輸出到控制臺: " + message);
}
}
4.3 測驗結果

4.4 存在問題
不夠靈活,拓展性差,上面說了,訊息佇列與交換機系結的時候需要指明routing key,像消費者2那樣,有三個roouting key,那么我就要指定三個,如果后續業務發生變化,消費者2要消費其他routing kye的訊息,那就需要再添加新的key,
5.Topic 動態路由模型-topic

topic模型,其實跟direct一樣,只不過在渠道與訊息佇列系結的時候,routing key使用通配符,這樣一來,只要發布的訊息滿足通配符,就可以被消費,在這種模型下 routing key一般都是由一個或者多個單詞組成,由“.”分割,例如sms.pay
通配符:
* 匹配一個單詞
# 匹配多個單詞
例如:
sms.* 可以匹配sms.pay sms.user等
sms.# 可以匹配sms.pay、sms.pay.user
5.1 開發生產者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testTopic() {
rabbitTemplate.convertAndSend("topic", "user.save", "user.save 動態路由訊息");
}
5.2 開發消費者
@Component
public class TopicConsumer {
// 消費者1
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, //不給值則為創建臨時佇列
exchange = @Exchange(type = "topic", name = "topic"),
key = {"user.#"}
)
})
public void receiveOne(String message) {
System.out.println("消費者1,負責消費user相關操作:" + message);
}
// 消費者2
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, //不給值則為創建臨時佇列
exchange = @Exchange(type = "topic", name = "topic"),
key = {"order.*"}
)
})
public void receiveTwo(String message) {
System.out.println("消費者2,負責消費order相關操作:" + message);
}
}
總結
到此,RabbitMQ基礎的使用方式與常見的訊息發布模型就介紹完了,其使用場景無非就是解耦、異步、削峰,以上只是簡單示例用法,大家可以在此基礎上繼續研究,例如如何實作延時發布訊息,類似于淘寶7天后自動確認識訓,即7天后向支付系統發送mq訊息,進行賬戶轉賬確認;還有如何實作訊息限流等等,另外rabbitmq高可用集群搭建是必不可少的,等后續再給大家介紹,
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/225890.html
標籤:其他
