一、Zookeeper 安装

第一步、前期准备:

  • 操作系统 centos 6
  • 安装包 zookeeper-3.4.10.tar.gz
  • Java 环境 jdk1.8.0_151

第二步:Zookeeper 集群搭建

1. 简介

Kafka 依赖 Zookeeper 管理自身集群(Broker、Offset、Producer、Consumer等),所以先要安装 Zookeeper。
为了达到高可用的目的,Zookeeper 自身也不能是单点,接下来就介绍如何搭建一个最小的 Zookeeper 集群(3个 zk 节点)。
我准备了 3台虚拟机 分别为
192.168.138.132
192.168.138.133
192.168.138.134

2. 安装

 tar -zxvf zookeeper-3.4.10.tar.gz

3. 配置

1)配置文件位置
路径:zookeeper/conf
2)生成配置文件
将 zoo_sample.cfg 复制一份,命名为 zoo.cfg,此即为Zookeeper的配置文件。

cd zookeeper-3.4.10
cd conf
cp zoo_sample.cfg zoo.cfg
mkdir /usr/logs/zkdata
mkdir /usr/logs/zkdatalog

3)编辑配置文件zoo.cfg

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/logs/zkdata
dataLogDir=/usr/logs/zkdatalog
clientPort=2181
server.1=192.168.138.132:4001:4002
server.2=192.168.138.133:4001:4002
server.3=192.168.138.134:4001:4002
#server.1 这个1是服务器的标识也可以是其他的数字, 表示这个是第几号服务器,用来标识服务器,这个标识要写到快照目录下面myid文件里
#192.168.7.107为集群里的IP地址,第一个端口是master和slave之间的通信端口,默认是2888,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是3888

4)配置文件说明:


 dataDir 和 dataLogDir 需要在启动前创建完成
 clientPort 为 zookeeper的服务端口
 server.1server.2server.3 为 zk 集群中三个 node 的信息,定义格式为 hostname:port1:port2,其中 port1 是 node 间通信使用的端口,port2 是node 选举使用的端口,需确保三台主机的这两个端口都是互通的。

4. 更改日志配置

Zookeeper 默认会将控制台信息输出到启动路径下的 zookeeper.out 中,通过如下方法,可以让 Zookeeper 输出按尺寸切分的日志文件:

1)修改conf/log4j.properties文件,将
    zookeeper.root.logger=INFO, CONSOLE
    改为
    zookeeper.root.logger=INFO, ROLLINGFILE
2)修改bin/zkEnv.sh文件,将
    ZOO_LOG4J_PROP="INFO,CONSOLE"
    改为
    ZOO_LOG4J_PROP="INFO,ROLLINGFILE"

5. 按照上述操作,在另外两台主机上安装并配置 zookeeper

可以使用scp命令直接从已经安装好的主机同步到另外两台主机

scp -r root@192.168.138.132:/usr/softs/zookeeper-3.4.10   /usr/softs

6. 创建 myid 文件

分别在三台主机的 dataDir 路径下创建一个文件名为 myid 的文件,文件内容为该 zk 节点的编号。
例如,在第一台主机上建立的 myid!件内容是 1,第二台是 2,第三台是3
我的 目录是 /usr/logs/zkdata

这里写图片描述

7. 启动

依次启动三台主机上的 zookeeper 服务

# cd bin
# ./zkServer.sh start

这里写图片描述

8. 查看集群状态

3个节点启动完成后,可依次执行如下命令查看集群状态:

./zkServer.sh status

这里写图片描述

可以看到133这台机器被zk选择为了leader,132和134为follower,假如132机器down掉后,zk会再次选择集群中的一台主机作为新的的leader,如果集群down掉一半那么zk就无法选择leader了,同时无法正常工作了。
这里我们先关闭133的zk

./zkServer.sh stop

这里写图片描述

这里我们可以看到133主机的zk已经关闭 134被选为了新的leader。


二、kaka集群安装

虽然kafka本身有自带的zookeeper,但是建议用外部的zk。

1.前期准备

java环境 (注意有些java版本可能和kafka不兼容,建议使用jdk-8u151,本人使用jdk-8u11出过一次小问题)
zookeeper环境
kafka_2.10-0.10.1.0.tgz

2. 安装

# tar zxvf kafka_2.12-0.10.2.0.tgz
# mv kafka_2.12-0.10.2.0 kafka

3. 配置

配置文件位置
路径:kafka/config/server.properties
修改配置文件

broker.id=134   
port=9092  
host.name=192.168.138.134
#llisteners=PLAINTEXT://192.168.138.134:9092
advertised.port=9092
advertised.host.name=192.168.138.134
#ladvertised.listeners=PLAINTEXT://192.168.138.134:9092
num.network.threads=3
num.io.threads=8  
log.dirs=/usr/logs/kafka/  
delete.topic.enable=true
socket.send.buffer.bytes=102400  
socket.receive.buffer.bytes=102400  
socket.request.max.bytes=104857600  
num.partitions=1  
log.retention.hours=168  
message.max.byte=5242880   
default.replication.factor=2   
replica.fetch.max.bytes=5242880  
log.segment.bytes=1073741824  
log.retention.check.interval.ms=300000  
log.cleaner.enable=false  
zookeeper.connect=192.168.138.132:2181,192.168.138.133:2181,192.168.138.134:21

4.配置文件解释:

broker.id   当前机器在集群中的唯一标识,和zookeeper的myid性质一样  建议用自己主机的后三位  每台(主机)broker不一致
port       当前kafka对外提供服务的端口默认是9092  生产者(producer)要以这个端口为准     
host.name  这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。  (填写本机地址即可)
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/usr/logs/kafka/   #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880 #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.138.132:2181,192.168.138.133:2181,192.168.138.134:2181   #设置zookeeper的连接端口 消费的时候要以这个端口消费

5.在另两台主机同步安装 kafka,并做配置

注意 每台主机的broker.id、host.name不一样

6.在三台主机上分别启动 Kafka 服务

# bin/kafka-server-start.sh -daemon config/server.properties

7.创建topic名为 test,拥有两个分区,两个副本的Topic

bin/kafka-topics.sh --create --zookeeper 192.168.138.132:2181,192.168.138.133:2181,192.168.138.134:2181 --replication-factor 2 --partitions 2 --topic test

这里写图片描述

8.查看zk集群上的 topic列表

bin/kafka-topics.sh --list --zookeeper 192.168.138.132:2181,192.168.138.133:2181,192.168.138.134:2181

这里写图片描述
这里可以看到我们刚刚创建的topic test

9.查看topic描述

bin/kafka-topics.sh --describe --zookeeper 192.168.138.132:2181,192.168.138.133:2181,192.168.138.134:2181 --topic test

这里写图片描述

10.创建生产者producer

在 132主机执行

bin/kafka-console-producer.sh --broker-list 192.168.138.132:9092,192.168.138.133:9092,192.168.138.134:9092 --topic test

11.创建消费者consumer 在134主机执行

bin/kafka-console-consumer.sh --zookeeper 192.168.138.132:2181,192.168.138.133:2181,192.168.138.134:2181  --from-beginning  --topic  test

这里写图片描述

可以看到我们在132主机上 输入的内容在 134主机上会被同步打印出来,这样我们的kafka集群就搭建成功了。

其他命令:

kafka启动后查看zk broker情况
进入zk安装目录

bin/zkCli.sh -server 192.168.138.132:2181,192.168.138.133:2181,192.168.138.134:2181

ls /

ls /brokers/ids

可以查看到这里有我们设置的 132  133  134主机  

get /brokers/ids/132  可以看到132主机的详细信息

这里写图片描述
关闭kafka

bin/kafka-server-stop.sh -daemon config/server.properties

如果要修改 kafka的broker.id

1.先修改
    config/server.properties
2.再修改config/server.properties配置的 log.dirs  下面的 meta.properties  
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐