Logstash 6+: parsing nginx logs
Read logs from Kafka and parse the nginx access log.
1. Prepare the grok pattern
In the Logstash home directory, create a patterns folder, and inside it create a file named nginx with the following content:
NGINX_ACCESS %{IPORHOST:remote_addr} (?:-|(%{WORD}.%{WORD})) %{USER:remote_user} \[%{HTTPDATE:time_local}\] "(?:%{WORD:method} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|%{DATA:rawrequest})" %{NUMBER:status} (?:%{NUMBER:body_bytes_sent}|-) %{QS:http_referer} %{QS:http_user_agent} %{QS:http_x_forwarded_for} (?:%{NUMBER:request_time}|-) (?:%{NUMBER:upstream_response_time}|-)
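For reference, this pattern lines up with an access log defined by roughly the following nginx log_format (a sketch of an assumed format, not stated in the original post; the name main_ext and the exact variable order are assumptions you should match against your own nginx.conf):

log_format main_ext '$remote_addr - $remote_user [$time_local] "$request" '
                    '$status $body_bytes_sent "$http_referer" "$http_user_agent" '
                    '"$http_x_forwarded_for" $request_time $upstream_response_time';

A line produced by such a format, which NGINX_ACCESS should match, looks like:

10.20.10.100 - admin [12/Mar/2019:10:20:30 +0800] "GET /index.html HTTP/1.1" 200 612 "-" "Mozilla/5.0" "-" 0.005 0.003

If your log_format differs, adjust the pattern accordingly; the Grok Debugger in Kibana's Dev Tools is handy for checking a sample line against the pattern before wiring it into the pipeline.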
2. Create the Logstash config file
input {
  kafka {
    bootstrap_servers => "10.20.10.1:9092,10.20.10.2:9092,10.20.10.3:9092"
    client_id => "logstash-jd-sre-bigdata-85-47"
    group_id => "logstash-test"
    auto_offset_reset => "latest"   # start from the newest offsets when no committed offset exists
    consumer_threads => 5           # ideally matches the partition count of the topic
    decorate_events => true         # attach Kafka metadata (topic, partition, offset) to each event
    topics => ["test"]
    codec => json {
      charset => "UTF-8"
    }
  }
}
filter {
  grok {
    # patterns_dir is resolved relative to the directory Logstash is started from;
    # it should point at the patterns folder created in step 1
    patterns_dir => ["./patterns"]
    match => { "message" => "%{NGINX_ACCESS}" }
    remove_field => [ "message" ]   # drop the raw line once it has been parsed into fields
  }
  mutate {
    # convert the numeric fields so Elasticsearch can aggregate on them
    convert => {
      "upstream_response_time" => "float"
      "request_time" => "float"
      "status" => "integer"
      "body_bytes_sent" => "float"
    }
  }
}
output {
  elasticsearch {
    hosts => ["10.20.10.1:9200","10.20.10.2:9200","10.20.10.3:9200"]
    index => "logstash-%{[fields][log_topic]}-%{+YYYY.MM.dd}"
  }
}
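Two quick notes before running this in earnest. First, the index name uses %{[fields][log_topic]}, which assumes the events arriving from Kafka (for example, shipped by Filebeat with a fields.log_topic entry) already carry that field; if they do not, the placeholder is left unresolved in the index name. Second, while debugging it helps to temporarily print parsed events to the console; a minimal sketch of such a debug output (not part of the final pipeline) is:

output {
  stdout {
    codec => rubydebug   # pretty-print each event with all parsed fields
  }
}

With the config saved (say as nginx-kafka.conf, a name chosen here for illustration), you can check the syntax with bin/logstash -f nginx-kafka.conf --config.test_and_exit and then start the pipeline with bin/logstash -f nginx-kafka.conf.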