Springboot整合Zipkin+kafka+ELK,实现链路追踪和日志收集
背景随着项目的深入,久不久可能出现些问题,但是查询起来又极其麻烦,线上有三台服务器,不知道报错日志在那台上,很麻烦。所以领导要求把项目的链路追踪和日志给统一输出到elk中,可以实现在elk查询。Zipkin+Sleuth链路追踪先来实现zipkin+sleuth链路追踪功能,至于zipkin+sleuth是什么不多累赘,自己网上查询。本人使用的自己搭建的zipkin项目,话不多说,直接上干货:导入
背景
随着项目的深入,久不久可能出现些问题,但是查询起来又极其麻烦,线上有三台服务器,不知道报错日志在那台上,很麻烦。所以领导要求把项目的链路追踪和日志给统一输出到elk中,可以实现在elk查询。
Zipkin+Sleuth链路追踪
先来实现zipkin+sleuth链路追踪功能,至于zipkin+sleuth是什么不多累赘,自己网上查询。本人使用的自己搭建的zipkin项目,话不多说,直接上干货:
导入包
<dependency>
<groupId>io.zipkin.java</groupId>
<artifactId>zipkin-autoconfigure-ui</artifactId>
<version>2.10.4</version>
<exclusions>
<exclusion>
<artifactId>log4j-slf4j-impl</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.zipkin.java</groupId>
<artifactId>zipkin-server</artifactId>
<version>2.10.4</version>
</dependency>
<!-- 使用消息的方式收集数据(kafka) -->
<dependency>
<groupId>io.zipkin.java</groupId>
<artifactId>zipkin-autoconfigure-collector-kafka</artifactId>
<version>2.12.8</version>
</dependency>
<!-- 支持Elasticsearch 2.x - 6.x -->
<dependency>
<groupId>io.zipkin.java</groupId>
<artifactId>zipkin-autoconfigure-storage-elasticsearch-http</artifactId>
<version>2.8.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
application.yml文件配置
spring:
application:
name: zipkin
# 收集粒度,1.0全部收集
sleuth:
sampler:
percentage: 1.0
cloud:
nacos:
discovery:
server-addr: localhost:8848
namespace: fc500383-342b-4d02-a9ce-5e87f2f58787
heart-beat-timeout: 60
stream:
kafka:
binder:
brokers: localhost:9092
zkNodes: localhost:2181
server:
port: 9411
# servlet:
# context-path: /user
zipkin:
storage:
type: elasticsearch
elasticsearch:
hosts: http://localhost:9200
# username: elastic
# password: changeme
cluster: elasticsearch
index: zipkin
index-shards: 1
index-replicas: 1
collector:
kafka:
bootstrap-servers: localhost:9092
zookeeper: localhost:2181
topic:
# 加上这个防止报错
management:
endpoints:
web:
exposure:
include: "*"
endpoint:
health:
show-details: ALWAYS
metrics:
web:
server:
auto-time-requests: false
启动类上加上@EnableZipkinServer,千万要记住,不然zipkin客户端找不到服务哦
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import zipkin2.server.internal.EnableZipkinServer;
@EnableDiscoveryClient
@SpringBootApplication
@EnableZipkinServer
public class CempSleuthApplication {
public static void main(String[] args) {
SpringApplication.run(CempSleuthApplication.class, args);
}
}
输入:http://localhost:9411/zipkin/ 查看一把吧。
kafka配置安装此处省略,自行网上搜索下载
ELK配置
相关文件安装包下载路径:
Logstash: https://www.elastic.co/downloads/logstash
Elasticsearch: https://www.elastic.co/downloads/elasticsearch
Kibana: https://www.elastic.co/downloads/kibana
ELK的安装下载此处省略,自行网上搜索下载。
这里主要是说出一些坑:
1、ElasticSearch-Head连接ES失败:单独查询es是没有问题的
问题解决方法:
a、es-head中修改: ES安装路径\elasticsearch-head-master\Gruntfile.js
b、进入es安装目录下的config目录,修改elasticsearch.yml文件.在文件的末尾加入以下代码
#设置跨域
http.cors.enabled: true
http.cors.allow-origin: “*”
node.master: true
node.data: true
重启ES和启动ES-HEAD,搞定
输入:http://localhost:9100/ 可以看到es-head启动成功,
2、问题二: logstack的conf配置拉取kafka不同的主题topic,创建不同的索引index
我是这样配置的:采用不填的type来区分
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["zipkin"]
codec => "json"
auto_offset_reset => "earliest" #从最早的偏移量开始消费
decorate_events => true #此属性会将当前topic、offset、group、partition等信息也带到message中
type => "zipkin" #所有插件通用属性,尤其在input里面配置多个数据源时很有用
}
kafka {
bootstrap_servers => "localhost:9092"
topics => ["my_log"]
codec => "json"
auto_offset_reset => "earliest" #从最早的偏移量开始消费
decorate_events => true #此属性会将当前topic、offset、group、partition等信息也带到message中
type => "my_log" #所有插件通用属性,尤其在input里面配置多个数据源时很有用
}
}
output {
if [type] == "zipkin" {
elasticsearch {
hosts => ["localhost:9200"]
index => "zipkin-topic-%{+YYY.MM.dd}"
}
}
if [type] == "my_log" {
elasticsearch {
hosts => ["localhost:9200"]
index => "kafka-log-%{+YYY.MM.dd}"
}
}
stdout {
codec => rubydebug {metadata => true} #logstash控制台输出日志和@metadata信息
}
}
3、问题三:kibana创建搜索项,index创建不只在哪,直接上截图:
这个选项可选可不选,就是师傅选择根据时间戳查询。
最后点击discover,可以看到刚刚创建的index 模板,就可以看到数据啦。到此ELK的配置都完成了。
启动项目就可以看到链路追踪的日志了。
springboot整和kafka和elk日志收集
1、项目导入包:
<!-- elasticsearch -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>x-pack-transport</artifactId>
<version>7.10.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.9.1</version>
</dependency>
<!-- zipkin-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!--以下依赖是logback接入kafka所需依赖-->
<dependency>
<groupId>com.github.danielwegener</groupId>
<artifactId>logback-kafka-appender</artifactId>
<version>0.2.0-RC1</version>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>5.2</version>
</dependency>
2、application.yml文件配置
spring:
application:
name: cardmanage
sleuth: #链路追踪配置
sampler:
percentage: 1.0
zipkin:
sender:
type: kafka
kafka:
topic: zipkin
kafka:
bootstrap-servers: localhost:9092
management:
endpoints:
web:
exposure:
include: "*"
endpoint:
health:
show-details: ALWAYS
3、日志文件logback-spring.xml配置
<?xml version="1.0" encoding="UTF-8"?>
<!-- 日志级别从低到高分为TRACE < DEBUG < INFO < WARN < ERROR < FATAL,如果设置为WARN,则低于WARN的信息都不会输出 -->
<!-- scan:当此属性设置为true时,配置文件如果发生改变,将会被重新加载,默认值为true -->
<!-- scanPeriod:设置监测配置文件是否有修改的时间间隔,如果没有给出时间单位,默认单位是毫秒。当scan为true时,此属性生效。默认的时间间隔为1分钟。 -->
<!-- debug:当此属性设置为true时,将打印出logback内部日志信息,实时查看logback运行状态。默认值为false。 -->
<configuration scan="true" scanPeriod="5 seconds">
<!-- 该节点会读取Environment中配置的值,在这里我们读取application.yml中的值 -->
<springProperty scope="context" name="springAppName" source="spring.application.name" />
<!-- name的值是变量的名称,value的值时变量定义的值。通过定义的值会被插入到logger上下文中。定义变量后,可以使“${}”来使用变量。 -->
<property name="log.path" value="/data/logs/cemp/cardmanage" />
<!-- 彩色日志依赖的渲染类 -->
<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
<conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
<conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />
<!-- 彩色日志格式 -->
<property name="CONSOLE_LOG_PATTERN"
value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${%level:-%5p}) %clr([${springAppName:-},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-Span-Export:-}]){yellow} %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}" />
<!--输出到控制台-->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<!--此日志appender是为开发使用,只配置最底级别,控制台输出的日志级别是大于或等于此级别的日志信息-->
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>DEBUG</level>
</filter>
<!-- 输出格式 -->
<encoder charset="UTF-8">
<Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!-- 时间滚动输出 level为 DEBUG 日志 -->
<appender name="DEBUG_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文件的路径及文件名 -->
<file>${log.path}/log_debug.log</file>
<!--日志文件输出格式-->
<encoder>
<Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
<charset>UTF-8</charset> <!-- 设置字符集 -->
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 日志归档 -->
<fileNamePattern>${log.path}/debug/log-debug-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文件保留天数-->
<maxHistory>90</maxHistory>
</rollingPolicy>
<!-- 此日志文件只记录debug级别的 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>debug</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 时间滚动输出 level为 INFO 日志 -->
<appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文件的路径及文件名 -->
<file>${log.path}/log_info.log</file>
<!--日志文件输出格式-->
<encoder>
<Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
<charset>UTF-8</charset>
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 每天日志归档路径以及格式 -->
<fileNamePattern>${log.path}/info/log-info-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文件保留天数-->
<maxHistory>90</maxHistory>
</rollingPolicy>
<!-- 此日志文件只记录info级别的 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>info</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 时间滚动输出 level为 WARN 日志 -->
<appender name="WARN_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文件的路径及文件名 -->
<file>${log.path}/log_warn.log</file>
<!--日志文件输出格式-->
<encoder>
<Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
<charset>UTF-8</charset> <!-- 此处设置字符集 -->
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${log.path}/warn/log-warn-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文件保留天数-->
<maxHistory>90</maxHistory>
</rollingPolicy>
<!-- 此日志文件只记录warn级别的 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>warn</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 时间滚动输出 level为 ERROR 日志 -->
<appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文件的路径及文件名 -->
<file>${log.path}/log_error.log</file>
<!--日志文件输出格式-->
<encoder>
<Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
<charset>UTF-8</charset> <!-- 此处设置字符集 -->
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${log.path}/error/log-error-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文件保留天数-->
<maxHistory>90</maxHistory>
</rollingPolicy>
<!-- 此日志文件只记录ERROR级别的 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>ERROR</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!--
<logger>用来设置某一个包或者具体的某一个类的日志打印级别、
以及指定<appender>。<logger>仅有一个name属性,
一个可选的level和一个可选的addtivity属性。
name:用来指定受此logger约束的某一个包或者具体的某一个类。
level:用来设置打印级别,大小写无关:TRACE, DEBUG, INFO, WARN, ERROR, ALL 和 OFF,
还有一个特俗值INHERITED或者同义词NULL,代表强制执行上级的级别。
如果未设置此属性,那么当前logger将会继承上级的级别。
addtivity:是否向上级logger传递打印信息。默认是true。
-->
<!--<logger name="org.springframework.web" level="info"/>-->
<!--<logger name="org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor" level="INFO"/>-->
<!--
使用mybatis的时候,sql语句是debug下才会打印,而这里我们只配置了info,所以想要查看sql语句的话,有以下两种操作:
第一种把<root level="info">改成<root level="DEBUG">这样就会打印sql,不过这样日志那边会出现很多其他消息
第二种就是单独给dao下目录配置debug模式,代码如下,这样配置sql语句会打印,其他还是正常info级别:
-->
<!--
root节点是必选节点,用来指定最基础的日志输出级别,只有一个level属性
level:用来设置打印级别,大小写无关:TRACE, DEBUG, INFO, WARN, ERROR, ALL 和 OFF,
不能设置为INHERITED或者同义词NULL。默认是DEBUG
可以包含零个或多个元素,标识这个appender将会添加到这个logger。
-->
<appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>
{
"severity": "%level",
"service": "${springAppName:-}",
"trace": "%X{X-B3-TraceId:-}",
"span": "%X{X-B3-SpanId:-}",
"exportable": "%X{X-Span-Export:-}",
"pid": "${PID:-}",
"thread": "%thread",
"class": "%logger{40}",
"rest": "%message"
}
</pattern>
</encoder>
<!-- 此处为kafka的Topic名称,千万不要写错 -->
<topic>my_log</topic>
<!-- we don't care how the log messages will be partitioned -->
<keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" />
<!-- use async delivery. the application threads are not blocked by logging -->
<deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
<!-- each <producerConfig> translates to regular kafka-client config (format: key=value) -->
<!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs -->
<!-- bootstrap.servers is the only mandatory producerConfig 设置kafka的服务器ip和端口-->
<producerConfig>bootstrap.servers=localhost:9092</producerConfig>
<!-- don't wait for a broker to ack the reception of a batch. -->
<producerConfig>acks=0</producerConfig>
<!-- wait up to 1000ms and collect log messages before sending them as a batch -->
<producerConfig>linger.ms=1000</producerConfig>
<!-- even if the producer buffer runs full, do not block the application but start to drop messages -->
<producerConfig>max.block.ms=0</producerConfig>
<!-- define a client-id that you use to identify yourself against the kafka broker -->
<producerConfig>client.id=0</producerConfig>
</appender>
<!-- 不同环境使用不同的输出 </root>-->
<root level="INFO">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="DEBUG_FILE"/>
<appender-ref ref="INFO_FILE"/>
<appender-ref ref="WARN_FILE"/>
<appender-ref ref="ERROR_FILE"/>
<appender-ref ref="kafkaAppender" />
</root>
</configuration>
到此日志收集也好了,启动项目,到kibana可以看到zipkin和kafka-log分别收集了链路追踪和系统日志的数据。
各人觉得使用日志配置文件这种方式比使用kafka切面收集的方式来的方便简单。以上内容都是各人开发过程中实际遇到的问题,同事也搜索了网上很多的大神的博客最终把项目搞定。如有不对和不妥之处请多多指教。本人微信:“441338280”,欢迎一起交流学习。
更多推荐
所有评论(0)