点击上方蓝色“大数据实战演练”,选择“设为星标”或“置顶”

回复“资源”领取独家整理的学习资料!

每一个成功人士的背后,必定曾经做出过勇敢而又孤独的决定。

放弃不难,但坚持很酷~

前言:

前几天,我通过 Kafka 自带的 kafka-reassign-partitions.sh 脚本工具,完成了对 topic 分区副本数的增加。其实 kafka-reassign-partitions.sh 不仅可以实现分区副本数的增加,它还可以实现对 topic 分区的分配。

所以对于 topic 分区分配以及分区副本数的增加,本篇小文都会讲到,图文实操,讲解详细,看完别忘了点赞哦!

Kafka:2.11-1.1.0

一、准备工作

1、创建一个 8 分区、1 副本的 topic:

已知,Kafka 集群中有两个 kafka broker ,id 分别为 200、201 。

# 创建名为create17的topic
./bin/kafka-topics.sh --create --zookeeper cdh-worker-1:2181/kafka --replication-factor 1 --partitions 8 --topic create17

# 查看名为create17的topic详情:分区数、副本数、Isr等
./bin/kafka-topics.sh --describe --zookeeper cdh-worker-1:2181/kafka --topic create17

二、分区分配

已知,Kafka 集群中有两个 kafka broker ,id 分别为 200、201 ,现在再加一个 broker 节点,id 为 202 。Kafka 只会负载均衡新创建的 topic 分区。所以现在,我们需要将已存在的 create17 topic 的 8 个分区均匀分布在 3 个 broker 节点上,以便实现尽可能的负载均衡,提高写入和消费速度。

Kafka 不会对已存在的分区进行均衡分配,所以需要我们手动执行分区分配操作。

1、声明要分配分区的 topic 列表

Kafka 的 bin 目录中有一个 kafka-reassign-partitions.sh 脚本工具,我们可以通过它分配分区。在此之前,我们需要先按照要求定义一个文件,里面说明哪些 topic 需要分配分区。文件内容如下:

> cat topic-generate.json
{
  "topics": [
    {
      "topic": "create17"
    }
  ],
  "version": 1
}
2、通过 --topics-to-move-json-file 参数,生成分区分配策略 --generate
./bin/kafka-reassign-partitions.sh --zookeeper cdh-worker-1:2181/kafka --topics-to-move-json-file topic-generate.json --broker-list "200,201,202" --generate
  • --broker-list:值为要分配的 kafka broker id,以逗号分隔,该参数必不可少。脚本会根据你的 topic-generate.json 文件,获取 topic 列表,为这些 topic 生成分布在 broker list 上面的分区分配策略。

输出结果中有你当前的分区分配策略,也有 Kafka 期望的分配策略,在期望的分区分配策略里,kafka 已经尽可能的为你分配均衡。

我们先将 Current partition replica assignment 的内容备份,以便回滚到原来的分区分配状态。

然后将 Proposed partition reassignment configuration 的内容拷贝到一个新的文件中(文件名称、格式任意,但要保证内容为json格式)。

> cat partition-replica-reassignment.json
{"version":1,"partitions":[{"topic":"create17","partition":2,"replicas":[200],"log_dirs":["any"]},{"topic":"create17","partition":7,"replicas":[202],"log_dirs":["any"]},{"topic":"create17","partition":4,"replicas":[202],"log_dirs":["any"]},{"topic":"create17","partition":1,"replicas":[202],"log_dirs":["any"]},{"topic":"create17","partition":6,"replicas":[201],"log_dirs":["any"]},{"topic":"create17","partition":3,"replicas":[201],"log_dirs":["any"]},{"topic":"create17","partition":0,"replicas":[201],"log_dirs":["any"]},{"topic":"create17","partition":5,"replicas":[200],"log_dirs":["any"]}]}
3、通过 --reassignment-json-file 参数,执行分区分配策略 --execute
./bin/kafka-reassign-partitions.sh --zookeeper cdh-worker-1:2181/kafka --reassignment-json-file partition-replica-reassignment.json --execute
4、通过 --reassignment-json-file 参数,检查分区分配进度 --verify
./bin/kafka-reassign-partitions.sh --zookeeper cdh-worker-1:2181/kafka --reassignment-json-file partition-replica-reassignment.json --verify

我们再来看一下 topic : create17 的详细信息,broker 200 上有两个分区、broker 201、202 上分别有三个分区:

三、增大分区副本数

1、增加各 partition 所属的 replicas broker id

修改 partition-replica-reassignment.json 文件,增加各 partition 所属的 replicas broker id 。

{"version":1,"partitions":[{"topic":"create17","partition":6,"replicas":[201,200,202]},{"topic":"create17","partition":2,"replicas":[201,200,202]},{"topic":"create17","partition":4,"replicas":[201,200,202]},{"topic":"create17","partition":5,"replicas":[201,200,202]},{"topic":"create17","partition":0,"replicas":[201,200,202]},{"topic":"create17","partition":3,"replicas":[201,200,202]},{"topic":"create17","partition":1,"replicas":[201,200,202]},{"topic":"create17","partition":7,"replicas":[201,200,202]}]}
2、通过 --reassignment-json-file 参数,执行分区副本分配策略 --execute
./bin/kafka-reassign-partitions.sh --zookeeper cdh-worker-1:2181/kafka --reassignment-json-file partition-replica-reassignment.json --execute

然后,查看一下 topic:create17 ,发现:Replicas 列表正如上述 partition-replica-reassignment.json 描述的一样。

每个 partitiion 的所有 replicas 叫做 "assigned replicas" ,"assigned replicas" 中的第一个 replica 叫 "preferred replica",当 kafka leader replica 挂掉的话,partition 会选择 "preferred replica" 做为 leader replica 。

但根据上述图片示例,很明显,Replicas 列表分配不均。我们可以使用 --generate 参数再生成一份分配相对均匀的分区副本策略,这样就不用我们自己排列组合了。

最后得到的分区副本策略是这样的:

{"version":1,"partitions":[{"topic":"create17","partition":2,"replicas":[200,202,201],"log_dirs":["any","any","any"]},{"topic":"create17","partition":7,"replicas":[202,201,200],"log_dirs":["any","any","any"]},{"topic":"create17","partition":4,"replicas":[202,200,201],"log_dirs":["any","any","any"]},{"topic":"create17","partition":1,"replicas":[202,201,200],"log_dirs":["any","any","any"]},{"topic":"create17","partition":6,"replicas":[201,200,202],"log_dirs":["any","any","any"]},{"topic":"create17","partition":3,"replicas":[201,202,200],"log_dirs":["any","any","any"]},{"topic":"create17","partition":0,"replicas":[201,200,202],"log_dirs":["any","any","any"]},{"topic":"create17","partition":5,"replicas":[200,201,202],"log_dirs":["any","any","any"]}]}

再执行一下 --execute 操作,最后的 topic:create17 的详细信息为:

这样的话,假如 broker 202 挂了的话,分区1、7 的 Leader replica 会变为 201;分区4的 Leader replica 会变为 200 ,比上面咱们手动生成的策略要好。

四、小结

1、本文介绍了使用 Kafka 自带的 kafka-reassign-partitions.sh 脚本工具完成对 topic 的分区分配、分区副本增加操作。该脚本有三个参数:

  • --generate:配合着 --topics-to-move-json-file 可以生成分区分配策略,该参数适用于分区多的情况。

  • --execute:配合着 --reassignment-json-file 可以执行分区分配策略。

  • --verify:配合着 --reassignment-json-file 可以检查分区分配进度。

通过以上命令,是既可以分配分区,也可以增加分区副本数,非常方便。

2、也简单介绍了 kafka preferred replica ,它是 "assigned replicas" 中的第一个 replica 。当 kafka leader replica 挂掉的话, partition 会选择 "preferred replica" 做为 leader replica 。

抛一个小疑问,kafka leader replica 如果不挂掉的话,如何选择某 replica 为指定 leader 呢?我们下一篇答案揭晓。

‍‍‍‍‍‍‍‍‍‍好了,本篇 kafka 实操就到这里结束啦,感觉有帮助的朋友,希望能点个赞哦!

欢迎大家留言讨论

   ???? ???? ????

往期推荐

解惑 | 为什么我根据时间戳获得的offset为空呢?

两种实现方式 | 如何查看消费者组的消费情况

Kafka消费者 之 如何订阅主题或分区

Kafka消费者 之 指定位移消费

Kafka基础(二):生产者相关知识汇总

Kafka基础(一):基本概念及生产者、消费者示例

扫一扫,我们的故事就开始了。

如果这篇文章对你有所启发,点赞、转发都是一种支持!

让我知道你在看

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐