@
目錄- 一、前言
- 二、RabbitMQ作用
- 1. 異步處理
- 2. 應用解耦
- 3. 流量控制
- 三、RabbitMQ概念
- 1. RabbitMQ簡介
- 2. 核心概念
- 四、JMS與AMQP比較
- 五、RabbitMQ運行機制
- 1. direct
- 2. fanout
- 3. topic
- 六、Docker安裝RabbitMQ
- 七、整合Springboot
- 1. 引入依賴
- 2. 主啟動類上添加注解
- 3. 撰寫組態檔
- 八、測驗創建交換機、佇列、系結關系
- 1. 測驗創建Direct交換機
- 2. 打開交換機界面查看
- 3. 創建Queue
- 4. 打開佇列界面查看
- 5. 系結交換機和佇列
- 6. 打開交換機中的系結界面
- 九、測驗發訊息
- 1. 測驗發送訊息
- 2. Queue串列中查看訊息
- 3. 手動確認訊息
- 4. 測驗發送物件
- 5. 查看訊息發送的物件
- 6. 書寫訊息發送物件轉JSON配置類
- 7. 再次發送九、4中的代碼,查詢是否正常顯示物件
- 十、測驗收訊息
- 1. 創建接收資訊的方法
- 2. 模擬發送訊息
- 3. 接收訊息查看
- 4. 在思考
- 5. 多個服務監聽同一條佇列
- 6. 模擬發送十條訊息,查看會被那個服務接收
- 7. 多服務接收訊息結果查看
- 十一、總結
一、前言
我們先來聊聊訊息中間件:
訊息中間件利用高效可靠的訊息傳遞機制進行平臺無關的資料交流,并基于資料通信來進行分布式系統的集成,通過提供訊息傳遞和訊息排隊模型,它可以在分布式環境下擴展行程間的通信,(來自百度百科)
我們常見的中間件其實有很多種了,例如ActiveMQ、RabbitMQ、RocketMQ、Kafka、ZeroMQ等,其中應用最為廣泛的要數RabbitMQ、RocketMQ、Kafka 這三款,Redis在某種程度上也可以使用list或者Stream來實作訊息佇列,但不能算是中間件哈!
如果大家對怎么選型感興趣,可以看一下小編的這篇文章:四大MQ選型
今天小編帶著大家一起學習一下RabbitMQ,從入門到精通,從無到有!!小編沒有使用Windows安裝,很麻煩,所以使用Docker安裝,如果沒有安裝Docker的可以看一下小編的另一篇文章:Linux安裝Docker
小編其實也是通過雷神的課件和講解后,自己在整理一下,供以后學習和參考,在此感謝尚硅谷雷神哈!
小編覺得在說概念之前,應該知道他的作用,然后再系統的學習概念等!
二、RabbitMQ作用
其實作用還是挺多的,但是主要是以下三條:
- 異步處理
- 應用解耦
- 流量控制
下面我們進行一個個的簡單描述一下哈,我們還是拿被用了一萬次的例子和圖例哈!!
1. 異步處理
用戶在某網站注冊成功后,需要向用戶發送郵件和資訊提示其注冊成功(其實沒什么必要,但是例子說一下還可以,小編自己的理解哈!),常規的做法是:后臺將注冊資訊保存到資料庫,然后再給用戶發郵件發短信,

我們看到這樣非常的耗時,其實保存完成后,就可以登錄了,短信和郵件過一會接收也是沒有什么問題的!或者發送失敗,用戶一直沒有收到,這都是沒什么問題的,用戶已經登錄進去了,管你發不發短信,大家說對吧!!
既然存在問題,我們就是有訊息佇列來解決這個問題:
我們可以在將注冊資訊保存資料庫之后,把要發送注冊郵件和發送短信的訊息寫入訊息佇列,然后就告知用戶注冊成功,發送郵件和短信將由訂閱了訊息的應用異步的去執行,這樣耗時的問題就解決了!!

2. 應用解耦
在大型電商專案中,會將訂單系統和庫存系統分成兩個不同的應用,然后進行服務與服務之間的呼叫,正常情況下用戶下單后訂單系統會呼叫庫存系統,然后回傳給用戶顯示下單成功,

但是也存在問題,如果庫存系統掛了,這樣就會導致下單失敗;如果你是用戶,你會判斷這個產品不行,以后不用了!!
別著急,這位用戶,我們幫您解決哈:這時我們引入訊息佇列進行解耦

現在有的同學會問,怎么解決的呢?
別著急,小編來和你說一下哈!剛剛出錯的原因就是庫存系統掛了,改處理的請求沒有處理,所以下單失敗;我們引入訊息佇列,就是把訂單訊息寫入到訊息佇列中,然后庫存系統訂閱我們的訊息佇列;然后庫存系統去訊息佇列中獲取訊息,進行處理訂單,來完成減庫存的操作;如果失敗也會有重試機制,真的掛了,也可以持久化,等到庫存系統活了之后繼續處理!!一個宗旨,不能影響用戶的使用體驗呢!!
3. 流量控制
看名字就能知道,肯定是并發很大的情況才會出現的,不用想就是秒殺時刻了!
假設一瓶茅臺2萬人搶,這是我們的系統可能會被打垮,所以我們把超過一定并發量時,把超過的請求放在訊息佇列中,然后級訓系統壓力,然后慢慢處理;雖然可能降低一下用戶的體驗,但是秒殺就是這樣,只能有一部分人成功,我們要保證好系統可以正常運行哈!!

三、RabbitMQ概念
1. RabbitMQ簡介
RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue Protocol)的開源實作,
RabbitMQ 是部署最廣泛的開源訊息代理,
RabbitMQ擁有數萬用戶,是最流行的開源訊息代理之一,從T-Mobile到Runtastic,RabbitMQ在世界各地的小型初創公司和大型企業中使用,
RabbitMQ是輕量級的,易于在本地和云中部署,它支持多種訊息傳遞協議,RabbitMQ可以在分布式和聯合配置中部署,以滿足高規模、高可用性需求,
RabbitMQ運行在許多作業系統和云環境上,并為最流行的語言提供了廣泛的開發工具,
2. 核心概念
Message
訊息,訊息是不具名的,它由訊息頭和訊息體組成,訊息體是不透明的,而訊息頭則由一系列的可選屬性組成,
這些屬性包括routing-key(路由鍵)、priority(相對于其他訊息的優先權)、delivery-mode(指出該訊息可
能需要持久性存盤)等,
Publisher
訊息的生產者,也是一個向交換器發布訊息的客戶端應用程式,
Exchange
交換器,用來接收生產者發送的訊息并將這些訊息路由給服務器中的佇列,
Exchange有4種型別:direct(默認),fanout, topic, 和headers,不同型別的Exchange轉發訊息的策略有所區別Queue
訊息佇列,用來保存訊息直到發送給消費者,它是訊息的容器,也是訊息的終點,一個訊息可投入一個或多個佇列,訊息一直在佇列里面,等待消費者連接到這個佇列將其取走,
Binding
系結,用于訊息佇列和交換器之間的關聯,一個系結就是基于路由鍵將交換器和訊息佇列連接起來的路由規則,所以可以將交換器理解成一個由系結構成的路由表,
Exchange 和Queue的系結可以是多對多的關系,
Connection
網路連接,比如一個TCP連接,
Channel
信道,多路復用連接中的一條獨立的雙向資料流通道,信道是建立在真實的TCP連接內的虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布訊息、訂閱佇列還是接收訊息,這些動作都是通過信道完成,因為對于作業系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接,
Consumer
訊息的消費者,表示一個從訊息佇列中取得訊息的客戶端應用程式,
Virtual Host
虛擬主機,表示一批交換器、訊息佇列和相關物件,虛擬主機是共享相同的身份認證和加密環境的獨立服務器域,每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的佇列、交換器、系結和權限機制,vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 / ,
類似docker容器和容器之間是相互隔離的,一個壞了,不耽誤另一個使用
Broker
表示訊息佇列服務器物體,
總架構圖

四、JMS與AMQP比較
| JMS(Java Message Service) | AMQP(Advanced Message Queuing Protocol) | |
|---|---|---|
| 定義 | Java api | 網路線級協議 |
| 跨語言 | 否 | 是 |
| 跨平臺 | 否 | 是 |
| Model | 提供兩種訊息模型: (1)、Peer-2-Peer (2)、Pub/sub |
提供了五種訊息模型: (1)、direct exchange (2)、fanout exchange (3)、topic change (4)、headers exchange (5)、system exchange 本質來講,后四種和JMS的pub/sub模型沒有太大差別, 僅是在路由機制上做了更詳細的劃分; |
| 支持訊息類 型 |
多種訊息型別: TextMessage MapMessage BytesMessage StreamMessage ObjectMessage Message (只有訊息頭和屬性) |
byte[] 當實際應用時,有復雜的訊息,可以將訊息序列化后發 送, |
| 實作中間件 | ActiveMQ、HornetMQ | RabbitMQ |
| 綜合評價 | JMS 定義了JAVA API層面的標準;在java體系中, 多個client均可以通過JMS進行互動,不需要應用修 改代碼,但是其對跨平臺的支持較差; |
AMQP定義了wire-level層的協議標準;天然具有跨平 臺、跨語言特性 |
五、RabbitMQ運行機制
AMQP 中訊息的路由程序和 Java 開發者熟悉的 JMS 存在一些差別,AMQP 中增加了Exchange 和Binding的角色,生產者把訊息發布到 Exchange 上,訊息最終到達佇列并被消費者接收,而 Binding 決定交換器的訊息應該發送到那個佇列,

Exchange 型別
- direct
- fanout
- topic
- headers(不建議使用)
RabbitMQ默認七大交換機

1. direct
訊息中的路由鍵(routing key)如果和Binding 中的 binding key 一致, 交換器就將訊息發到對應的佇列中,路由鍵與佇列名完全匹配,如果一個佇列系結到交換機要求路由鍵為"a1.b1",則只轉發 routing key 標記為"a1.b1"的訊息,不會轉發"a1.b2”,也不會轉發"a1.b3" 等等,它是完全匹配、單播的模式,

2. fanout
每個發到 fanout 型別交換器的訊息都會分到所有系結的佇列上去,fanout 交換器不處理路由鍵,只是簡單的將佇列系結到交換器上,每個發送到交換器的訊息都會被轉發到與該交換器系結的所有佇列上,很像子網廣播,每臺子網內的主機都獲得了一份復制的訊息,fanout 型別轉發訊息是最快的、廣播,

3. topic
topic是升級版的fanout模式,做了選擇權,并不是全都會接受,符合條件才會收到!topic 交換器通過模式匹配分配訊息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時佇列需要系結到一個模式上,它將路由鍵和系結鍵的字串切分成單詞,這些單詞之間用點隔開,
它同樣也會識別兩個通配符:符號#和符號*,#匹配0個或多個單詞,* 匹配一個單詞,

六、Docker安裝RabbitMQ
直接輸入命令,docker會幫助我們自動去拉去鏡像的:
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 \
-p 15671:15671 -p 15672:15672 rabbitmq:management
我們查詢是否運行成功
docker ps

我們在windows上進行測驗是否能夠打開界面:
輸入:http://192.168.17.130:15672/

用戶名密碼都是:guest
進入界面:

七、整合Springboot
1. 引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--自定義訊息轉化器Jackson2JsonMessageConverter所需依賴-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
2. 主啟動類上添加注解
@EnableRabbit
@SpringBootApplication
public class GulimallOrderApplication {
public static void main(String[] args) {
SpringApplication.run(GulimallOrderApplication.class, args);
}
}
3. 撰寫組態檔
# 指定rabbitmq服務器主機
spring.rabbitmq.host=192.168.17.130
# 賬號密碼埠號都默認配置了,我們無需配置

八、測驗創建交換機、佇列、系結關系
1. 測驗創建Direct交換機
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void createExchange() {
// 第一個引數為交換機名字,第二個引數為是否持久化,第三個引數為不使用交換機時洗掉
DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);
amqpAdmin.declareExchange(directExchange);
System.out.println("交換機創建成功");
}
2. 打開交換機界面查看

3. 創建Queue
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void createQueue() {
/**
* 第一個引數為佇列名字,
* 第二個引數為是否持久化,
* 第三個引數為是否排他(true:一個連接只能有一個佇列,false:一個連接可以有多個(推薦))
* 第四個引數為不使用佇列時自動洗掉
*/
Queue queue = new Queue("hello-java-queue",true,false,false);
amqpAdmin.declareQueue(queue);
System.out.println("佇列創建成功");
}
4. 打開佇列界面查看

5. 系結交換機和佇列
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void createBinding() {
/**
* 第一個引數為目的地,就是交換機或者佇列的名字
* 第二個引數為目的地型別,交換機還是佇列
* 第三個引數為交換機,
* 第四個引數為路由鍵,匹配的名稱
*/
Binding binding = new Binding("hello-java-queue",
Binding.DestinationType.QUEUE,
"hello-java-exchange",
"hello.java",null);
amqpAdmin.declareBinding(binding);
System.out.println("系結成功");
}
6. 打開交換機中的系結界面
點擊交換機

系結串列

九、測驗發訊息
1. 測驗發送訊息
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sendMessageTest(){
// 訊息型別為object 發送物件也是可以的
String msg = "這是一條訊息";
// 第一個引數為發送訊息到那個交換機上,第二個是發送的路由鍵(交換機進行需要符合系結的佇列),第三個引數為發送的訊息
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",msg);
System.out.println("訊息發送成功");
}
2. Queue串列中查看訊息

3. 手動確認訊息
點擊我們的佇列:

進入詳細界面,下滑找到Get messages:

在次點擊佇列,訊息消失:

4. 測驗發送物件
@Data
// 必須序列化,不然報錯
public class User implements Serializable {
private String name;
private Integer age;
}
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sendMessageTest(){
User user = new User();
user.setAge(22);
user.setName("王振軍");
// 第一個引數為發送訊息到那個交換機上,第二個是發送的路由鍵(交換機進行需要符合系結的佇列),第三個引數為發送的訊息
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",user);
System.out.println("訊息發送成功");
}
5. 查看訊息發送的物件

6. 書寫訊息發送物件轉JSON配置類
撰寫配置類
@Configuration
public class MyRabbitmqConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
7. 再次發送九、4中的代碼,查詢是否正常顯示物件

十、測驗收訊息
1. 創建接收資訊的方法
方法所在的類必須交給了IOC管理,我們直接寫在service里面,代碼如下:
@Service
public class TestService {
// queues是監聽的佇列名字,可以是多個
@RabbitListener(queues = {"hello-java-queue"})
public void reciveMessage(Object message){
System.out.println("接受的資訊" + message);
}
}
2. 模擬發送訊息
還是用上面的方法進行發送一個物件!
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sendMessageTest(){
User user = new User();
user.setAge(22);
user.setName("王振軍");
// 第一個引數為發送訊息到那個交換機上,第二個是發送的路由鍵(交換機進行需要符合系結的佇列),第三個引數為發送的訊息
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",user);
System.out.println("訊息發送成功");
}
3. 接收訊息查看
接受的資訊:(Body:'{"name":"王振軍","age":22}'
MessageProperties [headers={__TypeId__=com.atguigu.gulimall.order.entity.User},
contentType=application/json, contentEncoding=UTF-8, contentLength=0,
receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false,
receivedExchange=hello-java-exchange, receivedRoutingKey=hello.java, deliveryTag=1,
consumerTag=amq.ctag-Nlg0mulsX9mxdPvGe72CBw, consumerQueue=hello-java-queue])
4. 在思考
我們發現剛付訓傳的是詳細資訊,我們可以指定訊息的型別,就是發送訊息發的物件是什么,我們就可以直接接收就行!看代碼:
@Service
public class TestService {
@RabbitListener(queues = {"hello-java-queue"})
public void reciveMessage(Message message, User user){
System.out.println("接受的資訊:" + message);
System.out.println("發送的資訊:" + user);
}
}
在此發送訊息,我們看一下控制臺:
接受的資訊:(Body:'{"name":"王振軍","age":22}'
MessageProperties [headers={__TypeId__=com.atguigu.gulimall.order.entity.User},
contentType=application/json, contentEncoding=UTF-8, contentLength=0,
receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false,
receivedExchange=hello-java-exchange, receivedRoutingKey=hello.java, deliveryTag=1,
consumerTag=amq.ctag-Nlg0mulsX9mxdPvGe72CBw, consumerQueue=hello-java-queue])
發送的資訊:User(name=王振軍, age=22)
這樣就很清晰了哈!
拓展: 接收的還有第三個引數就是通道,每一個連接只會有一個通道哈!,大家可以自己測驗一下,列印看看一下!!
public void reciveMessage(Message message, User user, Channel channel)
5. 多個服務監聽同一條佇列
右擊已存在服務,復制一份配置不同埠:


現在有兩個服務監聽同一個佇列!!
6. 模擬發送十條訊息,查看會被那個服務接收
調整測驗發訊息代碼:
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sendMessageTest(){
for (int i = 0;i < 10; i++) {
User user = new User();
user.setAge(i);
user.setName("王振軍" + i);
// 第一個引數為發送訊息到那個交換機上,第二個是發送的路由鍵(交換機進行需要符合系結的佇列),第三個引數為發送的訊息
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", user);
System.out.println("訊息發送成功");
}
}
接收訊息的代碼:
@Service
public class TestService {
@RabbitListener(queues = {"hello-java-queue"})
public void reciveMessage(Message message, User user){
System.out.println("接收的資訊:" + user);
}
}
7. 多服務接收訊息結果查看
我們看到9000服務接收了1,4,7訊息

9010服務接收了0,3,6,9訊息

總結: 我們可以發現一個訊息只會被接收一次!
還有就是發了10條訊息,只有7條被接收了,其余的呢?
別急小編來告訴大家,這是因為我們測驗是使用SpringBoot的測驗類進行的,有的部分訊息被測驗的接收了!大家不信可以看一下測驗的控制臺,找一下:

我們看到訊息的2,5,8在這里呢!!
拓展: 我們一次發送十條訊息,每條接收訊息假如耗時10s,此時會接收處理完一個訊息,才會接收下一個,就是我們說的串行化!!
十一、總結
這樣我們就對RabbitMQ有了新的認識,從入門也算走上了實踐!后面有時間小編再把訊息的可靠性發出來,也就是進階版!!
在次感謝雷神的課程哈,看到這里,小伙伴們點個贊唄,小編整理不易呀!!謝謝大家了!!
有緣人才可以看得到的哦!!!
點擊訪問!小編自己的網站,里面也是有很多好的文章哦!
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/472347.html
標籤:其他
