
[Kafka Source Code] Topic Deletion Flow Analysis + Common Questions
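For day-to-day operations and troubleshooting, don't miss Didi's open-source tooling:
Didi's open-source LogiKM, a one-stop Kafka monitoring and management platform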
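Table of Contents
- Delete topic command
- Related configuration
- Source code analysis
- 1. The client sends the delete-topic request
- 2. The Controller handles the deleteTopics request
- 3. The Controller watches ZK changes and runs the topic deletion flow
- 3.1 resumeDeletions: performing the deletion
- 3.2 TopicDeletionManager.onPartitionDeletion
- 4. Brokers receive the StopReplica request
- 4.1 The scheduled log deletion task
- 5. StopReplica succeeds: the callback runs
- 6. On Controller startup: resuming topics pending deletion
- 6.1 Fetching topics to delete and topics temporarily ineligible for deletion
- 6.2 topicDeletionManager.init: initializing the deletion manager
- 6.3 topicDeletionManager.tryTopicDeletion: trying to resume deletion
- Source code summary
- Q&A
- When is the node under /admin/delete_topics written?
- When are a topic's log files actually deleted from disk?
- Why can't a topic that is being reassigned be deleted?
- If you manually write a node under `/admin/delete_topics/`, will the topic be deleted normally?
- What happens if you directly delete the `/brokers/topics/{topicName}` node in ZK?
- When the Controller tells Brokers to execute StopReplica, does it notify all Brokers or only those related to the deleted topic?
- What if a Broker is offline or fails during deletion?
- ReplicaStateMachine (replica state machine)
- What happens if a deletion is issued during a reassignment?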
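Delete topic command
```shell
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test
```
You can also match the topics to delete with a regular expression. Wrap the pattern in double quotes.
For example, to delete every topic whose name starts with create_topic_byhand_zk:
```shell
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic "create_topic_byhand_zk.*"
```
`.` matches any single character except the newline `\n`. To match a literal `.`, use `\.`.
`*` matches the preceding subexpression zero or more times. To match a literal `*`, use `\*`.
`.*` matches any sequence of characters.
Delete all topics (use with caution):
```shell
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic ".*?"
```
For more patterns, refer to a regular expression reference.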
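The escaping rules above matter because an unescaped `.` matches any character. The sketch below shows which names a pattern would select. It assumes the pattern must match the whole topic name (full-match semantics, as with `String.matches`). `TopicRegexDemo`, `matchingTopics` and the topic names are hypothetical and are not part of Kafka.

```scala
import java.util.regex.Pattern

// Hypothetical helper (not part of Kafka): keeps the topic names that the whole
// pattern matches, assuming full-match semantics like String.matches.
object TopicRegexDemo {
  def matchingTopics(pattern: String, topics: Seq[String]): Seq[String] = {
    val compiled = Pattern.compile(pattern)
    topics.filter(topic => compiled.matcher(topic).matches())
  }

  // Made-up topic names used only for illustration.
  val sampleTopics: Seq[String] =
    Seq("create_topic_byhand_zk", "create_topic_byhand_zk_01", "orders.v2", "ordersXv2")
}
```

Under this assumption, `"orders.v2"` selects both `orders.v2` and `ordersXv2`, while `"orders\\.v2"` selects only `orders.v2`. Run such a check against the real topic list before using a pattern with `--delete`.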
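Related configuration
| Config | Description | Default |
|---|---|---|
| file.delete.delay.ms | How long (in ms) to wait before physically deleting a log directory after it has been renamed with the `-delete` suffix | 60000 |
| delete.topic.enable | Whether topics can be deleted | true |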
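Source code analysis
If reading the source code analysis feels too dry, skip straight to the source code summary and the sections after it.
1. The client sends the delete-topic request
The whole request flow was already analyzed in [Kafka Source Code] TopicCommand: Topic Creation Source Code Analysis, so the request path is not covered in detail again here. Let's go straight to the key points.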

The client sends a deleteTopics request to the Controller.
2. The Controller handles the deleteTopics request
KafkaApis.handle
AdminManager.deleteTopics
```scala
/**
 * Delete topics and wait until the topics have been completely deleted.
 * The callback function will be triggered either when timeout, error or the topics are deleted.
 */
def deleteTopics(timeout: Int,
                 topics: Set[String],
                 responseCallback: Map[String, Errors] => Unit): Unit = {
  // 1. map over topics calling the asynchronous delete
  val metadata = topics.map { topic =>
    try {
      // Write to ZK to mark the topic for deletion: /admin/delete_topics/{topicName}
      adminZkClient.deleteTopic(topic)
      DeleteTopicMetadata(topic, Errors.NONE)
    } catch {
      case _: TopicAlreadyMarkedForDeletionException =>
        // swallow the exception, and still track deletion allowing multiple calls to wait for deletion
        DeleteTopicMetadata(topic, Errors.NONE)
      case e: Throwable =>
        error(s"Error processing delete topic request for topic $topic", e)
        DeleteTopicMetadata(topic, Errors.forException(e))
    }
  }
  // 2. If the client's timeout <= 0, or no topic could be marked in ZK,
  //    respond immediately instead of waiting for the deletion
  if (timeout <= 0 || !metadata.exists(_.error == Errors.NONE)) {
    val results = metadata.map { deleteTopicMetadata =>
      // ignore topics that already have errors
      if (deleteTopicMetadata.error == Errors.NONE) {
        (deleteTopicMetadata.topic, Errors.REQUEST_TIMED_OUT)
      } else {
        (deleteTopicMetadata.topic, deleteTopicMetadata.error)
      }
    }.toMap
    responseCallback(results)
  } else {
    // 3. else pass the topics and errors to the delayed operation and set the keys
    val delayedDelete = new DelayedDeleteTopics(timeout, metadata.toSeq, this, responseCallback)
    val delayedDeleteKeys = topics.map(new TopicKey(_)).toSeq
    // try to complete the request immediately, otherwise put it into the purgatory
    topicPurgatory.tryCompleteElseWatch(delayedDelete, delayedDeleteKeys)
  }
}
```
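- A node /admin/delete_topics/{topicName} is written to ZK to mark the topic for deletion.
- If the timeout passed by the client is <= 0, or no topic could be marked in ZK, the response is returned immediately. Topics that were marked get REQUEST_TIMED_OUT, and their deletion still proceeds. The other topics get their own error.
- Otherwise, a DelayedDeleteTopics operation is placed in the topic purgatory. It completes once the topics are deleted or the timeout expires.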
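The immediate-response branch above reduces to a small pure function. The following sketch models it with hypothetical types (`DeleteTopicsDecision`, `Err`, `Failed`). These are simplified stand-ins for Kafka's `Errors` values, not Kafka's API.

```scala
// Hypothetical model (not Kafka's API) of the branch at the end of AdminManager.deleteTopics.
object DeleteTopicsDecision {
  // Simplified stand-ins for the Errors values involved.
  sealed trait Err extends Product with Serializable
  case object NoError extends Err
  case object RequestTimedOut extends Err
  final case class Failed(reason: String) extends Err

  // Some(response) = answer immediately; None = park a DelayedDeleteTopics in the purgatory.
  def immediateResponse(timeoutMs: Int, marked: Map[String, Err]): Option[Map[String, Err]] =
    if (timeoutMs <= 0 || !marked.values.exists(_ == NoError))
      Some(marked.map { case (topic, err) =>
        // Topics already marked in ZK report a timeout; their deletion still proceeds.
        val reported: Err = if (err == NoError) RequestTimedOut else err
        topic -> reported
      })
    else None
}
```

A client that sends `timeout = 0` therefore always sees REQUEST_TIMED_OUT for topics that were marked successfully, even though those topics will still be deleted.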
3. The Controller watches ZK changes and runs the topic deletion flow
KafkaController.processTopicDeletion
```scala
private def processTopicDeletion(): Unit = {
  if (!isActive) return
  var topicsToBeDeleted = zkClient.getTopicDeletions.toSet
  val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
  if (nonExistentTopics.nonEmpty) {
    warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}")
    zkClient.deleteTopicDeletions(nonExistentTopics.toSeq, controllerContext.epochZkVersion)
  }
  topicsToBeDeleted --= nonExistentTopics
  if (config.deleteTopicEnable) {
    if (topicsToBeDeleted.nonEmpty) {
      info(s"Starting topic deletion for topics ${topicsToBeDeleted.mkString(",")}")
      // mark topics that cannot be deleted for now (partition reassignment in progress)
      topicsToBeDeleted.foreach { topic =>
        val partitionReassignmentInProgress =
          controllerContext.partitionsBeingReassigned.map(_.topic).contains(topic)
        if (partitionReassignmentInProgress)
          topicDeletionManager.markTopicIneligibleForDeletion(Set(topic),
            reason = "topic reassignment in progress")
      }
      // add topic to deletion list
      topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
    }
  } else {
    // If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics
    info(s"Removing $topicsToBeDeleted since delete topic is disabled")
    zkClient.deleteTopicDeletions(topicsToBeDeleted.toSeq, controllerContext.epochZkVersion)
  }
}
```
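- Nodes under /admin/delete_topics/ that refer to topics that do not exist are cleaned up.
- If delete.topic.enable=false (topic deletion disabled), all nodes under /admin/delete_topics/ are deleted and the flow ends.
- If delete.topic.enable=true:
  - Topics that are not currently eligible for deletion are marked as such and put into topicsIneligibleForDeletion. A topic is ineligible while one of its partitions is being reassigned.
  - The topics are added to the deletion list topicsToBeDeleted.
  - TopicDeletionManager.resumeDeletions() is then called to perform the deletion.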
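The three branches above can be modeled as a pure function over sets. In this sketch, `ProcessTopicDeletionSketch`, `decide` and `Decision` are hypothetical names. It captures what processTopicDeletion asks for and ignores the state held in the real controller context.

```scala
// Hypothetical, simplified model of processTopicDeletion (not Kafka's code).
object ProcessTopicDeletionSketch {
  final case class Decision(
      zkNodesToRemove: Set[String], // entries removed from /admin/delete_topics
      ineligible: Set[String],      // topics the controller asks to mark ineligible
      queued: Set[String]           // topics handed to the deletion manager
  )

  def decide(requested: Set[String], existing: Set[String],
             reassigning: Set[String], deleteTopicEnable: Boolean): Decision = {
    val nonExistent = requested -- existing
    val toDelete = requested -- nonExistent
    if (deleteTopicEnable)
      // ineligible topics are still queued; they just wait until they become eligible
      Decision(nonExistent, toDelete intersect reassigning, toDelete)
    else
      // deletion disabled: every request node is removed and nothing is queued
      Decision(nonExistent ++ toDelete, Set.empty, Set.empty)
  }
}
```

Note that a topic under reassignment is still queued. It is only held back until the reassignment finishes, which matches the Q&A answers at the end of the article.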
3.1 resumeDeletions: performing the deletion
TopicDeletionManager.resumeDeletions()
```scala
private def resumeDeletions(): Unit = {
  val topicsQueuedForDeletion = Set.empty[String] ++ controllerContext.topicsToBeDeleted
  val topicsEligibleForRetry = mutable.Set.empty[String]
  val topicsEligibleForDeletion = mutable.Set.empty[String]
  if (topicsQueuedForDeletion.nonEmpty)
    topicsQueuedForDeletion.foreach { topic =>
      // if all replicas are marked as deleted successfully, then topic deletion is done
      if (controllerContext.areAllReplicasInState(topic, ReplicaDeletionSuccessful)) {
        // clear up all state for this topic from controller cache and zookeeper
        // (runs the cleanup that follows a successful topic deletion)
        completeDeleteTopic(topic)
        info(s"Deletion of topic $topic successfully completed")
      } else if (!controllerContext.isAnyReplicaInState(topic, ReplicaDeletionStarted)) {
        // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
        // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
        // or there is at least one failed replica (which means topic deletion should be retried).
        if (controllerContext.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
          topicsEligibleForRetry += topic
        }
      }
      // Add topic to the eligible set if it is eligible for deletion.
      if (isTopicEligibleForDeletion(topic)) {
        info(s"Deletion of topic $topic (re)started")
        topicsEligibleForDeletion += topic
      }
    }
  // topic deletion retry will be kicked off
  if (topicsEligibleForRetry.nonEmpty) {
    retryDeletionForIneligibleReplicas(topicsEligibleForRetry)
  }
  // topic deletion will be kicked off
  if (topicsEligibleForDeletion.nonEmpty) {
    // delete the topics; this sends UpdateMetadata requests to the brokers
    onTopicDeletion(topicsEligibleForDeletion)
  }
}
```
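- The key method is onTopicDeletion. It marks all partitions to be deleted and sends an UpdateMetadataRequest to the Brokers, telling them that the topic is being deleted, with the leader set to LeaderAndIsr.LeaderDuringDelete.
  - All partitions of the topics to be deleted go through the partition state machine: current state -> OfflinePartition -> NonExistentPartition. These two transitions only update state in the current Controller's memory. For the state machines, see [Kafka Source Code] State Machines in the Controller (TODO…).
  - client.sendMetadataUpdate(topics.flatMap(controllerContext.partitionsForTopic)) sends an UpdateMetadata request for the partitions of the topics being deleted. What data does this update? As the source in the screenshot above shows, the UpdateMetadata request sets each partition's leader to -2, meaning the partition is being deleted. This is how the Brokers find every partition under deletion. What do they do with those partitions?
    - Update quota (throttling) related information.
    - Call groupCoordinator.handleDeletedPartitions(deletedPartitions). This clears the group offsets of the given deletedPartitions and runs the offset-deletion logic. From now on the partitions no longer serve requests and can no longer be consumed.
    For details, see Kafka Metadata Updates: UpdateMetadata.
  - Call TopicDeletionManager.onPartitionDeletion, described below.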
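On the broker side, a partition whose leader is the sentinel -2 is treated as being deleted. The sketch below models only that detection step. `UpdateMetadataSketch` and `PartitionState` are hypothetical simplified types. The constant mirrors the value of `LeaderAndIsr.LeaderDuringDelete` in Kafka 2.5.

```scala
// Hypothetical, simplified model (not Kafka's MetadataCache) of spotting deleted partitions.
object UpdateMetadataSketch {
  // Same sentinel value as LeaderAndIsr.LeaderDuringDelete in Kafka 2.5.
  val LeaderDuringDelete: Int = -2

  // Simplified partition state carried by an UpdateMetadata request.
  final case class PartitionState(topic: String, partition: Int, leader: Int)

  // Returns the (topic, partition) pairs a broker would treat as "being deleted".
  def deletedPartitions(states: Seq[PartitionState]): Seq[(String, Int)] =
    states.collect { case s if s.leader == LeaderDuringDelete => (s.topic, s.partition) }
}
```

A leader of -1 (no leader) is not the same as -2, so an offline partition is not mistaken for a deleted one.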
3.2 TopicDeletionManager.onPartitionDeletion
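- All replicas on dead brokers are moved directly to ReplicaDeletionIneligible. If some replicas are dead, the topic is also marked ineligible for deletion, because the deletion could not complete anyway.
- The remaining replicas transition to OfflineReplica. A StopReplicaRequest is sent to every Broker hosting a replica of the topic with deletePartitions = false, so nothing is deleted yet. This makes the followers stop sending fetch requests to the leader. For the state machines, see [Kafka Source Code] State Machines in the Controller (TODO…).
- The replicas then transition to ReplicaDeletionStarted. Another StopReplicaRequest is sent to the same Brokers, this time with deletePartition = true. This deletes all persistent data of every replica of the corresponding partitions.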
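The three steps above can be read as a plan of requests. This is a sketch under simplifying assumptions: `PartitionDeletionPlan`, `Replica`, `StopReplica` and `Plan` are hypothetical types, and request batching is ignored. It shows that dead replicas receive nothing, while each live replica gets a "stop fetching" request followed by a "delete data" request.

```scala
// Hypothetical, simplified model (not Kafka's code) of onPartitionDeletion.
object PartitionDeletionPlan {
  final case class Replica(topic: String, partition: Int, brokerId: Int)
  // One logical StopReplica request the controller would send.
  final case class StopReplica(brokerId: Int, topic: String, partition: Int, deletePartition: Boolean)
  final case class Plan(ineligible: Seq[Replica], requests: Seq[StopReplica])

  def plan(replicas: Seq[Replica], liveBrokers: Set[Int]): Plan = {
    // Step 1: replicas on dead brokers go straight to ReplicaDeletionIneligible.
    val (live, dead) = replicas.partition(r => liveBrokers.contains(r.brokerId))
    // Step 2: OfflineReplica, StopReplica with deletePartition = false (stop fetching).
    val stopFetching = live.map(r => StopReplica(r.brokerId, r.topic, r.partition, deletePartition = false))
    // Step 3: ReplicaDeletionStarted, StopReplica with deletePartition = true (delete data).
    val deleteData = live.map(r => StopReplica(r.brokerId, r.topic, r.partition, deletePartition = true))
    Plan(dead, stopFetching ++ deleteData)
  }
}
```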
4. Brokers receive the StopReplica request
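The request eventually reaches
ReplicaManager.stopReplica ==> LogManager.asyncDelete
which renames the topic partition's directory "logdir" to "logdir.uuid-delete" and adds the log to the deletion queue.
LogManager.asyncDelete: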
```scala
def asyncDelete(topicPartition: TopicPartition, isFuture: Boolean = false): Log = {
  val removedLog: Log = logCreationOrDeletionLock synchronized {
    // remove the partition being deleted from the in-memory log map
    if (isFuture)
      futureLogs.remove(topicPartition)
    else
      currentLogs.remove(topicPartition)
  }
  if (removedLog != null) {
    // wait until no cleaning task is running on the log to be deleted before actually deleting it
    if (cleaner != null && !isFuture) {
      cleaner.abortCleaning(topicPartition)
      cleaner.updateCheckpoints(removedLog.dir.getParentFile)
    }
    // rename the replica directory to <topic>-<partition>.<uuid>-delete
    removedLog.renameDir(Log.logDeleteDirName(topicPartition))
    checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.dir.getParentFile, ArrayBuffer.empty)
    checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
    // add the log to the queue of logs waiting to be deleted
    addLogToBeDeleted(removedLog)
  } else if (offlineLogDirs.nonEmpty) {
    throw new KafkaStorageException(s"Failed to delete log for ${if (isFuture) "future" else ""} $topicPartition because it may be in one of the offline directories ${offlineLogDirs.mkString(",")}")
  }
  removedLog
}
```
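The rename step is what makes a deleted partition visible on disk before it is physically removed. The sketch below reproduces the naming scheme as we understand it from Kafka 2.5's `Log.logDeleteDirName`: `<topic>-<partition>.<uuid without dashes>-delete`. `DeleteDirName` and `isPendingDelete` are hypothetical helpers, for example something an operator's script might use.

```scala
import java.util.UUID

// Hypothetical helpers (not Kafka's API) illustrating the "-delete" directory naming.
object DeleteDirName {
  val DeleteDirSuffix = "-delete"

  // "<topic>-<partition>.<32 hex chars>-delete"
  def logDeleteDirName(topic: String, partition: Int): String = {
    val uniqueId = UUID.randomUUID.toString.replaceAll("-", "")
    s"$topic-$partition.$uniqueId$DeleteDirSuffix"
  }

  // True for directories that are waiting for the kafka-delete-logs task.
  def isPendingDelete(dirName: String): Boolean = dirName.endsWith(DeleteDirSuffix)
}
```

The random UUID keeps names unique, so a topic with the same name can be recreated while the old directory is still waiting to be removed.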
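4.1 The scheduled log deletion task
As shown above, the log to be deleted ends up in the
logsToBeDeleted queue. This queue holds logs waiting to be deleted and is processed by a dedicated task, kafka-delete-logs. Let's look at how it works.
When LogManager.startup runs, it schedules this task: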
```scala
scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
                   deleteLogs _,
                   delay = InitialTaskDelayMs,
                   unit = TimeUnit.MILLISECONDS)
```
The log deletion task:
```scala
/**
 * Delete logs marked for deletion. Delete all logs for which `currentDefaultConfig.fileDeleteDelayMs`
 * has elapsed after the delete was scheduled. Logs for which this interval has not yet elapsed will be
 * considered for deletion in the next iteration of `deleteLogs`. The next iteration will be executed
 * after the remaining time for the first log that is not deleted. If there are no more `logsToBeDeleted`,
 * `deleteLogs` will be executed after `currentDefaultConfig.fileDeleteDelayMs`.
 * (Deletes the log files marked for deletion; file.delete.delay.ms is the deletion delay,
 * 60000 ms by default.)
 */
private def deleteLogs(): Unit = {
  var nextDelayMs = 0L
  try {
    def nextDeleteDelayMs: Long = {
      if (!logsToBeDeleted.isEmpty) {
        val (_, scheduleTimeMs) = logsToBeDeleted.peek()
        scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
      } else
        currentDefaultConfig.fileDeleteDelayMs
    }
    while ({nextDelayMs = nextDeleteDelayMs; nextDelayMs <= 0}) {
      val (removedLog, _) = logsToBeDeleted.take()
      if (removedLog != null) {
        try {
          // immediately and permanently delete this log directory and everything in it from the file system
          removedLog.delete()
          info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
        } catch {
          case e: KafkaStorageException =>
            error(s"Exception while deleting $removedLog in dir ${removedLog.dir.getParent}.", e)
        }
      }
    }
  } catch {
    case e: Throwable =>
      error(s"Exception in kafka-delete-logs thread.", e)
  } finally {
    try {
      scheduler.schedule("kafka-delete-logs",
                         deleteLogs _,
                         delay = nextDelayMs,
                         unit = TimeUnit.MILLISECONDS)
    } catch {
      case e: Throwable =>
        if (scheduler.isStarted) {
          // No errors should occur unless scheduler has been shutdown
          error(s"Failed to schedule next delete in kafka-delete-logs thread", e)
        }
    }
  }
}
```
file.delete.delay.ms determines how long the physical deletion is delayed.
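The scheduling arithmetic in deleteLogs is easier to see with an explicit clock. In the sketch below, `DeleteLogsSketch` and `runOnce` are hypothetical. A `mutable.Queue` stands in for Kafka's blocking queue, and directory names stand in for `Log` objects. One call models one run of the task and returns what it deleted plus the delay before the next run.

```scala
import scala.collection.mutable

// Hypothetical model (not Kafka's code) of one run of LogManager.deleteLogs.
// Each queue entry is (logDirName, scheduleTimeMs); the clock is passed in explicitly.
object DeleteLogsSketch {
  // Returns the logs deleted in this run and the delay (ms) before the next run.
  def runOnce(queue: mutable.Queue[(String, Long)], nowMs: Long, fileDeleteDelayMs: Long): (List[String], Long) = {
    // Same formula as nextDeleteDelayMs: wait for the oldest entry, or a full delay if the queue is empty.
    def nextDeleteDelayMs: Long =
      if (queue.nonEmpty) queue.head._2 + fileDeleteDelayMs - nowMs
      else fileDeleteDelayMs

    val deleted = mutable.ListBuffer.empty[String]
    var nextDelayMs = nextDeleteDelayMs
    while (nextDelayMs <= 0) {
      deleted += queue.dequeue()._1 // the real code calls removedLog.delete() here
      nextDelayMs = nextDeleteDelayMs
    }
    (deleted.toList, nextDelayMs)
  }
}
```

With the default 60000 ms, a log enqueued at t = 0 is removed by the first run at or after t = 60000. If another log was enqueued at t = 30000, that run reschedules itself 30000 ms later instead of waiting a full minute.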
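5. StopReplica succeeds: the callback runs
Once topic deletion completes, the related information is cleaned up.
What triggers this: whenever a Broker responds to the StopReplica request that deletes partitions, a callback runs that raises a TopicDeletionStopReplicaResponseReceived event. The request is sent by the Controller, so the callback also runs on the Controller.
Where the callback is passed in: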

The callback is handled by KafkaController.processTopicDeletionStopReplicaResponseReceived:
- If the response carries an error (the deletion failed), the replica transitions to ReplicaDeletionIneligible and resumeDeletions runs again.
- If the response is normal, the replica transitions ReplicaDeletionStarted ==> ReplicaDeletionSuccessful and resumeDeletions runs again.
- resumeDeletions checks whether all replicas have been deleted. If they have, it runs completeDeleteTopic below. Otherwise it keeps deleting the replicas that have not been deleted successfully.
```scala
private def completeDeleteTopic(topic: String): Unit = {
  // deregister partition change listener on the deleted topic. This is to prevent the partition change listener
  // firing before the new topic listener when a deleted topic gets auto created
  client.mutePartitionModifications(topic)
  val replicasForDeletedTopic = controllerContext.replicasInState(topic, ReplicaDeletionSuccessful)
  // controller will remove this replica from the state machine as well as its partition assignment cache
  replicaStateMachine.handleStateChanges(replicasForDeletedTopic.toSeq, NonExistentReplica)
  controllerContext.topicsToBeDeleted -= topic
  controllerContext.topicsWithDeletionStarted -= topic
  client.deleteTopic(topic, controllerContext.epochZkVersion)
  controllerContext.removeTopic(topic)
}
```
- Clean up the related in-memory information.
- Deregister the listener on the deleted topic's node
/brokers/topics/{topicName}.
- Delete the ZK data, including
/brokers/topics/{topicName}, /config/topics/{topicName} and /admin/delete_topics/{topicName}.
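The callback logic above is a small state transition followed by a completion check. This sketch simplifies heavily. `StopReplicaCallback` is a hypothetical model that tracks a single partition with replicas keyed by broker id. It is not Kafka's `ReplicaStateMachine`.

```scala
// Hypothetical, simplified model (not Kafka's classes): one partition, replicas keyed by broker id.
object StopReplicaCallback {
  sealed trait ReplicaState extends Product with Serializable
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState

  // Applies one StopReplica response: replicas that reported an error become
  // ReplicaDeletionIneligible, the other responding replicas become ReplicaDeletionSuccessful.
  def onResponse(states: Map[Int, ReplicaState], responded: Set[Int], failed: Set[Int]): Map[Int, ReplicaState] =
    states.map { case (brokerId, state) =>
      val next: ReplicaState =
        if (!responded.contains(brokerId) || state != ReplicaDeletionStarted) state
        else if (failed.contains(brokerId)) ReplicaDeletionIneligible
        else ReplicaDeletionSuccessful
      brokerId -> next
    }

  // The check resumeDeletions makes before calling completeDeleteTopic.
  def readyToComplete(states: Map[Int, ReplicaState]): Boolean =
    states.values.forall(_ == ReplicaDeletionSuccessful)
}
```

In the real flow an ineligible replica is later retried, so readyToComplete only becomes true once every replica has eventually reported success.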
6. On Controller startup: resuming topics pending deletion
We saw this earlier when analyzing how the Controller comes online:
KafkaController.onControllerFailover
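Part of the code is omitted below:
```scala
private def onControllerFailover(): Unit = {
  // ... (earlier code omitted)
  // find out which topics need to be deleted and which cannot be deleted yet
  val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
  info("Initializing topic deletion manager")
  // initialize the topic deletion manager
  topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)
  // the topic deletion manager tries to start deleting topics
  topicDeletionManager.tryTopicDeletion()
  // ... (remaining code omitted)
}
```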
6.1 Fetching topics to delete and topics temporarily ineligible for deletion
fetchTopicDeletionsInProgress
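- topicsToBeDeleted: all topics that need to be deleted, read from /admin/delete_topics in ZK.
- topicsIneligibleForDeletion: topics that cannot be deleted for now:
①. any partition of the topic is undergoing replica reassignment;
②. any replica of the topic is offline (if even one Broker hosting a replica of the topic is down, the topic cannot be deleted).
- The results are stored in
controllerContext in memory.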
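The two ineligibility rules above combine as a set union. In this sketch, `FetchDeletionsSketch` and `Replica` are hypothetical simplified types, and a broker is treated as "offline" simply when it is absent from a given live-broker set.

```scala
// Hypothetical, simplified model (not Kafka's code) of the ineligibility rules.
object FetchDeletionsSketch {
  final case class Replica(topic: String, partition: Int, brokerId: Int)

  // Topics with any replica on a broker that is not live, plus topics with a partition being reassigned.
  def ineligibleForDeletion(reassigningTopics: Set[String], replicas: Seq[Replica], liveBrokers: Set[Int]): Set[String] = {
    val withOfflineReplica = replicas.filterNot(r => liveBrokers.contains(r.brokerId)).map(_.topic).toSet
    withOfflineReplica ++ reassigningTopics
  }
}
```

A single replica on a down broker is enough to make the whole topic ineligible, which is rule ② above.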
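6.2 topicDeletionManager.init: initializing the deletion manager
- If the server is configured with
delete.topic.enable=false (topic deletion not allowed), the nodes under /admin/delete_topics are deleted. Each node there marks a topic that needs to be deleted.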
6.3 topicDeletionManager.tryTopicDeletion: trying to resume deletion
This brings us back to resumeDeletions, analyzed above, which resumes the deletion:
```scala
def tryTopicDeletion(): Unit = {
  if (isDeleteTopicEnabled) {
    resumeDeletions()
  }
}
```
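Source code summary
For the overall topic deletion flow, see the figure below.

A few points to note:
- The Controller is also a Broker.
- When the Controller issues deletion requests, it sends them only to the Brokers related to the topic.
- If a Broker is offline or the deletion fails, the Controller keeps retrying the deletion, or resumes it once the Broker comes back online.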
Q&A
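Common questions on this topic are listed here. If you have other questions, leave them in the comments; the author updates this list from time to time.
When is the node under /admin/delete_topics written?
When the client issues a deleteTopics operation, the Controller handles the deleteTopics request and at that point writes the topic to be deleted into the ZK node
/admin/delete_topics/{topicName}.
When are a topic's log files actually deleted from disk?
Once the Controller detects the new node under
/admin/delete_topics, it sends delete requests to the live Brokers that host replicas of the topic. On receiving the request, a Broker renames the replica directories with a -delete suffix. A dedicated log deletion task then performs the actual deletion. The delay is controlled by file.delete.delay.ms, 60000 ms (one minute) by default.
Why can't a topic that is being reassigned be deleted?
While a topic is being reassigned, you don't yet know where its replicas will end up, so you can't know which replicas to delete or when.
Once the reassignment finishes, the deletion flow continues.
If you manually write a node under /admin/delete_topics/, will the topic be deleted normally?
If the node does not correspond to an existing topic, it is simply deleted.
Note that if delete.topic.enable=false (topic deletion disabled), all nodes under /admin/delete_topics/ are deleted and the flow ends.
If the node corresponds to an existing topic, the normal topic deletion flow runs. In essence this is no different from deleting the topic with a Kafka client.
What happens if you directly delete the /brokers/topics/{topicName} node in ZK?
TODO…
When the Controller tells Brokers to execute StopReplica, does it notify all Brokers or only those related to the deleted topic?
Only the Brokers related to the deleted topic are notified.
As the source in the figure below shows, all replicas that need StopReplica are filtered first to obtain the IDs of the Brokers hosting them. The request is then sent with sendRequest(brokerId, stopReplicaRequest), so it goes only to those Brokers.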
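The "only related Brokers" answer comes down to grouping the target replicas by broker id. In this sketch, `StopReplicaTargets` and `requestsPerBroker` are hypothetical names. Each key of the resulting map is a broker that would receive a request, and each value lists the replicas that request covers.

```scala
// Hypothetical, simplified model (not Kafka's ControllerChannelManager).
object StopReplicaTargets {
  final case class Replica(topic: String, partition: Int, brokerId: Int)

  // Groups the replicas of the topics being deleted by the broker that hosts them;
  // only brokers that appear as keys would receive a StopReplica request.
  def requestsPerBroker(allReplicas: Seq[Replica], topicsToDelete: Set[String]): Map[Int, Seq[Replica]] =
    allReplicas.filter(r => topicsToDelete.contains(r.topic)).groupBy(_.brokerId)
}
```

A broker that hosts only replicas of other topics (broker 3 in a cluster where `t` lives on brokers 1 and 2) never appears in the map, so it is never contacted.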
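What if a Broker is offline or fails during deletion?
The Controller keeps retrying the deletion, or waits for the Broker to come back online and then continues. Either way, the ZK data is cleaned up only after all partitions have been deleted (that is, renamed with the -delete suffix).
ReplicaStateMachine (replica state machine)
See [Kafka Source Code] State Machines in the Controller (TODO).
What happens if a deletion is issued during a reassignment?
The deletion waits. Once the reassignment completes, the deletion continues.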
Finally: the source code discussed in this article is from Kafka 2.5.
Please credit the source when reposting. Link to this article: https://www.uj5u.com/ruanti/294267.html



