前言
我們會看到zk的資料中有一個節點
/log_dir_event_notification/,這是一個序列號持久節點
這個節點在kafka中承擔的作用是: 當某個Broker上的LogDir出現例外時(比如磁盤損壞,檔案讀寫失敗,等等例外): 向zk中謝增一個子節點/log_dir_event_notification/log_dir_event_序列號;Controller監聽到這個節點的變更之后,會向Brokers們發送LeaderAndIsrRequest請求; 然后做一些副本脫機的善后操作
原始碼分析
這里說的dirLog是 server.properties中配置的log.dir 例如

副本例外處理
首先我們找到有使用這個節點的原始碼;
kafka啟動之初有呼叫
ReplicaManager.startup()
def startup(): Unit = {
// 省略...
//當inter-broker protocol (IBP) < 1.0的時候,如果存在logDir的一些例外則直接讓整個Broker啟動失敗;
val haltBrokerOnFailure = config.interBrokerProtocolVersion < KAFKA_1_0_IV0
logDirFailureHandler = new LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure)
logDirFailureHandler.start()
}
private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) {
override def doWork(): Unit = {
//從佇列 offlineLogDirQueue 取資料
val newOfflineLogDir = logDirFailureChannel.takeNextOfflineLogDir()
if (haltBrokerOnDirFailure) {
fatal(s"Halting broker because dir $newOfflineLogDir is offline")
Exit.halt(1)
}
handleLogDirFailure(newOfflineLogDir)
}
}
// logDir should be an absolute path
// sendZkNotification is needed for unit test
def handleLogDirFailure(dir: String, sendZkNotification: Boolean = true): Unit = {
// 省略...
logManager.handleLogDirFailure(dir)
if (sendZkNotification)
zkClient.propagateLogDirEvent(localBrokerId)
warn(s"Stopped serving replicas in dir $dir")
}
代碼比較長,就直接概況一下好了:
主要是當讀取或操作LogDir的時候出現了例外就會執行到這里,有可能是磁盤脫機了,或者檔案突然沒有讀取寫入權限等等之類的一些IOException例外;那么 Broker就需要做一些處理;如下
- 做個判斷
inter.broker.protocol.version協議版本 <1.0的時候 時候直接退出;那個時候還不支持單Broker上存在多個logDir; - 副本停止fetche資料
- 標記磁區下線
- 可能移除一些監控資訊
- 如果當前的
log_dir都脫機(或者例外了), 那么久可以直接shutdown這臺機器了 - 如果還有其他的
log_dir還有在線的, 那么繼續做一些其他的清理操作; - 創建持久序列節點
/log_dir_event_notification/log_dir_event_+序列號;資料是 BrokerID;例如:
/log_dir_event_notification/log_dir_event_0000000003{"version":1,"broker":20003,"event":1}
PS: log_dir 是可以在一臺Broker配置多個路徑的 ,用逗號隔開
LogDir發生例外
比如說在 給檔案加鎖的時候lockLogDirs,磁盤損壞了就拋出例外IOException
/**
* Lock all the given directories
*/
private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = {
dirs.flatMap { dir =>
try {
val lock = new FileLock(new File(dir, LockFile))
if (!lock.tryLock())
throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParent +
". A Kafka instance in another process or thread is using this directory.")
Some(lock)
} catch {
case e: IOException =>
logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while locking directory $dir", e)
None
}
}
}
def maybeAddOfflineLogDir(logDir: String, msg: => String, e: IOException): Unit = {
error(msg, e)
if (offlineLogDirs.putIfAbsent(logDir, logDir) == null)
offlineLogDirQueue.add(logDir)
}
offlineLogDirQueue添加了一個例外佇列之后就回到上面的副本例外處理代碼了, 上面可是一致在queue.take()的
Controller監聽zk節點變更
KafkaController.processLogDirEventNotification
private def processLogDirEventNotification(): Unit = {
if (!isActive) return
val sequenceNumbers = zkClient.getAllLogDirEventNotifications
try {
val brokerIds = zkClient.getBrokerIdsFromLogDirEvents(sequenceNumbers)
//嘗試將這臺Broker上的所有副本 走一下狀態流轉 到 OnlineReplica
onBrokerLogDirFailure(brokerIds)
} finally {
// delete processed children
zkClient.deleteLogDirEventNotifications(sequenceNumbers, controllerContext.epochZkVersion)
}
}
主要將從zk節點 /log_dir_event_notification/log_dir_event_序列號 中獲取到的資料的Broker上的所有副本進行一個副本狀態流轉 ->OnlineReplica ;關于狀態機的流轉請看 【kafka原始碼】Controller中的狀態機
- 給所有broker 發送
LeaderAndIsrRequest請求,讓brokers們去查詢他們的副本的狀態,如果副本logDir已經離線則回傳KAFKA_STORAGE_ERROR例外; - 完事之后會洗掉節點
原始碼總結
Q&A
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/302276.html
標籤:其他
上一篇:【演算法學習】1720. 解碼異或后的陣列(java / c / c++ / python / go / rust)
