1、訊息模型概述
RabbitMQ提供幾種訊息模型,如下圖

其中最后一種是RPC,不是MQ,此處不予討論,下面用例子說明每種模式,
2、Demo準備作業
首先新建一個springboot專案,引入依賴,
<dependencies> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.3.2</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies>
為了方便操作,先寫一個連接MQ的工具類
public class ConnectionUtil { /** * 建立與RabbitMQ的連接 * @return * @throws Exception */ public static Connection getConnection() throws Exception { //定義連接工廠 ConnectionFactory factory = new ConnectionFactory(); //設定服務地址 factory.setHost("127.0.0.1"); //埠 factory.setPort(5672); //設定賬號資訊,用戶名、密碼、vhost factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); return connection; } }
3、基本訊息模型
官網對RabbitMQ簡介:RabbitMQ是一個訊息代理:它接受和轉發訊息, 你可以把它想象成一個郵局:當你把郵件放在郵箱里時,你可以確定郵差先生最侄訓把郵件發送給你的收件人, 在這個比喻中,RabbitMQ是郵政信箱,郵局和郵遞員,RabbitMQ與郵局的主要區別是它不處理紙張,而是接受,存盤和轉發資料訊息的二進制資料塊,
術語
- 生產者(producer ),生產僅意味著發送,發送訊息的程式是生產者:

- 佇列(queue ),佇列是RabbitMQ內部的郵箱的名稱,盡管訊息流經RabbitMQ和您的應用程式,但它們只能存盤在佇列中,佇列僅由主機的存盤器&磁盤限制約束,它本質上是一個大的訊息緩沖器,許多生產者可以將訊息發送到一個佇列,許多消費者可以嘗試從一個佇列接收資料,這就是我們表示佇列的方式:

- 消費者(consumer ),消費與接收具有相似的含義,一個消費者是一個程式,主要是等待接收資訊:

最終
生產者生產(發送)訊息:
/** * 生產者 */ public class Send { private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 從連接中創建通道,使用通道才能完成訊息相關的操作 Channel channel = connection.createChannel(); // 宣告(創建)佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 訊息內容 String message = "Hello World!"; // 向指定的佇列中發送訊息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); //關閉通道和連接 channel.close(); connection.close(); } }
執行send.main方法,打開RabbitMQ控制臺Queues標簽頁可以看到佇列里已經有這條訊息了,

消費者消費(接收)訊息
public class Recv { private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 創建通道 Channel channel = connection.createChannel(); // 宣告佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義佇列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取訊息,并且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即訊息體 String msg = new String(body); System.out.println(" [x] received : " + msg + "!"); } }; // 監聽佇列,第二個引數:是否自動進行訊息確認, channel.basicConsume(QUEUE_NAME, true, consumer); } }
運行消費者

發現已經接收到訊息了,再次查看控制臺也沒訊息了,但是程式依舊沒有停止,消費者依然在監聽佇列中是否有新的訊息,一旦有新的訊息進入佇列,就會立即接收(消費),
4、訊息確認機制(ACK)
如果訊息到了消費者那沒有執行,消費者就宕機了,或者消費者在執行的時候有例外等情況,RabbitMQ就需要知道是否消費正常消費了訊息,
因此,RabbitMQ有一個ACK機制,當消費者獲取訊息后,會向RabbitMQ發送回執ACK,告知訊息已經被接收,不過這種回執ACK分兩種情況:
- 自動ACK:訊息一旦被接收,消費者自動發送ACK(訊息不重要時,丟失沒什么影響)
- 手動ACK:訊息接收后,不會發送ACK,需要手動呼叫(訊息重要時)
上面那個例子是自動ACK,下面寫一個手動ACK例子,
public class Recv { private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] argv) throws Exception { // 獲取到連接 Connection connection = ConnectionUtil.getConnection(); // 創建通道 final Channel channel = connection.createChannel(); // 宣告佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義佇列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取訊息,并且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即訊息體 String msg = new String(body); System.out.println(" [x] received : " + msg + "!"); // 手動進行ACK channel.basicAck(envelope.getDeliveryTag(), false); } }; // 監聽佇列,第二個引數false,手動進行ACK channel.basicConsume(QUEUE_NAME, false, consumer); } }
如何演示兩者的區別呢?在代碼里故意添加的一個例外, 在兩個消費者中加入
int i =1/0;
先運行生產者,再運行自動ACK消費者可以查看控制臺發現訊息被消費了(雖然消費者出事了),
關閉自動ACK的消費者,再次運行生產者,運行手動ACK的消費者如圖

雖然依舊列印了訊息但是訊息沒有被消費,查看控制臺可見,訊息依舊還在,雖然程式出事了,但是訊息沒有丟失,ACK也算是訊息持久化的手段之一,
總結: 如果消費者在不發送確認的情況下死亡(其通道已關閉,連接已關倍訓TCP連接丟失),RabbitMQ將了解訊息未得到充分處理,并將重新排隊,如果同時有其他消費者在線,它將很快將其重新分發給另一個消費者,這樣,可以確保即使消費者偶爾死亡也不會丟失任何訊息,

如上圖,訊息又ready了,后面會繼續介紹其他種類的訊息機制,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/121013.html
標籤:Java
上一篇:求各位大佬幫忙,新手超級懵
下一篇:藍橋杯入門題求圓的面積
