Shipping Windows logs with Winlogbeat to Kafka and HDFS, consuming the data at different layers
Winlogbeat watches the Windows event logs and ships them to Kafka. From there we query the topic data in Kafka, ingest it from Kafka into HDFS via Logstash, and filter the message field along the way.
Only the key parts of the configuration are shown:
# https://go.es.io/WinlogbeatConfig
winlogbeat.event_logs:
  - name: Application
    ignore_older: 72h
  - name: Security
  - name: System
Output to Kafka
#--------------------------kafka-----------------------------------
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["192.1.1.216:9092"]
  topic: 'chao-beat0710'
Command to query the topic data in Kafka:
bin/kafka-console-consumer.sh --zookeeper 192.1.1.216:2181 --topic chao-beat0710 --from-beginning
(The --zookeeper flag belongs to older Kafka releases; on newer versions the console consumer takes --bootstrap-server 192.1.1.216:9092 instead.)
Ingesting into HDFS (Logstash pipeline)
input {
  kafka {
    zk_connect => "192.1.1.216:2181"
    topic_id   => "chao-beat0710"
  }
}
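Note that zk_connect and topic_id are settings of the old (Kafka 0.8-era) logstash-input-kafka plugin. On newer Logstash/Kafka versions the input connects to the brokers directly; an equivalent sketch, reusing the broker address from the Winlogbeat output above, would be:

```
input {
  kafka {
    bootstrap_servers => "192.1.1.216:9092"
    topics            => ["chao-beat0710"]
    auto_offset_reset => "earliest"
  }
}
```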
filter {
  mutate {
    # strip the \n and \t characters from the Windows message field
    gsub => ["message", "\n", "",
             "message", "\t", ""]
    # nested-field access test: beat is a map, so the fields under it are keys;
    # a sub-field is addressed as [beat][hostname], i.e. %{[beat][hostname]}
    update => { "message" => "%{activity_id}--%{beat}--%{[beat][hostname]}" }
  }
}
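The same cleanup can be sketched in plain Python, which is handy for checking what the gsub substitutions and the sprintf-style field references will produce. The sample event below is an assumed shape for illustration, not actual Winlogbeat output:

```python
import re

# A minimal stand-in for one decoded Winlogbeat event (assumed shape).
event = {
    "activity_id": "{12345}",
    "beat": {"hostname": "WIN-HOST01", "name": "winlogbeat"},
    "message": "An account was\nlogged off.\tSubject follows",
}

# Equivalent of mutate/gsub: strip \n and \t from the message field.
event["message"] = re.sub(r"[\n\t]", "", event["message"])

# Equivalent of mutate/update with %{activity_id}--%{beat}--%{[beat][hostname]}:
# the whole beat map, then the hostname key nested under it.
event["message"] = "%s--%s--%s" % (
    event["activity_id"], event["beat"], event["beat"]["hostname"])

print(event["message"])
```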
output {
  webhdfs {
    host => "192.1.1.151"
    port => 50070
    path => "/chao/hdfs/test/kafka3/data.txt"
    user => "lee"
  }
  stdout {
    codec => rubydebug
  }
}
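Under the hood the webhdfs output writes through the HDFS REST API exposed on the NameNode port configured above. A small sketch of the request URLs that correspond to those settings (an illustrative helper following the WebHDFS REST conventions, not the plugin's actual code):

```python
# Build the WebHDFS REST URLs matching the output settings above.
# (Illustrative helper, not the webhdfs plugin's implementation.)
HOST, PORT, USER = "192.1.1.151", 50070, "lee"
PATH = "/chao/hdfs/test/kafka3/data.txt"

def webhdfs_url(op):
    """Return the WebHDFS v1 URL for the given operation on PATH."""
    return "http://%s:%d/webhdfs/v1%s?op=%s&user.name=%s" % (
        HOST, PORT, PATH, op, USER)

create_url = webhdfs_url("CREATE")   # first write creates the file
append_url = webhdfs_url("APPEND")   # subsequent events are appended
print(create_url)
```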
Example of the resulting data format, and the ingested data as displayed in HDFS (screenshots omitted).
For reference, the Winlogbeat output configurations for Elasticsearch and for Logstash:
#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
  #hosts: ["localhost:9200"]

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#----------------------------- Logstash output --------------------------------
#output.logstash:
  # The Logstash hosts
  #hosts: ["localhost:5044"]
Finally, the official documentation for the Beats family:
- packetbeat (monitors network traffic)
- filebeat (ships log files; can replace logstash-input-file)
- topbeat (collects process info, load, memory, and disk metrics)
- winlogbeat (collects Windows event logs)
https://www.elastic.co/guide/en/beats/winlogbeat/current/winlogbeat-configuration-details.html