kafka+logstash搭建分布式消息订阅系统
首先安装kafka和它的依赖zookeeper我安装的zookeeper版本是zookeeper-3.4.8.tar.gz直接解压到安装目录下,然后修改/conf/zoo.cfgdataDir修改到自己想要的目录即可。默认端口是2181,一般不需要修改注意其依赖java,所以如果如果没有装jdk的话需要装一下。启动命令:bin/zkServer.sh startkafka同样下载解
首先安装kafka和它的依赖zookeeper
我安装的zookeeper版本是zookeeper-3.4.8.tar.gz
直接解压到安装目录下,然后修改/conf/zoo.cfg
dataDir修改到自己想要的目录即可。默认端口是2181,一般不需要修改
注意其依赖java,所以如果如果没有装jdk的话需要装一下。
启动命令:
bin/zkServer.sh start
kafka同样下载解压,我用的版本是kafka_2.11-0.10.0.0.tgz
在主目录下新建data目录,然后在server.properties中同样添加你的日志路径。端口是9092.
启动命令:
bin/kafka-server-start.sh config/server.properties &
安装logstash
logstash安装也比较简单,解压就可以,我用的版本是logstash-2.3.2
然后写个配置文件conf/exam.conf
input {
file {
path => "/home/work/cuteflow/work_node/logs/worknode.log"
start_position => end
}
#stdin {} #不需要控制台输出
}
filter {
grok {
match => {
"message" => "%{GREEDYDATA}" #正则表达式过滤
}
}
}
output {
#stdout {codec => rubydebug}
if [system] == "cuteflow" {
#stdout {codec => rubydebug}
kafka {
codec => plain {
format => "%{system}"
}
bootstrap_servers => "ip:port"
topic_id => "xxx"
}
}
}
然后启动命令:
bin/logstash -f conf/exam.conf
然后再kafka创建一个topic
bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test
开启生产者往这个topic发消息
bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test
在另一个窗口用命令创建消费者接收test中的消息
bin/kafka-console-consumer.sh –zookeeper localhost:2181 –topic test –from-beginning
基本上看生产者生产的消息消费者窗口能看到就是没有问题。
搭建系统
首先logstash是一个比较强大的日志收集系统,它支持一系列的插件,我们最常用的就是input,filter,output。我们的目标是用logstash收集过滤日志,然后转化成想要的格式,放到kafka里面,然后有需要的在从kafka取出来用。
或者当然也可以从kafka中取出来处理一下放入kafka,或者从kafka中取出来,处理之后,然后发送给别的系统,可以说很强大,自己可以在官网上选用自己的插件就ok。
更多推荐
所有评论(0)