Collecting data into Elasticsearch with Logstash
日志格式:{"Q1":62442457475833333,"Q2":2016811232228686,"date":1556186700487}logstash配置文件:input {kafka {zk_connect => "localhost:2181"group_id => "test"topic_id => "test"...
·
Log format:
{"Q1":62442457475833333,"Q2":2016811232228686,"date":1556186700487}
Logstash configuration file:
input {
  kafka {
    zk_connect => "localhost:2181"
    group_id   => "test"
    topic_id   => "test"
    #auto_offset_reset => "smallest"
  }
}

filter {
  date {
    match  => ["date", "yyyy-MM-dd HH:mm:ss.SS", "UNIX_MS"]
    target => "@timestamp"
    locale => "cn"
  }
  ruby {
    code => '
      require "json"
      begin
        event["msg_time"] = event["@timestamp"].time.getlocal("+08:00").strftime "%Y-%m-%d %H:%M:%S"
        event["date_id"]  = event["@timestamp"].time.getlocal("+08:00").strftime "%Y-%m-%d"
        event["hours"]    = event["@timestamp"].time.getlocal("+08:00").strftime("%H").to_i
        # Q1+Q2 must be unique in ES, so concatenate them first
        event["doc_id"]   = event["Q1"].to_s + event["Q2"].to_s
      rescue Exception => e
      end
    '
  }
}

output {
  if "_jsonparsefailure" not in [tags] {
    stdout { codec => "rubydebug" }
    elasticsearch {
      # HTTP, default port 9200
      protocol => "http"
      # flush once this many events are buffered
      flush_size => 5000
      # if flush_size is not reached, flush after 5 seconds anyway
      idle_flush_time => 5
      # ES cluster name
      cluster => "es"
      host    => ["localhost:9200"]
      index   => "stat-logstash-q"
      # keep documents written to ES unique
      document_id => "%{doc_id}"
    }
  }
}
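To make the ruby filter concrete, here is a minimal standalone Ruby sketch (plain Ruby outside Logstash, so the event API is replaced with local variables) of the fields it derives from the sample record above:

q1   = 62442457475833333
q2   = 2016811232228686
date = 1556186700487                          # milliseconds since the epoch

t = Time.at(date / 1000.0).getlocal("+08:00")

msg_time = t.strftime("%Y-%m-%d %H:%M:%S")    # => "2019-04-25 18:05:00"
date_id  = t.strftime("%Y-%m-%d")             # => "2019-04-25"
hours    = t.strftime("%H").to_i              # => 18
doc_id   = q1.to_s + q2.to_s                  # => "624424574758333332016811232228686"

Inside the pipeline the same values come from event["@timestamp"], which the date filter has already populated from the date field.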
Problems encountered:
1. At first every write failed with: failed action with response of 404, dropping action: ["index", {
After some troubleshooting, the environment itself turned out to be fine. The real cause was that ES was configured with a pattern rule restricting which index names may be auto-created, and my earlier setting index => logstash-q did not match it, so the writes kept failing. Changing it to a name that satisfies the rule, index => "stat-logstash-q", fixed the problem.
2. The requirement was that data written to ES must not be duplicated, i.e. the Q1+Q2 combination has to be unique. The Logstash documentation shows that the document_id option of the elasticsearch output controls exactly this: an event with the same id triggers an update instead of creating a new document.
So Q1 and Q2 are concatenated first, and document_id is set to that combined value.
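A quick way to confirm the de-duplication is to fetch the document by its id and watch _version: re-sending the same Q1+Q2 record bumps the version instead of creating a second document. The type "logs" below is the old Logstash default document_type and is an assumption here:

curl 'localhost:9200/stat-logstash-q/logs/624424574758333332016811232228686?pretty'
# re-processing the same record only increments "_version" in the response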