日志格式:

{"Q1":62442457475833333,"Q2":2016811232228686,"date":1556186700487}

logstash配置文件:

input {
  kafka {
    zk_connect => "localhost:2181"
    group_id => "test"
    topic_id => "test"
    #auto_offset_reset => "smallest"
  }
}

filter {
date {
    match => ["date", "yyyy-MM-dd HH:mm:ss.SS", "UNIX_MS"]
      target => "@timestamp"
      locale => "cn"
    }
  ruby {
    code => '
      require "json"
      begin
        event["msg_time"] = event["@timestamp"].time.getlocal("+08:00").strftime "%Y-%m-%d %H:%M:%S"
        event["date_id"] = event["@timestamp"].time.getlocal("+08:00").strftime "%Y-%m-%d"
        event["hours"] = event["@timestamp"].time.getlocal("+08:00").strftime("%H").to_i
       # 为了保证写到es中Q1+Q2唯一,所以先把他们组合起来
       event["doc_id"] = event["Q1"].to_s + event["Q2"].to_s
      rescue Exception => e
      end
    '
  }
}

output {
  if "_jsonparsefailure" not in [tags]{
    stdout{codec=>"rubydebug"}
    elasticsearch {
       # http默认端口9200
       protocol => "http"
       #刷新数据的大小
       flush_size => 5000
       #没满足上面配置的话,5s后也会写出数据
       idle_flush_time => 5
       # es集群
       cluster => "es"
       host => ["localhost:9200"]
       index => "stat-logstash-q"
       # 保证写入到es的数据唯一
       document_id => "%{doc_id}"
   }
  }
}

遇到的问题:

1.刚开始写入一直报错:failed action with response of 404, dropping action: ["index", {

经排查,环境是没有问题的,后来发现是

es配置了 自动创建index的名称正则规则,而我之前的index => logstash-q,不符合规范,所以一直失败,改了符合配置的规范

index => "stat-logstash-q" 就ok了。

2.需求方要要求写入es的数据不能重复,即Q1+Q2唯一,通过查看logstash官网得知,document_id属性就是控制唯一的,同样的数据会执行update操作

所以先把Q1和Q2组装起来,然后使 document_id等于组装的值

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐