主頁 >  其他 > 創建Topic原來還能這樣玩,真絕了!!!(附視頻)

創建Topic原來還能這樣玩,真絕了!!!(附視頻)

2021-08-12 07:08:24 其他

在這里插入圖片描述

文章目錄

    • 腳本引數
    • 創建Topic腳本
    • 創建Topic 原始碼分析
      • 1. 原始碼入口
      • 2. 創建AdminClientTopicService 物件
        • 2.1 先創建 Admin
      • 3. AdminClientTopicService.createTopic 創建Topic
        • 3.1 KafkaAdminClient.createTopics(NewTopic) 創建Topic
      • 4. 發起網路請求
      • 5. Controller角色的服務端接受請求處理邏輯
        • 5.1 KafkaApis.handle(request) 根據請求傳遞Api呼叫不同介面
        • 5.2 KafkaApis.handleCreateTopicsRequest 處理創建Topic的請求
        • 5.3 adminManager.createTopics()
        • 5.4 寫入zookeeper資料
      • 6. Controller監聽 `/brokers/topics/Topic名稱`, 通知Broker將磁區寫入磁盤
        • 6.1 onNewPartitionCreation 狀態流轉
      • 7. Broker收到LeaderAndIsrRequest 創建本地Log
    • 原始碼總結
    • Q&A
      • 創建Topic的時候 在Zk上創建了哪些節點
      • 創建Topic的時候 什么時候在Broker磁盤上創建的日志檔案
      • 如果我沒有指定磁區數或者副本數,那么會如何創建
      • 如果我手動洗掉了`/brokers/topics/`下的某個節點會怎么樣?
      • 如果我手動在zk中添加`/brokers/topics/{TopicName}`節點會怎么樣
      • 如果寫入`/brokers/topics/{TopicName}`節點之后Controller掛掉了會怎么樣
    • 附件
      • --config 可生效引數

<iframe id="7fpdpAMO-1628432716549" src="https://player.bilibili.com/player.html?aid=292148654" allowfullscreen="true" data-mediaembed="bilibili"></iframe>

Topic創建流程分析+常見問題

全套視頻請關注公眾號:石臻臻的雜貨鋪(首發)

腳本引數

sh bin/kafka-topic -help 查看更具體引數

下面只是列出了跟--create 相關的引數

引數描述例子
--bootstrap-server 指定kafka服務指定連接到的kafka服務; 如果有這個引數,則 --zookeeper可以不需要–bootstrap-server localhost:9092
--zookeeper棄用, 通過zk的連接方式連接到kafka集群;–zookeeper localhost:2181 或者localhost:2181/kafka
--replication-factor副本數量,注意不能大于broker數量;如果不提供,則會用集群中默認配置–replication-factor 3
--partitions磁區數量當創建或者修改topic的時候,用這個來指定磁區數;如果創建的時候沒有提供引數,則用集群中默認值; 注意如果是修改的時候,磁區比之前小會有問題
--replica-assignment副本磁區分配方式;創建topic的時候可以自己指定副本分配情況;--replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; 這個意思是有三個磁區和三個副本,對應分配的Broker; 逗號隔開標識磁區;冒號隔開表示副本
--config<String: name=value>用來設定topic級別的配置以覆寫默認配置;只在–create 和–bootstrap-server 同時使用時候生效; 可以配置的引數串列請看文末附件例如覆寫兩個配置 --config retention.bytes=123455 --config retention.ms=600001
--command-config <String: command 檔案路徑>用來配置客戶端Admin Client啟動配置,只在–bootstrap-server 同時使用時候生效;例如:設定請求的超時時間 --command-config config/producer.proterties; 然后在檔案中配置 request.timeout.ms=300000
--create命令方式; 表示當前請求是創建Topic--create

創建Topic腳本

zk方式(不推薦)

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test

需要注意的是–zookeeper后面接的是kafka的zk配置, 假如你配置的是localhost:2181/kafka 帶命名空間的這種,不要漏掉了

kafka版本 >= 2.2 支持下面方式(推薦)

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test

當前分析的kafka原始碼版本為 kafka-2.5

創建Topic 原始碼分析

溫馨提示: 如果閱讀原始碼略顯枯燥,你可以直接看原始碼總結以及后面部分

首先我們找到原始碼入口處, 查看一下 kafka-topic.sh腳本的內容
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
最終是執行了kafka.admin.TopicCommand這個類,找到這個地方之后就可以斷點除錯原始碼了,用IDEA啟動
在這里插入圖片描述
記得配置一下入參
比如: --create --bootstrap-server 127.0.0.1:9092 --partitions 3 --topic test_create_topic3
在這里插入圖片描述

1. 原始碼入口

在這里插入圖片描述
上面的原始碼主要作用是

  1. 根據是否有傳入引數--zookeeper 來判斷創建哪一種 物件topicService
    如果傳入了--zookeeper 則創建 類 ZookeeperTopicService的物件
    否則創建類AdminClientTopicService的物件(我們主要分析這個物件)
  2. 根據傳入的引數型別判斷是創建topic還是洗掉等等其他 判斷依據是 是否在引數里傳入了--create

2. 創建AdminClientTopicService 物件

val topicService = new AdminClientTopicService(createAdminClient(commandConfig, bootstrapServer))

2.1 先創建 Admin

object AdminClientTopicService {
    def createAdminClient(commandConfig: Properties, bootstrapServer: Option[String]): Admin = {
      bootstrapServer match {
        case Some(serverList) => commandConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, serverList)
        case None =>
      }
      Admin.create(commandConfig)
    }

    def apply(commandConfig: Properties, bootstrapServer: Option[String]): AdminClientTopicService =
      new AdminClientTopicService(createAdminClient(commandConfig, bootstrapServer))
  }
  1. 如果有入參--command-config ,則將這個檔案里面的引數都放到map commandConfig里面, 并且也加入bootstrap.servers的引數;假如組態檔里面已經有了bootstrap.servers配置,那么會將其覆寫
  2. 將上面的commandConfig 作為入參呼叫Admin.create(commandConfig)創建 Admin; 這個時候呼叫的Client模塊的代碼了, 從這里我們就可以看出,我們呼叫kafka-topic.sh腳本實際上是kafka模擬了一個客戶端Client來創建Topic的程序;
    在這里插入圖片描述

3. AdminClientTopicService.createTopic 創建Topic

topicService.createTopic(opts)

  case class AdminClientTopicService private (adminClient: Admin) extends TopicService {

    override def createTopic(topic: CommandTopicPartition): Unit = {
      //如果配置了副本副本數--replication-factor 一定要大于0
      if (topic.replicationFactor.exists(rf => rf > Short.MaxValue || rf < 1))
        throw new IllegalArgumentException(s"The replication factor must be between 1 and ${Short.MaxValue} inclusive")
       //如果配置了--partitions 磁區數 必須大于0
      if (topic.partitions.exists(partitions => partitions < 1))
        throw new IllegalArgumentException(s"The partitions must be greater than 0")

	  //查詢是否已經存在該Topic
      if (!adminClient.listTopics().names().get().contains(topic.name)) {
        val newTopic = if (topic.hasReplicaAssignment)
          //如果指定了--replica-assignment引數;則按照指定的來分配副本
          new NewTopic(topic.name, asJavaReplicaReassignment(topic.replicaAssignment.get))
        else {
          new NewTopic(
            topic.name,
            topic.partitions.asJava,
            topic.replicationFactor.map(_.toShort).map(Short.box).asJava)
        }

        // 將配置--config 決議成一個配置map
        val configsMap = topic.configsToAdd.stringPropertyNames()
          .asScala
          .map(name => name -> topic.configsToAdd.getProperty(name))
          .toMap.asJava

        newTopic.configs(configsMap)
        //呼叫adminClient創建Topic
        val createResult = adminClient.createTopics(Collections.singleton(newTopic))
        createResult.all().get()
        println(s"Created topic ${topic.name}.")
      } else {
        throw new IllegalArgumentException(s"Topic ${topic.name} already exists")
      }
    }
  1. 檢查各項入參是否有問題
  2. adminClient.listTopics(),然后比較是否已經存在待創建的Topic;如果存在拋出例外;
  3. 判斷是否配置了引數--replica-assignment ; 如果配置了,那么Topic就會按照指定的方式來配置副本情況
  4. 決議配置--config 配置放到configsMap中; configsMap給到NewTopic物件
  5. 呼叫adminClient.createTopics創建Topic; 它是如何創建Topic的呢?往下分析原始碼

3.1 KafkaAdminClient.createTopics(NewTopic) 創建Topic

    @Override
    public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,
                                           final CreateTopicsOptions options) {
       
       //省略部分原始碼...
        Call call = new Call("createTopics", calcDeadlineMs(now, options.timeoutMs()),
            new ControllerNodeProvider()) {

            @Override
            public CreateTopicsRequest.Builder createRequest(int timeoutMs) {
                return new CreateTopicsRequest.Builder(
                    new CreateTopicsRequestData().
                        setTopics(topics).
                        setTimeoutMs(timeoutMs).
                        setValidateOnly(options.shouldValidateOnly()));
            }

            @Override
            public void handleResponse(AbstractResponse abstractResponse) {
                //省略
            }

            @Override
            void handleFailure(Throwable throwable) {
                completeAllExceptionally(topicFutures.values(), throwable);
            }
        };
        
    }

這個代碼里面主要看下Call里面的介面; 先不管Kafka如何跟服務端進行通信的細節; 我們主要關注創建Topic的邏輯;

  1. createRequest會構造一個請求引數CreateTopicsRequest 例如下圖
    在這里插入圖片描述
  2. 選擇ControllerNodeProvider這個節點發起網路請求
    在這里插入圖片描述
    可以清楚的看到, 創建Topic這個操作是需要Controller來執行的;
    在這里插入圖片描述

4. 發起網路請求

==>服務端客戶端網路模型

5. Controller角色的服務端接受請求處理邏輯

首先找到服務端處理客戶端請求的 原始碼入口 ? KafkaRequestHandler.run()

主要看里面的 apis.handle(request) 方法; 可以看到客戶端的請求都在request.bodyAndSize()里面
在這里插入圖片描述

5.1 KafkaApis.handle(request) 根據請求傳遞Api呼叫不同介面

進入方法可以看到根據request.header.apiKey 呼叫對應的方法,客戶端傳過來的是CreateTopics
在這里插入圖片描述

5.2 KafkaApis.handleCreateTopicsRequest 處理創建Topic的請求


def handleCreateTopicsRequest(request: RequestChannel.Request): Unit = {
    // 部分代碼省略
	//如果當前Broker不是屬于Controller的話,就拋出例外
    if (!controller.isActive) {
      createTopicsRequest.data.topics.asScala.foreach { topic =>
        results.add(new CreatableTopicResult().setName(topic.name).
          setErrorCode(Errors.NOT_CONTROLLER.code))
      }
      sendResponseCallback(results)
    } else {
     // 部分代碼省略
    }
      adminManager.createTopics(createTopicsRequest.data.timeoutMs,
          createTopicsRequest.data.validateOnly,
          toCreate,
          authorizedForDescribeConfigs,
          handleCreateTopicsResults)
    }
  }

  1. 判斷當前處理的broker是不是Controller,如果不是Controller的話直接拋出例外,從這里可以看出,CreateTopic這個操作必須是Controller來進行, 出現這種情況有可能是客戶端發起請求的時候Controller已經變更;
  2. 鑒權 【Kafka原始碼】kafka鑒權機制
  3. 呼叫adminManager.createTopics()

5.3 adminManager.createTopics()

創建主題并等等主題完全創建,回呼函式將會在超時、錯誤、或者主題創建完成時觸發

該方法過長,省略部分代碼

def createTopics(timeout: Int,
                   validateOnly: Boolean,
                   toCreate: Map[String, CreatableTopic],
                   includeConfigsAndMetatadata: Map[String, CreatableTopicResult],
                   responseCallback: Map[String, ApiError] => Unit): Unit = {

    // 1. map over topics creating assignment and calling zookeeper
    val brokers = metadataCache.getAliveBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) }
    val metadata = toCreate.values.map(topic =>
      try {
          //省略部分代碼
         //檢查Topic是否存在
         //檢查 --replica-assignment引數和 (--partitions	|| --replication-factor ) 不能同時使用
         // 如果(--partitions	|| --replication-factor ) 沒有設定,則使用 Broker的配置(這個Broker肯定是Controller)
		// 計算磁區副本分配方式

        createTopicPolicy match {
          case Some(policy) =>
          //省略部分代碼
            adminZkClient.validateTopicCreate(topic.name(), assignments, configs)
            if (!validateOnly)
              adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)

          case None =>
            if (validateOnly)
             //校驗創建topic的引數準確性
              adminZkClient.validateTopicCreate(topic.name, assignments, configs)
            else
              //把topic相關資料寫入到zk中
              adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)
        }

       
  }
  1. 做一些校驗檢查
    ①.檢查Topic是否存在
    ②. 檢查--replica-assignment引數和 (--partitions || --replication-factor ) 不能同時使用
    ③.如果(--partitions || --replication-factor ) 沒有設定,則使用 Broker的配置(這個Broker肯定是Controller)
    ④.計算磁區副本分配方式

  2. createTopicPolicy 根據Broker是否配置了創建Topic的自定義校驗策略; 使用方式是自定義實作org.apache.kafka.server.policy.CreateTopicPolicy介面;并 在服務器配置 create.topic.policy.class.name=自定義類; 比如我就想所有創建Topic的請求磁區數都要大于10; 那么這里就可以實作你的需求了

  3. createTopicWithAssignment把topic相關資料寫入到zk中; 進去分析一下

5.4 寫入zookeeper資料

我們進入到adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)看看有哪些資料寫入到了zk中;

  def createTopicWithAssignment(topic: String,
                                config: Properties,
                                partitionReplicaAssignment: Map[Int, Seq[Int]]): Unit = {
    validateTopicCreate(topic, partitionReplicaAssignment, config)

    // 將topic單獨的配置寫入到zk中
    zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)

    // 將topic磁區相關資訊寫入zk中
    writeTopicPartitionAssignment(topic, partitionReplicaAssignment.mapValues(ReplicaAssignment(_)).toMap, isUpdate = false)
  }

原始碼就不再深入了,這里直接詳細說明一下

寫入Topic配置資訊

  1. 先呼叫SetDataRequest請求往節點/config/topics/Topic名稱 寫入資料; 這里
    一般這個時候都會回傳 NONODE (NoNode);節點不存在; 假如zk已經存在節點就直接覆寫掉
  2. 節點不存在的話,就發起CreateRequest請求,寫入資料; 并且節點型別是持久節點

這里寫入的資料,是我們入參時候傳的topic配置--config; 這里的配置會覆寫默認配置

寫入Topic磁區副本資訊

  1. 將已經分配好的副本分配策略寫入到 /brokers/topics/Topic名稱 中; 節點型別 持久節點
    在這里插入圖片描述

具體跟zk互動的地方在
ZookeeperClient.send() 這里包裝了很多跟zk的互動;
在這里插入圖片描述

6. Controller監聽 /brokers/topics/Topic名稱, 通知Broker將磁區寫入磁盤

Controller 有監聽zk上的一些節點; 在上面的流程中已經在zk中寫入了 /brokers/topics/Topic名稱 ; 這個時候Controller就監聽到了這個變化并相應;

KafkaController.processTopicChange


  private def processTopicChange(): Unit = {
    //如果處理的不是Controller角色就回傳
    if (!isActive) return
    //從zk中獲取 `/brokers/topics 所有Topic
    val topics = zkClient.getAllTopicsInCluster
    //找出哪些是新增的
    val newTopics = topics -- controllerContext.allTopics
    //找出哪些Topic在zk上被洗掉了
    val deletedTopics = controllerContext.allTopics -- topics
    controllerContext.allTopics = topics

    
    registerPartitionModificationsHandlers(newTopics.toSeq)
    val addedPartitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(newTopics)
    deletedTopics.foreach(controllerContext.removeTopic)
    addedPartitionReplicaAssignment.foreach {
      case (topicAndPartition, newReplicaAssignment) => controllerContext.updatePartitionFullReplicaAssignment(topicAndPartition, newReplicaAssignment)
    }
    info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
      s"[$addedPartitionReplicaAssignment]")
    if (addedPartitionReplicaAssignment.nonEmpty)
      onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
  }
  1. 從zk中獲取 /brokers/topics 所有Topic跟當前Broker記憶體中所有BrokercontrollerContext.allTopics的差異; 就可以找到我們新增的Topic; 還有在zk中被洗掉了的Broker(該Topic會在當前記憶體中remove掉)

  2. 從zk中獲取/brokers/topics/{TopicName} 給定主題的副本分配,并保存在記憶體中在這里插入圖片描述

  3. 執行onNewPartitionCreation;磁區狀態開始流轉

6.1 onNewPartitionCreation 狀態流轉

關于Controller的狀態機 詳情請看: 【kafka原始碼】Controller中的狀態機

  /**
   * This callback is invoked by the topic change callback with the list of failed brokers as input.
   * It does the following -
   * 1. Move the newly created partitions to the NewPartition state
   * 2. Move the newly created partitions from NewPartition->OnlinePartition state
   */
  private def onNewPartitionCreation(newPartitions: Set[TopicPartition]): Unit = {
    info(s"New partition creation callback for ${newPartitions.mkString(",")}")
    partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition)
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica)
    partitionStateMachine.handleStateChanges(
      newPartitions.toSeq,
      OnlinePartition,
      Some(OfflinePartitionLeaderElectionStrategy(false))
    )
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, OnlineReplica)
  }
  1. 將待創建的磁區狀態流轉為NewPartition;
    在這里插入圖片描述
  2. 將待創建的副本 狀態流轉為NewReplica;
    在這里插入圖片描述
  3. 將磁區狀態從剛剛的NewPartition流轉為OnlinePartition
    0. 獲取leaderIsrAndControllerEpochs; Leader為副本的第一個;
    1. 向zk中寫入/brokers/topics/{topicName}/partitions/ 持久節點; 無資料
    2. 向zk中寫入/brokers/topics/{topicName}/partitions/{磁區號} 持久節點; 無資料
    3. 向zk中寫入/brokers/topics/{topicName}/partitions/{磁區號}/state 持久節點; 資料為leaderIsrAndControllerEpoch在這里插入圖片描述
    1. 向副本所屬Broker發送leaderAndIsrRequest請求
    2. 向所有Broker發送UPDATE_METADATA 請求
  4. 將副本狀態從剛剛的NewReplica流轉為OnlineReplica ,更新下記憶體

關于磁區狀態機和副本狀態機詳情請看【kafka原始碼】Controller中的狀態機

7. Broker收到LeaderAndIsrRequest 創建本地Log

上面步驟中有說到向副本所屬Broker發送leaderAndIsrRequest請求,那么這里做了什么呢
其實主要做的是 創建本地Log

代碼太多,這里我們直接定位到只跟創建Topic相關的關鍵代碼來分析
KafkaApis.handleLeaderAndIsrRequest->replicaManager.becomeLeaderOrFollower->ReplicaManager.makeLeaders...LogManager.getOrCreateLog

  /**
   * 如果日志已經存在,只回傳現有日志的副本否則如果 isNew=true 或者如果沒有離線日志目錄,則為給定的主題和給定的磁區創建日志 否則拋出 KafkaStorageException
   */
  def getOrCreateLog(topicPartition: TopicPartition, config: LogConfig, isNew: Boolean = false, isFuture: Boolean = false): Log = {
    logCreationOrDeletionLock synchronized {
      getLog(topicPartition, isFuture).getOrElse {
        // create the log if it has not already been created in another thread
        if (!isNew && offlineLogDirs.nonEmpty)
          throw new KafkaStorageException(s"Can not create log for $topicPartition because log directories ${offlineLogDirs.mkString(",")} are offline")

        val logDirs: List[File] = {
          val preferredLogDir = preferredLogDirs.get(topicPartition)

          if (isFuture) {
            if (preferredLogDir == null)
              throw new IllegalStateException(s"Can not create the future log for $topicPartition without having a preferred log directory")
            else if (getLog(topicPartition).get.dir.getParent == preferredLogDir)
              throw new IllegalStateException(s"Can not create the future log for $topicPartition in the current log directory of this partition")
          }

          if (preferredLogDir != null)
            List(new File(preferredLogDir))
          else
            nextLogDirs()
        }

        val logDirName = {
          if (isFuture)
            Log.logFutureDirName(topicPartition)
          else
            Log.logDirName(topicPartition)
        }

        val logDir = logDirs
          .toStream // to prevent actually mapping the whole list, lazy map
          .map(createLogDirectory(_, logDirName))
          .find(_.isSuccess)
          .getOrElse(Failure(new KafkaStorageException("No log directories available. Tried " + logDirs.map(_.getAbsolutePath).mkString(", "))))
          .get // If Failure, will throw

        val log = Log(
          dir = logDir,
          config = config,
          logStartOffset = 0L,
          recoveryPoint = 0L,
          maxProducerIdExpirationMs = maxPidExpirationMs,
          producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
          scheduler = scheduler,
          time = time,
          brokerTopicStats = brokerTopicStats,
          logDirFailureChannel = logDirFailureChannel)

        if (isFuture)
          futureLogs.put(topicPartition, log)
        else
          currentLogs.put(topicPartition, log)

        info(s"Created log for partition $topicPartition in $logDir with properties " + s"{${config.originals.asScala.mkString(", ")}}.")
        // Remove the preferred log dir since it has already been satisfied
        preferredLogDirs.remove(topicPartition)

        log
      }
    }
  }
  1. 如果日志已經存在,只回傳現有日志的副本否則如果 isNew=true 或者如果沒有離線日志目錄,則為給定的主題和給定的磁區創建日志 否則拋出KafkaStorageException

詳細請看 【kafka原始碼】LeaderAndIsrRequest請求

原始碼總結

如果上面的原始碼分析,你不想看,那么你可以直接看這里的簡潔敘述

  1. 根據是否有傳入引數--zookeeper 來判斷創建哪一種 物件topicService
    如果傳入了--zookeeper 則創建 類 ZookeeperTopicService的物件
    否則創建類AdminClientTopicService的物件(我們主要分析這個物件)
  2. 如果有入參--command-config ,則將這個檔案里面的引數都放到mapl型別 commandConfig里面, 并且也加入bootstrap.servers的引數;假如組態檔里面已經有了bootstrap.servers配置,那么會將其覆寫
  3. 將上面的commandConfig作為入參呼叫Admin.create(commandConfig)創建 Admin; 這個時候呼叫的Client模塊的代碼了, 從這里我們就可以猜測,我們呼叫kafka-topic.sh腳本實際上是kafka模擬了一個客戶端Client來創建Topic的程序;
  4. 一些例外檢查
    ①.如果配置了副本副本數–replication-factor 一定要大于0
    ②.如果配置了–partitions 磁區數 必須大于0
    ③.去zk查詢是否已經存在該Topic
  5. 判斷是否配置了引數--replica-assignment ; 如果配置了,那么Topic就會按照指定的方式來配置副本情況
  6. 決議配置--config 配置放到configsMap中; configsMap給到NewTopic物件
  7. 將上面所有的引數包裝成一個請求引數CreateTopicsRequest ;然后找到是Controller的節點發起請求(ControllerNodeProvider)
  8. 服務端收到請求之后,開始根據CreateTopicsRequest來呼叫創建Topic的方法; 不過首先要判斷一下自己這個時候是不是Controller; 有可能這個時候Controller重新選舉了; 這個時候要拋出例外
  9. 服務端進行一下請求引數檢查
    ①.檢查Topic是否存在
    ②.檢查 --replica-assignment引數和 (--partitions || --replication-factor ) 不能同時使用
  10. 如果(--partitions || --replication-factor ) 沒有設定,則使用 Broker的默認配置(這個Broker肯定是Controller)
  11. 計算磁區副本分配方式;如果是傳入了 --replica-assignment;則會安裝自定義引數進行組裝;否則的話系統會自動計算分配方式; 具體詳情請看 【kafka原始碼】創建Topic的時候是如何磁區和副本的分配規則
  12. createTopicPolicy根據Broker是否配置了創建Topic的自定義校驗策略; 使用方式是自定義實作org.apache.kafka.server.policy.CreateTopicPolicy介面;并 在服務器配置 create.topic.policy.class.name=自定義類; 比如我就想所有創建Topic的請求磁區數都要大于10; 那么這里就可以實作你的需求了
  13. zk中寫入Topic配置資訊 發起CreateRequest請求,這里寫入的資料,是我們入參時候傳的topic配置--config; 這里的配置會覆寫默認配置;并且節點型別是持久節點;path = /config/topics/Topic名稱
  14. zk中寫入Topic磁區副本資訊 發起CreateRequest請求 ,將已經分配好的副本分配策略 寫入到 /brokers/topics/Topic名稱中; 節點型別 持久節點
  15. Controller監聽zk上面的topic資訊; 根據zk上變更的topic資訊;計算出新增/洗掉了哪些Topic; 然后拿到新增Topic的 副本分配資訊; 并做一些狀態流轉
  16. 向新增Topic所在Broker發送leaderAndIsrRequest請求,
  17. Broker收到發送leaderAndIsrRequest請求; 創建副本Log檔案;

在這里插入圖片描述

Q&A

創建Topic的時候 在Zk上創建了哪些節點

接受客戶端請求階段:

  1. topic的配置資訊 /config/topics/Topic名稱 持久節點
  2. topic的磁區資訊/brokers/topics/Topic名稱 持久節點

Controller監聽zk節點/brokers/topics變更階段

  1. /brokers/topics/{topicName}/partitions/持久節點; 無資料
  2. 向zk中寫入/brokers/topics/{topicName}/partitions/{磁區號} 持久節點; 無資料
  3. 向zk中寫入/brokers/topics/{topicName}/partitions/{磁區號}/state 持久節點;

創建Topic的時候 什么時候在Broker磁盤上創建的日志檔案

當Controller監聽zk節點/brokers/topics變更之后,將新增的Topic 決議好的磁區狀態流轉
NonExistentPartition->NewPartition->OnlinePartition 當流轉到OnlinePartition的時候會像磁區分配到的Broker發送一個leaderAndIsrRequest請求,當Broker們收到這個請求之后,根據請求引數做一些處理,其中就包括檢查自身有沒有這個磁區副本的本地Log;如果沒有的話就重新創建;

如果我沒有指定磁區數或者副本數,那么會如何創建

我們都知道,如果我們沒有指定磁區數或者副本數, 則默認使用Broker的配置, 那么這么多Broker,假如不小心默認值配置不一樣,那究竟使用哪一個呢? 那肯定是哪臺機器執行創建topic的程序,就是使用誰的配置;
所以是誰執行的? 那肯定是Controller啊! 上面的原始碼我們分析到了,創建的程序,會指定Controller這臺機器去進行;

如果我手動洗掉了/brokers/topics/下的某個節點會怎么樣?

詳情請看 【kafka實戰】一不小心洗掉了/brokers/topics/下的某個Topic

如果我手動在zk中添加/brokers/topics/{TopicName}節點會怎么樣

先說結論: 根據上面分析過的原始碼畫出的時序圖可以指定; 客戶端發起創建Topic的請求,本質上是去zk里面寫兩個資料

  1. topic的配置資訊 /config/topics/Topic名稱 持久節點
  2. topic的磁區資訊/brokers/topics/Topic名稱 持久節點
    所以我們繞過這一步驟直接去寫入資料,可以達到一樣的效果;不過我們的資料需要保證準確
    因為在這一步已經沒有了一些基本的校驗了; 假如這一步我們寫入的副本Brokerid不存在會怎樣,從時序圖中可以看到,leaderAndIsrRequest請求; 就不會正確的發送的不存在的BrokerId上,那么那臺機器就不會創建Log檔案;

下面不妨讓我們來驗證一下;
創建一個節點/brokers/topics/create_topic_byhand_zk 節點資料為下面資料;

{"version":2,"partitions":{"2":[3],"1":[3],"0":[3]},"adding_replicas":{},"removing_replicas":{}}

在這里插入圖片描述
這里我用的工具PRETTYZOO手動創建的,你也可以用命令列創建;
創建完成之后我們再看看本地有沒有生成一個Log檔案
在這里插入圖片描述
可以看到我們指定的Broker,已經生成了對應的磁區副本Log檔案;
而且zk中也寫入了其他的資料在這里插入圖片描述
在我們寫入zk資料的時候,就已經確定好了哪個每個磁區的Leader是誰了,那就是第一個副本默認為Leader

如果寫入/brokers/topics/{TopicName}節點之后Controller掛掉了會怎么樣

先說結論:Controller 重新選舉的時候,會有一些初始化的操作; 會把創建程序繼續下去

然后我們來模擬這么一個程序,先停止集群,然后再zk中寫入/brokers/topics/{TopicName}節點資料; 然后再啟動一臺Broker;
原始碼分析: 我們之前分析過Controller的啟動程序與選舉 有提到過,這里再提一下Controller當選之后有一個地方處理這個事情

replicaStateMachine.startup()
partitionStateMachine.startup()

啟動狀態機的程序是不是跟上面的6.1 onNewPartitionCreation 狀態流轉 的程序很像; 最終都把狀態流轉到了OnlinePartition; 伴隨著是不發起了leaderAndIsrRequest請求; 是不是Broker收到請求之后,創建本地Log檔案了

附件

–config 可生效引數

請以sh bin/kafka-topic -help 為準

configurations:                      
                  cleanup.policy                        
                  compression.type                      
                  delete.retention.ms                   
                  file.delete.delay.ms                  
                  flush.messages                        
                  flush.ms                              
                  follower.replication.throttled.       
replicas                             
                index.interval.bytes                  
                leader.replication.throttled.replicas 
                max.compaction.lag.ms                 
                max.message.bytes                     
                message.downconversion.enable         
                message.format.version                
                message.timestamp.difference.max.ms   
                message.timestamp.type                
                min.cleanable.dirty.ratio             
                min.compaction.lag.ms                 
                min.insync.replicas                   
                preallocate                           
                retention.bytes                       
                retention.ms                          
                segment.bytes                         
                segment.index.bytes                   
                segment.jitter.ms                     
                segment.ms                            
                unclean.leader.election.enable

Tips:如果關于本篇文章你有疑問,可以在公眾號給我留言,我會進行解答

PS: 文章閱讀的原始碼版本是kafka-2.5

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/293136.html

標籤:其他

上一篇:Hive、Oracle、PgSQL實際生產中遇到的問題

下一篇:基于Syntec CNC系統的二次開發介紹(一)

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more