一、zookeeper安装

1.1 环境准备

kafka是基于scala语言开发的,需要运行的JVM上,因此需要先安装JDK

#安装jdk1.8
yum -y install java-1.8.0-openjdk*

1.2 zookeeper安装

  • kafka在早期版本依赖zookeeper对Topic进行分区选举,虽然后续版本中持续去zk化,但即使在2.2.1、2.4.1等版本client中仍然会出现无法正常选举的问题,zk的在短期内仍是更好的选择。
  • zookeeper选举过程,需要超过半数的接收者成功应答才算选举成功,因此Zookeeper集群节点个数一般是奇数(5或6个节点最多只能有2个节点异常,避免集群分区导致脑裂问题),本文测试使用只搭建1个zk节点
#下载zookeeper
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
#安装包指定目录解压
tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz  -C  /opt/xxx/zookeeper/
#指定默认配置文件
cp conf/zoo_sample.cfg conf/zoo.cfg
# 启动zookeeper
 bin/zkServer.sh start
 bin/zkCli.sh
 ls / #查看zk的根目录相关节点

二、kafka安装

2.1 下载安装包

本文选择scale2.11版本、kafka2.4.1版本进行测试验证

wget https://archive.apache.org/dist/kafka/2.4.1/kafka_2.11-2.4.1.tgz //直接到apache下载
#下载国内镜像(看了国内几家大的开源镜像都没有上述版本,下面只是介绍解题思路)
#由于linux默认情况没有配置DNS解析器,可以自行添加DNS服务器
#vi  /etc/resolv.conf
#nameserver 8.8.8.8  (谷歌的DNS服务器) 
#nameserver 114.114.114.114  (中国电信的DNS服务器) 
tar -zxvf kafka_2.11-2.4.1.tgz  -C  /opt/xxx/kafka/

2.2 修改配置项

server.properties核心配置项一览:

PropertyDefaultDescription
broker.id0非负整数,kafka集群中区别broker的唯一标识
log.dirs/tmp/kafka-logskafka存储数据的路径
listenersPLAINTEXT://:9092server对外开放服务的ip+port
zookeeper.connectlocalhost:2181配置zookeeper注册中心的集群地址,可配多个需要以;隔开
log.retention.hours168每个日志文件在本地保留的时间,单位小时
num.partitions1创建topic默认分区数
default.replication.factor1创建topic默认副本数
min.insync.replicas1保障数据一致性的判断依据。当producer设置ack=-1时,该数值为判断write至副本成功数的依据。NWR(假设N=5,W=5,R=1,需要5个节点全部写入成功消息发送成功)
delete.topic.enablefasle是否允许删除主题

修改配置文件config/server.properties

#broker.id属性在kafka集群中必须要是唯一 
broker.id=0
#kafka部署的机器ip和提供服务的端口号 
listeners=PLAINTEXT://192.168.149.128:9092
#kafka的消息存储文件
log.dir=/opt/xxx/kafka/logs 
#kafka连接zookeeper的地址(默认2181端口)
zookeeper.connect=192.168.149.128:2181 

2.3 服务启停

#后台启动kafka -daemon
bin/kafka-server-start.sh -daemon config/server.properties 
#查看zookeeper目录
bin/zkCli.sh
ls / #查看zk的根目录kafka相关节点
ls /brokers/ids #查看kafka节点
#关闭kafka
bin/kafka‐server‐stop.sh

在这里插入图片描述

2.4 测试验证

命令行作为验证功能的简单方式,了解即可

#创建主题
bin/kafka-topics.sh --create --zookeeper 192.168.149.128:2181 --replication-factor 1 --partitions 1 --topic test
#查看已创建的主题
bin/kafka-topics.sh --list --zookeeper 192.168.149.128:2181
#删除主题
bin/kafka-topics.sh --delete --topic test --zookeeper 192.168.149.128:2181
#发送消息
bin/kafka-console-producer.sh --broker-list 192.168.149.128:9092 --topic test 
#消费消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.149.128:9092 --topic test  

在这里插入图片描述

在这里插入图片描述

三、kafka集群搭建

3.1 准备搭建的集群信息

节点ipzookeeper端口kafka端口broker.id
192.168.149.128218290920
192.168.149.129218290921
192.168.149.130218290922

3.2 搭建zk集群

搭建zk集群首先要修改zoo.cfg,指定zk节点

# zookeeper集群之间心跳检测的时间间隔(单位ms)
tickTime=2000
# follower启动时要在多少次心跳内完成从leader的数据同步
initLimit=10
# leader需要将follower下线的心跳次数
syncLimit=5
# zookeeper存放数据的目录
dataDir=/tmp/zookeeper
# client连接zookeeper的端口号
clientPort=2181
# 允许客户端的最大连接数(0表示不设限制)
#maxClientCnxns=60
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
# zk监听所有可用IP地址的连接
quorumListenOnAllIPs=true
# zk节点
server.1=192.168.149.128:2888:3888
server.2=192.168.149.129:2888:3888
server.3=192.168.149.130:2888:3888

并在zk数据存放目录,指定当前节点的节点号

[root@worker2 conf]# echo 1 > /tmp/zookeeper/myid

重复上述内容,分别在3台机器上安装。并启动:

 #启动zk
 bin/zkServer.sh start
 #关闭zk
 bin/zkServer.sh stop

单节点查看zk节点数

3.3 搭建kafka集群

修改配置文件config/server.properties

#broker.id属性在kafka集群中必须要是唯一 
broker.id=0/1/2
#kafka部署的机器ip和提供服务的端口号 
listeners=PLAINTEXT://192.168.149.128/129/130:9092
#kafka的消息存储文件
log.dir=/opt/xxx/kafka/logs 
#kafka连接zookeeper的地址(默认2181端口)
zookeeper.connect=192.168.149.128:2181,192.168.149.129:2181,192.168.149.130:2181

3.4 分别启动各台kafka

bin/kafka-server-start.sh -daemon config/server.properties 

在这里插入图片描述
创建2各分区3各副本的topic:my-local-topic

bin/kafka-topics.sh --create --zookeeper 192.168.149.128:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic

查看topic情况:
在这里插入图片描述

四、kafka-manager可视化管理工具搭建

4.1 管理台启动

网上找的关于搭建雅虎kafka-manager的非常详细的wiki介绍:https://www.cnblogs.com/dadonggg/p/8205302.html
安利其编译好的安装包:

注意:上面下载的是源码,下载后需要按照后面步骤进行编译。如果觉得麻烦,可以直接从下面地址下载编译好的 kafka-manager-1.3.3.7.zip。 
链接:https://pan.baidu.com/s/1qYifoa4 密码:el4o

启动管理台:

#编辑配置文件,指定zk集群地址
[root@worker1 conf]# vi application.conf
kafka-manager.zkhosts="192.168.149.128:2181,192.168.149.129:2181,192.128.149.130:2181"
#使用默认9000端口启动管理台(自行777解决权限问题)
[root@worker1 kafka-manager-1.3.3.7]# bin/kafka-manager
#或可通过 -Dhttp.port,指定端口; -Dconfig.file=conf/application.conf指定配置文件:
nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9000 &

在这里插入图片描述

4.2 管理台查看功能

  • 进入首页先要创建集群

在这里插入图片描述

  • 输入zk集群节点后,可以看到kafka集群详细

在这里插入图片描述

  • 查看topic情况

在这里插入图片描述

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐