写在前面:
elkfk 是Elasticsearch,Logstash,Kafka,Filebeat,Kibana的简称

  • ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。
  • Logstash 是一个开源的数据收集引擎,它具有备实时数据传输能力。它可以统一过滤来自不同源的数据,并按照开发者的制定的规范输出到目的地。Logstash 收集数据对象就是日志文件。由于日志文件来源多(如:系统日志、服务器 日志等),且内容杂乱,不便于人类进行观察。因此,我们可以使用 Logstash 对日志文件进行收集和统一过滤,变成可读性高的内容,方便开发者或运维人员观察,从而有效的分析系统/项目运行的性能,做好监控和预警的准备工作等。
  • Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据
  • FileBeat是转发和集中日志数据的轻量级工具。作为服务器上的代理安装,filebeat监视您指定的日志文件或位置,收集日志事件,并将其转发到ElasticSearch或Logstash进行索引。由于logstash相对而言较于耗性能,故采用Filebeat进行数据采集。
  • Kibana是一个开源的分析和可视化平台,设计用于和Elasticsearch一起工作。用Kibana来搜索,查看,并和存储在Elasticsearch索引中的数据进行交互

本文测试环境:centos7, Elasticsearch 6.6.1, Logstash 6.6.1,Kafka 2.12-2.1.1, FileBeat 6.6.1, Kibana 6.6.1,测试机器ip:47.99.200.71,由于elk需要占用较大的内存请确保有 2g 的内存以便提供测试
架构图:

一、elkfk环境搭建(已有环境可跳过)

1.elastcsearch环境

前提设置:调高JVM线程数限制数量
在centos窗口中,修改配置sysctl.conf

vim /etc/sysctl.conf

在最后一行加入如下内容:

vm.max_map_count=262144 

退出保存文件后,启用配置:

sysctl -p

注:这一步是为了防止启动容器时,报出如下错误:
bootstrap checks failed max virtual memory areas vm.max_map_count [65530] likely too low, increase to at least [262144]

以下的操作请自行打开对应防火墙端口,或者配置安全组规则,本文中不再赘述。

1.1 在centos中 创建对应映射目录 /home/software/es/data,以及编写/home/software/elasticsearch/config/下es-single.yml,内容如下
network.bind_host: 0.0.0.0
http.cors.enabled: true
http.cors.allow-origin: "*"

注:是为了解决其他地址可以访问,以及跨域问题

1.2 在centos中 执行如下命令搭建elasticsearch单例实例
 chmod 777 /home/software/es/data
 docker run -e ES_JAVA_OPTS="-Xms256m -Xmx256m" -d -p 9210:9200 -p 9310:9300  -e "discovery.type=single-node" -v /home/software/es/data:/usr/share/elasticsearch/data -v /home/software/elasticsearch/config/es-single.yml:/usr/share/elasticsearch/config/elasticsearch.yml --name es-single elasticsearch:6.6.1

注:该文只提供单例模式,读者可以查看《docker简易搭建ElasticSearch集群》https://blog.csdn.net/belonghuang157405/article/details/83301937
注:如果ELK选的6.X版本的,那么读者需将data目录 开启777权限=> chmod 777 data .防止出现java.nio.file.AccessDeniedException: /usr/share/elasticsearch/data/nodes 错误

2.kafka环境

2.1 在centos中,拉取zookeeper镜像,以及创建zookeeper容器:
docker run -d -p 2181:2181 -v /home/software/zookeeperhost/zookeeperDataDir:/data -v /home/software/zookeeperhost/zookeeperDataLogDir:/datalog  -e ZOO_MY_ID=1 -e ZOO_SERVERS='server.1=47.99.200.71:2888:3888'    --name zookeeper-single --privileged zookeeper

*注:该文只提供单例模式,读者可以查看《docker简易搭建ElasticSearch集群》https://blog.csdn.net/belonghuang157405/article/details/83301937 *

2.2 在centos中,创建kafka容器:
docker run  -d --name kafka -p 9092:9092  --env KAFKA_ADVERTISED_HOST_NAME=47.99.200.71 -e KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"  -e KAFKA_ZOOKEEPER_CONNECT=47.99.200.71:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://47.99.200.71:9092  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092  wurstmeister/kafka 

注:集群模式请移步至《docker简易搭建Zookeeper集群》 https://blog.csdn.net/belonghuang157405/article/details/81670853

3.logstash环境

3.1 在centos的/home/software/logstash/下 创建logstash.yml,文件内容如下:
xpack:
  monitoring:
    enabled:true

注:此处开启elasticsearch监控,该这里不做过多说明,读者可以根据需求自定义配置,具体配置请参考官网https://www.elastic.co/guide/en/logstash/current/logstash-settings-file.html

3.2 在centos的/home/software/pipeline/下 创建pipeline.conf,文件内容如下:
input {
   kafka {
    add_field => {"type"=>"kafka"}
    bootstrap_servers => "47.99.200.71:9092"
    topics => ["logstash-kafka"]
    group_id => "logstash-kafka-group"
    codec => "plain"
    consumer_threads => 1
    decorate_events => true
  }

}

output {
    if [type] == "kafka" {
          elasticsearch {
        #ES地址
                hosts => "47.99.200.71:9210"
                action => "index"
        #指定索引名字,不适用默认的,用来区分各个项目
               index => "kafka-tag-%{+YYYY.MM.dd}"
        }      
    }  
        stdout { codec => rubydebug}
}

注:这里定义一个输入一个输出,通过add_field自定义属性 用于输出判断

3.3 在centos中 执行如下命令搭建logstash单例环境
docker run -p 9600:9600 -d --privileged -e xpack.monitoring.elasticsearch.url=http://47.99.200.71:9210    -v /home/software/logstash/logstash.yml:/etc/logstash/logstash.yml  -v /home/software/pipeline:/usr/share/logstash/pipeline/ --name logstash-kafka-single  logstash:6.6.1 -f /usr/share/logstash/pipeline/pipeline.conf

注: xpack.monitoring.elasticsearch.url 默认值 http://elasticsearch:9200

4.FileBeat环境

4.1 在centos的/home/software/filebeat/下 创建filebeat.yml,文件内容如下:
filebeat.inputs:
- type: tcp
  max_message_size: 10MiB
  host: "0.0.0.0:9000"
output.kafka:
  enabled: true
  hosts: ["47.99.200.71:9092"]
  max_retries: 5
  timeout: 300
  topic: "logstash-kafka"

注:此处设置了filebeat tcp输入方式 kafka输出方式,该这里不做过多说明,读者可以根据需求自定义配置,具体配置请参考官网https://www.elastic.co/guide/en/beats/filebeat/current/configuration-filebeat-options.html

4.2 在centos中 执行如下命令搭建logstash单例环境
docker run -d -p 9000:9000 --name filebeat-kafka-single -v /home/software/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml  elastic/filebeat:6.6.1 -e -strict.perms=false  

5.kibana环境

5.1 在centos中 执行如下命令搭建kibana环境
docker run -d --name kibana-single -p 5610:5601  -e ELASTICSEARCH_URL=http://47.99.200.71:9210 -d kibana:6.6.1

注:ELASTICSEARCH_URL elasticsearch环境的地址

二、springboot集成logstash

demo项目地址:https://github.com/Blankwhiter/elk-springboot

1.springboot加入 logstash-logback-encoder依赖,pom.xml如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>elk-test</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>elk-test</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--logstash 依赖-->
        <dependency>
            <groupId>net.logstash.logback</groupId>
            <artifactId>logstash-logback-encoder</artifactId>
            <version>5.3</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2.编写application.properties
logging.config=classpath:logback-spring.xml
LOGSTASH_HOST=47.99.200.71
LOGSTASH_PORT=5000
3.在resource目录下新建 logback-spring.xml,内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/base.xml"/>
    <!--引入在application.properties文件中的变量-->
    <property resource="application.properties"></property>

    <!--定义logstash 传输方式 以及地址-->
    <appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>${LOGSTASH_HOST}:${LOGSTASH_PORT}</destination>
        <encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder"/>
    </appender>
    
    <root level="INFO">
        <appender-ref ref="LOGSTASH"/>
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>

4.测试代码
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ElkTestApplicationTests {


    @Value("${LOGSTASH_HOST}")
    private String ip;

    private final static Logger log = LoggerFactory.getLogger(ElkTestApplication.class);
    @Test
    public void test() {
        log.info("ip:{}的filebeat-kafka-logstash  测试 info 成功了!!!",ip);
        log.warn("ip:{}的filebeat-kafka-logstash   测试 warn 成功了!!!",ip);
        log.error("ip:{}的filebeat-kafka-logstash   测试 error 成功了!!",ip);
    }
    
}

三、测试是否集成成功

1.访问 http:/47.99.200.71:8088/app/kibana
2.点击 Management 菜单

3.选择 Kibana 下的 Index Patterns,并填写 logstash.conf 配置的index,类似kafka-tag*进行模糊匹配即可

4.选择时间过滤 进行每日的过滤选择,读者可以根据自身需求

5.点击Create Index pattern 按钮创建索引
6.点击Discover菜单 查看采集列表

点击该菜单后查看数据结果

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐