Spring Cloud ELK+kafka日志分析平台(二)优化
一、概述
在笔者的上一篇博客介绍了Spring Cloud ELK+kafka日志分析平台的搭建,http://xuyangyang.club/articles/2018/05/24/1527176074152.html,但是笔者在测试环境中发现,在logstash采用了grok插件去处理日志埋点和解析的时候发现了高资源占用,在阿里云8核16G的服务器部署后,测试环境大概每秒不超过几百条的日志的解析下竟然CPU占用高达95%左右,笔者分析了其中的原因,首先由于几个服务的日志格式相关配置还没有落地导致了日志格式无法解析和解析慢,其次grok天生解析正则表达式非常耗CPU。在笔者分析原因后立马进行了改造,将无法解析的格式不做任何处理直接输出到elasticsearch,另外多起了同一个group的基于kafka的logstash去消费日志,发现CPU占用虽然降低了,但任然在高达70%的CPU占用,这使得笔者不得不放弃原先的logstash插件grok,采用了ruby来解析日志和处理日志埋点,并且这次同时对docker启动shell脚本做相应修改,原先由于书写filter导致脚本杂乱无法查看改为启动传入配置文件的方式。
二、改造logstash
本次主要的问题主要发生在logstash的grok插件,在笔者查阅相关资料中发现,logstash提供了grok、mutate、ruby、json等等多种filter插件,由于我们的日志非表中的json格式,所以放弃了测试json插件,而笔者在ruby插件的介绍中发现官方对其颇为称赞,它支持ruby语法,对一些字符串、循环、判断能更方便,并且可以操作event对象(其实就是logstash对象),所以笔者采用了ruby插件。以下为笔者所写的logstash配置文件示例:

filter {
    ruby{
        code => "
            array=event.get('message').split('|')
            if array.length==9 
                event.set('localDate',array.at(0))
                event.set('level',array.at(1))
                event.set('thread',array.at(2))
                event.set('class',array.at(3))
                event.set('method',array.at(4))
                event.set('line',array.at(5))
                event.set('service',array.at(6))
                event.set('ip',array.at(7))
                event.set('content',array.at(8))
            elsif array.length==10
                event.set('localDate',array.at(0))
                event.set('level',array.at(1))
                event.set('thread',array.at(2))
                event.set('class',array.at(3))
                event.set('method',array.at(4))
                event.set('line',array.at(5))
                event.set('service',array.at(6))
                event.set('ip',array.at(7))
                event.set('uid',array.at(8))
                event.set('content',array.at(9))
            else
                content= event.get('message')
                event.set('content',content)
            end
            "
        remove_field=>"message"
    }
}

笔者对日志输出做了修改,将日志用“|”分割了一下,并日志埋点了uid,在ruby中解析到字符串数组后,通过判断它的length来判断它是哪种类型的数据,从而进行解析处理,最后将冗余字段message删除,需要注意的时在解析日志之前最好手动去创建elasticsearch的index,如果不创建,logstash在解析到日志会自动创建,在采用如上配置的解析后,最后用kibana分析的时候发现字段确实解析和处理了,但是类型都是String,例如日志埋点的uid或者其他属性需要做一些过滤就不是很好用了,因此需要提前手动创建并设置类型。最后再说一下kibana,在第一次配置完索引后字段不会马上加载出来,也可能点开具体查看的时候我们解析的fileld报感叹号,这是因为kibana还没加载到索引的全部字段,可以去kibana的index管理中去refresh下就好了。以下附上创建elasticsearch索引的相关代码示例(在kibana中可以直接调试使用):

PUT /logstash-index
{  
 "mappings" : {  
  "_default_" : {  
   "properties" : {  
    "uid" : {"type": "long"},
    "localDate" : { "type" : "text" },  
    "level" : { "type" : "text" }  ,
    "thread" : { "type" : "text" } ,
    "class" : { "type" : "text" } ,
    "method" : { "type" : "text" } ,
    "service" : { "type" : "text" } ,
    "ip" : { "type" : "text" } ,
    "line" : { "type" : "text" } ,
    "content" : { "type" : "text" }
   }  
  }  
 }  
}

三、改造启动脚本
原先因为笔者想尽快测试使用所以采用了docker -e的方式,结果导致脚本看起来杂乱不堪,现在附上新的docker启动方式,通过传配置文件路径进行docker部署启动

docker run -idt -v /usr/local/elk/logstash.conf:/usr/local/logstash.conf --name logstash logstash -f /usr/local/logstash.conf

其中通过-v的方式映射配置文件路径,方式为 -v 宿主机路径:容器内路径,-f为logstash的启动命令,可以传入配置文件路径。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐