开发人员如何使用elk

开发人员需要做的就是将日志信息通过各种渠道发到kafka,本实例以logback为例进行说明,其他接入方式请自行搜索。

logback接入elk

在pom.xml加入相关依赖

在logback配置文件中增加相应的appender

其中,可以在layout标签里加入:

 <layout class="net.logstash.logback.layout.LogstashLayout" >
            <includeContext>true</includeContext>
            <includeCallerData>true</includeCallerData>
            <customFields>{"system":"test"}</customFields>
            <fieldNames class="net.logstash.logback.fieldnames.ShortenedFieldNames"/>
</layout>

配置解释

前面我们按照配置成功的将项目的日志信息写入到了elk中,并在kibana上查到了相应的日志信息。接下来我们相信解析一下前面的配置。

节点一:<includeContext>是否包含上下文

<includeContext>true</includeContext>

开启的话会包含hostname等logback的context信息

节点二:<includeCallerData>是否包含日志来源

<includeCallerData>false</includeCallerData>

差异如下

"caller": {
    "class": "com.example.elkdemo.com.example.elktest.utils.LogHelper",
    "method": "helpMethod",
    "file": "LogHelper.java",
    "line": 11
}

节点三:<customFields>自定义附加字段

<customFields>{"system":"test"}</customFields>

节点四:<fieldNames >自定义附加字段

<fieldNames class="net.logstash.logback.fieldnames.ShortenedFieldNames"/>

内容如下

public class ShortenedFieldNames extends LogstashFieldNames {
    public static final String FIELD_LOGGER = "logger";
    public static final String FIELD_THREAD = "thread";
    public static final String FIELD_LEVEL_VAL = "levelVal";
    public static final String FIELD_CALLER = "caller";
    public static final String FIELD_CLASS = "class";
    public static final String FIELD_METHOD = "method";
    public static final String FIELD_FILE = "file";
    public static final String FIELD_LINE = "line";
    public static final String FIELD_STACKTRACE = "stacktrace";

    public ShortenedFieldNames() {
        this.setLogger("logger");
        this.setThread("thread");
        this.setLevelValue("levelVal");
        this.setCaller("caller");
        this.setCallerClass("class");
        this.setCallerMethod("method");
        this.setCallerFile("file");
        this.setCallerLine("line");
        this.setStackTrace("stacktrace");
    }
}

概念准备

前面我们讲解了,生成的JSON数据日志的一些layout属性配置,下面我们讲解一下appender将日志信息发送给kafka的时候的一些配置

本文使用的logback-kafka-appender相对于kafka集群来说就是kafka集群消息的生产者。 
- producer: 
  消息生产者,发布消息到 kafka 集群的终端或服务。 
- broker: 
  kafka 集群中包含的服务器。 
- topic: 
  每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。 
- partition: 
  partition 是物理上的概念,每个 topic 包含一个或多个 partition。kafka 分配的单位是 partition。 
- consumer: 
  从 kafka 集群中消费消息的终端或服务。 
- Consumer group: 
  high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。 
- replica: 
  partition 的副本,保障 partition 的高可用。 
- leader: 
  replica 中的一个角色, producer 和 consumer 只跟 leader 交互。 
- follower: 
  replica 中的一个角色,从 leader 中复制数据。 
- controller: 
  kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover。 
- zookeeper: 
  kafka 通过 zookeeper 来存储集群的 meta 信息。

kafka消息提交策略(deliveryStrategy标签用)

由于logback的日志需要通过网络提交到kafka集群,难免会存在因为网络不稳定、kafka集群不稳定等原因导致日志信息不能抵达的情况,此时配置好相应的策略告诉logback-kafka-appender怎么解决这个问题就显得至关重要。

logback-kafka-appender为我们提供了两种策略,异步提交策略(AsynchronousDeliveryStrategy)和阻塞提交策略(BlockingDeliveryStrategy)

异步提交策略(AsynchronousDeliveryStrategy)

任何消息提交给kafka生产者,如果因为某些原因导致交付,该消息会被分发给备胎appenders,可是在网络(比如与kafka服务器的连接断了)有问题时这种交付策略会在生产者的发送缓冲区塞满了以后堵塞。为了避免这个阻塞的产生,我们可以开启producerConfig block.on.buffer.full = false。开启后所有不能快速通过网络抵达kafka集群的消息都会被分发到备胎appender。

阻塞提交策略(BlockingDeliveryStrategy)

这个策略会一直阻塞调用线程直到每一个日志消息实际抵达kafka。这种策略会导致比较消极和不开心以及让人难过的影响,因为它对吞吐量有巨大的负面影响。

==警告:这个策略不应使用一起producerConfig linger.ms==

==温馨提示==

异步提交策略(AsynchronousDeliveryStrategy)并不能阻止在向kafka提交数据时候的阻塞。这意味着:假如在logging上下文启动的时候所有的kafka服务器都不可达,或全部kafka服务器在配置的时间段(>metadata.max.age.ms)依然不可达,你的appender最终也将阻塞。这种行为是我们不希望看到的,kafka-clients migitated 0.9版本已改善这种情况(见#16)。在此之前的版本,可使用如下方案。

<configuration>

    <!-- This is the kafkaAppender -->
    <appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
    <!-- Kafka Appender configuration -->
    </appender>

    <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
        <appender-ref ref="kafkaAppender" />
    </appender>

    <root level="info">
        <appender-ref ref="ASYNC" />
    </root>
</configuration>

自定义策略

当然 如果你认为以上策略不是你的菜或者你实在看不惯这些策略,你也可以自定义你自己的策略,你只需要继承om.github.danielwegener.logback.kafka.delivery.DeliveryStrategy就可以了。

备胎(fallback)appender

假如某些原因导致我们的生产者(logback-kafka-appender)不能发布日志某条消息,这条消息还可以通过配置的备胎appender(比如基于STDOUT和STDERR的ConsoleAppender)记录下来。

作为开发者,我们只需要在相应的可能出现问题的appenders里面加入一个标签appender-ref,所有不能抵达的消息日志都会分发到appender-ref申明的appenders去。

比如 STDOUT就是后备方案

值得注意的是,采用异步提交策略会重用原来的kafka生产者的io线程写消息到后备appenders,因此所有的后备方案的appenders的io速度必须满足要求,否则可能拖慢速度或者导致宕机。

生产者(logback-kafka-appender)配置调优(基于kafka0.8.2,logback-kafka-appender默认配置)

对logback-kafka-appender的所有配置 我们都可以通过Name=Value进行同名覆盖,这给予我们更好的微调能力(比如 batch.size, compression.type 和 linger.ms)。

序列化

这个模块提供了一个与logback的LayoutWrappingEncoder类似的LayoutKafkaMessageEncoder(他们的区别就是以创建字节数组取代同步输出流)

LayoutKafkaMessageEncoder使用了常用的ch.qos.logback.core.Layout作为layout-parameter(布局参数)。

这就允许我们使用任何实现了 ILoggingEvent 或者 IAccessEvent 的组件,比如众所周知的PatternLayout和logstash-logback-encoder的LogstashLayout。

自定义序列化(略)

https://github.com/danielwegener/logback-kafka-appender#keying-strategies–partitioning

主键分区策略

kafka的可伸缩性和顺序保证严重依赖分区的概念。对于应用日志来说这就意味着我们需要思考我们想要如何分配日志消息到多个kafka的topic分区。这个决定的一个含义是这些消息在被多个不同消费者消费时有序,因为kafka只提供了在单一分区读取顺序的保证。另一个含义是我们的日志消息如何均匀分布在所有可用分区并且保持不同服务器间的负载均衡。

RoundRobinKeying策略(默认)

这个策略均匀地分配所有写日志消息到所有可用的kafka分区。这一策略可能会导致消费者客户端读取顺序异常。

HostNameKeying策略

这一策略使用HOSTNAME来划分发送到kafka的日志消息。这是有用的因为它确保所有通过这个host发布的日志消息将以正确的顺序交付给任何消费者。但是这种策略会导致日志不均匀的分布少量的主机(相对于分区的数量)。

ContextNameKeying(策略)

这一策略使用logbacks CONTEXT_NAME分区发送到kafka的日志消息。这是确保所有相同日志context的日志消息将可以被任何消费者以正确的顺序消费。但是这种策略会导致日志不均匀分布少量的主机(相对于分区的数量)。这个策略只适用于ILoggingEvents。

ThreadNameKeying策略

这一策略使用调用线程的名字作为分区键。这将确保所有相同线程的消息将可以被任何消费者以正确的顺序消费。但是这种策略会导致日志不均匀分布少量线程(名称)(相对于分区的数量)。这个策略只适用于ILoggingEvents。

LoggerNameKeying策略

*这个策略使用日志记录器的名字(logger name)作为分区键。这将确保所有使用相同logger name的消息将可以被任何消费者以正确的顺序消费。但是这种策略会导致日志不均匀分布与一些少量的不同的loggers上(相对于分区的数量)。这个策略只适用于ILoggingEvents。

自定义主键分区策略

如果你对以上分区策略都不满足你的需求,你也可以很容易的通过实现keyingStrategy类来实现自己的分区策略,比如:
 

package foo;
import com.github.danielwegener.logback.kafka.keying.KeyingStrategy;

public class LevelKeyingStrategy implements KeyingStrategy<ILoggingEvent> {
    @Override
    public byte[] createKey(ILoggingEvent e) {
        return ByteBuffer.allocate(4).putInt(e.getLevel()).array();
    }
}

参考大多数的logback组件,自定义分区策略可能需要实现如下接口

ch.qos.logback.core.spi.ContextAware and ch.qos.logback.core.spi.LifeCycle interfaces

Q&A

如果想使用不同的kafka topic怎么办?

你只需要为这个topic增加一个appenders即可

如何让log的格式为logstash里面的json格式

使用 logstash-logback-encoder 里面的LogstashLayout 比如

<encoder class="com.github.danielwegener.logback.kafka.encoding.PatternLayoutKafkaMessageEncoder">
  <layout class="net.logstash.logback.layout.LogstashLayout" />
</encoder>

root日志级别为非INFO级别时会导致启动不了如何处理

root的level基本除了INFO之外不能改为别的,如改为DEBUG的话会导致启动初始化持续堵塞,容器一直无法启动。

解决方法:

<!-- This is the kafkaAppender -->
<appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<!-- Kafka Appender configuration -->
</appender>

<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
    <appender-ref ref="kafkaAppender" />
</appender>

<root level="info">
    <appender-ref ref="ASYNC" />
</root>

将kafkaAppender用logback的AsyncAppender处理下即可修改root的日志级别为其他的,目前我只尝试了DEBUG级别,其他没有尝试。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐