
?
1. 訊息佇列解決了什么問題
訊息中間件是目前比較流行的一個中間件,其中RabbitMQ更是占有一定的市場份額,主要用來做異步處理、應用解耦、流量削峰、日志處理等等方面,
1. 異步處理
一個用戶登陸網址注冊,然后系統發短信跟郵件告知注冊成功,一般有三種解決方法,
-
串行到依次執行,問題是用戶注冊后就可以使用了,沒必要等驗證碼跟郵件,
-
注冊成功后,郵件跟驗證碼用并行等方式執行,問題是郵件跟驗證碼是非重要的任務,系統注冊還要等這倆完成么?
-
基于異步MQ的處理,用戶注冊成功后直接把資訊異步發送到MQ中,然后郵件系統跟驗證碼系統主動去拉取資料
-
,

?
2. 應用解耦
比如我們有一個訂單系統,還要一個庫存系統,用戶下訂單了就要呼叫下庫存系統來處理,直接呼叫到話庫存系統出現問題咋辦呢?
?
3. 流量削峰
舉辦一個 秒殺活動,如何較好到設計?服務層直接接受瞬間搞密度訪問絕對不可以起碼要加入一個MQ,
?
4. 日志處理
用戶通過WebUI訪問發送請求到時候后端如何接受跟處理呢一般?

?
2. RabbitMQ 安裝跟配置
官網:https://www.rabbitmq.com/download.html
開發語言:https://www.erlang.org/
正式到安裝跟允許需要Erlang跟RabbitMQ倆版本之間相互兼容!我這里圖省事直接用Docker 拉取鏡像了,下載:

?
開啟:管理頁面 默認賬號:guest 默認密碼:guest ,Docker啟動時候可以指定賬號密碼對外埠以及
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin-e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672-p 61613:61613 -p 1883:1883 rabbitmq:management
啟動:
?用戶添加:

?vitrual hosts 相當于mysql中的DB,創建一個virtual hosts,一般以/ 開頭,

?對用戶進行授權,點擊/vhost_mmr,

至于WebUI多點點即可了解,
3. 實戰
RabbitMQ 官網支持任務模式:https://www.rabbitmq.com/getstarted.htm
l創建Maven專案匯入必要依賴:
/n/n /n/ncom.rabbitmq /namqp-client /n4.0.2 /n/n /n/norg.slf4j /nslf4j-api /n1.7.10 /n/n /n/norg.slf4j /nslf4j-log4j12 /n1.7.5 /n/n /n/nlog4j /nlog4j /n1.2.17 /n/n /n /n","classes":[]}" data-cke-widget-upcasted="1" data-cke-widget-keep-attr="0" data-widget="codeSnippet">junit /njunit /n4.11 /n
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>
</dependencies>
0. 獲取MQ連接
package com.sowhat.mq.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionUtils {
/**
* 連接器
* @return
* @throws IOException
* @throws TimeoutException
*/
public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/vhost_mmr");
factory.setUsername("user_mmr");
factory.setPassword("sowhat");
Connection connection = factory.newConnection();
return connection;
}
}
1. 簡單佇列

?
P:Producer 訊息的生產者 中間:Queue訊息佇列 C:Consumer 訊息的消費者
package com.sowhat.mq.simple;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 獲取一個連接
Connection connection = ConnectionUtils.getConnection();
// 從連接獲取一個通道
Channel channel = connection.createChannel();
// 創建佇列宣告
AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "hello Simple";
// exchange,佇列,引數,訊息位元組體
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("--send msg:" + msg);
channel.close();
connection.close();
}
}
---
package com.sowhat.mq.simple;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消費者獲取訊息
*/
public class Recv {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
newApi();
oldApi();
}
private static void newApi() throws IOException, TimeoutException {
// 創建連接
Connection connection = ConnectionUtils.getConnection();
// 創建頻道
Channel channel = connection.createChannel();
// 佇列宣告 佇列名,是否持久化,是否獨占模式,無訊息后是否自動洗掉,訊息攜帶引數
channel.queueDeclare(Send.QUEUE_NAME,false,false,false,null);
// 定義消費者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override // 事件模型,訊息來了會觸發該函式
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("---new api recv:" + s);
}
};
// 監聽佇列
channel.basicConsume(Send.QUEUE_NAME,true,defaultConsumer);
}
// 老方法 消費者 MQ 在3,4以下 用次方法,
private static void oldApi() throws IOException, TimeoutException, InterruptedException {
// 創建連接
Connection connection = ConnectionUtils.getConnection();
// 創建頻道
Channel channel = connection.createChannel();
// 定義佇列消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
//監聽佇列
channel.basicConsume(Send.QUEUE_NAME, true, consumer);
while (true) {
// 發貨體
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
byte[] body = delivery.getBody();
String s = new String(body);
System.out.println("---Recv:" + s);
}
}
}
右上角有可以設定頁面重繪頻率,然后可以在UI界面直接手動消費掉,如下圖:

?
簡單佇列的不足:耦合性過高,生產者一一對應消費者,如果有多個消費者想消費佇列中資訊就無法實作了,
2. WorkQueue 作業佇列
Simple佇列中只能一一對應的生產消費,實際開發中生產者發訊息很簡單,而消費者要跟業務結合,消費者接受到訊息后要處理從而會耗時,「可能會出現佇列中出現訊息積壓」,所以如果多個消費者可以加速消費,
?
1. round robin 輪詢分發
代碼編程一個生產者兩個消費者:
package com.sowhat.mq.work;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 獲取連接
Connection connection = ConnectionUtils.getConnection();
// 獲取 channel
Channel channel = connection.createChannel();
// 宣告佇列
AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i <50 ; i++) {
String msg = "hello-" + i;
System.out.println("WQ send " + msg);
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
Thread.sleep(i*20);
}
channel.close();
connection.close();
}
}
---
package com.sowhat.mq.work;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 獲取連接
Connection connection = ConnectionUtils.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
//定義消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件觸發機制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【1】:" + s);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【1】 done");
}
}
};
boolean autoAck = true;
channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
}
}
---
package com.sowhat.mq.work;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 獲取連接
Connection connection = ConnectionUtils.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
//定義消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件觸發機制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【2】:" + s);
try {
Thread.sleep(1000 );
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【2】 done");
}
}
};
boolean autoAck = true;
channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
}
}
現象:消費者1 跟消費者2 處理的資料量完全一樣的個數:消費者1:處理偶數 消費者2:處理奇數 這種方式叫輪詢分發(round-robin)結果就是不管兩個消費者誰忙,「資料總是你一個我一個」,MQ 給兩個消費發資料的時候是不知道消費者性能的,默認就是雨露均沾,此時 autoAck = true,
?
2. 公平分發 fair dipatch
如果要實作公平分發,要讓消費者消費完畢一條資料后就告知MQ,再讓MQ發資料即可,自動應答要關閉!
package com.sowhat.mq.work;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 獲取連接
Connection connection = ConnectionUtils.getConnection();
// 獲取 channel
Channel channel = connection.createChannel();
// s宣告佇列
AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 每個消費者發送確認訊息之前,訊息佇列不發送下一個訊息到消費者,一次只發送一個訊息
// 從而限制一次性發送給消費者到訊息不得超過1個,
int perfetchCount = 1;
channel.basicQos(perfetchCount);
for (int i = 0; i <50 ; i++) {
String msg = "hello-" + i;
System.out.println("WQ send " + msg);
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
Thread.sleep(i*20);
}
channel.close();
connection.close();
}
}
---
package com.sowhat.mq.work;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 獲取連接
Connection connection = ConnectionUtils.getConnection();
// 獲取通道
final Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
// 保證一次只分發一個
channel.basicQos(1);
//定義消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件觸發機制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【1】:" + s);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【1】 done");
// 手動回執
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 自動應答
boolean autoAck = false;
channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
}
}
---
package com.sowhat.mq.work;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 獲取連接
Connection connection = ConnectionUtils.getConnection();
// 獲取通道
final Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
// 保證一次只分發一個
channel.basicQos(1);
//定義消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件觸發機制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【2】:" + s);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【2】 done");
// 手動回執
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 自動應答
boolean autoAck = false;
channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
}
}
結果:實作了公平分發,消費者2 是消費者1消費數量的2倍,
3. publish/subscribe 發布訂閱模式
類似公眾號的訂閱跟發布,無需指定routingKey:
?
解讀:
-
一個生產者多個消費者
-
每一個消費者都有一個自己的佇列
-
生產者沒有把訊息直接發送到佇列而是發送到了
交換機轉化器(exchange), -
每一個佇列都要系結到交換機上,
-
生產者發送的訊息經過交換機到達佇列,從而實作一個訊息被多個消費者消費,
生產者:
package com.sowhat.mq.ps;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//宣告交換機
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");// 分發= fanout
// 發送訊息
String msg = "hello ps ";
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
System.out.println("Send:" + msg);
channel.close();
connection.close();
}
}

?
訊息哪兒去了?丟失了,在RabbitMQ中只有佇列有存盤能力,「因為這個時候佇列還沒有系結到交換機 所以訊息丟失了」,消費者:
package com.sowhat.mq.ps;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv1 {
public static final String QUEUE_NAME = "test_queue_fanout_email";
public static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 佇列宣告
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 系結佇列到交換機轉發器
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"" );
// 保證一次只分發一個
channel.basicQos(1);
//定義消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件觸發機制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【1】:" + s);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【1】 done");
// 手動回執
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 自動應答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
---
package com.sowhat.mq.ps;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv2 {
public static final String QUEUE_NAME = "test_queue_fanout_sms";
public static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 佇列宣告
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 系結佇列到交換機轉發器
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"" );
// 保證一次只分發一個
channel.basicQos(1);
//定義消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件觸發機制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【2】:" + s);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【2】 done");
// 手動回執
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 自動應答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}

?「同時還可以自己手動的添加一個佇列監控到該exchange」

?
4. routing 路由選擇 通配符模式
Exchange(交換機,轉發器):「一方面接受生產者訊息,另一方面是向佇列推送訊息」,匿名轉發用 "" 表示,比如前面到簡單佇列跟WorkQueue,fanout:不處理路由鍵,「不需要指定routingKey」,我們只需要把佇列系結到交換機,「訊息就會被發送到所有到佇列中」,direct:處理路由鍵,「需要指定routingKey」,此時生產者發送資料到時候會指定key,任務佇列也會指定key,只有key一樣訊息才會被傳送到佇列中,如下圖
?
package com.sowhat.mq.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// exchange
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
String msg = "hello info!";
// 可以指定型別
String routingKey = "info";
channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
System.out.println("Send : " + msg);
channel.close();
connection.close();
}
}
---
package com.sowhat.mq.routing;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv1 {
public static final String EXCHANGE_NAME = "test_exchange_direct";
public static final String QUEUE_NAME = "test_queue_direct_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicQos(1);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
//定義消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件觸發機制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【1】:" + s);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【1】 done");
// 手動回執
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 自動應答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
---
package com.sowhat.mq.routing;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv2 {
public static final String EXCHANGE_NAME = "test_exchange_direct";
public static final String QUEUE_NAME = "test_queue_direct_2";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
// 系結種類似 Key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
//定義消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件觸發機制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【2】:" + s);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【2】 done");
// 手動回執
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 自動應答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
WebUI:
?
缺點:路由key必須要明確,無法實作規則性模糊匹配,
5. Topics 主題
將路由鍵跟某個模式匹配,# 表示匹配 >=1個字符, *表示匹配一個,生產者會帶routingKey,但是消費者的MQ會帶模糊routingKey,

?
商品:發布、洗掉、修改、查詢,
package com.sowhat.mq.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String msg = "商品!";
// 可以指定型別
String routingKey = "goods.find";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
System.out.println("Send : " + msg);
channel.close();
connection.close();
}
}
---
package com.sowhat.mq.topic;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv1 {
public static final String EXCHANGE_NAME = "test_exchange_topic";
public static final String QUEUE_NAME = "test_queue_topic_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicQos(1);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add");
//定義消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件觸發機制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【1】:" + s);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【1】 done");
// 手動回執
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 自動應答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
---
package com.sowhat.mq.topic;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv2 {
public static final String EXCHANGE_NAME = "test_exchange_topic";
public static final String QUEUE_NAME = "test_queue_topic_2";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
// 此乃重點
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");
//定義消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override // 事件觸發機制
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "utf-8");
System.out.println("【2】:" + s);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("【2】 done");
// 手動回執
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 自動應答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
6. MQ的持久化跟非持久化
因為訊息在記憶體中,如果MQ掛了那么訊息也丟失了,所以應該考慮MQ的持久化,MQ是支持持久化的,
// 宣告佇列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
boolean durable就是表明是否可以持久化,如果我們將程式中的durable = false改為true是不可以的!因為我們已經定義過的test_work_queue,這個queue已宣告為未持久化的,結論:MQ 不允許修改一個已經存在的佇列引數,
7. 消費者端手動跟自動確認訊息

?
// 自動應答
boolean autoAck = false;
channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
當MQ發送資料個消費者后,消費者要對收到對資訊應答給MQ,
如果autoAck = true 表示「自動確認模式」,一旦MQ把訊息分發給消費者就會把訊息從記憶體中洗掉,如果消費者收到訊息但是還沒有消費完而MQ中資料已洗掉則會導致丟失了正在處理對訊息,
如果autoAck = false表示「手動確認模式」,如果有個消費者掛了,MQ因為沒有收到回執資訊可以把該資訊再發送給其他對消費者,
MQ支持訊息應答(Message acknowledgement),消費者發送一個訊息應答告訴MQ這個訊息已經被消費了,MQ才從記憶體中洗掉,訊息應答模式「默認為 false」,
8. RabbitMQ生產者端訊息確認機制(事務 + confirm)
在RabbitMQ中我們可以通過持久化來解決MQ服務器例外的資料丟失問題,但是「生產者如何確保資料發送到MQ了」?默認情況下生產者也是不知道的,如何解決 呢?
1. AMQP事務
第一種方式AMQP實作了事務機制,類似mysql的事務機制,txSelect:用戶將當前channel設定為transition模式,txCommit:用于提交事務,txRollback:用于回滾事務,
以上都是對生產者對操作,
package com.sowhat.mq.tx;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TxSend {
public static final String QUEUE_NAME = "test_queue_tx";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "hello tx message";
try {
//開啟事務模式
channel.txSelect();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
int x = 1 / 0;
// 提交事務
channel.txCommit();
} catch (IOException e) {
// 回滾
channel.txRollback();
System.out.println("send message rollback");
} finally {
channel.close();
connection.close();
}
}
}
---
package com.sowhat.mq.tx;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TxRecv {
public static final String QUEUE_NAME = "test_queue_tx";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String s = channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("recv[tx] msg:" + new String(body, "utf-8"));
}
});
channel.close();
connection.close();
}
}
缺點就是大量對請求嘗試然后失敗然后回滾,會降低MQ的吞吐量,
2. Confirm模式,
「生產者端confirm實作原理」生產者將信道設定為confirm模式,一旦信道進入了confirm模式,所以該信道上發布的資訊都會被派一個唯一的ID(從1開始),一旦訊息被投遞到所有的匹配佇列后,Broker就回發送一個確認給生產者(包含訊息唯一ID),這就使得生產者知道訊息已經正確到達目的佇列了,如果訊息跟佇列是可持久化的,那么確認訊息會在訊息寫入到磁盤后才發出,broker回傳給生產者到確認訊息中deliver-tag域包含了確認訊息到序列號,此外broker也可以設定basic.ack的multiple域,表示這個序列號之前所以資訊都已經得到處理,
Confirm模式最大的好處在于是異步的,第一條訊息發送后不用一直等待回復后才發第二條訊息,
開啟confirm模式:channel.confimSelect()編程模式:
1. 普通的發送一個訊息后就 waitForConfirms()
package com.sowhat.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send1 {
public static final String QUEUE_NAME = "test_queue_confirm1";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 將channel模式設定為 confirm模式,注意設定這個不能設定為事務模式,
channel.confirmSelect();
String msg = "hello confirm message";
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
if (!channel.waitForConfirms()) {
System.out.println("訊息發送失敗");
} else {
System.out.println("訊息發送OK");
}
channel.close();
connection.close();
}
}
---
package com.sowhat.confirm;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv {
public static final String QUEUE_NAME = "test_queue_confirm1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String s = channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("recv[tx] msg:" + new String(body, "utf-8"));
}
});
}
}
2. 批量的發一批資料 waitForConfirms()
package com.sowhat.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send2 {
public static final String QUEUE_NAME = "test_queue_confirm1";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 將channel模式設定為 confirm模式,注意設定這個不能設定為事務模式,
channel.confirmSelect();
String msg = "hello confirm message";
// 批量發送
for (int i = 0; i < 10; i++) {
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
}
// 確認
if (!channel.waitForConfirms()) {
System.out.println("訊息發送失敗");
} else {
System.out.println("訊息發送OK");
}
channel.close();
connection.close();
}
}
---
接受資訊跟上面一樣
3. 異步confirm模式,提供一個回呼方法,
Channel物件提供的ConfirmListener()回呼方法只包含deliveryTag(包含當前發出訊息序號),我們需要自己為每一個Channel維護一個unconfirm的訊息序號集合,每publish一條資料,集合中元素加1,每回呼一次handleAck方法,unconfirm集合刪掉回應的一條(multiple=false)或多條(multiple=true)記錄,從運行效率來看,unconfirm集合最好采用有序集合SortedSet存盤結構,

?
package com.sowhat.mq.confirm;
import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;
public class Send3 {
public static final String QUEUE_NAME = "test_queue_confirm3";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//生產者呼叫confirmSelect
channel.confirmSelect();
// 存放未確認訊息
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
// 添加監聽通道
channel.addConfirmListener(new ConfirmListener() {
// 回執有問題的
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("--handleNack---multiple");
confirmSet.headSet(deliveryTag + 1).clear();
} else {
System.out.println("--handleNack-- multiple false");
confirmSet.remove(deliveryTag);
}
}
// 沒有問題的handleAck
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("--handleAck---multiple");
confirmSet.headSet(deliveryTag + 1).clear();
} else {
System.out.println("--handleAck--multiple false");
confirmSet.remove(deliveryTag);
}
}
});
// 一般情況下是先開啟 消費者,指定好 exchange跟routingkey,如果生產者等routingkey 就會觸發這個return 方法
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("---- handle return----");
System.out.println("replyCode:" + replyCode );
System.out.println("replyText:" +replyText );
System.out.println("exchange:" + exchange);
System.out.println("routingKey:" + routingKey);
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
});
String msgStr = "sssss";
while(true){
long nextPublishSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish("",QUEUE_NAME,null,msgStr.getBytes());
confirmSet.add(nextPublishSeqNo);
Thread.sleep(1000);
}
}
}
總結:AMQP模式相對來說沒Confirm模式性能好些,推薦使用后者,
9. RabbitMQ延遲佇列 跟死信
淘寶訂單付款,驗證碼等限時型別服務,
headers = new HashMap();/n headers.put(/"my1/",/"111/");/n headers.put(/"my2/",/"222/");/n AMQP.BasicProperties build = new AMQP.BasicProperties().builder().deliveryMode(2).contentEncoding(/"utf-8/").expiration(/"10000/").headers(headers).build();/n","classes":[]}" data-cke-widget-upcasted="1" data-cke-widget-keep-attr="0" data-widget="codeSnippet">
Map<String,Object> headers = new HashMap<String,Object>();
headers.put("my1","111");
headers.put("my2","222");
AMQP.BasicProperties build = new AMQP.BasicProperties().builder().deliveryMode(2).contentEncoding("utf-8").expiration("10000").headers(headers).build();
死信的處理:

?
10. SpringBoot Tpoic Demo
需求圖:

?新建SpringBoot 專案添加如下依賴:
/norg.springframework.boot /nspring-boot-starter-amqp /n /n","classes":[]}" data-cke-widget-upcasted="1" data-cke-widget-keep-attr="0" data-widget="codeSnippet">
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1. 生產者
application.yml
spring:
rabbitmq:
host: 127.0.0.1
username: admin
password: admin
測驗用例:
package com.sowhat.mqpublisher;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class MqpublisherApplicationTests {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
void userInfo() {
/**
* exchange,routingKey,message
*/
this.amqpTemplate.convertAndSend("log.topic","user.log.error","Users...");
}
}
2. 消費者
application.xml
spring:
rabbitmq:
host: 127.0.0.1
username: admin
password: admin
# 自定義配置
mq:
config:
exchange_name: log.topic
# 配置佇列名稱
queue_name:
info: log.info
error: log.error
logs: log.logs
點擊并拖拽以移動
三個不同的消費者:
package com.sowhat.mqconsumer.service;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* @QueueBinding value屬性:用于系結一個佇列,@Queue去查找一個名字為value屬性中的值得佇列,如果沒有則創建,如果有則回傳
* type = ExchangeTypes.TOPIC 指定交換器型別,默認的direct交換器
*/
@Service
public class ErrorReceiverService {
/**
* 把一個方法跟一個佇列進行系結,收到訊息后系結給msg
*/
@RabbitListener(bindings = @QueueBinding(
value = https://www.cnblogs.com/hehe199316/p/@Queue(value ="${mq.config.queue_name.error}"),
exchange = @Exchange(value = "https://www.cnblogs.com/hehe199316/p/${mq.config.exchange_name}", type = ExchangeTypes.TOPIC),
key = "*.log.error"
)
)
public void process(String msg) {
System.out.println(msg + " Logs...........");
}
}
---
package com.sowhat.mqconsumer.service;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* @QueueBinding value屬性:用于系結一個佇列,
* @Queue去查找一個名字為value屬性中的值得佇列,如果沒有則創建,如果有則回傳
*/
@Service
public class InfoReceiverService {
/**
* 添加一個能夠處理訊息的方法
*/
@RabbitListener(bindings = @QueueBinding(
value = https://www.cnblogs.com/hehe199316/p/@Queue(value ="${mq.config.queue_name.info}"),
exchange = @Exchange(value = "https://www.cnblogs.com/hehe199316/p/${mq.config.exchange_name}",type = ExchangeTypes.TOPIC),
key = "*.log.info"
))
public void process(String msg){
System.out.println(msg+" Info...........");
}
}
--
package com.sowhat.mqconsumer.service;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* @QueueBinding value屬性:用于系結一個佇列,
* @Queue去查找一個名字為value屬性中的值得佇列,如果沒有則創建,如果有則回傳
*/
@Service
public class LogsReceiverService {
/**
* 添加一個能夠處理訊息的方法
*/
@RabbitListener(bindings = @QueueBinding(
value = https://www.cnblogs.com/hehe199316/p/@Queue(value ="${mq.config.queue_name.logs}"),
exchange = @Exchange(value = "https://www.cnblogs.com/hehe199316/p/${mq.config.exchange_name}",type = ExchangeTypes.TOPIC),
key = "*.log.*"
))
public void process(String msg){
System.out.println(msg+" Error...........");
}
}
詳細安裝跟代碼看參考下載:
總結
如果需要指定模式一般是在消費者端設定,靈活性調節,
| 模式 | 生產者Queue | 生產者exchange | 生產者routingKey | 消費者exchange | 消費者queue | routingKey |
|---|---|---|---|---|---|---|
| Simple(簡單模式少用) | 指定 | 不指定 | 不指定 | 不指定 | 指定 | 不指定 |
| WorkQueue(多個消費者少用) | 指定 | 不指定 | 不指定 | 不指定 | 指定 | 不指定 |
| fanout(publish/subscribe模式) | 不指定 | 指定 | 不指定 | 指定 | 指定 | 不指定 |
| direct(路由模式) | 不指定 | 指定 | 指定 | 指定 | 指定 | 消費者routingKey精確指定多個 |
| topic(主題模糊匹配) | 不指定 | 指定 | 指定 | 指定 | 指定 | 消費者routingKey可以進行模糊匹配 |

?

完整的Java初級,高級對應的學習路線和資料!專注于java開發,分享java基礎、原理性知識、JavaWeb實戰、spring全家桶、設計模式、分布式及面試資料、開源專案,助力開發者成長!
歡迎關注微信公眾號:碼邦主

原作者:sowhat1412
原文鏈接:https://mp.weixin.qq.com/s/DUhHC2Oum7LNJnY76pbrxQ
來源:公眾號
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/227876.html
標籤:Java
