目录

下载

kafka配置文件介绍

一、单机模式

1.安装及修改配置文件

二、集群模式/zookeeper-集群模式

1.安装及修改配置文件

2.安装后效果展示

3.kafka报错处理

三、Kafka-Kraft-集群模式

1.安装及修改配置文件

2.安装后效果展示

四、安装kafkalytic插件


kafka命令行操作请看:http://t.csdn.cn/JfUrW


下载

官方下载地址:Apache Kafka

kafka配置文件介绍

kafka3.0.0稳定版本,配置文件介绍

kafka配置文件介绍
文件文件功能描述
connect-*kafka传输连接工具,(一般使用flume进行连接)
consumer.properties消费者的配置文件((默认本地),一般不修改
kraftkafka不依赖zookeeper的集群配置(下文会讲)
log4j.properties日志文件默认级别info
producer.properties生产者的配置文件(默认本地),一般不修改
server.properties服务配置文件
tools-log4j.properties工具日志文件默认级别info
trogdor.conf版权所有权的额外信息的文件
zookeeper.propertieszookeeper配置文件,作为kafka交给zookeeper启动的zk配置,如连接zk端口等


一、单机模式

1.安装及修改配置文件

因为kafka是单节点启动,所以配置与集群模式类似

下载、解压文件

tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/root/kafka 

进入kafka配置文件

[root@liuyizhao01 kafka]# cd config/

修改kafka配置文件server.properties

[root@liuyizhao01 config]# vim server.properties

修改:

log.dirs=/opt/root/kafka/datas #kafka数据存储目录

zookeeper.connect=192.168.xx.xxx:2181 #配置连接Zookeeper地址(在zk根目录下创建/kafka,方便管理),2181zookeeper端口

二、集群模式/zookeeper-集群模式

因为kafka开始时,就是通过zookeeper连接集群,到kafka大火才开始引入自己的集群配置kafka-kraft,所以大部分是以zookeeper-kafka集群模式

1.安装及修改配置文件

1.去kafka官网下载linux安装包,kafka3.0.0版本,2.12是scala版本

tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/root/kafka

2.修改kafka配置文件

[root@liuyizhao01 kafka]# cd config/

3.修改kafka配置文件server.properties

[root@liuyizhao01 config]# vim server.properties

4.通过zookeeper管理kafka集群修改文件server.properties,同时修改kafka所有节点的配置文件,其他一样只修改broker.id=(数字:1,2,3...)

broker.id=0 #broker的全局唯一编号,不能重复,只能是数字,作为集群的唯一编号

num.network.threads=3 #处理网络请求的线程数量

num.io.threads=8 #用来处理磁盘IO的线程数量

socket.send.buffer.bytes=102400 #发送套接字的缓冲区大小

socket.receive.buffer.bytes=102400 #接收套接字的缓冲区大小

socket.request.max.bytes=104857600 #请求套接字的缓冲区大小

log.dirs=/opt/root/kafka/datas #kafka数据存储目录

num.partitions=1 #topic在当前broker上的分区个数

num.recovery.threads.per.data.dir=1 #用来恢复和清理data下数据的线程数量

offsets.topic.replication.factor=1 # 每个topic创建时的副本数,默认时1个副本

log.retention.hours=168 #segment文件保留的最长时间,超时将被删

log.segment.bytes=1073741824 #每个segment文件的大小,默认最大1G

log.retention.check.interval.ms=300000 # 检查过期数据的时间,默认5分钟检查一次是否数据过期

zookeeper.connect=192.168.xx.xxx:2181,192.168.xx.xxx:2181,192.168.xx.xxx:2181/kafka #配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理),2181zookeeper端口

5.配置kafka环境变量

在/etc/profile.d/my_env.sh文件中增加kafka环境变量配置,my_env.sh是自己创建的自定义环境变量文件。

[root@liuyizhao01 /]# vim /etc/profile.d/my_env.sh

增加如下内容:

#KAFKA_HOME

export KAFKA_HOME=/opt/module/kafka

export PATH=$PATH:$KAFKA_HOME/bin

刷新一下环境变量:

[root@liuyizhao01 /]# source /etc/profile

6.启动kafka命令

前提:先启动zookeeper

kafka-server-start.sh 脚本启动,配置文件 server.properties

[root@liuyizhao01 kafka]$ bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties

[root@liuyizhao02 kafka]$ bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties

[root@liuyizhao03 kafka]$ bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties

2.安装后效果展示

1.查看所有topic

 2.在first主题下,生产并消费数据

 

3.kafka报错处理

1.需要删除/kafka/config/server.properties 下定义的 log.dirs  kafka数据存储目录

log.dirs=/opt/root/kafka/datas #kafka数据存储目录

2.删除logs目录

3.删除zookeeper 下的节点 /kafka 节点node

1.打开zookeeper client

[root@liuyizhao01 zookeeper-3.5.7]# bin/zkCli.sh

2.递归删除节点

[zk: localhost:2181(CONNECTED) 0] deleteall /kafka

三、Kafka-Kraft-集群模式

1.安装及修改配置文件

1.去kafka官网下载linux安装包

tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/root/kafka

2.修改kafka/config/kraft/server.properties配置文件

[root@liuyizhao kraft]# cd kafka/config/kraft

[root@liuyizhao kraft]# vim server.properties

3. 集群其他kafka上需要node.id相应改变,值需要和controller.quorum.voters对应,同时修改kafka所有节点的配置文件,其他一样只修改broker.id=(数字:2,3...)

process.roles=broker, controller  #kafka的角色(controller相当于主机、broker节点相当于从机,主机类似zk功能)

node.id=2 #节点ID

controller.listener.names=CONTROLLER #controller服务协议别名

controller.quorum.voters=2@192.168.xx.xxx:9093,3@192.168.xx.xxx:9093,4@192.168.xx.xxx:9093 #Controller列表

listeners=PLAINTEXT://:9092,CONTROLLER://:9093 #不同服务器绑定的端口

inter.broker.listener.name=PLAINTEXT #broker服务协议别名

advertised.Listeners=PLAINTEXT://192.168.xx.xxx:9092 #broker对外暴露的地址

listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT

,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL #协议别名到安全协议的映射

log.dirs=/opt/module/kafka/data  #kafka数据存储目录

5.配置kafka环境变量

在/etc/profile.d/my_env.sh文件中增加kafka环境变量配置,my_env.sh是自己创建的自定义环境变量文件。

[root@liuyizhao /]# vim /etc/profile.d/my_env.sh

增加如下内容:

#KAFKA_HOME

export KAFKA_HOME=/opt/module/kafka

export PATH=$PATH:$KAFKA_HOME/bin

刷新一下环境变量:

[root@liuyizhao /]# source /etc/profile

6.启动kafka命令

注意:

1.不需要启动 zookeeper

2.如果之前启动过kafka-zookeeper模式node.id相应改变,值需要和controller.quorum.voters对应。需要根据各自的主机名称(所有机器),修改相应的advertised.Listeners地址。

1.初始化集群数据目录

1.首先生成存储目录唯一ID(整个集群唯一)

[root@liuyizhao01 kafka2]# bin/kafka-storage.sh random-uuid

得到数据(每次每个人是不一样的):J7s9e8PPTKOO47PxzI39VA

2.用该ID格式化kafka存储目录(集群所有节点)

[root@liuyizhao01 kafka2]# bin/kafka-storage.sh format -t J7s9e8PPTKOO47PxzI39VA -c

[root@liuyizhao02 kafka2]# bin/kafka-storage.sh format -t J7s9e8PPTKOO47PxzI39VA -c

[root@liuyizhao03 kafka2]# bin/kafka-storage.sh format -t J7s9e8PPTKOO47PxzI39VA -c

2.启动

kafka-server-start.sh 脚本启动,配置文件 server.properties

[root@liuyizhao01 kafka2]# bin/kafka-server-start.sh -daemon config/kraft/server.properties

[root@liuyizhao02 kafka2]# bin/kafka-server-start.sh -daemon config/kraft/server.properties

[root@liuyizhao03 kafka2]# bin/kafka-server-start.sh -daemon config/kraft/server.properties

2.安装后效果展示

四、安装kafkalytic插件

1.File/Settings/Plugins 搜索kafkalytic

 2.kafkalytic介绍

3.kafka tool也不错

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐