教你无脑在springBoot项目中集成ELK+Kafka
ELK+Kafka从0开始简介(1)Kafka:接收用户日志的消息队列(2)Logstash:做日志解析,统一成json输出给Elasticsearch(3)Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能。(4)Kibana:基于Elasticsearch的数据可视化组件
ELK+Kafka从0开始
简介
(1)Kafka:接收用户日志的消息队列
(2)Logstash:做日志解析,统一成json输出给Elasticsearch
(3)Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能。
(4)Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因。
(5)Zookeeper: 状态管理,监控进程等服务
切换到JDK1.8
最新的软件都是要求JDK1.8,建议首先切换到JDK版本
yum search jdk
yum install java-1.8.0-openjdk.x86_64 -y
//修改JDK环境配置(例如:)
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_31
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
1.elasticsearch
官网docs:https://www.elastic.co/guide/en/elasticsearch/reference/current/rpm.html
1.下载安装包
**wget https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/rpm/elasticsearch/2.3.5/elasticsearch-2.3.5.rpm**
2.解压启动
**sudo rpm --install elasticsearch-2.3.5.rpm**
配置成系统启动时自动启动:
**sudo systemctl daemon-reload**
**sudo systemctl enable elasticsearch.service**
启动:
**sudo systemctl start elasticsearch.service**
3.查看是否启动:
**systemctl status elasticsearch**
4.查找elasticsearch路径,查看修改elasticsearch.yum
**whereis elasticsearch**
**vi /etc/elasticsearch/elasticsearch.yml**
注意这里要修改host和port
这里我将ip修改成里本机的ip地址,才能让外网访问,修改完成后注意重启.
5.服务的启动和停止
**sudo systemctl start elasticsearch.service**
**sudo systemctl stop elasticsearch.service**
6.无法启动,查看错误日志
**journalctl -xe**
**sudo journalctl --unit elasticsearch**
根据时间搜索:eg
**sudo journalctl --unit elasticsearch --since "2016-10-30 18:17:16"**
7.启动成功,测试
**curl -XGET "127.0.0.1:9200"**
在浏览器访问:
2.安装kibana
1.下载
wget https://download.elastic.co/kibana/kibana/kibana-4.5.1-1.x86_64.rpm
2.安装
yum localinstall kibana-4.5.1-1.x86_64.rpm -y
3.启动
sudo systemctl start kibana
查看是否启动成功
systemctl status kibana
4.浏览器访问kibana
3.安装logstash,以及添加配置文件
1.下载logstash
wget -c https://download.elastic.co/logstash/logstash/packages/centos/logstash-2.4.1.noarch.rpm
这里注意;logstash 5.x和6.x版本都需要JDK1.8,而服务器jdk1.7版本所以并没有下载那么高的版本,只能选在2.x
2.安装
yum localinstall logstash-2.4.1.noarch.rpm –y
3.生成证书
[root@test-lufei-6ec9ba1e-3ed1-4409-97dc-4089e858268b pki]# cd tls
[root@test-lufei-6ec9ba1e-3ed1-4409-97dc-4089e858268b tls]# ls
cert.pem certs misc openssl.cnf private
//使用下面命令
[root@test-lufei-6ec9ba1e-3ed1-4409-97dc-4089e858268b tls]# openssl req -subj '/CN=elk.test.com/' -x509 -days 3650 -batch -nodes -newkey rsa:2048 -keyout private/logstash-forwarder.key -out certs/logstash-forwarder.crt
Generating a 2048 bit RSA private key
..+++
.....................................................................+++
writing new private key to 'private/logstash-forwarder.key'
-----
4.之后创建logstash 的配置文件。如下:
cd /etc/logstash/conf.d/
vi 01-logstash-initial.conf
将下面配置写入
input {
beats {
port => 5000
type => "logs"
ssl => true
ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
}
}
监听TCP 5044端口上beats 输入,使用上面创建的SSL证书加密,当然你要是不想使用这种方式还可以配置路径收集
input{
file{
path =>"/Develop/Tools/apache-tomcat-8.0.30/webapps/nggirllog/access.log"
start_position=>"beginning"
}
}
创建一个名为10-syslog-filter.conf的配置文件,我们将为syslog消息添加一个过滤器
filter {
if [type] == "syslog-beat" {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: % {GREEDYDATA:syslog_message}" }
add_field => [ "received_at", "% {@timestamp}" ]
add_field => [ "received_from", "% {host}" ]
}
geoip {
source => "clientip"
}
syslog_pri {}
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
}
这个输出基本上配置Logstash来存储input数据到Elasticsearch中,运行在localhost:9200
output {
elasticsearch { }
stdout { codec => rubydebug }
}
5.启动logstash,刚才配置文件端口写的5000
启动
sudo systemctl start logstash
查看是否启动成功
systemctl status logstash
6.查询一下当前启动的三个软件的运行状态
netstat -nltp
ok,测试一下,效果
这个时候其实已经能初步的使用了,你可以将一个java项目的跑在服务器上将日志输出路径陪成和input路径相同,大概能实现如下效果:
4.kafka
Kafka目前主要作为一个分布式的发布订阅式的消息系统使用,下面简单介绍一下kafka的基本机制:
Producer即生产者,向Kafka集群发送消息,在发送消息之前,会对消息进行分类,即Topic,上图展示了两个producer发送了分类为topic1的消息,另外一个发送了topic2的消息。
Topic即主题,通过对消息指定主题可以将消息分类,消费者可以只关注自己需要的Topic中的消息
Consumer即消费者,消费者通过与kafka集群建立长连接的方式,不断地从集群中拉取消息,然后可以对这些消息进行处理。
kafka官方文档:
http://kafka.apache.org/documentation
kafka工作原理:https://www.cnblogs.com/hei12138/p/7805475.html
1.从kafka官网下载,并解压kafka
首先确保你的机器上安装了jdk**(最新版kafka需要使用jdk1.8)**,kafka需要java运行环境,以前的kafka还需要zookeeper,新版的kafka已经内置了一个zookeeper环境.
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
//解压
tar -zxvf kafka_2.11-1.0.0.tgz
2.修改配置
//进入到kafka的安装目录下
[root@test-lufei-6ec9ba1e-3ed1-4409-97dc-4089e858268b config]# cd /home/centos/kafka_2.12-1.0.0
[root@test-lufei-6ec9ba1e-3ed1-4409-97dc-4089e858268b kafka_2.12-1.0.0]# ls
bin config libs LICENSE NOTICE site-docs
[root@test-lufei-6ec9ba1e-3ed1-4409-97dc-4089e858268b kafka_2.12-1.0.0]# cd config/
[root@test-lufei-6ec9ba1e-3ed1-4409-97dc-4089e858268b config]# ls
connect-console-sink.propertiesconnect-distributed.properties connect-file-source.properties connect-standalone.properties log4j.properties server.properties zookeeper.properties
connect-console-source.properties connect-file-sink.propertiesconnect-log4j.propertiesconsumer.propertiesproducer.properties tools-log4j.properties
修改service.properties文件
#此Broker的ID,集群中每个Broker的ID不可相同
broker.id=0
#监听器,端口号与port一致即可
listeners=PLAINTEXT://localhostls:9092
#Broker监听的端口
port=19092
#Broker的Hostname,填主机IP即可
host.name=172.16.38.176
#向Producer和Consumer建议连接的Hostname和port(此处有坑,具体见后)
advertised.host.name=172.16.38.176
advertised.port=9092
#进行IO的线程数,应大于主机磁盘数
num.io.threads=8
#消息文件存储的路径
log.dirs=/data/kafka-logs
#消息文件清理周期,即清理x小时前的消息记录
log.retention.hours=168
#每个Topic默认的分区数,一般在创建Topic时都会指定分区数,所以这个配成1就行了
num.partitions=1
#Zookeeper连接串,此处填写上一节中安装的三个zk节点的ip和端口即可
zookeeper.connect=172.16.38.176:12180,172.16.38.177:12181,172.16.38.177:12182
3.启动zookeeper,kafka
进入到kafka目录下,启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
启动zookeeper成功后会看到如下的输出:
cd进入kafka解压目录,启动kafka
bin/kafka-server-start.sh config/server.properties
启动kafka成功后会看到如下的输出:
4.第一个消息
4.1 创建一个topic(主题)
Kafka通过topic对同一类的数据进行管理,同一类的数据使用同一个topic可以在处理数据时更加的便捷.
在kafka解压目录打开,输入,创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
创建一个名为test的topic
来查看已经创建的topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
4.2 创建一个消息消费者
在kafka解压目录打开终端,输入,创建一个消息消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
4.3 创建一个消息生产者
在kafka解压目录打开一个新的终端,输入,创建一个消息生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
输入信息:
查看输出:
4.4 如何通过ip修改,外网访问
进入到config目录下,修改
//listeners = listener_name://host_name:port
//EXAMPLE:
//listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://xxx.xxx.xxx.xxx:9092
启动时注意修改ip
kafka log存放目录:
log.dirs=/tmp/kafka-logs
4.5 修改logstash的配置,让logstash消费kafka的消息
input{
kafka {
zk_connect => "172.28.50.143:2181"
group_id => "logstash"
topic_id => "test1"
reset_beginning => false
consumer_threads => 5
decorate_events => true
}
}
这里配置完成之后需要重启logstash,这里的配置使logstash变成了kafka对应topic:test1的消费方.
5.向fakfa中放Message
这里向kafka中放消息是很灵活的,spring官网有对kafka的支持:
官方文档地址:https://docs.spring.io/spring-kafka/docs/2.0.3.RELEASE/reference/html/
pom:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.0.3.RELEASE</version>
</dependency>
使用KafkaTemplate向kafka中放入消息;
kafka的configBean
配置参数:
kafka.host.name = 172.28.50.143:9092
kafka.topic.name = test1
当然KafkaTemplate也可以用来消费kafka中的消息,我们这里不需要.
这里实际的代码是很简单的,可以使用过滤器来统一打印Controller的日志,同时将这个项目打成一个jar依赖到其他项目中,尽量减少代码入侵.
代码git地址:
https://github.com/Houlintao1/hou_start1.git
这只是这几天初步研究的简单的demo,理解不深,后续还会补充.
更多推荐
所有评论(0)