Kafka+Zookeeper单机初步搭建

1:前期软件准备

Zookeeper: zookeeper-3.4.8.tar.gz

Kafka: kafka_2.12-0.10.2.1.tgz

默认 配置好了java环境。

 

2:单机器Zookeeper搭建

 

Kafka集群是把相关的服务状态保存在Zookeeper里面的。所以要事先搭建Zookeeper

本次搭建过程中,只是为了测试,就搭建单机版本,下面的kafka也是。

2.1:解压软件

 

tar  –zxvf  zookeeper-3.4.8.tar.gz

 

进入解压好的目录里面的conf目录中。

#zoo_sample.cfg  这个文件是官方给我们的zookeeper的样板文件,给他复制一份命名为zoo.cfg,zoo.cfg是官方指定的文件命名规则。

cp ./ zoo_sample.cfg  zoo.cfg

最后生成的目录结构如下:

 

2.2:修改配置文件

对新生成的zoo.cfg文件进行修改。

 

tickTime=2000

initLimit=10

syncLimit=5

##可以在外部新建一个文件夹后单独指定

dataDir=/opt/zookeeper/zkdata

##可以在外部新建一个文件夹后单独指定

dataLogDir=/opt/zookeeper/zkdatalog

clientPort=12181 

#默认是2181

# 如果是多个服务器,可以再写server.2 server.3

server.1=192.168.7.100:2888:3888

#server.1 这个1是服务器的标识也可以是其他的数字, 表示这个是第几号服务器,用来标识服务器,这个标识要写到快照目录下面myid文件里

#192.168.7.100为集群里的IP地址,第一个端口是master和slave之间的通信端口,默认是2888,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是3888.可以随便修改

 

配置文件解释:

#tickTime:

这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。

#initLimit:

这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒

#syncLimit:

这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒

#dataDir:

快照日志的存储路径

#dataLogDir:

事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多

#clientPort:

这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。修改他的端口改大点

 

创建myid文件

#server1

echo "1" > /opt/zookeeper/zkdata/myid

 

2.3:启动服务并查看

1:启动服务

#进入到Zookeeper的bin目录下

cd /opt/zookeeper/zookeeper-3.4.6/bin  (这个目录由你自己的安装目录决定。)

#启动服务(3台都需要操作)

./zkServer.sh start

 

如果报错:

1:kill掉这个进程

2:检查文件权限,看是否有问题。

 

2:查看状态

#检查服务器状态

./zkServer.sh status

从上面可以看出是 standlone 单机版本。

也可以使用 jps 来查看主程序

红色部分就是zk的主程序

 

2.4:注意事项

zookeeper不会主动的清除旧的快照和日志文件,这个是使用者的责任。

3:kafka单机搭建

3.1:解压软件,并修改配置文件

tar  –zxvf  kafka_2.12-0.10.2.1.tgz

进入对应的config目录,找到server.properties.在此基础上增加配置文件。

本次主要是测试为主,就修改部分参数就可以了,后续数据优化调整参数可以继续研究

broker.id=0 

#当前机器在集群中的唯一标识,和zookeeper的myid性质一样

 

#开启删除topic的开关

delete.topic.enable=true

这个参数很重要,如果后续你使用命令去删除topic,那么只会把zk上标记的topic删除,但是对实际的kafka上主题保存的数据一点都没有影响,都不会删除。

port=9092

 #当前kafka对外提供服务的端口默认是9092 可以自己修改

listeners=PLAINTEXT://:9092

#advertised.listeners=PLAINTEXT://your.host.name:9092

advertised.listeners=PLAINTEXT://192.168.10.14:9092

 

这个参数非常重要,一定要添加,这个主要是broker(kafka服务器)向生产者和消费者连接使用的,如果你的生产者和消费者是跨主机的存在,一定要把这条配置打开,否则,就算是防火墙关闭了,producer consumer也无法正常通信。

host.name=192.168.7.100

#这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。

#log.dirs=/tmp/kafka-logs

log.dirs=/usr/local/src/zookeeper_kafka/kafka/kafkalogs

#消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个

这个文件夹建议你新建一个单独文件夹来指定

num.partitions=1

 #默认的分区数,一个topic默认1个分区数

zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218

#设置zookeeper的连接端口

 

3.2 启动kafka来测试

在kafka的bin目录下:

3.2.1:启动kafka

./kafka-server-start.sh -daemon ../config/server.properties

检测是否成功:

3.2.2:创建Topic来验证是否创建成功

./kafka-topics.sh --create --zookeeper 192.168.10.14:2181 --replication-factor 1 --partitions 1 --topic spdb

 

--replication-factor 1   #复制1份 2181是客户端链接ZK服务端的默认端口

--partitions 1 #创建1个分区

--topic #主题为test

注意事项:一定要等kafka启动成功后才可以创建主题。

3.2.3:查询topic列表

./kafka-topics.sh --list --zookeeper 192.168.10.14:2181

在上一次配置的日子目录(server.properties里配置的log.dirs)上可以看到,已经存在了该主题的日志信息文件了,由于server.properties里面配置的num.partitions=1所以只有一个文件夹

内部文件格式:

3.2.4:创建一个发布者和订阅者

本次操作是在同一台机器上

创建发布者:

./kafka-console-producer.sh --broker-list 192.168.10.14:9092 --topic spdb

创建订阅者:

./kafka-console-consumer.sh --zookeeper 192.168.10.14:2181 --from-beginning --topic spdb

 

 

 

 

 

 

 

3.2.5:测试发布消息和订阅消费消息

发布者:发布消息

 订阅者:马上接受了消息

3.2.6:检查topic中的消息

随着刚才的发布者向主题中发布了一条信息,导致日志文件发生了变化

由于Kafka文件的特殊性, 要查看这个日志信息,直接打开会出现乱码问题

我们可以使用下面的命令来查看:

./kafka-run-class.sh kafka.tools.DumpLogSegments --files /usr/local/src/zookeeper_kafka/kafka/kafkalogs/spdb-0/00000000000000000000.log  --print-data-log 

 

bin/kafka-run-class.sh kafka.tools.DumpLogSegments  --files 文件全名1,全名2,--print-data-log

这里 --print-data-log 是表示查看消息内容的,不加此项是查看不到详细的消息内容。如果要查看多个log文件可以用逗号分隔。

3.2.7 删除topic

在上面的server.properties上有一行重要配置delete.topic.enable=true

 

删除spdb主题

./kafka-topics.sh --zookeeper 192.168.10.14:2181 --delete --topic spdb

 

查看kafka上的数据

4  ELK结合kafka

上面完成了kafka的自己创建发布者和订阅者的简单尝试,现在可以使用ELK来结合kafka来进行测试,本次也主要是为了测试为主,简单配置如下:

 

按照logstash-->kafka-->zookeeper-->logstash-->elasticsearch-->kibana 数据顺序

本次是在跨主机情况下测试,两台logstash在不同机器上

 

4.1:logstash作为kafka生产者

input{

         stdin{} 

                 

}

output {

         kafka{

                          bootstrap_servers => "192.168.10.14:9092"

                          topic_id => "kafka07"

                  }

          stdout{ 

                codec => rubydebug 

            } 

}

这个会自动在kafka中创建一个topic为test。等待logstash启动完毕后,在控制台随便输入一句话,就ok。

在kafka里面可以看到topic名字为 kafka07上存在了新的消息

 

4.2:logstash作为kafka的消费者

input{

         kafka{

         zk_connect => "192.168.10.14:2181"

         topic_id => "kafka07"

        

         type => "kafkatest"

         }      

}

filter {      

}

output {

         elasticsearch {

         hosts => "192.168.10.14:9200"

         index => "kafka-%{+YYYY.MM.dd}"

         document_type => "%{type}"

         }      

         stdout{

                  codec => rubydebug

         }      

        

}

读取这个topic主题为test的内容,发送到es上就成功了。

 

 

5 总结

本次安装只是为了能调通整个流程而做出的简单部署,关于kafka还有很多不懂得地方需要学习。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐