新的閱讀體驗地址:http://www.zhouhong.icu/post/141
本篇文章所有的代碼:https://github.com/Tom-shushu/Distributed-system-learning-notes/tree/master/rabbitmq-api-demo
一、初識RabbitMQ
是一個開源的訊息代理和佇列服務器,用來通過普通協議在完全不同的應用之間共享資料,RabbitMQ是使用Erlang語言來撰寫的,并且RabbitMQ是基于AMQP協議的,AMQP協議Advanced Message Queuing Protocol(高級訊息佇列協議)
定義:具有現代特征的二進制協議,是一個提供統一訊息服務的應用層標準高級訊息佇列協議, 是應用層協議的一個開放標準,為面向訊息中間件設計,
AMQP專業術語:
- Server:又稱broker,接受客戶端的鏈接,實作AMQP物體服務
- Connection:連接,應用程式與broker的網路連接
- Channel:網路信道,幾乎所有的操作都在channel中進行,Channel是進行訊息讀寫的通道,客戶端可以建立多個channel,每個channel代表一個會話任務,
- Message:訊息,服務器與應用程式之間傳送的資料,由Properties和Body組成.Properties可以對訊息進行修飾,必須訊息的優先級、延遲等高級特性;Body則是訊息體內容,
- virtualhost: 虛擬地址,用于進行邏輯隔離,最上層的訊息路由,一個virtual host里面可以有若干個Exchange和Queue,同一個Virtual Host 里面不能有相同名稱的Exchange 或 Queue,
- Exchange:交換機,接收訊息,根據路由鍵轉單訊息到系結佇列
- Binding: Exchange和Queue之間的虛擬鏈接,binding中可以包換routing key
- Routing key: 一個路由規則,虛擬機可用它來確定如何路由一個特定訊息,(如負載均衡)
RabbitMQ整體架構

二、單機版快速安裝
網不好的朋友也直接使用我下載下來的安裝包(下載有時候會卡主):
鏈接:https://pan.baidu.com/s/1diapYC19UlDy4G-4lgZWHA
提取碼:jf5r
復制這段內容后打開百度網盤手機App,操作更方便哦
- 1、首先在Linux上進行一些軟體的準備作業
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
- 2、下載安裝必要的軟體
- 備用地址:
- https://www.rabbitmq.com/install-rpm.html#downloads
- https://dl.bintray.com/rabbitmq-erlang/rpm/erlang/
wget https://github.com/rabbitmq/erlang-rpm/releases/download/v23.0.4/erlang-23.0.4-1.el7.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server-3.8.5-1.el7.noarch.rpm
- 3、安裝服務命令
rpm -ivh erlang-23.0.4-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm?
- 4、啟動
啟動服務
systemctl start rabbitmq-server
查看是否啟動
lsof -i:5672
- 5、啟動、安裝web管理插件(管控臺)
rabbitmq-plugins enable rabbitmq_management
- 6、查看管理埠有沒有啟動
lsof -i:15672
或者:
netstat -tnlp | grep 15672
- 7、添加用戶
#添加用戶 用戶名 admin 密碼 admin web管理工具可用此用戶登錄
sudo rabbitmqctl add_user admin admin
#設定用戶角色 管理員
sudo rabbitmqctl set_user_tags admin administrator
#設定用戶權限(接受來自所有Host的所有操作)
sudo rabbitmqctl set_permissions -p / admin "." "." ".*"
#查看用戶權限
sudo rabbitmqctl list_user_permissions admin
- 重新啟動
systemctl start rabbitmq-server
rabbitmq-plugins enable rabbitmq_management
- 訪問:http://192.168.2.121:15672/ 使用 admin 登錄

- 代碼測驗
- 引入依賴
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency>
2.發送端:
package com.zhouhong.rabbitmq.api.helloworld; import java.util.HashMap; import java.util.Map; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Sender { public static void main(String[] args) throws Exception { // 1 創建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.2.121"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); // 2 創建Connection Connection connection = connectionFactory.newConnection(); // 3 創建Channel Channel channel = connection.createChannel(); // 4 宣告 String queueName = "test001"; // 引數: queue名字,是否持久化,獨占的queue(僅供此連接),不使用時是否自動洗掉, 其他引數 channel.queueDeclare(queueName, false, false, false, null); Map<String, Object> headers = new HashMap<String, Object>(); AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .headers(headers).build(); for(int i = 0; i < 5;i++) { String msg = "Hello World RabbitMQ " + i; channel.basicPublish("", queueName , props , msg.getBytes()); } } }
3.接收端
package com.zhouhong.rabbitmq.api.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.2.121");
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String queueName = "test001";
// durable 是否持久化訊息
channel.queueDeclare(queueName, false, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 引數:佇列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
// 回圈獲取訊息
while(true){
// 獲取訊息,如果沒有訊息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到訊息:" + msg);
}
}
}
4.結果(先啟動接收端進行監控,再啟動發送端)
收到訊息:Hello World RabbitMQ 0
收到訊息:Hello World RabbitMQ 1
收到訊息:Hello World RabbitMQ 2
收到訊息:Hello World RabbitMQ 3
收到訊息:Hello World RabbitMQ 4
三、RabbitMQ----交換機

- Name:交換機名稱,
- Type:交換機型別 direct、topic、fanout、headers,
- Durability:是否持久化,ture為持久化,
- Auto Delete :當最后一個系結道Exchange上的佇列洗掉后,自動洗掉該Exchange,
- Internal:當前Exchange是否用于RabbitMQ內部使用,默認為False,
- Arguments:擴展引數,用于擴展AMQP協議自制定化使用,
- DirectExchange的訊息被轉發道RouteKey中指定的Queue,
交換機-----Direct exchange
Direct模式可以使用RabbitMQ自帶的Exchange:default Exchange,所以不需要將Exchange進行任何系結操作,訊息傳遞時,RouteKey必須完全匹配才會被佇列接收,否則該訊息會被拋棄,代碼:
- 發送端
package com.zhouhong.rabbitmq.api.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender4DirectExchange {
public static void main(String[] args) throws Exception {
//1 創建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.2.121");
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setVirtualHost("/");
//2 創建Connection
Connection connection = connectionFactory.newConnection();
//3 創建Channel
Channel channel = connection.createChannel();
//4 宣告
String exchangeName = "test_direct_exchange";
//必須要和接收端 routingKey 一一對應
String routingKey = "test_direct_routingKey";
//5 發送
String msg = "Hello World RabbitMQ 4 Direct Exchange Message ... ";
channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
}
}
- 接收端
package com.zhouhong.rabbitmq.api.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver4DirectExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.2.121");
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 宣告
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
String queueName = "test_direct_queue";
String routingKey = "test_direct_routingKey";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化訊息
QueueingConsumer consumer = new QueueingConsumer(channel);
//引數:佇列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//回圈獲取訊息
while(true){
//獲取訊息,如果沒有訊息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到訊息:" + msg);
}
}
}
交換機-----topic exchange
exchange 將Routekey和某個topic進行一個模糊匹配,發送給對應佇列、可以用通配符進行匹配
比如下面例子
代碼:
- 接收端
package com.zhouhong.rabbitmq.api.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver4TopicExchange1 {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.2.121");
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 宣告
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
// 只能匹配一個 例如:user.txt、user.py都可以,但是user.txt.py 不行
//String routingKey = "user.*";
// user.txt、user.py 、user.txt.py 都可以匹配到
String routingKey = "user.#";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化訊息
QueueingConsumer consumer = new QueueingConsumer(channel);
// 引數:佇列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
System.err.println("consumer1 start.. ");
// 回圈獲取訊息
while(true){
// 獲取訊息,如果沒有訊息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到訊息:" + msg + ", RoutingKey: " + delivery.getEnvelope().getRoutingKey());
}
}
}
- 發送端
package com.zhouhong.rabbitmq.api.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender4TopicExchange {
public static void main(String[] args) throws Exception {
//1 創建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.2.121");
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setVirtualHost("/");
//2 創建Connection
Connection connection = connectionFactory.newConnection();
//3 創建Channel
Channel channel = connection.createChannel();
//4 宣告
String exchangeName = "test_topic_exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
//5 發送
String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
channel.close();
connection.close();
}
}
交換機-----Fanout exchange 廣播模式
1.不處理路由鍵,只需要簡單的將佇列系結到交換機上, 2.發送到交換機的訊息都會被轉發到與該交換機系結的所有佇列上, 3.Fanout交換機轉發訊息是最快的,
代碼:見示例文章開始GitHub地址
四、RabbitMQ高級特性
1、訊息如何保障 100% 的投遞成功
生產端的可靠性投遞的標志:
1、訊息成功發出 2、mq節點成功接收 3、發送端MQ節點確認應答 4、完善的訊息補償機制 解決:訊息資訊落庫,對訊息狀態進行打標
冪等性
1、 select count(1) from t_order where id = 唯一id(或)指紋碼
2、唯一id或指紋碼機制,利用資料庫主鍵去重
2、Confirm
第一步:再channel上開啟確認模式:channel.confirmSelect();
第二步:再channel上添加監聽:addConfirmListener,監聽成功和失敗的回傳結果,根據具體的結果對訊息進行重新發送、或記錄日期等后續處理!
3、return訊息機制
ReturnListener用于處理不可路由的訊息
我們的訊息生產者,通過指定一個Exchage和Routingkey,把訊息送達某一個佇列中去,然后我們的消費者監聽佇列,進行消費處理操作,如果沒有合適的佇列,則會由returnListener進行接受,
Mandatory:如果為true,則監聽器會接收到路由不可達的訊息,然后進行后續處理,如果為false,那么broker端自動洗掉該訊息,
4、消費端ACK與重回佇列
消費端ACK:
- 在作業的時候一般不會選擇自動ack
- 消費端的手工ack分為兩種ACK和NACK
- 消費端進行消費的時候,如果由于業務例外我們可以進行日志的記錄,然后進行補償,這種建議回復NACK,不要重回佇列
- 如果由于服務器宕機等嚴重問題,那我們就需要手工進行ACK保障消費端消費成功
消費端的重回佇列
- 是為了對沒有處理成功的訊息,把訊息重新會投遞給broker,
- 重回佇列,會回到佇列的尾部
- 也會造成一條訊息一直重復投遞,死回圈了
- 在實際應用中,都會關閉重回佇列,也就是設定為false
5、TTL佇列和訊息
TTL: time to live的縮寫,也就是生存時間,
- RabbitMQ 支持訊息過期時間,在訊息發送時可以進行指定
- RabbitMQ支持佇列過期時間,從訊息入佇列開始計算,只要超過了佇列的超時間時間配置,那么訊息會自動的清除
死佇列: DLX,Dead-Letter-Exchange
- 利用DLX,當訊息在一個佇列中變成死信(dead message)之后,它能被重新publish到另一個Exchange,這個Exchange就是DLX.
訊息變成死信的幾種情況
- 訊息被拒絕 并且requeue = false
- 訊息TTL過期
- 佇列達到最大長度
DLX也是一個正常的Exchange,實際上是一個屬性控制
- 當佇列中有死信時,RabbitMQ就會自動的將這個訊息重新發布到設定的Exchange上,進而被路由到另一個佇列.
- 可以監聽這個佇列中訊息做相應的處理,這個特性可以彌補rabbitMQ3.0以前的immediate引數功能,
- 在正常佇列上添加引數:arguments.put("x-dead-letter-exchange","dlx.exchange");這樣訊息過期、requeue、佇列達到最大長度時,就可以直接路由到死信佇列,
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/270911.html
標籤:其他
上一篇:專案經理VS產品經理VS架構師
