RocketMQ 4.7.1原始碼決議/吐槽
- 前言
- 槽點一(檔案)
- 槽點二(springboot版本)
- 槽點三(對Spring-cloud-stream支持低)
- 槽點四(向IE看起???版本不穩定/源代碼注釋少的可憐)
- 原始碼下載
- 正文
- NameServer
- Broker
- 啟動
- 注冊
- Producer
- 啟動
- 發送訊息
- Consumer
- 訊息獲取
- 負載均衡
- END
前言
距離上次發布 RocketMQ 的博文已經有六個月的時間了,中間從有開始想寫原始碼的念頭,動手到現在斷斷續續的寫完花了也有三個多月時間,具體為啥會寫那么久,主要是因為,最終 RocketMQ 被我放棄,更改使用 RabbitMq了
為啥放棄 RocketMQ ? 我稍微吐槽一下好了 ╮(╯_╰)╭
最開始決定使用 RocketMQ 是基于以下幾點考慮:
- 這是阿里生態的一部分
- 支撐過雙十一的考驗
- 和 Kafka 與 RabbitMQ 相比,適用的場景更為全面,并且支持事務
- 近期文章有許多于其相關的影子,證明了它的活躍度不斷上升,做為技術人當然有必要站在技術的最前沿
- 國產咱必須得支持
槽點一(檔案)
為啥最終放棄了呢 ╮(╯▽╰)╭ 且不說它還有開源版和商業版之分,這邊以開源版為例,
因為,專案都更新到 4.7.1 了(現在已經是4.8.0),然而官方的檔案寫的真的是…無力吐槽了…((/- -)/,就跟沒人維護一樣,撰寫的版本還是 4.3.0的
我也是看著看著覺得不對勁,最終才發現這個問題!!!
槽點二(springboot版本)
還有一些 " 小 " 問題 …( _ _)ノ|壁
比如:
微服務都以 spring boot 為驅動,目的是為了簡化配置快速開發,RocketMQ 自然也有 對應的 spring Boot 包,但是!里面對應依賴的springBoot的版本都偏低,所以,還需要自己排除相對應依賴,
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
里面對應依賴springBoot版本是2.0.5,所以需要排除自己加上新的版本依賴,
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</exclusion>
</exclusions>
槽點三(對Spring-cloud-stream支持低)
再比如:
spring 社區對所有MQ進行了高度的抽象,推出了一個統一的訊息驅動框架,目的是為了呼叫的時候不同太關注于技術,只需要關注于業務邏輯,那么MQ是使用哪一種就對業務沒有任何影響,以下就是官方鏈接,
https://spring.io/projects/spring-cloud-stream

天啊,竟然有那么好的東西!何愁專案延期!!何愁大局不穩!!何愁江山不定!!!

剛想拿過來用,發現!!!!!

官方目前只封裝了kafka、kafka Stream、RabbitMQ的具體依賴,其它是需要廠家自己去維護的,然而,這群阿里的大神,貌似目前看來,并不熱衷于維護它,包括檔案也是,都是十分陳舊的!!!!
目前 rocketMq stream 最新版本是 2.2.5
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<version>2.2.5.RELEASE</version>
但是它依賴的rocketmq-client版本還是4.4.0


槽點四(向IE看起???版本不穩定/源代碼注釋少的可憐)
不得不說,版本迭代還是很快的,但是你也不能改動那么大啊,新版本對比舊版本源代碼變了許多,有點 看IE的感覺!!!
由于官方檔案寫的不好,版本又舊,社區又不算太活躍,所以只能自己去看源代碼,但是!!!!天啊 ,注釋呢!!!不寫注釋你不是耍流氓嗎,瞧瞧spring原始碼雖然也沒怎么看懂,但是人家注釋還是有的!!!

由于以上總總原因,開發成本略高,專案又緊,迫不得已,改用 RabbitMQ進行開發,

原始碼下載
當初剛開始撰寫RocketMQ系列的時候,最新的代碼是4.71,但是最近新版本是4.8.0,由于博主之前都以4.7.1作為研究,所以本篇博文以4.7.1進行講解,
官方網址以及下載安裝前面的博文已經進行了詳細的描述,所以這邊僅僅附上4.7.1的原始碼下載連接,
https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.7.1/rocketmq-all-4.7.1-source-release.zip
正文
以下對于原始碼的理解是基于官方檔案案例以一些參考資料,若有理解錯的地方歡迎大家指出來,
NameServer
先附上一張大致講解圖(感覺不是很好看,可是我的美術造詣也就這水平了 ( ̄_, ̄ ) )

代碼詳解

NameServer 的啟動是通過 NamesrvStartup 的 main 方法進行啟動,而 main 方法主要做了兩件事
- 創建NamesrvController
- 啟動
那么接下來就進行跟蹤分析,
首先看一下創建NamesrvController做了什么事?


其中快取資訊registerConfig具體操作如下:

Controller中的configuration是在new的時候,在構造器中生成,其中的allConfigs也是創建的時候生成,


創建 NamesrvController 就如上所示,那么接下來看start操作,

總共做了兩件事,第一件初始化引數,第二件宣告關閉事件,關閉的時候會關閉定時任務和執行緒池等,所以關閉的時候用它的 showdown 會比 kil l-9 優雅

接下來關注初始化事件,

初始化事件做的事情不多,其中移除不存活的Broker還有點意思,其它就沒什么了(那個博主沒看懂的除外)

至此,NameServer 的 啟動撰寫完畢
Broker
繼續附上一張大大大圖

代碼詳解:
啟動

Broker 的啟動也是根據 BrokerStartup 的 main 方法進行,主要做了兩件事:
- 創建 BrokerController
- 啟動
先看一下 BrokerController 創建的時候做了什么事

這邊和 NameServer 有一點最大的差別,它多了一種配置 NettyClientConfig ,之所以會多了一個配置,是因為在使用事務訊息模式的時候,他事可以當做客戶端的,


這邊倒是沒有什么好關注的,主要看看初始化時候做了什么事

看一下加載磁盤資料做了什么?



我們接著往下看

但是還是有一個定時器是可以研究一下的,就是那個可以寫sql的過濾 tag

既然這邊有個定時器,持久化filter,那么就代表 filter 是在Broker中進行的,所以它的效率才會那么高!!!
注冊
注冊是在創建完Controller后的啟動中進行的

整個start就做了這一件核心事


先看找尋Broker的代碼

當回圈完所有的Broker,逐一進行判斷后,在呼叫后面 doRegisterBrokerAll 進行注冊(中間找的代碼看不懂!!!)

到此,關于 Broker 啟動,博主能看懂的都描述完畢,
Producer
生產者的代碼主要包含兩個部分,一個是啟動,一個是發送訊息
老規矩,先上圖:

啟動
先說說啟動干了啥:

皮一下,很開心 <( ̄︶ ̄)>


這里是各種組件的初始化,底層運用到相當多的netty的相關知識,有興趣的可以自行查看,
發送訊息


核心的關鍵步驟就以上三點,
先看如何獲取到訊息佇列:

接著再看如何發送訊息:

從NameServer 找尋 Broker地址,是更新快取再從快取中獲取

在發送Netty請求時,實際上是指定的MessageQueue,而不是Topic,Topic只是用來找 MessageQueue ,
Consumer
消費者也是兩部分,一部分啟動,然后啟動時運行組件進行資料的消費,散亂東西有點多,圖不好畫,那我就簡單畫??ヾ(≧?≦*)ゝ




客戶端是否順序消費就在于此實體化,

啟動的核心代碼中,初始化了許多的組件,但是與我們使用關聯性比較高的是負載均衡和具體的訊息獲取,

訊息獲取
先看一下訊息獲取的原始碼

這里面有兩點可以了解的
第一點是選擇消費實體,這里的實體實在之前 注冊本地消費者的時候 ,寫入與快取


快取已寫入,所以啟動的時候就可以獲取得到:

第二點就是,雖然我們使用的是推的模式進行資料的獲取和消費,但是最侄訓是用拉的模式進行處理,
接下來繼續看拉取的流程


這邊也有兩點可以講解
第一是 拉取訊息的回呼方法
第二是拉取訊息的具體方式
但是由于拉取訊息后才會進入回呼,一些引數配也有影響,比如,defaultMQPushConsumer.getPullBatchSize(),就是拉取的數量,回呼會用到,所以先看第二種,


以上就是拉取訊息,那么最后再觀察下 如何處理消費 pullCallback !
PullCallback 最終是呼叫 onSuccess方法,而里面我認為最核心的是


訊息的處理有兩種,一種是并發,一種是順序:
并發

這里有一點需要注意ps(這里是半懵半猜的,有看懂的可以提醒一下博主)
defaultMQPushConsumer.getConsumeMessageBatchMaxSize()
是判斷批量的數量大小,但是默認 是一條,網路上卻有的資料是說 32,跟進去卻沒有發現哪里有進行數值的修改,只在上一步拉取的時候有設定32的拉取數量,姑且只能猜測是這個數值在其它封裝的地方有進行默認數值的更改,
順序:
這個還是有點意思的,畢竟多個 MessageQueue 獲取訊息怎么才能順序
ConsumeRequest 中

然而他是拿到一個鎖一個 ㄟ( ▔, ▔ )ㄏ,
負載均衡


至于負載均衡模式是哪里來的,在我們最開始創建實體的時候們就有一個默認模式的設定



END
怎么說呢,一把辛酸淚吧,哪個專案不是資源少時間緊,如果用一個組件還要花相當的精力去研究,除非甲方仁慈公司寬容,不然還是放棄吧,
真的要用 RocketMQ 的話,那就用商業版吧(我覺得這個開源版目的就是引導我們用商業版!!!)
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/258393.html
標籤:其他
