八、Linux二进制安装Kafka集群

1 下载

1.1 Zookeeper下载

官网下载:zookeeper官网下载
百度网盘:百度网盘下载
在这里插入图片描述
在这里插入图片描述

1.2 Kafka下载

下载2.x.x系列最新的版本

官网下载:Kafka官网下载
百度网盘:百度网盘下载

在这里插入图片描述

2 单机安装

mkdir /opt/kafka

将下载好的文件放到/opt/kafka下,解压

tar -zxvf kafka_2.13-3.7.1.tgz

在这里插入图片描述
运行zookeeper(单机时,没必要单独安装zookeeper)

cd /opt/kafka/kafka_2.13-3.7.1/bin/

# 前台运行的命令
./zookeeper-server-start.sh ../config/zookeeper.properties

# 或者后台运行
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties

修改Kafka配置文件

broker.id=0:加入集群的时候,id要变,不能跟以前的一样,单机无需修改。
listeners=PLAINTEXT://192.168.200.161:9092,这里ip地址要改,改成服务器的内网ip,否则内网其它的服务器找不能访问这个server
zookeeper.connect=192.168.200.161:2181,这里要填写zookeeper的地址和端口。

在这里插入图片描述
在这里插入图片描述
启动Kafka

# 前台运行
./kafka-server-start.sh ../config/server.properties 

#后台运行
./kafka-server-start.sh -daemon ../config/server.properties 

测试

# 查询kafka主题
./kafka-topics.sh --list --bootstrap-server 192.168.200.161:9092

# 创建主题
./kafka-topics.sh --create --replication-factor 1 --partitions 1 --topic tests --bootstrap-server 192.168.200.161:9092

zookeeper注册服务脚本

[Unit]
Description=Zookeeper service
After=network.target

[Service]
Type=simple
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/java/jdk1.8.0_371/bin"
User=root
Group=root
ExecStart=/opt/kafka/kafka_2.13-3.7.1/bin/zookeeper-server-start.sh  /opt/kafka/kafka_2.13-3.7.1/config/zookeeper.properties
ExecStop=/opt/kafka/kafka_2.13-3.7.1/bin/zookeeper-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target

kafka 注册服务脚本

[Unit]
Description=Apache Kafka server (broker)
After=network.target  zookeeper.service

[Service]
Type=simple
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/java/jdk1.8.0_371/bin"
User=root
Group=root
ExecStart=/opt/kafka/kafka_2.13-3.7.1/bin/kafka-server-start.sh /opt/kafka/kafka_2.13-3.7.1/config/server.properties
ExecStop=/opt/kafka/kafka_2.13-3.7.1/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target

3 集群安装Kakfa

3.1 安装前准备

将下载的kafka和zookeeper安装包放到 /opt/kafka目录下(三台机器都需要)
在这里插入图片描述
查看 hosts文件,确保集群机器都在(不知如何修改可以看服务器配置修改

vi /etc/hosts

在这里插入图片描述

3.2 安装zookeeper集群

解压

cd /opt/kafka/
tar -zxvf apache-zookeeper-3.9.2-bin.tar.gz

修改文件目录

mv apache-zookeeper-3.9.2-bin zookeeper-3.9.2

在这里插入图片描述
创建数据存放目录

mkdir /opt/kafka/zookeeper-3.9.2/data

修改配置文件

cd /opt/kafka/zookeeper-3.9.2/conf/
cp zoo_sample.cfg zoo.cfg 
vi zoo.cfg

直接替换配置文件

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=30000
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=10
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/kafka/zookeeper-3.9.2/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
maxClientCnxns=2000
maxSessionTimeout=60000000
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=10
# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=1

snapCount=3000000

preAllocSize=131072

## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpHost=0.0.0.0
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true


server.1=192.168.200.161:2888:3888
server.2=192.168.200.162:2888:3888
server.3=192.168.200.163:2888:3888

配置文件解析

# zookeeper 数据目录
dataDir=/opt/zookeeper-3.6.3/data
# 3台服务器的配置
# 这里可以使用IP,也可以使用主机名,但使用主机名时要配置好hosts文件。
server.1=ta01:2888:3888:
server.2=ta02:2888:3888
server.3=ta03:2888:3888

#配置snapshot文件清理策略
# 开启清理事务日志和快照文件的功能,单位是小时。默认是0,表示不开启自动清理功能。
autopurge.purgeInterval=1

# 指定了需要保留的文件数目。默认是保留3个
autopurge.snapRetainCount=10

限制snapshot数量

# 每snapCount次事务日志输出后,触发一次快照(snapshot)ZooKeeper会生成一个snapshot文件和事务日志文件。 默认是100000。
snapCount=3000000

tickTime=2000
ZK中的一个时间单元。ZK中所有时间都是以这个时间单元为基础,进行整数倍配置的。例如,session的最小超时时间是2*tickTime。 默认值2000,单位毫秒。

initLimit=10
Follower在启动过程中,会从Leader同步所有最新数据,然后确定自己能够对外服务的起始状态。Leader允许F在 initLimit 时间内完成这个工作。通常情况下,我们不用太在意这个参数的设置。如果ZK集群的数据量确实很大了,F在启动的时候,从Leader上同步数据的时间也会相应变长,因此在这种情况下,有必要适当调大这个参数了。
initLimit=30000

syncLimit=10
在运行过程中,Leader负责与ZK集群中所有机器进行通信,例如通过一些心跳检测机制,来检测机器的存活状态。如果L发出心跳包在syncLimit之后,还没有从F那里收到响应,那么就认为这个F已经不在线了。注意:不要把这个参数设置得过大,否则可能会掩盖一些问题。 建议设置为10。

maxClientCnxns :
单个客户端与单台服务器之间的连接数的限制,是ip级别的,默认是60,如果设置为0,那么表明不作任何限制。请注意这个限制的使用范围,仅仅是单台客户端机器与单台ZK服务器之间的连接数限制,不是针对指定客户端IP,也不是ZK集群的连接数限制,也不是单台ZK对所有客户端的连接数限制。
建议设置为2000

maxSessionTimeout=60000000
Session超时时间限制,如果客户端设置的超时时间不在这个范围,那么会被强制设置为最大或最小时间。默认的Session超时时间是在2 * tickTime ~ 20 * tickTime 这个范围

preAllocSize=131072
预先开辟磁盘空间,用于后续写入事务日志。默认是64M,每个事务日志大小就是64M。如果ZK的快照频率较大的话,建议适当减小这个参数。单位kb。


service.N =YYY:A:B
N:代表服务器编号(也就是myid里面的值)
YYY:服务器地址
A:表示 FlowerLeader的通信端口,简称服务端内部通信的端口(默认2888B:表示 是选举端口(默认是3888

创建myid文件
注意:echo 的值是需要和上面的 server.1=node161:2888:3888 对应的。比如我的 node161 这台服务器就是 1 node162 这台服务器就是 2

echo 1 > /opt/kafka/zookeeper-3.9.2/data/myid
echo 2 > /opt/kafka/zookeeper-3.9.2/data/myid
echo 3 > /opt/kafka/zookeeper-3.9.2/data/myid

配置环境变量

vi /etc/profile
export ZOOKEEPER_HOME=/opt/kafka/zookeeper-3.9.2
export PATH=$PATH:$ZOOKEEPER_HOME/bin
source /etc/profile

启动zookeeper

cd /opt/kafka/zookeeper-3.9.2/bin/

启动

./zkServer.sh start

停止

./zkServer.sh stop

查看状态

./zkServer.sh status

注意:启动会稍微慢一些,启动后稍等一下再去查询状态
在这里插入图片描述

3.3 安装Kafka集群

解压文件

cd /opt/kafka/ 
tar -zxvf kafka_2.13-3.7.1.tgz

在这里插入图片描述
创建数据存放目录

mkdir /opt/kafka/kafka_2.13-3.7.1/data

修改kafka配置文件

vi /opt/kafka/kafka_2.13-3.7.1/config/server.properties

修改项(只需要修改这些就可以了)

# 这个就和zookeeper对应就行,不要重复
broker.id=1

# 当前机器IP
listeners=PLAINTEXT://192.168.200.161:9092

# 当前机器IP
advertised.listeners=PLAINTEXT://192.168.200.161:9092

log.dirs=/opt/kafka/kafka_2.13-3.7.1/data

# zookeeper 集群 
zookeeper.connect=192.168.200.161:2181,192.168.200.162:2181,192.168.200.163:2181

配置文件详细解析(按照需要修改)

# broker 的全局唯一编号,不能重复,每台机器递增设置
broker.id=1
#服务器监听地址
listeners=SASL_PLAINTEXT://192.168.19.128:9092,client2://192.168.19.128:9093
#外部访问监听地址,根据服务器监听地址映射端口
advertised.listeners=SASL_PLAINTEXT://192.168.19.128:9092,client2://192.168.19.128:9093
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,client2:SASL_PLAINTEXT,client3:SASL_PLAINTEXT

# 使用的认证协议
security.inter.broker.protocol=SASL_PLAINTEXT 

#SASL机制 
sasl.enabled.mechanisms=PLAIN 
sasl.mechanism.inter.broker.protocol=PLAIN 

# 如果没有找到ACL(访问控制列表)配置,则允许任何操作。 
#allow.everyone.if.no.acl.found=true 
super.users=User:admin

# 处理网络请求的线程数量,默认
num.network.threads=3

# 用来处理磁盘IO的线程数量,默认
num.io.threads=8

# 发送,接受, 请求套接字的缓冲区大小,默认
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# kafka 运行日志存放路径
log.dirs=/storage/kafka/logs/kafka
# 每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖
num.partitions=1
# 用来恢复和清理data下数据的线程数量,默认
num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# segment文件保留的最长时间,超时将被删除,默认
log.retention.hours=168
# 滚动生成新的segment文件的最大时间,默认
log.roll.hours=168

log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
#服务zookeeper连接地址
zookeeper.connect=192.168.19.128:2181
zookeeper.connection.timeout.ms=6000
#group.initial.rebalance.delay.ms=0

启动Kafka

cd /opt/kafka/kafka_2.13-3.7.1/bin/
./kafka-server-start.sh  ../config/server.properties

验证
在其他两台机器上查看主题

./kafka-topics.sh --list --bootstrap-server 192.168.200.162:9092

在一台机器上创建主题

./kafka-topics.sh --create --replication-factor 3 --partitions 1 --topic asda --bootstrap-server 192.168.200.161:9092

生产者

./kafka-console-producer.sh  --broker-list 192.168.200.161:9092 --topic asda

消费者

./kafka-console-consumer.sh --bootstrap-server 192.168.200.162:9092 --from-beginning --topic asda

在这里插入图片描述

3.4 设置开机自启

zookeeper注册服务脚本

vi /etc/systemd/system/zookeeper.service
[Unit]
Description=zookeeper service
After=network.target
#ConditionPathExists=/opt/kafka/zookeeper-3.9.2/conf/zoo.cfg

[Service]
Type=forking
User=root
Group=root
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/java/jdk1.8.0_371/bin"
ExecStart=/opt/kafka/zookeeper-3.9.2/bin/zkServer.sh start
ExecStop=/opt/kafka/zookeeper-3.9.2/bin/zkServer.sh stop
ExecReload=/opt/kafka/zookeeper-3.9.2/bin/zkServer.sh restart
Restart=always
RestartSec=10
StartLimitInterval=5

[Install]
WantedBy=multi-user.target

kafka 注册服务脚本

vi /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
After=network.target zookeeper.service

[Service]
Type=simple
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/java/jdk1.8.0_371/bin"
User=root
Group=root
ExecStart=/opt/kafka/kafka_2.13-3.7.1/bin/kafka-server-start.sh /opt/kafka/kafka_2.13-3.7.1/config/server.properties
ExecStop=/opt/kafka/kafka_2.13-3.7.1/bin/kafka-server-stop.sh
Restart=always
RestartSet=10
StartLimitInterval=5

[Install]
WantedBy=multi-user.target
systemctl daemon-reload
systemctl start zookeeper
systemctl start kafka

3.5 kafka配置文件(后续新增)

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

############################# Server Basics #############################

# 唯一的 broker ID,每个节点必须不同
broker.id=1

############################# Socket Server Settings #############################

# 监听的地址和端口
listeners=PLAINTEXT://172.16.55.72:9092
advertised.listeners=PLAINTEXT://172.16.55.72:9092

# 网络和 I/O 线程数(根据 CPU 核心数调整)
num.network.threads=6
num.io.threads=16

# 增大发送和接收缓冲区大小(单位:字节)
socket.send.buffer.bytes=512000
socket.receive.buffer.bytes=512000
socket.request.max.bytes=209715200

############################# Log Basics #############################

# 日志存储目录
log.dirs=/home/kafka/kafka_2.13-3.7.1/data

# 分区数量(可在 topic 创建时调整)
num.partitions=3

# 恢复线程数量(每个数据目录 4 个线程)
num.recovery.threads.per.data.dir=4

############################# Internal Topic Settings #############################

# 内部 topic 的复制因子(高可用性设置为 3)
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

############################# Log Flush Policy #############################

# 消息写入日志后强制刷新到磁盘的最大间隔
log.flush.interval.messages=100000
log.flush.interval.ms=5000

############################# Log Retention Policy #############################

# 日志文件保留时间(单位:小时)
log.retention.hours=48

# 日志文件总大小限制(单位:字节)
log.retention.bytes=10737418240

# 日志段文件最大大小(单位:字节)
log.segment.bytes=1073741824

# 日志清理检查的时间间隔(单位:毫秒)
log.retention.check.interval.ms=600000

############################# Zookeeper #############################

# Zookeeper 集群地址(以逗号分隔)
zookeeper.connect=172.16.55.72:2181,172.16.55.73:2181,172.16.55.74:2181

# Zookeeper 连接超时时间(单位:毫秒)
zookeeper.connection.timeout.ms=30000

############################# Group Coordinator Settings #############################

# 初始化消费者组重新平衡的延迟时间(单位:毫秒)
group.initial.rebalance.delay.ms=0

############################# Memory and Performance #############################

# 日志刷新调度器的时间间隔(单位:毫秒)
log.flush.scheduler.interval.ms=2000

# 日志索引条目的间隔(单位:字节)
log.index.interval.bytes=4096

# 延迟日志段删除的时间(单位:毫秒)
log.segment.delete.delay.ms=60000

############################# Replica Management #############################

# 副本同步时的缓冲区大小(单位:字节)
replica.fetch.max.bytes=209715200

# 副本同步间隔(单位:毫秒)
replica.fetch.wait.max.ms=500

# 最小同步副本数量(至少保证 2 个副本同步)
min.insync.replicas=2

############################# Quotas and Limits #############################

# 每个生产者的限速(单位:字节/秒)
quota.producer.default=104857600

# 每个消费者的限速(单位:字节/秒)
quota.consumer.default=104857600

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐