查看kafka集群topic列表

./kafka-topics.sh --list --zookeeper ip:2181

示例:

[root@master bin]# ./kafka-topics.sh --list --zookeeper 127.0.0.1:2181
__consumer_offsets
send_test

查看topic元数据信息

./kafka-topics.sh --describe --zookeeper ip:2181 --topic topic_name

示例:

[root@master bin]# ./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic send_test
Topic:send_test	PartitionCount:1	ReplicationFactor:2	Configs:
	Topic: send_test	Partition: 0	Leader: 8	Replicas: 8,9	Isr: 8,9

topic分区扩容

./kafka-topics.sh --zookeeper ip:2181 --topic topic_name --alter --partitions 4

示例:

[root@master bin]# ./kafka-topics.sh --zookeeper 127.0.0.1:2181  --topic send_test --alter --partitions 4
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[root@master bin]# 
[root@master bin]# 
[root@master bin]# 
[root@master bin]# ./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic send_test
Topic:send_test	PartitionCount:4	ReplicationFactor:2	Configs:
	Topic: send_test	Partition: 0	Leader: 8	Replicas: 8,9	Isr: 8,9
	Topic: send_test	Partition: 1	Leader: 9	Replicas: 9,10	Isr: 9,10
	Topic: send_test	Partition: 2	Leader: 10	Replicas: 10,8	Isr: 10,8
	Topic: send_test	Partition: 3	Leader: 8	Replicas: 8,10	Isr: 8,10

topic创建

./kafka-topics.sh --zookeeper ip:2181 --create --topic topic_name --partitions 16 --replication-factor 2

示例:

[root@master bin]#  ./kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic test_create  --partitions 1  --replication-factor 1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "test_create".
[root@master bin]# ./kafka-topics.sh --list --zookeeper 127.0.0.1:2181
__consumer_offsets
send_test
test_create

修改topic数据过期时间

./kafka-configs.sh --alter --zookeeper ip:2181 --entity-type topics --entity-name topic_name --add-config retention.ms=86400000

示例:

[root@master bin]# ./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic send_test
Topic:send_test	PartitionCount:4	ReplicationFactor:2	Configs:
	Topic: send_test	Partition: 0	Leader: 8	Replicas: 8,9	Isr: 8,9
	Topic: send_test	Partition: 1	Leader: 9	Replicas: 9,10	Isr: 9,10
	Topic: send_test	Partition: 2	Leader: 10	Replicas: 10,8	Isr: 10,8
	Topic: send_test	Partition: 3	Leader: 8	Replicas: 8,10	Isr: 8,10
[root@master bin]# 
[root@master bin]# ./kafka-configs.sh --alter --zookeeper 127.0.0.1:2181 --entity-type topics --entity-name send_test --add-config retention.ms=86400000
Completed Updating config for entity: topic 'send_test'.
[root@master bin]# 
[root@master bin]# ./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic send_test
Topic:send_test	PartitionCount:4	ReplicationFactor:2	Configs:retention.ms=86400000
	Topic: send_test	Partition: 0	Leader: 8	Replicas: 8,9	Isr: 8,9
	Topic: send_test	Partition: 1	Leader: 9	Replicas: 9,10	Isr: 9,10
	Topic: send_test	Partition: 2	Leader: 10	Replicas: 10,8	Isr: 10,8
	Topic: send_test	Partition: 3	Leader: 8	Replicas: 8,10	Isr: 8,10
[root@master bin]# 

查看数据偏移量

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ip:9092 --topic topic_name --time -1

示例:

[root@master bin]# ./kafka-run-class.sh  kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic send_test --time -1
send_test:2:0
send_test:1:0
send_test:3:0
send_test:0:7

查看kafka集群中所有消费组

./kafka-consumer-groups.sh --bootstrap-server ip:9092 --list

示例:

[root@master bin]#  ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
console-consumer-47912

查看消费组积压消费信息

./kafka-consumer-groups.sh --bootstrap-server ip:9092 --describe --group group_name

示例:

[root@master bin]#  ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

console-consumer-41354
[root@master bin]# ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group console-consumer-41354
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
send_test                      0          7               7               0          consumer-1-c4c90a77-4f77-47c5-b35f-a395f651e739   /192.168.122.129               consumer-1
send_test                      1          0               0               0          consumer-1-c4c90a77-4f77-47c5-b35f-a395f651e739   /192.168.122.129               consumer-1
send_test                      2          0               0               0          consumer-1-c4c90a77-4f77-47c5-b35f-a395f651e739   /192.168.122.129               consumer-1
send_test                      3          0               0               0          consumer-1-c4c90a77-4f77-47c5-b35f-a395f651e739   /192.168.122.129               consumer-1
[root@master bin]# 

生产者生产消息

./kafka-console-producer.sh --broker-list ip:9092 --topic topic_name

示例:

[root@master bin]#  ./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic send_test
1234
5678

消费者消费

./kafka-console-consumer.sh --bootstrap-server ip:9092 --topic topic_name

示例:

[root@master bin]# ./kafka-console-consumer.sh  --bootstrap-server 127.0.0.1:9092 --topic send_test
1234
5678

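版本说明 [1]


  [1] 以上命令在 Kafka 0.10.2.1 版本下亲测有效。注意:较新版本(2.2 及以上)的 kafka-topics.sh 应使用 --bootstrap-server ip:9092 代替 --zookeeper ip:2181,Kafka 3.0 起 kafka-topics.sh 已移除 --zookeeper 参数。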

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐