Kafka+Zookeeper单机初步搭建
Kafka+Zookeeper单机初步搭建1:前期软件准备Zookeeper: zookeeper-3.4.8.tar.gzKafka: kafka_2.12-0.10.2.1.tgz默认 配置好了java环境。 2:单机器Zookeeper搭建 Kafka集群是把相关的服务状态保存在Zookeeper里面的。所以要事先搭建Zookeeper本次搭建过程中,...
Kafka+Zookeeper单机初步搭建
1:前期软件准备
Zookeeper: zookeeper-3.4.8.tar.gz
Kafka: kafka_2.12-0.10.2.1.tgz
默认 配置好了java环境。
2:单机器Zookeeper搭建
Kafka集群是把相关的服务状态保存在Zookeeper里面的。所以要事先搭建Zookeeper
本次搭建过程中,只是为了测试,就搭建单机版本,下面的kafka也是。
2.1:解压软件
tar –zxvf zookeeper-3.4.8.tar.gz
进入解压好的目录里面的conf目录中。
#zoo_sample.cfg 这个文件是官方给我们的zookeeper的样板文件,给他复制一份命名为zoo.cfg,zoo.cfg是官方指定的文件命名规则。
cp ./ zoo_sample.cfg zoo.cfg
最后生成的目录结构如下:
2.2:修改配置文件
对新生成的zoo.cfg文件进行修改。
tickTime=2000
initLimit=10
syncLimit=5
##可以在外部新建一个文件夹后单独指定
dataDir=/opt/zookeeper/zkdata
##可以在外部新建一个文件夹后单独指定
dataLogDir=/opt/zookeeper/zkdatalog
clientPort=12181
#默认是2181
# 如果是多个服务器,可以再写server.2 server.3
server.1=192.168.7.100:2888:3888
#server.1 这个1是服务器的标识也可以是其他的数字, 表示这个是第几号服务器,用来标识服务器,这个标识要写到快照目录下面myid文件里
#192.168.7.100为集群里的IP地址,第一个端口是master和slave之间的通信端口,默认是2888,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是3888.可以随便修改
配置文件解释:
#tickTime:
这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
#initLimit:
这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒
#syncLimit:
这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒
#dataDir:
快照日志的存储路径
#dataLogDir:
事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多
#clientPort:
这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。修改他的端口改大点
创建myid文件
#server1
echo "1" > /opt/zookeeper/zkdata/myid
2.3:启动服务并查看
1:启动服务
#进入到Zookeeper的bin目录下
cd /opt/zookeeper/zookeeper-3.4.6/bin (这个目录由你自己的安装目录决定。)
#启动服务(3台都需要操作)
./zkServer.sh start
如果报错:
1:kill掉这个进程
2:检查文件权限,看是否有问题。
2:查看状态
#检查服务器状态
./zkServer.sh status
从上面可以看出是 standlone 单机版本。
也可以使用 jps 来查看主程序
红色部分就是zk的主程序
2.4:注意事项
zookeeper不会主动的清除旧的快照和日志文件,这个是使用者的责任。
3:kafka单机搭建
3.1:解压软件,并修改配置文件
tar –zxvf kafka_2.12-0.10.2.1.tgz
进入对应的config目录,找到server.properties.在此基础上增加配置文件。
本次主要是测试为主,就修改部分参数就可以了,后续数据优化调整参数可以继续研究
broker.id=0
#当前机器在集群中的唯一标识,和zookeeper的myid性质一样
#开启删除topic的开关
delete.topic.enable=true
这个参数很重要,如果后续你使用命令去删除topic,那么只会把zk上标记的topic删除,但是对实际的kafka上主题保存的数据一点都没有影响,都不会删除。
port=9092
#当前kafka对外提供服务的端口默认是9092 可以自己修改
listeners=PLAINTEXT://:9092
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://192.168.10.14:9092
这个参数非常重要,一定要添加,这个主要是broker(kafka服务器)向生产者和消费者连接使用的,如果你的生产者和消费者是跨主机的存在,一定要把这条配置打开,否则,就算是防火墙关闭了,producer 和 consumer也无法正常通信。
host.name=192.168.7.100
#这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
#log.dirs=/tmp/kafka-logs
log.dirs=/usr/local/src/zookeeper_kafka/kafka/kafkalogs
#消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
这个文件夹建议你新建一个单独文件夹来指定
num.partitions=1
#默认的分区数,一个topic默认1个分区数
zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218
#设置zookeeper的连接端口
3.2 启动kafka来测试
在kafka的bin目录下:
3.2.1:启动kafka
./kafka-server-start.sh -daemon ../config/server.properties
检测是否成功:
3.2.2:创建Topic来验证是否创建成功
./kafka-topics.sh --create --zookeeper 192.168.10.14:2181 --replication-factor 1 --partitions 1 --topic spdb
--replication-factor 1 #复制1份 2181是客户端链接ZK服务端的默认端口
--partitions 1 #创建1个分区
--topic #主题为test
注意事项:一定要等kafka启动成功后才可以创建主题。
3.2.3:查询topic列表
./kafka-topics.sh --list --zookeeper 192.168.10.14:2181
在上一次配置的日子目录(server.properties里配置的log.dirs)上可以看到,已经存在了该主题的日志信息文件了,由于server.properties里面配置的num.partitions=1。所以只有一个文件夹
内部文件格式:
3.2.4:创建一个发布者和订阅者
本次操作是在同一台机器上:
创建发布者:
./kafka-console-producer.sh --broker-list 192.168.10.14:9092 --topic spdb
创建订阅者:
./kafka-console-consumer.sh --zookeeper 192.168.10.14:2181 --from-beginning --topic spdb
3.2.5:测试发布消息和订阅消费消息
发布者:发布消息
订阅者:马上接受了消息
3.2.6:检查topic中的消息
随着刚才的发布者向主题中发布了一条信息,导致日志文件发生了变化
由于Kafka文件的特殊性, 要查看这个日志信息,直接打开会出现乱码问题
我们可以使用下面的命令来查看:
./kafka-run-class.sh kafka.tools.DumpLogSegments --files /usr/local/src/zookeeper_kafka/kafka/kafkalogs/spdb-0/00000000000000000000.log --print-data-log
bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 文件全名1,全名2,--print-data-log
这里 --print-data-log 是表示查看消息内容的,不加此项是查看不到详细的消息内容。如果要查看多个log文件可以用逗号分隔。
3.2.7 删除topic
在上面的server.properties上有一行重要配置delete.topic.enable=true
删除spdb主题
./kafka-topics.sh --zookeeper 192.168.10.14:2181 --delete --topic spdb
查看kafka上的数据
4 ELK结合kafka
上面完成了kafka的自己创建发布者和订阅者的简单尝试,现在可以使用ELK来结合kafka来进行测试,本次也主要是为了测试为主,简单配置如下:
按照logstash-->kafka-->zookeeper-->logstash-->elasticsearch-->kibana 数据顺序
本次是在跨主机情况下测试,两台logstash在不同机器上。
4.1:logstash作为kafka生产者
input{
stdin{}
}
output {
kafka{
bootstrap_servers => "192.168.10.14:9092"
topic_id => "kafka07"
}
stdout{
codec => rubydebug
}
}
这个会自动在kafka中创建一个topic为test。等待logstash启动完毕后,在控制台随便输入一句话,就ok。
在kafka里面可以看到topic名字为 kafka07上存在了新的消息
4.2:logstash作为kafka的消费者
input{
kafka{
zk_connect => "192.168.10.14:2181"
topic_id => "kafka07"
type => "kafkatest"
}
}
filter {
}
output {
elasticsearch {
hosts => "192.168.10.14:9200"
index => "kafka-%{+YYYY.MM.dd}"
document_type => "%{type}"
}
stdout{
codec => rubydebug
}
}
读取这个topic主题为test的内容,发送到es上就成功了。
5 总结
本次安装只是为了能调通整个流程而做出的简单部署,关于kafka还有很多不懂得地方需要学习。
更多推荐
所有评论(0)