Logstash 连接 Kafka 指南
总体思路还是之前的场景,我需要把各个业务的线上服务器日志接入到统一的日志处理平台中。具体会用 Kafka 做中间件,所以需要解决的就是如何把日志传到 Kafka。原先的考虑是利用系统自带的 rsyslog,这样我只需要自动配置一下 rsyslog 的处理发送规则就可以了,免去了安装和维护的麻烦。但是系统自带的 rsyslog 版本太低,所以到头来还是要更新维护,那就不如直接用更强大且更好用的 ..
总体思路
还是之前的场景,我需要把各个业务的线上服务器日志接入到统一的日志处理平台中。具体会用 Kafka 做中间件,所以需要解决的就是如何把日志传到 Kafka。原先的考虑是利用系统自带的 rsyslog,这样我只需要自动配置一下 rsyslog 的处理发送规则就可以了,免去了安装和维护的麻烦。但是系统自带的 rsyslog 版本太低,所以到头来还是要更新维护,那就不如直接用更强大且更好用的 Logstash 了。
需要注意的有两点:
- 不要即时推送日志,以免增加服务器负担
- 能够妥善处理 logrotate 的情况
幸运的是,这 Logstash 都考虑到了,我们只需要简单配置一下即可。
安装 Java
因为大部分线上服务器跑的是 Ruby,所以需要先安装一下 Java
sudo add-apt-repository -y ppa:webupd8team/java sudo apt-get update sudo apt-get -y install oracle-java8-installer |
安装 Logstash
在 ELK 指南中介绍了用 apt-get
进行安装的方法,这里介绍如何手动下载安装
- 下载到服务器
wget https://download.elastic.co/logstash/logstash/logstash-all-plugins-2.3.4.tar.gz
- 解压
tar -xvzf logstash-all-plugins-2.3.4.tar.gz
- 进入 Logstash 文件夹并创建配置文件夹(个人习惯)
cd logstash-2.3.4; mkdir confs
之后所有的配置文件均可放在 confs
文件夹中。
配置 Logstash 到 Kafka
这里 Logstash 相当于 producer
Input 读取文件
Logstash 使用一个名叫 FileWatch 的 Ruby Gem 库来监听文件变化。这个库支持 glob 展开文件路径,而且会记录一个叫 .sincedb
的数据库文件来跟踪被监听的日志文件的当前读取位置。通过记录下来的 inode
, major number
, minor number
和 pos
就可以保证不漏过每一条日志。一个可能的配置文件是这样的
input { file { path => ["/data/home/service/project/current/log/logstash_production.log", "/data/home/service/project/current/log/logstash_production.log.1"] codec => "json" add_field => { "topic" => "djiservice"} stat_interval => 1800 } } |
这里说一下 File rotation 的情况,为了处理被 rotate 的情况,最好把 rotate 之后的文件名也加到 path 中(如上面所示),这里注意,如果 start_position
被设为 beginning
,被 rotate 的文件因为会被认为是新文件,而重新导入。如果用默认值 end
,那么在最后一次读之后到被 rotate 结束前生成的日志不会被采集。
其他一些配置的设定原因
add_field
添加一个 topic 字段,用作之后导入 elasticsearch 的索引标识stat_interval
单位是秒,这里 30 分钟进行一次检测,不过测试的时候需要去掉这个配置codec
因为已经处理成 logstash 兼容格式,就直接以 json 解析
Filter 内容定制
Filter 主要是对数据进行一些处理,比如说我用的是:
filter { mutate { remove_field => ["format"] } geoip { source => "ip" fields => ["location", "city_name", "country_name", "country_code2","country_code3", "region_name", "continent_code"] } } |
这里做的操作一是移除无效的域,二是把 ip 转换为地理位置,方便后期的处理。
Output 输出到 Kafka
因为 Logstash 自带 Kafka 插件,直接配置上即可,比如:
output { kafka { topic_id => "test" bootstrap_servers => "kafka_url:port" } } |
下面是基本的配置及其设定原因
topic_id
指定 topic 来进行发送bootstrap_servers
这里是 Kafka 的接入地址
其他一些需要注意的配置
acks
可以选的值为0
,1
,all
,这里解释一下,0 表示不需要 server 返回就认为请求已完成;1 表示需要 leader 返回才认为请求完成;all 表示需要所有的服务器返回才认为请求完成batch_size
单位是字节,如果是发送到同一分区,会攒够这个大小才发送一次请求block_on_buffer_full
这个设置在缓冲区慢了之后阻塞还是直接报错buffer_memory
发送给服务器之前的缓冲区大小,单位是字节client_id
可以在这里设定有意义的名字,就不一定要用 ip 和 端口来区分compression_type
压缩方式,默认是none
,其他可选的是gzip
和snappy
利用 Logstash 从 Kafka 导出数据到 Elasticsearch
这一步就比较简单了,先从 Kafka 中读取,然后写入到 elasticsearch,这里 Logstash 作为 consumer
output { input { kafka { zk_connect => "localhost:2181" topic_id => "log" } } stdout { codec => rubydebug } elasticsearch { hosts => ["url:port"] user => "name" password => "password" index => "%{service}-%{+YYYY-MM-dd}" } } |
这样可以按照配置的服务名称和日期来切割。
至此,我们完成了从 Logstash 到 Kafka 再到 Elasticsearch 的连接,下一步就可以用 kibana 来展示日志的监控分析结果了。
更多推荐
所有评论(0)