1 單機版訊息中心
一個訊息中心,最基本的需要支持多生產者、多消費者,例如下:
class Scratch {
public static void main(String[] args) {
// 實際中會有 nameserver 服務來找到 broker 具體位置以及 broker 主從資訊
Broker broker = new Broker();
Producer producer1 = new Producer();
producer1.connectBroker(broker);
Producer producer2 = new Producer();
producer2.connectBroker(broker);
Consumer consumer1 = new Consumer();
consumer1.connectBroker(broker);
Consumer consumer2 = new Consumer();
consumer2.connectBroker(broker);
for (int i = 0; i < 2; i++) {
producer1.asyncSendMsg("producer1 send msg" + i);
producer2.asyncSendMsg("producer2 send msg" + i);
}
System.out.println("broker has msg:" + broker.getAllMagByDisk());
for (int i = 0; i < 1; i++) {
System.out.println("consumer1 consume msg:" + consumer1.syncPullMsg());
}
for (int i = 0; i < 3; i++) {
System.out.println("consumer2 consume msg:" + consumer2.syncPullMsg());
}
}
}
class Producer {
private Broker broker;
public void connectBroker(Broker broker) {
this.broker = broker;
}
public void asyncSendMsg(String msg) {
if (broker == null) {
throw new RuntimeException("please connect broker first");
}
new Thread(() -> {
broker.sendMsg(msg);
}).start();
}
}
class Consumer {
private Broker broker;
public void connectBroker(Broker broker) {
this.broker = broker;
}
public String syncPullMsg() {
return broker.getMsg();
}
}
class Broker {
// 對應 RocketMQ 中 MessageQueue,默認情況下 1 個 Topic 包含 4 個 MessageQueue
private LinkedBlockingQueue<String> messageQueue = new LinkedBlockingQueue(Integer.MAX_VALUE);
// 實際發送訊息到 broker 服務器使用 Netty 發送
public void sendMsg(String msg) {
try {
messageQueue.put(msg);
// 實際會同步或異步落盤,異步落盤使用的定時任務定時掃描落盤
} catch (InterruptedException e) {
}
}
public String getMsg() {
try {
return messageQueue.take();
} catch (InterruptedException e) {
}
return null;
}
public String getAllMagByDisk() {
StringBuilder sb = new StringBuilder("\n");
messageQueue.iterator().forEachRemaining((msg) -> {
sb.append(msg + "\n");
});
return sb.toString();
}
}
問題:
- 沒有實作真正執行訊息存盤落盤
- 沒有實作 NameServer 去作為注冊中心,定位服務
- 使用 LinkedBlockingQueue 作為訊息佇列,注意,引數是無限大,在真正 RocketMQ 也是如此是無限大,理論上不會出現對進來的資料進行拋棄,但是會有記憶體泄漏問題(阿里巴巴開發手冊也因為這個問題,建議我們使用自制執行緒池)
- 沒有使用多個佇列(即多個 LinkedBlockingQueue),RocketMQ 的順序訊息是通過生產者和消費者同時使用同一個 MessageQueue 來實作,但是如果我們只有一個 MessageQueue,那我們天然就支持順序訊息
- 沒有使用 MappedByteBuffer 來實作檔案映射從而使訊息資料落盤非常的快(實際 RocketMQ 使用的是 FileChannel+DirectBuffer)
2 分布式訊息中心
2.1 問題與解決
2.1.1 訊息丟失的問題
- 當你系統需要保證百分百訊息不丟失,你可以使用生產者每發送一個訊息,Broker 同步回傳一個訊息發送成功的反饋訊息
- 即每發送一個訊息,同步落盤后才回傳生產者訊息發送成功,這樣只要生產者得到了訊息發送生成的回傳,事后除了硬碟損壞,都可以保證不會訊息丟失
- 但是這同時引入了一個問題,同步落盤怎么才能快?
2.1.2 同步落盤怎么才能快
- 使用 FileChannel + DirectBuffer 池,使用堆外記憶體,加快記憶體拷貝
- 使用資料和索引分離,當訊息需要寫入時,使用 commitlog 檔案順序寫,當需要定位某個訊息時,查詢index 檔案來定位,從而減少檔案IO隨機讀寫的性能損耗
2.1.3 訊息堆積的問題
- 后臺定時任務每隔72小時,洗掉舊的沒有使用過的訊息資訊
- 根據不同的業務實作不同的丟棄任務,具體參考執行緒池的 AbortPolicy,例如FIFO/LRU等(RocketMQ沒有此策略)
- 訊息定時轉移,或者對某些重要的 TAG 型(支付型)訊息真正落庫
2.1.4 定時訊息的實作
- 實際 RocketMQ 沒有實作任意精度的定時訊息,它只支持某些特定的時間精度的定時訊息
- 實作定時訊息的原理是:創建特定時間精度的 MessageQueue,例如生產者需要定時1s之后被消費者消費,你只需要將此訊息發送到特定的 Topic,例如:MessageQueue-1 表示這個 MessageQueue 里面的訊息都會延遲一秒被消費,然后 Broker 會在 1s 后發送到消費者消費此訊息,使用 newSingleThreadScheduledExecutor 實作
2.1.5 順序訊息的實作
-
與定時訊息同原理,生產者生產訊息時指定特定的 MessageQueue ,消費者消費訊息時,消費特定的 MessageQueue,其實單機版的訊息中心在一個 MessageQueue 就天然支持了順序訊息
-
注意:同一個 MessageQueue 保證里面的訊息是順序消費的前提是:消費者是串行的消費該 MessageQueue,因為就算 MessageQueue 是順序的,但是當并行消費時,還是會有順序問題,但是串行消費也同時引入了兩個問題:
- 引入鎖來實作串行
- 前一個消費阻塞時后面都會被阻塞
2.1.6 分布式訊息的實作
- 需要前置知識:2PC
- RocketMQ4.3 起支持,原理為2PC,即兩階段提交,prepared->commit/rollback
- 生產者發送事務訊息,假設該事務訊息 Topic 為 Topic1-Trans,Broker 得到后首先更改該訊息的 Topic 為 Topic1-Prepared,該 Topic1-Prepared 對消費者不可見,然后定時回呼生產者的本地事務A執行狀態,根據本地事務A執行狀態,來是否將該訊息修改為 Topic1-Commit 或 Topic1-Rollback,消費者就可以正常找到該事務訊息或者不執行等
注意,就算是事務訊息最后回滾了也不會物理洗掉,只會邏輯洗掉該訊息
2.1.7 訊息的 push 實作
- 注意,RocketMQ 已經說了自己會有低延遲問題,其中就包括這個訊息的 push 延遲問題
- 因為這并不是真正的將訊息主動的推送到消費者,而是 Broker 定時任務每5s將訊息推送到消費者
2.1.8 訊息重復發送的避免
- RocketMQ 會出現訊息重復發送的問題,因為在網路延遲的情況下,這種問題不可避免的發生,如果非要實作訊息不可重復發送,那基本太難,因為網路環境無法預知,還會使程式復雜度加大,因此默認允許訊息重復發送
- RocketMQ 讓使用者在消費者端去解決該問題,即需要消費者端在消費訊息時支持冪等性的去消費訊息
- 最簡單的解決方案是每條消費記錄有個消費狀態欄位,根據這個消費狀態欄位來是否消費或者使用一個集中式的表,來存盤所有訊息的消費狀態,從而避免重復消費
- 具體實作可以查詢關于訊息冪等消費的解決方案
2.1.9 廣播消費與集群消費
- 訊息消費區別:廣播消費,訂閱該 Topic 的訊息者們都會消費每個訊息,集群消費,訂閱該 Topic 的訊息者們只會有一個去消費某個訊息
- 訊息落盤區別:具體表現在訊息消費進度的保存上,廣播消費,由于每個消費者都獨立的去消費每個訊息,因此每個消費者各自保存自己的訊息消費進度,而集群消費下,訂閱了某個 Topic,而旗下又有多個 MessageQueue,每個消費者都可能會去消費不同的 MessageQueue,因此總體的消費進度保存在 Broker 上集中的管理
2.1.10 RocketMQ 不使用 ZooKeeper 作為注冊中心的原因,以及自制的 NameServer 優缺點?
- ZooKeeper 作為支持順序一致性的中間件,在某些情況下,它為了滿足一致性,會丟失一定時間內的可用性,RocketMQ 需要注冊中心只是為了發現組件地址,在某些情況下,RocketMQ 的注冊中心可以出現資料不一致性,這同時也是 NameServer 的缺點,因為 NameServer 集群間互不通信,它們之間的注冊資訊可能會不一致
- 另外,當有新的服務器加入時,NameServer 并不會立馬通知到 Produer,而是由 Produer 定時去請求 NameServer 獲取最新的 Broker/Consumer 資訊(這種情況是通過 Producer 發送訊息時,負載均衡解決)
2.1.11 其它

加分項咯
- 包括組件通信間使用 Netty 的自定義協議
- 訊息重試負載均衡策略(具體參考 Dubbo 負載均衡策略)
- 訊息過濾器(Producer 發送訊息到 Broker,Broker 存盤訊息資訊,Consumer 消費時請求 Broker 端從磁盤檔案查詢訊息檔案時,在 Broker 端就使用過濾服務器進行過濾)
- Broker 同步雙寫和異步雙寫中 Master 和 Slave 的互動
- Broker 在 4.5.0 版本更新中引入了基于 Raft 協議的多副本選舉,之前這是商業版才有的特性 ISSUE-1046
3 參考
- 《RocketMQ技術內幕》:https://blog.csdn.net/prestigeding/article/details/85233529
- 關于 RocketMQ 對 MappedByteBuffer 的一點優化:https://lishoubo.github.io/2017/09/27/MappedByteBuffer%E7%9A%84%E4%B8%80%E7%82%B9%E4%BC%98%E5%8C%96/
- 阿里中間件團隊博客-十分鐘入門RocketMQ:http://jm.taobao.org/2017/01/12/rocketmq-quick-start-in-10-minutes/
- 分布式事務的種類以及 RocketMQ 支持的分布式訊息:https://www.infoq.cn/article/2018/08/rocketmq-4.3-release
- 滴滴出行基于RocketMQ構建企業級訊息佇列服務的實踐:https://yq.aliyun.com/articles/664608
- 基于《RocketMQ技術內幕》原始碼注釋:https://github.com/LiWenGu/awesome-rocketmq
作者:snailclimb
鏈接:RocketMQ 的幾個簡單問題與答案
來源:gitee
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/236903.html
標籤:Java
