八、Linux二进制安装Kafka集群
Linux二进制安装Kafka集群 部署开机自启
·
目录
八、Linux二进制安装Kafka集群
1 下载
1.1 Zookeeper下载
官网下载:zookeeper官网下载
百度网盘:百度网盘下载
1.2 Kafka下载
下载2.x.x系列最新的版本
2 单机安装
mkdir /opt/kafka
将下载好的文件放到/opt/kafka下,解压
tar -zxvf kafka_2.13-3.7.1.tgz
运行zookeeper(单机时,没必要单独安装zookeeper)
cd /opt/kafka/kafka_2.13-3.7.1/bin/
# 前台运行的命令
./zookeeper-server-start.sh ../config/zookeeper.properties
# 或者后台运行
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
修改Kafka配置文件
broker.id=0:加入集群的时候,id要变,不能跟以前的一样,单机无需修改。
listeners=PLAINTEXT://192.168.200.161:9092,这里ip地址要改,改成服务器的内网ip,否则内网其它的服务器找不能访问这个server
zookeeper.connect=192.168.200.161:2181,这里要填写zookeeper的地址和端口。
启动Kafka
# 前台运行
./kafka-server-start.sh ../config/server.properties
#后台运行
./kafka-server-start.sh -daemon ../config/server.properties
测试
# 查询kafka主题
./kafka-topics.sh --list --bootstrap-server 192.168.200.161:9092
# 创建主题
./kafka-topics.sh --create --replication-factor 1 --partitions 1 --topic tests --bootstrap-server 192.168.200.161:9092
zookeeper注册服务脚本
[Unit]
Description=Zookeeper service
After=network.target
[Service]
Type=simple
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/java/jdk1.8.0_371/bin"
User=root
Group=root
ExecStart=/opt/kafka/kafka_2.13-3.7.1/bin/zookeeper-server-start.sh /opt/kafka/kafka_2.13-3.7.1/config/zookeeper.properties
ExecStop=/opt/kafka/kafka_2.13-3.7.1/bin/zookeeper-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
kafka 注册服务脚本
[Unit]
Description=Apache Kafka server (broker)
After=network.target zookeeper.service
[Service]
Type=simple
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/java/jdk1.8.0_371/bin"
User=root
Group=root
ExecStart=/opt/kafka/kafka_2.13-3.7.1/bin/kafka-server-start.sh /opt/kafka/kafka_2.13-3.7.1/config/server.properties
ExecStop=/opt/kafka/kafka_2.13-3.7.1/bin/kafka-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
3 集群安装Kakfa
3.1 安装前准备
将下载的kafka和zookeeper安装包放到 /opt/kafka目录下(三台机器都需要)
查看 hosts文件,确保集群机器都在(不知如何修改可以看服务器配置修改)
vi /etc/hosts
3.2 安装zookeeper集群
解压
cd /opt/kafka/
tar -zxvf apache-zookeeper-3.9.2-bin.tar.gz
修改文件目录
mv apache-zookeeper-3.9.2-bin zookeeper-3.9.2
创建数据存放目录
mkdir /opt/kafka/zookeeper-3.9.2/data
修改配置文件
cd /opt/kafka/zookeeper-3.9.2/conf/
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
直接替换配置文件
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=30000
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=10
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/kafka/zookeeper-3.9.2/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
maxClientCnxns=2000
maxSessionTimeout=60000000
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=10
# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=1
snapCount=3000000
preAllocSize=131072
## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpHost=0.0.0.0
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
server.1=192.168.200.161:2888:3888
server.2=192.168.200.162:2888:3888
server.3=192.168.200.163:2888:3888
配置文件解析
# zookeeper 数据目录
dataDir=/opt/zookeeper-3.6.3/data
# 3台服务器的配置
# 这里可以使用IP,也可以使用主机名,但使用主机名时要配置好hosts文件。
server.1=ta01:2888:3888:
server.2=ta02:2888:3888
server.3=ta03:2888:3888
#配置snapshot文件清理策略
# 开启清理事务日志和快照文件的功能,单位是小时。默认是0,表示不开启自动清理功能。
autopurge.purgeInterval=1
# 指定了需要保留的文件数目。默认是保留3个
autopurge.snapRetainCount=10
限制snapshot数量
# 每snapCount次事务日志输出后,触发一次快照(snapshot)。 ZooKeeper会生成一个snapshot文件和事务日志文件。 默认是100000。
snapCount=3000000
tickTime=2000
ZK中的一个时间单元。ZK中所有时间都是以这个时间单元为基础,进行整数倍配置的。例如,session的最小超时时间是2*tickTime。 默认值2000,单位毫秒。
initLimit=10
Follower在启动过程中,会从Leader同步所有最新数据,然后确定自己能够对外服务的起始状态。Leader允许F在 initLimit 时间内完成这个工作。通常情况下,我们不用太在意这个参数的设置。如果ZK集群的数据量确实很大了,F在启动的时候,从Leader上同步数据的时间也会相应变长,因此在这种情况下,有必要适当调大这个参数了。
initLimit=30000
syncLimit=10
在运行过程中,Leader负责与ZK集群中所有机器进行通信,例如通过一些心跳检测机制,来检测机器的存活状态。如果L发出心跳包在syncLimit之后,还没有从F那里收到响应,那么就认为这个F已经不在线了。注意:不要把这个参数设置得过大,否则可能会掩盖一些问题。 建议设置为10。
maxClientCnxns :
单个客户端与单台服务器之间的连接数的限制,是ip级别的,默认是60,如果设置为0,那么表明不作任何限制。请注意这个限制的使用范围,仅仅是单台客户端机器与单台ZK服务器之间的连接数限制,不是针对指定客户端IP,也不是ZK集群的连接数限制,也不是单台ZK对所有客户端的连接数限制。
建议设置为2000
maxSessionTimeout=60000000
Session超时时间限制,如果客户端设置的超时时间不在这个范围,那么会被强制设置为最大或最小时间。默认的Session超时时间是在2 * tickTime ~ 20 * tickTime 这个范围
preAllocSize=131072
预先开辟磁盘空间,用于后续写入事务日志。默认是64M,每个事务日志大小就是64M。如果ZK的快照频率较大的话,建议适当减小这个参数。单位kb。
service.N =YYY:A:B
N:代表服务器编号(也就是myid里面的值)
YYY:服务器地址
A:表示 Flower 跟 Leader的通信端口,简称服务端内部通信的端口(默认2888)
B:表示 是选举端口(默认是3888)
创建myid文件
注意:echo 的值是需要和上面的 server.1=node161:2888:3888 对应的。比如我的 node161 这台服务器就是 1 node162 这台服务器就是 2
echo 1 > /opt/kafka/zookeeper-3.9.2/data/myid
echo 2 > /opt/kafka/zookeeper-3.9.2/data/myid
echo 3 > /opt/kafka/zookeeper-3.9.2/data/myid
配置环境变量
vi /etc/profile
export ZOOKEEPER_HOME=/opt/kafka/zookeeper-3.9.2
export PATH=$PATH:$ZOOKEEPER_HOME/bin
source /etc/profile
启动zookeeper
cd /opt/kafka/zookeeper-3.9.2/bin/
启动
./zkServer.sh start
停止
./zkServer.sh stop
查看状态
./zkServer.sh status
注意:启动会稍微慢一些,启动后稍等一下再去查询状态
3.3 安装Kafka集群
解压文件
cd /opt/kafka/
tar -zxvf kafka_2.13-3.7.1.tgz
创建数据存放目录
mkdir /opt/kafka/kafka_2.13-3.7.1/data
修改kafka配置文件
vi /opt/kafka/kafka_2.13-3.7.1/config/server.properties
修改项(只需要修改这些就可以了)
# 这个就和zookeeper对应就行,不要重复
broker.id=1
# 当前机器IP
listeners=PLAINTEXT://192.168.200.161:9092
# 当前机器IP
advertised.listeners=PLAINTEXT://192.168.200.161:9092
log.dirs=/opt/kafka/kafka_2.13-3.7.1/data
# zookeeper 集群
zookeeper.connect=192.168.200.161:2181,192.168.200.162:2181,192.168.200.163:2181
配置文件详细解析(按照需要修改)
# broker 的全局唯一编号,不能重复,每台机器递增设置
broker.id=1
#服务器监听地址
listeners=SASL_PLAINTEXT://192.168.19.128:9092,client2://192.168.19.128:9093
#外部访问监听地址,根据服务器监听地址映射端口
advertised.listeners=SASL_PLAINTEXT://192.168.19.128:9092,client2://192.168.19.128:9093
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,client2:SASL_PLAINTEXT,client3:SASL_PLAINTEXT
# 使用的认证协议
security.inter.broker.protocol=SASL_PLAINTEXT
#SASL机制
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
# 如果没有找到ACL(访问控制列表)配置,则允许任何操作。
#allow.everyone.if.no.acl.found=true
super.users=User:admin
# 处理网络请求的线程数量,默认
num.network.threads=3
# 用来处理磁盘IO的线程数量,默认
num.io.threads=8
# 发送,接受, 请求套接字的缓冲区大小,默认
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# kafka 运行日志存放路径
log.dirs=/storage/kafka/logs/kafka
# 每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖
num.partitions=1
# 用来恢复和清理data下数据的线程数量,默认
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# segment文件保留的最长时间,超时将被删除,默认
log.retention.hours=168
# 滚动生成新的segment文件的最大时间,默认
log.roll.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
#服务zookeeper连接地址
zookeeper.connect=192.168.19.128:2181
zookeeper.connection.timeout.ms=6000
#group.initial.rebalance.delay.ms=0
启动Kafka
cd /opt/kafka/kafka_2.13-3.7.1/bin/
./kafka-server-start.sh ../config/server.properties
验证
在其他两台机器上查看主题
./kafka-topics.sh --list --bootstrap-server 192.168.200.162:9092
在一台机器上创建主题
./kafka-topics.sh --create --replication-factor 3 --partitions 1 --topic asda --bootstrap-server 192.168.200.161:9092
生产者
./kafka-console-producer.sh --broker-list 192.168.200.161:9092 --topic asda
消费者
./kafka-console-consumer.sh --bootstrap-server 192.168.200.162:9092 --from-beginning --topic asda
3.4 设置开机自启
zookeeper注册服务脚本
vi /etc/systemd/system/zookeeper.service
[Unit]
Description=zookeeper service
After=network.target
#ConditionPathExists=/opt/kafka/zookeeper-3.9.2/conf/zoo.cfg
[Service]
Type=forking
User=root
Group=root
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/java/jdk1.8.0_371/bin"
ExecStart=/opt/kafka/zookeeper-3.9.2/bin/zkServer.sh start
ExecStop=/opt/kafka/zookeeper-3.9.2/bin/zkServer.sh stop
ExecReload=/opt/kafka/zookeeper-3.9.2/bin/zkServer.sh restart
Restart=always
RestartSec=10
StartLimitInterval=5
[Install]
WantedBy=multi-user.target
kafka 注册服务脚本
vi /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
After=network.target zookeeper.service
[Service]
Type=simple
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/java/jdk1.8.0_371/bin"
User=root
Group=root
ExecStart=/opt/kafka/kafka_2.13-3.7.1/bin/kafka-server-start.sh /opt/kafka/kafka_2.13-3.7.1/config/server.properties
ExecStop=/opt/kafka/kafka_2.13-3.7.1/bin/kafka-server-stop.sh
Restart=always
RestartSet=10
StartLimitInterval=5
[Install]
WantedBy=multi-user.target
systemctl daemon-reload
systemctl start zookeeper
systemctl start kafka
3.5 kafka配置文件(后续新增)
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
############################# Server Basics #############################
# 唯一的 broker ID,每个节点必须不同
broker.id=1
############################# Socket Server Settings #############################
# 监听的地址和端口
listeners=PLAINTEXT://172.16.55.72:9092
advertised.listeners=PLAINTEXT://172.16.55.72:9092
# 网络和 I/O 线程数(根据 CPU 核心数调整)
num.network.threads=6
num.io.threads=16
# 增大发送和接收缓冲区大小(单位:字节)
socket.send.buffer.bytes=512000
socket.receive.buffer.bytes=512000
socket.request.max.bytes=209715200
############################# Log Basics #############################
# 日志存储目录
log.dirs=/home/kafka/kafka_2.13-3.7.1/data
# 分区数量(可在 topic 创建时调整)
num.partitions=3
# 恢复线程数量(每个数据目录 4 个线程)
num.recovery.threads.per.data.dir=4
############################# Internal Topic Settings #############################
# 内部 topic 的复制因子(高可用性设置为 3)
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
############################# Log Flush Policy #############################
# 消息写入日志后强制刷新到磁盘的最大间隔
log.flush.interval.messages=100000
log.flush.interval.ms=5000
############################# Log Retention Policy #############################
# 日志文件保留时间(单位:小时)
log.retention.hours=48
# 日志文件总大小限制(单位:字节)
log.retention.bytes=10737418240
# 日志段文件最大大小(单位:字节)
log.segment.bytes=1073741824
# 日志清理检查的时间间隔(单位:毫秒)
log.retention.check.interval.ms=600000
############################# Zookeeper #############################
# Zookeeper 集群地址(以逗号分隔)
zookeeper.connect=172.16.55.72:2181,172.16.55.73:2181,172.16.55.74:2181
# Zookeeper 连接超时时间(单位:毫秒)
zookeeper.connection.timeout.ms=30000
############################# Group Coordinator Settings #############################
# 初始化消费者组重新平衡的延迟时间(单位:毫秒)
group.initial.rebalance.delay.ms=0
############################# Memory and Performance #############################
# 日志刷新调度器的时间间隔(单位:毫秒)
log.flush.scheduler.interval.ms=2000
# 日志索引条目的间隔(单位:字节)
log.index.interval.bytes=4096
# 延迟日志段删除的时间(单位:毫秒)
log.segment.delete.delay.ms=60000
############################# Replica Management #############################
# 副本同步时的缓冲区大小(单位:字节)
replica.fetch.max.bytes=209715200
# 副本同步间隔(单位:毫秒)
replica.fetch.wait.max.ms=500
# 最小同步副本数量(至少保证 2 个副本同步)
min.insync.replicas=2
############################# Quotas and Limits #############################
# 每个生产者的限速(单位:字节/秒)
quota.producer.default=104857600
# 每个消费者的限速(单位:字节/秒)
quota.consumer.default=104857600
更多推荐
已为社区贡献2条内容
所有评论(0)