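The previous posts covered getting started with Spring Cloud + Sleuth + Zipkin and installing Kafka. Elasticsearch (ES) setup is not covered here; there is plenty of installation and usage material online, and this post uses it only as the persistence store.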

Environment

  1. JDK 1.8 (server VM, 64-bit)
  2. IntelliJ IDEA 2017
  3. Spring Boot 1.5.2.RELEASE
  4. Spring Cloud Dalston.SR5
  5. Kafka 2.11-1.0.0 (Kafka 1.0.0 built for Scala 2.11)
  6. ZooKeeper 3.4.10
  7. Elasticsearch 5.6.4

1. Start the environment

  1. Start ZooKeeper
  2. Start Kafka
  3. Start Elasticsearch
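
Before moving on, it can help to confirm that the three processes are actually listening. The following is a minimal sketch, not part of the original project: the class name `PortCheck` is hypothetical, and it assumes the default ports used in the configuration of this post (ZooKeeper 2181, Kafka 9092, Elasticsearch HTTP 9200). It only checks that a TCP connection can be opened, not that the service is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing is listening there.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed default ports; adjust if your installation differs.
        String[] names = {"zookeeper", "kafka", "elasticsearch"};
        int[] ports = {2181, 9092, 9200};
        for (int i = 0; i < names.length; i++) {
            boolean up = isListening("localhost", ports[i], 1000);
            System.out.println(names[i] + " (" + ports[i] + "): " + (up ? "up" : "not reachable"));
        }
    }
}
```

If any line reports "not reachable", start that process before launching the Zipkin server or the target services.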

2. Create the Zipkin server

Add the following to pom.xml:

 <dependencies>
        <!-- Spring Boot web starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Zipkin core -->
        <dependency>
            <groupId>io.zipkin.java</groupId>
            <artifactId>zipkin</artifactId>
            <version>2.4.2</version>
        </dependency>
        <!-- Zipkin Elasticsearch (HTTP) storage -->
        <dependency>
            <groupId>io.zipkin.java</groupId>
            <artifactId>zipkin-autoconfigure-storage-elasticsearch-http</artifactId>
            <version>2.4.2</version>
            <optional>true</optional>
        </dependency>
        <!-- Zipkin stream binding: spans arrive through Spring Cloud Stream (Kafka) -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-sleuth-zipkin-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
    </dependencies>

Also create application.yml under resources:

# Kafka settings
spring:
  sleuth:
    enabled: false
    sampler:
      percentage: 1.0
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          zkNodes: localhost:2181
# Elasticsearch storage settings
zipkin:
  storage:
    type: elasticsearch
    elasticsearch:
      hosts: localhost:9200
      cluster: elasticsearch
      index: zipkin
      index-shards: 1
      index-replicas: 1
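
The `index: zipkin` setting is a prefix, not a full index name. Judging from the query result shown in the verification section (`zipkin:span-2018-01-08`), spans land in one index per day. The sketch below reproduces that name from a span timestamp; the class and method names are hypothetical, and it assumes the day is taken in UTC with `-` as the date separator.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class SpanIndexName {
    // Assumption: daily buckets are computed in UTC.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    /** Builds "<prefix>:span-<yyyy-MM-dd>" for a span's timestamp_millis value. */
    static String forTimestamp(String indexPrefix, long epochMillis) {
        return indexPrefix + ":span-" + DAY.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // timestamp_millis of the first hit in the verification section
        System.out.println(forTimestamp("zipkin", 1515396926392L));
    }
}
```

This is why the verification query uses the wildcard `zipkin*`: it matches every daily index that shares the prefix.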

Add the following annotations to the startup class:

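import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.sleuth.zipkin.stream.EnableZipkinStreamServer;

@SpringBootApplication
@EnableZipkinStreamServer // lets this application act as a Zipkin server fed by the stream binder
public class ZipkinServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ZipkinServerApplication.class, args);
    }
}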

After startup, the Kafka connection information appears in the console.

3. Create target service one

Add the following to pom.xml:

<dependencies>
        <!-- Spring Boot web starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Zipkin stream binding -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-sleuth-zipkin-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <!-- logback JSON encoder for log output; needed because the Kafka binding is attached to log events -->
        <dependency>
            <groupId>net.logstash.logback</groupId>
            <artifactId>logstash-logback-encoder</artifactId>
            <version>4.6</version>
        </dependency>
        <!-- Feign client -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-feign</artifactId>
        </dependency>
    </dependencies>

Note the logging dependency above.
Add the following to application.yml:

server:
  port: 8082
spring:
  application:
    name: serverone
  sleuth:
    sampler:
      percentage: 1.0
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          zkNodes: localhost:2181

Create logback-spring.xml with the following configuration:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>

    <springProperty scope="context" name="springAppName" source="spring.application.name"/>
    <!-- Example for logging into the build folder of your project -->
    <property name="LOG_FILE" value="${BUILD_FOLDER:-build}/${springAppName}"/>

    <property name="CONSOLE_LOG_PATTERN"
              value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr([${springAppName:-},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-Span-Export:-}]){yellow} %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>

    <!-- Appender to log to console -->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <!-- Minimum logging level to be presented in the console logs-->
            <level>INFO</level>
        </filter>
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>utf8</charset>
        </encoder>
    </appender>

    <!-- Appender to log to file -->
    <appender name="flatfile" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_FILE}</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.gz</fileNamePattern>
            <maxHistory>7</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>utf8</charset>
        </encoder>
    </appender>

    <!-- Appender to log to file in a JSON format -->
    <appender name="logstash" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_FILE}.json</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${LOG_FILE}.json.%d{yyyy-MM-dd}.gz</fileNamePattern>
            <maxHistory>7</maxHistory>
        </rollingPolicy>
        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <providers>
                <timestamp>
                    <timeZone>UTC</timeZone>
                </timestamp>
                <pattern>
                    <pattern>
                        {
                        "severity": "%level",
                        "service": "${springAppName:-}",
                        "trace": "%X{X-B3-TraceId:-}",
                        "span": "%X{X-B3-SpanId:-}",
                        "exportable": "%X{X-Span-Export:-}",
                        "pid": "${PID:-}",
                        "thread": "%thread",
                        "class": "%logger{40}",
                        "rest": "%message"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
    </appender>

    <root level="INFO">
        <!--<appender-ref ref="console"/>-->
        <appender-ref ref="logstash"/>
        <!--<appender-ref ref="flatfile"/>-->
    </root>
</configuration>

Add the following to the startup class:

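import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.feign.EnableFeignClients;

@SpringBootApplication
@EnableFeignClients // enables Feign client support
@EnableAutoConfiguration // enables auto-configuration; redundant here, since @SpringBootApplication already includes it
public class ServerOneApplication {

    public static void main(String[] args) {
        SpringApplication.run(ServerOneApplication.class, args);
    }
}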

The RestTemplate configuration class is:

@Configuration
public class RestConfiguration {

    @Bean
    public RestTemplate restTemplate(){
        return new RestTemplate();
    }
}

The Feign client is:

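import org.springframework.cloud.netflix.feign.FeignClient;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;

// Calls service two directly by URL (port 8888); no service discovery is involved
@FeignClient(name = "sleuthone", url = "http://localhost:8888")
public interface SleuthService {
    @RequestMapping("/sayHello/{name}")
    String sayHello(@PathVariable(name = "name") String name);
}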

The controller that makes the calls is:

@RestController
public class SleuthController {
    @Autowired
    private RestTemplate restTemplate;
    @Autowired
    private SleuthService sleuthService;

    @ResponseBody
    @RequestMapping("/restHello/{name}")
    public String restHello(@PathVariable String name) {
        return "rest " + restTemplate.getForObject("http://localhost:8888/sayHello/" + name,String.class);
    }

    @ResponseBody
    @RequestMapping("/feignHello/{name}")
    public String feignHello(@PathVariable String name) {
        return "feign " + sleuthService.sayHello(name);
    }
}

4. Create target service two

Service two is almost the same as service one, except that it does not need Feign. Its pom is:

<dependencies>
        <!-- Spring Boot web starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Sleuth tracing -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-sleuth-zipkin-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <!-- logback JSON encoder for log output; needed because the Kafka binding is attached to log events -->
        <dependency>
            <groupId>net.logstash.logback</groupId>
            <artifactId>logstash-logback-encoder</artifactId>
            <version>4.6</version>
        </dependency>
    </dependencies>

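Everything else is the same as in service one (the query results below show service two running as servertwo on port 8888). Service two only exposes a sayHello endpoint.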

@RestController
public class SleuthController {

    @ResponseBody
    @RequestMapping("/sayHello/{name}")
    public String sayHello(@PathVariable String name) {
        return "hello " + name;
    }
}

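Verification

Visit each of the following URLs:
http://localhost:8082/restHello/lisi
http://localhost:8082/feignHello/lisi
Once both return the expected response, query Elasticsearch for the stored spans:
http://localhost:9200/zipkin*/_search?pretty
The response contains data like this: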

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 37,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "zipkin:span-2018-01-08",
        "_type" : "span",
        "_id" : "AWDUsg4wSqtAFWoqJU1h",
        "_score" : 1.0,
        "_source" : {
          "traceId" : "45fc228abc4a4edf",
          "duration" : 4000,
          "shared" : true,
          "localEndpoint" : {
            "serviceName" : "servertwo",
            "ipv4" : "10.130.236.27",
            "port" : 8888 },
          "timestamp_millis" : 1515396926392,
          "kind" : "SERVER",
          "name" : "http:/sayhello/lisi",
          "id" : "d852bf5a65f75acb",
          "parentId" : "45fc228abc4a4edf",
          "timestamp" : 1515396926392000,
          "tags" : {
            "mvc.controller.class" : "SleuthController",
            "mvc.controller.method" : "sayHello",
            "spring.instance_id" : "01C702601479820.corp.haier.com:servertwo:8888" }
        }
      },
      {
        "_index" : "zipkin:span-2018-01-08",
        "_type" : "span",
        "_id" : "AWDUsg8wSqtAFWoqJU1i",
        "_score" : 1.0,
        "_source" : {
          "traceId" : "45fc228abc4a4edf",
          "duration" : 36000,
          "localEndpoint" : {
            "serviceName" : "serverone",
            "ipv4" : "10.130.236.27",
            "port" : 8082 },
          "timestamp_millis" : 1515396926361,
          "kind" : "CLIENT",
          "name" : "http:/sayhello/lisi",
          "id" : "d852bf5a65f75acb",
          "parentId" : "45fc228abc4a4edf",
          "timestamp" : 1515396926361000,
          "tags" : {
            "http.host" : "localhost",
            "http.method" : "GET",
            "http.path" : "/sayHello/lisi",
            "http.url" : "http://localhost:8888/sayHello/lisi",
            "spring.instance_id" : "01C702601479820.corp.haier.com:serverone:8082" }
        }
      },
      {
        "_index" : "zipkin:span-2018-01-08",
        "_type" : "span",
        "_id" : "AWDUsg8wSqtAFWoqJU1j",
        "_score" : 1.0,
        "_source" : {
          "traceId" : "45fc228abc4a4edf",
          "duration" : 56696,
          "localEndpoint" : {
            "serviceName" : "serverone",
            "ipv4" : "10.130.236.27",
            "port" : 8082 },
          "timestamp_millis" : 1515396926355,
          "kind" : "SERVER",
          "name" : "http:/feignhello/lisi",
          "id" : "45fc228abc4a4edf",
          "timestamp" : 1515396926355000,
          "tags" : {
            "mvc.controller.class" : "SleuthController",
            "mvc.controller.method" : "feignHello",
            "spring.instance_id" : "01C702601479820.corp.haier.com:serverone:8082" }
        }
      }
    ]
  }
}
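
The three hits above belong to one trace (`traceId` 45fc228abc4a4edf). The root span is the `feignHello` request handled by serverone. The CLIENT span in serverone and the SERVER span in servertwo carry the same `id` (d852bf5a65f75acb) because they are the two halves of a single remote call. As a rough sketch, the difference between the two durations approximates the time spent outside servertwo's handler (network, serialization, Feign overhead). The class below is hypothetical and hard-codes the values from the response rather than querying Elasticsearch.

```java
import java.util.Arrays;
import java.util.List;

class SpanSummary {
    /** Minimal view of one Elasticsearch hit; only the fields used below. */
    static final class Span {
        final String id;
        final String parentId; // null for the root span
        final String kind;     // "CLIENT" or "SERVER"
        final String service;
        final long durationMicros;

        Span(String id, String parentId, String kind, String service, long durationMicros) {
            this.id = id;
            this.parentId = parentId;
            this.kind = kind;
            this.service = service;
            this.durationMicros = durationMicros;
        }
    }

    /** The three spans from the response above, copied by hand. */
    static List<Span> sample() {
        return Arrays.asList(
                new Span("d852bf5a65f75acb", "45fc228abc4a4edf", "SERVER", "servertwo", 4000),
                new Span("d852bf5a65f75acb", "45fc228abc4a4edf", "CLIENT", "serverone", 36000),
                new Span("45fc228abc4a4edf", null, "SERVER", "serverone", 56696));
    }

    /** Finds the span with the given id and kind, or fails if it is missing. */
    static Span find(List<Span> spans, String id, String kind) {
        for (Span s : spans) {
            if (s.id.equals(id) && s.kind.equals(kind)) {
                return s;
            }
        }
        throw new IllegalArgumentException("no " + kind + " span with id " + id);
    }

    /** Client-side duration minus server-side duration for one remote call. */
    static long clientOverheadMicros(List<Span> spans, String spanId) {
        return find(spans, spanId, "CLIENT").durationMicros
                - find(spans, spanId, "SERVER").durationMicros;
    }

    public static void main(String[] args) {
        long overhead = clientOverheadMicros(sample(), "d852bf5a65f75acb");
        System.out.println("client-side overhead: " + overhead + " us");
    }
}
```

For these values the client saw 36000 microseconds while the handler in servertwo took 4000, so about 32000 microseconds were spent around the call itself.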

Note that duration is expressed in microseconds, so the root span (56696) took about 56.7 ms.
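
The unit conversion is easy to get wrong when reading raw documents, since `timestamp` and `duration` are in microseconds while `timestamp_millis` is in milliseconds. A small sketch using values from the response above (the class name is hypothetical):

```java
import java.util.concurrent.TimeUnit;

class DurationUnits {
    /** Converts a duration in microseconds to fractional milliseconds. */
    static double microsToMillis(long micros) {
        return micros / 1000.0;
    }

    public static void main(String[] args) {
        // duration of the root span
        System.out.println(microsToMillis(56696) + " ms");
        // timestamp (microseconds) truncated to milliseconds matches timestamp_millis
        System.out.println(TimeUnit.MICROSECONDS.toMillis(1515396926355000L));
    }
}
```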

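Problems encountered

1. Data did not reach the Kafka channel because log output to a log file had not been configured. Reading the source code showed that the Kafka binding is attached to log events.
2. If Kafka stops when the services start, or messages cannot be consumed, see the previous post on Kafka failing to consume. The main cause is the JDK version.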

Source code
