關注文末公眾號,有驚喜福利!
讀這原始碼有何用?
ReplicaStateMachine是內部組件,一般用戶感覺不到存在,但搞懂它,對從根本定位一些資料不一致問題大有裨益,
部署3-Broker(A、B和C)Kafka集群,版本2.0.0,在這3個Broker上創建一個單磁區、雙副本主題,
-
若兩個副本分別位于A、B,而Controller在C
當關閉A、B后,zk會顯示該主題的Leader是-1,ISR為空
-
但若兩個副本依處A、B,而Controller在B
依次關閉A、B,該主題在zk中的Leader和ISR就變成B,和上一case不符
雖非特嚴重問題,但畢竟是資料不一致,查看原始碼后,定位導致不一致原因:
- 在第一種情況下,Controller會呼叫ReplicaStateMachine,調整該主題副本的狀態,進而變更Leader和ISR
- 第二種情況下,Controller執行Failover,但并未在新Controller組件初始化時進行狀態轉換,因而出現了不一致
不閱讀這部分原始碼,就無法定位問題根因,
定義與初始化
-
ReplicaStateMachine:副本狀態機抽象類,定義了一些常用方法(如startup、shutdown等),以及handleStateChanges

-
ZkReplicaStateMachine:副本狀態機具體實作類,重寫了handleStateChanges方法,實作了副本狀態之間的狀態轉換,


-
ReplicaState:副本狀態集合,Kafka目前共定義了7種副本狀態,
ReplicaStateMachine只需接收一個ControllerContext物件實體,ControllerContext封裝了Controller端保存的所有集群元資料資訊,
構造一個ZKReplicaStateMachine實體,除了ControllerContext實體,比較重要的屬性還有:
-
KafkaZkClient物件實體
負責與ZooKeeper進行互動
-
ControllerBrokerRequestBatch實體
用于給集群Broker發送控制類請求(LeaderAndIsrRequest、StopReplicaRequest和UpdateMetadataRequest)
ControllerBrokerRequestBatch,將給定Request發送給指定Broker,它是如何發送請求的呢(結合ControllerBrokerStateInfo)
在副本狀態轉換操作的邏輯中,關鍵是為Broker上的副本更新資訊,而這是通過Controller給Broker發送請求實作的,因此,你最好了解下這里的請求發送邏輯,
副本狀態機是在何時進行初始化的?
KafkaController物件在構建時,就會初始化一個ZkReplicaStateMachine實體:

-
若一個Broker沒被選舉為Controller,它也會構建KafkaController物件實體嗎?
Yes!所有Broker在啟動時,都會創建KafkaController實體,也隨之創建ZKReplicaStateMachine實體,但只有在Controller所在的Broker,副本狀態機才會被啟動:

當Broker被成功推舉為Controller后,onControllerFailover方法會被呼叫,進而啟動該Broker早已創建好的副本狀態機和磁區狀態機,
副本狀態及狀態管理流程
副本狀態機一旦被啟動,就要管理副本狀態的轉換,
研究管理狀態前,要先明白:
-
當前都有哪些狀態
-
含義分別是什么
原始碼中的ReplicaState定義了如下副本狀態:

ReplicaState介面及其實作物件定義了每種狀態的序號,以及合法的前置狀態,以OnlineReplica為例:


其validPreviousStates屬性是個集合型別,說明Kafka只允許副本從這4種態變更到OnlineReplica態,
其余副本狀態的代碼邏輯類似,關注validPreviousStates欄位即可知曉每個狀態合法的前置狀態,
最終完整的狀態轉換規則:

-
單向箭頭表示只允許單向狀態轉換
-
雙向箭頭則表示轉換方向可以是雙向
狀態管理流程
- 當副本物件首次被創建后,置NewReplica態
- 初始化后,當副本物件能夠對外提供服務,狀態機將其調整為OnlineReplica,并一直以該狀態持續作業
- 若副本所在Broker關倍訓不能正常作業,副本要從OnlineReplica變更為OfflineReplica,
一旦開啟如洗掉主題這樣操作,狀態機會將副本狀態跳轉到ReplicaDeletionStarted,表明副本洗掉已開啟:
- 洗掉成功,置ReplicaDeletionSuccessful
- 不滿足洗掉條件(如所在Broker處下線狀態),置ReplicaDeletionIneligible,以便重試
當副本物件被洗掉后,其狀態變更為NonExistentReplica,副本狀態機將移除該副本資料,
具體實作類:ZkReplicaStateMachine
副本狀態機的具體實作類,
狀態轉換方法
-
logFailedStateChange

-
logInvalidTransition
-
logSuccessfulTransition
-
getTopicPartitionStatesFromZk
-
doRemoveReplicasFromIsr
-
removeReplicasFromIsr
-
doHandleStateChanges
handleStateChanges方法
handleStateChange處理狀態的變更,對外提供狀態轉換操作的入口方法:
def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit

- 呼叫doHandleStateChanges執行副本狀態轉換
- 給集群中相應Broker批量發送請求
執行第1步時,會將replicas按Broker ID分組,
<主題名,磁區號,副本Broker ID>表示副本物件

假設replicas為集合:
<test, 0, 0>
<test, 0, 1>
<test, 1, 0>
<test, 1, 1>)
則呼叫doHandleStateChanges前,會將replicas按Broker ID分組成:
Map(
- 0 -> Set(<test, 0, 0>, <test, 1, 0>),
- 1 -> Set(<test, 0, 1>, <test, 1, 1>)
)
之后呼叫doHandleStateChanges
doHandleStateChanges

-
嘗試獲取給定副本物件在Controller端元資料快取中的當前狀態:若未保存某副本物件的狀態,將其初始化為NonExistentReplica態
-
根據不同ReplicaState中定義的合法前置狀態集合及傳入的目標狀態(targetState),將給定副本物件集合劃分成兩部分:
-
能合法轉換的副本物件集合
-
執行非法狀態轉換的副本物件集合
doHandleStateChanges為該集合類的每個副本物件記錄一條錯誤日志
-
-
代碼攜帶能執行合法轉換的副本物件集合,進入不同代碼分支,當前Kafka為副本定義7類狀態,因此,共有7條分支
包括:
- 副本被創建時被轉換到NewReplica態
- 副本正常作業時被轉換到OnlineReplica態
- 副本停止服務后被轉換到OfflineReplica態
分支1:轉換到NewReplica


嘗試從元資料快取中,獲取這些副本物件的磁區資訊資料,包括磁區的Leader副本在哪個Broker,ISR中都有哪些副本等,
若找不到對應磁區資料,直接把副本狀態更新為NewReplica,否則,代碼就要給該副本所在Broker發送請求,讓它知道該磁區的資訊,還要給集群所有運行中的Broker發送請求,讓它們感知到新副本加入,
分支2:轉換到OnlineReplica態
副本物件正常作業時所處狀態:


遍歷副本物件,依次執行:
- 獲取元資料中該副本所屬的磁區物件及該副本的當前狀態
- 查看當前狀態是否是NewReplica
- 是,獲取磁區的副本串列,并判斷該副本是否在于當前副本串列:不在,就記錄錯誤日志并更新元資料中的副本串列
- 若狀態不是NewReplica,說明這是已存在的副本物件,則原始碼會獲取對應磁區的詳細資料,然后向該副本物件所在的Broker發送LeaderAndIsrRequest請求,令其同步獲知,并保存該磁區資料
- 將該副本物件狀態變更為OnlineReplica,至此,該副本處于正常作業狀態,
分支3:轉換到OfflineReplica狀態


- 給所有符合狀態轉換的副本所在Broker,發送StopReplicaRequest,告訴這些Broker停掉對應副本
- 根據磁區是否保存Leader資訊,將副本集合劃分成:有Leader副本集,無Leader副本集合,有無Leader資訊并不僅僅包含Leader,還有ISR和controllerEpoch等資料
- 遍歷有Leader子集合,向這些副本所在Broker發送LeaderAndIsrRequest請求,去更新停止副本操作之后的磁區資訊,再把這些磁區狀態置OfflineReplica
- 遍歷無Leader子集合,執行與上步類似操作,只是對無Leader,因未執行任何Leader選舉操作,所以給這些副本所在Broker發送的不是LeaderAndIsrRequest請求,而是UpdateMetadataRequest請求,顯式告知它們更新對應磁區的元資料,再把副本狀態置OfflineReplica
把副本狀態變更為OfflineReplica=停止對應副本+更新遠端Broker元資料
總結
Kafka的副本狀態機實作原理及原始碼:
- 副本狀態機:ReplicaStateMachine是Kafka Broker端原始碼中控制副本狀態流轉的實作類,每個Broker啟動時都會創建ReplicaStateMachine實體,但只有Controller組件所在的Broker才會啟動它,
- 副本狀態:當前,Kafka定義了7類副本狀態,同時,它還規定了每類狀態合法的前置狀態,
- handleStateChanges:用于執行狀態轉換的核心方法,底層呼叫doHandleStateChanges方法,以7路case分支的形式窮舉每類狀態的轉換邏輯,

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/401592.html
標籤:其他
上一篇:B站瘋傳24小時刪
