文章目錄
- 1、Federation Exchange(聯邦交換機)
- 1.1 為什么使用聯邦交換機
- 1.2 搭建步驟
- 1.2.1 需要保證每臺節點單獨運行
- 1.2.2 在每臺機器上開啟federation相關插件
- 1.2.3 原理圖(先運行consumer在node2創建fed_exchange)
- 1.2.4 在downstream(node2)配置upstream(node1)
- 1.2.5 添加policy
- 2、Federation Queue(聯邦佇列)
- 2.1 為什么使用聯邦佇列
- 2.2 搭建步驟
- 2.2.1 原理圖
- 2.2.2 添加upstream
- 2.2.3 添加policy
- 3、Shovel
- 3.1 為什么使用Shovel
- 3.2 搭建步驟
- 3.2.1 開啟的插件(需要的機器都開啟)
- 3.2.2 原理圖(在源頭發送的訊息直接會進入到目的地佇列)
- 3.2.3 添加shovel源和目的地
1、Federation Exchange(聯邦交換機)
1.1 為什么使用聯邦交換機
??(broker 北京),(broker 深圳)彼此之間相距甚遠,網路延遲是一個不得不面對的問題,有一個在北京 的業務(Client 北京) 需要連接(broker 北京),向其中的交換器 exchangeA 發送訊息,此時的網路延遲很小, (Client 北京)可以迅速將訊息發送至 exchangeA 中,就算在開啟了 publisherconfirm 機制或者事務機制的 情況下,也可以迅速收到確認資訊,此時又有個在深圳的業務(Client 深圳)需要向 exchangeA 發送訊息, 那么(Client 深圳) (broker 北京)之間有很大的網路延遲,(Client 深圳) 將發送訊息至 exchangeA 會經歷一 定的延遲,尤其是在開啟了 publisherconfirm 機制或者事務機制的情況下,(Client 深圳) 會等待很長的延 遲時間來接收(broker 北京)的確認資訊,進而必然造成這條發送執行緒的性能降低,甚至造成一定程度上的 阻塞,
??將業務(Client 深圳)部署到北京的機房可以解決這個問題,但是如果(Client 深圳)呼叫的另些服務都部 署在深圳,那么又會引發新的時延問題,總不見得將所有業務全部部署在一個機房,那么容災又何以實作? 這里使用 Federation 插件就可以很好地解決這個問題.

1.2 搭建步驟
1.2.1 需要保證每臺節點單獨運行
1.2.2 在每臺機器上開啟federation相關插件
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_managemen
裝完之后看下管理界面

1.2.3 原理圖(先運行consumer在node2創建fed_exchange)
消費者代碼:

public class Consumer {
//佇列的名稱
public static final String QUEUE_NAME="mirrior_hello";
//交換機的名稱
public static final String FED_EXCHANGE="fed_exchange";
//接收訊息
public static void main(String[] args) throws IOException, TimeoutException {
//創建連接工廠
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.159.34");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(FED_EXCHANGE,BuiltinExchangeType.DIRECT);
channel.queueDeclare("node2_queue",true,false,false,null);
channel.queueBind("node2_queue",FED_EXCHANGE,"routeKey");
//宣告 接收訊息
DeliverCallback deliverCallback=(consumerTag, message)->{
System.out.println(new String(message.getBody()));
};
//取消訊息時的回呼
CancelCallback cancelCallback=consumerTag -> {
System.out.println("訊息消費被中斷");
};
/**
* 消費者消費訊息
* 1.消費哪個佇列
* 2.消費成功之后是否要自動應答 true代表自動應答 false手動應答
* 3.消費者成功消費的回呼
* 4.消費者取消消費的回呼
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
1.2.4 在downstream(node2)配置upstream(node1)

1.2.5 添加policy

查看下是否搭建成功,點擊Federation Status

2、Federation Queue(聯邦佇列)
2.1 為什么使用聯邦佇列
??聯邦佇列可以在多個 Broker 節點(或者集群)之間為單個佇列提供均衡負載的功能,一個聯邦佇列可以 連接一個或者多個上游佇列(upstream queue),并從這些上游佇列中獲取訊息以滿足本地消費者消費訊息 的需求,
2.2 搭建步驟
2.2.1 原理圖

2.2.2 添加upstream
這一步和1.2.4一致

2.2.3 添加policy

3、Shovel
3.1 為什么使用Shovel
??Federation 具備的資料轉發功能類似,Shovel 夠可靠、持續地從一個 Broker 中的佇列(作為源端,即 source)拉取資料并轉發至另一個 Broker 中的交換器(作為目的端,即 destination),作為源端的佇列和作 為目的端的交換器可以同時位于同一個 Broker,也可以位于不同的 Broker 上,Shovel 可以翻譯為"鏟子", 是一種比較形象的比喻,這個"鏟子"可以將訊息從一方"鏟子"另一方,Shovel 行為就像優秀的客戶端應用 程式能夠負責連接源和目的地、負責訊息的讀寫及負責連接失敗問題的處理,
3.2 搭建步驟
3.2.1 開啟的插件(需要的機器都開啟)
rabbitmq-plugins enable rabbitmq_shovel

rabbitmq-plugins enable rabbitmq_shovel_management

3.2.2 原理圖(在源頭發送的訊息直接會進入到目的地佇列)

3.2.3 添加shovel源和目的地

檢查下是否搭建成功

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/397589.html
標籤:其他
上一篇:Elasticsearch 7.X RESTful 風格 索引、檔案、映射操作
下一篇:Flink寫入Hbase
