• kafka服务启动

$KAFKA_HOME/bin/kafka-server-start.sh -daemon config/server.properties 
  • 创建Topic

$KAFKA_HOME/bin/kafka-topics.sh --create --topic test0--zookeeper 127.0.0.1:2181 --config max.message.bytes=12800000 --config flush.messages=1 --partitions 5 --replication-factor 1
--topic后面的test0是topic的名称--zookeeper应该和server.properties文件中的zookeeper.connect一样--config指定当前topic上有效的参数值--partitions指定topic的partition数量,如果不指定该数量,默认是server.properties文件中的num.partitions配置值--replication-factor指定每个partition的副本个数,默认1个
  • 列出所有Topic

$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
  • 查看Topic的分区和副本

$KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181  --topic test0
  • 删除topic

#删除kafka的topic命令$KAFKA_HOME/bin/kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic test0#删除zookeeper中该topic相关的目录命令:rm -r /kafka/config/topics/test0rm -r /kafka/brokers/topics/test0
  • 查看topic消费的offset(偏移量)

$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic test0 --time -1
  • 修改topic的partition数量(只能增加不能减少)

$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group testgroup --topic test0 --zookeeper 127.0.0.1:2181
  • 启动kafka生产者

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test0
  • 启动kafka消费者

#从头开始$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test0 --from-beginning#从尾部开始$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test0 --offset latest#指定分区$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test0 --offset latest --partition 1#取指定个数$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test0 --offset latest --partition 1 --max-messages 1
  • 查看有哪些消费者Group

$KAFKA_HOME/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --list
  • 查看Group详情

$KAFKA_HOME/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --group test --describe
  • 删除Group

$KAFKA_HOME/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --group test --delete

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐