目录

目标

相关概念

环境

启动zookeeper和kafka节点

命令集锦

查看kafka节点数量

查看所有消费者组

查看消费者组详情

创建主题

扩容分区

查看主题详情

查看所有主题

删除主题

发送消息

监听&消费消息


目标

熟悉kafka各个组件的功能。

通过命令的方式验证kafka各个组件之间的关联关系。


相关概念

Broker

kafka节点,多个broker组成kafka集群。

Topic

即主题,kafka通过Topic对消息进行分类,发布到kafka的消息都需要指定Topic。

Producer

即消息生产者,向Broker发送消息的客户端。

Consumer

即消息消费者,从Broker消费消息的客户端。

ConsumerGroup

即消费者组,消费者隶属于消费者组,同一个分区的消息可以被多个消费者消费,但是同一个消费者组中只能有一个消费者可以消费。

Partition

即分区,每个Topic下都至少有一个分区,分区内部的消息是有序的。


环境

zookeeper节点组成的集群,安装目录:

/opt/apache-zookeeper-3.6.2-bin

kafka节点组成的集群,安装目录:

/opt/kafka_2.13-3.1.0

启动zookeeper和kafka节点

第一步:先启动zookeeper集群,即启动三个zookeeper节点,因为kafka的节点需要向zookeeper注册。

#第一步:进入bin目录。
cd /opt/apache-zookeeper-3.6.2-bin/bin
#第二步:启动zookeeper。
./zkServer.sh start

###########################相关命令如下:###########################
#重启
./zkServer.sh restart
#停止
./zkServer.sh stop
#查看日志
./zkServer.sh start-foreground
#进入客户端
./zkCli.sh

第二步:启动kafka集群,即启动三个kafka节点。

#第一步:进入bin目录下。
/opt/kafka_2.13-3.1.0/bin
#第二步:启动kafka。
./kafka-server-start.sh -daemon ../config/server.properties

第三步:查看kafka集群是否启动成功。

#第一步:进入其中一个ZooKeeper节点的bin目录下。
cd /opt/apache-zookeeper-3.6.2-bin/bin
#第二步:启动客户端。注意,这里的ip和端口要和我们的zookeeper配置一致。
./zkCli.sh -server 127.0.0.1:2181
#第三步:查看有几个kafka启动了,这里查看的是Kafka的broker.id。
ls /brokers/ids
#第四步:如果还想查看单个Kafka节点的状态,就指定好broker.id查看,我这里查看broker.id=0的那个Kafka节点的状态。
get /brokers/ids/0

附录:关闭kafka节点。

#在kafka的bin目录下执行关闭操作。
./kafka-server-stop.sh -daemon ../config/server.properties

命令集锦

查看kafka节点数量

#这个命令需要在zookeeper客户端查看。
ls /brokers/ids

查看所有消费者组

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list 

查看消费者组详情

#我这里查看appleGroup这个消费者组的详情。
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group appleGroup

创建主题

#iPhoneTopic是我自定义的主题名称。
#localhost表示kafka节点所在的机器ip。
#9092表示kafka节点提供服务的端口,这个端口需要与配置文件配置的端口一致。

#创建一个最简单的主题,该主题没有指定分区和副本因子,默认为1个分区,即0分区。默认1个副本因子。
./kafka-topics.sh --create --topic iPhoneTopic --bootstrap-server localhost:9092

#指定分区数量创建主题。我这里创建了主题名称为liNingShoesTopic,分区数量为3。
./kafka-topics.sh --create --bootstrap-server localhost:9092  --partitions 3 --topic liNingShoesTopic

#指定副本因子创建主题。我这里创建主题名称为liNingShortSleeveTopic,指定副本因子数量为3个。分区数量3个。
./kafka-topics.sh --create --bootstrap-server localhost:9092   --replication-factor 3 --partitions 3 --topic liNingShortSleeveTopic

扩容分区

#把liNingShortSleeveTopic主题扩容为6个分区。
#注意:目前不支持减少分区,扩容前必须存在这个主题。
./kafka-topics.sh -alter --partitions 6 --bootstrap-server localhost:9092 --topic liNingShortSleeveTopic

查看主题详情

#liNingShortSleeveTopic是我要查看的主题。
#localhost表示kafka节点所在的机器ip。
#9092表示kafka节点提供服务的端口,这个端口需要与配置文件配置的端口一致。
./kafka-topics.sh --describe --topic liNingShortSleeveTopic --bootstrap-server localhost:9092

查看所有主题

#localhost表示kafka节点所在的机器ip。
#9092表示kafka节点提供服务的端口,这个端口需要与配置文件配置的端口一致。
./kafka-topics.sh --list --bootstrap-server localhost:9092

删除主题

#iPhoneTopic是我要删除的主题。
#localhost表示kafka节点所在的机器ip。
#9092表示kafka节点提供服务的端口,这个端口需要与配置文件配置的端口一致。
./kafka-topics.sh --delete --topic iPhoneTopic --bootstrap-server localhost:9092

发送消息

#localhost表示我往本机的kafka节点发送消息。
#iPhoneTopic是我发送消息的主题。
./kafka-console-producer.sh --broker-list localhost:9092 --topic iPhoneTopic

监听&消费消息

#监听&消费一个主题。
#iPhoneTopic是我要消费消息所在的主题。
#注意:该命令只能实时消费,启动该消费命令后需要用生产者生产消息来验证。
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic iPhoneTopic

#监听&消费多个主题。
#消费多个消息用|隔开。比如我这里把iPhoneTopic和iPadTopic放到了监听的白名单中。
#注意:该命令只能实时消费,启动该消费命令后需要用生产者生产消息来验证。
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist "iPhoneTopic|iPadTopic"

#该命令可以消费以前的消息。
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic iPadTopic

#把消费者分配到appleGroup消费者组,并监听。
./kafka-console-consumer.sh --bootstrap-server localhost:9092  --consumer-property group.id=appleGroup --topic iPhoneTopic
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐