04-Kafka主题和分区
文章目录Kafka主题和分区一、主题管理1.1 创建主题1.2 分区副本分配1.2.1 分区和角色1.2.2 分配逻辑1.3 查看主题1.4 修改主题1.5 配置管理1.6 主题参数1.7 删除主题二、KafkaAdminClient2.1 基本使用2.2 主题合法性验证三、分区管理3.1 优先副本选举3.1.1 优先副本3.1.2 auto.leader.rebalance.enable3.1.
Kafka主题和分区
一、主题管理
Kafka系列文章是基于:深入理解Kafka:核心设计与实践原理一书,结合自己的部分实践和总结。
1.1 创建主题
- 如果broker端配置auto.create.topics.enable为true(默认为true),当收到客户端的元数据请求时则会创建topic,(消费消息和生产消息都会收到元数据请求)。创建主题脚本为:kafka-topics.sh
PS: 创建topic时,分区数为num.partitions(默认1),副本因子为default.replication.factor
下面是通过命令指定分区数和副本数
./kafka-topics.sh --zookeeper localhost:2181 --create --topic testTopic --partitions 1 --replication-factor 1
- 创建好的topic可以在日志路径下查看分区在每个broker的分配信息,也可以在zookeeper中获取分配信息,命令是:get /brokers/topics/topicName
[zk: localhost:2181(CONNECTED) 3] get /brokers/topics/testTopic-1
{"version":1,"partitions":{"0":[0]}}
cZxid = 0xd3
ctime = Sat Jul 20 09:57:21 CST 2019
mZxid = 0xd3
mtime = Sat Jul 20 09:57:21 CST 2019
pZxid = 0xd4
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 36
numChildren = 1
PS: 其中"0":[0]表示0号分区在0号broker,如果是"1":[1,2,3] 则 表示1号分区的三个副本分别在1、2和3号broker节点。
- kafka-topics.sh :kafka-topics.sh可以执行多种类型的topic操作,包括create、delete、alter、list、describe,所需参数也各有不同。
//1.create 新增主题
./kafka-topics.sh --zookeeper localhost:2181 --create --topic testTopic --partitions 1 --replication-factor 1
//2.delete 删除主题;注意部不是立刻删除,而且delete.topic.enable为false的时候,删除操作是没有作用的
./kafka-topics.sh --zookeeper localhost:2181 --delete --topic testTopic-2
Topic testTopic-2 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
//3.alter 修改主题
//4.list 展示所有主题
./kafka-topics.sh --zookeeper localhost:2181 --list
//5.describe 查看主题详情;展示主题,分区数,副本数(AR),Leader副本所在broker,ISR等信息
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic testTopic (如果没有指定--topic则会展示全部主题信息)
Topic:testTopic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: testTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
- 补充
1. kafka-topics.sh 支持自定义分区在broker上的分配关系,通过参数--replica-assignment指定,且此时不需要--partitions和--replication-factor参数了
2. kafka-topics.sh 支持通过config参数来设置更多的参数,且config中的参数会覆盖原本的默认配置,比如:--config max .message.bytes=lOOOO,--config配置多次,比
如:--config cleanup.policy=compact --config max .message.bytes=lOOOO, 在zookeeper的/config/topics/topicName下可以看到config相关的参数配置
3. kafka-topics.sh 支持参数--if-not-exists来避免创建重复的topic
4. kafkaserver通过参数broker.rack指定机架信息,如果指定了那么创建topic的时候会尽量将副本分配在不同的机架,机架信息要么所有的broker都需要配置要么都不配置,但
是也可以使用kafka-topics.sh的参数--disable-rack-aware 来取消机架感知
- Java实现;前面调用 kafka-topics.sh脚本实现的功能,在Java中有对应的Api,下面给出一个简单的示例
public class CreateTopic {
public static final String BROKER_LIST = "192.168.13.53:2181";
public static void main(String[] args) {
String[] options = new String[]{
"--zookeeper", BROKER_LIST,
"--create",
"--replication-factor", "1",
"--partitions", "1",
"--topic", "topic-1"
};
TopicCommand.main(options);
}
}
1.2 分区副本分配
1.2.1 分区和角色
- 分区分配的几个角度
1.生产者角度:关注的是每条消息发往哪个分区
2.消费者角度:关注的是每个消费者被指定消费哪个分区
3.服务端:关注一个Topic下的每个分区的若干副本如何保存在不同的broker节点
1.2.2 分配逻辑
- 如果没有开启机架感知(比如没有配置机架或者指定了disable-rack-aware),那么分配逻辑如下
1.最开始会将所有的可用broker初始化一个List集合
2.然后随机计算首个分配的broker;(并不是每次都从0号broker开始,那样会不均匀且0号broker可能已经退出集群了);
3.给分区取一个编号,按照顺序从分区编号为0的分区开始分配;
4.分配的时候会计算一个步长shift,比如0号分区的第一个副本在broker为2,那么1号副本就在broker2+shift,这个计算细节就没有研究了,可以参照scala源码
- 如果开启机架感知,会将机架信息作为一个附加参考项,下面是对比前面的区别
1.在计算初始化broker集合的时候,是轮询的而不是顺序的;比如A机架有1、2和3,B机架有4、5和6,C机架有7、8和9,那么broker的集合是[1,4,7,2,5,8,3,6,9];
2.另外在分配的时候会做一些校验,比如A机架中的一个broker已经分了一个副本了,B和C机架中还有一个机架没有改分区副本,那么就一定不会把新的副本分配到A机架,而是B或者C机架
PS:总体来说就是进来均匀分布,有机架的时候就尽量把副本分配到不同的机架
1.3 查看主题
- 1.1中提到了查看分区信息的describe参数,这里补充其几个非常有用的参数。
1. --topics-with-overrides:找出所有包含覆盖配置的topic,这些topic包含于集群不一样的配置,命令会将对应的配置等信息展示出来
intellif@segment2:/opt/kafka/kafka_2.11-2.0.0/bin$ ./kafka-topics.sh --zookeeper localhost:2181 --describe --topics-with-overrides
Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:1 Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
2. --under-replicated-partitions:找出包含失效副本的分区,(即ISR<AR),此时有可能部分broker节点失效或者同步效率降低,如果没有则什么都不返回
./kafka-topics.sh --zookeeper localhost:2181 --describe --under-replicated-partitions
3. --unavailable-partitions:找出没有leader的分区,这些分区已经处于离线状态,对外界不可用,如果没有则什么都不返回
./kafka-topics.sh --zookeeper localhost:2181 --describe --unavailable-partitions
1.4 修改主题
- 1.1中提到了修改主题的alter参数,这里补充其几个非常有用的参数。
- 修改分区;
//修改分区,warn日志提示修改分区可能会改变消息计算得到的分区数
./kafka-topics.sh --zookeeper localhost:2181 --alter --topic testTopic --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
//再次查看topic有3个分区
intellif@segment2:/opt/kafka/kafka_2.11-2.0.0/bin$ ./kafka-topics.sh --zookeeper localhost:2181 --describe --topic testTopic
Topic:testTopic PartitionCount:3 ReplicationFactor:1 Configs:
Topic: testTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: testTopic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: testTopic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
PS:分区数只能增加不能减少,The number of partitions for a topic can only be increased,减少分区收益很低,但是在代码实现上非常复杂,需要考虑的问题也非常多
intellif@segment2:/opt/kafka/kafka_2.11-2.0.0/bin$ ./kafka-topics.sh --zookeeper localhost:2181 --alter --topic testTopic --partitions 2
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Error while executing topic command : The number of partitions for a topic can only be increased. Topic testTopic currently has 3 partitions, 2 would not be an increase.
[2019-07-22 12:19:44,134] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic testTopic currently has 3 partitions, 2 would not be an increase.
(kafka.admin.TopicCommand$)
PPS:修改topic为了避免topic不存在,可以添加参数if-exists
- 修改配置; 配合config参数,可以修改topic的配置
intellif@segment2:/opt/kafka/kafka_2.11-2.0.0/bin$ ./kafka-topics.sh --zookeeper localhost:2181 --alter --topic testTopic --config max.message.bytes=20000
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "testTopic".
PS: 通过config指定的参数,可以通过delete-config来删除配置
1.5 配置管理
- kafka-configs.sh用于管理Kafka的配置,主要是在运行时修改配置达到动态调整配置的作用。包括alter变更配置和describe查看配置两类,alter包括增删改三类,另外kafka-configs.sh支持 topic,用户,broker和客户端这几类配置的修改。
1.6 主题参数
-
Topic相关的参数在broker服务端都有参数对应,比如topic的cleanup.policy对应broker的log.cleanup.policy。如果创建topic的时候没有指定,那么就会使用broker的,创建时可以指定也可以在创建之后变更。
-
具体参数可以阅读参考文章[1]的第四章4.1.6小节
1.7 删除主题
- 删除无用的主题可以节约资源,使用./kafka-topics.sh来完成。
./kafka-topics.sh --zookeeper localhost:2181 --delete --topic testTopic-2
- 删除主题的本质是在Zookeeper的/admin/delete_topics路径下创建一个节点,节点名称和待删除的topic相同,这相当于将topic标记为待删除,真正的删除动作由Kafka的控制器完成(与创建topic类似)。因此删除topic也可以直接在zookeeper中操作
create /admin/delete_topics/topicname ""
- topic的元数据保存在zookeeper的/brokers/topics和/config/topics下面,日志文件保存在log.dirs下面(或者lg.dir)也可以手动清理这三个地方的数据删除topic,不过相对较复杂。
二、KafkaAdminClient
- 类似于前面的kafka-topics.sh脚本可以让我们对kafka的主题进行管理,在1.1小节中已经给出了使用TopicCommand创建主题的用法,但是这样的方法实际上与调用脚本类似,而且没有返回值,交互性比较差。但是如果想要将Kafka的管理功能与程序或者系统相结合,则可以使用KafkaAdminClient提供的Api。
2.1 基本使用
- 创建Topic
private static void createWithAdminApi() {
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.11.27:9092");
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);
AdminClient adminClient = AdminClient.create(properties);
NewTopic newTopic = new NewTopic("topic-test-1", 1, (short) 1);
CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
try {
Void aVoid = createTopicsResult.all().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
PS: KafkaAdminClient提供了增删改Topic,查看Topic信息,展示所有Topic,修改分区,查询修改配置等功能,具体参数可以阅读参考文章[1]的第四章4.2.1小节
2.2 主题合法性验证
-
有时候为了管理Topic的创建,让Topic更加规范,比如创建的副本参数,分区数量,命名规则的有要求,而不允许随意创建,那么可以实现Kafka中的CreateTopicPolicy接口,之后再使用KafkaAdminClient创建Topic的时候,如果不满足该接口实现类中定义的校验规则,则创建会失败。
public class MyCreateTopicPolicy implements CreateTopicPolicy {
//校验方法,鉴定主题参数的合法性
@Override
public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {
}
//关闭Kafka服务时执行
@Override
public void close() throws Exception {
}
//Kafka服务启动时执行
@Override
public void configure(Map<String, ?> configs) {
}
}
- 注意要使用该方法需要在broker端将参数create.topic.policy.class.name的值为CreateTopicPolicy实现类的全限定类名,该配置默认为null,不会做校验
三、分区管理
3.1 优先副本选举
3.1.1 优先副本
- 多副本机制是保证一个分区高可用的关键,而分区高可用是Kafka高可用的必要条件。
PS: 一个broker上最多只能有一个分区的一个副本,换言之副本因子是不能大于broker数量的。(大于是没有意义的,一个broker上保存多份副本不能容灾,浪费资源)
- 多副本中的leader负责接收读写请求,follwer负责和leader同步数据,在leader故障后会重新再follower中选举出新的leader。但是这种机制会引入一个问题,假设期初每个broker上的leader数量是均衡的,那么整个Kafka集群的负载是均衡的(假设不同分区的请求负载相同),在节点故障触发leader重新选举之后,可能会导致某些broker节点上的leader副本很多,部分节点很少甚至没有leader副本,这样整个集群对外负载就不均衡了,从而影响到整个集群的健壮性和稳定性。
PS: 假设每个分区的负载相同,那么比较理想的状态是每个broker上运行的leader副本数差别不大。
-
为了解决前面所述问题,让leader副本能够均匀分布,首先引出了优先副本的概念(Preferred replica),优先副本是指分区的AR集合列表中第一个副本,理想情况下优先副本就是leader副本,因此保证leader副本的均衡分布等价于保证所有优先副本的均衡分布。
-
优先副本选举:优先副本选举就是指通过一定的方式促使优先副本被选举为leader副本。
PS: 即使优先副本在集群中均匀分布不代表集群是负载均衡的,因为每个分区的负载有可能不一样,尤其是业务上使用了消息key的情况下,因此优先副本选举只是从leader副本的角度来考虑负载均衡,但不能和整个集群的负载均衡等价。
3.1.2 auto.leader.rebalance.enable
- Kafka通过该参数来提供分区自动平衡功能,默认该配置开启。Kafka通过一个定时任务来检查每个broker的不平衡率是否超过阈值,如果超过则会执行优先副本选举来达到分区平衡。
定时周期:leader.imbalance.check.interval.seconds
不平衡率 = 非优先副本的leader / 分区总数
阈值:leader.imbalance.per.broker.percentage配置
PS :开启自动平衡时,平衡执行的时间不可控,有可能在关键时刻造成一定的阻塞,而且一定程度的不均衡是可以接受的,因此生产环境建议关闭,在真正需要平衡的时候手动介入进行leader副本重新平衡。
3.1.3 kafka-perferred-replica-election.sh
- kafka-perferred-replica election.sh脚本提供了leader副本重新平衡的功能,在关闭了auto.leader.rebalance.enable的情况下,必要时刻可以手动进行leader分区平衡
./kafka-perferred-replica-election.sh --zookeeper localhost:2181
PS : 命令会对所有的分区进行优先副本选举并打印出执行的相关信息,副本转移是一个成本比较高的工作,也可以采用更小粒度的对部分分区执行优先副本选举,并且要避开业务高峰。
- kafka-perferred-replica-election.sh可以通过–path-to-json-file参数来小批量的对部分分区进行优先副本选举。
./kafka-perferred-replica-election.sh --zookeeper localhost:2181 --path-to-json-file election.json
election.json:
{
"partitions": [
{
"partition": 0,
"topic": "topic-1"
},
{
"partition": 0,
"topic": "topic-2"
}
]
}
3.2 分区重分配
-
当Kafka集群扩容或者broker失效时,为了保证节点之间的负载均衡,需要对topic的分区副本进行必要的合理重新分配。Kafka提供了kafka-reassign-partitions.sh脚本来执行重分配工作。kafka-reassign-partitions.sh重分配大致分为三个步骤:
-
第一步:创建json文件
//在shell中我们指定了需要重分配的topic
//reassign.json
{
"topics": [
{
"topic": "topic-1"
}
],
"version": 1
}
- 第二步:获取候选重分配方案
//kafka可以自动给我们提供一份重分配的方案,注意这不是唯一的
./kafka-reassign-partitions.sh --zookeeper localhost:2181 --generate --topics-to-move-json-file reassign.json
//将打印出的方案保存到result.json
- 第三步:执行重分配
//这一步会将分区信息按照分取方案重新划分
./kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute --reassignment-json-file result.json
PS: 重分配的原理是,首先通过控制器为分区添加新的副本,新的副本从leader同步数据,然后控制器将旧的副本清除;在执行完毕之后,可以查看分区重分配的进度
./kafka-reassign-partitions.sh --zookeeper localhost:2181 --verify --reassignment-json-file result.json
PPS:重分配需要占用一定的资源,包含IO和磁盘等,可以小批量的进行重分配,另外在重分配的时候还可以限制其流量的大小。
3.3 复制限流
- 在重分配的过程中如果数据量很大,势必会占用不少的IO资源,影响性能和业务,小粒度能够起到很大的作用,但是如果某个topic流量很大,那么减小粒度也没有作用了,因此Kafka还提供了限流机制来控制副本间复制过程中的流量大小,相关配置如下:
follower.replication.throttled.rate:follower副本的复制速度,B/s
leader.replication.throttled.rate:leader副本的传输速度,B/s
PS:我们不能停止Kafka集群修改配置再重启,因此可以使用kafka-config.sh看动态修改配置,将参数修改之后,再执行重分配,然后再修改成最初的值。
PPS:注意这个配置是broker级别的
- Kafka还提供了主题级别的配置来限流;使用kafka-config.sh配置时,指定topic,对应的限制只对对应的topic有效
follower.replication.throttled.replicas:follower副本的复制速度,B/s
leader.replication.throttled.replicas:leader副本的传输速度,B/s
PS:这个配置是topic级别的
- kafka-reassign-partitions.sh限流:kafka-reassign-partitions.sh本身也可以通过参数限流,而且相对更加简单,通过–throttle 10指定即可。
3.4 修改副本因子
- 有时候我们需要扩展副本因子来加强数据的安全性与如容错性,或者减少副本因子节约资源。修改副本因子也是通过kafka-reassign-partitions.sh脚本来实现的,形式如下:
./kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute --reassignment-json-file add.json
PS: 在add.json中包含分区的相关信息
四、分区数设计
4.1 性能测试
- Kafka自带的性测试脚本:kafka-producer-perf-test.sh,也可以使用kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance的方式,具体可以阅读参考文章[2]
4.2 分区数和吞吐
- 因为一个分区只能分配给一个消费者分组中的一个消费者,因此理论上分区数量越多可以并行的消费者数量越多吞吐量越大,但是并不是绝对的,在一定范围内增加分区可以增加吞吐。
4.3 分区数上限和影响因素
- 系统文件描述符限制是分区的理论上限值;(Kafka底层通过日志文件保存消息,每一个分区对应一个日志文件,因此分区不能无限增大)
intellif@ubuntu:/opt/gp6_beta4/gpdb$ ulimit -n //
1024
intellif@ubuntu:/opt/gp6_beta4/gpdb$ ulimit -Sn //软限制
1024
intellif@ubuntu:/opt/gp6_beta4/gpdb$ ulimit -Hn //硬限制
1048576
//查看指定进程占用的描述符数量
ls /proc/pid/fd | wc -l
ulimit - n 65535 指定同一时间可开启的文件数量
- leader切换
当分区数量过多,一个broker节点上的leader副本也有可能很多,如果该节点故障发生leader切换则可能导致较长的时候内相关分区不可用。
- 分区和key
Kafka消息的key值相同时会被发送到同一个分区,(关于key计算分区的规则可以参考02-Kafka生产者的1.5.2小节),因此在设计分区的时候也要充分考虑这个因素,这对于一些需要按序消费的场景很有用也很关键,因为分区数量的变化会导致消费被发送到另一个分区。
- 其他
另外过多的分区也会加长Kafka的启动时间和关闭时间
五、小结
- 本文主要写了关于主题和分区的内容,包括主题的维护,增删改查各种操作,分区的管理和Kafka为了维持节点之间的负载均衡所做的工作,包括优先副本,限流,分区重分配等,另外带出来Kafka提供的相关脚本的使用。
六、参考
更多推荐
所有评论(0)