We normally start Kafka with the command kafka-server-start /usr/local/etc/kafka/server.properties. The startup class is Kafka.scala, and its main method is what ultimately gets invoked.
The startup script also sets the relevant JVM parameters, such as the location of the log4j configuration file and the JVM heap size.

Below we walk through Kafka's startup flow and its shutdown implementation at the source level. All source analysis in this series is based on version 0.10.2.

Startup entry point

//Kafka.scala
def main(args: Array[String]): Unit = {
  try {
    //Parse the command-line arguments and load the referenced config file; here that
    //is the contents of /usr/local/etc/kafka/server.properties. This also checks that
    //an argument was actually supplied, and fails otherwise
    val serverProps = getPropsFromArgs(args)
    //Build a KafkaServerStartable from the config; this validates that the required properties are set
    val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

    //Register a JVM shutdown hook
    Runtime.getRuntime().addShutdownHook(new Thread() {
      override def run() = {
        kafkaServerStartable.shutdown
      }
    })
    //KafkaServerStartable#startup delegates to KafkaServer#startup(),
    //which initializes and starts all of the server's components
    kafkaServerStartable.startup
    //Block until Kafka is shut down. Under the hood this uses Java's CountDownLatch.await();
    //when Kafka is shut down, the matching CountDownLatch.countDown() is called,
    //at which point the process really exits
    kafkaServerStartable.awaitShutdown
  }
  catch {
    case e: Throwable =>
      fatal(e)
      System.exit(1)
  }
  System.exit(0)
}
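
To make the await/countDown handshake concrete, here is a minimal sketch (not the actual 0.10.2 source) of how the latch parks the main thread until the shutdown hook fires:

import java.util.concurrent.CountDownLatch

//Minimal sketch: the server holds a latch with count 1, created during startup()
class LatchSketch {
  @volatile private var shutdownLatch = new CountDownLatch(1)

  //Called from main(): blocks the calling thread until the count reaches 0
  def awaitShutdown(): Unit = shutdownLatch.await()

  //Called from the JVM shutdown hook: drops the count to 0, releasing
  //awaitShutdown() so that main() can fall through to System.exit(0)
  def shutdown(): Unit = shutdownLatch.countDown()
}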

Broker lifecycle

The states in a broker's lifecycle are shown in the following diagram (reproduced from a comment in the Kafka source):

*
*                +-----------+
*                |Not Running|
*                +-----+-----+
*                      |
*                      v
*                +-----+-----+
*                |Starting   +--+
*                +-----+-----+  | +----+------------+
*                      |        +>+RecoveringFrom   |
*                      v          |UncleanShutdown  |
*               +-------+-------+ +-------+---------+
*               |RunningAsBroker|            |
*               +-------+-------+<-----------+
*                       |
*                       v
*                +-----+------------+
*                |PendingControlled |
*                |Shutdown          |
*                +-----+------------+
*                      |
*                      v
*               +-----+----------+
*               |BrokerShutting  |
*               |Down            |
*               +-----+----------+
*                     |
*                     v
*               +-----+-----+
*               |Not Running|
*               +-----------+
*
  • Not Running: the broker is not running
  • Starting: the broker is starting up
  • RunningAsBroker: the broker is up and serving
  • RecoveringFromUncleanShutdown: recovering from an earlier unclean shutdown; this state is tied to the log manager
  • PendingControlledShutdown: the broker has reported its shutdown to the controller and is waiting for the controller's reply
  • BrokerShuttingDown: the broker is shutting down
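
These states are defined as case objects in BrokerStates.scala, and a transition is recorded by calling BrokerState#newState, as the startup and shutdown code below does. A condensed sketch (the byte values follow the 0.10.2 source as far as I can tell; note that newState simply records the new state rather than validating it against the diagram):

sealed trait BrokerStates { def state: Byte }
case object NotRunning extends BrokerStates { val state: Byte = 0 }
case object Starting extends BrokerStates { val state: Byte = 1 }
case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 }
case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }
case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 }
case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }

class BrokerState {
  @volatile var currentState: Byte = NotRunning.state
  //Record the transition without enforcing legality
  def newState(newState: BrokerStates): Unit = {
    currentState = newState.state
  }
}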

Server startup flow

//KafkaServer#startup()
def startup() {
  try {
    info("starting")

    if (isShuttingDown.get)
      throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

    if (startupComplete.get)
      return

    val canStartup = isStartingUp.compareAndSet(false, true)
    if (canStartup) {
      //Move the broker into the Starting state
      brokerState.newState(Starting)

      //Start the scheduler thread pool for periodic background tasks
      kafkaScheduler.startup()

      //Initialize the ZooKeeper utilities, used later for watching and reading ZK data
      zkUtils = initZk()

      //Get the cluster id, generating one if the cluster does not have one yet;
      //it is stored at the ZK path /cluster/id
      _clusterId = getOrGenerateClusterId(zkUtils)
      info(s"Cluster ID = $clusterId")

      //Get or generate a broker id
      config.brokerId = getBrokerId
      this.logIdent = "[Kafka Server " + config.brokerId + "], "

      //Create the metrics components
      val reporters = config.getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter],
        Map[String, AnyRef](KafkaConfig.BrokerIdProp -> (config.brokerId.toString)).asJava)
      reporters.add(new JmxReporter(jmxPrefix))
      val metricConfig = KafkaServer.metricConfig(config)
      metrics = new Metrics(metricConfig, reporters, time, true)

      quotaManagers = QuotaFactory.instantiate(config, metrics, time)
      notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)

      //Create the log manager. During creation it checks the log directories for a
      //.kafka_cleanshutdown file; if the file is missing, the broker enters the
      //RecoveringFromUncleanShutdown state
      logManager = createLogManager(zkUtils.zkClient, brokerState)
      logManager.startup()

      //Create the metadata cache
      metadataCache = new MetadataCache(config.brokerId)
      //Create the credential provider
      credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)

      //Create and start the SocketServer; once started, it begins accepting RPC requests
      socketServer = new SocketServer(config, metrics, time, credentialProvider)
      socketServer.startup()

      //Create and start the replica manager
      replicaManager = new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager,
        isShuttingDown, quotaManagers.follower)
      replicaManager.startup()

      //Create and start the Kafka controller; once started, the broker competes to
      //become controller by trying to create a ZK node
      kafkaController = new KafkaController(config, zkUtils, brokerState, time, metrics, threadNamePrefix)
      kafkaController.startup()
      //Create the admin manager
      adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)

      //Create and start the group coordinator
      groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, Time.SYSTEM)
      groupCoordinator.startup()

      //Construct the authorizer
      authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
        val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
        authZ.configure(config.originals())
        authZ
      }

      //Construct the API layer, which dispatches each type of request to its handler
      apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator,
        kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
        clusterId, time)
      //The request handler pool
      requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
        config.numIoThreads)

      Mx4jLoader.maybeLoad()

      //Handlers for the various types of dynamic configuration
      dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers),
        ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
        ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
        ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))

      //Initialize and start the dynamic config manager
      dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
      dynamicConfigManager.startup()

      /* tell everyone we are alive */
      val listeners = config.advertisedListeners.map { endpoint =>
        if (endpoint.port == 0)
          endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
        else
          endpoint
      }
      //The Kafka health-check component
      kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
        config.interBrokerProtocolVersion)
      kafkaHealthcheck.startup()

      //Checkpoint the broker id (written to meta.properties in the log directories)
      checkpointBrokerId(config.brokerId)

      /* register broker metrics */
      registerStats()
      //Move the broker into the RunningAsBroker state
      brokerState.newState(RunningAsBroker)
      shutdownLatch = new CountDownLatch(1)
      startupComplete.set(true)
      isStartingUp.set(false)
      AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
      info("started")
    }
  }
  catch {
    case e: Throwable =>
      fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
      isStartingUp.set(false)
      shutdown()
      throw e
  }
}

The startup flow, then, boils down to initializing a series of components and starting the ones that need starting. I'll introduce these components one by one in later posts; for now it's enough to have a rough picture of how they come up.

Shutdown implementation

In the startup code above we saw that Kafka registers a shutdown hook. When the process shuts down, KafkaServerStartable#shutdown() runs, which ultimately calls KafkaServer#shutdown():

def shutdown() {
  try {
    info("shutting down")

    if (isStartingUp.get)
      throw new IllegalStateException("Kafka server is still starting up, cannot shut down!")

    // To ensure correct behavior under concurrent calls, we need to check `shutdownLatch` first since it gets updated
    // last in the `if` block. If the order is reversed, we could shutdown twice or leave `isShuttingDown` set to
    // `true` at the end of this method.
    if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
      //controlledShutdown() notifies the controller that this broker is shutting down,
      //blocking until the notification succeeds (or retries are exhausted); during this
      //the broker is in the PendingControlledShutdown state
      CoreUtils.swallow(controlledShutdown())
      //Move the broker into the BrokerShuttingDown state
      brokerState.newState(BrokerShuttingDown)
      if (socketServer != null)
        CoreUtils.swallow(socketServer.shutdown())
      if (requestHandlerPool != null)
        CoreUtils.swallow(requestHandlerPool.shutdown())
      CoreUtils.swallow(kafkaScheduler.shutdown())
      if (apis != null)
        CoreUtils.swallow(apis.close())
      CoreUtils.swallow(authorizer.foreach(_.close()))
      if (replicaManager != null)
        CoreUtils.swallow(replicaManager.shutdown())
      if (adminManager != null)
        CoreUtils.swallow(adminManager.shutdown())
      if (groupCoordinator != null)
        CoreUtils.swallow(groupCoordinator.shutdown())
      if (logManager != null)
        CoreUtils.swallow(logManager.shutdown())
      if (kafkaController != null)
        CoreUtils.swallow(kafkaController.shutdown())
      if (zkUtils != null)
        CoreUtils.swallow(zkUtils.close())
      if (metrics != null)
        CoreUtils.swallow(metrics.close())
      //Move the broker into the NotRunning state
      brokerState.newState(NotRunning)

      startupComplete.set(false)
      isShuttingDown.set(false)
      CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString))
      shutdownLatch.countDown()
      info("shut down completed")
    }
  }
  catch {
    case e: Throwable =>
      fatal("Fatal error during KafkaServer shutdown.", e)
      isShuttingDown.set(false)
      throw e
  }
}

What shutdown does is gracefully close the components created during startup. Before closing them, the broker may first report its shutdown to the controller, and only continues closing the remaining components after the controller has replied. Note that every close call above is wrapped in CoreUtils.swallow.
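
The swallow idiom is roughly the following shape (an assumption on my part; see CoreUtils.scala for the exact signature): run an action and log anything it throws instead of rethrowing, so one failing component cannot abort the rest of the shutdown sequence.

//Sketch of the swallow idiom (assumed shape, not the exact 0.10.2 signature)
def swallow(log: (String, Throwable) => Unit)(action: => Unit): Unit =
  try action
  catch { case e: Throwable => log(e.getMessage, e) }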

Reporting shutdown to the controller

If controlled.shutdown.enable is set, the broker reports its own shutdown to the controller before going down, so that the controller can react promptly: re-electing partition leaders, closing partition replicas, notifying the other brokers of the metadata change, and so on. This setting is enabled by default.
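
The relevant settings in server.properties (the values shown are, to my knowledge, the 0.10.2 defaults):

controlled.shutdown.enable=true
controlled.shutdown.max.retries=3
controlled.shutdown.retry.backoff.ms=5000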

The flow for reporting shutdown to the controller works like this (a condensed sketch of the retry loop follows the list):

  1. Check whether controlled.shutdown.enable is set. If it is, the broker enters the PendingControlledShutdown state and starts sending shutdown requests to the controller.
  2. Check whether the retry count has reached controlled.shutdown.max.retries; if so, give up and send no more shutdown requests. The retry limit defaults to 3.
  3. Read the id of the current controller from ZK, then send the shutdown request to that broker.
  4. If the request fails, sleep for controlled.shutdown.retry.backoff.ms milliseconds, increment the retry count, and go back to step 2.
  5. If the request succeeds, the controller has received the shutdown request and finished handling it.
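
As a self-contained sketch of steps 2-5 (sendRequest is a hypothetical stand-in for resolving the current controller via ZK and sending it a ControlledShutdownRequest over a blocking channel, returning true on a clean response):

//Condensed sketch of the retry loop in KafkaServer#controlledShutdown()
def controlledShutdownLoop(maxRetries: Int, backoffMs: Long)
                          (sendRequest: () => Boolean): Boolean = {
  var remainingRetries = maxRetries
  var succeeded = false
  while (!succeeded && remainingRetries > 0) {
    remainingRetries -= 1
    //Step 3: look up the controller and send it the shutdown request
    succeeded = sendRequest()
    //Step 4: back off before the next attempt
    if (!succeeded)
      Thread.sleep(backoffMs) //controlled.shutdown.retry.backoff.ms
  }
  //If this returns false after all retries, the broker proceeds with an unclean shutdown
  succeeded
}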