Topic是怎么被洗掉的?
Kafka有很多狀態機和管理器,如Controller通道管理器ControllerChannelManager、處理Controller事件的ControllerEventManager等,這些管理器和狀態機,大多與各自“宿主”聯系密切,就如Controller這倆管理器,必須與Controller組件緊耦合,才能實作各自功能,
Kafka還有一些狀態機和管理器,具有相對獨立的功能框架,不嚴重依賴使用方,如:
-
TopicDeletionManager(主題洗掉管理器)
負責對指定Kafka主題執行洗掉操作,清除待洗掉主題在集群上的各類“痕跡”,
-
ReplicaStateMachine(副本狀態機)
負責定義Kafka副本狀態、合法的狀態轉換,以及管理狀態之間的轉換,
-
PartitionStateMachine(磁區狀態機)
負責定義Kafka磁區狀態、合法的狀態轉換,以及管理狀態之間的轉換,
本文看看Kafka是如何洗掉一個主題的,
前言
以為成功執行kafka-topics.sh --delete命令后,主題就會被洗掉,這種不正確的認知會導致經常發現主題沒被刪干凈,于是,網傳終極“武林秘籍”:手動洗掉磁盤上的日志檔案,手動洗掉ZooKeeper下關于主題的各節點,但我不推薦這么干:
-
并不完整
除非你重啟Broker,否則,這套“秘籍”無法清理Controller端和各個Broker上元資料快取中的待洗掉主題的相關條目
-
并沒有被官方所認證,后果自負
與其琢磨洗掉主題失敗之后怎么自救,還是研究Kafka到底如何執行該操作,TopicDeletionManager.scala包括:
-
DeletionClient介面:負責實作洗掉主題以及后續的動作
如更新元資料
-
ControllerDeletionClient類:實作DeletionClient介面的類,分別實作了剛剛說到的那4個方法,
-
TopicDeletionManager類:主題洗掉管理器類
定義方法維護主題洗掉前后集群狀態的正確性,如,何時洗掉主題、何時主題不能被洗掉、主題洗掉程序中要規避哪些操作等
DeletionClient介面及實作
洗掉主題,并將洗掉主題的事件同步給其他Broker,
DeletionClient介面目前只有一個實作類ControllerDeletionClient,構造器的兩個欄位:
-
KafkaController實體
Controller組件物件
-
KafkaZkClient實體
Kafka與ZooKeeper互動的客戶端物件
API
deleteTopic
洗掉主題在zk上的所有“痕跡”,分別呼叫KafkaZkClient的3個方法洗掉ZooKeeper下/brokers/topics/節點、/config/topics/節點和/admin/delete_topics/節點,
deleteTopicDeletions
洗掉zk下待洗掉主題的標記節點,呼叫KafkaZkClient#deleteTopicDeletions,批量洗掉一組主題在/admin/delete_topics下的子節點,注意,deleteTopicDeletions這個方法名結尾的Deletions,表示/admin/delete_topics下的子節點,所以:
- deleteTopic是洗掉主題
- deleteTopicDeletions是洗掉/admin/delete_topics下的對應子節點
這兩個方法里都有epochZkVersion欄位,代表期望的Controller Epoch版本號,若使用一個舊Epoch版本號執行這些方法,zk會拒絕,因為和它自己保存的版本號不匹配,若一個Controller的Epoch<ZooKeeper中保存的,則該Controller很可能是已過期的Controller,這就是Zombie Controller,epochZkVersion欄位的作用,就是隔離Zombie Controller發送的操作,
mutePartitionModifications
屏蔽主題磁區資料變更監聽器:取消/brokers/topics/節點資料變更的監聽,
當該主題的磁區資料發生變更后,由于對應zk監聽器已被取消,因此不會觸發Controller相應處理邏輯,
為何取消該監聽器?為避免操作相互干擾:假設用戶A發起主題洗掉,同時用戶B為這個主題新增磁區,此時,這兩個操作就會沖突,若允許Controller同時處理這倆操作,勢必會造成邏輯混亂及狀態不一致,為應對這種情況,在移除主題副本和磁區物件前,代碼要先執行這個方法,確保不再回應用戶對該主題的其它操作,
mutePartitionModifications呼叫unregisterPartitionModificationsHandlers,并接著呼叫KafkaZkClient#unregisterZNodeChangeHandler,取消zk上對給定主題的磁區節點資料變更的監聽,
sendMetadataUpdate
呼叫KafkaController#sendUpdateMetadataRequest,給集群所有Broker發送更新請求,告訴它們不要再為已洗掉主題的磁區提供服務:

該方法會給集群中的所有Broker發送更新元資料請求,告知它們要同步給定磁區的狀態,
TopicDeletionManager定義及初始化

創建TopicDeletionManager類實體
在KafkaController類初始化時被創建:

實體化了一個全新的ControllerDeletionClient物件,然后利用該物件實體和replicaStateMachine、partitionStateMachine,一起創建TopicDeletionManager實體,
KafkaServerStartable.startup()=》KafkaServer.startup()=》KafkaController.init=》TopicDeletionManager
TopicDeletionManager重要API
除了類定義和初始化,還有resumeDeletions:重啟主題洗掉操作程序,
主題因為某些事件可能一時無法完成洗掉,如主題磁區正在進行副本重分配等,一旦這些事件完成,主題重新具備可洗掉資格,就需呼叫resumeDeletions重啟洗掉操作,

- 從元資料快取中獲取要洗掉主題串列,之后定義了兩個空的主題串列,分別保存待重試洗掉主題和待洗掉主題
- 遍歷每個要洗掉的主題,去看它所有副本的狀態,如果副本狀態都是ReplicaDeletionSuccessful,就表明該主題已經被成功洗掉,此時,再呼叫completeDeleteTopic方法,完成后續的操作就可以了,對于那些洗掉操作尚未開始,并且暫時無法執行洗掉的主題,原始碼會把這類主題加到待重試主題串列中,用于后續重試;如果主題是能夠被洗掉的,就將其加入到待洗掉串列中,
- 最后,呼叫retryDeletionForIneligibleReplicas重試待重試主題串列中的主題洗掉操作,對待洗掉主題串列中的主題則呼叫onTopicDeletion洗掉,
retryDeletionForIneligibleReplicas重試主題洗掉:將對應主題副本的狀態,從ReplicaDeletionIneligible變更到OfflineReplica,這樣,后續再次呼叫resumeDeletions,就會嘗試重新洗掉主題,
下面,我再用一張圖來解釋下resumeDeletions方法的執行流程:

resumeDeletions串聯起了TopicDeletionManger中的很多方法,較關鍵的:
completeDeleteTopic:

onTopicDeletion:

onTopicDeletion會多次使用磁區狀態機,調整待洗掉主題的磁區狀態,最后呼叫onPartitionDeletion執行真正的底層物理磁盤檔案洗掉,這是通過副本狀態機狀態轉換操作完成的,
總結
在主題洗掉程序中,Kafka會調整集群中三個地方的資料:
-
ZooKeeper
洗掉主題時,zk上與該主題相關的所有ZNode節點必須被清除
-
元資料快取
Controller端元資料快取中的相關項,也必須要被處理,并且要被同步到集群的其他Broker上
-
磁盤日志檔案
要清理的首要目標
這三個地方須統一處理,就好似原子操作,回想“秘籍”,它無法清除Controller端的元資料快取項,因此,避免使用這“大招”,
DeletionClient介面主要是操作ZooKeeper,實作ZooKeeper節點的洗掉等操作,
TopicDeletionManager,是在KafkaController創建程序中被初始化的,主要通過與元資料快取進行互動的方式,來更新各類資料,

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/400492.html
標籤:其他
上一篇:實時數倉分層之DWM存在的意義
下一篇:RabbitMQ速通入門
