Kafka本地集群搭建完成,介绍在shell中进行简单使用,并记录。

启动zookeeper(非自带)

cd /opt/zookeeper/bin/
./zkServer.sh start

启动kafka

cd /opt/kafka/
bin/kafka-server-start.sh config/server.properties &

& 表示后台运行,这样就可以启动后离开控制台。

bin/kafka-server-start.sh -daemon config/server.properties &
-daemon 守护线程,安装成功后启动可加这个参数,安装过程中暂时不要加

关闭kafka

bin/kafka-server-stop.sh config/server.properties &

查看所有Topic

cd /opt/kafka/bin/
./kafka-topics.sh -list -zookeeper 192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181

创建Topic

cd /opt/kafka/bin/
./kafka-topics.sh --create --zookeeper 192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181 --replication-factor 2 --partitions 3 --topic kafka-action

--create 设置此次操作的action类型为创建
--zookeeper 设置zookeeper集群地址
--replication-factor 设置topic的副本因子
--partitions 设置topic的分区个数
--topic 设置topic的名称

描述Topic

cd /opt/kafka/bin/
./kafka-topics.sh -describe -zookeeper 192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181

详细信息的属性:
 - Topic 主题名称
 - Partition 分区编号
 - Leader 当前分区负责读写的节点,只有主副本才会接受消息的读写
 - Replicas 分区的复制节点列表,它与主题的副本数量有关,默认只有一个副本,即主副本
 - Isr 同步状态的副本,是Replicas的子集,必须是存活的,并且都能赶上主副本

启动一个向kafka-action发送消息的生产者

cd /opt/kafka/bin/
./kafka-console-producer.sh --broker-list 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --topic  kafka-action

--broker-list 指定kafka的代理地址列表(必传)
--topic 消息被发送的目标主题(必传)
--producer.config 用于加载一个生产者级别相关配置的配置文件,如product.properties
--producer-property 通过该命令参数可以直接在启动生产者命令行中设置生产者级别的配置,在命令行中设置的参数将会覆盖所加载配置文件中的参数设置
--property 通过该命令可以设置消息消费者相关的配置

消费kafka-action主题

旧版消费者:
./kafka-simple-consumer-shell.sh --broker-list 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --topic  kafka-action
新版消费者:
./kafka-console-consumer.sh  -bootstrap-server  192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --new-consumer --consumer-property  group.id=new-consumer-test --consumer-property  client.id=new-consumer-cl --topic kafka-action

--new-consumer 列出新消费者类型的所有消费组信息

最后附上危险操作步骤:
kafka-topics.sh的delete命令删除topic

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐