前言

kafka作为一个突出读写性能的消息队列组件。如何做好性能调优,是我们的一个课题。这里我们探讨在os层面,可以对集群进行哪些优化。

  • 文件描述符限制(File Descriptor)
  • 虚拟内存(virtual memory)
  • 网络参数

文件描述符限制

kafka的工作原理上,会使用大量的log segment文件和网络连接,这需要占用系统大量的文件描述符。我们在评估一个broker需要使用多少文件描述符去管理数据时,可以这么计算:

(number of partition)*(partition size / segment size)

另外,每一个broker还需要考虑网络开销:客户端连接、broker间的连接、zookeeper等等。一般来说,我们可以设置比较大的值,比如65535、655350等等。

建议配置

#/etc/security/limits.conf
* soft nproc  655350
* hard nproc  655350

虚拟内存 virtual memory

kafka工作上会尽可能的使用系统的page来缓存生产者跟消费的消息。当系统物理内存不足时,kafka可能会使用到swap交换分区。但是swap分区的读写速度远远不及物理内存。基于此,建议调整系统swap的使用策略。把vm.swappiness调整为1。
另外,kafka也是重度依赖磁盘i/o的性能表现。linux系统中,vm.dirty_ratio和vm.dirty_background_ratio可以控制脏页刷到磁盘的比例。更高的vm.dirty_ratio,可以减少磁盘的i/o,但是也带来了服务器因为突发性down机,内存中的数据未能来得及flush到磁盘,而导致数据丢失的风险。
- vm.dirty_ratio是指当文件系统缓存脏页数量达到系统内存百分之多少时(如5%)就会触发pdflush/flush/kdmflush等后台回写进程运行,将一定缓存的脏页异步地刷入外存
- vm.dirty_background_ratio是指定了当文件系统缓存脏页数量达到系统内存百分之多少时(如10%),系统不得不开始处理缓存脏页(因为此时脏页数量已经比较多,为了避免数据丢失需要将一定脏页刷入外存);在此过程中很多应用进程可能会因为系统转而处理文件IO而阻塞。

vm.dirty_ratio的值永远要比vm.dirty_background_ratio要低,从这两个参数的定义来看,似乎vm.dirty_background_ratio永远也不可能触发。实际上,在系统触发vm.dirty_ratio比例进行脏页刷盘的时候,外部程序其实还是在不断写入。这个时候,还是有可能外部程序数据写入内存的速度要比系统脏页刷盘的速度要快,而导致达到vm.dirty_background_ratio设置的比例。

建议配置:

/etc/sysctl.conf
vm.swappiness=1
vm.dirty_background_ratio = 10
vm.dirty_ratio = 20

网络参数

kafka从开始就设计成一个可以处理大吞吐量的系统。而linux系统默认情况下并没有针对此做相应的调整。所以,这需要针对现实的场景对linux系统进行一些特定的调整。

  • net.core.wmem_default: Default send socket buffer size.
  • net.core.rmem_default: Default receive socket buffer size.
  • net.core.wmem_max: Maximum send socket buffer size.
  • net.core.rmem_max: Maximum receive socket buffer size.
  • net.ipv4.tcp_wmem: Memory reserved for TCP send buffers.
  • net.ipv4.tcp_rmem: Memory reserved for TCP receive buffers.
  • net.ipv4.tcp_window_scaling: TCP Window Scaling option.
  • net.ipv4.tcp_max_syn_backlog: Maximum number of outstanding TCP SYN requests (connection requests).
  • net.core.netdev_max_backlog: Maximum number of queued packets on the kernel input side (useful to deal with spike of network requests).

内核网络参数很多我也不是很明白,下面给出一个官方的推荐配置:

net.core是网络层通用配置,net.ipv4.tcp*为针对ipv4的tcp协议的配置。会覆盖net.core

/etc/sysctl.conf
net.core.wmem_default=4194304
net.core.rmem_default=4194304
net.core.wmem_max=4194304
net.core.rmem_max=4194304
net.core.optmem_max=4194304
net.ipv4.tcp_rmem="4096 87380 4194304"
net.ipv4.tcp_wmem="4096 65536 4194304"
net.core.netdev_max_backlog=250000

磁盘

尽量使用xfs文件系统。
kafka存储的文件为常规文件,对于磁盘类型其实是没有特别的要求。而目前使用最多的file system为ext4和xfs。从磁盘使用历史看ext4使用得更多,但是xfs对kafka工作负载有更好的性能特征,并且不影响稳定性。

官方在测试中显示,xfs的文件系统在处理io操作的平均时间及平均等待时间都表现得比ext4更好。并且xfs的性能表现更加稳定。

xfs需要调整的参数

largeio:这会影响stat调用报告的首选I / O大小。尽管这可以在较大的磁盘写入上实现更高的性能,但实际上对性能的影响很小或没有影响。
nobarrier:对于具有电池后备缓存的基础设备,此选项可以通过禁用定期写刷新来提供更高的性能。但是,如果基础设备的行为良好,它将向文件系统报告它不需要刷新,并且此选项将无效。

ext4需要调整的参数

data = writeback:E xt4默认为data = ordered,这在某些写入操作中具有很强的顺序。Kafka不需要此排序,因为它会对所有未刷新的日志进行非常偏执的数据恢复。此设置消除了排序约束,并且似乎大大减少了延迟。
禁用日志记录:日志记录是一个折衷方案:它使服务器崩溃后重新启动速度更快,但是它引入了很多附加锁定,从而增加了写入性能。那些不关心重新启动时间并希望减少写入延迟峰值的主要来源的人可以完全关闭日记功能。
commit = num_secs:调整ext4提交到其元数据日志的频率。将此值设置为较低的值可以减少崩溃期间未刷新数据的丢失。将此值设置为较高的值将提高吞吐量。
nobh:使用data = writeback模式时,此设置控制其他排序保证。对于Kafka,这应该是安全的,因为我们不依赖于写入顺序,并且可以提高吞吐量和延迟。
delalloc:延迟分配意味着文件系统避免在物理写入发生之前分配任何块。这使ext4可以分配较大的范围而不是较小的页面,并有助于确保按顺序写入数据。此功能非常适合吞吐量。它似乎确实涉及文件系统中的某些锁定,这增加了一些延迟差异。

应用层优化

以高于集团基线版本kafka-2.11-2.2.1、zookeeper-3.4.14为例

  • jdk
  • zookeeper
  • kafka

jdk

官方推荐使用1.8 u5及以上的版本。如下为官方文档解释:

From a security perspective, we recommend you use the latest released version of JDK 1.8 as older freely available versions have disclosed security vulnerabilities. LinkedIn is currently running JDK 1.8 u5 (looking to upgrade to a newer version) with the G1 collector

zookeeper

从实践经验来看,中小规模的kafka集群,zookeeper对于集群的性能影响并不大。对于独立集群,zookeeper的配置可以不需要做改动。对于公共集群,建议调小syncLimit,及调大maxClientCnxns
生产配置参考

tickTime=2000

initLimit=10

syncLimit=5

dataDir=/data/zkdata
dataLogDir=/data/zklog

clientPort=2181

#maxClientCnxns=60

#autopurge.snapRetainCount=3

#autopurge.purgeInterval=1
server.1=broker1:2888:3888
server.2=broker2:2888:3888
server.3=broker3:2888:3888

kafka

kafka-server-start.sh文件

#!/bin/bash

# 开启jmx端口,使用prometheus的jmx_exporter来监控集群状态
export JMX_PORT="9988"
export KAFKA_OPTS="-Dcom.sun.management.jmxremote.rmi.port=9988 -javaagent:/opt/kafka/libs/jmx_prometheus_javaagent-0.3.1.jar=9989:/opt/kafka/config/kafka.yml"

if [ $# -lt 1 ];
then
        echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
        exit 1
fi
base_dir=$(dirname $0)

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    #官方默认使用g1回收器,大部分场景也建议使用g1,如下使用UseParallelGC,针对kafka大吞吐量场景
    #堆内存大小没有标准值,具体参考主机内存情况,对于256G的broker主机,建议分配16G。
    export KAFKA_HEAP_OPTS="-Xms16g -Xmx16g -XX:+UseParallelGC -XX:+UseParallelOldGC -Xmn10g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/kafka/logs"

fi

EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}

COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

server.properties配置文件

broker.id=1
listeners=PLAINTEXT://ip:9092
# 实际经验来看,适当增加num.network.threads有利于提升请求处理的效率
num.network.threads=16
# 这个参数建议至少是磁盘的数量
num.io.threads=24
# 
num.replica.fetchers=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka
# 默认topic的partition数量,建议按照broker数来设置默认值
num.partitions=3
# 默认的副本因子,安全性设置,建议设置为2~3
default.replication.factor=2
# 该参数能加速故障节点恢复,这里是log.dirs配置一个目录,那么恢复线程数为8*1
num.recovery.threads.per.data.dir=8

# 生产安全性设置,提升集群高可用。
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3

log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
zookeeper.connection.timeout.ms=6000
# 设置group开启重平衡的延时时间,时间内允许更多的consumer加入group,避免不必要的JoinGroup与SyncGroup之间的切换。但是会带来延时增加,生产上建议设置为3s
group.initial.rebalance.delay.ms=3000

# 允许删除topic
delete.topic.enable=true
# topic不存在时候,自动创建topic。可选开启
auto.create.topics.enable=true
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐