ELK+Kafka从0开始

简介

(1)Kafka:接收用户日志的消息队列

(2)Logstash:做日志解析,统一成json输出给Elasticsearch

(3)Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能。

(4)Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因。

(5)Zookeeper: 状态管理,监控进程等服务

切换到JDK1.8

最新的软件都是要求JDK1.8,建议首先切换到JDK版本

yum search jdk
yum install java-1.8.0-openjdk.x86_64 -y

//修改JDK环境配置(例如:)
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_31  
export JRE_HOME=${JAVA_HOME}/jre  
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export  PATH=${JAVA_HOME}/bin:$PATH  

1.elasticsearch

官网docs:https://www.elastic.co/guide/en/elasticsearch/reference/current/rpm.html

1.下载安装包

**wget https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/rpm/elasticsearch/2.3.5/elasticsearch-2.3.5.rpm**

2.解压启动

**sudo rpm --install elasticsearch-2.3.5.rpm**

配置成系统启动时自动启动:

**sudo systemctl daemon-reload**

**sudo systemctl enable elasticsearch.service**

启动:

**sudo systemctl start elasticsearch.service**

3.查看是否启动:

**systemctl status elasticsearch**

4.查找elasticsearch路径,查看修改elasticsearch.yum

**whereis elasticsearch**

**vi /etc/elasticsearch/elasticsearch.yml**

注意这里要修改host和port

这里我将ip修改成里本机的ip地址,才能让外网访问,修改完成后注意重启.

5.服务的启动和停止

**sudo systemctl start elasticsearch.service**

**sudo systemctl stop elasticsearch.service**

6.无法启动,查看错误日志

**journalctl -xe**

**sudo journalctl --unit elasticsearch**

根据时间搜索:eg

**sudo journalctl --unit elasticsearch --since  "2016-10-30 18:17:16"**

7.启动成功,测试

**curl -XGET "127.0.0.1:9200"**

在浏览器访问:

2.安装kibana

1.下载

wget https://download.elastic.co/kibana/kibana/kibana-4.5.1-1.x86_64.rpm

2.安装

yum localinstall kibana-4.5.1-1.x86_64.rpm -y


3.启动
sudo systemctl start kibana

查看是否启动成功

systemctl status kibana

4.浏览器访问kibana

3.安装logstash,以及添加配置文件

1.下载logstash

wget -c https://download.elastic.co/logstash/logstash/packages/centos/logstash-2.4.1.noarch.rpm

这里注意;logstash 5.x和6.x版本都需要JDK1.8,而服务器jdk1.7版本所以并没有下载那么高的版本,只能选在2.x

2.安装

yum localinstall logstash-2.4.1.noarch.rpm –y

3.生成证书

[root@test-lufei-6ec9ba1e-3ed1-4409-97dc-4089e858268b pki]# cd tls
[root@test-lufei-6ec9ba1e-3ed1-4409-97dc-4089e858268b tls]# ls
cert.pem  certs  misc  openssl.cnf  private

//使用下面命令
[root@test-lufei-6ec9ba1e-3ed1-4409-97dc-4089e858268b tls]# openssl req -subj '/CN=elk.test.com/' -x509 -days 3650 -batch -nodes -newkey rsa:2048 -keyout private/logstash-forwarder.key -out certs/logstash-forwarder.crt

Generating a 2048 bit RSA private key
..+++
.....................................................................+++
writing new private key to 'private/logstash-forwarder.key'
-----

4.之后创建logstash 的配置文件。如下:

cd /etc/logstash/conf.d/
vi 01-logstash-initial.conf

将下面配置写入

input {
  beats {
    port => 5000
    type => "logs"
    ssl => true
    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
        }
      }

监听TCP 5044端口上beats 输入,使用上面创建的SSL证书加密,当然你要是不想使用这种方式还可以配置路径收集

input{
file{
    path =>"/Develop/Tools/apache-tomcat-8.0.30/webapps/nggirllog/access.log"
    start_position=>"beginning"
   }
 }

创建一个名为10-syslog-filter.conf的配置文件,我们将为syslog消息添加一个过滤器

 filter {
   if [type] == "syslog-beat" {
     grok {
       match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: % {GREEDYDATA:syslog_message}" }
       add_field => [ "received_at", "%  {@timestamp}" ]
       add_field => [ "received_from", "% {host}" ]
      }
     geoip {
        source => "clientip"
     }
     syslog_pri {}
     date {
       match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
      }
    }
  }

这个输出基本上配置Logstash来存储input数据到Elasticsearch中,运行在localhost:9200

 output {
  elasticsearch { }
  stdout { codec => rubydebug }

}

5.启动logstash,刚才配置文件端口写的5000

启动
sudo systemctl start logstash

查看是否启动成功
systemctl status logstash

6.查询一下当前启动的三个软件的运行状态

netstat -nltp

ok,测试一下,效果

这个时候其实已经能初步的使用了,你可以将一个java项目的跑在服务器上将日志输出路径陪成和input路径相同,大概能实现如下效果:

4.kafka

Kafka目前主要作为一个分布式的发布订阅式的消息系统使用,下面简单介绍一下kafka的基本机制:

Producer即生产者,向Kafka集群发送消息,在发送消息之前,会对消息进行分类,即Topic,上图展示了两个producer发送了分类为topic1的消息,另外一个发送了topic2的消息。

Topic即主题,通过对消息指定主题可以将消息分类,消费者可以只关注自己需要的Topic中的消息

Consumer即消费者,消费者通过与kafka集群建立长连接的方式,不断地从集群中拉取消息,然后可以对这些消息进行处理。

kafka官方文档:
http://kafka.apache.org/documentation

kafka工作原理:https://www.cnblogs.com/hei12138/p/7805475.html

1.从kafka官网下载,并解压kafka

首先确保你的机器上安装了jdk**(最新版kafka需要使用jdk1.8)**,kafka需要java运行环境,以前的kafka还需要zookeeper,新版的kafka已经内置了一个zookeeper环境.

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz

//解压
tar -zxvf kafka_2.11-1.0.0.tgz

2.修改配置

//进入到kafka的安装目录下
[root@test-lufei-6ec9ba1e-3ed1-4409-97dc-4089e858268b config]# cd /home/centos/kafka_2.12-1.0.0
[root@test-lufei-6ec9ba1e-3ed1-4409-97dc-4089e858268b kafka_2.12-1.0.0]# ls
bin  config  libs  LICENSE  NOTICE  site-docs
[root@test-lufei-6ec9ba1e-3ed1-4409-97dc-4089e858268b kafka_2.12-1.0.0]# cd config/
[root@test-lufei-6ec9ba1e-3ed1-4409-97dc-4089e858268b config]# ls
connect-console-sink.propertiesconnect-distributed.properties  connect-file-source.properties  connect-standalone.properties  log4j.properties server.properties   zookeeper.properties
connect-console-source.properties  connect-file-sink.propertiesconnect-log4j.propertiesconsumer.propertiesproducer.properties  tools-log4j.properties

修改service.properties文件

#此Broker的ID,集群中每个Broker的ID不可相同

broker.id=0

#监听器,端口号与port一致即可

listeners=PLAINTEXT://localhostls:9092

#Broker监听的端口

port=19092

#Broker的Hostname,填主机IP即可

host.name=172.16.38.176

#向Producer和Consumer建议连接的Hostname和port(此处有坑,具体见后)

advertised.host.name=172.16.38.176

advertised.port=9092

#进行IO的线程数,应大于主机磁盘数

num.io.threads=8

#消息文件存储的路径

log.dirs=/data/kafka-logs

#消息文件清理周期,即清理x小时前的消息记录

log.retention.hours=168

#每个Topic默认的分区数,一般在创建Topic时都会指定分区数,所以这个配成1就行了

num.partitions=1

#Zookeeper连接串,此处填写上一节中安装的三个zk节点的ip和端口即可

zookeeper.connect=172.16.38.176:12180,172.16.38.177:12181,172.16.38.177:12182

3.启动zookeeper,kafka

进入到kafka目录下,启动zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

启动zookeeper成功后会看到如下的输出:

cd进入kafka解压目录,启动kafka

bin/kafka-server-start.sh config/server.properties

启动kafka成功后会看到如下的输出:

4.第一个消息

4.1 创建一个topic(主题)

Kafka通过topic对同一类的数据进行管理,同一类的数据使用同一个topic可以在处理数据时更加的便捷.

在kafka解压目录打开,输入,创建topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

创建一个名为test的topic

来查看已经创建的topic

bin/kafka-topics.sh --list --zookeeper localhost:2181  
4.2 创建一个消息消费者

在kafka解压目录打开终端,输入,创建一个消息消费者

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
4.3 创建一个消息生产者

在kafka解压目录打开一个新的终端,输入,创建一个消息生产者

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

输入信息:

查看输出:

4.4 如何通过ip修改,外网访问

进入到config目录下,修改

//listeners = listener_name://host_name:port
//EXAMPLE:
//listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://xxx.xxx.xxx.xxx:9092

启动时注意修改ip

kafka log存放目录:

log.dirs=/tmp/kafka-logs
4.5 修改logstash的配置,让logstash消费kafka的消息
input{
  kafka {
   zk_connect => "172.28.50.143:2181"
   group_id => "logstash"
   topic_id => "test1"
   reset_beginning => false
   consumer_threads => 5
   decorate_events => true
  }
}

这里配置完成之后需要重启logstash,这里的配置使logstash变成了kafka对应topic:test1的消费方.

5.向fakfa中放Message

这里向kafka中放消息是很灵活的,spring官网有对kafka的支持:

官方文档地址:https://docs.spring.io/spring-kafka/docs/2.0.3.RELEASE/reference/html/

pom:

        <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.0.3.RELEASE</version>
        </dependency>

使用KafkaTemplate向kafka中放入消息;

kafka的configBean

配置参数:

kafka.host.name = 172.28.50.143:9092
kafka.topic.name = test1

当然KafkaTemplate也可以用来消费kafka中的消息,我们这里不需要.

这里实际的代码是很简单的,可以使用过滤器来统一打印Controller的日志,同时将这个项目打成一个jar依赖到其他项目中,尽量减少代码入侵.

代码git地址:
https://github.com/Houlintao1/hou_start1.git

这只是这几天初步研究的简单的demo,理解不深,后续还会补充.

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐