在这里插入图片描述

下面聊聊Kafka常用命令

1、Topic管理命令

以topic:test_1为例

1.1、创建topic

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test_1

参数说明:

  • –bootstrap-server:用来指定Kafka服务的连接,若有该参数,则–zookeeper参数可以不需要;
  • –zookeeper:废弃,通过zk的方式连接到Kafka集群;
  • –replication-factor:用来指定副本数量,这里需要注意的是不能大于broker的数量,不提供的话使用集群默认值;
  • –partitions:用来指定分区数量,当创建或修改topic时用来指定分区数,若创建时没提供该参数,则使用集群默认值,这里需要注意的是不能比之前的小,不然会报错;

1.2、topic扩容分区

将分区从3个扩容到4个

# kafka旧版本使用zk方式,不推荐使用
./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic_1 --replication-factor 3 --partitions 4
# kafka2.2版本以后使用broker_host:port方式,推荐使用,注意topic一旦创建,partition只能增加不能减少
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test_1 --replication-factor 3 --partitions 4

1.3、删除topic

# 指定topic进行删除
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test_1
# 支持以正则表达式匹配方式进行删除,比如删除以test_开头的topic
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic "test_.*"

1.4、查看topic列表

# 查看所有topic列表
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list --exclude-internal
# 查看正则匹配的topic列表
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list --exclude-internal --topic "test_.*"

参数说明:

  • –exclude-internal:排除kafka内部的topic,如_consumer_offsets-*
  • –topic:支持正则匹配topic

1.5、查看topic详情

# 查看单个topic,排除内部topic
./bin/kafka-topics.sh --topic test_1 --bootstrap-server localhost:9092 --describe --exclude-internal

1.6、查看topic message数量

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test_1 -time -1

参数说明:

  • -time-1:表示要获取指定topic所有分区当前的最大位移;

2、Consumer管理命令

2.1、查看consumer group列表

./bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

2.2、查看指定group.id的消费情况

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_1 --describe

2.3、删除group

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_1 --delete

2.4、重置offset

# 查看当前group是否为active状态,若是active状态则不能修改
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_1 --describe
# 重置
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_1 --reset-offsets -to-offset 100 --topic test_1 --execute
# 导出offset
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_1 --reset-offsets -to-offset 100 --topic test_1 --export > offset.log

3、Kafka操作命令

3.1、启动kafka

# 启动kafka命令
./bin/kafka-server-start.sh config/server.properties &
# 停止kafka服务
./bin/kafka-server-stop.sh config/server.properties &

3.2、发送消息

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_1

3.3、启动消费者

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_1 --from-beginning
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐