鏡像模式
集群模式非常經典的就是Mirror鏡像模式,保證100%資料不丟失,在實際作業中也是用的最多的,并且實作集群比較的簡單, Mirror鏡像佇列,目的是為了保證 RabbitMQ 資料的高可靠性解決方案,主要就是實作資料的同步,一般來講2--3個節點實作資料同步(對于100%資料可靠性解決方案一般是3節點)感興趣的胖友可以體驗一哈新的閱讀地址:http://www.zhouhong.icu/post/142 (*^▽^*)
1 前提準備
1.1 服務節點分配
| 服務器IP | hostname | 節點說明 | 埠 | 管控臺地址 |
| 192.168.2.121 | zhouhong121 | rabbitmq master | 5672 | http://192.168.2.121:15672 |
| 192.168.2.122 | zhouhong122 | rabbitmq slave | 5672 | http://192.168.2.122:15672 |
| 192.168.2.123 | zhouhong123 | rabbitmq slave | 5672 | http://192.168.2.123:15672 |
2 集群搭建
前提條件:修改121、122、123三臺服務器的 hostname 并且可以使用hostname 兩兩之間 ping 通,
- 修改每臺服務器的 hostname
vim /etc/hostname
## 修改對應的名字,比如:
zhouhong121
- 更改每臺服務器的 hosts
vim /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6 192.168.2.121 zhouhong121 192.168.2.122 zhouhong122 192.168.2.123 zhouhong123
- 測驗,用122的hostname ping 123

2.1 集群節點安裝
RabbitMQ下載:
rpm -ivh erlang-23.0.4-1.el7.x86_64.rpm rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm?
如果下載卡頓請使用我下載好的網盤進行下載即可
因為之前在121這臺安裝過:所有在另外兩臺上面快速安裝即可,詳細的安裝請參照: https://www.cnblogs.com/Tom-shushu/p/14503021.html http://www.zhouhong.icu/post/141鏈接:https://pan.baidu.com/s/1diapYC19UlDy4G-4lgZWHA 提取碼:jf5r 復制這段內容后打開百度網盤手機App,操作更方便哦
1、安裝 rpm -ivh erlang-23.0.4-1.el7.x86_64.rpm rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm? 2、啟動 systemctl start rabbitmq-server 3、安裝web管控臺 rabbitmq-plugins enable rabbitmq_management 4、添加用戶 sudo rabbitmqctl add_user admin admin sudo rabbitmqctl set_user_tags admin administrator sudo rabbitmqctl set_permissions -p / admin "." "." ".*" 5、重啟 systemctl start rabbitmq-server rabbitmq-plugins enable rabbitmq_management
瀏覽器以admin登錄檢查安裝是否成功:
http://192.168.2.121:15672/ http://192.168.2.122:15672/ http://192.168.2.123:15672/2.2 檔案同步(注意:.erlang.cookie為隱藏檔案,需要使用 -a 查看)
選擇121、122、123任意一個節點為Master(這里選擇71為Master),也就是說我們需要把121的Cookie檔案同步到122、123節點上去,進入/var/lib/rabbitmq目錄下,把/var/lib/rabbitmq/.erlang.cookie檔案的權限修改為777,原來是400;然后把.erlang.cookie檔案copy到各個節點下;最后把所有cookie檔案權限還原為400即可,
//進入目錄修改權限;遠程copy122、123節點 cd /var/lib/rabbitmq/ chmod 777 /var/lib/rabbitmq/.erlang.cookie scp /var/lib/rabbitmq/.erlang.cookie 192.168.2.122:/var/lib/rabbitmq/ scp /var/lib/rabbitmq/.erlang.cookie 192.168.2.123:/var/lib/rabbitmq/ // 每臺服務器為默認修改權限 chmod 400 /var/lib/rabbitmq/.erlang.cookie
2.3 組成集群
2.3.1 停止服務
我們首先停止3個節點的服務:(這里不能使用原來的命令:/etc/init.d/rabbitmq-server stop)rabbitmqctl stop
2.3.2 組成集群操作
接下來我們就可以使用集群命令,配置71、72、73為集群模式,3個節點(71、72、73)執行啟動命令,后續啟動集群使用此命令即可,rabbitmq-server -detached
2.3.3 slave加入集群操作(重新加入集群也是如此,以最開始的主節點為加入節點)
//注意做這個步驟的時候:需要配置/etc/hosts 必須相互能夠尋址到 //在122節點上執行以下操作 rabbitmqctl stop_app rabbitmqctl join_cluster --ram rabbit@zhouhong121 rabbitmqctl start_app //同樣在123節點上執行以下操作 rabbitmqctl stop_app rabbitmqctl join_cluster rabbit@zhouhong121 rabbitmqctl start_app //在另外其他節點上操作要移除的集群節點 //rabbitmqctl forget_cluster_node rabbit@zhouhong122、122、123
2.3.4 修改集群名稱
修改集群名稱(默認為第一個node名稱):rabbitmqctl set_cluster_name rabbitmq_cluster1
2.3.5 查看集群狀態
最后在集群的任意一個節點執行命令:查看集群狀態rabbitmqctl cluster_status

2.3.6 管控臺界面(注意:這里可能之前配置的admin 角色會失效,可能需要重新配置一遍)
訪問任意一個管控臺節點:http://192.168.2.121:15672 如圖所示
如圖:121為dics 122、123為 RAM
2.4 配置鏡像佇列
設定鏡像佇列策略(在任意一個節點上執行)
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
將所有佇列設定為鏡像佇列,即佇列會被復制到各個節點,各個節點狀態一致,RabbitMQ高可用集群就已經搭建好了,我們可以重啟服務,查看其佇列是否在從節點同步,
我們在任何一個節點上建一個佇列,那這個佇列將會加到其他兩個節點上面2.5 訊息一致性問題
在使用rabbitmq中,訊息的一致性是非常重要的一個話題,下面我們來研究一下,在資料一致性方面,有哪些需要關注的,發送者發送訊息出來,在資料一致性的要求下,我們通常認為必須達到以下條件- broker持久化訊息
- publisher知道訊息已經成功持久化


3 整合SpringBoot
本篇文章所有代碼請自行到我的GitHub上拉取:https://github.com/Tom-shushu/Distributed-system-learning-notes
生產端:
-
引入依賴
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.zhouhong</groupId> <artifactId>rabbit-producer</artifactId> <version>0.0.1-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.5.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- springboot rabbitmq(amqp) --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> </project>
-
組態檔 application.properties
server.servlet.context-path=/ server.port=8011 ## 鏡像佇列地址 spring.rabbitmq.addresses=192.168.2.121,192.168.2.122,192.168.2.123 spring.rabbitmq.username=admin spring.rabbitmq.password=admin ## 默認虛擬主機 spring.rabbitmq.virtual-host=/ ## 連接超時 spring.rabbitmq.connection-timeout=15000 ## 是否使用啟用訊息確認模式(可靠性投遞) spring.rabbitmq.publisher-confirms=true ## 設定reture訊息模式,注意要和mandatory一起配合使用 ## spring.rabbitmq.publisher-returns=true ## spring.rabbitmq.template.mandatory=true spring.application.name=rabbit-producer spring.http.encoding.charset=UTF-8 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss spring.jackson.time-zone=GMT+8 spring.jackson.default-property-inclusion=NON_NULL
-
訊息發送
package com.zhouhong.rabbit.producer.component; import java.util.Map; import java.util.UUID; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; @Component public class RabbbitSender { @Autowired private RabbitTemplate rabbitTemplate; /** * 這里是確認訊息的回呼監聽介面,用于確認訊息是否被 broker 所收到 */ final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { /** * @param CorrelationData 作為一個唯一的標識 * @param ack broker是否落盤成功 * @param cause 失敗的一些例外資訊 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { // TODO Auto-generated method stub } }; /** * 對外發送訊息的方法 * @param massage 具體的訊息內容 * @param properties 額外的屬性 * @throws Exception */ public void send(Object message, Map<String, Object> properties) throws Exception{ MessageHeaders mhs = new MessageHeaders(properties); Message<?> msg = MessageBuilder.createMessage(message, mhs); /** * 使用的是confirms模式,所以在發訊息之前需要監控 */ rabbitTemplate.setConfirmCallback(confirmCallback); //指定業務唯一的ID CorrelationData correlationData = https://www.cnblogs.com/Tom-shushu/p/new CorrelationData(UUID.randomUUID().toString()); MessagePostProcessor mpp = new MessagePostProcessor() { @Override public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException { System.out.println("post todo: "+ message); return message; } }; rabbitTemplate.convertAndSend("exchange-1", "rabbitmq.*", msg, correlationData); } }
消費端:
-
組態檔 application.properties
server.servlet.context-path=/ server.port=8012 ## 鏡像佇列地址 spring.rabbitmq.addresses=192.168.2.121,192.168.2.122,192.168.2.123 spring.rabbitmq.username=admin spring.rabbitmq.password=admin ## 默認虛擬主機 spring.rabbitmq.virtual-host=/ ## 連接超時 spring.rabbitmq.connection-timeout=15000 ## 表示消費者訊息消費成功以后,需要手工的進行簽收(ACK) 默認為 auto spring.rabbitmq.listener.simple.acknowledge-mode=manual ## 執行緒數 spring.rabbitmq.listener.simple.concurrency=5 spring.rabbitmq.listener.simple.max-concurrency=10 ## 一條一條消費 spring.rabbitmq.listener.simple.prefetch=1 ## 最好不要在代碼里寫死配置資訊,盡量使用這種方式也就是組態檔的方式 ## 在代碼里使用 ${} 方式進行設定配置: ${spring.rabbitmq.listener.order.exchange.name} ## 交換機名稱 ## spring.rabbitmq.listener.order.exchange.name=order-exchange ## 是否持久化 ## spring.rabbitmq.listener.order.exchange.durable=true ## type 型別 ## spring.rabbitmq.listener.order.exchange.type=topic ## 規則 ## spring.rabbitmq.listener.order.exchange.key=rabbitmq.* spring.application.name=rabbit-producer spring.http.encoding.charset=UTF-8 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss spring.jackson.time-zone=GMT+8 spring.jackson.default-property-inclusion=NON_NULL
-
接收訊息
package com.zhouhong.rabbit.consumer.component; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; @Component public class RabbbitReceive { /** * 組合使用監聽 * @param message * @param channel * @throws Exception */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = https://www.cnblogs.com/Tom-shushu/p/"queue-1", durable = "true"), exchange = @Exchange(name = "exchange-1", durable = "true", type = "topic", ignoreDeclarationExceptions = "true"), key = "rabbitmq.*" ) ) @RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { //1、收到訊息以后進行業務端消費處理 System.err.println("======================"); System.err.println("訊息消費:" + message.getPayload()); //2、處理成功之后獲取deliveryTay 并且進行手工的ACK操作,因為我們組態檔里面配置的是手工簽收 Long deliveryTay = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); channel.basicAck(deliveryTay, false); } }
測驗:
在發送端建測驗類:
package com.zhouhong.rabbit.producer.test; import java.util.HashMap; import java.util.Map; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import com.zhouhong.rabbit.producer.component.RabbbitSender; @RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTest { @Autowired private RabbbitSender rabbbitSender; @Test public void testSender() throws Exception{ Map<String , Object> properties = new HashMap<String, Object>(); properties.put("key1", "你好呀,RabbitMQ!!"); properties.put("key2", "你好呀,Kafka!!"); rabbbitSender.send("rabbitmq-test", properties); Thread.sleep(10000); } }
1、啟動消費者,觀察管控臺

建立了一個我們代碼里面指定的交換機 exchange-1,并且系結了我們指定的佇列queue-1,路由規則為 rabbitmq.*
2、關閉消費者,只運行我們發送端的測驗方法,觀察管控臺

我們發現會有一條未消費的訊息,
3、接著,我們再啟動消費端,觀察管控臺

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/270929.html
標籤:架構設計
上一篇:你以為在做的是微服務?不!你只是做了個比單體還糟糕的分布式單體!
下一篇:關于行程內快取與分布式快取
