系列文章:
RabbitMQ從零到集群高可用(.NetCore5.0) - RabbitMQ簡介和六種作業模式詳解
RabbitMQ從零到集群高可用(.NetCore5.0) - 死信佇列,延時佇列
RabbitMQ從零到集群高可用(.NetCore5.0) - 高可用集群構建落地
一、集群架構簡介
當單臺 RabbitMQ 服務器的處理訊息的能力達到瓶頸時,此時可以通過 RabbitMQ 集群來進行擴展,從而達到提升吞吐量的目的,RabbitMQ 集群是一個或多個節點的邏輯分組,集群中的每個節點都是對等的,每個節點共享所有的用戶,虛擬主機,佇列,交換器,系結關系,運行時引數和其他分布式狀態等資訊,一個高可用,負載均衡的 RabbitMQ 集群架構應類似下圖:

決議說明:
最下面層是RabbitMQ的集群,沒有ha鏡像時是普通集群,普通集群的缺點是掛了一個機器,以這個機器為根的佇列就無法使用了(一個佇列的資料只會存在一個節點),無法實作高可用,
所以把佇列變成鏡像佇列,這樣每個節點都會有一份完整的資料,有節點掛了也不影響使用,實作了高可用,但RabbitMQ集群本身沒有實作負載均衡,也就是說對于一個三節點的集群,
每個節點的負載可能都是不相同的,
HAProxy層的作用就是為了實作RabbitMQ集群的負載均衡,但一個節點的話顯然也不能高可用,所以需要兩個HAProxy實作HaProxy的高可用,但沒法實作自動的故障轉移,就是HAProxy1掛了,
需要手動把ip地址改成HAProxy2的,
所以需要用到KeepAlived,它通常由一主一備兩個節點組成,同一時間內只有主節點會提供對外服務,并同時提供一個虛擬的 IP 地址 (Virtual Internet Protocol Address ,簡稱 VIP),可以避免暴露真實ip , 如果主節點故障,那么備份節點會自動接管 VIP 并成為新的主節點 ,直到原有的主節點恢復,
生產環境架構應該為:
機器1:RabbitMQ1,機器2:RabbitMQ2,機器3:RabbitMQ3,機器4:HAProxy+keeplived(主),機器5(HAProxy+keeplived(備),
這里資源原因只有3臺機器,所以搭建架構為:
| 172.16.2.84(rabbit1) | RabbitMQ1,HAProxy+keeplived(主) |
| 172.16.2.85(rabbit2) | RabbitMQ2,HAProxy+keeplived(備) |
| 172.16.2.86(rabbit3) | RabbitMQ3 |
二、普通集群搭建
1)各個節點分別安裝RabbitMQ
這里前面的章節功能使用是用的docker安裝,這里集群不用docker,因為docker安裝會有很多的映射,很容易擾亂,裝有docker的rabbitmq的,需要先把docker停掉,
這里準備了3臺機器,172.16.2.84(rabbit1),172.16.2.85(rabbit2),172.16.2.86(rabbit3),為了避免混淆,下面都會用rabbit1,rabbit2,rabbit3稱呼,
rabbit1,rabbit2,rabbit3都執行一遍下面的安裝,
1.1安裝前,先修改hostname,因為rabbitmq集群通訊需要用hostname
rabbit1機器執行
hostnamectl set-hostname rabbit1 --static
rabbit2機器執行
hostnamectl set-hostname rabbit2 --static
rabbit3機器執行
hostnamectl set-hostname rabbit3 --static
rabbit1,rabbit2,rabbit3執行同樣操作
#修改hosts,因為rabbitmq通訊要通過hostname vi /etc/hosts
1.2把ip對應hostname添加到后面
172.16.2.84 rabbit1
172.16.2.85 rabbit2
172.16.2.86 rabbit3

# 重啟網路 systemctl restart network # 重啟機器 init 6
1.3安裝Erlang
#第一步 運行package cloud提供的erlang安裝腳本 curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash ? #第二步 安裝erlang yum install erlang ? #第三步 查看erlang版本號,在命令列直接輸入erl erl
1.4安裝RabbitMQ
#第一步 先匯入兩個key rpm --import https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey rpm --import https://packagecloud.io/gpg.key ? #第二步 運行package cloud提供的rabbitmq安裝腳本 curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash ? #第三步 下載rabbit安裝檔案 wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.9.5/rabbitmq-server-3.9.5-1.el8.noarch.rpm ? #第四步 rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc ? #第五步 rabbitMQ依賴 yum -y install epel-release yum -y install socat ? #第六步 安裝 rpm -ivh rabbitmq-server-3.9.5-1.el8.noarch.rpm ? #第七步 啟用管理平臺插件,啟用插件后,可以可視化管理RabbitMQ rabbitmq-plugins enable rabbitmq_management ? #第八步 啟動應用 systemctl start rabbitmq-server
上面rabbitmq的版本號安裝檔案地址:https://github.com/rabbitmq/rabbitmq-server/releases/
1.5設定訪問權限
#創建管理員賬戶 rabbitmqctl add_user admin 123456 #設定注冊的賬戶為管理員 rabbitmqctl set_user_tags admin administrator #授權遠程訪問 rabbitmqctl set_permissions -p / admin "." "." ".*" #重啟服務 systemctl restart rabbitmq-server
注意這里關了防火墻,如果開啟防火墻的話,需要把15672埠開放出來
#查看防火墻狀態 systemctl status firewalld #關閉防火墻 systemctl stop firewalld
到這里,3臺機器都安裝好了RabbitMQ,

2)把節點加入集群
2.1停止服務rabbit2,rabbit3
#停止全部服務 systemctl stop rabbitmq-server
2.2拷貝cookie
將rabbit1上的.erlang.cookie檔案拷貝到其他兩臺主機上,該 cookie 檔案相當于密鑰令牌,集群中的 RabbitMQ 節點需要通過交換密鑰令牌以獲得相互認證,因此處于同一集群的所有節點需要具有相同的密鑰令牌,否則在搭建程序中會出現 Authentication Fail 錯誤,只要保證這3臺機器中的.erlang.cookie內的密鑰字串一致即可,這里把rabbit1的密鑰復制到rabbit2,rabbit3,
3個機器都給.erlang.cookie 400權限,
#給600權限 chmod 600 /var/lib/rabbitmq/.erlang.cookie
rabbit1機器
#編輯檔案 vi /var/lib/rabbitmq/.erlang.cookie

把內容復制出來,修改rabbit2,rabbit3這個檔案的值為這個,
啟動服務
#開啟全部服務 systemctl start rabbitmq-server
2.3集群搭建
RabbitMQ 集群的搭建需要選擇其中任意一個節點為基準,將其它節點逐步加入,這里我們以 rabbit1為基準節點,將 rabbit2 和 rabbit3 加入集群,在 rabbit2和 rabbit3上執行以下命令:
# 1.停止服務 rabbitmqctl stop_app # 2.重置狀態(需要更改節點型別的時候執行,首次不需要執行,除非你節點是以disk加入集群的) rabbitmqctl reset # 3.節點加入 #rabbitmqctl join_cluster --ram rabbit@rabbit1 rabbitmqctl join_cluster rabbit@rabbit1 # 4.啟動服務 rabbitmqctl start_app
join_cluster 命令有一個可選的引數 --ram ,該引數代表新加入的節點是記憶體節點,默認是磁盤節點,如果是記憶體節點,則所有的佇列、交換器、系結關系、用戶、訪問權限和 vhost 的元資料都將存盤在記憶體中,如果是磁盤節點,則存盤在磁盤中,記憶體節點可以有更高的性能,但其重啟后所有配置資訊都會丟失,因此RabbitMQ 要求在集群中至少有一個磁盤節點,其他節點可以是記憶體節點,大多數情況下RabbitMQ 的性能都是夠用的,可以采用默認的磁盤節點的形式,
另外,如果節點以磁盤節點的形式加入,則需要先使用 reset 命令進行重置,然后才能加入現有群集,重置節點會洗掉該節點上存在的所有的歷史資源和資料,采用記憶體節點的形式加入時可以略過 reset 這一步,因為記憶體上的資料本身就不是持久化的,
操作上面的,一個普通集群就搭建成功了,打開rabbit管理界面(隨便打開一個都是一樣的),

3)代碼演示普通集群的問題
普通集群中, 第一次創建佇列時,會隨機選一個節點作為根節點,這個節點會存盤佇列的資訊(互動機,路由,佇列名等)和佇列的資料,其它兩個節點只會同步根節點的元資訊(交換機,路由,佇列名)等,
但不會存盤佇列的資料,他們是通過元資訊找到根節點讀寫訊息,

例如集群選擇了rabbit2作為根節點,那么資料存盤在rabbit2,rabbit1和rabbit3是沒有資料的,那么如果rabbit2宕機了,佇列里面的資料就取不到了,
代碼演示,.NetCore5.0讀取集群連接代碼,
/// <summary> /// 獲取集群連接物件 /// </summary> /// <returns></returns> public static IConnection GetClusterConnection() { var factory = new ConnectionFactory { UserName = "admin",//賬戶 Password = "123456",//密碼 VirtualHost = "/" //虛擬機 }; List<AmqpTcpEndpoint> list = new List<AmqpTcpEndpoint>() { new AmqpTcpEndpoint(){HostName="172.16.2.84",Port=5672}, new AmqpTcpEndpoint(){HostName="172.16.2.85",Port=5672}, new AmqpTcpEndpoint(){HostName="172.16.2.86",Port=5672} }; return factory.CreateConnection(list); }
生產者代碼:
/// <summary> /// 作業佇列模式 /// </summary> public static void WorkerSendMsg() { string queueName = "worker_order";//佇列名 //創建連接 using (var connection = RabbitMQHelper.GetClusterConnection()) { //創建信道 using (var channel = connection.CreateModel()) { //創建佇列 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); IBasicProperties properties = channel.CreateBasicProperties(); properties.Persistent = true; //訊息持久化 for ( var i=0;i<10;i++) { string message = $"Hello RabbitMQ MessageHello,{i+1}"; var body = Encoding.UTF8.GetBytes(message); //發送訊息到rabbitmq channel.BasicPublish(exchange: "", routingKey: queueName, mandatory: false, basicProperties: properties, body); Console.WriteLine($"發送訊息到佇列:{queueName},內容:{message}"); } } } }
執行:

查看RabbitMQ管理界面

RabbitMQ選擇了rabbit3作為根節點,現在試一下停止rabbit3節點
#停止服務 rabbitmqctl stop_app

發現佇列不可用了,啟動rabbit3,再試一下停止rabbit2

發現佇列正常,試下消費資料:
public static void WorkerConsumer() { string queueName = "worker_order"; var connection = RabbitMQHelper.GetClusterConnection(); { //創建信道 var channel = connection.CreateModel(); { //創建佇列 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); ///prefetchCount:1來告知RabbitMQ,不要同時給一個消費者推送多于 N 個訊息,也確保了消費速度和性能 ///global:是否設為全域的 ///prefetchSize:單條訊息大小,通常設0,表示不做限制 //是autoAck=false才會有效 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true); int i = 1; int index = new Random().Next(10); consumer.Received += (model, ea) => { //處理業務 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"{i},消費者:{index},佇列{queueName}消費訊息長度:{message.Length}"); Thread.Sleep(1000); channel.BasicAck(ea.DeliveryTag, false); //訊息ack確認,告訴mq這條佇列處理完,可以從mq洗掉了 i++; }; channel.BasicConsume(queueName, autoAck: false, consumer); } } }

如果根節點掛了,再往集群發送資料,RabbitMQ又會從其余的選一個作為根節點,就可能的情況是,多個節點都存有不同佇列的資料,

到這里,可以看到普通集群并不可以高可用, 根節點掛了,在這個節點上的佇列也不可用了,但三個節點都沒問題的時候,可以提高并發和佇列的負載,
要實作高可用,就要用到了鏡像集群,
三、鏡像集群
鏡像集群,在每個節點上都同步一份鏡像資料,相當于每個節點都有一份完整的資料,這樣有節點宕機了,還能正常提供RabbitMQ服務,
變鏡像集群很簡單,在上面普通集群的基礎上,在任意一個節點下執行
#把集群變成鏡像集群 rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
執行完,看回佇列,多了ha標識,


四、HAProxy環境搭建,
4.1下載
HAProxy 官方下載地址為:https://www.haproxy.org/#down ,如果這個網站無法訪問,也可以從 https://src.fedoraproject.org/repo/pkgs/haproxy/ 上進行下載,
#下載haproxy wget https://www.haproxy.org/download/2.4/src/haproxy-2.4.3.tar.gz
解壓
#解壓 tar xf haproxy-2.4.3.tar.gz
4.2編譯
安裝gcc
yum install gcc
進入解壓后根目錄,執行下面的編譯命令:
#進入haproxy-2.4.3目錄后執行 make TARGET=linux-glibc PREFIX=/usr/app/haproxy-2.4.3 make install PREFIX=/usr/app/haproxy-2.4.3
4.3配置環境變數
#編輯檔案 vi /etc/profile
#把這兩項加上 export HAPROXY_HOME=/usr/app/haproxy-2.4.3 export PATH=$PATH:$HAPROXY_HOME/sbin

使得配置的環境變數立即生效:
source /etc/profile
4.4負載均衡配置
新建組態檔haproxy.cfg,這里我新建的位置為:/etc/haproxy/haproxy.cfg,檔案內容如下:
global # 日志輸出配置、所有日志都記錄在本機,通過 local0 進行輸出 log 127.0.0.1 local0 info # 最大連接數 maxconn 4096 # 改變當前的作業目錄 chroot /usr/app/haproxy-2.4.3 # 以指定的 UID 運行 haproxy 行程 uid 99 # 以指定的 GID 運行 haproxy 行程 gid 99 # 以守護進行的方式運行 daemon # 當前行程的 pid 檔案存放位置 pidfile /usr/app/haproxy-2.4.3/haproxy.pid ? # 默認配置 defaults # 應用全域的日志配置 log global # 使用4層代理模式,7層代理模式則為"http" mode tcp # 日志類別 option tcplog # 不記錄健康檢查的日志資訊 option dontlognull # 3次失敗則認為服務不可用 retries 3 # 每個行程可用的最大連接數 maxconn 2000 # 連接超時 timeout connect 5s # 客戶端超時 timeout client 120s # 服務端超時 timeout server 120s ? # 系結配置 listen rabbitmq_cluster bind :5671 # 配置TCP模式 mode tcp # 采用加權輪詢的機制進行負載均衡 balance roundrobin # RabbitMQ 集群節點配置 server rabbit1 rabbit1:5672 check inter 5000 rise 2 fall 3 weight 1 server rabbit2 rabbit2:5672 check inter 5000 rise 2 fall 3 weight 1 server rabbit3 rabbit3:5672 check inter 5000 rise 2 fall 3 weight 1 ? # 配置監控頁面 listen monitor bind :8100 mode http option httplog stats enable stats uri /stats stats refresh 5s
上傳上去的,記得打開看一下換行符有沒有出行這些符號,如果有要刪掉,不然會報錯,

負載均衡的主要配置在 listen rabbitmq_cluster 下,這里指定負載均衡的方式為加權輪詢,同時定義好健康檢查機制:
server rabbit1 rabbit1:5672 check inter 5000 rise 2 fall 3 weight 1
以上配置代表對地址為 rabbit1:5672 的 rabbit1 節點每隔 5 秒進行一次健康檢查,如果連續兩次的檢查結果都是正常,則認為該節點可用,此時可以將客戶端的請求輪詢到該節點上;如果連續 3 次的檢查結果都不正常,則認為該節點不可用,weight 用于指定節點在輪詢程序中的權重,
4.5啟動服務
haproxy -f /etc/haproxy/haproxy.cfg
啟動后可以在監控頁面進行查看,埠為設定的 8100,完整地址為:http://172.16.2.84:8100/stats ,頁面情況如下:

所有節點都為綠色,代表節點健康,此時證明 HAProxy 搭建成功,并已經對 RabbitMQ 集群進行監控,
這里已經實作了RabbitMQ的負載均衡了,代碼怎么通過Haproxy連接Rabbit集群呢,因為上面配置Haproxy暴露的埠是5671,所以Ip是Haproxy的ip:5671,
連接代碼,可以通過haproxy1 或haproxy2都可以:
public static IConnection GetConnection() { ConnectionFactory factory = new ConnectionFactory { HostName = "172.16.2.84",//haproxy ip Port = 5671,//haproxy 埠 UserName = "admin",//賬號 Password = "123456",//密碼 VirtualHost = "/" //虛擬主機 }; return factory.CreateConnection(); }
五、KeepAlived 環境搭建
接著就可以搭建 Keepalived 來解決 HAProxy 故障轉移的問題,這里我在 rabbit1 和 rabbit2 上安裝 KeepAlived ,兩臺主機上的搭建的步驟完全相同,只是部分配置略有不同,具體如下:
官網:https://www.keepalived.org
5.1安裝
yum install -y keepalived
5.2修改組態檔
安裝了keepalived后,組態檔生成在/etc/keepalived/keepalived.conf
這里先對keepalived1上keepalived.conf組態檔進行修改,完整內容如下:
global_defs { # 路由id,主備節點不能相同 router_id node1 } ? # 自定義監控腳本 vrrp_script chk_haproxy { # 腳本位置 script "/etc/keepalived/haproxy_check.sh" # 腳本執行的時間間隔 interval 5 weight 10 } ? vrrp_instance VI_1 { # Keepalived的角色,MASTER 表示主節點,BACKUP 表示備份節點 state MASTER # 指定監測的網卡,可以使用 ip addr 進行查看 interface ens33 # 虛擬路由的id,主備節點需要設定為相同 virtual_router_id 1 # 優先級,主節點的優先級需要設定比備份節點高 priority 100 # 設定主備之間的檢查時間,單位為秒 advert_int 1 # 定義驗證型別和密碼 authentication { auth_type PASS auth_pass 123456 } ? # 呼叫上面自定義的監控腳本 track_script { chk_haproxy } ? virtual_ipaddress { # 虛擬IP地址,可以設定多個 172.16.2.200 } }
以上配置定義了 keepalived1上的 Keepalived 節點為 MASTER 節點,并設定對外提供服務的虛擬 IP 為 172.16.2.200,此外最主要的是定義了通過 haproxy_check.sh 來對 HAProxy 進行監控,這個腳本需要我們自行創建,內容如下:
#!/bin/bash # 判斷haproxy是否已經啟動 if [ `ps -C haproxy --no-header | wc -l` -eq 0 ] ; then #如果沒有啟動,則啟動 haproxy -f /etc/haproxy/haproxy.cfg fi ? #睡眠3秒以便haproxy完全啟動 sleep 3 ? #如果haproxy還是沒有啟動,此時需要將本機的keepalived服務停掉,以便讓VIP自動漂移到另外一臺haproxy if [ `ps -C haproxy --no-header | wc -l` -eq 0 ]; then systemctl stop keepalived fi
創建后為其賦予執行權限:
chmod +x /etc/keepalived/haproxy_check.sh
這個腳本主要用于判斷 HAProxy 服務是否正常,如果不正常且無法啟動,此時就需要將本機 Keepalived 關閉,從而讓虛擬 IP 漂移到備份節點,
備份節點(keepalived2)的配置與主節點基本相同,但是需要修改其 state 為 BACKUP;同時其優先級 priority 需要比主節點低,完整配置如下:
global_defs { # 路由id,主備節點不能相同 router_id node2 ? } ? vrrp_script chk_haproxy { script "/etc/keepalived/haproxy_check.sh" interval 5 weight 10 } ? vrrp_instance VI_1 { # BACKUP 表示備份節點 state BACKUP interface ens33 virtual_router_id 1 # 優先級,備份節點要比主節點低 priority 50 advert_int 1 authentication { auth_type PASS auth_pass 123456 } track_script { chk_haproxy } ? virtual_ipaddress { 172.16.2.200 } }
haproxy_check.sh檔案和keepalived1相同
5.3啟動服務
分別在KeepAlived1和KeepAlived2上啟動KeepAlived服務,命令如下:
systemctl start keepalived
啟動后此時 keepAlived1 為主節點,可以在 keepAlived1 上使用 ip a 命令查看到虛擬 IP 的情況:

此時只有 keepAlived1 上是存在虛擬 IP 的,而 keepAlived2 上是沒有的,

5.4驗證故障轉移
這里我們驗證一下故障轉移,因為按照我們上面的檢測腳本,如果 HAProxy 已經停止且無法重啟時 KeepAlived 服務就會停止,這里我們直接使用以下命令停止 Keepalived1 服務:
systemctl stop keepalived
此時再次使用 ip a 分別查看,可以發現 keepalived1上的 VIP 已經漂移到 keepalived2上,情況如下:

此時對外服務的 VIP 依然可用,代表已經成功地進行了故障轉移,至此集群已經搭建成功,任何需要發送或者接受訊息的客戶端服務只需要連接到該 VIP 即可,示例如下:
public static IConnection GetConnection() { ConnectionFactory factory = new ConnectionFactory() { HostName = "172.16.2.200",//vip Port = 5671,//haproxy 埠 UserName = "admin",//賬號 Password = "123456",//密碼 VirtualHost = "/" //虛擬主機 }; return factory.CreateConnection(); }
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/298203.html
標籤:.NET Core
