背景

随着项目的深入,久不久可能出现些问题,但是查询起来又极其麻烦,线上有三台服务器,不知道报错日志在那台上,很麻烦。所以领导要求把项目的链路追踪和日志给统一输出到elk中,可以实现在elk查询。

Zipkin+Sleuth链路追踪

先来实现zipkin+sleuth链路追踪功能,至于zipkin+sleuth是什么不多累赘,自己网上查询。本人使用的自己搭建的zipkin项目,话不多说,直接上干货:

导入包

<dependency>
    <groupId>io.zipkin.java</groupId>
    <artifactId>zipkin-autoconfigure-ui</artifactId>
    <version>2.10.4</version>
    <exclusions>
        <exclusion>
            <artifactId>log4j-slf4j-impl</artifactId>
            <groupId>org.apache.logging.log4j</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.zipkin.java</groupId>
    <artifactId>zipkin-server</artifactId>
    <version>2.10.4</version>
</dependency>
<!-- 使用消息的方式收集数据(kafka) -->
<dependency>
    <groupId>io.zipkin.java</groupId>
    <artifactId>zipkin-autoconfigure-collector-kafka</artifactId>
    <version>2.12.8</version>
</dependency>

<!-- 支持Elasticsearch 2.x - 6.x -->
<dependency>
    <groupId>io.zipkin.java</groupId>
    <artifactId>zipkin-autoconfigure-storage-elasticsearch-http</artifactId>
    <version>2.8.4</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

application.yml文件配置

spring:
  application:
    name: zipkin
  # 收集粒度,1.0全部收集
  sleuth:
    sampler:
      percentage: 1.0
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
        namespace: fc500383-342b-4d02-a9ce-5e87f2f58787
        heart-beat-timeout: 60
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          zkNodes: localhost:2181
server:
  port: 9411
#  servlet:
#    context-path: /user
zipkin:
  storage:
    type: elasticsearch
    elasticsearch:
      hosts: http://localhost:9200
      #      username: elastic
      #      password: changeme
      cluster: elasticsearch
      index: zipkin
      index-shards: 1
      index-replicas: 1
  collector:
    kafka:
      bootstrap-servers: localhost:9092
      zookeeper: localhost:2181
      topic:

# 加上这个防止报错
management:
  endpoints:
    web:
      exposure:
        include: "*"
  endpoint:
    health:
      show-details: ALWAYS
  metrics:
    web:
      server:
        auto-time-requests: false

启动类上加上@EnableZipkinServer,千万要记住,不然zipkin客户端找不到服务哦

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import zipkin2.server.internal.EnableZipkinServer;

@EnableDiscoveryClient
@SpringBootApplication
@EnableZipkinServer
public class CempSleuthApplication {

    public static void main(String[] args) {
        SpringApplication.run(CempSleuthApplication.class, args);
    }

}

输入:http://localhost:9411/zipkin/ 查看一把吧。

kafka配置安装此处省略,自行网上搜索下载

ELK配置

相关文件安装包下载路径:

Logstash: https://www.elastic.co/downloads/logstash

Elasticsearch: https://www.elastic.co/downloads/elasticsearch

Kibana: https://www.elastic.co/downloads/kibana

ELK的安装下载此处省略,自行网上搜索下载。

这里主要是说出一些坑:

1、ElasticSearch-Head连接ES失败:单独查询es是没有问题的

在这里插入图片描述
在这里插入图片描述
问题解决方法:
a、es-head中修改: ES安装路径\elasticsearch-head-master\Gruntfile.js
在这里插入图片描述
b、进入es安装目录下的config目录,修改elasticsearch.yml文件.在文件的末尾加入以下代码
#设置跨域
http.cors.enabled: true
http.cors.allow-origin: “*”
node.master: true
node.data: true
重启ES和启动ES-HEAD,搞定
输入:http://localhost:9100/ 可以看到es-head启动成功,在这里插入图片描述
2、问题二: logstack的conf配置拉取kafka不同的主题topic,创建不同的索引index
我是这样配置的:采用不填的type来区分

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["zipkin"]
    codec => "json"
    auto_offset_reset => "earliest" #从最早的偏移量开始消费
    decorate_events => true    #此属性会将当前topic、offset、group、partition等信息也带到message中
    type => "zipkin" #所有插件通用属性,尤其在input里面配置多个数据源时很有用
  }
   kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["my_log"]
    codec => "json"
    auto_offset_reset => "earliest" #从最早的偏移量开始消费
    decorate_events => true    #此属性会将当前topic、offset、group、partition等信息也带到message中
    type => "my_log" #所有插件通用属性,尤其在input里面配置多个数据源时很有用
  }

}
 
output {

	if [type] == "zipkin" {
        elasticsearch {
            hosts => ["localhost:9200"]
            index => "zipkin-topic-%{+YYY.MM.dd}"
        }
    }
	if [type] == "my_log" {
        elasticsearch {
            hosts => ["localhost:9200"]
            index => "kafka-log-%{+YYY.MM.dd}"
        }
    }
    stdout {
        codec => rubydebug {metadata => true}  #logstash控制台输出日志和@metadata信息
    }
}

3、问题三:kibana创建搜索项,index创建不只在哪,直接上截图:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
这个选项可选可不选,就是师傅选择根据时间戳查询。
在这里插入图片描述
在这里插入图片描述
最后点击discover,可以看到刚刚创建的index 模板,就可以看到数据啦。到此ELK的配置都完成了。
启动项目就可以看到链路追踪的日志了。
在这里插入图片描述

springboot整和kafka和elk日志收集

1、项目导入包:

 <!-- elasticsearch -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>x-pack-transport</artifactId>
            <version>7.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.9.1</version>
        </dependency>

        <!--        zipkin-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
            <version>2.0.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-zipkin</artifactId>
            <version>2.0.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!--以下依赖是logback接入kafka所需依赖-->
        <dependency>
            <groupId>com.github.danielwegener</groupId>
            <artifactId>logback-kafka-appender</artifactId>
            <version>0.2.0-RC1</version>
        </dependency>
        <dependency>
            <groupId>net.logstash.logback</groupId>
            <artifactId>logstash-logback-encoder</artifactId>
            <version>5.2</version>
        </dependency>

2、application.yml文件配置

spring:
  	application:
    	name: cardmanage
  	sleuth: #链路追踪配置
    	sampler:
      		percentage: 1.0
  	zipkin:
     	sender:
       		type: kafka
     	kafka:
       		topic: zipkin
 	kafka:
   		bootstrap-servers: localhost:9092
 management:
  endpoints:
    web:
      exposure:
        include: "*"
  endpoint:
    health:
      show-details: ALWAYS

3、日志文件logback-spring.xml配置

<?xml version="1.0" encoding="UTF-8"?>
<!-- 日志级别从低到高分为TRACE < DEBUG < INFO < WARN < ERROR < FATAL,如果设置为WARN,则低于WARN的信息都不会输出 -->
<!-- scan:当此属性设置为true时,配置文件如果发生改变,将会被重新加载,默认值为true -->
<!-- scanPeriod:设置监测配置文件是否有修改的时间间隔,如果没有给出时间单位,默认单位是毫秒。当scan为true时,此属性生效。默认的时间间隔为1分钟。 -->
<!-- debug:当此属性设置为true时,将打印出logback内部日志信息,实时查看logback运行状态。默认值为false-->
<configuration  scan="true" scanPeriod="5 seconds">

    <!-- 该节点会读取Environment中配置的值,在这里我们读取application.yml中的值 -->
    <springProperty scope="context" name="springAppName" source="spring.application.name"  />

    <!-- name的值是变量的名称,value的值时变量定义的值。通过定义的值会被插入到logger上下文中。定义变量后,可以使“${}”来使用变量。 -->
    <property name="log.path" value="/data/logs/cemp/cardmanage" />

    <!-- 彩色日志依赖的渲染类 -->
    <conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
    <conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
    <conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />
    <!-- 彩色日志格式 -->
    <property name="CONSOLE_LOG_PATTERN"
              value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${%level:-%5p}) %clr([${springAppName:-},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-Span-Export:-}]){yellow} %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}" />

    <!--输出到控制台-->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <!--此日志appender是为开发使用,只配置最底级别,控制台输出的日志级别是大于或等于此级别的日志信息-->
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>DEBUG</level>
        </filter>
        <!-- 输出格式 -->
        <encoder charset="UTF-8">
            <Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <!-- 时间滚动输出 level为 DEBUG 日志 -->
    <appender name="DEBUG_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- 正在记录的日志文件的路径及文件名 -->
        <file>${log.path}/log_debug.log</file>
        <!--日志文件输出格式-->
        <encoder>
            <Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
            <charset>UTF-8</charset> <!-- 设置字符集 -->
        </encoder>
        <!-- 日志记录器的滚动策略,按日期,按大小记录 -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- 日志归档 -->
            <fileNamePattern>${log.path}/debug/log-debug-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!--日志文件保留天数-->
            <maxHistory>90</maxHistory>
        </rollingPolicy>
        <!-- 此日志文件只记录debug级别的 -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>debug</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- 时间滚动输出 level为 INFO 日志 -->
    <appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- 正在记录的日志文件的路径及文件名 -->
        <file>${log.path}/log_info.log</file>
        <!--日志文件输出格式-->
        <encoder>
            <Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
            <charset>UTF-8</charset>
        </encoder>
        <!-- 日志记录器的滚动策略,按日期,按大小记录 -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- 每天日志归档路径以及格式 -->
            <fileNamePattern>${log.path}/info/log-info-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!--日志文件保留天数-->
            <maxHistory>90</maxHistory>
        </rollingPolicy>
        <!-- 此日志文件只记录info级别的 -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>info</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- 时间滚动输出 level为 WARN 日志 -->
    <appender name="WARN_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- 正在记录的日志文件的路径及文件名 -->
        <file>${log.path}/log_warn.log</file>
        <!--日志文件输出格式-->
        <encoder>
            <Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
            <charset>UTF-8</charset> <!-- 此处设置字符集 -->
        </encoder>
        <!-- 日志记录器的滚动策略,按日期,按大小记录 -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${log.path}/warn/log-warn-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!--日志文件保留天数-->
            <maxHistory>90</maxHistory>
        </rollingPolicy>
        <!-- 此日志文件只记录warn级别的 -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>warn</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>


    <!-- 时间滚动输出 level为 ERROR 日志 -->
    <appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- 正在记录的日志文件的路径及文件名 -->
        <file>${log.path}/log_error.log</file>
        <!--日志文件输出格式-->
        <encoder>
            <Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
            <charset>UTF-8</charset> <!-- 此处设置字符集 -->
        </encoder>
        <!-- 日志记录器的滚动策略,按日期,按大小记录 -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${log.path}/error/log-error-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!--日志文件保留天数-->
            <maxHistory>90</maxHistory>
        </rollingPolicy>
        <!-- 此日志文件只记录ERROR级别的 -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>ERROR</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!--
        <logger>用来设置某一个包或者具体的某一个类的日志打印级别、
        以及指定<appender><logger>仅有一个name属性,
        一个可选的level和一个可选的addtivity属性。
        name:用来指定受此logger约束的某一个包或者具体的某一个类。
        level:用来设置打印级别,大小写无关:TRACE, DEBUG, INFO, WARN, ERROR, ALL 和 OFF,
              还有一个特俗值INHERITED或者同义词NULL,代表强制执行上级的级别。
              如果未设置此属性,那么当前logger将会继承上级的级别。
        addtivity:是否向上级logger传递打印信息。默认是true-->
    <!--<logger name="org.springframework.web" level="info"/>-->
    <!--<logger name="org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor" level="INFO"/>-->
    <!--
        使用mybatis的时候,sql语句是debug下才会打印,而这里我们只配置了info,所以想要查看sql语句的话,有以下两种操作:
        第一种把<root level="info">改成<root level="DEBUG">这样就会打印sql,不过这样日志那边会出现很多其他消息
        第二种就是单独给dao下目录配置debug模式,代码如下,这样配置sql语句会打印,其他还是正常info级别:
     -->

    <!--
        root节点是必选节点,用来指定最基础的日志输出级别,只有一个level属性
        level:用来设置打印级别,大小写无关:TRACE, DEBUG, INFO, WARN, ERROR, ALL 和 OFF,
        不能设置为INHERITED或者同义词NULL。默认是DEBUG
        可以包含零个或多个元素,标识这个appender将会添加到这个logger。
    -->
        <appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
            <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
                <pattern>
                    {
                    "severity": "%level",
                    "service": "${springAppName:-}",
                    "trace": "%X{X-B3-TraceId:-}",
                    "span": "%X{X-B3-SpanId:-}",
                    "exportable": "%X{X-Span-Export:-}",
                    "pid": "${PID:-}",
                    "thread": "%thread",
                    "class": "%logger{40}",
                    "rest": "%message"
                    }
                </pattern>
            </encoder>
            <!-- 此处为kafka的Topic名称,千万不要写错 -->
            <topic>my_log</topic>
            <!-- we don't care how the log messages will be partitioned  -->
            <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" />
            <!-- use async delivery. the application threads are not blocked by logging -->
            <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
            <!-- each <producerConfig> translates to regular kafka-client config (format: key=value) -->
            <!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs -->
            <!-- bootstrap.servers is the only mandatory producerConfig  设置kafka的服务器ip和端口-->
            <producerConfig>bootstrap.servers=localhost:9092</producerConfig>
            <!-- don't wait for a broker to ack the reception of a batch.  -->
            <producerConfig>acks=0</producerConfig>
            <!-- wait up to 1000ms and collect log messages before sending them as a batch -->
            <producerConfig>linger.ms=1000</producerConfig>
            <!-- even if the producer buffer runs full, do not block the application but start to drop messages -->
            <producerConfig>max.block.ms=0</producerConfig>
            <!-- define a client-id that you use to identify yourself against the kafka broker -->
            <producerConfig>client.id=0</producerConfig>
        </appender>

    <!--   不同环境使用不同的输出 </root>-->
    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
        <appender-ref ref="DEBUG_FILE"/>
        <appender-ref ref="INFO_FILE"/>
        <appender-ref ref="WARN_FILE"/>
        <appender-ref ref="ERROR_FILE"/>
        <appender-ref ref="kafkaAppender" />
    </root>
</configuration>

到此日志收集也好了,启动项目,到kibana可以看到zipkin和kafka-log分别收集了链路追踪和系统日志的数据。
在这里插入图片描述
各人觉得使用日志配置文件这种方式比使用kafka切面收集的方式来的方便简单。以上内容都是各人开发过程中实际遇到的问题,同事也搜索了网上很多的大神的博客最终把项目搞定。如有不对和不妥之处请多多指教。本人微信:“441338280”,欢迎一起交流学习。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐