主要使用 kafka logstash elasticsearch kibana进行配置日志分析收集,由于只是学习,所以在window机器上搭建

  1. kafka_2.10-0.10.1.1
    http://kafka.apache.org/downloads
  2. zookeeper-3.4.6
    http://zookeeper.apache.org/
  3. logstash-5.3.2
    https://www.elastic.co/cn/downloads/logstash
  4. elasticsearch-5.3.2
    https://www.elastic.co/cn/downloads/elasticsearch
  5. kibana-5.3.2-windows-x86

    https://www.elastic.co/cn/downloads/kibana

    由于 logstash 、kibana 、elasticsearch都是一个公司的,可各个产品首页显示的不是5.3.2 版本 ,可以点击产品页面上的View past releases. 下载对应的版本,为了避免版本冲突 建议这三个采用同一个版本

    简单的原理:
    kafka(在应用中采集日志)—-logstash(从kafka中收集日志到ES中)—-elasticsearch(负责存储日志信息)——–kibana(负责用来展示数据)

所以启动顺序:
先把zookeeper kafka elasticsearch
1. zookeeper启动

cd zookeeper-3.4.6/bin
zkServer.cmd

2.kafka服务启动

cd kafka_2.10-0.10.1.1\bin\windows
kafka-server-start.bat ../../config/server.properties
为了查看效果 顺带启动一下 kafka 消费者
kafka-console-consumer.bat –zookeeper localhost:2181 –topic mytopic –from-beginning

3.启动elasticsearch

cd elasticsearch-5.3.2\bin
elasticsearch.bat
elasticsearch启动后 可以看http://localhost:9200/
正常会返回一个json 对应你的elasticsearch的版本相关信息

4.配置kibana
首先修改一下kibana的配置文件 config\kibana.yml

cd kibana-5.3.2-windows-x86\config
在config增加一行配置
elasticsearch.url: “http://localhost:9200
cd kibana-5.3.2-windows-x86\bin
kibana.bat

启动后可以进入kibana首页查看http://localhost:5601
一开始进去会提示没有index pattern。原因是因为我们还没有把数据弄到elasticsearch中
5.配置logstash

cd logstash-5.3.2\bin
我们在这个目录下 新建一个配置文件 名字:logstash.conf
内容如下:

input{
    kafka {
        bootstrap_servers => "localhost:9092"   
        topics => ["mytopic"]
        enable_auto_commit => "true"
        auto_offset_reset => "latest"
   }

}

output{
    stdout{}    
     elasticsearch {
      hosts => [ "localhost:9200" ]  
      index => "logstash-cylog"  
    }
}

简单说明一下这个配置文件:
input:代表数据源怎么来的。我们这里使用kafka消费者
filter :代表过滤数据,这里配置代表是 将message进行json解析。(非必须)
output:输出数据,stdout代表输出到控制台中(方便直接查看效果)
elasticsearch 配置 指向本地的地址
input filter output 这个只是简单配置,详细的可以参考logstash的官网
https://www.elastic.co/cn/products/logstash 里面有非常详细,针对每个配置说明

由于我们使用需要使用到kafka-input插件 所以我们要安装kafka的插件

cd logstash-5.3.2\bin
logstash-plugin.bat install logstash-input-kafka

安装插件后 启动logstash

提示成功后,就是我们最重要的一步了。

把日志发送到kafka 这里有很多种方式,我们采用最简单的log4j kafka appender
新建一个maven项目
KafkaAppenderApplication

package com.test;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaAppenderApplication {

    public static Logger logger = LogManager.getLogger(KafkaAppenderApplication.class);

    public static void main(String[] args) {
        long t = System.currentTimeMillis();
        logger.info("inf1o123" + t + "info");
        SpringApplication.run(KafkaAppenderApplication.class, args);
        System.exit(0);
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test</groupId>
    <artifactId>kafkaAppender</artifactId>
    <version>1.0</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.0.RELEASE</version>
        <relativePath/>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.5</version>
        </dependency>

        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>


        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.10.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
</project>

log4j2.xml

<Configuration status="warn" monitorInterval="30" strict="true" schema="Log4J-V2.2.xsd">
    <Appenders>
        <!-- 输出到控制台 -->
        <Console name="Console" target="SYSTEM_OUT">
            <!-- 需要记录的级别 -->
            <ThresholdFilter level="trace" onMatch="ACCEPT" onMismatch="DENY"/>
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}:%4p %t (%F:%L) - %m%n"/>
        </Console>

        <!-- 输出到Kafka   topci:mytopic     bootstrap.servers: localhost:9092-->
        <Kafka name="KAFKA" topic="mytopic">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] [%-5p] {%F:%L} - %m%n" />
            <Property name="bootstrap.servers">localhost:9092</Property>
        </Kafka>
    </Appenders>

    <Loggers>
        <Root level="info"> <!-- 全局配置 -->
            <AppenderRef ref="Console"/>
            <AppenderRef ref="KAFKA"/>
        </Root>
    </Loggers>
</Configuration>

启动项目后
kafka消费者显示日志:

2017-05-04 14:16:35.221 [main] [INFO ] {KafkaAppenderApplication.java:15} - inf1
o1231493878595218info

2017-05-04 14:16:35.865 [background-preinit] [INFO ] {Version.java:30} - HV00000
1: Hibernate Validator 5.2.4.Final

然后我们可以登录到kafka 首先建立index 可以选择logstash-*
进入 Discover后 可以看到我们刚才的日志

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐