中間件及單體架構
訊息中間件
在實際的專案中,大部分的企業專案開發中,在早期都采用的是單體的架構模式

單體架構
把所有的業務和模塊、源代碼、靜態資源檔案等都放在一個工程中,如果其中的一個模塊升級或迭代發生一個很小的變動都會重新編譯和重新部署專案,(耦合度高)
這種的架構存在的問題就是:
- 耦合度太高
- 運維的成本過高
- 不易維護
- 服務器的成本高(服務器資源的浪費)
- 以及升級架構的復雜度也會增大(局限性)
這樣就有后續的分布式架構系統
分布式架構

單體架構一個請求由一個系統共同完成,分布式架構系統就是一個請求由服務器的多個服務(服務或者系統)協同處理完成(跨JVM的請求呼叫)
和單體架構不同的是,單體架構師一個請求發起jvm調度執行緒(tomcat執行緒池)分配執行緒thread 來處理請求指導釋放
分布式系統是:一個請求是由多個系統共同協同完成,jvm和環境都可能是獨立
比如:單體架構就是建造一個小房子很快就能搞定,但要建造大型的建筑就必須是各個環節的協同和分布,這樣的目的也是專案發展到后期的時候需要部署和思考的問題
分布式系統存在的問題:
- 學習成本高,技術堆疊過多
- 運維成本和服務器成本增高
- 人員的成本也會增高
- 專案的負載度會上升
- 面臨的錯誤和容錯性會成倍增加
- 占用的服務器埠和通訊的選擇成本高
- 安全性的考慮和因素逼迫可能選擇RMI/MQ相關的服務器端通訊
好處:
- 服務系統的獨立,占用的服務器資源減少和占用的硬體成本減少(可以合理的分配服務資源,不造成服務器資源的浪費)
- 系統的獨立維護和部署,耦合度降度,可插拔性
- 系統的架構和技術堆疊的選擇可以變得靈活
- 彈性的部署,不會造成平臺因部署造成的癱瘓和停服的狀態
基于訊息中間件的分布式系統的架構

訊息中間件(是否具有通訊的能力,是否擁有高性能,易靈活,支持多平臺,持久化功能,高可靠性,容錯性,訊息的分發策略等)
- 利用可靠的訊息傳遞機制進行系統和系統直接的通訊
- 通過提供訊息傳遞和訊息的排隊機制,它可以在分布式系統環境下擴展行程間的通訊
訊息中間件的應用場景
1、跨系統的資料傳遞
2、高并發的流量削峰
3、資料的分發和異步處理
方法堆疊呼叫的程序:串行執行

并行執行:(異步編程)

4、大資料分布與傳遞
5、分布式事務
常見的訊息中間件
ActiveMQ,RabbitMQ,Kafka,RocketMQ等
訊息中間件的本質及設計
本質:它是一種接受資料、接受請求、存盤資料、發送資料等功能的技術服務
MQ訊息佇列:負責資料的傳接受,存盤和傳遞,所以性能要高于普通的服務和技術,要遵循協議

訊息中間件的核心組成部分
1、訊息的協議
2、訊息持久化機制
3、訊息的分發策略
4、訊息的高可用、高可靠
5、訊息的容錯機制
訊息佇列的協議
什么是協議
訊息中間件負責資料的傳遞、存盤、和分發訊息三個部分,資料的存盤和分發的程序中要遵循某種約定成俗的規范,無論是采用底層的TCP/IP,UDP協議還是其他在這之上自己去開發的協議等,這些約定成俗的規范就稱之為:協議
協議
1、計算機底層作業系統和應用程式通訊時共同遵守的一組約定,只有遵循共同的約定和規范,系統和底層作業系統之間才能相互交流(為了資料的流向、通訊、接受、分發)
2、和一般的網路應用程式的不同,它主要負責資料的接受和傳遞,所以性能比較的高
3、協議對資料格式和計算機之間交換資料都必須嚴格遵循規范
網路協議的三要素
1、語法:語法是用戶資料與控制資訊的結構與格式以及資料出現的順序
2、語意:語意是解釋控制資訊每個部分的意義,他規定了需要發出何種控制資訊,以及完成的動作與做出什么樣的回應
3、時序:時序是對事件發生順序的詳細說明
Http請求協議:
1、語法:http規定了請求報文和回應報文的格式
2、語意:客戶端主動發起請求稱之為請求(這是一種定義,同時你發起的是post/get請求)
3、時序:一個請求對應一個回應(一定先有請求再有回應,這個是時序)
訊息中間件采用的協議有:openwire,AMQP,MQTT,Kafka,OpenMessage協議
面試:為什么訊息中間件不直接使用http協議?
1、因為http請求報文頭和回應報文頭是比較復雜的,包含了cookie,資料的加密解密,狀態碼,回應碼等附加的功能,但是對于一個訊息而言,不需要這么復雜,只要負責資料傳輸、存盤、分發就行,一定要準球的是高性能,盡量簡潔、快速
2、大部分情況下http大部分都是短鏈接(在實際的互動程序中,一個請求到回應很有可能會中斷,中斷以后就不會進行持久化,會造成請求的丟失),這樣就不利于訊息中間件的業務場景,因為訊息中間件可能是一個長期的獲取訊息的程序,出現問題和故障要對資料或訊息進行持久化等,目的是為了保證訊息和資料的高可靠和穩健的運行,(長鏈接:當服務器出現故障,重啟后依然可以進行資料的傳遞)
AMQP協議
Advance Message Queuing Protocol 是高級訊息佇列協議,提供統一訊息服務的應用層標準高級訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中間件設計(基于此協議的客戶端與訊息中間件可傳遞訊息,并不受客戶端、中間件不同產品,不同的開發語言等條件的限制),Erlang中的實作有rabbitMQ等
特性:
1、分布式事務支持
2、訊息的持久化支持
3、高性能和高可靠的訊息處理優勢
支持者: rabbitMQ. activeMQ
MQTT協議
Message Queueing Telementry Transport 訊息佇列是IBM 開發的一個即時通訊協議,物聯網系統架構中的重要組成部分
特點:
1、輕量
2、結構簡單
3、傳輸快,不支持事務
4、沒有持久化設計
應用場景:
1、適用于計算能力有限
2、低帶寬
3、網路不穩低的場景
支持者:RabbitMQ activeMQ
OpenMessage協議
阿里,雅虎,滴滴出行等公司共同參與創立的分布式訊息中間件、流處理等領域的應用開發標準,
特點:
1、結構簡單
2、決議速度快
3、支持事務和持久化設計
Kafka 協議
基于TCP/IP的二進制協議,訊息內部是通過長度來分割,有一些基本資料型別組成
特點:
1、結構簡單
2、決議速度快
3、無事務支持
4、有持久化設計
協議:是在tcp/ip協議基礎之上構建的一種約定成俗的規范和機制,主要目的是可以讓客戶端(應用程式 Java go) 進行溝通和通訊,并且這種協議下規范必須具有持久性、高可用、高可靠的性能
訊息佇列的持久化
持久化
簡單來說就是將資料存入磁盤,而不是存在記憶體中隨服務器重啟斷開而消失,使資料能夠永久保存

訊息分發策略
MQ訊息佇列有如下的角色:
1、生產者
2、存盤訊息
3、消費者

輪詢分發(公平的,不會因為服務器的快慢、性能的高低而造成資料的傾斜)
公平分發(能者多勞,會造成資料的傾斜)
訊息佇列的高可用和高可靠
什么是高可用機制
高可用:是指產品在規定的條件和規定的時刻或事件內處于可執行規定功能狀態的能力
當業務量增加時,請求也過大,一臺訊息中間件服務器會觸及硬體(cpu\記憶體\磁盤)的極限,一臺訊息服務器以及無法滿足業務的請求,所以訊息中間件必須支持集群部署,來達到高可用的目的
集群模式
1 master-slave 主從共享資料的部署方式

生產者將消費發送到Master節點,所有的都連接到這個訊息佇列,共享這塊資料區域,Master節點負責寫入,一旦Master掛掉,slave節點繼續服務,從而形成高可用
2 -Master -slave 主從同步部署方式

這種模式寫入訊息同樣在Master主節點上,但主節點會同步資料到slave節點形成副本,和zookeeper或者redis主從機制類似,這樣可以達到負載均衡的效果,如果消費者有多個這樣就可以去不同的節點進行消費,因為訊息的拷貝和同步會占用很大的寬帶和網路資源,在后續的rabbitMQ中會有使用(訊息副本,單寫多讀的機制)
3 - 多主集群同步部署模式

多寫多讀的機制,和上面區別不是很大,但他的寫入可以往任意節點去寫入
4 -多主集群轉發部署模式

(元資料的轉發機制,元資料不會把訊息的本體放入元資料資訊里面,只會把佇列名、鏈接資訊等存入其中)如果你插入的資料是broker-1 中,元資料資訊會存盤資料的相關描述和記錄存放的位置(佇列),它會對描述資訊(元資料)資訊進行同步,如果消費者在broker-2 中進行消費,發現沒有對應的訊息,可以從其他對應的元資料資訊中去查詢,然后回傳對應的訊息資訊,(減少空間存盤和資源浪費)
(例如:買火車票或者黃牛票,第一個黃牛有顧客說要買的演唱會門票,但是沒有,但他回去聯系其他的黃牛詢問,如果有就回傳)
5 - Master-slave與Breoker-cluster組合的方案

實作多主多從的熱備機制來完成訊息的高可用以及資料的熱備機制,在生產規模達到一定階段的時候,使用的頻率較高
集群模式最終目的都是為來保證:訊息服務器不會掛掉,出現故障依然可以抱著訊息服務繼續使用
要么訊息共享、要么訊息同步、要么元資料共享
什么是高可靠機制
高可靠:是指系統可以無故障低持續運行,比如一個系統突然崩潰,報錯,例外等等并不影響線上業務的正常運行,出錯幾率極低
如何保證中間件訊息的可靠性
1、訊息的傳輸:通過協議來保證系統間資料決議的正確性
2、訊息的存盤可靠:通過持久化來保證訊息的可靠性
RabbitMQ 入門和安裝
官網:https://www.rabbitmq.com
mac
安裝Brew
/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
下載安裝erlang和RabbitMQ
// 安裝erlang環境
brew install erlang
// 更新brew資源
brew update
// 執行安裝
brew install rabbitmq
出現版本號表示erlang安裝完成
erl -v
安裝RabbitMq的可視化監控插件
// 切換到MQ目錄,注意你的安裝版本可能不是3.8.14
cd /usr/local/Cellar/rabbitmq/3.8.14/
// 啟用rabbitmq management插件
sudo sbin/rabbitmq-plugins enable rabbitmq_management
配置環境變數
sudo vi /etc/profile
//加入以下兩行
export RABBIT_HOME=/usr/local/Cellar/rabbitmq/3.8.14
export PATH=$PATH:$RABBIT_HOME/sbin
:wq! 強制保存退出
// 立即生效
source /etc/profile
后臺啟動rabbitMQ
// 后臺啟動
sudo rabbitmq-server -detached
// 查看狀態
sudo rabbitmqctl status
// 訪問可視化監控插件的界面
// 瀏覽器內輸入 http://localhost:15672,默認的用戶名密碼都是guest,登錄后可以在Admin那一列選單內添加自己的用戶
sudo rabbitmqctl stop 關閉
檔案查看:
- mac安裝brew(親測):
https://blog.csdn.net/yuanshangshenghuo/article/details/106599836
Mac安裝erlang和rabbitmq:
https://www.jianshu.com/p/59b97f388268
mac + RabbitMQ 安裝:https://www.jianshu.com/p/60c358235705
所遇問題:
-
brew install opencv@2 時報錯 Error: Can’t create update lock in /usr/local/var/homebrew/locks!
https://blog.csdn.net/u012135425/article/details/89095183 -
Mac安裝Homebrew的正確姿勢:
https://www.jianshu.com/p/e0471aa6672d?utm_campaign=hugo -
chown: /usr/local: Operation not permitted:
-
https://blog.csdn.net/qq_15722433/article/details/99735837
RabbitMQ 管理界面及權限操作
// 安裝rabbitmq management插件
sudo sbin/rabbitmq-plugins enable rabbitmq_management
// 或是
rabbitmq-plugins enable rabbitmq_management
瀏覽器內輸入 http://localhost:15672,默認的用戶名密碼都是guest,(只能在localhost本機下訪問),需要添加遠程登錄的用戶
安裝完成以后,重啟服務即可
// 后臺啟動
sudo rabbitmq-server -detached
// Linux 啟動
systemctl restart rabbitmq-server
注意:在對應服務器(阿里云,騰訊云等)的安全組中開放
15672的埠!!!
// 新增用戶 用戶名 密碼
sudo rabbitmqctl add_user admin admin
// 設定用戶admin分配操作權限
sudo rabbitmqctl set_user_tags admin administrator
用戶級別:
- 1、administrator:(超級管理員)可以登錄控制臺、查看所有資訊、可以對rabbitmq進行管理
- 2、monitoring :監控者,登錄控制臺,查看所有資訊
- 3、policymaker:策略制定者 登錄控制臺,指定策略
- 4、management: 普通管理員 登錄控制臺
- 5、none : 不能訪問management plugin
// 為用戶添加資源權限(授予admin 訪問虛擬機所有根節點的權限)
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
小結
# 添加賬號密碼
rabbitmqctl add_user 賬號 密碼
# 賦予角色
rabbitmqctl set_user_tags 賬號 administrator
# 修改密碼
rabbitmqctl change_password 用戶名 新密碼
# 洗掉用戶
rabbitmqctl delete_user 用戶名
# 查看用戶清單
rabbitmqctl list_users
# 為用戶設定 administrator角色
rabbitmqctl set_permissions -p / 用戶名 ".*" ".*" ".*"
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
# 啟動服務器
rabbitmq-server -detached
# 關閉服務器
rabbitmqctl stop
# 查看所有佇列資訊
rabbitmqctl list_queues
# 關閉應用
rabbitmqctl stop_app
# 啟動應用,和上述關閉命令配合使用,達到清空佇列的目的
rabbitmqctl start_app
# 清除所有佇列
rabbitmqctl reset
# 服務器重啟
systemctl restart rabbitmq-server
- rabbitmq 管理及常用命令 https://blog.csdn.net/wochunyang/article/details/52449559
Docker安裝RabbitMQ
虛擬化容器技術-Docker的安裝
(1) yum 包更新到最新
> yum update
(2) 安裝需要的軟體包 yum-util 提供yum-config-manager功能,另外兩個是devicemapper驅動依賴的
> yum install -y yum-utils device-mapper-persistent-data lvm2
(3) 設定yum源為阿里云
> yum-config-manager --add-repo http://mirrors.aliyun.com/docer- ce/linux/centos/docker-ce.repo
(4) 安裝docker
> yum install docker-ce -y
(5) 安裝后查看docker 版本
> docker -v
(6) 安裝阿里云的鏡像加速器
sudo mkdir -p /etc/docker
sudo tee /etc/doker/daemon.json <<-'EOF'
{
"registry-mirrors":["https://0wrdwnn6.mirror.aliyuncs.com"]
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docker
RabbitMQ的角色分類
none :
不能訪問management plugin(無法登錄界面)
management:查看呢自己相關節點資訊
列出自己可以通過AMQP登入的虛擬機
查看自己的虛擬機節點 virtual hosts 的queues,exchange和bindings資訊
查看和關閉自己的channels(通道)和connections(鏈接)
查看有關自己的虛擬機節點 virtual hosts 的統計資訊,包括其他用戶在這個節點 virtual hosts 中的活動資訊
Policymaker
包含management所有權限
查看和創建、洗掉自己的virtual hosts(虛擬機節點)所屬的policies和parameters資訊
Monitoring
包含management所有權限
羅列出所有的virtual hosts,包括不能登錄的virtual hosts
查看其他用戶的connections和channels資訊
查看節點級別的資料 如:clustering和memory使用情況
查看所有的 virtual hosts 的全域統計資訊
Administrator
最高權限
可以創建和洗掉virtual hosts
可以查看、創建和洗掉users
查看創建permissions
關閉所有用戶的connections
RabbitMQ入門案例
simple簡單模式
官方檔案:https://www.rabbitmq.com/getstarted.html
生產者—>佇列—>消費者

構建一個maven工程

匯入依賴
<!-- java 原生rabbitmq依賴-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
生產者
// 簡單模式----生產者
public class Producer {
public static void main(String[] args) {
// 所有的中間件技術都是基于tcp/ip協議基礎上構建新型的協議規范,rabbitmq遵循的是amqp
// 協議遵循 ip port
// 1、創建連接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
// 通過連接工廠設定賬號密碼
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 虛擬訪問節點
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2、創建連接connection
connection = connectionFactory.newConnection("生產者");
// 3、通過連接獲取通道channel
channel = connection.createChannel();
// 4、 通過創建交換機、宣告佇列,系結關系,路由key,發送訊息和接受訊息
String queueName = "queue1";
/**宣告佇列
* @params1 queue 佇列名,
* @params2 durable 是否需要持久化,durable false (訊息是否存盤)
* 如果false 非持久化 true是持久化嗎?非持久化會存盤嗎?
* @params3 exclusive 是否要具有排他性,是否是獨占佇列
* @params4 autoDelete是否要自動洗掉,隨著最后一個消費者消費完畢之后是否把佇列自動洗掉
* @params5 arguments 攜帶的附屬引數(設定佇列的有效期,訊息最大長度)
*/
channel.queueDeclare(queueName, false, false, false, null);
// 5、準備訊息內容
String message = "hello";
// 6、發送訊息給佇列 queue (交換機 ,佇列路由key[佇列名字],訊息的狀態控制【持久化】,訊息主題【訊息內容】)
channel.basicPublish("", queueName, null, message.getBytes());
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7、關閉通道(先關通道,再關連接)
if(channel!=null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 8、關閉連接
if(connection!=null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}



消費者
// 簡單模式----消費者
public class Customer {
public static void main(String[] args) {
// 所有的中間件技術都是基于tcp/ip協議基礎上構建新型的協議規范,rabbitmq遵循的是amqp
// 協議遵循 ip port
// 1、創建連接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
// 通過連接工廠設定賬號密碼
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 虛擬訪問節點
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2、創建連接connection
connection = connectionFactory.newConnection("生產者");
// 3、通過連接獲取通道channel
channel = connection.createChannel();
// 4、 通過創建交換機、宣告佇列,系結關系,路由key,發送訊息和接受訊息
channel.basicConsume("queue1", true, new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("收到訊息是:" + new String(message.getBody(), "UTF-8"));
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("接受訊息失敗了,,,,");
}
});
System.out.println("開始接受訊息");
// 進行阻斷,接受訊息不關閉
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7、關閉通道(先關通道,再關連接)
if(channel!=null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 8、關閉連接
if(connection!=null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}

非持久化訊息佇列,隨著服務器的關閉,會被移除,而持久化訊息佇列不會被移除
持久化的訊息會存盤,非持久化的訊息也會存盤,但會隨著服務器重啟而丟失
AMQP
Advance Message Queuing Protocol (高級訊息佇列協議) 是應用層協議的一個開發標準,為面向訊息的中間件設計
AMQP生產者流轉程序

面試:rabbitmq 為什么是基于channel去處理的而不是連接?
長連接-----信道channel
AMQP消費者流轉程序
broker (節點)

RabbitMQ的核心組成部分

-
Server:Broker ,接受客戶端的連接,實作AMQP物體服務,安裝rabbitmq-server
-
Connection:連接,應用程式與Broker的網路連接 TCP/IP 三次握手和四次揮手(會出現性能損耗,長連接)
-
Channel:網路信道,幾乎所有的操作都在channel中進行,channel是進行資訊讀寫的通道,客戶端可以建立對各channel,每個channel代表一個會話任務
-
Message:訊息,服務與應用程式之間傳送的資料,由Properties和body組成,properties是對訊息進行修飾,(如:訊息的優先級、延遲等高級特性)body則是訊息體的內容
-
Virtual Host:虛擬地址,由于進行邏輯隔離,最上層的訊息路由,一個虛擬主機路由可以由若干個交換機和佇列,同一個虛擬主機里面不能有相同名字的交換機(做隔離,虛擬機節點)
-
exchange:交換機,接受訊息,根據路由鍵發送訊息到系結的佇列(不具備訊息存盤的能力)
-
Bindings:系結,交換機和佇列之間的虛擬連接,binding中可以保護多個routing key
-
Routing key:路由key,是一個路由規則,虛擬機可以用它來確定如何路由一個特定訊息(條件,類似于資料庫的條件where routekey= course)
-
queue:佇列,也稱為message queue 訊息佇列,保存訊息并將它轉發給消費者
面試題:可以存在沒有交換機的佇列嗎?exchange
不可能,雖然沒有指定交換機,但是一定會存在一個默認的交換機
RabbitMQ運行流程

RabbitMQ 支持訊息的模式
簡單模式 Simple
理解
模擬生產者往交換機里面發送訊息


模擬消費者接受訊息

注意:
1、無論是簡單模式還是作業佇列模式,都會有一個默認交換機存在
2、發送訊息是交換機來發送,不是佇列發送,交換機會接收到訊息然后推送給佇列,消費者會自動監聽和訂閱佇列,把訊息進行推送
3、如果佇列沒有宣告交換機,一定系結的默認交換機
4、生產程序中進行訊息預覽時 一定要選擇:Nack message requeue true
(ack 會當成真實的消費進行移除,造成資料的丟失!!)
發布與訂閱Fanout模式-Publish/Subscribe
理解(web界面模擬)

建立一個交換機

創建佇列

系結關系
-
佇列界面系結

-
交換機界面系結(都可)

在交換機里發送訊息


接受訊息

發布訂閱模式具體實作
Fanout --是一種廣播機制,它是沒有路由key的模式
生產者
// 生產者
// 如果在界面已經把交換機和佇列的關系系結好,程式中就可以不需要系結了
public class Producer {
public static void main(String[] args) {
// 1、創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、設定連接屬性
// 通過連接工廠設定賬號密碼
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 虛擬訪問節點
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 3、從連接工廠中獲取連接
connection = connectionFactory.newConnection("生產者");
// 4、通過連接獲取通道channel
channel = connection.createChannel();
// 5、準備發送訊息的內容
String message = "hello fanout_exchange java";
// 6、準備交換機
String exchange = "fanout-exchange";
// 7、定義路由key
String routeKey = "";
// 8、指定交換機的型別
String type = "fanout";
// 9、發送訊息給中間件rabbitmq-server
// (交換機exchange,佇列名稱/routingkey,屬性配置,發送訊息的內容)
channel.basicPublish(exchange, routeKey, null, message.getBytes());
System.out.println("訊息發送成功!!!");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 10、關閉通道(先關通道,再關連接)
if(channel!=null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 11、關閉連接
if(connection!=null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
消費者
// 消費者
public class Customer {
private static Runnable runnable = new Runnable() {
public void run() {
// 1、創建連接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、設定連接屬性
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 虛擬訪問節點
connectionFactory.setVirtualHost("/");
// 獲取訊息佇列的名字
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
// 3、從連接工廠中獲取連接
connection = connectionFactory.newConnection("生產者");
// 4、從連接中獲取通道channel
channel = connection.createChannel();
// 5、申明佇列queue 存盤資訊
// 如果佇列不存在,則會創建 rabbintmq不允許創建兩個相同的佇列名稱,否則會報錯
// 這里如果queue 已經被創建過一次,可以不需要定義
// channel.queueDeclare("queue1",false,false,false,null);
// 6、定義接受訊息的回呼
Channel finalChannel = channel;
finalChannel.basicConsume(queueName, true, new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println(message.getEnvelope().getDeliveryTag());
System.out.println("收到訊息是:" + new String(message.getBody(), "UTF-8"));
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("接受訊息失敗了,,,,");
}
});
System.out.println(queueName+"開始接受訊息");
// 進行阻斷,接受訊息不關閉
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7、關閉通道(先關通道,再關連接)
if(channel!=null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 8、關閉連接
if(connection!=null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
};
public static void main(String[] args) {
// 啟動三個執行緒去執行
new Thread(runnable,"queue1").start();
new Thread(runnable,"queue2").start();
new Thread(runnable,"queue3").start();
}
}
列印
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
queue1開始接受訊息
queue2開始接受訊息
queue3開始接受訊息
1
1
收到訊息是:hello fanout_exchange java
1
收到訊息是:hello fanout_exchange java
收到訊息是:hello fanout_exchange java
路由Routing(Direct)模式
理解
默認交換機模式就是direct模式

創建交換機

系結佇列,并創建路由key


發送訊息


Direct模式具體實作
生產者
// 生產者
// 如果在界面已經把交換機和佇列的關系系結好,程式中就可以不需要系結了
public class Producer {
public static void main(String[] args) {
// 1、創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、設定連接屬性
// 通過連接工廠設定賬號密碼
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 虛擬訪問節點
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 3、從連接工廠中獲取連接
connection = connectionFactory.newConnection("生產者");
// 4、通過連接獲取通道channel
channel = connection.createChannel();
// 5、準備發送訊息的內容
String message = "hello fanout_exchange java";
// 6、準備交換機
String exchange = "direct_exchange";
// 7、定義路由key
String routeKey = "email";
// 8、指定交換機的型別
String type = "direct";
// 9、發送訊息給中間件rabbitmq-server
// (交換機exchange,佇列名稱/routingkey,屬性配置,發送訊息的內容)
channel.basicPublish(exchange, routeKey, null, message.getBytes());
System.out.println("訊息發送成功!!!");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 10、關閉通道(先關通道,再關連接)
if(channel!=null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 11、關閉連接
if(connection!=null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
消費者
// 消費者
public class Customer {
private static Runnable runnable = new Runnable() {
public void run() {
// 1、創建連接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、設定連接屬性
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 虛擬訪問節點
connectionFactory.setVirtualHost("/");
// 獲取訊息佇列的名字
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
// 3、從連接工廠中獲取連接
connection = connectionFactory.newConnection("生產者");
// 4、從連接中獲取通道channel
channel = connection.createChannel();
// 5、申明佇列queue 存盤資訊
// 如果佇列不存在,則會創建 rabbintmq不允許創建兩個相同的佇列名稱,否則會報錯
// 這里如果queue 已經被創建過一次,可以不需要定義
// channel.queueDeclare("queue1",false,false,false,null);
// 6、定義接受訊息的回呼
Channel finalChannel = channel;
finalChannel.basicConsume(queueName, true, new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println(queueName+"收到訊息是:" + new String(message.getBody(), "UTF-8"));
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("接受訊息失敗了,,,,");
}
});
System.out.println(queueName+"開始接受訊息");
// 進行阻斷,接受訊息不關閉
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7、關閉通道(先關通道,再關連接)
if(channel!=null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 8、關閉連接
if(connection!=null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
};
public static void main(String[] args) {
// 啟動三個執行緒去執行
new Thread(runnable,"queue1").start();
new Thread(runnable,"queue2").start();
new Thread(runnable,"queue3").start();
}
}
Topic(主題)模式–Topics
理解
可以支持模糊匹配的路由key

創建交換機

系結佇列 (# 表示0個或多個或多級 *有且只有一級)

發送訊息

Queue1 接收到訊息(以com開頭后面是com.xxx/com.xx.xx/ com都可以)
- com.course.order — queue1 queue2 queue3 收到
- com.course.order.xxx.xxx — queue1 queue3 收到
- course — 都收不到
- com.course.order.user.test — queue1 queue3 queue4 收到
Topic模式具體實作
生產者
public class Producer {
public static void main(String[] args) {
// 1、創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、設定連接屬性
// 通過連接工廠設定賬號密碼
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 虛擬訪問節點
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 3、從連接工廠中獲取連接
connection = connectionFactory.newConnection("生產者");
// 4、通過連接獲取通道channel
channel = connection.createChannel();
// 5、準備發送訊息的內容
String message = "hello fanout_exchange java";
// 6、準備交換機
String exchange = "topic_exchange";
// 7、定義路由key
String routeKey = "com.order.test.xxx";
// 8、指定交換機的型別
String type = "topic";
// 9、發送訊息給中間件rabbitmq-server
// (交換機exchange,佇列名稱/routingkey,屬性配置,發送訊息的內容)
channel.basicPublish(exchange, routeKey, null, message.getBytes());
System.out.println("訊息發送成功!!!");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 10、關閉通道(先關通道,再關連接)
if(channel!=null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 11、關閉連接
if(connection!=null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
消費者等同
引數Headers模式
理解
代碼中傳入引數的位置

創建交換機

系結佇列
發送

完整的宣告創建方式
生產者
// 生產者
public class Producer {
public static void main(String[] args) {
// 1、創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、設定連接屬性
// 通過連接工廠設定賬號密碼
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 虛擬訪問節點
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 3、從連接工廠中獲取連接
connection = connectionFactory.newConnection("生產者");
// 4、通過連接獲取通道channel
channel = connection.createChannel();
// 5、準備發送訊息的內容
String message = "hello 自己創建交換機 系結關系 佇列";
// 6、準備交換機
String exchangeName = "direct_message_exchange";
// 指定交換機的型別 direct/topic/fanout/headers
String exchaneType = "direct";
// 宣告交換機,所謂的持久化 durable(交換機會不會隨著服務器重啟造成丟失,true就不會丟失,false代表會丟失)
channel.exchangeDeclare(exchangeName,exchaneType,true);
// 8、宣告佇列
channel.queueDeclare("queue5",true,false,false,null);
channel.queueDeclare("queue6",true,false,false,null);
channel.queueDeclare("queue7",true,false,false,null);
// 9、系結佇列
channel.queueBind("queue5",exchangeName,"order");
channel.queueBind("queue6",exchangeName,"order");
channel.queueBind("queue7",exchangeName,"course");
// 10、發送訊息給中間件rabbitmq-server
// (交換機exchange,佇列名稱/routingkey,屬性配置,發送訊息的內容)
channel.basicPublish(exchangeName, "order", null, message.getBytes());
System.out.println("訊息發送成功!!!");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 10、關閉通道(先關通道,再關連接)
if(channel!=null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 11、關閉連接
if(connection!=null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
消費者
// 消費者
public class Customer {
private static Runnable runnable = new Runnable() {
public void run() {
// 1、創建連接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、設定連接屬性
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 虛擬訪問節點
connectionFactory.setVirtualHost("/");
// 獲取訊息佇列的名字
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
// 3、從連接工廠中獲取連接
connection = connectionFactory.newConnection("生產者");
// 4、從連接中獲取通道channel
channel = connection.createChannel();
// 5、申明佇列queue 存盤資訊
// 如果佇列不存在,則會創建 rabbintmq不允許創建兩個相同的佇列名稱,否則會報錯
// 這里如果queue 已經被創建過一次,可以不需要定義
// channel.queueDeclare("queue1",false,false,false,null);
// 6、定義接受訊息的回呼
Channel finalChannel = channel;
finalChannel.basicConsume(queueName, true, new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println(queueName+"收到訊息是:" + new String(message.getBody(), "UTF-8"));
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("接受訊息失敗了,,,,");
}
});
System.out.println(queueName+"開始接受訊息");
// 進行阻斷,接受訊息不關閉
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7、關閉通道(先關通道,再關連接)
if(channel!=null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 8、關閉連接
if(connection!=null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
};
public static void main(String[] args) {
// 啟動三個執行緒去執行
new Thread(runnable,"queue5").start();
new Thread(runnable,"queue6").start();
new Thread(runnable,"queue7").start();
}
}
在開發中可以通過界面進行系結,宣告,在Java代碼中就可以移除
在消費者和生產程序中,如果交換機不存在則會出現例外
佇列未進行注冊也會出現例外
宣告/系結在消費者宣告或生產者宣告都是可以的
work模式
當有多個消費者時,我們的訊息會被哪個消費者消費呢,該如何均衡消費者消費資訊的多少呢?(主要有兩種模式)
- 1、輪詢模式的分發:一個消費者一條,按均分配
- 2、公平分發:根據消費者的消費能力進行公平分發,處理快的處理的多,處理慢的處理的少,按勞分配;
輪詢模式(Round-Robin)
訊息的分配模式時一個消費者分配一條,直至訊息消費完成
不會因為服務器資源處理訊息速度的快慢,而產生消費不對等性
生產者
// 生產者
public class Producer {
public static void main(String[] args) {
// 1、創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、設定連接屬性
// 通過連接工廠設定賬號密碼
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 虛擬訪問節點
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 3、從連接工廠中獲取連接
connection = connectionFactory.newConnection("生產者");
// 4、通過連接獲取通道channel
channel = connection.createChannel();
// 5、準備發送訊息的內容
for (int i = 0; i < 20; i++) {
String msg = "hello"+i;
// 發送訊息給中間件rabbitmq-server
// (交換機exchange,佇列名稱/routingkey,屬性配置,發送訊息的內容)
channel.basicPublish("", "queue1", null, msg.getBytes());
Thread.sleep(1000);
}
System.out.println("訊息發送成功!!!");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 10、關閉通道(先關通道,再關連接)
if(channel!=null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 11、關閉連接
if(connection!=null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
消費者
// 消費者
public class Work1 {
public static void main(String[] args) {
// 1、創建連接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、設定連接屬性
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 虛擬訪問節點
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 3、從連接工廠中獲取連接
connection = connectionFactory.newConnection("生產者");
// 4、從連接中獲取通道channel
channel = connection.createChannel();
// 5、申明佇列queue 存盤資訊
// 如果佇列不存在,則會創建 rabbintmq不允許創建兩個相同的佇列名稱,否則會報錯
// 這里如果queue 已經被創建過一次,可以不需要定義
// channel.queueDeclare("queue1",false,false,false,null);
// 6、定義接受訊息的回呼
Channel finalChannel = channel;
// finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", true, new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
try {
System.out.println("work1收到訊息是:" + new String(message.getBody(), "UTF-8"));
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("接受訊息失敗了,,,,");
}
});
System.out.println("work1開始接受訊息");
// 進行阻斷,接受訊息不關閉
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7、關閉通道(先關通道,再關連接)
if(channel!=null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 8、關閉連接
if(connection!=null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
public class Work2 {
public static void main(String[] args) {
// 1、創建連接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、設定連接屬性
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 虛擬訪問節點
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 3、從連接工廠中獲取連接
connection = connectionFactory.newConnection("生產者");
// 4、從連接中獲取通道channel
channel = connection.createChannel();
// 5、申明佇列queue 存盤資訊
// 如果佇列不存在,則會創建 rabbintmq不允許創建兩個相同的佇列名稱,否則會報錯
// 這里如果queue 已經被創建過一次,可以不需要定義
// channel.queueDeclare("queue1",false,false,false,null);
// 6、定義接受訊息的回呼
Channel finalChannel = channel;
// finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", true, new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
try {
System.out.println("work2收到訊息是:" + new String(message.getBody(), "UTF-8"));
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("接受訊息失敗了,,,,");
}
});
System.out.println("work2開始接受訊息");
// 進行阻斷,接受訊息不關閉
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7、關閉通道(先關通道,再關連接)
if(channel!=null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 8、關閉連接
if(connection!=null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
公平分發
生產者
// 生產者
public class Producer {
public static void main(String[] args) {
// 1、創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、設定連接屬性
// 通過連接工廠設定賬號密碼
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 虛擬訪問節點
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 3、從連接工廠中獲取連接
connection = connectionFactory.newConnection("生產者");
// 4、通過連接獲取通道channel
channel = connection.createChannel();
// 5、準備發送訊息的內容
for (int i = 0; i < 20; i++) {
String msg = "hello"+i;
// 發送訊息給中間件rabbitmq-server
// (交換機exchange,佇列名稱/routingkey,屬性配置,發送訊息的內容)
channel.basicPublish("", "queue1", null, msg.getBytes());
}
System.out.println("訊息發送成功!!!");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 10、關閉通道(先關通道,再關連接)
if(channel!=null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 11、關閉連接
if(connection!=null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
消費者
公平分發:一定要將應答機制改為手動應答
定義指標: qos = 1(默認情況下是沒有設定的,表示每次從佇列重去除的資料量,不應設定太大值需要根據記憶體、cpu的占用率)
public class Work1 {
public static void main(String[] args) {
// 1、創建連接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、設定連接屬性
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 虛擬訪問節點
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 3、從連接工廠中獲取連接
connection = connectionFactory.newConnection("生產者");
// 4、從連接中獲取通道channel
channel = connection.createChannel();
// 5、申明佇列queue 存盤資訊
// 如果佇列不存在,則會創建 rabbintmq不允許創建兩個相同的佇列名稱,否則會報錯
// 這里如果queue 已經被創建過一次,可以不需要定義
// channel.queueDeclare("queue1",false,false,false,null);
// 6、定義接受訊息的回呼
final Channel finalChannel = channel;
// 指標定義出來 qos = 1(默認情況下是沒有設定的,表示每次從佇列重去除的資料量)
finalChannel.basicQos(1);
// 公平分發------一定要將應答機制改為手動應答, autoAck :false
finalChannel.basicConsume("queue1", false, new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
try {
System.out.println("work1收到訊息是:" + new String(message.getBody(), "UTF-8"));
Thread.sleep(1000);
// 改為手動應答--單條消費
finalChannel.basicAck(message.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("接受訊息失敗了,,,,");
}
});
System.out.println("work1開始接受訊息");
// 進行阻斷,接受訊息不關閉
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7、關閉通道(先關通道,再關連接)
if(channel!=null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 8、關閉連接
if(connection!=null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
public class Work2 {
public static void main(String[] args) {
// 1、創建連接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、設定連接屬性
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 虛擬訪問節點
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 3、從連接工廠中獲取連接
connection = connectionFactory.newConnection("生產者");
// 4、從連接中獲取通道channel
channel = connection.createChannel();
// 5、申明佇列queue 存盤資訊
// 如果佇列不存在,則會創建 rabbintmq不允許創建兩個相同的佇列名稱,否則會報錯
// 這里如果queue 已經被創建過一次,可以不需要定義
// channel.queueDeclare("queue1",false,false,false,null);
// 6、定義接受訊息的回呼
final Channel finalChannel = channel;
finalChannel.basicQos(1);
// 將應答機制改為手動應答, autoAck :false
finalChannel.basicConsume("queue1", false, new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
try {
System.out.println("work2收到訊息是:" + new String(message.getBody(), "UTF-8"));
Thread.sleep(200);
// 改為手動應答--單條消費
finalChannel.basicAck(message.getEnvelope().getDeliveryTag(),false);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("接受訊息失敗了,,,,");
}
});
System.out.println("work2開始接受訊息");
// 進行阻斷,接受訊息不關閉
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7、關閉通道(先關通道,再關連接)
if(channel!=null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 8、關閉連接
if(connection!=null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
RabbitMQ使用場景
為什么使用rabbitMQ
分布式架構使用,模塊與模塊之間要進行互通和協同,然后就考慮使用什么樣的訊息佇列,選擇了rabbitMQ,是異步的,多執行緒,是一個分發機制,多執行緒機制,可以讓我們的網站性能成倍的提升,因為異步就可以讓處理資料的能力變得高效和穩健,在開發的程序中把各個服務進行分裂,就可以解耦
01解耦、削峰、異步
同步異步的問題(串行)
串行方式:將訂單資訊寫入資料庫成功后,再發送郵件、短信,以上三個任務全部完成后,再回傳客戶端

代碼
從上至下的代碼方式(穿行執行:所有方法的時間總和)
public void makeOrder{
// 1 保存訂單
orderService.saveOrder();
// 2 發送短信服務
messageService.sendSMS("order");// 1-2s
// 3 發送Email服務
emailService .sendEmail("order");//1-2s
// 4 發送app服務
appService.sendApp("order");
}
并行方式 異步執行緒池
并行方式:將訂單資訊寫入資料庫成功后,發送注冊郵件的同時,發送短信,以上三個任務全部完成后,回傳客戶端,(并行的方式可以提高處理時間)

代碼
public void makeOrder{
// 1 保存訂單
orderService.saveOrder();
// 2 相關發送
relationMessage();
}
public void relationMessage(){
// 異步
theadpool submit(new Callable<Object>(){
public Object call() throws Exception {
messageService.sendSMS("order");
}
});
theadpool submit(new Callable<Object>(){
public Object call() throws Exception {
emailService .sendEmail("order");
}
});
theadpool submit(new Callable<Object>(){
public Object call() throws Exception {
appService.sendApp("order");
}
});
}
存在問題:
1、耦合度高
2、需要自己寫執行緒池維護成本高
3、出現了訊息可能會丟失:需要自己做訊息補償
4、如何保證訊息的可靠性:需要自己寫
5、如果服務器承載不了:需要自己寫高可用
異步訊息佇列方式
下單當作生產者,服務變成消費者

好處:
1、完全解耦,用MQ建立橋接
2、有獨立的執行緒池和運行模型
3、出現了訊息可能會丟失:MQ有持久化功能
4、如何保證訊息的可靠性:死信佇列和訊息轉移等
5、如果服務器承載不了:需要自己寫高可用,HA鏡像模型高可以

02高內聚,低耦合
解耦:

03流量的削峰
RabbitMQ與springboot整合
Faout模式(配置類方式系結關系)

生產者 (創建一個生產者工程)
<!--rabbitMQ 的start依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
組態檔
# 服務埠
server:
port: 8080
# 配置rabbitmq服務 本機服務可以不用配置
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: 127.0.0.1
port: 5672
代碼
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 模擬用戶下單
* @param userId
* @param productId
* @param num
*/
public void makeOrder(String userId,String productId,int num){
// 1:根據商品id查詢庫存是否充足
// 2:保存訂單
String orderId = UUID.randomUUID().toString();
System.out.println("訂單生產成功:"+orderId);
// 3:通過MQ來完成訊息的分發
// 引數1:交換機 引數2:路由Key/queue佇列名稱 引數3:訊息內容
String exchangName = "fanout_order_exchange";
String rouetingKey = "";
rabbitTemplate.convertAndSend(exchangName,rouetingKey,orderId);
}
}
config
@Configuration
public class RabbitMQConfiguration {
// 1:宣告注冊fanout 模式的交換機
@Bean
public FanoutExchange fanoutExchange(){
// 引數名 是否持久化 是否自動洗掉
return new FanoutExchange("fanout_order_exchange",true,false);
}
// 2:宣告佇列 sms.fanout.queue duanxin.fanout.queue email.fanout.queue
@Bean
public Queue smsQueue(){
return new Queue("sms.fanout.queue",true);
}
@Bean
public Queue duanxinQueue(){
return new Queue("duanxin.fanout.queue",true);
}
@Bean
public Queue emailQueue(){
return new Queue("email.fanout.queue",true);
}
// 3:完成系結關系 佇列和交換機系結
@Bean
public Binding smsBingding(){
return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
}
@Bean
public Binding duanxinBingding(){
return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange());
}
@Bean
public Binding emailBingding(){
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
}
}
測驗
@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {
@Autowired
private OrderService orderService;
@Test
void contextLoads() {
orderService.makeOrder("1","1",12);
}
}
消費者 (創建一個消費者工程)
# 服務埠
server:
port: 8081
# 配置rabbitmq服務 本機服務可以不用配置
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: 127.0.0.1
port: 5672
代碼
@RabbitListener(queues = {"email.fanout.queue"})
@Component
public class FanoutEmailConsumer {
@RabbitHandler
public void reviseMessage(String message) {
System.out.println("email fanout ----接收到了訂單資訊是:-->" + message);
}
}
@RabbitListener(queues = {"sms.fanout.queue"})
@Component
public class FanoutSMSConsumer {
@RabbitHandler
public void reviseMessage(String message) {
System.out.println("sms fanout ----接收到了訂單資訊是:-->" + message);
}
}
@RabbitListener(queues = {"duanxin.fanout.queue"})
@Component
public class FanoutDuanxinConsumer {
@RabbitHandler
public void reviseMessage(String message) {
System.out.println("duanxin fanout ----接收到了訂單資訊是:-->" + message);
}
}
運行啟動類就可以接收到訊息

Direct模式
開發中能用fanout模式就用fanout模式,否則direct模式會消耗性能
config
@Configuration
public class DirectRabbitMQConfiguration {
// 1:宣告注冊Direc 模式的交換機
@Bean
public DirectExchange directExchange(){
// 引數名 是否持久化 是否自動洗掉
return new DirectExchange("direct_order_exchange",true,false);
}
// 2:宣告佇列 sms.direct.queue duanxin.direct.queue email.direct.queue
@Bean
public Queue directsmsQueue(){
return new Queue("sms.direct.queue",true);
}
@Bean
public Queue directduanxinQueue(){
return new Queue("duanxin.direct.queue",true);
}
@Bean
public Queue directemailQueue(){
return new Queue("email.direct.queue",true);
}
// 3:完成系結關系 佇列和交換機系結
@Bean
public Binding directsmsBingding(){
return BindingBuilder.bind(directsmsQueue()).to(directExchange()).with("sms");
}
@Bean
public Binding directduanxinBingding(){
return BindingBuilder.bind(directduanxinQueue()).to(directExchange()).with("duanxin");
}
@Bean
public Binding directemailBingding(){
return BindingBuilder.bind(directemailQueue()).to(directExchange()).with("email");
}
}
生產者
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 模擬用戶下單---direct
* @param userId
* @param productId
* @param num
*/
public void makeOrderDirect(String userId,String productId,int num){
// 1:根據商品id查詢庫存是否充足
// 2:保存訂單
String orderId = UUID.randomUUID().toString();
System.out.println("訂單生產成功:"+orderId);
// 3:通過MQ來完成訊息的分發
// 引數1:交換機 引數2:路由Key/queue佇列名稱 引數3:訊息內容
String exchangName = "direct_order_exchange";
rabbitTemplate.convertAndSend(exchangName,"email",orderId);
rabbitTemplate.convertAndSend(exchangName,"duanxin",orderId);
}
}
測驗
@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {
@Autowired
private OrderService orderService;
@Test
void testOrderDirect() {
orderService.makeOrderDirect("1","1",12);
}
}
消費者
@RabbitListener(queues = {"sms.direct.queue"})
@Component
public class DirectSMSConsumer {
@RabbitHandler
public void reviseMessage(String message) {
System.out.println("sms direct ----接收到了訂單資訊是:-->" + message);
}
}
@RabbitListener(queues = {"email.direct.queue"})
@Component
public class DirectEmailConsumer {
@RabbitHandler
public void reviseMessage(String message) {
System.out.println("email direct ----接收到了訂單資訊是:-->" + message);
}
}
@RabbitListener(queues = {"duanxin.direct.queue"})
@Component
public class DirectDuanxinConsumer {
@RabbitHandler
public void reviseMessage(String message) {
System.out.println("duanxin direct ----接收到了訂單資訊是:-->" + message);
}
}

面試題:系結關系配置在生產者好還是配置在消費者好?
其實無論配置在哪一端都是可以的,兩邊都配置也可以,最好的還是在消費者配置系結關系,因為啟動程序中,消費者如果佇列還沒有被宣告,就會出錯,一定要先把佇列宣告好,因為消費者是直接和佇列打交道的地方,消費者是最先起來的服務
topic模式(注解方式的系結關系)
配置類和注解都可以確定系結關系,二者選擇其一(推薦配置類方式)
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "sms.topic.queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
key = "#.sms.#"
))
public class TopicDuanxinConsumer {
@RabbitHandler
public void reviseMessage(String message) {
System.out.println("duanxin topic ----接收到了訂單資訊是:-->" + message);
}
}
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "email.topic.queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
key = "#.email.#"
))
public class TopicEmailConsumer {
@RabbitHandler
public void reviseMessage(String message) {
System.out.println("email topic ----接收到了訂單資訊是:-->" + message);
}
}
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "sms.topic.queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
key = "com.#"
))
public class TopicSMSConsumer {
@RabbitHandler
public void reviseMessage(String message) {
System.out.println("sms topic ----接收到了訂單資訊是:-->" + message);
}
}
生產者
/**
* 模擬用戶下單---topic
* @param userId
* @param productId
* @param num
*/
public void makeOrderTopic(String userId,String productId,int num){
// 1:根據商品id查詢庫存是否充足
// 2:保存訂單
String orderId = UUID.randomUUID().toString();
System.out.println("訂單生產成功:"+orderId);
// 3:通過MQ來完成訊息的分發
// 引數1:交換機 引數2:路由Key/queue佇列名稱 引數3:訊息內容
String exchangName = "topic_order_exchange";
String routingKey="com.sms";
rabbitTemplate.convertAndSend(exchangName,routingKey,orderId);
}
RabbitMQ高級
過期時間TTL
概述
過期時間TTL表示可以對訊息設定預期的時間,在這個時間內都可以被消費者接識訓取;過了這個時間之后訊息將自動被洗掉.
RabbitMQ 可以對 訊息和佇列 設定TTL,目前有兩種方法可以設定
- 通過佇列屬性設定,佇列中所有訊息都有相同的過期時間
- 對訊息進行單獨設定,每條訊息TTL可以不同
如果上述的兩種方法同時使用,則訊息的過期時間是兩者之間TTL較小的那個數值為準.
訊息在佇列的生存時間一旦超過設定的TTL值,就稱為dead message 被投遞到死信佇列,消費者將無法再收到該訊息
設定佇列TTL
config
@Configuration
public class TTLRabbitMQConfiguration {
// 1:宣告注冊Direc 模式的交換機
@Bean
public DirectExchange TTLdirectExchange() {
// 引數名 是否持久化 是否自動洗掉
return new DirectExchange("ttl_direct_exchange", true, false);
}
// 2:宣告佇列 佇列的過期時間
@Bean
public Queue ttlQueue() {
// 設定過期時間
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5000);// 這里一定是個int型別 單位:毫秒
// 名字 持久化 排他性 自動洗掉 過期時間引數
return new Queue("ttl.direct.queue", true,false,false,args);
}
// 3:完成系結關系 佇列和交換機系結
@Bean
public Binding ttlBingding() {
return BindingBuilder.bind(ttlQueue()).to(TTLdirectExchange()).with("ttl");
}
}
生產者
/**
* 模擬用戶下單---ttl
* @param userId
* @param productId
* @param num
*/
public void makeOrderTtl(String userId,String productId,int num){
// 1:根據商品id查詢庫存是否充足
// 2:保存訂單
String orderId = UUID.randomUUID().toString();
System.out.println("訂單生產成功:"+orderId);
// 3:通過MQ來完成訊息的分發
// 引數1:交換機 引數2:路由Key/queue佇列名稱 引數3:訊息內容
String exchangName = "ttl_direct_exchange";
String routingKey="ttl";
rabbitTemplate.convertAndSend(exchangName,routingKey,orderId);
}
測驗
@Test
void testOrderTTl() {
orderService.makeOrderTtl("1","1",12);
}


過期時間的資訊,一般會用死信佇列進行接收
設定訊息過期時間TTL
config
@Configuration
public class TTLRabbitMQConfiguration {
// 1:宣告注冊Direc 模式的交換機
@Bean
public DirectExchange TTLdirectExchange() {
// 引數名 是否持久化 是否自動洗掉
return new DirectExchange("ttl_direct_exchange", true, false);
}
// 2:宣告佇列 佇列的過期時間
@Bean
public Queue ttlQueue() {
// 設定過期時間
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5000);// 這里一定是個int型別 單位:毫秒
// 名字 持久化 排他性 自動洗掉 過期時間引數
return new Queue("ttl.direct.queue", true,false,false,args);
}
// 2:宣告佇列 佇列的過期時間
@Bean
public Queue ttlMessageQueue() {
return new Queue("ttl.message.direct.queue", true);
}
// 3:完成系結關系 佇列和交換機系結
@Bean
public Binding ttlBingding() {
return BindingBuilder.bind(ttlQueue()).to(TTLdirectExchange()).with("ttl");
}
@Bean
public Binding ttlMessageBingding() {
return BindingBuilder.bind(ttlMessageQueue()).to(TTLdirectExchange()).with("ttlMessage");
}
}
生產者
/**
* 模擬用戶下單---ttl
* @param userId
* @param productId
* @param num
*/
public void makeOrderTtlMessage(String userId,String productId,int num){
// 1:根據商品id查詢庫存是否充足
// 2:保存訂單
String orderId = UUID.randomUUID().toString();
System.out.println("訂單生產成功:"+orderId);
// 3:通過MQ來完成訊息的分發
// 引數1:交換機 引數2:路由Key/queue佇列名稱 引數3:訊息內容
String exchangName = "ttl_direct_exchange";
String routingKey="ttlMessage";
// 給訊息設定過期時間
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 這里就是字串
message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
rabbitTemplate.convertAndSend(exchangName,routingKey,orderId,messagePostProcessor);
}
測驗
@Test
void testOrderTTlMessage() {
orderService.makeOrderTtlMessage("1","1",12);
}
過期佇列和過期訊息的差異
過期訊息,過期了就被移除了不存在
過期佇列的訊息,過期之后會寫入到死信佇列中(可以進行訊息轉移)
死信佇列
概述
DLX (Dead-Letter_Exahcnge) 死信交換機(死信郵箱),當訊息在一個佇列中變成死信之后,它能被重新發送到另一個交換機中,這個交換機就是DLX,系結DLX的佇列就稱之為死信佇列.
訊息變成死信,可能由于以下的原因:
- 訊息被拒絕
- 訊息過期
- 佇列達到最大長度
DLX也是一個正常的交換機,和一般交換機沒有區別,它能在任何的佇列上被系結,實際上就是設定某一個佇列的屬性.
當這個佇列中存在死信時,rabbitMQ就會自動地將這個資訊重新發布到設定的DLX上去,進而被路由到另一個佇列,即死信佇列
要想使用死信佇列,只需要在定義佇列的時候設定佇列引數x-dead-letter-exchange 則定義交換機即可
訊息過期
config
@Configuration
public class DeadRabbitMQConfiguration {
// 1:宣告注冊Direc 模式的交換機
@Bean
public DirectExchange deadDiredct() {
// 引數名 是否持久化 是否自動洗掉
return new DirectExchange("dead_direct_exchange", true, false);
}
// 2:宣告佇列
@Bean
public Queue deadQueue() {
return new Queue("dead.direct.queue", true);
}
@Bean
public Binding deadsBingding() {
return BindingBuilder.bind(deadQueue()).to(deadDiredct()).with("dead");
}
}
@Configuration
public class TTLRabbitMQConfiguration {
// 1:宣告注冊Direc 模式的交換機
@Bean
public DirectExchange TTLdirectExchange() {
// 引數名 是否持久化 是否自動洗掉
return new DirectExchange("ttl_direct_exchange", true, false);
}
// 2:宣告佇列 佇列的過期時間
@Bean
public Queue ttlQueue() {
// 設定過期時間
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5000);// 這里一定是個int型別 單位:毫秒
args.put("x-dead-letter-exchange","dead_direct_exchange");
// direct 模式需要設定 key (fanout模式可以不用設定)
args.put("x-dead-letter-routing-key","dead");
// 名字 持久化 排他性 自動洗掉 過期時間引數
return new Queue("ttl.direct.queue", true,false,false,args);
}
// 2:宣告佇列 佇列的過期時間
@Bean
public Queue ttlMessageQueue() {
return new Queue("ttl.message.direct.queue", true);
}
// 3:完成系結關系 佇列和交換機系結
@Bean
public Binding ttlBingding() {
return BindingBuilder.bind(ttlQueue()).to(TTLdirectExchange()).with("ttl");
}
@Bean
public Binding ttlMessageBingding() {
return BindingBuilder.bind(ttlMessageQueue()).to(TTLdirectExchange()).with("ttlMessage");
}
}
測驗
(修改佇列:已存在的佇列不會修改和覆寫,只會報錯,需要洗掉掉再重新創建,但真正開發程序中,應該從新創建一個佇列不應該洗掉)
@Test
void testOrderTTl() {
orderService.makeOrderTtl("1","1",12);
}

佇列達到最大長度
config
@Configuration
public class DeadRabbitMQConfiguration {
// 1:宣告注冊Direc 模式的交換機
@Bean
public DirectExchange deadDiredct() {
// 引數名 是否持久化 是否自動洗掉
return new DirectExchange("dead_direct_exchange", true, false);
}
// 2:宣告佇列
@Bean
public Queue deadQueue() {
return new Queue("dead.direct.queue", true);
}
@Bean
public Binding deadsBingding() {
return BindingBuilder.bind(deadQueue()).to(deadDiredct()).with("dead");
}
}
@Configuration
public class TTLRabbitMQConfiguration {
// 1:宣告注冊Direc 模式的交換機
@Bean
public DirectExchange TTLdirectExchange() {
// 引數名 是否持久化 是否自動洗掉
return new DirectExchange("ttl_direct_exchange", true, false);
}
// 2:宣告佇列 佇列的過期時間
@Bean
public Queue ttlQueue() {
// 設定過期時間
Map<String, Object> args = new HashMap<>();
// args.put("x-message-ttl", 5000);// 這里一定是個int型別 單位:毫秒
args.put("x-max-length",5);
args.put("x-dead-letter-exchange","dead_direct_exchange");
// direct 模式需要設定 key (fanout模式可以不用設定)
args.put("x-dead-letter-routing-key","dead");
// 名字 持久化 排他性 自動洗掉 過期時間引數
return new Queue("ttl.direct.queue", true,false,false,args);
}
// 2:宣告佇列 佇列的過期時間
@Bean
public Queue ttlMessageQueue() {
return new Queue("ttl.message.direct.queue", true);
}
// 3:完成系結關系 佇列和交換機系結
@Bean
public Binding ttlBingding() {
return BindingBuilder.bind(ttlQueue()).to(TTLdirectExchange()).with("ttl");
}
@Bean
public Binding ttlMessageBingding() {
return BindingBuilder.bind(ttlMessageQueue()).to(TTLdirectExchange()).with("ttlMessage");
}
}
測驗
@Test
void testOrderTTl() {
for (int i = 0; i < 11; i++) {
orderService.makeOrderTtl("1","1",12);
}
}

記憶體磁盤的監控
RabbitMQ的記憶體警告
當記憶體使用超過配置的閾值\磁盤空間剩余空間小于配置的閾值時,rabbitMQ會暫時阻塞客戶端的連接,并且停止接收從客戶端發來的訊息,以此避免服務器的崩潰,客戶端與服務端的心態檢測機制也會失效
如下圖:

持久化:把記憶體里面的資料同步到磁盤
rabbitMQ 的記憶體控制
參考幫助檔案:
https://www.rabbitmq.com/configure.html
當出現警告的時候,可以通過配置去修改和調整
命令的方式
rabbitmqctl set_vm_memory_high_watermark <fraction> 相對值
rabbitmqctl set_vm_memory_high_watermark absolute 50MB 絕對值
fraction/value 為記憶體的閾值.默認情況是0.4(相對值)/2GB(絕對值) 二者選其一,表示:當rabbitMQ的記憶體超過40%時,就會產生警告并且阻塞所有生產者的連接
通過此命令修改閾值在Broker 重啟以后將會失效,通過修改組態檔方式設定的閾值則不會隨著重啟而消失,單修改了組態檔一樣要重啟broker才會生效
(相對值最好寫:0.4-0.7之間,不建議超過0.7)
分析:
rabbitmqctl set_vm_memory_high_watermark absolute 50MB

組態檔方式 rabbitmq.conf
當前組態檔: /etc/rabbitmq/rabbitmq.conf
# 默認
# vm_memory_high_watermark.relative = 0.4
# 使用relative 相對值進行設定fraction 建議取值在0.4-0.7之間,不建議超過0.7
vm_memory_high_watermark.relative = 0.6
# 使用absolute 的絕對值的方式,但是是KB\MB\GB對應的命令如下(8GB*0.6=4GB)
vm_memory_high_watermark.absolute = 2GB
rabbitMQ 的記憶體換頁
在某個Broker節點及記憶體阻塞生產者之間,它會嘗試將佇列中的訊息換頁到磁盤以釋放記憶體空間,持久化和非持久化的訊息都會寫入磁盤中,其中持久化的訊息本身就在磁盤中有一個副本,所以在轉移的程序中持久化的訊息會先從記憶體中除掉
默認情況下,記憶體到達的閾值時50%時就會換頁處理
也就是在默認的情況下該記憶體的閾值是0.4的情況下,當記憶體超過0.4*0.5=0.2時,會進行換頁動作
例如:有1000MB記憶體,當記憶體的使用率達到了400MB,已經達到了極限,但因為配置的換頁記憶體0.5 ,這個時候會在達到極限400mb之前,會把記憶體中的200MB進行轉移到磁盤中,從而達到穩健的運行
可以通過設定 vm_memory_high_watermark_paging_ratio 進行調整
vm_memory_high_watermark.relative = 0.4
vm_memory_high_watermark_paging_ratio = 0.7(設定小于1的值)
為什么設定小于1,因為如果設定為1的閾值,記憶體都達到了極限,再去換頁的意義不是很大
rabbitMQ的磁盤預警
當磁盤的剩余空間低于確定的閾值時,rabbitMQ同樣會阻塞生產者,這樣可以避免因非持久化的訊息持續換頁而耗盡磁盤空間導致服務器崩潰
默認情況下:磁盤預警為50MB的時候會進行預警.表示當前磁盤空間第50MB的時候會阻塞生產者并且停止記憶體訊息換頁到磁盤的程序
這個閾值可以減小,但是不能完全的消除因磁盤消耗而導致崩潰的可能性.比如在兩次磁盤空間的檢查空隙內,第一次檢查是:60MB,第二次檢查可能就是1MB ,就會出現警告

通過命令方式修改:
rabbitmqctl set_disk_free_limit <disk_limit>
rabbitmqctl set_disk_free_limit memory_limit <fraction>
disk_limit :固定單位 KB MB GB
fraction:是相對閾值,建議范圍1.0-2.0之間(相對于記憶體)
通過組態檔配置:
disk_free_limit.relative = 3.0
disk_free_limit.absolute = 50mb
集群
erlang語言天生具備分布性特征,通過同步Erlang 集群
集群搭建
配置的前提是運行rabbitmq 可以運行起來,
# 查看行程
ps aux|grep rabbitmq
systemctl status rabbitmq-server
注意:確保rabbitmq 可以運行的,確保完成之后,把單機版的rabbitmq 服務停止,后臺看不到rabbitmq的行程為止
systemctl stop rabbitmq-server
單機多實體搭建
場景:假設有兩個rabbitmq節點,分別是rabbit-1,rabbit-2,rabbit-1作為主節點,rabbit-2作為從節點
啟動命令:RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server -detached
結束命令:rabbitmqctl -n rabbit-1 stop
具體請查看視頻
第一步:啟動第一個節點 rabbit-1
sudo RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server start &
至此節點rabbit-1啟動完成
第二步:啟動第二個節點 rabbit-2
注意:web管理插件埠占用,所以還要指定其web插件占用的埠號
RABBITMQ_SERVER_START_ARGS=" -rabbitmq_managment listener [{port,15673}]"
sudo RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS=" -rabbitmq_managment listener [{port,15673}]" RABBITMQ_NODENAME=rabbit-2 rabbitmq-server start &
至此節點rabbit-2啟動完成
第三步:驗證啟動 ps aux|grep rabbitmq
第四步:rabbit-1 操作作為主節點
# 停止應用(-n 節點名)
sudo rabbitmqctl -n rabbit-1 stop_app
# 目的是清除節點上的歷史資料(如果不清楚,無法將節點加入到集群)
sudo rabbitmqctl -n rabbit-1 reset
# 啟動應用
sudo rabbitmqctl -n rabbit-1 start_app
rabbit-2操作作為從節點
# 停止應用
sudo rabbitmqctl -n rabbit-2 stop_app
# 目的是清除節點上的歷史資料(如果不清楚,無法將節點加入到集群)
sudo rabbitmqctl -n rabbit-2 reset
# 將rabbit2節點加入到rabbit1,集群當中【Server-no de為服務器的主機名,需要修改】
sudo rabbitmqctl -n rabbit-2 join_cluster tabbit-1@'Server-node'
# 啟動應用
sudo rabbitmqctl -n rabbit-2 start_app
驗證集群狀態
sudo rabbitmqctl cluster_status -n rabbit-1
web 監控
#安裝插件
rabbitmq-plugins enable rabbitmq-management
在訪問時需要給15672 node-1 和15673 node-2 設定用戶名和密碼
rabbitmqctl -n rabbit-1 add_user admin admin
rabbitmqctl -n rabbit-1 set_user_tags admin administrator
rabbitmqctl -n rabbit-1 set_permissions -p / admin ".*" ".*" ".*"
rabbitmqctl -n rabbit-2 add_user admin admin
rabbitmqctl -n rabbit-2 set_user_tags admin administrator
rabbitmqctl -n rabbit-2 set_permissions -p / admin ".*" ".*" ".*"


分布式事務
簡述
分布式事務是指事物的操作位于不同的節點上,需要保證事務的AICD特性
(例如:在下單場景下,庫存和訂單如果不在同一個節點上,就涉及分布式事務)
分布式事務的方式
在分布式系統中,要實作分布式事務,有幾種解決方案
一、兩階段提交(2PC)需要資料庫產商的支持,Java組件有atomikos等
二、補償事務(TCC)嚴選,阿里,螞蟻金服
三、本地訊息表(異步確保)比如:支付寶、微信支付主動查詢支付狀態,對賬單的形式
四、MQ事務訊息 異步場景,通用性較強,拓展性較強
具體實作



基于MQ分布式事務訊息的可靠生產問題-定時重發



基于MQ的分布式事務訊息的可靠消費

解決訊息重試的幾種方案:
1、控制重發的次數 + 死信佇列

2、try catch+手動ack

3、try catch+手動ack +死信佇列+手動處理
【學相伴】RabbitMQ最新完整教程IDEA版通俗易懂 | KuangStudy | 狂神說 | 學相伴飛哥https://www.bilibili.com/video/BV1dX4y1V73G?p=37&spm_id_from=pageDriver
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/273698.html
標籤:其他
下一篇:Linux——Linux驅動之基本理論常識總結(什么是Linux驅動?Linux驅動需要掌握哪些?ARM處理體系架構及前世今生)
