1. Partition Replica Leader Election Flow

To keep message production and consumption available when a cluster node goes down, Kafka implements a failover mechanism. Its flow is shown in the figure below, and the implementation hinges on the following two requests:

  1. Broker registration request BrokerRegistration
    When a broker node starts up, it sends a registration request to the cluster Leader (the active controller). The Leader processes the request and keeps the broker's state in a local registration list; once registration completes, the broker periodically reports heartbeats to the Leader
  2. Broker heartbeat request BrokerHeartbeat
    When the Leader receives a heartbeat, it checks the local registration list for brokers whose sessions have timed out. Any timed-out broker is taken offline: new leaders are elected for the partitions whose leader replica sits on the failed broker, and metadata change records are generated. For how metadata changes are propagated and take effect, see Kafka 3.0 源码笔记(9)-Kafka 服务端元数据的主从同步. A short sketch of this controller-side bookkeeping follows the figure below

(Figure: failover flow driven by the BrokerRegistration and BrokerHeartbeat requests)
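
The following is a minimal, self-contained sketch of that bookkeeping, for illustration only: the class and method names (SessionTracker, register, touch, findOneStale) are hypothetical, while the real logic lives in ClusterControlManager and BrokerHeartbeatManager, analyzed below.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    // Hypothetical sketch of the controller-side session bookkeeping:
    // registration and heartbeats refresh a broker's last contact time,
    // and a periodic check returns a broker whose session has expired.
    public class SessionTracker {
        private final long sessionTimeoutNs;
        private final Map<Integer, Long> lastContactNs = new HashMap<>();

        public SessionTracker(long sessionTimeoutNs) {
            this.sessionTimeoutNs = sessionTimeoutNs;
        }

        // BrokerRegistration: start tracking the broker.
        public void register(int brokerId, long nowNs) {
            lastContactNs.put(brokerId, nowNs);
        }

        // BrokerHeartbeat: refresh the broker's last contact time.
        public void touch(int brokerId, long nowNs) {
            lastContactNs.put(brokerId, nowNs);
        }

        // Periodic check: the caller fences the returned broker and
        // re-elects leaders for the partitions it was leading.
        public Optional<Integer> findOneStale(long nowNs) {
            return lastContactNs.entrySet().stream()
                .filter(e -> e.getValue() + sessionTimeoutNs < nowNs)
                .map(Map.Entry::getKey)
                .findFirst();
        }
    }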

2. Source Code Analysis

The following is a source-level sequence overview of how Kafka implements failover leader election for partition replicas. The important processing falls into two parts:

  1. Handling of the broker registration request BrokerRegistration
  2. The broker heartbeat request BrokerHeartbeat exchange

(Figure: source-level sequence diagram of the BrokerRegistration and BrokerHeartbeat handling)

2.1 Handling the BrokerRegistration Request

2.1.1 Sending the BrokerRegistration Request
  1. A Kafka node configured with the broker role via the process.roles property creates a BrokerServer object at startup and runs BrokerServer.scala#startup() to perform its basic initialization

    This method is fairly long; I analyzed it in Kafka 3.0 源码笔记(2)-Kafka 服务端的启动与请求处理源码分析. The part relevant to this article is the call to BrokerLifecycleManager.scala#start(), which starts the broker lifecycle manager

      def startup(): Unit = {
     if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
     try {
       info("Starting broker")
       
       ......
       
       lifecycleManager.start(() => metadataListener.highestMetadataOffset(),
         BrokerToControllerChannelManager(controllerNodeProvider, time, metrics, config,
           "heartbeat", threadNamePrefix, config.brokerSessionTimeoutMs.toLong),
         metaProps.clusterId, networkListeners, supportedFeatures)
    
       // Register a listener with the Raft layer to receive metadata event notifications
       raftManager.register(metadataListener)
       
       ......
       
       // Block until we've caught up with the latest metadata from the controller quorum.
       lifecycleManager.initialCatchUpFuture.get()
    
       // Apply the metadata log changes that we've accumulated.
       metadataPublisher = new BrokerMetadataPublisher(config, metadataCache,
         logManager, replicaManager, groupCoordinator, transactionCoordinator,
         clientQuotaMetadataManager, featureCache, dynamicConfigHandlers.toMap)
    
       // Tell the metadata listener to start publishing its output, and wait for the first
       // publish operation to complete. This first operation will initialize logManager,
       // replicaManager, groupCoordinator, and txnCoordinator. The log manager may perform
       // a potentially lengthy recovery-from-unclean-shutdown operation here, if required.
       metadataListener.startPublishing(metadataPublisher).get()
    
       // Log static broker configurations.
       new KafkaConfig(config.originals(), true)
    
       // Enable inbound TCP connections.
       socketServer.startProcessingRequests(authorizerFutures)
    
       // We're now ready to unfence the broker. This also allows this broker to transition
       // from RECOVERY state to RUNNING state, once the controller unfences the broker.
       lifecycleManager.setReadyToUnfence()
    
       maybeChangeStatus(STARTING, STARTED)
     } catch {
       case e: Throwable =>
         maybeChangeStatus(STARTING, STARTED)
         fatal("Fatal error during broker startup. Prepare to shutdown", e)
         shutdown()
         throw e
     }
    }
    
  2. BrokerLifecycleManager.scala#start() looks simple: the key point is that it creates a StartupEvent and appends it to the asynchronous KafkaEventQueue. I analyzed how this event queue works in Kafka 3.0 源码笔记(8)-Kafka 服务端集群 Leader 对 CreateTopics 请求的处理, so it is not repeated here; the reader only needs to know that when the event is consumed, StartupEvent#run() is executed

    def start(highestMetadataOffsetProvider: () => Long,
             channelManager: BrokerToControllerChannelManager,
             clusterId: String,
             advertisedListeners: ListenerCollection,
             supportedFeatures: util.Map[String, VersionRange]): Unit = {
     eventQueue.append(new StartupEvent(highestMetadataOffsetProvider,
       channelManager, clusterId, advertisedListeners, supportedFeatures))
    }
    
  3. StartupEvent#run() is implemented as follows. Its core processing is to schedule a deferred registration-timeout task on the KafkaEventQueue and then call BrokerLifecycleManager.scala#sendBrokerRegistration() to send the registration request

      private class StartupEvent(highestMetadataOffsetProvider: () => Long,
                      channelManager: BrokerToControllerChannelManager,
                      clusterId: String,
                      advertisedListeners: ListenerCollection,
                      supportedFeatures: util.Map[String, VersionRange]) extends EventQueue.Event {
     override def run(): Unit = {
       _highestMetadataOffsetProvider = highestMetadataOffsetProvider
       _channelManager = channelManager
       _channelManager.start()
       _state = BrokerState.STARTING
       _clusterId = clusterId
       _advertisedListeners = advertisedListeners.duplicate()
       _supportedFeatures = new util.HashMap[String, VersionRange](supportedFeatures)
       eventQueue.scheduleDeferred("initialRegistrationTimeout",
         new DeadlineFunction(time.nanoseconds() + initialTimeoutNs),
         new RegistrationTimeoutEvent())
       sendBrokerRegistration()
       info(s"Incarnation ${incarnationId} of broker ${nodeId} in cluster ${clusterId} " +
         "is now STARTING.")
     }
    }
    
  4. The source of BrokerLifecycleManager.scala#sendBrokerRegistration() is shown below; the key processing consists of two steps:

    1. Assemble the BrokerRegistration request object
    2. Call BrokerToControllerChannelManager.scala#sendRequest() to send the request to the cluster Leader over the network, registering BrokerRegistrationResponseHandler as the response handler. The network send relies on the underlying NetworkClient, which I touched on in Kafka 3.0 源码笔记(3)-Kafka 消费者的核心流程源码分析, so it is not repeated here
      private def sendBrokerRegistration(): Unit = {
     val features = new BrokerRegistrationRequestData.FeatureCollection()
     _supportedFeatures.asScala.foreach {
       case (name, range) => features.add(new BrokerRegistrationRequestData.Feature().
         setName(name).
         setMinSupportedVersion(range.min()).
         setMaxSupportedVersion(range.max()))
     }
     val data = new BrokerRegistrationRequestData().
         setBrokerId(nodeId).
         setClusterId(_clusterId).
         setFeatures(features).
         setIncarnationId(incarnationId).
         setListeners(_advertisedListeners).
         setRack(rack.orNull)
     if (isTraceEnabled) {
       trace(s"Sending broker registration ${data}")
     }
     _channelManager.sendRequest(new BrokerRegistrationRequest.Builder(data),
       new BrokerRegistrationResponseHandler())
    }
    
  5. When the cluster Leader has processed the BrokerRegistration request and the response reaches the requesting broker, that broker's BrokerRegistrationResponseHandler#onComplete() handles the response data. The method is straightforward: whether or not the broker registered successfully, its core action is to schedule the next round of communication (a small sketch of how failed attempts might be spaced out follows the handler source below)

    private class BrokerRegistrationResponseHandler extends ControllerRequestCompletionHandler {
     override def onComplete(response: ClientResponse): Unit = {
       if (response.authenticationException() != null) {
         error(s"Unable to register broker ${nodeId} because of an authentication exception.",
           response.authenticationException());
         scheduleNextCommunicationAfterFailure()
       } else if (response.versionMismatch() != null) {
         error(s"Unable to register broker ${nodeId} because of an API version problem.",
           response.versionMismatch());
         scheduleNextCommunicationAfterFailure()
       } else if (response.responseBody() == null) {
         warn(s"Unable to register broker ${nodeId}.")
         scheduleNextCommunicationAfterFailure()
       } else if (!response.responseBody().isInstanceOf[BrokerRegistrationResponse]) {
         error(s"Unable to register broker ${nodeId} because the controller returned an " +
           "invalid response type.")
         scheduleNextCommunicationAfterFailure()
       } else {
         val message = response.responseBody().asInstanceOf[BrokerRegistrationResponse]
         val errorCode = Errors.forCode(message.data().errorCode())
         if (errorCode == Errors.NONE) {
           failedAttempts = 0
           _brokerEpoch = message.data().brokerEpoch()
           registered = true
           initialRegistrationSucceeded = true
           info(s"Successfully registered broker ${nodeId} with broker epoch ${_brokerEpoch}")
           scheduleNextCommunicationImmediately() // Immediately send a heartbeat
         } else {
           info(s"Unable to register broker ${nodeId} because the controller returned " +
             s"error ${errorCode}")
           scheduleNextCommunicationAfterFailure()
         }
       }
     }
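
    On failure the handler only calls scheduleNextCommunicationAfterFailure(), i.e. the broker simply retries later; the failedAttempts counter that is reset on success above suggests the retry delay grows with consecutive failures. Below is a minimal sketch of such a capped exponential backoff; the base delay and cap are assumptions made for this illustration, not the actual values used by BrokerLifecycleManager.

     import java.util.concurrent.TimeUnit;

     // Illustrative only: capped exponential backoff for registration/heartbeat retries.
     public class RetryBackoffSketch {
         // delay = base * 2^attempts, capped at maxMs
         static long backoffMs(int failedAttempts, long baseMs, long maxMs) {
             return (long) Math.min(baseMs * Math.pow(2, failedAttempts), maxMs);
         }

         public static void main(String[] args) {
             long maxMs = TimeUnit.SECONDS.toMillis(5);   // assumed cap
             for (int attempt = 0; attempt < 8; attempt++) {
                 // prints 100, 200, 400, 800, 1600, 3200, 5000, 5000
                 System.out.println("attempt " + attempt + " -> wait "
                     + backoffMs(attempt, 100, maxMs) + " ms");
             }
         }
     }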
    
2.1.2 Processing the BrokerRegistration Request
  1. When the Kafka cluster Leader receives a BrokerRegistration request, the underlying network layer parses the protocol and dispatches the request to ControllerApis.scala#handleBrokerRegistration(). The core here is the call to QuorumController.java#registerBroker(), which carries out the actual registration logic

    def handleBrokerRegistration(request: RequestChannel.Request): Unit = {
     val registrationRequest = request.body[BrokerRegistrationRequest]
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
    
     controller.registerBroker(registrationRequest.data).handle[Unit] { (reply, e) =>
       def createResponseCallback(requestThrottleMs: Int,
                                  reply: BrokerRegistrationReply,
                                  e: Throwable): BrokerRegistrationResponse = {
         if (e != null) {
           new BrokerRegistrationResponse(new BrokerRegistrationResponseData().
             setThrottleTimeMs(requestThrottleMs).
             setErrorCode(Errors.forException(e).code))
         } else {
           new BrokerRegistrationResponse(new BrokerRegistrationResponseData().
             setThrottleTimeMs(requestThrottleMs).
             setErrorCode(NONE.code).
             setBrokerEpoch(reply.epoch))
         }
       }
       requestHelper.sendResponseMaybeThrottle(request,
         requestThrottleMs => createResponseCallback(requestThrottleMs, reply, e))
     }
    }
    
  2. QuorumController.java#registerBroker() follows exactly the asynchronous processing framework described in Kafka 3.0 源码笔记(8)-Kafka 服务端集群 Leader 对 CreateTopics 请求的处理: the core business logic is wrapped in a ControllerWriteEvent, with the following key steps:

    1. Call ClusterControlManager.java#registerBroker() to add the requesting broker to the local registration list and track its state
    2. Call QuorumController.java#rescheduleMaybeFenceStaleBrokers() to check the local list for timed-out brokers; a timed-out broker is put into the FENCED state and new leaders are elected for the partitions it was leading. This method is analyzed in detail later in this article
     @Override
     public CompletableFuture<BrokerRegistrationReply>
             registerBroker(BrokerRegistrationRequestData request) {
         return appendWriteEvent("registerBroker", () -> {
             ControllerResult<BrokerRegistrationReply> result = clusterControl.
                 registerBroker(request, writeOffset + 1, featureControl.
                     finalizedFeatures(Long.MAX_VALUE));
             rescheduleMaybeFenceStaleBrokers();
             return result;
         });
     }
    
  3. The key steps of ClusterControlManager.java#registerBroker() are as follows. After this, the broker is registered in the Leader's local list and the handling of the BrokerRegistration request is essentially complete

    1. First look up the existing BrokerRegistration entry in the local list for the brokerId carried in the request; if the broker is already registered, use BrokerHeartbeatManager.java#hasValidSession() to check whether its heartbeat session is still valid
    2. Generate a RegisterBrokerRecord metadata record. After the ControllerWriteEvent completes, the asynchronous framework writes this record to the metadata topic (__cluster_metadata), and it is later replayed into memory
    3. Call BrokerHeartbeatManager.java#touch() to update the broker's BrokerHeartbeatState
     public ControllerResult<BrokerRegistrationReply> registerBroker(
             BrokerRegistrationRequestData request,
             long brokerEpoch,
             FeatureMapAndEpoch finalizedFeatures) {
         if (heartbeatManager == null) {
             throw new RuntimeException("ClusterControlManager is not active.");
         }
         int brokerId = request.brokerId();
         BrokerRegistration existing = brokerRegistrations.get(brokerId);
         if (existing != null) {
             if (heartbeatManager.hasValidSession(brokerId)) {
                 if (!existing.incarnationId().equals(request.incarnationId())) {
                     throw new DuplicateBrokerRegistrationException("Another broker is " +
                         "registered with that broker id.");
                 }
             } else {
                 if (!existing.incarnationId().equals(request.incarnationId())) {
                     // Remove any existing session for the old broker incarnation.
                     heartbeatManager.remove(brokerId);
                     existing = null;
                 }
             }
         }
    
         RegisterBrokerRecord record = new RegisterBrokerRecord().setBrokerId(brokerId).
             setIncarnationId(request.incarnationId()).
             setBrokerEpoch(brokerEpoch).
             setRack(request.rack());
         for (BrokerRegistrationRequestData.Listener listener : request.listeners()) {
             record.endPoints().add(new BrokerEndpoint().
                 setHost(listener.host()).
                 setName(listener.name()).
                 setPort(listener.port()).
                 setSecurityProtocol(listener.securityProtocol()));
         }
         for (BrokerRegistrationRequestData.Feature feature : request.features()) {
             Optional<VersionRange> finalized = finalizedFeatures.map().get(feature.name());
             if (finalized.isPresent()) {
                 if (!finalized.get().contains(new VersionRange(feature.minSupportedVersion(),
                         feature.maxSupportedVersion()))) {
                     throw new UnsupportedVersionException("Unable to register because " +
                         "the broker has an unsupported version of " + feature.name());
                 }
             }
             record.features().add(new BrokerFeature().
                 setName(feature.name()).
                 setMinSupportedVersion(feature.minSupportedVersion()).
                 setMaxSupportedVersion(feature.maxSupportedVersion()));
         }
    
         if (existing == null) {
             heartbeatManager.touch(brokerId, true, -1);
         } else {
             heartbeatManager.touch(brokerId, existing.fenced(), -1);
         }
    
         List<ApiMessageAndVersion> records = new ArrayList<>();
         records.add(new ApiMessageAndVersion(record,
             REGISTER_BROKER_RECORD.highestSupportedVersion()));
         return ControllerResult.of(records, new BrokerRegistrationReply(brokerEpoch));
     }
    

2.2 The BrokerHeartbeat Request Exchange

2.2.1 Sending the BrokerHeartbeat Request
  1. In step 5 of section 2.1.1, a successful registration triggers BrokerLifecycleManager.scala#scheduleNextCommunicationImmediately(). This method is only the entry point: the real work is done by BrokerLifecycleManager.scala#scheduleNextCommunication(), which schedules a deferred CommunicationEvent on the asynchronous KafkaEventQueue; when that event is consumed, CommunicationEvent#run() executes

    private def scheduleNextCommunicationImmediately(): Unit = scheduleNextCommunication(0)
    
    private def scheduleNextCommunication(intervalNs: Long): Unit = {
     trace(s"Scheduling next communication at ${MILLISECONDS.convert(intervalNs, NANOSECONDS)} " +
       "ms from now.")
     val deadlineNs = time.nanoseconds() + intervalNs
     eventQueue.scheduleDeferred("communication",
       new DeadlineFunction(deadlineNs),
       new CommunicationEvent())
    }
    
  2. CommunicationEvent#run() is shown below. It checks the broker's current registration state to decide whether to send a heartbeat or re-register, which is exactly why the upstream trigger point does not need to care whether registration succeeded

    private class CommunicationEvent extends EventQueue.Event {
     override def run(): Unit = {
       if (registered) {
         sendBrokerHeartbeat()
       } else {
         sendBrokerRegistration()
       }
     }
    }
    
  3. BrokerLifecycleManager.scala#sendBrokerHeartbeat() sends the BrokerHeartbeat request in much the same way as the BrokerRegistration request; the key steps are:

    1. Assemble the BrokerHeartbeat request object
    2. Call BrokerToControllerChannelManager.scala#sendRequest() to send the request to the cluster Leader, registering BrokerHeartbeatResponseHandler as the response handler
      private def sendBrokerHeartbeat(): Unit = {
     val metadataOffset = _highestMetadataOffsetProvider()
     val data = new BrokerHeartbeatRequestData().
       setBrokerEpoch(_brokerEpoch).
       setBrokerId(nodeId).
       setCurrentMetadataOffset(metadataOffset).
       setWantFence(!readyToUnfence).
       setWantShutDown(_state == BrokerState.PENDING_CONTROLLED_SHUTDOWN)
     if (isTraceEnabled) {
       trace(s"Sending broker heartbeat ${data}")
     }
     _channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data),
       new BrokerHeartbeatResponseHandler())
    }
    
  4. BrokerHeartbeatResponseHandler#onComplete() handles the response to the BrokerHeartbeat request. Once again its core is to schedule the next round of communication, either re-registering or reporting the next heartbeat to the cluster Leader, so the details are not repeated here

     private class BrokerHeartbeatResponseHandler extends ControllerRequestCompletionHandler {
     override def onComplete(response: ClientResponse): Unit = {
       if (response.authenticationException() != null) {
         error(s"Unable to send broker heartbeat for ${nodeId} because of an " +
           "authentication exception.", response.authenticationException());
         scheduleNextCommunicationAfterFailure()
       } else if (response.versionMismatch() != null) {
         error(s"Unable to send broker heartbeat for ${nodeId} because of an API " +
           "version problem.", response.versionMismatch());
         scheduleNextCommunicationAfterFailure()
       } else if (response.responseBody() == null) {
         warn(s"Unable to send broker heartbeat for ${nodeId}. Retrying.")
         scheduleNextCommunicationAfterFailure()
       } else if (!response.responseBody().isInstanceOf[BrokerHeartbeatResponse]) {
         error(s"Unable to send broker heartbeat for ${nodeId} because the controller " +
           "returned an invalid response type.")
         scheduleNextCommunicationAfterFailure()
       } else {
         val message = response.responseBody().asInstanceOf[BrokerHeartbeatResponse]
         val errorCode = Errors.forCode(message.data().errorCode())
         if (errorCode == Errors.NONE) {
           failedAttempts = 0
           _state match {
             case BrokerState.STARTING =>
               if (message.data().isCaughtUp()) {
                 info(s"The broker has caught up. Transitioning from STARTING to RECOVERY.")
                 _state = BrokerState.RECOVERY
                 initialCatchUpFuture.complete(null)
               } else {
                 debug(s"The broker is STARTING. Still waiting to catch up with cluster metadata.")
               }
               // Schedule the heartbeat after only 10 ms so that in the case where
               // there is no recovery work to be done, we start up a bit quicker.
               scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS))
             case BrokerState.RECOVERY =>
               if (!message.data().isFenced()) {
                 info(s"The broker has been unfenced. Transitioning from RECOVERY to RUNNING.")
                 _state = BrokerState.RUNNING
               } else {
                 info(s"The broker is in RECOVERY.")
               }
               scheduleNextCommunicationAfterSuccess()
             case BrokerState.RUNNING =>
               debug(s"The broker is RUNNING. Processing heartbeat response.")
               scheduleNextCommunicationAfterSuccess()
             case BrokerState.PENDING_CONTROLLED_SHUTDOWN =>
               if (!message.data().shouldShutDown()) {
                 info(s"The broker is in PENDING_CONTROLLED_SHUTDOWN state, still waiting " +
                   "for the active controller.")
                 if (!gotControlledShutdownResponse) {
                   // If this is the first pending controlled shutdown response we got,
                   // schedule our next heartbeat a little bit sooner than we usually would.
                   // In the case where controlled shutdown completes quickly, this will
                   // speed things up a little bit.
                   scheduleNextCommunication(NANOSECONDS.convert(50, MILLISECONDS))
                 } else {
                   scheduleNextCommunicationAfterSuccess()
                 }
               } else {
                 info(s"The controlled has asked us to exit controlled shutdown.")
                 beginShutdown()
               }
               gotControlledShutdownResponse = true
             case BrokerState.SHUTTING_DOWN =>
               info(s"The broker is SHUTTING_DOWN. Ignoring heartbeat response.")
             case _ =>
               error(s"Unexpected broker state ${_state}")
               scheduleNextCommunicationAfterSuccess()
           }
         } else {
           warn(s"Broker ${nodeId} sent a heartbeat request but received error ${errorCode}.")
           scheduleNextCommunicationAfterFailure()
         }
       }
     }
    
2.2.2 Processing the BrokerHeartbeat Request
  1. ControllerApis.scala#handleBrokerHeartBeatRequest() is responsible for handling the BrokerHeartbeat request. As its source below shows, the flow closely mirrors the handling of broker registration; here it triggers QuorumController.java#processBrokerHeartbeat()

      def handleBrokerHeartBeatRequest(request: RequestChannel.Request): Unit = {
     val heartbeatRequest = request.body[BrokerHeartbeatRequest]
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
    
     controller.processBrokerHeartbeat(heartbeatRequest.data).handle[Unit] { (reply, e) =>
       def createResponseCallback(requestThrottleMs: Int,
                                  reply: BrokerHeartbeatReply,
                                  e: Throwable): BrokerHeartbeatResponse = {
         if (e != null) {
           new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData().
             setThrottleTimeMs(requestThrottleMs).
             setErrorCode(Errors.forException(e).code))
         } else {
           new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData().
             setThrottleTimeMs(requestThrottleMs).
             setErrorCode(NONE.code).
             setIsCaughtUp(reply.isCaughtUp).
             setIsFenced(reply.isFenced).
             setShouldShutDown(reply.shouldShutDown))
         }
       }
       requestHelper.sendResponseMaybeThrottle(request,
         requestThrottleMs => createResponseCallback(requestThrottleMs, reply, e))
     }
    }
    
  2. The core business logic of QuorumController.java#processBrokerHeartbeat() is as follows:

    1. Call ReplicationControlManager#processBrokerHeartbeat() to update the broker's state based on the request data
    2. Call QuorumController.java#rescheduleMaybeFenceStaleBrokers() to check the local list for timed-out brokers; a timed-out broker is put into the FENCED state, and new leaders must be elected for the partitions it was leading
     @Override
     public CompletableFuture<BrokerHeartbeatReply>
             processBrokerHeartbeat(BrokerHeartbeatRequestData request) {
         return appendWriteEvent("processBrokerHeartbeat",
             new ControllerWriteOperation<BrokerHeartbeatReply>() {
                 private final int brokerId = request.brokerId();
                 private boolean inControlledShutdown = false;
    
                 @Override
                 public ControllerResult<BrokerHeartbeatReply> generateRecordsAndResult() {
                     ControllerResult<BrokerHeartbeatReply> result = replicationControl.
                         processBrokerHeartbeat(request, lastCommittedOffset);
                     inControlledShutdown = result.response().inControlledShutdown();
                     rescheduleMaybeFenceStaleBrokers();
                     return result;
                 }
    
                 @Override
                 public void processBatchEndOffset(long offset) {
                     if (inControlledShutdown) {
                         clusterControl.heartbeatManager().
                             updateControlledShutdownOffset(brokerId, offset);
                     }
                 }
             });
     }
    
  3. ReplicationControlManager#processBrokerHeartbeat() is implemented as follows, with these key steps:

    1. Call BrokerHeartbeatManager.java#calculateNextBrokerState() to compute the next state of the broker reporting the heartbeat
    2. If the broker's next state differs from its current one, a state transition is about to happen; the corresponding handling generates metadata change records, which are later packaged into the event's result object
    3. Update the broker's state in the local list
    ControllerResult<BrokerHeartbeatReply> processBrokerHeartbeat(
                 BrokerHeartbeatRequestData request, long lastCommittedOffset) {
         int brokerId = request.brokerId();
         long brokerEpoch = request.brokerEpoch();
         clusterControl.checkBrokerEpoch(brokerId, brokerEpoch);
         BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
         BrokerControlStates states = heartbeatManager.calculateNextBrokerState(brokerId,
             request, lastCommittedOffset, () -> brokersToIsrs.hasLeaderships(brokerId));
         List<ApiMessageAndVersion> records = new ArrayList<>();
         if (states.current() != states.next()) {
             switch (states.next()) {
                 case FENCED:
                     handleBrokerFenced(brokerId, records);
                     break;
                 case UNFENCED:
                     handleBrokerUnfenced(brokerId, brokerEpoch, records);
                     break;
                 case CONTROLLED_SHUTDOWN:
                     generateLeaderAndIsrUpdates("enterControlledShutdown[" + brokerId + "]",
                         brokerId, NO_LEADER, records, brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
                     break;
                 case SHUTDOWN_NOW:
                     handleBrokerFenced(brokerId, records);
                     break;
             }
         }
         heartbeatManager.touch(brokerId,
             states.next().fenced(),
             request.currentMetadataOffset());
         boolean isCaughtUp = request.currentMetadataOffset() >= lastCommittedOffset;
         BrokerHeartbeatReply reply = new BrokerHeartbeatReply(isCaughtUp,
                 states.next().fenced(),
                 states.next().inControlledShutdown(),
                 states.next().shouldShutDown());
         return ControllerResult.of(records, reply);
     }
    
  4. Back to step 2 of item 2 in this section: the core of QuorumController.java#rescheduleMaybeFenceStaleBrokers() is to schedule a deferred call to ReplicationControlManager.java#maybeFenceOneStaleBroker() to detect brokers whose heartbeats have timed out, and to call itself again so that the case of multiple brokers going offline can be handled one after another

    private void rescheduleMaybeFenceStaleBrokers() {
         long nextCheckTimeNs = clusterControl.heartbeatManager().nextCheckTimeNs();
         if (nextCheckTimeNs == Long.MAX_VALUE) {
             cancelMaybeFenceReplicas();
             return;
         }
         scheduleDeferredWriteEvent(MAYBE_FENCE_REPLICAS, nextCheckTimeNs, () -> {
             ControllerResult<Void> result = replicationControl.maybeFenceOneStaleBroker();
             // This following call ensures that if there are multiple brokers that
             // are currently stale, then fencing for them is scheduled immediately
             rescheduleMaybeFenceStaleBrokers();
             return result;
         });
     }
    
  5. The key processing of ReplicationControlManager.java#maybeFenceOneStaleBroker() is:

    1. First call BrokerHeartbeatManager.java#findOneStaleBroker() to check whether any broker's heartbeat session has expired
    2. If so, failover kicks in: the broker that has gone longest without contact is taken offline via ReplicationControlManager.java#handleBrokerFenced()
    
     ControllerResult<Void> maybeFenceOneStaleBroker() {
         List<ApiMessageAndVersion> records = new ArrayList<>();
         BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
         heartbeatManager.findOneStaleBroker().ifPresent(brokerId -> {
             // Even though multiple brokers can go stale at a time, we will process
             // fencing one at a time so that the effect of fencing each broker is visible
             // to the system prior to processing the next one
             log.info("Fencing broker {} because its session has timed out.", brokerId);
             handleBrokerFenced(brokerId, records);
             heartbeatManager.fence(brokerId);
         });
         return ControllerResult.of(records, null);
     }
    
  6. BrokerHeartbeatManager.java#findOneStaleBroker() is simple: it inspects the head of the unfenced broker list, which is sorted by last contact time, and uses BrokerHeartbeatManager.java#hasValidSession() to decide whether that broker's session has expired (a worked example of the arithmetic follows the snippet)

    Optional<Integer> findOneStaleBroker() {
         BrokerHeartbeatStateIterator iterator = unfenced.iterator();
         if (iterator.hasNext()) {
             BrokerHeartbeatState broker = iterator.next();
             // The unfenced list is sorted on last contact time from each
             // broker. If the first broker is not stale, then none is.
             if (!hasValidSession(broker)) {
                 return Optional.of(broker.id);
             }
         }
         return Optional.empty();
     }
    
    private boolean hasValidSession(BrokerHeartbeatState broker) {
         if (broker.fenced()) {
             return false;
         } else {
             return broker.lastContactNs + sessionTimeoutNs >= time.nanoseconds();
         }
     }
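
    To make the check concrete, here is a small hypothetical walk-through of the arithmetic above, assuming the session timeout is left at 9 seconds (the default of broker.session.timeout.ms, which feeds config.brokerSessionTimeoutMs in the startup code earlier); the real check is hasValidSession() shown above.

     import java.util.concurrent.TimeUnit;

     // Hypothetical numbers: a broker whose last heartbeat arrived 10 s ago,
     // checked against a 9 s session timeout.
     public class StaleCheckExample {
         public static void main(String[] args) {
             long sessionTimeoutNs = TimeUnit.MILLISECONDS.toNanos(9_000);
             long nowNs            = TimeUnit.SECONDS.toNanos(100);
             long lastContactNs    = TimeUnit.SECONDS.toNanos(90);   // heartbeat 10 s ago

             // lastContactNs + sessionTimeoutNs = 99 s < nowNs = 100 s,
             // so hasValidSession() would return false and findOneStaleBroker()
             // would return this broker's id, leading to it being fenced.
             boolean validSession = lastContactNs + sessionTimeoutNs >= nowNs;
             System.out.println(validSession);   // false
         }
     }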
    
  7. ReplicationControlManager.java#handleBrokerFenced() starts the failover processing; its core is the call to ReplicationControlManager.java#generateLeaderAndIsrUpdates()

    void handleBrokerFenced(int brokerId, List<ApiMessageAndVersion> records) {
         BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId);
         if (brokerRegistration == null) {
             throw new RuntimeException("Can't find broker registration for broker " + brokerId);
         }
         generateLeaderAndIsrUpdates("handleBrokerFenced", brokerId, NO_LEADER, records,
             brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
         records.add(new ApiMessageAndVersion(new FenceBrokerRecord().
             setId(brokerId).setEpoch(brokerRegistration.epoch()),
             FENCE_BROKER_RECORD.highestSupportedVersion()));
     }
    
  8. ReplicationControlManager.java#generateLeaderAndIsrUpdates() is responsible for re-electing leader replicas for the partitions whose leader replica sat on the failed broker; the key processing here is the call to PartitionChangeBuilder.java#build()

        void generateLeaderAndIsrUpdates(String context,
                                      int brokerToRemove,
                                      int brokerToAdd,
                                      List<ApiMessageAndVersion> records,
                                      Iterator<TopicIdPartition> iterator) {
         int oldSize = records.size();
    
         // If the caller passed a valid broker ID for brokerToAdd, rather than passing
         // NO_LEADER, that node will be considered an acceptable leader even if it is
         // currently fenced. This is useful when handling unfencing. The reason is that
         // while we're generating the records to handle unfencing, the ClusterControlManager
         // still shows the node as fenced.
         //
         // Similarly, if the caller passed a valid broker ID for brokerToRemove, rather
         // than passing NO_LEADER, that node will never be considered an acceptable leader.
         // This is useful when handling a newly fenced node. We also exclude brokerToRemove
         // from the target ISR, but we need to exclude it here too, to handle the case
         // where there is an unclean leader election which chooses a leader from outside
         // the ISR.
         Function<Integer, Boolean> isAcceptableLeader =
             r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.unfenced(r));
    
         while (iterator.hasNext()) {
             TopicIdPartition topicIdPart = iterator.next();
             TopicControlInfo topic = topics.get(topicIdPart.topicId());
             if (topic == null) {
                 throw new RuntimeException("Topic ID " + topicIdPart.topicId() +
                         " existed in isrMembers, but not in the topics map.");
             }
             PartitionRegistration partition = topic.parts.get(topicIdPart.partitionId());
             if (partition == null) {
                 throw new RuntimeException("Partition " + topicIdPart +
                     " existed in isrMembers, but not in the partitions map.");
             }
             PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
                 topicIdPart.topicId(),
                 topicIdPart.partitionId(),
                 isAcceptableLeader,
                 () -> configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name));
    
             // Note: if brokerToRemove was passed as NO_LEADER, this is a no-op (the new
             // target ISR will be the same as the old one).
             builder.setTargetIsr(Replicas.toList(
                 Replicas.copyWithout(partition.isr, brokerToRemove)));
    
             builder.build().ifPresent(records::add);
         }
         if (records.size() != oldSize) {
             if (log.isDebugEnabled()) {
                 StringBuilder bld = new StringBuilder();
                 String prefix = "";
                 for (ListIterator<ApiMessageAndVersion> iter = records.listIterator(oldSize);
                      iter.hasNext(); ) {
                     ApiMessageAndVersion apiMessageAndVersion = iter.next();
                     PartitionChangeRecord record = (PartitionChangeRecord) apiMessageAndVersion.message();
                     bld.append(prefix).append(topics.get(record.topicId()).name).append("-").
                         append(record.partitionId());
                     prefix = ", ";
                 }
                 log.debug("{}: changing partition(s): {}", context, bld.toString());
             } else if (log.isInfoEnabled()) {
                 log.info("{}: changing {} partition(s)", context, records.size() - oldSize);
             }
         }
     }
    
  9. The parts of PartitionChangeBuilder.java#build() relevant to replica leader election are:

    1. Call PartitionChangeBuilder.java#shouldTryElection() to determine whether the partition's current leader replica is still in the ISR; if it is not, a new leader replica must be elected
    2. Call PartitionChangeBuilder.java#tryElection() to actually re-elect the partition's leader replica
    3. Generate a PARTITION_CHANGE_RECORD metadata record capturing the partition change. When this record is replayed on the cluster nodes, the leader change is detected and the partition's leader epoch is bumped by 1 for each change. The leader epoch mainly matters for Kafka's failure recovery mechanism; interested readers can refer to Kafka 3.0 源码笔记(12)-Kafka 服务端分区异常恢复机制的源码分析
    public Optional<ApiMessageAndVersion> build() {
         PartitionChangeRecord record = new PartitionChangeRecord().
             setTopicId(topicId).
             setPartitionId(partitionId);
    
         completeReassignmentIfNeeded();
    
         if (shouldTryElection()) {
             tryElection(record);
         }
    
         triggerLeaderEpochBumpIfNeeded(record);
    
         if (!targetIsr.isEmpty() && !targetIsr.equals(Replicas.toList(partition.isr))) {
             record.setIsr(targetIsr);
         }
         if (!targetReplicas.isEmpty() && !targetReplicas.equals(Replicas.toList(partition.replicas))) {
             record.setReplicas(targetReplicas);
         }
         if (!targetRemoving.equals(Replicas.toList(partition.removingReplicas))) {
             record.setRemovingReplicas(targetRemoving);
         }
         if (!targetAdding.equals(Replicas.toList(partition.addingReplicas))) {
             record.setAddingReplicas(targetAdding);
         }
         if (changeRecordIsNoOp(record)) {
             return Optional.empty();
         } else {
             return Optional.of(new ApiMessageAndVersion(record,
                 PARTITION_CHANGE_RECORD.highestSupportedVersion()));
         }
     }
    
  10. PartitionChangeBuilder.java#tryElection() actually performs the partition replica leader election, in two steps:

    1. Construct a BestLeader object; the election happens in its constructor
    2. Check whether the newly elected leader differs from the partition's current leader; only if it does is the change written into the metadata change record to be propagated
    private void tryElection(PartitionChangeRecord record) {
        BestLeader bestLeader = new BestLeader();
        if (bestLeader.node != partition.leader) {
            record.setLeader(bestLeader.node);
            if (bestLeader.unclean) {
                // If the election was unclean, we have to forcibly set the ISR to just the
                // new leader. This can result in data loss!
                record.setIsr(Collections.singletonList(bestLeader.node));
            }
        }
    }
    
  11. The BestLeader election is straightforward: it just walks the partition's replica list and picks the first suitable replica. It does so in one of the two modes below, and with this the partition replica leader failover flow is essentially complete (a standalone demo of the two modes follows the snippet):

    1. Pick the first suitable replica (in the UNFENCED state) from the ISR; this is the default mode
    2. Pick the first suitable replica (in the UNFENCED state) from all replicas; this unclean mode is only entered when the first one fails to find a leader, and it is disabled by default
    class BestLeader {
        final int node;
        final boolean unclean;
    
        BestLeader() {
            for (int replica : targetReplicas) {
                if (targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) {
                    this.node = replica;
                    this.unclean = false;
                    return;
                }
            }
            if (uncleanElectionOk.get()) {
                for (int replica : targetReplicas) {
                    if (isAcceptableLeader.apply(replica)) {
                        this.node = replica;
                        this.unclean = true;
                        return;
                    }
                }
            }
            this.node = NO_LEADER;
            this.unclean = false;
        }
    }
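
    The sketch below is a standalone re-implementation of these two modes for illustration only; the class and method names (LeaderElectionDemo, elect) are hypothetical, and the real logic is the BestLeader constructor above together with the isAcceptableLeader function built in generateLeaderAndIsrUpdates().

     import java.util.Arrays;
     import java.util.List;
     import java.util.Set;
     import java.util.function.IntPredicate;

     // Hypothetical demo of clean vs. unclean leader election.
     public class LeaderElectionDemo {
         static final int NO_LEADER = -1;

         // Mode 1 (clean): first replica that is in the ISR and acceptable.
         // Mode 2 (unclean): if enabled, first acceptable replica overall.
         static int elect(List<Integer> replicas, Set<Integer> isr,
                          IntPredicate acceptable, boolean uncleanOk) {
             for (int r : replicas) {
                 if (isr.contains(r) && acceptable.test(r)) return r;
             }
             if (uncleanOk) {
                 for (int r : replicas) {
                     if (acceptable.test(r)) return r;
                 }
             }
             return NO_LEADER;
         }

         public static void main(String[] args) {
             List<Integer> replicas = Arrays.asList(1, 2, 3);
             Set<Integer> isr = Set.of(1, 2);            // broker 3 fell out of the ISR earlier

             // Broker 1 (the old leader) has just been fenced: clean election picks broker 2.
             IntPredicate oneFenced = r -> r != 1;
             System.out.println(elect(replicas, isr, oneFenced, false));      // 2

             // Only broker 3 (outside the ISR) is alive: clean election fails,
             // unclean election picks 3 at the risk of data loss.
             IntPredicate onlyThreeAlive = r -> r == 3;
             System.out.println(elect(replicas, isr, onlyThreeAlive, false)); // -1 (NO_LEADER)
             System.out.println(elect(replicas, isr, onlyThreeAlive, true));  // 3
         }
     }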
    