最近在做 logstash 消费 kafka 写到 es 的项目中,在 kibana 中查看发现 logstash 写入的 @timastamp 比真实服务器的时间少了8个小时。并且生成的文件中,日志切分的格式也是差了8小时。例如:2019-01-01的日志里面会写入2019-01-02 08:00:00 前的日志。因为logstash默认使用的是UTC时间。
使用版本:

nameversion
logstash6.3.2
es6.2.2
kibana6.2.2

解决方法:对@timestamp重新赋值,加上8小时。直接给@timestamp赋值不行,需要一个中间变量来操作。
ruby代码:
event.set(‘timestamp’, event.get(’@timestamp’).time.localtime + 86060)
event.set(’@timestamp’,event.get(‘timestamp’))
remove_field => [“timestamp”]

完整代码如下:

input {
    kafka{
        bootstrap_servers => "kafka-1:10193,kafka-2:10193"
        group_id => "mygroup"
        topics => ["docker-log"]
        consumer_threads => 5
        decorate_events => true
        codec => json {
             charset => "UTF-8"
        }
        auto_offset_reset => "latest"
    }
}
filter {
   mutate{
        split=>["source", "/"]
          add_field => {
            "file_name" => "%{source[5]}"
        }
        remove_field => ['@version','source','fields','input_type','beat']
    }
   if "webapp" not in [docker]{
     drop {}
   }
   grok{
       match => {"message" => "\[%{TIMESTAMP_ISO8601:logTimestamp}\] {%{LOGLEVEL:level}} \[(?<thread>.*)\] %{JAVACLASS:className} %{NUMBER:lineNum} \- (?<content>.*)"}
   }
   if "controller" != [className] and "用户" not in [content]{
      drop {}
   }
   ruby {
     code => "
       if !event.get('content').start_with?'用户'
          event.cancel
       end
       event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)
       event.set('@timestamp',event.get('timestamp'))
       "
     remove_field => ["timestamp"]
   }
}
output {
    #elasticsearch {
    #    hosts => ["es-6.com:80"]
    #    index =>  "%{docker}_%{+YYYY-MM-dd}"
    #}
    stdout {
        codec => rubydebug
    }
}

注意:
高版本ruby代码中已经不能使用 event[‘name’] 取值了,需要改为:event.get(‘name’)
jruby 语法 api 参考链接
Logstash进阶指南 | 1. 万能的ruby filter

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐