只截取关键部分:

# https://go.es.io/WinlogbeatConfig
winlogbeat.event_logs:
  - name: Application
    ignore_older: 72h
  - name: Security
  - name: System


接入kafka

#--------------------------kafka-----------------------------------
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["192.1.1.216:9092"]
  topic: 'chao-beat0710'


kafka查询数据命令

bin/kafka-console-consumer.sh --zookeeper 192.1.1.216:2181 --topic chao-beat0710 --from-beginning


接入到hdfs


input{
        kafka{
                zk_connect=>"192.1.1.216:2181"
                topic_id=>"chao-beat0710"
        }
}
filter{


        mutate{

#替换windows中获取message数据中的\n以及\t为空
                gsub => ["message","\n",""]
                gsub => ["message","\t",""]
                #获取key值的测试,可以通过map的值,比如..beat是map,则beat下面的字段是key,可以通过beat[name]去到字段值
                update => {"message" => "%{activity_id}--%{beat}--%{beat[hostname]}"}
        }
}
output{
        webhdfs{
                host => "192.1.1.151"
                port => 50070
                path => "/chao/hdfs/test/kafka3/data.txt"
                user => "lee"
        }
        stdout{
                codec => rubydebug
        }
}


数据格式例子

取出数据在hdfs展示


附带winlogbeat接入es,以及logstash方式

#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
  #hosts: ["localhost:9200"]


  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"


#----------------------------- Logstash output --------------------------------
#output.logstash:
  # The Logstash hosts
  #hosts: ["localhost:5044"]



最后附带官方文档API(

  • packetbeat(用于监控网络流量)、
  • filebeat(用于监听日志数据,可以替代logstash-input-file)、
  • topbeat(用于搜集进程的信息、负载、内存、磁盘等数据)、
  • winlogbeat(用于搜集windows事件日志)
)

https://www.elastic.co/guide/en/beats/winlogbeat/current/winlogbeat-configuration-details.html

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐