Using filebeat + kafka + logstash + ES + Kibana together

Environment
  • JDK 1.8
  • filebeat 6.4.1
  • kafka 0.10.2.0
  • logstash 6.4.1
  • elasticsearch 6.4.1
  • Kibana 6.4.1
kafka
  • Create the topic
kafka-topics --zookeeper 192.168.23.121,192.168.23.122,192.168.23.123 --create --partitions 3 --replication-factor 3 --topic nginx-data001
filebeat
  • Edit the configuration file
cd /etc/filebeat/
vi filebeat.yml
  • Add the following to filebeat.yml; the input is the Nginx log path and the output is kafka
filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.

- type: log

  # Change to true to enable this input configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
   - /data01/datadir_test/nginx_logs/dataLKOne/access.log
output.kafka:
   enabled: true
   hosts: ["dsgcd4121:9092","dsgcd4122:9092","dsgcd4123:9092"]
   topic: 'nginx-data001'
   version: '0.10.2.0'
  • Start filebeat
service filebeat start
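With output.kafka, Filebeat publishes each event to the topic as a JSON document, and the raw log line travels in its message field. That is why the logstash kafka input below sets codec => "json" and grok matches on message. A minimal Python sketch of that decoding step follows; the event is hypothetical and heavily trimmed (real Filebeat events carry more metadata fields), and extract_message is an illustrative helper, not part of any tool.

```python
import json

# Hypothetical, heavily trimmed Filebeat event as it might appear as a
# Kafka record value; real events contain additional metadata fields.
kafka_value = json.dumps({
    "@timestamp": "2018-09-28T02:15:30.000Z",
    "source": "/data01/datadir_test/nginx_logs/dataLKOne/access.log",
    "message": '192.168.23.10 - - [28/Sep/2018:10:15:30 +0800] "GET / HTTP/1.1"',
})


def extract_message(raw_value: str) -> str:
    """Decode one Kafka record value and return the original log line."""
    event = json.loads(raw_value)
    return event["message"]


if __name__ == "__main__":
    print(extract_message(kafka_value))
```

The grok filter later operates on exactly this message string, not on the JSON envelope.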
logstash
  • Define custom logstash patterns
mkdir -p /usr/local/logstash/patterns
vi /usr/local/logstash/patterns/nginx
  • Contents of the nginx file:
QS1 (.*?)
NGINXACCESS %{IPORHOST:clientip} - %{USERNAME:remote_user} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:http_verb} %{NOTSPACE:http_request}(?: HTTP/%{NUMBER:http_version})?|%{DATA:raw_http_request})\" (%{IPORHOST:http_host}.%{WORD:http_port}) %{NUMBER:response_status} %{NUMBER:response_length} (?:%{NUMBER:bytes_read}|-) %{QS1:referrer} %{QS1:agent}  %{NUMBER:request_time:float} %{NUMBER:upstream_response_time:float}
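To see which fields NGINXACCESS produces, the pattern can be approximated with a plain regular expression. The Python sketch below is a rough hand translation, not what grok actually compiles: IPORHOST is simplified to \S+, HTTPDATE to "anything up to the closing bracket", and NUMBER to a plain decimal. The sample log line is made up to fit the pattern, including the two spaces before request_time.

```python
import re

# Rough approximation of the NGINXACCESS grok pattern (assumption: the
# sub-patterns are simplified and are looser than the real grok ones).
NGINX_ACCESS = re.compile(
    r'(?P<clientip>\S+) - (?P<remote_user>[A-Za-z0-9._-]+) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?:(?P<http_verb>\w+) (?P<http_request>\S+)'
    r'(?: HTTP/(?P<http_version>\d+(?:\.\d+)?))?|(?P<raw_http_request>.*?))" '
    r'(?P<http_host>\S+?).(?P<http_port>\w+) '
    r'(?P<response_status>\d+) (?P<response_length>\d+) '
    r'(?:(?P<bytes_read>\d+)|-) '
    r'(?P<referrer>.*?) (?P<agent>.*?)  '  # QS1 is (.*?), so quotes are kept
    r'(?P<request_time>\d+(?:\.\d+)?) (?P<upstream_response_time>\d+(?:\.\d+)?)'
)

# Hypothetical access-log line shaped to fit the pattern above.
SAMPLE = ('192.168.23.10 - - [28/Sep/2018:10:15:30 +0800] '
          '"GET /index.html HTTP/1.1" example.com:80 200 612 - '
          '"-" "curl/7.29.0"  0.005 0.004')


def parse_line(line):
    """Return a dict of captured fields, or None if the line does not match."""
    m = NGINX_ACCESS.match(line)
    if m is None:
        return None
    fields = m.groupdict()
    # Mirror the :float conversions declared in the grok pattern.
    for key in ("request_time", "upstream_response_time"):
        fields[key] = float(fields[key])
    return fields


if __name__ == "__main__":
    for name, value in parse_line(SAMPLE).items():
        print(name, "=", value)
```

Because QS1 is defined as (.*?), the referrer and agent values keep their surrounding double quotes. The unescaped dot between http_host and http_port matches any single character, which here is the colon.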
  • Create the pipeline configuration file
cd /etc/logstash/conf.d/
vi nginx_datalkone.conf
  • Contents of nginx_datalkone.conf:
input {
  kafka {
        enable_auto_commit => true
        auto_commit_interval_ms => "1000"
        codec => "json"
        bootstrap_servers => "192.168.23.121:9092,192.168.23.122:9092,192.168.23.123:9092"
        topics => ["nginx-data001"]
  }
}
filter {
     grok {
        patterns_dir => "/usr/local/logstash/patterns"
        match => { "message" => "%{NGINXACCESS}" }
        remove_field => ["message"]
     }
     urldecode {
          all_fields => true
     }
     geoip {
        source => "clientip"
     }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "nginx-data-%{+YYYY.MM.dd}"
    }
}
  • Start logstash
initctl start logstash
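The index setting nginx-data-%{+YYYY.MM.dd} creates one index per day. Logstash formats the date from the event's @timestamp in UTC, so an event stamped shortly after midnight local time can still land in the previous day's index. The Python sketch below imitates that naming rule; daily_index is a hypothetical helper written for illustration, and the +08:00 offset is only an assumed example timezone.

```python
from datetime import datetime, timedelta, timezone


def daily_index(ts: datetime, prefix: str = "nginx-data-") -> str:
    """Imitate index => "nginx-data-%{+YYYY.MM.dd}" for one event timestamp.

    ts must be timezone-aware; it is converted to UTC before formatting.
    """
    return prefix + ts.astimezone(timezone.utc).strftime("%Y.%m.%d")


if __name__ == "__main__":
    cst = timezone(timedelta(hours=8))  # assumed local offset for the example
    # 10:15 local on the 28th is 02:15 UTC on the 28th.
    print(daily_index(datetime(2018, 9, 28, 10, 15, 30, tzinfo=cst)))
    # 02:00 local on the 29th is still 18:00 UTC on the 28th.
    print(daily_index(datetime(2018, 9, 29, 2, 0, 0, tzinfo=cst)))
```

Both calls print nginx-data-2018.09.28, which is worth keeping in mind when looking for a given day's index in Elasticsearch or Kibana.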
ElasticSearch
  • Use curl 'localhost:9200/_cat/indices?v' to check whether data has been written
# curl 'localhost:9200/_cat/indices?v'
health status index                 uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   .kibana               iLXOPq3wTc2PECVeXtXzOw   1   1          2            0      7.7kb          7.7kb
yellow open   nginx-data-2018.09.28 ORbbgojRTEGJVSl_jN2MXg   5   1          0            0       401b           401b
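An index showing up in this listing does not by itself mean documents have arrived: in the output above, nginx-data-2018.09.28 exists but its docs.count is 0. A small Python sketch for checking that column is given below. It parses the whitespace-separated table text as printed by the _cat API with ?v; parse_cat_indices and empty_indices are hypothetical helper names, and the embedded text is simply the listing shown above.

```python
# The listing from the step above, pasted in as sample input.
CAT_OUTPUT = """\
health status index                 uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   .kibana               iLXOPq3wTc2PECVeXtXzOw   1   1          2            0      7.7kb          7.7kb
yellow open   nginx-data-2018.09.28 ORbbgojRTEGJVSl_jN2MXg   5   1          0            0       401b           401b
"""


def parse_cat_indices(text):
    """Turn `_cat/indices?v` output into a list of dicts keyed by column name."""
    lines = [ln for ln in text.strip().splitlines() if ln.strip()]
    header = lines[0].split()
    return [dict(zip(header, ln.split())) for ln in lines[1:]]


def empty_indices(rows, prefix="nginx-data-"):
    """Return names of matching indices that currently report zero documents."""
    return [r["index"] for r in rows
            if r["index"].startswith(prefix) and int(r["docs.count"]) == 0]


if __name__ == "__main__":
    rows = parse_cat_indices(CAT_OUTPUT)
    print(empty_indices(rows))
```

If an index stays empty after requests have hit Nginx, the earlier stages (filebeat, the kafka topic, the grok match) are the places to look.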
Configure the index pattern in Kibana (for example nginx-data-*, which matches the daily indices created above)
