一、安装部署

1、jar包下载

http://kafka.apache.org/downloads
2、解压安装包

 tar -zxvf kafka_2.11-2.4.1.tgz -C /opt/module/

3、修改解压后的文件名称

 mv kafka_2.11-2.4.1/ kafka

4、在/opt/module/kafka目录下创建logs文件夹

 mkdir logs

5、修改配置文件

 cd config/
 vi server.properties
 #broker的全局唯一编号,不能重复
 broker.id=0
 #删除topic功能使能
 delete.topic.enable=true
 #处理网络请求的线程数量
 num.network.threads=3
 #用来处理磁盘IO的现成数量
 num.io.threads=8
 #发送套接字的缓冲区大小
 socket.send.buffer.bytes=102400
 #接收套接字的缓冲区大小
 socket.receive.buffer.bytes=102400
 #请求套接字的缓冲区大小
 socket.request.max.bytes=104857600
 #kafka运行日志存放的路径
 log.dirs=/opt/module/kafka/logs
 #topic在当前broker上的分区个数
 num.partitions=1
 #用来恢复和清理data下数据的线程数量
 num.recovery.threads.per.data.dir=1
 #segment文件保留的最长时间,超时将被删除
 log.retention.hours=168
 #配置连接Zookeeper集群地址
 zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka

6、配置环境变量

sudo vi /etc/profile.d/my_env.sh
 #KAFKA_HOME
 export KAFKA_HOME=/opt/module/kafka
 export PATH=$PATH:$KAFKA_HOME/bin  
source /etc/profile

7、分发安装包

 xsync kafka/

注意:分发之后记得配置其他机器的环境变量
分别在hadoop102和hadoop103上修改配置文件/opt/module/kafka/config/server.properties中的broker.id=1、broker.id=2
注:broker.id不得重复

8、启动集群

注意:先启动zookeeper集群
分别启动集群或者群起:

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

9、关闭集群

bin/kafka-server-stop.sh

二、Kafka命令行操作

1)查看当前服务器中的所有topic

 kafka-topics.sh --list --bootstrap-server hadoop101:9092

2)创建topic

 kafka-topics.sh --create --bootstrap-server hadoop101:9092 --topic first --partitions 2 --replication-factor 2

选项说明:

–topic 定义topic名

–replication-factor 定义副本数

–partitions 定义分区数

3)删除topic

 kafka-topics.sh --bootstrap-server hadoop101:9092 --delete --topic first

需要server.properties中设置delete.topic.enable=true否则只是标记删除。

4)发送消息

 kafka-console-producer.sh  --broker-list hadoop101:9092 --topic first

hello world

caron caron

5)消费消息

 kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --from-beginning --topic first

–from-beginning:会把主题中以往所有的数据都读取出来。

6)查看某个Topic的详情

 kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic first

7)修改分区数

 kafka-topics.sh --bootstrap-server hadoop101:9092 --alter --topic first --partitions 6
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐