Reading logs from Kafka and parsing nginx access logs

1. Prepare the grok pattern

In the Logstash home directory, create a patterns folder, add a file named nginx inside it, and put the following content into that file (the whole pattern on a single line):

NGINX_ACCESS %{IPORHOST:remote_addr} (?:-|(%{WORD}.%{WORD})) %{USER:remote_user} \[%{HTTPDATE:time_local}\] "(?:%{WORD:method} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|%{DATA:rawrequest})" %{NUMBER:status} (?:%{NUMBER:body_bytes_sent}|-) %{QS:http_referer} %{QS:http_user_agent} %{QS:http_x_forwarded_for} (?:%{NUMBER:request_time}|-) (?:%{NUMBER:upstream_response_time}|-)
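For reference, this pattern corresponds to an nginx log_format roughly like the sketch below. This is an assumption for illustration, not part of the original post: the trailing $request_time and $upstream_response_time variables are not in nginx's default combined format, so adjust it to whatever your nginx.conf actually defines.

# hypothetical log_format matching the NGINX_ACCESS pattern above
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
                '$status $body_bytes_sent "$http_referer" '
                '"$http_user_agent" "$http_x_forwarded_for" '
                '$request_time $upstream_response_time';

access_log /var/log/nginx/access.log main;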

2. Create the Logstash configuration file

input {
  kafka {
    # bootstrap_servers is a comma-separated string, not an array
    bootstrap_servers => "10.20.10.1:9092,10.20.10.2:9092,10.20.10.3:9092"
    client_id => "logstash-jd-sre-bigdata-85-47"
    group_id => "logstash-test"
    auto_offset_reset => "latest"   # start from the newest offsets when no committed offset exists
    consumer_threads => 5           # ideally matches the topic's partition count
    decorate_events => true         # adds Kafka topic/partition/offset metadata to the event
    topics => ["test"]
    codec => json {
      charset => "UTF-8"
    }
  }
}
 
filter {
  grok {
    # patterns_dir points at the patterns folder created in step 1
    patterns_dir => ["./patterns"]
    match => { "message" => "%{NGINX_ACCESS}" }
    # drop the raw line once parsed (only applied on a successful match)
    remove_field => [ "message" ]
  }
  mutate {
    # cast numeric fields so Elasticsearch indexes them as numbers, not strings
    convert => {
      "upstream_response_time" => "float"
      "request_time" => "float"
      "status" => "integer"
      "body_bytes_sent" => "float"
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["10.20.10.1:9200","10.20.10.2:9200","10.20.10.3:9200"]
    # [fields][log_topic] is expected to be present in the incoming JSON
    # events (e.g. set by Filebeat); one index is created per topic per day
    index => "logstash-%{[fields][log_topic]}-%{+YYYY.MM.dd}"
  }
}
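Assuming the configuration above is saved as config/nginx-kafka.conf under the Logstash home directory (the file name here is hypothetical), you can validate the syntax and then start the pipeline:

# check the configuration syntax without starting the pipeline
bin/logstash -f config/nginx-kafka.conf --config.test_and_exit

# start the pipeline
bin/logstash -f config/nginx-kafka.conf

Once events are flowing, a quick way to confirm that documents reach Elasticsearch is to query the daily indices (the exact index name depends on the log_topic field set by your shipper):

curl 'http://10.20.10.1:9200/logstash-*/_search?size=1&pretty'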