前言

本篇文章主要讲解Kafka的集群搭建以及一些注意事项。之所以写这篇博客,是因为网上充斥着单机集群搭建的教程。但是服务器集群和单机虽说是理论上是通的,但是不一样的地方还是很多,尤其是Zookeeper操作就明显不一样。至少得先把Zookeeper搭建起来成功运行才能继续Kafka集群的搭建。而且在配置上服务器集群和单机集群确实有很多注意不到的点。这篇文章就是要把服务器集群整个流程写下来,供给后来者以参考。以及笔者搭建过程中遇到的问题,也一并梳理出来,帮助大家搭建一个可用的Kafka生产集群。当然这个文章只是说怎么搭建,使用的心得本文不涉及。更多内容请点击【Apache Kafka API AdminClient系列】

版本说明

下面是笔者的环境,之所以这样选择,是因为笔者的公司刚好用的就是这一套。kafka_2.11-2.1.1这个版本官方标配的是zookeeper-3.4.13,但是14和13几乎没有大的差别,所以一样可以使用。目前笔者将kafka升级到2.7.0,Zookeeper升级到3.5.8,此文依然有效。

  • Kafka:kafka_2.11-2.1.1.tgz
  • OS:CentOS7
  • Zookeeper:zookeeper-3.4.14.tar.gz
  • Java:Java version “1.8.0_191”

Zookeeper配置

成功配置一个可用的Zookeeper服务器集群至关重要。如果不知道怎么配置的,或者还没有配置好需要从0开始配置Zookeeper的可以参考【Zookeeper + Centos7 详细安装教程】,这里就不再赘述了。那么我们就假设大家的机器里已经运行好了Zookeeper。这里说一下笔者的Zookeeper集群IP,一共5台:

server.1=192.168.33.101:2887:3887
server.2=192.168.33.102:2887:3887
server.3=192.168.33.103:2887:3887
server.4=192.168.33.104:2887:3887
server.5=192.168.33.105:2887:3887

而且笔者为了比较好管理,另外创建了一个/kafka的节点存放Kafka的数据,这里随喜好,不做强求。如下:

[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper, kafka] 			 #/kafka存放数据
[zk: localhost:2181(CONNECTED) 1] ls /kafka
[cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 2]

Kakfa配置

从这里开始正式介绍Kafka的相关配置,这里笔者的集群为7台Kafka机器。如大家所见的,其中5台安装了Zookeeper。当然所有的机器都必须是Java支持的。

192.168.33.101:9092		#Zookeeper + Kakfa
192.168.33.102:9092		#Zookeeper + Kakfa
192.168.33.103:9092		#Zookeeper + Kakfa
192.168.33.104:9092		#Zookeeper + Kakfa
192.168.33.105:9092		#Zookeeper + Kakfa
192.168.33.106:9092		#Kakfa
192.168.33.107:9092		#Kakfa
第一步 下载

当然是下载下来kafka_2.11-2.1.1.tgz这个包,大家可以从官网,或者国内镜像找到响应的版本,相信这个就不用说了。tar -zxvf kafka_2.11-2.1.1.tgz解压到自定义目录,比如笔者解压到apps下:

[root@centos01 /]# cd /usr/local/apps/
[root@centos01 apps]# ls
jdk1.8.0_191  kafka_2.11-2.1.1  zookeeper-3.4.14
第二步 配置

找到kafka_2.11-2.1.1/config/路径下的server.properties,这个就是kafka集群的主要配置文件。我们要修改的内容就在这里。里面的参数很多,具体是什么意思大家有兴趣自己搜一下,网上很多资料,复制粘贴也没什么意思。比较好的是和集群启动相关的没几个,所以笔者就把需要改的几个参数挑出来。这里以101号机器为例子,但是其他的机器要照着配置。

#服务器id,这个是每个机器的唯一id。每个机器都是独一无二的,习惯性就按顺序来。
broker.id=1 
#监听端口,这里也可以用hostname代替,只要host里面配置好了都可以
listeners = PLAINTEXT://192.168.33.101:9092 
#存放当前数据的目录,这里所有机器尽量都一样,这样操作起来比较容易
log.dirs=/home/data/kafka-logs
#连接Zookeeper。注意这里笔者再Zookeeper里面创建了一个叫做“/kakfa”的节点,所以最后一个ip上跟着
# 一个/kafka,看起来比较整洁。如果没有这样的需求不需要带。
zookeeper.connect=192.168.33.101:2181,192.168.33.102:2181,192.168.33.103:2181,192.168.33.104:2181,192.168.33.105:2181/kafka
#如果要直连根节点,请仿照这个
#zookeeper.connect=192.168.33.101:2181,192.168.33.102:2181,192.168.33.103:2181,192.168.33.104:2181,192.168.33.105:2181

102号机器例子,其他机器以此类推。

broker.id=2 
listeners = PLAINTEXT://192.168.33.102:9092 
log.dirs=/home/data/kafka-logs
zookeeper.connect=192.168.33.101:2181,192.168.33.102:2181,192.168.33.103:2181,192.168.33.104:2181,192.168.33.105:2181/kafka
第三步 创建目录

创建log.dirs目录用来对应配置。笔者配置的是/home/data/kafka-logs,就去相应的地方把这个文件夹创建出来。所有机器都要这么做一遍。

第四步 配置防火墙

开启防火墙9092端口,为了防止防火墙阻挡通信。

firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload   # 配置立即生效
[root@centos03 ~]# firewall-cmd --zone=public --list-ports  #查看端口是不是开了
2181/tcp 2887/tcp 3887/tcp 9092/tcp
第五步 开启Kafka

开启Kafka。没错这里就可以开启了,但是kafka的启动居然还藏着坑。经过多方测试,这样启动是最稳的:
首先,我们先到kafka的主目录里,也就是解压目录:/kafka_2.11-2.1.1这里。

[root@centos05 kafka_2.11-2.1.1]# ls
bin  config  libs  LICENSE  logs  nohup.out  NOTICE  site-docs
[root@centos05 kafka_2.11-2.1.1]#

然后,在这个目录里执行./bin/kafka-server-start.sh config/server.properties &这句话,100%能用。当然也可以nohup这样后台启动,这样就不需要等启动完成了再Ctrl+c出来。

一般启动
[root@centos06 kafka_2.11-2.1.1]# ./bin/kafka-server-start.sh config/server.properties &

nohup启动
[root@centos06 kafka_2.11-2.1.1]# nohup ./bin/kafka-server-start.sh config/server.properties &
[1] 2072
[root@centos06 kafka_2.11-2.1.1]# nohup: ignoring input and appending output to ‘nohup.out’
^C  #Ctrl+c跳出来

正常启动以后,如果一切都没问题就会在最后有这样一句话INFO [KafkaServer id=7] started,到这儿一般就没大问题了。

[2020-08-03 21:14:43,794] INFO [ProducerId Manager 7]: Acquired new producerId block (brokerId:7,blockStartProducerId:10000,blockEndProducerId:10999) by writing to Zk with path version 11 (kafka.coordinator.transaction.ProducerIdManager)
[2020-08-03 21:14:43,816] INFO [TransactionCoordinator id=7] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2020-08-03 21:14:43,825] INFO [TransactionCoordinator id=7] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2020-08-03 21:14:43,827] INFO [Transaction Marker Channel Manager 7]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2020-08-03 21:14:43,844] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2020-08-03 21:14:43,862] INFO [SocketServer brokerId=7] Started processors for 1 acceptors (kafka.network.SocketServer)
[2020-08-03 21:14:43,863] INFO Kafka version : 2.1.1 (org.apache.kafka.common.utils.AppInfoParser)
[2020-08-03 21:14:43,863] INFO Kafka commitId : 21234bee31165527 (org.apache.kafka.common.utils.AppInfoParser)
[2020-08-03 21:14:43,864] INFO [KafkaServer id=7] started (kafka.server.KafkaServer)

验证可用

为了保险起见,当然要验证一下是不是成功了。最快的当然就是直接操作了,所以去创建一个topic写点数据。注意笔者都是在Kafka的根目录执行的命令。如果在bin目录下,自已调整一下命令。
特别强调一点:如果在Zookeeper根目录下,没有像笔者一样创建一个新的节点,就把/kafka去掉。笔者都是按照自己的机器配置写的命令,不可一股脑的照搬。
比如查看topic列表:
有独立节点:/bin/kafka-topics.sh --list --zookeeper 192.168.33.102:2181/kafka
在根节点上:/bin/kafka-topics.sh --list --zookeeper 192.168.33.102:2181

创建topic命令:

sh kafka-topics.sh --create --zookeeper 192.168.33.103:2181/kafka --replication-factor 1 --partitions 1 --topic my-topic

[root@centos02 bin]# sh kafka-topics.sh --create --zookeeper 192.168.33.103:2181/kafka --replication-factor 1 --partitions 1 --topic my-topic
Created topic "my-topic".
浏览所有topic命令:

./bin/kafka-topics.sh --list --zookeeper 192.168.33.102:2181/kafka

[root@centos04 kafka_2.11-2.1.1]#  ./bin/kafka-topics.sh --list --zookeeper 192.168.33.102:2181/kafka
__consumer_offsets
my-topic
test_topic
testtopic
浏览指定topic命令:

./bin/kafka-topics.sh --describe --zookeeper 192.168.33.102:2181/kafka --topic my-topic

[root@centos04 kafka_2.11-2.1.1]# ./bin/kafka-topics.sh --describe --zookeeper 192.168.33.102:2181/kafka --topic my-topic
Topic:my-topic  PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: my-topic Partition: 0    Leader: 4       Replicas: 4     Isr: 4

生产Console消息:

./bin/kafka-console-producer.sh --broker-list 192.168.33.102:9092 --topic my-topic

[root@centos02 kafka_2.11-2.1.1]# bin/kafka-console-producer.sh --broker-list 192.168.33.102:9092 --topic my-topic
>aaaaaaaa
>aaaaaaa
>hello world
消费Console消息:

./bin/kafka-console-consumer.sh --bootstrap-server 192.168.33.103:9092 --topic my-topic --from-beginning

[root@centos03 kafka_2.11-2.1.1]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.33.103:9092 --topic my-topic --from-beginning
aaaaaaaa
aaaaaaa
hello world
Zookeeper验证:

基本上一套下来,就可以使用了。当然我们可以去用zkCli.sh去看下里面到底有没有我们的数据。我们刚刚创建的my-topic这个topic就在里面存着。

[zk: localhost:2181(CONNECTED) 2] ls /
[zookeeper, kafka]
[zk: localhost:2181(CONNECTED) 3] ls /kafka
[cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 4] ls /kafka/brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 5] ls /kafka/brokers/topics
[testtopic, my-topic, test_topic, __consumer_offsets]    #my-topic在这里
[zk: localhost:2181(CONNECTED) 6]

总结

到此一个可用的集群已经搭建完毕,如果对Kafka进行连接的相关SASL权限认证请参考【Kafka 如何给集群配置Scram账户认证】,如果需要对Kafka进行SSL(SASL_SSL)认证请参考【Kafka 如何给集群配置SSL认证】

可能遇到的问题

其实本篇帖子到这里算是进入精华部分了,这里是笔者遇到这个各种坑,总结给大家。因为笔者相信,笔者遇到的,也是大家可能遇到的。

Error 1 ERROR Exiting Kafka due to fatal exception
[2020-08-03 17:03:41,395] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.nio.file.NoSuchFileException: config/server.properties
        at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
        at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
        at java.nio.file.Files.newByteChannel(Files.java:361)
        at java.nio.file.Files.newByteChannel(Files.java:407)
        at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
        at java.nio.file.Files.newInputStream(Files.java:152)
        at org.apache.kafka.common.utils.Utils.loadProps(Utils.java:560)
        at kafka.Kafka$.getPropsFromArgs(Kafka.scala:42)
        at kafka.Kafka$.main(Kafka.scala:58)
        at kafka.Kafka.main(Kafka.scala)

Solution 1
这个问题折腾了一下午,其实就是找不到配置文件server.properties了,为什么找不到呢,因为执行路径不对啊。如果你在bin目录执行命令,很有可能就会碰见这个问题。找不到config/目录里面的server.properties,因为bin目录他俩同级别啊,同志们,想明白以后恍然大悟。直接在bin下运行命令,能找到才有鬼嘞,妥妥都是泪啊。
在这里插入图片描述
所以最稳妥的方法就是在Kafka根目录下运行:
./bin/kafka-server-start.sh config/server.properties &

Solution 2
如果不是上面的问题,查看log.dirs参数,是不是没有配置对。这里会生成很多文件,如果没有配置对,也会启动不起来的。尤其是检查最后一个目录一定不要有“/”
配置错误:
log.dirs=/home/data/kafka-logs/
正确配置:
log.dirs=/home/data/kafka-logs

Error 2 Permission denied
nohup: failed to run command ‘bin/kafka-server-start.sh’: Permission denied
-bash: ./kafka-server-start.sh: Permission denied
-bash: ./kafka-server-stop.sh: Permission denied
-bash: ./kafka-topics.sh: Permission denied

类似的这种,反正就是各种Permission denied权限不够。其实很简答,输入命令多了,难免缺点东西。比如sh文件,本身不给权限的情况下,是得用sh命令执行的,或者./执行。命令长了,就容易缺东少西。不嫌麻烦的同学,可以一个一个的给chrom 777 kafka-server-start.sh,或者每行命令多敲几个字也行。怎么样一劳永逸呢?就是直接把bin目录下所有文件都给赋予执行权限就好了嘛。用chmod -R 777 bin/全部拿下。当然公司里如果权限管理不好的话,不建议这么做。

[root@centos02 kafka_2.11-2.1.1]# chmod -R 777 bin/ #递归给权限,本文件夹和子文件全部都有执行权限。
[root@centos02 kafka_2.11-2.1.1]# ll
total 144
drwxrwxrwx. 3 root root  4096 Aug  3 10:42 bin 
[root@centos02 kafka_2.11-2.1.1]# cd bin/
[root@centos02 bin]# ll
total 132
-rwxrwxrwx. 1 root root 1421 Aug  3 10:42 connect-distributed.sh
-rwxrwxrwx. 1 root root 1418 Aug  3 10:42 connect-standalone.sh
-rwxrwxrwx. 1 root root  861 Aug  3 10:42 kafka-acls.sh
-rwxrwxrwx. 1 root root  873 Aug  3 10:42 kafka-broker-api-versions.sh
-rwxrwxrwx. 1 root root  864 Aug  3 10:42 kafka-configs.sh
-rwxrwxrwx. 1 root root  945 Aug  3 10:42 kafka-console-consumer.sh
-rwxrwxrwx. 1 root root  944 Aug  3 10:42 kafka-console-producer.sh
-rwxrwxrwx. 1 root root  871 Aug  3 10:42 kafka-consumer-groups.sh
-rwxrwxrwx. 1 root root  948 Aug  3 10:42 kafka-consumer-perf-test.sh
-rwxrwxrwx. 1 root root  871 Aug  3 10:42 kafka-delegation-tokens.sh
-rwxrwxrwx. 1 root root  869 Aug  3 10:42 kafka-delete-records.sh
-rwxrwxrwx. 1 root root  866 Aug  3 10:42 kafka-dump-log.sh
-rwxrwxrwx. 1 root root  863 Aug  3 10:42 kafka-log-dirs.sh
-rwxrwxrwx. 1 root root  862 Aug  3 10:42 kafka-mirror-maker.sh
-rwxrwxrwx. 1 root root  886 Aug  3 10:42 kafka-preferred-replica-election.sh
-rwxrwxrwx. 1 root root  959 Aug  3 10:42 kafka-producer-perf-test.sh
-rwxrwxrwx. 1 root root  874 Aug  3 10:42 kafka-reassign-partitions.sh
-rwxrwxrwx. 1 root root  874 Aug  3 10:42 kafka-replica-verification.sh
-rwxrwxrwx. 1 root root 9290 Aug  3 10:42 kafka-run-class.sh
-rwxrwxrwx. 1 root root 1376 Aug  3 10:42 kafka-server-start.sh
-rwxrwxrwx. 1 root root  997 Aug  3 10:42 kafka-server-stop.sh
-rwxrwxrwx. 1 root root  945 Aug  3 10:42 kafka-streams-application-reset.sh
-rwxrwxrwx. 1 root root  863 Aug  3 10:42 kafka-topics.sh
-rwxrwxrwx. 1 root root  958 Aug  3 10:42 kafka-verifiable-consumer.sh
-rwxrwxrwx. 1 root root  958 Aug  3 10:42 kafka-verifiable-producer.sh
-rwxrwxrwx. 1 root root 1722 Aug  3 10:42 trogdor.sh
drwxrwxrwx. 2 root root 4096 Aug  3 10:42 windows
-rwxrwxrwx. 1 root root  867 Aug  3 10:42 zookeeper-security-migration.sh
-rwxrwxrwx. 1 root root 1393 Aug  3 10:42 zookeeper-server-start.sh
-rwxrwxrwx. 1 root root 1001 Aug  3 10:42 zookeeper-server-stop.sh
-rwxrwxrwx. 1 root root  968 Aug  3 10:42 zookeeper-shell.sh
Error 3 Replication factor: 2 larger than available brokers: 0.
Error while executing topic command : Replication factor: 2 larger than available brokers: 0.

这局报错最直接的说法就是,Kafka没起来,所以一个broker都没有。会出现在创建topic的时候,这个就是配置的错误了。因为笔者是弄了一个的节点,所以一开始在连接Zookeeper的时候给每一个地址后都配置了这个节点,于是KafkaZookeeper节点名字给识别成kafka,192.168.33.102:2181kafka,192.168.33.103:2181类似这种,而笔者的本意是IP:Port/kafka。这是分割字符串分割错了,也算是配置的一个坑。
配置错误:
zookeeper.connect=192.168.33.101:2181/kafka,192.168.33.102:2181/kafka,192.168.33.103:2181/kafka,192.168.33.104:2181/kafka,192.168.33.105:2181/kafka
正确配置:
zookeeper.connect=192.168.33.101:2181,192.168.33.102:2181,192.168.33.103:2181,192.168.33.104:2181,192.168.33.105:2181/kafka
或者
zookeeper.connect=192.168.33.101:2181,192.168.33.102:2181,192.168.33.103:2181,192.168.33.104:2181,192.168.33.105:2181

附修改文件打开数

未来kafka越用越多,必然会导致打开文件数很多。CentOS7和别的linux一样也是限制了最大操作的文件数量的。但是这里能放开。
首先先去vi /etc/security/limits.conf这个里面,添加如下四行,保存退出。

[root@centos07 ~]# vi /etc/security/limits.conf
#添加这样四行
* soft nofile 204800
* hard nofile 204800
* soft nproc 204800
* hard nproc 204800
# End of file

然后再去/etc/security/limits.d/这个目录下修改20-nproc.conf文件,把原来的4096后面加一个0就可以了,保存退出。

[1]+  Stopped                 vi /etc/security/limits.conf
[root@centos07 ~]# cd /etc/security/limits.d/
[root@centos07 limits.d]# vi 20-nproc.conf
*          soft    nproc     40960   #原值4096
root       soft    nproc     unlimited
[root@centos07 limits.d]#

最后reboot重启系统,就完成了配置。那么这篇博客也到此结束了,谢谢大家。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐