启动

kafka的启动依赖zookeeper,先启动zookeeper,再启动kafka

  1. bin/zookeeper-server-start.sh config/zookeeper.properties
  2. bin/kafka-server-start.sh config/server.properties

topic

topic使用文件kafka-topics.sh,基本使用:
命令必须包含一个操作: - list,–describe, - create, --alter或–delete

创建

./kafka-topics.sh --create --zookeeper "kafka001:2181,kafka002:2181,kafka003:2181"  --partitions 1 --replication-factor 1  --topic zaj

新版本需要使用命令:

./kafka-topics.sh --create --bootstrap-server  10.130.44.103:9092   --partitions 1 --replication-factor 1  --topic zaj

和端口
其中参数

  • –create 是创建topic特有的;
  • –zookeeper是一个必须的参数;新版本被--bootstrap-server替换。--bootstrap-server 填写的是kafka server的ip和端口,--zookeeper 填写的是zookeeper的ip
  • –partitions表示创建的消息分区数量;分区数量在集群上平均分配;
  • –replication-factor表示每一个消息分区复制的分数,1表示不复制;当有3个broder时,复制3份则每个broder上一份;

  1. 从命令行中获取要创建的topic名称
  2. 解析命令行指定的topic配置(如果存在的话),配置都是x=a的格式
  3. 若指定了replica-assignment参数表明用户想要自己分配分区副本与broker的映射——通常都不这么做,如果不提供该参数Kafka帮你做这件事情
  4. 检查必要的参数是否已指定,包括:zookeeper, replication-factor,partition和topic
  5. 获取/brokers/ids下所有broker并按照broker id进行升序排序
  6. 在broker上分配各个分区的副本映射 (没有指定replica-assignment参数,这也是默认的情况)
  7. 检查topic名字合法性、自定义配置的合法性,并且要保证每个分区都必须有相同的副本数
  8. 若zookeeper上已有对应的路径存在,直接抛出异常表示该topic已经存在
  9. 确保某个分区的多个副本不会被分配到同一个broker
  10. 若提供了自定义的配置,更新zookeeper的/config/topics/[topic]节点的数据
  11. 创建/brokers/topics/[topic]节点,并将分区副本分配映射数据写入该节点

查看

./kafka-topics.sh --zookeeper "kafka001:2181,kafka002:2181,kafka003:2181" --list\
  • 从zookeeper的/brokers/topics节点下获取所有topic封装成topic集合
  • 遍历该集合,查看每个topic是否是待删除topic——即在/admin/delete_topics下是否存在同名节点。如果是,打印topic已经被标记为删除;否则直接打印topic名称

删除

./kafka-topics.sh --zookeeper "kafka001:2181,kafka002:2181,kafka003:2181" --delete --topic zaj13,zaj14

查看topic详情

[kafka@kafka003 bin]$ ./kafka-topics.sh --zookeeper "kafka001:2181,kafka002:2181,kafka003:2181" --describe --topic topic_test
Topic:topic_test	PartitionCount:3	ReplicationFactor:3	Configs:
	Topic: topic_test	Partition: 0	Leader: 2	Replicas: 2,0,1	Isr: 0,2,1
	Topic: topic_test	Partition: 1	Leader: 0	Replicas: 0,1,2	Isr: 0,2,1
	Topic: topic_test	Partition: 2	Leader: 1	Replicas: 1,2,0	Isr: 0,2,1
[kafka@kafka003 bin]$ ./kafka-topics.sh --zookeeper "kafka001:2181,kafka002:2181,kafka003:2181" --describe --topic sd_call_result
Topic:sd_call_result	PartitionCount:3	ReplicationFactor:1	Configs:
	Topic: sd_call_result	Partition: 0	Leader: 2	Replicas: 2	Isr: 2
	Topic: sd_call_result	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
	Topic: sd_call_result	Partition: 2	Leader: 1	Replicas: 1	Isr: 1

展示信息:

topic名称,有几个分区,几个复制版本

每个消息分区的配置:消息分区的id,消息分区的leader处于哪一个broder,该消息分区的分布情况

“replicas” 信息,在节点1,2,0上,不管node死活,只是列出信息而已.
“isr” 工作中的复制节点的集合. 也就是活的节点的集合.

参考链接:

http://www.cnblogs.com/hopelee/p/7285340.html

https://github.com/Parsely/pykafka

https://cloud.tencent.com/developer/article/1010955


Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐