前言

首先确定kafka与zookeeper集群的调优方向

调大zookeeper的heap内存  

调大zookeeper的heap内存,默认是1G,可以根据服务器大小配置其堆内存为2G或者4G,kafka实时传输的数据如果达到PB级别的话,得观察一下YGC和FGC的值可以适当再次调大。

修改kafka的副本数

修改kafka的副本数,默认的副本数是1,建议修改为2,如果副本数为2,那么容灾能力就是1,如果副本数3,则容灾能力就是2,当然副本数越多,可能会导致集群的性能下降,但是可靠性更强,各有利弊,推荐副本数为2。

kafka推荐分区数

kafka推荐分区数,默认的分区数是1,理论上来说,parition的数量小于core的数量的话,值越大,kafka的吞吐量就越高,但是你必须得考虑你的磁盘IO的瓶颈,因此我不推荐你将分区数这只过大,建议这个值大于broker的数量比如集群broker的只有5台,集群的partition数量是20。

kafka的heap内存

kafka的heap内存,默认也是1G,生成环境中建议将它调大,不知道大家有没有发现,你broker的heap内存不管有多的,它都能给你吃满!生成环境中给kafka的heap内存是6G,kafka主要使用堆外内存,即大量使用操作系统的页缓存,因此其并不需要分配太多的堆内存空间,zookeeper给的是2G,剩下的全部给操作系统预留着,否则你的机器会非常的卡顿。

kafka配置文件调优

参见kafka的配置分为 broker、producter、consumer三个不同的配置详解

部署zookeeper集群

下载zookeeper-3.4.13.tar.gz

切记,不要弄太高版本,否则zookeeper-3.5.*安装报错:找不到或无法加载主类 org.apache.zookeeper.server.quorum.QuorumPeerMain

 [root@z_k zookeeper]# wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz

解压zookeeper

[root@z_k zookeeper]# tar -zxf zookeeper-3.4.13.tar.gz -C /soft/

创建软连接

[root@z_k ~]# cp -r /soft/zookeeper-3.4.13/ /soft/zk

配置zookeeper的堆内存配置文件

[root@z_k ~]# cat /soft/zk/conf/java.env 
#!/bin/bash

#指定JDK的安装路径
export JAVA_HOME=/soft/jdk

#指定zookeeper的heap内存大小
export JVMFLAGS="-Xms2048m -Xmx2048m $JVMFLAGS"

修改zookeeper的配置文件zoo.cfg

需要手动创建,或者从“ /soft/zk/conf/zoo_sample.cfg ”赋值一个模板即可

[root@z_k ~]# vim /soft/zk/conf/zoo.cfg 
# 滴答,计时的基本单位,默认是2000毫秒,即2秒。它是zookeeper最小的时间单位,
# 用于丈量心跳时间和超时时间等,通常设置成默认2秒即可。
tickTime=2000
# 初始化限制是10滴答,默认是10个滴答,即默认是20秒。
# 指定follower节点初始化是链接leader节点的最大tick次数。
initLimit=5
# 数据同步的时间限制,默认是5个滴答,即默认时间是10秒。设定了follower节点与leader节点进行同步的最大时间。
# 与initLimit类似,它也是以tickTime为单位进行指定的。
syncLimit=2
# 指定zookeeper的工作目录,这是一个非常重要的参数,zookeeper会在内存中在内存只能中保存系统快照,
# 并定期写入该路径指定的文件夹中。生产环境中需要注意该文件夹的磁盘占用情况。
dataDir=/home/z_k/zookeeper
# 监听zookeeper的默认端口。zookeeper监听客户端链接的端口,一般设置成默认2181即可。
clientPort=2181
# 这个操作将限制连接到 ZooKeeper 的客户端的数量,限制并发连接的数量,它通过 IP 来区分不同的客户端。
# 此配置选项可以用来阻止某些类别的 Dos 攻击。将它设置为 0 或者忽略而不进行设置将会取消对并发连接的限制。
#maxClientCnxns=60
 
# 在上文中已经提到,3.4.0及之后版本,ZK提供了自动清理事务日志和快照文件的功能,这个参数指定了清理频率,单位是小时,
# 需要配置一个1或更大的整数,默认是0,表示不开启自动清理功能。
#autopurge.purgeInterval=1
# 这个参数和上面的参数搭配使用,这个参数指定了需要保留的文件数目。默认是保留3个。
#autopurge.snapRetainCount=3
#server.x=[hostname]:nnnnn[:nnnnn],这里的x是一个数字,与myid文件中的id是一致的。
# 右边可以配置两个端口,第一个端口用于F和L之间的数据同步和其它通信,第二个端口用于Leader选举过程中投票通信。  
server.117=10.1.3.117:2888:3888
server.118=10.1.3.118:2888:3888
server.119=10.1.3.119:2888:3888
[root@z_k ~]# 

编写zookeeper的启动脚本

[root@z_k ~]# vim /usr/local/bin/xzk.sh 
#判断用户是否传参
if [ $# -ne 1 ];then
    echo "无效参数,用法为: $0  {start|stop|restart|status}"
    exit
fi
#获取用户输入的命令
cmd=$1
#定义函数功能
function zookeeperManger(){
    case $cmd in
    start)
        echo "启动服务"        
        remoteExecution start
        ;;
    stop)
        echo "停止服务"
        remoteExecution stop
        ;;
    restart)
        echo "重启服务"
        remoteExecution restart
        ;;
    status)
        echo "查看状态"
        remoteExecution status
        ;;
    *)
        echo "无效参数,用法为: $0  {start|stop|restart|status}"
        ;;
    esac
}
#定义执行的命令
function remoteExecution(){
    for (( i=117 ; i<=119 ; i++ )) ; do
            tput setaf 2
            echo ========== kafka${i} zkServer.sh  $1 ================
            tput setaf 9
            ssh kafka${i}  "source /etc/profile ; zkServer.sh $1"
    done
}
#调用函数
zookeeperManger

赋予执行权限

[root@z_k ~]#
[root@z_k ~]# chmod +x /usr/local/bin/xzk.sh 
[root@z_k ~]# 
[root@z_k ~]# ll /usr/local/bin/xzk.sh 
-rwxr-xr-x 1 root root 1101 Nov  7 10:53 /usr/local/bin/xzk.sh
[root@z_k ~]# 

同步系统配置文件xrsync.sh

[root@z_k ~]# vim /usr/local/bin/xrsync.sh 
#判断用户是否传参
if [ $# -lt 1 ];then
    echo "请输入参数";
    exit
fi
#获取文件路径
file=$@
#获取子路径
filename=`basename $file`
#获取父路径
dirpath=`dirname $file`
#获取完整路径
cd $dirpath
fullpath=`pwd -P`
#同步文件到Kafka集群
for (( i=117;i<=119;i++ ))
do
    #使终端变绿色
    tput setaf 2
    echo =========== kafka${i} : $file ===========
    #使终端变回原来的颜色,即白灰色
    tput setaf 7
    #远程执行命令
    rsync -lr $filename `whoami`@kafka${i}:$fullpath
    #判断命令是否执行成功
    if [ $? == 0 ];then
        echo "命令执行成功"
    fi
done
[root@z_k ~]# chmod +x /usr/local/bin/xrsync.sh 

加入zookeeper的安装及kafka的安装目录到/etc/profile


[root@z_k ~]# tail -5 /etc/profile
#ADD Zookeeper PATH BY z_k
export ZOOKEEPER=/soft/zk
export PATH=$PATH:$ZOOKEEPER/bin
export KAFKA_HOME=/soft/kafka
export PATH=$PATH:$KAFKA_HOME/bin

执行同步脚本

[root@z_k ~]# 
[root@z_k ~]# xrsync.sh /etc/profile
=========== z_k : /etc/profile ===========
命令执行成功
=========== kafka117 : /etc/profile ===========
命令执行成功
=========== kafka118 : /etc/profile ===========
命令执行成功
=========== kafka119 : /etc/profile ===========
命令执行成功

[root@z_k ~]# 

将上述解压的配置文件使用xrsync.sh同步到其它节点

 同步zookeeper安装文件,到各个节点,接下来需要在“/home/z_k/zookeeper/”目录中创建一个myid,并写入配置文件。也可以使用一个shell循环搞定,仅供参考。

for (( i=117;i<=119;i++ )) do ssh kafka${i} "mkdir -p /soft/kafka/config" ;done
for (( i=117;i<=119;i++ )) do ssh kafka${i} "mkdir -p /home/data_zk/kafka/logs" ;done
for (( i=117;i<=119;i++ )) do ssh kafka${i} "mkdir -p /home/data_zk/kafka/logs2" ;done
for (( i=117;i<=119;i++ )) do ssh kafka${i} "mkdir -p /home/data_zk/kafka/logs3" ;done
xrsync.sh /soft/zk
for (( i=117;i<=119;i++ )) do ssh kafka${i} "echo -n $i > /home/z_k/zookeeper/myid" ;done

启动zookeeper并查看状态

[root@z_k ~]# xzk.sh status        
#这是查看zookeeper的状态,如果是启动,或停止zookeeper,
#直接调用start或者stop方法即可
查看状态
========== kafka117 zkServer.sh status ================
ZooKeeper JMX enabled by default
Using config: /soft/zk/bin/../conf/zoo.cfg
Mode: follower
========== kafka118 zkServer.sh status ================
ZooKeeper JMX enabled by default
Using config: /soft/zk/bin/../conf/zoo.cfg
Mode: leader        #很显然,该节点为zookeeper节点。
========== kafka119 zkServer.sh status ================
ZooKeeper JMX enabled by default
Using config: /soft/zk/bin/../conf/zoo.cfg
Mode: follower
[root@z_k ~]# 

部署kafka集群

官网下载kafka(kafka_2.11-0.10.2.1.tgz)

 [root@z_k ~]# wget https://archive.apache.org/dist/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz

解压kafka

[root@z_k ~]# tar -zxf kafka_2.11-0.10.2.1.tgz -C /soft/

创建软连接

[root@z_k ~]# cp -r /soft/kafka_2.11-0.10.2.1/ /soft/kafka

修改kafka的配置文件(server.properties)

[root@z_k ~]# cat /soft/kafka/config/server.properties | grep -v ^# | grep -v ^$
broker.id=117
delete.topic.enable=true
auto.create.topics.enable=false
port=9092
host.name=10.1.3.117
num.network.threads=30
num.io.threads=30
socket.send.buffer.bytes=5242880
socket.receive.buffer.bytes=5242880
socket.request.max.bytes=104857600
queued.max.requests=1000
log.dirs=/home/z_k/kafka/logs,/home/z_k/kafka/logs2,/home/z_k/kafka/log3
num.partitions=20
num.recovery.threads.per.data.dir=1
default.replication.factor=2
message.max.bytes=104857600
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=600000
zookeeper.connect=10.1.3.117:2181,10.1.3.118:2181,10.1.3.119:2181
zookeeper.session.timeout.ms=180000
zookeeper.connection.timeout.ms=6000
max.request.size=104857600
fetch.message.max.bytes=104857600
replica.fetch.max.bytes=104857600
replica.fetch.wait.max.ms=2000
unclean.leader.election=false
num.replica.fetchers=5
[root@z_k ~]# 

 [root@z_k ~]# cat /soft/kafka/config/server.properties                                               #上述配置文件详解

修改kafka的启动脚本

主要按需求,调堆大内存。

[root@z_k ~]# cat /soft/kafka/bin/kafka-server-start.sh 
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

if [ $# -lt 1 ];
then
    echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
    exit 1
fi
base_dir=$(dirname $0)

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi


if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    #在这里指定堆内存为20G
    export KAFKA_HEAP_OPTS="-Xmx20G -Xms20G"     
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}

COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac


exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

编写kafka启动脚本

[root@z_k ~]# cat /usr/local/bin/xkafka.sh 
#!/bin/bash

#判断用户是否传参
if [ $# -ne 1 ];then
    echo "无效参数,用法为: $0  {start|stop}"
    exit
fi

#获取用户输入的命令
cmd=$1

for (( i=117 ; i<=119 ; i++ )) ; do
    tput setaf 2
    echo ========== kafka${i}  $cmd ================
    tput setaf 9
    case $cmd in
        start)
        ssh  kafka${i} "source /etc/profile ; nohup kafka-server-start.sh /soft/kafka/config/server.properties >> /home/z_k/kafka/console/kafka-`date +%F`.log &" 
            #ssh  kafka${i}  "source /etc/profile ; kafka-server-start.sh -daemon /soft/kafka/config/server.properties"
            echo  kafka${i}  "服务已启动"
            ;;
        stop) 
            ssh kafka${i}  "source /etc/profile ; kafka-server-stop.sh" 
            echo kafka${i}  "服务已停止"
            ;;
            *) 
            echo "无效参数,用法为: $0  {start|stop}"
            exit 
            ;;
     esac
done
[root@z_k ~]# chmod +x /usr/local/bin/xkafka.sh 

同步kafka安装文件到各个节点

xrsync.sh /soft/kafka
for (( i=117;i<=119;i++ )) do ssh kafka${i} "sed -i 's#broker.id=117#broker.id=${i}#g' /soft/kafka/config/server.properties ";done
for (( i=117;i<=119;i++ )) do ssh kafka${i} "sed -i 's#host.name=10.1.3.117#host.name=10.1.3.${i}#g' /soft/kafka/config/server.properties ";done

查看各个服务器的启动进程

编写批量执行的脚本,需要你手动配置SSH免密要登录。

[root@z_k ~]# cat /usr/local/bin/xcall.sh  
#!/bin/bash
#判断用户是否传参
if [ $# -lt 1 ];then
        echo "请输入参数"
        exit
fi

#获取用户输入的命令
cmd=$@

#Kafka集群批量执行命令
for (( i=117;i<=119;i++ ))
do
    #使终端变绿色
    tput setaf 2
    echo =========== kafka${i} : $cmd ===========
    #使终端变回原来的颜色,即白灰色
    tput setaf 7
    #远程执行命令
    ssh kafka${i} $cmd
    #判断命令是否执行成功
    if [ $? == 0 ];then
        echo "命令执行成功"
    fi
done
[root@z_k ~]# chmod +x /usr/local/bin/xkafka.sh 

 查看kafka及zookeeper集群的是否正常

[root@z_k ~]# xcall.sh jps

命令执行成功
=========== kafka117 : jps ===========
953 Jps
8236 Kafka
4735 QuorumPeerMain
命令执行成功
=========== kafka118 : jps ===========
4616 QuorumPeerMain
2425 Jps
8382 Kafka
命令执行成功
=========== kafka119 : jps ===========
23953 Jps
4763 QuorumPeerMain
8079 Kafka

搭建KafkaEagle

搭建KafkaEagle,可以监控kafka及zookeeper集群的是否正常,下载kafka-eagle-web-2.0.2-bin.tar.gz

搭建教程,详见https://blog.csdn.net/m0_46522455/article/details/108827979

监控kafka集群http://172.16.6.13:8048/system/resource

大屏展示

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐