logstash读取kafka的topics,根据内容提取指定字段然后自动创建es索引。

input {
  kafka{
     bootstrap_servers => "192.168.1.15:9092"
     auto_offset_reset => "latest"
     topics_pattern => "svc.*" #topics_pattern支持正则匹配,topics不支持
     consumer_threads => 5
     codec => "json"
  }

}
filter {
    mutate {
#        gsub => [
#          "fieldname", "#", "-"
#        ] 用于替换指定字符
        split => ["message","#"]          #分割字符串获取服务名
        add_field => {  "service" => "%{[message][3]}" }
    }        
   #下面移除不必要的字段
    mutate {
    remove_field => ["@version"]
    remove_field => ["@timestamp"]
    remove_field => ["tags"]
    remove_field => ["_id"]
    remove_field => ["_type"]
    remove_field => ["_index"]
    remove_field => ["_score"]

}

}

output {
       elasticsearch{  
            hosts => "192.168.1.15:9200" 
            index => "log-%{service}"  
        }
        stdout {
            codec => rubydebug
        }
}
 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐