Start the broker:

./bin/kafka-server-start.sh -daemon config/server.properties

Create a topic with 1 replica and 1 partition

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

View the topic details

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

View the topic's current offsets (--time -1 returns the latest offset of each partition)

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic topicName --time -1

Manually set how long the topic's data is retained (retention.ms is in milliseconds)

kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name topicName --entity-type topics --add-config retention.ms=3600000
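
Since retention.ms is expressed in milliseconds, the 3600000 in the command above is one hour. The following is a minimal sketch for deriving such values; the helper name retention_ms is hypothetical and not part of Kafka.

```python
from datetime import timedelta


def retention_ms(**duration):
    """Convert a duration (same keyword arguments as timedelta) to milliseconds."""
    # Dividing two timedeltas with // yields an exact integer.
    return timedelta(**duration) // timedelta(milliseconds=1)


print(retention_ms(hours=1))  # 3600000
print(retention_ms(days=2))   # 172800000
```

The printed number can be passed as the value of --add-config retention.ms=... in the command above.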

Add partitions

Add a new Kafka instance with broker.id=1, then run the following command to change the partition count of test to 2

bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 2 --topic test

Once the new Kafka instance has joined, the partitions of newly created topics are automatically assigned to the new broker as well

[root@kudu001 kafka]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic test2
[root@kudu001 kafka]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test2
Topic:test2	PartitionCount:2	ReplicationFactor:2	Configs:
	Topic: test2	Partition: 0	Leader: 1	Replicas: 1,0	Isr: 1,0
	Topic: test2	Partition: 1	Leader: 0	Replicas: 0,1	Isr: 0,1

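Partition migration and reassignment

Use the kafka-reassign-partitions.sh command. Its --broker-list parameter specifies which brokers the data should be moved to.
The steps below move partition 1 of topic test to the node with broker.id=1.
Step 1: create the topic list file test.json for test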

{"topics":[{"topic":"test"}],"version": 1}

Step 2: generate a proposed reassignment plan for test

[root@kudu001 kafka]# bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file test.json  --broker-list "0,1" --generate
Current partition replica  assignment
# the current partition replica assignment
{"version":1,"partitions":[{"topic":"test","partition":1,"replicas":[0]},{"topic":"test","partition":0,"replicas":[0]}]}
Proposed partition reassignment configuration
# the proposed partition assignment
{"version":1,"partitions":[{"topic":"test","partition":1,"replicas":[1]},{"topic":"test","partition":0,"replicas":[0]}]}
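
The proposed assignment has to be copied out of this output by hand for the next step. As a rough sketch, it can also be extracted with a few lines of Python, assuming the output has the shape shown above (a "Proposed partition reassignment configuration" header followed by one JSON line). The function name extract_proposed is hypothetical.

```python
import json

# Sample text in the shape printed by `kafka-reassign-partitions.sh --generate`
# (copied from the run above, without the annotation lines).
GENERATE_OUTPUT = """\
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test","partition":1,"replicas":[0]},{"topic":"test","partition":0,"replicas":[0]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test","partition":1,"replicas":[1]},{"topic":"test","partition":0,"replicas":[0]}]}
"""


def extract_proposed(output):
    """Return the JSON object printed after the 'Proposed ...' header line."""
    lines = output.splitlines()
    for index, line in enumerate(lines):
        if line.startswith("Proposed partition reassignment"):
            # The JSON is the first following line that starts with '{';
            # blank or annotation lines in between are skipped.
            for candidate in lines[index + 1:]:
                if candidate.lstrip().startswith("{"):
                    return json.loads(candidate)
    raise ValueError("no proposed assignment found in the output")


proposed = extract_proposed(GENERATE_OUTPUT)
print(json.dumps(proposed, indent=4))
```

The pretty-printed result can be saved as the reassignment file used with --execute.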

Step 3: save the proposed assignment as the reassignment file test-reassignment.json

{
    "version":1,
    "partitions":[
        {
            "topic":"test",
            "partition":1,
            "replicas":[
                1
            ]
        },
        {
            "topic":"test",
            "partition":0,
            "replicas":[
                0
            ]
        }
    ]
}

Step 4: execute the reassignment

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  --reassignment-json-file test-reassignment.json --execute

Check the progress of the reassignment

[root@kudu001 kafka]# bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  --reassignment-json-file test-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [test,1] completed successfully
Reassignment of partition [test,0] completed successfully
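
When the reassignment is scripted, the --verify output can be checked mechanically. The sketch below assumes the status lines look exactly like the ones above; the wording may differ between Kafka versions, and reassignment_done is a hypothetical helper name.

```python
VERIFY_OUTPUT = """\
Status of partition reassignment:
Reassignment of partition [test,1] completed successfully
Reassignment of partition [test,0] completed successfully
"""


def reassignment_done(output):
    """True when every 'Reassignment of partition' line reports success."""
    status_lines = [
        line for line in output.splitlines()
        if line.startswith("Reassignment of partition")
    ]
    # No status lines at all is treated as "not done".
    return bool(status_lines) and all(
        line.endswith("completed successfully") for line in status_lines
    )


print(reassignment_done(VERIFY_OUTPUT))  # True
```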

Details of test

[root@kudu001 kafka]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test	PartitionCount:2	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: test	Partition: 1	Leader: 1	Replicas: 1	Isr: 1

Add replicas

Add one more replica to each partition of test by creating test-addreplica.json

{
    "version":1,
    "partitions":[
        {
            "topic":"test",
            "partition":0,
            "replicas":[
                0,
                1
            ]
        },
        {
            "topic":"test",
            "partition":1,
            "replicas":[
                1,
                0
            ]
        }
    ]
}
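
For topics with many partitions, writing this file by hand gets tedious. The following is a minimal sketch that derives it from the current assignment. The function add_replicas and its parameters are hypothetical; it keeps the existing replicas first, so the preferred replica of each partition does not change.

```python
import json


def add_replicas(assignment, brokers, replication_factor):
    """Extend every partition's replica list up to replication_factor."""
    partitions = []
    for entry in assignment["partitions"]:
        replicas = list(entry["replicas"])  # existing replicas stay in front
        for broker in brokers:
            if len(replicas) >= replication_factor:
                break
            if broker not in replicas:
                replicas.append(broker)
        partitions.append({
            "topic": entry["topic"],
            "partition": entry["partition"],
            "replicas": replicas,
        })
    return {"version": assignment["version"], "partitions": partitions}


# Current assignment of test: one replica per partition.
current = {"version": 1, "partitions": [
    {"topic": "test", "partition": 0, "replicas": [0]},
    {"topic": "test", "partition": 1, "replicas": [1]},
]}
expanded = add_replicas(current, brokers=[0, 1], replication_factor=2)
print(json.dumps(expanded, indent=4))
```

For the two-broker cluster in this walkthrough the result matches the file above: partition 0 gets replicas [0, 1] and partition 1 gets [1, 0].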

Run the command that adds the replicas

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file test-addreplica.json --execute

View the details of test

[root@kudu001 kafka]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test	PartitionCount:2	ReplicationFactor:2	Configs:
	Topic: test	Partition: 0	Leader: 0	Replicas: 0,1	Isr: 0,1
	Topic: test	Partition: 1	Leader: 1	Replicas: 1,0	Isr: 1,0

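Topic partition leader balancing

After a Kafka cluster has been running for a while, broker stops and restarts leave the topic partition leaders no longer evenly distributed across the brokers, which hurts the read and write performance of the whole cluster. There are two ways to fix this.
1. Broker configuration (automatic leader rebalancing is enabled by default)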

auto.leader.rebalance.enable=true
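leader.imbalance.per.broker.percentage=10
# leader imbalance percentage allowed per broker; above this value a leader rebalance is triggered
leader.imbalance.check.interval.seconds=300
# interval, in seconds, at which leader imbalance is checked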
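
To make the percentage concrete, here is a simplified model of the per-broker check. It assumes that a broker's imbalance is the share of partitions for which it is the first replica but not the current leader. The function name imbalance_ratio and the sample data are illustrative only and not taken from Kafka's code.

```python
def imbalance_ratio(partitions, broker):
    """Share of the broker's preferred partitions that it does not lead.

    partitions is a list of (leader, replicas) tuples.
    """
    preferred = [
        (leader, replicas) for leader, replicas in partitions
        if replicas[0] == broker
    ]
    if not preferred:
        return 0.0
    not_led = sum(1 for leader, _ in preferred if leader != broker)
    return not_led / len(preferred)


# Illustrative state: broker 1 leads all six partitions,
# although brokers 2 and 3 are first in some replica lists.
partitions = [
    (1, [2, 3, 1]), (1, [3, 1, 2]), (1, [1, 2, 3]),
    (1, [2, 1, 3]), (1, [3, 2, 1]), (1, [1, 3, 2]),
]
THRESHOLD_PERCENT = 10  # leader.imbalance.per.broker.percentage

for broker in (1, 2, 3):
    ratio = imbalance_ratio(partitions, broker)
    print(broker, ratio, ratio * 100 > THRESHOLD_PERCENT)
```

Under this model brokers 2 and 3 are at 100% imbalance and exceed the 10% threshold, while broker 1 is at 0%.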

2. Rebalance manually from the command line
The command for leader balancing is

kafka-preferred-replica-election.sh --zookeeper localhost:2181 --path-to-json-file test_maxwell_1-rebalance.json

Details of test_maxwell_1

[root@hzv_stat_es1 ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_maxwell_1
Topic:test_maxwell_1	PartitionCount:6	ReplicationFactor:3	Configs:retention.ms=172800000,cleanup.policy=delete
	Topic: test_maxwell_1	Partition: 0	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3
	Topic: test_maxwell_1	Partition: 1	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3
	Topic: test_maxwell_1	Partition: 2	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3
	Topic: test_maxwell_1	Partition: 3	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3
	Topic: test_maxwell_1	Partition: 4	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3
	Topic: test_maxwell_1	Partition: 5	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3

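However, a rebalance makes the first broker in each partition's Replicas list the leader. All replicas of a partition are called the "assigned replicas", and the first of them is the "preferred replica"; for a newly created topic the preferred replica is normally the leader. In the topic test_maxwell_1 above every partition has Replicas: 1,2,3, so even running kafka-preferred-replica-election.sh cannot redistribute the leaders. The replicas of each partition have to be reassigned first, which here only means changing their order. (Note: if the preferred replicas already differ across partitions, skip directly to step 5.)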
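
One way to picture "changing the order" is to rotate the broker list by the partition number, so that each broker comes first equally often. This is only an illustrative sketch with a hypothetical function name, rotated_assignment. It is not how the --generate option of kafka-reassign-partitions.sh computes its proposal.

```python
from collections import Counter


def rotated_assignment(topic, partition_count, brokers):
    """Build an assignment whose preferred replicas rotate over the brokers."""
    partitions = []
    for partition in range(partition_count):
        shift = partition % len(brokers)
        # Rotate the broker list so a different broker comes first each time.
        replicas = brokers[shift:] + brokers[:shift]
        partitions.append({
            "topic": topic,
            "partition": partition,
            "replicas": replicas,
        })
    return {"version": 1, "partitions": partitions}


plan = rotated_assignment("test_maxwell_1", 6, [1, 2, 3])
preferred = Counter(entry["replicas"][0] for entry in plan["partitions"])
print(dict(preferred))  # {1: 2, 2: 2, 3: 2}
```

With six partitions and three brokers, every broker is the preferred replica of exactly two partitions.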
Step 1:
Write the topic file test_maxwell_1.json

{"topics":[{"topic":"test_maxwell_1"}],"version": 1}

Step 2: generate a proposed reassignment plan for the topic

[root@hzv_stat_es1 ~]# kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file test_maxwell_1.json  --broker-list "1,2,3" --generate
Current partition replica assignment

{"version":1,"partitions":[{"topic":"test_maxwell_1","partition":3,"replicas":[1,2,3]},{"topic":"test_maxwell_1","partition":5,"replicas":[1,2,3]},{"topic":"test_maxwell_1","partition":2,"replicas":[1,2,3]},{"topic":"test_maxwell_1","partition":0,"replicas":[1,2,3]},{"topic":"test_maxwell_1","partition":4,"replicas":[1,2,3]},{"topic":"test_maxwell_1","partition":1,"replicas":[1,2,3]}]}
Proposed partition reassignment configuration

{"version":1,"partitions":[{"topic":"test_maxwell_1","partition":5,"replicas":[1,3,2]},{"topic":"test_maxwell_1","partition":3,"replicas":[2,1,3]},{"topic":"test_maxwell_1","partition":2,"replicas":[1,2,3]},{"topic":"test_maxwell_1","partition":0,"replicas":[2,3,1]},{"topic":"test_maxwell_1","partition":4,"replicas":[3,2,1]},{"topic":"test_maxwell_1","partition":1,"replicas":[3,1,2]}]}

Step 3: create the file test_maxwell_1-reassignment.json

{
    "version":1,
    "partitions":[
        {
            "topic":"test_maxwell_1",
            "partition":5,
            "replicas":[
                1,
                3,
                2
            ]
        },
        {
            "topic":"test_maxwell_1",
            "partition":3,
            "replicas":[
                2,
                1,
                3
            ]
        },
        {
            "topic":"test_maxwell_1",
            "partition":2,
            "replicas":[
                1,
                2,
                3
            ]
        },
        {
            "topic":"test_maxwell_1",
            "partition":0,
            "replicas":[
                2,
                3,
                1
            ]
        },
        {
            "topic":"test_maxwell_1",
            "partition":4,
            "replicas":[
                3,
                2,
                1
            ]
        },
        {
            "topic":"test_maxwell_1",
            "partition":1,
            "replicas":[
                3,
                1,
                2
            ]
        }
    ]
}

Step 4: execute

kafka-reassign-partitions.sh --zookeeper localhost:2181 \
--reassignment-json-file test_maxwell_1-reassignment.json --execute

View the topic details (the replica order has changed, but broker 1 is still the leader of every partition)

[root@hzv_stat_es1 ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_maxwell_1
Topic:test_maxwell_1	PartitionCount:6	ReplicationFactor:3	Configs:retention.ms=172800000,cleanup.policy=delete
	Topic: test_maxwell_1	Partition: 0	Leader: 1	Replicas: 2,3,1	Isr: 1,2,3
	Topic: test_maxwell_1	Partition: 1	Leader: 1	Replicas: 3,1,2	Isr: 1,2,3
	Topic: test_maxwell_1	Partition: 2	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3
	Topic: test_maxwell_1	Partition: 3	Leader: 1	Replicas: 2,1,3	Isr: 1,2,3
	Topic: test_maxwell_1	Partition: 4	Leader: 1	Replicas: 3,2,1	Isr: 1,2,3
	Topic: test_maxwell_1	Partition: 5	Leader: 1	Replicas: 1,3,2	Isr: 1,2,3

Step 5: balance the partition leaders; create test_maxwell_1-rebalance.json

{
    "partitions":[
        {
            "topic":"test_maxwell_1",
            "partition":0
        },
        {
            "topic":"test_maxwell_1",
            "partition":1
        },
        {
            "topic":"test_maxwell_1",
            "partition":2
        },
        {
            "topic":"test_maxwell_1",
            "partition":3
        },
        {
            "topic":"test_maxwell_1",
            "partition":4
        },
        {
            "topic":"test_maxwell_1",
            "partition":5
        }
    ]
}
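
The election file only lists topic and partition pairs, so it is easy to generate. The following is a small sketch, assuming the partitions are numbered from 0 as in the describe output; election_file is a hypothetical helper name.

```python
import json


def election_file(topic, partition_count):
    """Build the JSON structure listing every partition of the topic."""
    return {
        "partitions": [
            {"topic": topic, "partition": partition}
            for partition in range(partition_count)
        ]
    }


content = election_file("test_maxwell_1", 6)
print(json.dumps(content, indent=4))
```

Redirecting the output to test_maxwell_1-rebalance.json gives a file equivalent to the one written out above.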

Run the leader balancing

kafka-preferred-replica-election.sh --zookeeper localhost:2181 --path-to-json-file test_maxwell_1-rebalance.json

View the topic details (each partition is now led by the first broker in its Replicas list)

[root@hzv_stat_es1 ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_maxwell_1
Topic:test_maxwell_1	PartitionCount:6	ReplicationFactor:3	Configs:retention.ms=172800000,cleanup.policy=delete
	Topic: test_maxwell_1	Partition: 0	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3
	Topic: test_maxwell_1	Partition: 1	Leader: 2	Replicas: 2,3,1	Isr: 1,2,3
	Topic: test_maxwell_1	Partition: 2	Leader: 3	Replicas: 3,1,2	Isr: 1,2,3
	Topic: test_maxwell_1	Partition: 3	Leader: 1	Replicas: 1,3,2	Isr: 1,2,3
	Topic: test_maxwell_1	Partition: 4	Leader: 2	Replicas: 2,1,3	Isr: 1,2,3
	Topic: test_maxwell_1	Partition: 5	Leader: 3	Replicas: 3,2,1	Isr: 1,2,3
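
To confirm that the leaders are evenly spread, the describe output can be summarized per broker. This is a rough sketch that assumes each partition line contains a "Leader: <id>" field as shown above; the sample text reproduces the partition lines with spaces instead of tabs.

```python
import re
from collections import Counter

DESCRIBE_OUTPUT = """\
Topic: test_maxwell_1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test_maxwell_1 Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 1,2,3
Topic: test_maxwell_1 Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 1,2,3
Topic: test_maxwell_1 Partition: 3 Leader: 1 Replicas: 1,3,2 Isr: 1,2,3
Topic: test_maxwell_1 Partition: 4 Leader: 2 Replicas: 2,1,3 Isr: 1,2,3
Topic: test_maxwell_1 Partition: 5 Leader: 3 Replicas: 3,2,1 Isr: 1,2,3
"""

# Count how many partitions each broker currently leads.
leaders = Counter(
    int(broker_id) for broker_id in re.findall(r"Leader:\s*(\d+)", DESCRIBE_OUTPUT)
)
print(dict(leaders))  # {1: 2, 2: 2, 3: 2}
```

Each of the three brokers leads two of the six partitions, which is the balanced state this procedure aims for.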