一.启动、关闭

1.启动kafka

1)启动zookeeper(Broker需要zookeeper保存meta数据)
   bin/zkServer.sh start
2) 在每个kafka节点上:
   kafka-server.start.sh /opt/kafka_1.1.1/config/server.properties

 2.关闭kafka

先看这个No kafka server to stop无法使用命令关闭问题_M10F的博客-CSDN博客_no zookeeper server to stop

bin/kafka-server-stop.sh stop

二.基本操作

1.查看当前服务器中的所有topic

kafka-topics.sh --zookeeper cdh:2181/kafka \
--list

2.创建topic

kafka-topic.sh --zookeeper cdh:2181/kafka \
--create \
--replication-factor 1 \
--partitions 1 \
--topic first

注:
--replication-factor:副本数,副本数不能超过Broker节点的个数
--partitions:分区数
--topic:指定topic的名字

3.查看topic的详情

kafka-topics.sh --zookeeper cdh:2181/kafka \
--describe \
--topic test

Topic:test      PartitionCount:2        ReplicationFactor:1     Configs:
Topic: test     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
Topic: test     Partition: 1    Leader: 0       Replicas: 0     Isr: 0

注:
Partition:当前topic对应的分区编号
Replicas :partition所在的broker实例的broker.id的列表
Leader   :该partition的所有副本的leader领导者,处理所有该partition的读写
ISR      :该partition的存活副本对应的broker实例的broker.id的列表  

三台kafka节点:
Topic:hadoop	PartitionCount:3	ReplicationFactor:3		Configs:
Topic: hadoop	Partition: 0	Leader: 12	Replicas: 12,13,11	Isr: 12,13,11
Topic: hadoop	Partition: 1	Leader: 13	Replicas: 13,11,12	Isr: 13,11,12
Topic: hadoop	Partition: 2	Leader: 11	Replicas: 11,12,13	Isr: 11,12,13

4.修改topic

kafka-topics.sh --zookeeper cdh:2181 \
--alter \
--topic test \
--partition 4


Topic:test      PartitionCount:4        ReplicationFactor:1     Configs:
Topic: test     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
Topic: test     Partition: 1    Leader: 0       Replicas: 0     Isr: 0
Topic: test     Partition: 2    Leader: 0       Replicas: 0     Isr: 0
Topic: test     Partition: 3    Leader: 0       Replicas: 0     Isr: 0

注:partition只能改大不能改小

5.发送方消息

kafka-console-producer.sh \
--topic test \
--broker-list cdh:9092
然后手动输入:
hello world
hello pangpang
之后开另一个窗口来消费我们发送的消息

6.消费消息

kafka-console-consumer.sh \
--zookeeper cdh:2181/kafka \
--from-beginning \
--topic test
注:
--from-beginning:会把test主题中以往所有的数据都读取出来。(请根据业务场景选择是否增加该配置)

实际使用:
/opt/kafka_2.12-2.3.0/bin/kafka-console-consumer.sh 
--bootstrap-server BKFK1:9092 
--topic dev_lua_ngx_kafka_logs

zookeeper的默认端口是2181,kafka的默认端口是9092
如果namenode挂掉导致kafka消费出问题则重启脚本

三.kafka在zookeeper中的目录说明(3台Broker为例)

/cluster		
/cluster/id  {"version":"1","id":"Pks8sWZUT6GBJHqyVGQ5OA"}
代表的是一个kafka集群包含集群的版本和集群的id

/controller  {"version":1,"brokerid":1,"timestamp":"1564976668049"} 
controller:控制器
是kafka中非常重要的一个角色,控制partition的leader选举,topic的增删改查操作。
brokerid:由其id对应的broker承担controller的角色。

/controller_epoch 2:controller的纪元
代表controller的更迭,每当controller的brokerid更换一次,controller_epoch就+1.

/brokers
/brokers/ids	 [1, 2, 3] :存放当前kafka的broker实例列表
/brokers/topics	[hadoop, __consumer_offsets] :当前kafka中的topic列表
/brokers/seqid	:系统的序列id

/consumers 
老版本用于存储kafka消费者的信息,主要保存对应的offset,
新版本中基本不用,此时用户的消费信息,保存在/brokers/topics中:__consumer_offsets

/config	:存放配置信息

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐