Kafka分区副本重分配

1、前言

在实际生产过程中,我们可能会遇到以下两种情况:

  1. 对集群中一个节点进行有计划的下线操作,为了保证分区及副本的合理分配,我们希望通过某种方式能够将该节点上的分区副本迁移到其他的可用节点上
  2. 当集群中新增broker节点时,只有新创建的主题分区才有可能被分配到这个节点上,而之前的主题分区并不会自动分配到新加入的节点中,因为在它们创建时还没有这个新节点,这样新节点的负载和原先节点的负载之间严重不均衡。

为了解决上述问题,需要让分区副本进行合理的分配,即分区副本重分配。获取分配方案通常有两种方式:

  1. 自动分配,执行脚本生成较均衡的分配方案
  2. 手动分配,自己指定分区分配方案

可参考另一篇文章Kafka分区副本分配规则
当获取了分配方案之后,怎么去执行这套分配方案呢?客户端和服务端分别做了哪些事情?这是本文将介绍的重点

2、分区副本重分配流程图

在这里插入图片描述

3、分区副本重分配详细分析

3.1 客户端行为

3.1.1 执行副本重分配脚本
  1. 脚本内容模版
{
	"version": 1,
	"partitions": [ {
		"topic": "rp1",
		"partition": 0,
		"replicas": [1, 0],
		"log_dirs": ["any", "any"]
	}]
}

log_dirs列表的长度要和replicas,如果想实现跨路径迁移,只需要在里面填入绝对路径;若不实现跨路径迁移,填入"any"即可,或者直接删除"log_dirs"字段

  1. 命令及参数
--zookeeper localhost/kafka
--reassignment-json-file jsonFiles/reassignment-json-file.json
--execute

–zookeeper后面接的是zk地址,–reassignment-json-file后接副本重分配文件,即上面1中内容

3.1.2 解析并验证传入的参数

具体源码在ReassignPartitionsCommand.executeAssignment中

// 解析传入的参数
  def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[Admin], opts: ReassignPartitionsCommandOptions): Unit = {
    val reassignmentJsonFile =  opts.options.valueOf(opts.reassignmentJsonFileOpt)
    val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
    val interBrokerThrottle = opts.options.valueOf(opts.interBrokerThrottleOpt)
    val replicaAlterLogDirsThrottle = opts.options.valueOf(opts.replicaAlterLogDirsThrottleOpt)
    val timeoutMs = opts.options.valueOf(opts.timeoutOpt)
    executeAssignment(zkClient, adminClientOpt, reassignmentJsonString, Throttle(interBrokerThrottle, replicaAlterLogDirsThrottle), timeoutMs)
  }

  def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[Admin], reassignmentJsonString: String, throttle: Throttle, timeoutMs: Long = 10000L): Unit = {
    // 验证传入的副本分配方式是否正确,比如brokerId是否存在,replicas和log_dirs列表长度是否相同
    val (partitionAssignment, replicaAssignment) = parseAndValidate(zkClient, reassignmentJsonString)
    ......
3.1.3 处理重分配任务
  1. 通过判断admin/reassign_partitions/节点是否存在判断是否有正在执行的重分配任务
  2. 若有重分配任务,则更新zk中broker下的限流信息
  3. 若无重分配任务,则执行以下流程
    • 将针对Topic和Broker的限流信息写到zk中
    • 获取有效的partitions
    • 跨路径迁移,发送AlterReplicaLogDirsRequest通知Broker在指定路径创建副本
    • 将重分配数据写入到 zk /admin/reassign_partitions节点
    • 跨路径迁移,发送AlterReplicaLogDirsRequest确认broker开始迁移副本到指定路径
def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[Admin], reassignmentJsonString: String, throttle: Throttle, timeoutMs: Long = 10000L): Unit = {
    ......

    // If there is an existing rebalance running, attempt to change its throttle
    // 通过判断admin/reassign_partitions/节点是否存在判断是否有正在执行的重分配任务,若有,则更新zk中broker下的限流信息,若无,则开始进行副本重分配
    if (zkClient.reassignPartitionsInProgress()) {
      println("There is an existing assignment running.")
      // 如果有限流参数(--throttle, --replica-alter-log-dirs-throttle)则动态配置与之相关的broker限流信息
      reassignPartitionsCommand.maybeLimit(throttle)
    } else {
      // 打印当前副本分配方式
      printCurrentAssignment(zkClient, partitionAssignment.map(_._1.topic))
      if (throttle.interBrokerLimit >= 0 || throttle.replicaAlterLogDirsLimit >= 0)
        println(String.format("Warning: You must run Verify periodically, until the reassignment completes, to ensure the throttle is removed. You can also alter the throttle by rerunning the Execute command passing a new value."))
      // 将限流信息和副本分配内容写入zk中
      if (reassignPartitionsCommand.reassignPartitions(throttle, timeoutMs)) {
        println("Successfully started reassignment of partitions.")
      } else
        println("Failed to reassign partitions %s".format(partitionAssignment))
    }

将限流信息和副本分配内容写入zk中:

def reassignPartitions(throttle: Throttle = NoThrottle, timeoutMs: Long = 10000L): Boolean = {
    // 将针对Topic和broker的限流信息写到zk中
    maybeThrottle(throttle)
    try {
      // 获取有效的partitions 
      val validPartitions = proposedPartitionAssignment.groupBy(_._1.topic())
        .flatMap { case (topic, topicPartitionReplicas) =>
          validatePartition(zkClient, topic, topicPartitionReplicas)
        }
      if (validPartitions.isEmpty) false
      else {
        if (proposedReplicaAssignment.nonEmpty && adminClientOpt.isEmpty)
          throw new AdminCommandFailedException("bootstrap-server needs to be provided in order to reassign replica to the specified log directory")
        val startTimeMs = System.currentTimeMillis()

        // 跨路径迁移,发送AlterReplicaLogDirsRequest通知broker在指定路径创建副本
        // Send AlterReplicaLogDirsRequest to allow broker to create replica in the right log dir later if the replica has not been created yet.
        if (proposedReplicaAssignment.nonEmpty)
          alterReplicaLogDirsIgnoreReplicaNotAvailable(proposedReplicaAssignment, adminClientOpt.get, timeoutMs)

        // Create reassignment znode so that controller will send LeaderAndIsrRequest to create replica in the broker
        // 将重分配数据写入到/admin/reassign_partitions中
        zkClient.createPartitionReassignment(validPartitions.map({case (key, value) => (new TopicPartition(key.topic, key.partition), value)}).toMap)

        // 跨路径迁移,发送AlterReplicaLogDirsRequest确认broker开始迁移副本到指定路径
        // Send AlterReplicaLogDirsRequest again to make sure broker will start to move replica to the specified log directory.
        // It may take some time for controller to create replica in the broker. Retry if the replica has not been created.
        var remainingTimeMs = startTimeMs + timeoutMs - System.currentTimeMillis()
        val replicasAssignedToFutureDir = mutable.Set.empty[TopicPartitionReplica]
        while (remainingTimeMs > 0 && replicasAssignedToFutureDir.size < proposedReplicaAssignment.size) {
          replicasAssignedToFutureDir ++= alterReplicaLogDirsIgnoreReplicaNotAvailable(
            proposedReplicaAssignment.filter { case (replica, _) => !replicasAssignedToFutureDir.contains(replica) },
            adminClientOpt.get, remainingTimeMs)
          Thread.sleep(100)
          remainingTimeMs = startTimeMs + timeoutMs - System.currentTimeMillis()
        }
        replicasAssignedToFutureDir.size == proposedReplicaAssignment.size
      }
    } catch {
      case _: NodeExistsException =>
        val partitionsBeingReassigned = zkClient.getPartitionReassignment()
        throw new AdminCommandFailedException("Partition reassignment currently in " +
          "progress for %s. Aborting operation".format(partitionsBeingReassigned))
    }
  }

zk中/admin/reassign_partitions节点的数据和重分配脚本内容一致

3.2 服务端行为

3.2.1 基本概念

假设某Topic分区原来副本为[0, 1],现在要将副本变为[0, 2]

变量简写源码中对应的变量解释说明例子
RSreassignment.replicas所有的副本集合,包括原始副本和目标副本[0, 1, 2]
ORSreassignment.originReplicas原始副本集合[0, 1]
TRSreassignment.targetReplicas目标副本集合[0, 2]
ARreassignment.addingReplicas需要添加的副本集合[2]
RRreassignment.removingReplicas需要删除的副本集合[1]
3.2.2 Controller节点行为

Controller节点监听到zk中/admin/reassign_partitions节点变化,开始执行以下逻辑

  1. KafkaController.processZkPartitionReassignment
  • 判定当前Broker是不是Controller节点,且/admin/reassign_partitions路径存在
  • 从zk中读取分区副本重分配的策略
  • 调用maybeTriggerPartitionReassignment方法开始进行副本重分配(具体分析在2中)
  • 分配失败的话将一些分区信息从zk中删除
private def processZkPartitionReassignment(): Set[TopicPartition] = {
    // We need to register the watcher if the path doesn't exist in order to detect future
    // reassignments and we get the `path exists` check for free
    // 若当前broker是controller, 且/admin/reassign_partitions路径存在
    if (isActive && zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)) {
      val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError]
      val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment]

      // 从zk中读取分区副本重分配的策略
      zkClient.getPartitionReassignment().foreach { case (tp, targetReplicas) =>
        maybeBuildReassignment(tp, Some(targetReplicas)) match {
          case Some(context) => partitionsToReassign.put(tp, context)
          case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
        }
      }

      // 副本重分配的地方
      reassignmentResults ++= maybeTriggerPartitionReassignment(partitionsToReassign)
      val (partitionsReassigned, partitionsFailed) = reassignmentResults.partition(_._2.error == Errors.NONE)
      if (partitionsFailed.nonEmpty) {
        warn(s"Failed reassignment through zk with the following errors: $partitionsFailed")
        // 分配失败的话将一些分区信息从zk中删除
        maybeRemoveFromZkReassignment((tp, _) => partitionsFailed.contains(tp))
      }
      partitionsReassigned.keySet
    } else {
      Set.empty
    }
  }
  1. KafkaController.maybeTriggerPartitionReassignment
  • 判断Topic是否正在被删除
  • 如果Topic正在被删除,则跳过
  • 如果Topic没有正在被删除,则执行分配任务(具体分析在3中)
private def maybeTriggerPartitionReassignment(reassignments: Map[TopicPartition, ReplicaAssignment]): Map[TopicPartition, ApiError] = {
    reassignments.map { case (tp, reassignment) =>
      val topic = tp.topic

      // topic是否正在被删除
      val apiError = if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) {
        info(s"Skipping reassignment of $tp since the topic is currently being deleted")
        new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist.")
      } else {
        val assignedReplicas = controllerContext.partitionReplicaAssignment(tp)
        if (assignedReplicas.nonEmpty) {
          try {
            // 针对topic的某个分区,执行分配任务
            onPartitionReassignment(tp, reassignment)
            ApiError.NONE
          } catch {
            case e: ControllerMovedException =>
              info(s"Failed completing reassignment of partition $tp because controller has moved to another broker")
              throw e
            case e: Throwable =>
              error(s"Error completing reassignment of partition $tp", e)
              new ApiError(Errors.UNKNOWN_SERVER_ERROR)
          }
        } else {
            new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist.")
        }
      }

      tp -> apiError
    }
  }

  1. KafkaController.onPartitionReassignment
  • 3.1 在内存中标记topic正在reassignment,防止删除
  • 3.2 在zk和内存中写入最新的分配方式
  • 3.3 判断分区重分配是否完成,刚开始分区副本重分配未完成
    • 3.3.1 给RS中所有的副本所在的broker发送LeaderAndIsr请求
    • 3.3.2 副本状态机将AR中所有副本置为NewReplica状态
private def onPartitionReassignment(topicPartition: TopicPartition, reassignment: ReplicaAssignment): Unit = {
    // 3.1 在内存中标记topic正在reassignment,防止删除
    // While a reassignment is in progress, deletion is not allowed
    topicDeletionManager.markTopicIneligibleForDeletion(Set(topicPartition.topic), reason = "topic reassignment in progress")

    // 3.2 在zk和内存中写入最新的分配方式
    updateCurrentReassignment(topicPartition, reassignment)

    val addingReplicas = reassignment.addingReplicas
    val removingReplicas = reassignment.removingReplicas

    if (!isReassignmentComplete(topicPartition, reassignment)) {
      // 3.3 若分区重分配未完成(根据zk上分区state节点里面的isr是否包含TRS判断)

      // 3.3.1 给RS中所有的副本所在的broker发送LeaderAndIsr请求
      // A1. Send LeaderAndIsr request to every replica in ORS + TRS (with the new RS, AR and RR).
      updateLeaderEpochAndSendRequest(topicPartition, reassignment)
      // 3.3.2 副本状态机将AR中所有副本置为NewReplica状态
      // A2. replicas in AR -> NewReplica
      startNewReplicasForReassignedPartition(topicPartition, addingReplicas)
    } else {
      ......
    }
  }

// 3.2 在zk和内存中写入最新的分配方式
updateCurrentReassignment(topicPartition, reassignment)

private def updateCurrentReassignment(topicPartition: TopicPartition, reassignment: ReplicaAssignment): Unit = {
    // 4.2.1 获取内存中已经在进行的分配方式
    val currentAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition)

    // 4.2.2 当前分配方式和内存中存在的分配方式不同
    if (currentAssignment != reassignment) {
      debug(s"Updating assignment of partition $topicPartition from $currentAssignment to $reassignment")

      // 4.2.2.1 将最新的分配方式更新到zk brokers/topics/{topicName}
      // U1. Update assignment state in zookeeper
      updateReplicaAssignmentForPartition(topicPartition, reassignment)

      // 4.2.2.2 将最新的分配方式更新到内存中
      // U2. Update assignment state in memory
      controllerContext.updatePartitionFullReplicaAssignment(topicPartition, reassignment)

      // 4.2.2.3 如果重新分配已经在进行中,那么一些当前新增的副本有可能被立即删除,在这种情况下,我们需要停止replicas
      // If there is a reassignment already in progress, then some of the currently adding replicas
      // may be eligible for immediate removal, in which case we need to stop the replicas.
      val unneededReplicas = currentAssignment.replicas.diff(reassignment.replicas)
      if (unneededReplicas.nonEmpty)
        stopRemovedReplicasOfReassignedPartition(topicPartition, unneededReplicas)
    }

    // 注册一个监听节点/brokers/topics/{topicName}/partitions/{分区号}/state变更的处理器
    val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager, topicPartition)
    zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)

    controllerContext.partitionsBeingReassigned.add(topicPartition)
  }

zk中brokers/topics/{topicName}节点内容为:

{"version":2,"partitions":{"0":[1,0]},"adding_replicas":{"0":[0]},"removing_replicas":{}}

若分区重分配未完成(根据zk上分区state节点里面的isr是否包含TRS判断),Controller给RS中所有的副本所在的broker发送LeaderAndIsr请求,新增的副本会去leader拉取数据,当数据同步完成,则会更新/brokers/topics/{topicName}/partitions/{分区号}/state节点信息(详细可看下一节Broker节点行为)

  1. Controller节点监听到/brokers/topics/{topicName}/partitions/{分区号}/state节点变化
  • 判断分区重分配任务是否完成(根据zk上分区state节点里面的isr是否包含TRS判断)
  • 若完成,则走以下逻辑
    • 副本状态机将所有新增加的副本修改为上线状态
    • 将内存中的RS修改为TRS
    • 处理原分区leader不在TRS中的情况: 重新选举leader
    • 下线需要删除的副本并删除
    • 用RS = TRS, AR = [], RR = [] 更新zk /broker/topics/{topicName} 节点
    • 删除监听/brokers/topics/{topicName}/partitions/{分区号}/state的监听器,清除zk中reassign_partitions中该partition节点数据
    • 向每个broker发送更新元数据request
    • 释放对topic的标记
  // Controller监听zk节点/brokers/topics/{topicName}/partitions/{分区号}/state
  // 监听到isr节点发生变化
  private def processPartitionReassignmentIsrChange(topicPartition: TopicPartition): Unit = {
    if (!isActive) return

    if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
      val reassignment = controllerContext.partitionFullReplicaAssignment(topicPartition)

      // 重分配任务是否完成
      if (isReassignmentComplete(topicPartition, reassignment)) {
        // resume the partition reassignment process
        info(s"Target replicas ${reassignment.targetReplicas} have all caught up with the leader for " +
          s"reassigning partition $topicPartition")
        onPartitionReassignment(topicPartition, reassignment)
      }
    }
  }

private def onPartitionReassignment(topicPartition: TopicPartition, reassignment: ReplicaAssignment): Unit = {
    // 3.1 在内存中标记topic正在reassignment,防止删除
    // While a reassignment is in progress, deletion is not allowed
    topicDeletionManager.markTopicIneligibleForDeletion(Set(topicPartition.topic), reason = "topic reassignment in progress")

    // 3.2 在zk和内存中写入最新的分配方式
    updateCurrentReassignment(topicPartition, reassignment)

    val addingReplicas = reassignment.addingReplicas
    val removingReplicas = reassignment.removingReplicas

    if (!isReassignmentComplete(topicPartition, reassignment)) {
      ......
    } else {
      // 3.4 若分区重分配已完成

      // 3.4.1 副本状态机将所有新增加的副本修改为上线状态
      // B1. replicas in AR -> OnlineReplica
      replicaStateMachine.handleStateChanges(addingReplicas.map(PartitionAndReplica(topicPartition, _)), OnlineReplica)

      // 3.4.2 将内存中的RS修改为TRS
      // B2. Set RS = TRS, AR = [], RR = [] in memory.
      val completedReassignment = ReplicaAssignment(reassignment.targetReplicas)
      controllerContext.updatePartitionFullReplicaAssignment(topicPartition, completedReassignment)

      // 3.4.3 处理原分区leader不在TRS中的情况: 重新选举leader
      // B3. Send LeaderAndIsr request with a potential new leader (if current leader not in TRS) and
      //   a new RS (using TRS) and same isr to every broker in ORS + TRS or TRS
      moveReassignedPartitionLeaderIfRequired(topicPartition, completedReassignment)

      // 3.4.4 下线需要删除的副本并删除
      // B4. replicas in RR -> Offline (force those replicas out of isr)
      // B5. replicas in RR -> NonExistentReplica (force those replicas to be deleted)
      stopRemovedReplicasOfReassignedPartition(topicPartition, removingReplicas)

      // 3.4.5 用RS = TRS, AR = [], RR = [] 更新zk /broker/topics/{topicName} 节点
      // B6. Update ZK with RS = TRS, AR = [], RR = [].
      updateReplicaAssignmentForPartition(topicPartition, completedReassignment)

      // 3.4.6 删除监听/brokers/topics/{topicName}/partitions/{分区号}/state的监听器,
      // 清除zk中reassign_partitions中该partition节点数据
      // B7. Remove the ISR reassign listener and maybe update the /admin/reassign_partitions path in ZK to remove this partition from it.
      removePartitionFromReassigningPartitions(topicPartition, completedReassignment)

      // 3.4.7 向每个broker发送更新元数据request
      // B8. After electing a leader in B3, the replicas and isr information changes, so resend the update metadata request to every broker
      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition))

      // 3.4.8 释放对topic的标记
      // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
      topicDeletionManager.resumeDeletionForTopics(Set(topicPartition.topic))
    }
  }
3.2.3 Broker节点行为
3.2.3.1 Broker对LeaderAndIsrRequest的处理

入口在kafka.server.KafkaApis.handleLeaderAndIsrRequest,分区副本重分配逻辑是先增加副本然后下线删除不要的副本,增加副本时调用makeFollowers方法本地创建副本文件夹,然后启动对应的ReplicaFetcherThread来跟leader通信拉取数据。

4、相关问题

4.1 多余的副本是怎么删除的?

在这里插入图片描述

  Controller行为中,当重分配任务完成后,会将RR中的副本状态变更,并删除副本,在KafkaController.updateLeaderEpochAndSendRequest中。
  这里主要是副本状态机进行操作,涉及多个状态的变更,即–> OfflineReplica --> ReplicaDeletionStarted --> ReplicaDeletionSuccessful --> NonExistentReplica,在这些流转过程中,先停止了副本fetch,然后发起了StopReplicaRequest, StopReplicaRequest会将副本丢到待删除副本队列中,具体可看Controller副本状态机原理

4.2 leader变更是怎么处理的?

在这里插入图片描述

  这里主要分为两种情况,Controller节点进行处理,在KafkaController.updateLeaderEpochAndSendRequest中

  1. 目标副本不包括leader, 分区状态机处理时需带上leader选举策略重新选举leader
  2. 目标副本包括leader,但leader死亡,分区状态机处理时需带上leader选举策略重新选举leader
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐