Kafka主题和分区

一、主题管理

Kafka系列文章是基于:深入理解Kafka:核心设计与实践原理一书,结合自己的部分实践和总结。

1.1 创建主题

  • 如果broker端配置auto.create.topics.enable为true(默认为true),当收到客户端的元数据请求时则会创建topic,(消费消息和生产消息都会收到元数据请求)。创建主题脚本为:kafka-topics.sh
PS: 创建topic时,分区数为num.partitions(默认1),副本因子为default.replication.factor

下面是通过命令指定分区数和副本数
./kafka-topics.sh  --zookeeper localhost:2181  --create   --topic testTopic --partitions 1  --replication-factor 1
  • 创建好的topic可以在日志路径下查看分区在每个broker的分配信息,也可以在zookeeper中获取分配信息,命令是:get /brokers/topics/topicName
[zk: localhost:2181(CONNECTED) 3] get /brokers/topics/testTopic-1
{"version":1,"partitions":{"0":[0]}}
cZxid = 0xd3
ctime = Sat Jul 20 09:57:21 CST 2019
mZxid = 0xd3
mtime = Sat Jul 20 09:57:21 CST 2019
pZxid = 0xd4
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 36
numChildren = 1

PS: 其中"0":[0]表示0号分区在0号broker,如果是"1":[1,2,3] 则 表示1号分区的三个副本分别在123号broker节点。
  • kafka-topics.sh :kafka-topics.sh可以执行多种类型的topic操作,包括create、delete、alter、list、describe,所需参数也各有不同。
//1.create 新增主题
./kafka-topics.sh  --zookeeper localhost:2181  --create   --topic testTopic --partitions 1  --replication-factor 1

//2.delete 删除主题;注意部不是立刻删除,而且delete.topic.enable为false的时候,删除操作是没有作用的
./kafka-topics.sh  --zookeeper   localhost:2181  --delete --topic testTopic-2
Topic testTopic-2 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.


//3.alter  修改主题


//4.list  展示所有主题
./kafka-topics.sh  --zookeeper   localhost:2181  --list


//5.describe 查看主题详情;展示主题,分区数,副本数(AR),Leader副本所在broker,ISR等信息
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic testTopic (如果没有指定--topic则会展示全部主题信息)

Topic:testTopic	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: testTopic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
  • 补充
1. kafka-topics.sh 支持自定义分区在broker上的分配关系,通过参数--replica-assignment指定,且此时不需要--partitions和--replication-factor参数了
2. kafka-topics.sh 支持通过config参数来设置更多的参数,且config中的参数会覆盖原本的默认配置,比如:--config max .message.bytes=lOOOO,--config配置多次,比
如:--config cleanup.policy=compact --config max .message.bytes=lOOOO, 在zookeeper的/config/topics/topicName下可以看到config相关的参数配置
3. kafka-topics.sh 支持参数--if-not-exists来避免创建重复的topic
4. kafkaserver通过参数broker.rack指定机架信息,如果指定了那么创建topic的时候会尽量将副本分配在不同的机架,机架信息要么所有的broker都需要配置要么都不配置,但
是也可以使用kafka-topics.sh的参数--disable-rack-aware 来取消机架感知
  • Java实现;前面调用 kafka-topics.sh脚本实现的功能,在Java中有对应的Api,下面给出一个简单的示例
public class CreateTopic {
    public static final String BROKER_LIST = "192.168.13.53:2181";

    public static void main(String[] args) {
        String[] options = new String[]{
                "--zookeeper", BROKER_LIST,
                "--create",
                "--replication-factor", "1",
                "--partitions", "1",
                "--topic", "topic-1"
        };

        TopicCommand.main(options);
    }
}

1.2 分区副本分配

1.2.1 分区和角色
  • 分区分配的几个角度
1.生产者角度:关注的是每条消息发往哪个分区
2.消费者角度:关注的是每个消费者被指定消费哪个分区
3.服务端:关注一个Topic下的每个分区的若干副本如何保存在不同的broker节点
1.2.2 分配逻辑
  • 如果没有开启机架感知(比如没有配置机架或者指定了disable-rack-aware),那么分配逻辑如下
1.最开始会将所有的可用broker初始化一个List集合
2.然后随机计算首个分配的broker;(并不是每次都从0号broker开始,那样会不均匀且0号broker可能已经退出集群了);
3.给分区取一个编号,按照顺序从分区编号为0的分区开始分配;
4.分配的时候会计算一个步长shift,比如0号分区的第一个副本在broker为2,那么1号副本就在broker2+shift,这个计算细节就没有研究了,可以参照scala源码
  • 如果开启机架感知,会将机架信息作为一个附加参考项,下面是对比前面的区别
1.在计算初始化broker集合的时候,是轮询的而不是顺序的;比如A机架有1、2和3,B机架有4、5和6,C机架有7、8和9,那么broker的集合是[1,4,7,2,5,8,3,6,9];
2.另外在分配的时候会做一些校验,比如A机架中的一个broker已经分了一个副本了,B和C机架中还有一个机架没有改分区副本,那么就一定不会把新的副本分配到A机架,而是B或者C机架

PS:总体来说就是进来均匀分布,有机架的时候就尽量把副本分配到不同的机架

1.3 查看主题

  • 1.1中提到了查看分区信息的describe参数,这里补充其几个非常有用的参数。
1. --topics-with-overrides:找出所有包含覆盖配置的topic,这些topic包含于集群不一样的配置,命令会将对应的配置等信息展示出来
intellif@segment2:/opt/kafka/kafka_2.11-2.0.0/bin$ ./kafka-topics.sh  --zookeeper   localhost:2181  --describe --topics-with-overrides
Topic:__consumer_offsets	PartitionCount:50	ReplicationFactor:1	Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer

2. --under-replicated-partitions:找出包含失效副本的分区,(即ISR<AR),此时有可能部分broker节点失效或者同步效率降低,如果没有则什么都不返回 
./kafka-topics.sh  --zookeeper   localhost:2181  --describe --under-replicated-partitions

3. --unavailable-partitions:找出没有leader的分区,这些分区已经处于离线状态,对外界不可用,如果没有则什么都不返回
./kafka-topics.sh  --zookeeper   localhost:2181  --describe --unavailable-partitions

1.4 修改主题

  • 1.1中提到了修改主题的alter参数,这里补充其几个非常有用的参数。
  • 修改分区;
//修改分区,warn日志提示修改分区可能会改变消息计算得到的分区数
./kafka-topics.sh  --zookeeper   localhost:2181  --alter --topic testTopic --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

//再次查看topic有3个分区
intellif@segment2:/opt/kafka/kafka_2.11-2.0.0/bin$ ./kafka-topics.sh  --zookeeper   localhost:2181  --describe --topic testTopic
Topic:testTopic	PartitionCount:3	ReplicationFactor:1	Configs:
	Topic: testTopic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: testTopic	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
	Topic: testTopic	Partition: 2	Leader: 0	Replicas: 0	Isr: 0

PS:分区数只能增加不能减少,The number of partitions for a topic can only be increased,减少分区收益很低,但是在代码实现上非常复杂,需要考虑的问题也非常多
intellif@segment2:/opt/kafka/kafka_2.11-2.0.0/bin$ ./kafka-topics.sh  --zookeeper   localhost:2181  --alter --topic testTopic --partitions 2
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Error while executing topic command : The number of partitions for a topic can only be increased. Topic testTopic currently has 3 partitions, 2 would not be an increase.
[2019-07-22 12:19:44,134] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic testTopic currently has 3 partitions, 2 would not be an increase.
 (kafka.admin.TopicCommand$)

PPS:修改topic为了避免topic不存在,可以添加参数if-exists
  • 修改配置; 配合config参数,可以修改topic的配置
intellif@segment2:/opt/kafka/kafka_2.11-2.0.0/bin$ ./kafka-topics.sh  --zookeeper   localhost:2181  --alter --topic testTopic --config max.message.bytes=20000
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
         Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "testTopic".

PS: 通过config指定的参数,可以通过delete-config来删除配置

1.5 配置管理

  • kafka-configs.sh用于管理Kafka的配置,主要是在运行时修改配置达到动态调整配置的作用。包括alter变更配置和describe查看配置两类,alter包括增删改三类,另外kafka-configs.sh支持 topic,用户,broker和客户端这几类配置的修改。

1.6 主题参数

  • Topic相关的参数在broker服务端都有参数对应,比如topic的cleanup.policy对应broker的log.cleanup.policy。如果创建topic的时候没有指定,那么就会使用broker的,创建时可以指定也可以在创建之后变更。

  • 具体参数可以阅读参考文章[1]的第四章4.1.6小节

1.7 删除主题

  • 删除无用的主题可以节约资源,使用./kafka-topics.sh来完成。
./kafka-topics.sh  --zookeeper   localhost:2181  --delete --topic testTopic-2
  • 删除主题的本质是在Zookeeper的/admin/delete_topics路径下创建一个节点,节点名称和待删除的topic相同,这相当于将topic标记为待删除,真正的删除动作由Kafka的控制器完成(与创建topic类似)。因此删除topic也可以直接在zookeeper中操作
create /admin/delete_topics/topicname ""
  • topic的元数据保存在zookeeper的/brokers/topics和/config/topics下面,日志文件保存在log.dirs下面(或者lg.dir)也可以手动清理这三个地方的数据删除topic,不过相对较复杂。

二、KafkaAdminClient

  • 类似于前面的kafka-topics.sh脚本可以让我们对kafka的主题进行管理,在1.1小节中已经给出了使用TopicCommand创建主题的用法,但是这样的方法实际上与调用脚本类似,而且没有返回值,交互性比较差。但是如果想要将Kafka的管理功能与程序或者系统相结合,则可以使用KafkaAdminClient提供的Api。

2.1 基本使用

  • 创建Topic
private static void createWithAdminApi() {
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.11.27:9092");
        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);
        AdminClient adminClient = AdminClient.create(properties);
        NewTopic newTopic = new NewTopic("topic-test-1", 1, (short) 1);
        CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
        try {
            Void aVoid = createTopicsResult.all().get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
PS: KafkaAdminClient提供了增删改Topic,查看Topic信息,展示所有Topic,修改分区,查询修改配置等功能,具体参数可以阅读参考文章[1]的第四章4.2.1小节

2.2 主题合法性验证

  • 有时候为了管理Topic的创建,让Topic更加规范,比如创建的副本参数,分区数量,命名规则的有要求,而不允许随意创建,那么可以实现Kafka中的CreateTopicPolicy接口,之后再使用KafkaAdminClient创建Topic的时候,如果不满足该接口实现类中定义的校验规则,则创建会失败。

public class MyCreateTopicPolicy implements CreateTopicPolicy {
    
    //校验方法,鉴定主题参数的合法性
    @Override
    public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {

    }

    //关闭Kafka服务时执行
    @Override
    public void close() throws Exception {

    }

    //Kafka服务启动时执行
    @Override
    public void configure(Map<String, ?> configs) {

    }
}
  • 注意要使用该方法需要在broker端将参数create.topic.policy.class.name的值为CreateTopicPolicy实现类的全限定类名,该配置默认为null,不会做校验

三、分区管理

3.1 优先副本选举

3.1.1 优先副本
  • 多副本机制是保证一个分区高可用的关键,而分区高可用是Kafka高可用的必要条件。
PS: 一个broker上最多只能有一个分区的一个副本,换言之副本因子是不能大于broker数量的。(大于是没有意义的,一个broker上保存多份副本不能容灾,浪费资源)
  • 多副本中的leader负责接收读写请求,follwer负责和leader同步数据,在leader故障后会重新再follower中选举出新的leader。但是这种机制会引入一个问题,假设期初每个broker上的leader数量是均衡的,那么整个Kafka集群的负载是均衡的(假设不同分区的请求负载相同),在节点故障触发leader重新选举之后,可能会导致某些broker节点上的leader副本很多,部分节点很少甚至没有leader副本,这样整个集群对外负载就不均衡了,从而影响到整个集群的健壮性和稳定性。
PS: 假设每个分区的负载相同,那么比较理想的状态是每个broker上运行的leader副本数差别不大。
  • 为了解决前面所述问题,让leader副本能够均匀分布,首先引出了优先副本的概念(Preferred replica),优先副本是指分区的AR集合列表中第一个副本,理想情况下优先副本就是leader副本,因此保证leader副本的均衡分布等价于保证所有优先副本的均衡分布。

  • 优先副本选举:优先副本选举就是指通过一定的方式促使优先副本被选举为leader副本。

PS: 即使优先副本在集群中均匀分布不代表集群是负载均衡的,因为每个分区的负载有可能不一样,尤其是业务上使用了消息key的情况下,因此优先副本选举只是从leader副本的角度来考虑负载均衡,但不能和整个集群的负载均衡等价。
3.1.2 auto.leader.rebalance.enable
  • Kafka通过该参数来提供分区自动平衡功能,默认该配置开启。Kafka通过一个定时任务来检查每个broker的不平衡率是否超过阈值,如果超过则会执行优先副本选举来达到分区平衡。
定时周期:leader.imbalance.check.interval.seconds
不平衡率 = 非优先副本的leader / 分区总数 
阈值:leader.imbalance.per.broker.percentage配置

PS :开启自动平衡时,平衡执行的时间不可控,有可能在关键时刻造成一定的阻塞,而且一定程度的不均衡是可以接受的,因此生产环境建议关闭,在真正需要平衡的时候手动介入进行leader副本重新平衡。
3.1.3 kafka-perferred-replica-election.sh
  • kafka-perferred-replica election.sh脚本提供了leader副本重新平衡的功能,在关闭了auto.leader.rebalance.enable的情况下,必要时刻可以手动进行leader分区平衡
./kafka-perferred-replica-election.sh --zookeeper localhost:2181 

PS : 命令会对所有的分区进行优先副本选举并打印出执行的相关信息,副本转移是一个成本比较高的工作,也可以采用更小粒度的对部分分区执行优先副本选举,并且要避开业务高峰。
  • kafka-perferred-replica-election.sh可以通过–path-to-json-file参数来小批量的对部分分区进行优先副本选举。
./kafka-perferred-replica-election.sh --zookeeper localhost:2181 --path-to-json-file election.json

election.json:

{
  "partitions": [
    {
      "partition": 0,
      "topic": "topic-1"
    },
    {
      "partition": 0,
      "topic": "topic-2"
    }
  ]
}

3.2 分区重分配

  • 当Kafka集群扩容或者broker失效时,为了保证节点之间的负载均衡,需要对topic的分区副本进行必要的合理重新分配。Kafka提供了kafka-reassign-partitions.sh脚本来执行重分配工作。kafka-reassign-partitions.sh重分配大致分为三个步骤:

  • 第一步:创建json文件

//在shell中我们指定了需要重分配的topic

//reassign.json
{
  "topics": [
    {
      "topic": "topic-1"
    }
  ],
  "version": 1
}
  • 第二步:获取候选重分配方案
//kafka可以自动给我们提供一份重分配的方案,注意这不是唯一的

./kafka-reassign-partitions.sh  --zookeeper localhost:2181  --generate --topics-to-move-json-file  reassign.json

//将打印出的方案保存到result.json
  • 第三步:执行重分配
//这一步会将分区信息按照分取方案重新划分
./kafka-reassign-partitions.sh  --zookeeper localhost:2181  --execute --reassignment-json-file  result.json

PS: 重分配的原理是,首先通过控制器为分区添加新的副本,新的副本从leader同步数据,然后控制器将旧的副本清除;在执行完毕之后,可以查看分区重分配的进度
./kafka-reassign-partitions.sh  --zookeeper localhost:2181  --verify --reassignment-json-file  result.json

PPS:重分配需要占用一定的资源,包含IO和磁盘等,可以小批量的进行重分配,另外在重分配的时候还可以限制其流量的大小。

3.3 复制限流

  • 在重分配的过程中如果数据量很大,势必会占用不少的IO资源,影响性能和业务,小粒度能够起到很大的作用,但是如果某个topic流量很大,那么减小粒度也没有作用了,因此Kafka还提供了限流机制来控制副本间复制过程中的流量大小,相关配置如下:
follower.replication.throttled.rate:follower副本的复制速度,B/s
leader.replication.throttled.rate:leader副本的传输速度,B/s

PS:我们不能停止Kafka集群修改配置再重启,因此可以使用kafka-config.sh看动态修改配置,将参数修改之后,再执行重分配,然后再修改成最初的值。

PPS:注意这个配置是broker级别的
  • Kafka还提供了主题级别的配置来限流;使用kafka-config.sh配置时,指定topic,对应的限制只对对应的topic有效
follower.replication.throttled.replicas:follower副本的复制速度,B/s
leader.replication.throttled.replicas:leader副本的传输速度,B/s

PS:这个配置是topic级别的
  • kafka-reassign-partitions.sh限流:kafka-reassign-partitions.sh本身也可以通过参数限流,而且相对更加简单,通过–throttle 10指定即可。

3.4 修改副本因子

  • 有时候我们需要扩展副本因子来加强数据的安全性与如容错性,或者减少副本因子节约资源。修改副本因子也是通过kafka-reassign-partitions.sh脚本来实现的,形式如下:
./kafka-reassign-partitions.sh  --zookeeper localhost:2181  --execute --reassignment-json-file  add.json

PS: 在add.json中包含分区的相关信息

四、分区数设计

4.1 性能测试

4.2 分区数和吞吐

  • 因为一个分区只能分配给一个消费者分组中的一个消费者,因此理论上分区数量越多可以并行的消费者数量越多吞吐量越大,但是并不是绝对的,在一定范围内增加分区可以增加吞吐。

4.3 分区数上限和影响因素

  • 系统文件描述符限制是分区的理论上限值;(Kafka底层通过日志文件保存消息,每一个分区对应一个日志文件,因此分区不能无限增大)
intellif@ubuntu:/opt/gp6_beta4/gpdb$ ulimit -n  //
1024
intellif@ubuntu:/opt/gp6_beta4/gpdb$ ulimit -Sn   //软限制
1024
intellif@ubuntu:/opt/gp6_beta4/gpdb$ ulimit -Hn   //硬限制
1048576


//查看指定进程占用的描述符数量
ls /proc/pid/fd | wc -l

ulimit - n 65535 指定同一时间可开启的文件数量

  • leader切换
当分区数量过多,一个broker节点上的leader副本也有可能很多,如果该节点故障发生leader切换则可能导致较长的时候内相关分区不可用。
  • 分区和key
Kafka消息的key值相同时会被发送到同一个分区,(关于key计算分区的规则可以参考02-Kafka生产者的1.5.2小节),因此在设计分区的时候也要充分考虑这个因素,这对于一些需要按序消费的场景很有用也很关键,因为分区数量的变化会导致消费被发送到另一个分区。
  • 其他
另外过多的分区也会加长Kafka的启动时间和关闭时间

五、小结

  • 本文主要写了关于主题和分区的内容,包括主题的维护,增删改查各种操作,分区的管理和Kafka为了维持节点之间的负载均衡所做的工作,包括优先副本,限流,分区重分配等,另外带出来Kafka提供的相关脚本的使用。

六、参考

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐