首先安装kafka和它的依赖zookeeper

我安装的zookeeper版本是zookeeper-3.4.8.tar.gz
直接解压到安装目录下,然后修改/conf/zoo.cfg
dataDir修改到自己想要的目录即可。默认端口是2181,一般不需要修改
注意其依赖java,所以如果如果没有装jdk的话需要装一下。
启动命令:

bin/zkServer.sh start

kafka同样下载解压,我用的版本是kafka_2.11-0.10.0.0.tgz
在主目录下新建data目录,然后在server.properties中同样添加你的日志路径。端口是9092.
启动命令:

bin/kafka-server-start.sh config/server.properties &

安装logstash

logstash安装也比较简单,解压就可以,我用的版本是logstash-2.3.2
然后写个配置文件conf/exam.conf

input {
    file {
        path => "/home/work/cuteflow/work_node/logs/worknode.log"
        start_position => end
    }

    #stdin {} #不需要控制台输出
}
filter {
    grok {
        match => {
            "message" => "%{GREEDYDATA}"  #正则表达式过滤
        }
    }
}
output {
        #stdout {codec => rubydebug}
        if [system] == "cuteflow" {
              #stdout {codec => rubydebug}
              kafka {
                 codec => plain {
                        format => "%{system}"
                 }
                 bootstrap_servers => "ip:port"
                topic_id =>  "xxx"
                }
        }
}

然后启动命令:

bin/logstash -f conf/exam.conf 

然后再kafka创建一个topic

bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test

开启生产者往这个topic发消息

bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test

在另一个窗口用命令创建消费者接收test中的消息

bin/kafka-console-consumer.sh –zookeeper localhost:2181 –topic test –from-beginning

基本上看生产者生产的消息消费者窗口能看到就是没有问题。

搭建系统

首先logstash是一个比较强大的日志收集系统,它支持一系列的插件,我们最常用的就是input,filter,output。我们的目标是用logstash收集过滤日志,然后转化成想要的格式,放到kafka里面,然后有需要的在从kafka取出来用。
或者当然也可以从kafka中取出来处理一下放入kafka,或者从kafka中取出来,处理之后,然后发送给别的系统,可以说很强大,自己可以在官网上选用自己的插件就ok。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐