将项目中的日志使用log4j打印,然后使用avro方式,收集到flume,最后输出到kafka。flume官方提供了两种方式接受log4j输入源的方式:Log4J Appender 和 Load Balancing Log4J Appender,flume详情可查看官网:Welcome to Apache Flume — Apache Flume

一、appender

1)Log4J Appender参数解释

Property NameDefaultDescription
Hostname使用avro源的flume agent主机名(必填)
Portflume agent的avro源的监听端口(必填)
UnsafeModefalse如果为true,则添加程序不会在发送事件失败时引发异常
AvroReflectionEnabledfalse使用Avro反射来序列化Log4j事件。(当用户记录字符串时不要使用)
AvroSchemaUrlavro schema的url地址

2) Load Balancing Log4J Appender参数解释

Property NameDefaultDescription
Hosts使用avro源的flume agent主机名加端口,多个用空格分隔,如:hadoop01:6666 hadoop02:6666
SelectorROUND_ROBIN选择机制。ROUND_ROBIN(轮询)、RANDOM(随机)或自定义FQDN,但必须是从LoadBalancingSelector继承的类。
MaxBackoff一个long型数值,表示负载平衡客户端将从未能使用事件的节点回退的最长时间(毫秒)。默认为无回退
UnsafeModefalse如果为true,则添加程序不会在发送事件失败时引发异常
AvroReflectionEnabledfalse使用Avro反射来序列化Log4j事件。(当用户记录字符串时不要使用)
AvroSchemaUrlavro schema的url地址

生产环境建议使用此种appender,类似这种架构:

二、日志打印类

1)引入pom

<!-- https://mvnrepository.com/artifact/org.apache.flume.flume-ng-clients/flume-ng-log4jappender -->
    <dependency>
      <groupId>org.apache.flume.flume-ng-clients</groupId>
      <artifactId>flume-ng-log4jappender</artifactId>
      <version>1.9.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-sdk -->
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
      <version>1.9.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/log4j/log4j -->
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.25</version>
      <scope>test</scope>
    </dependency>

2)工具类

package com.zstax;

import org.apache.log4j.Logger;

/**
 * @author: ndf
 * @date: 2022/3/23 14:20
 * @description:
 */
public class Log4jPrinter {
    private static final Logger logger= Logger.getLogger(Log4jPrinter.class);

    /**
     * 打印埋点日志
     * @param buriedLog 埋点日志
     */
    public static void printBuriedLog(String buriedLog) {
        logger.info(buriedLog);
    }

}

三、log4j.properties配置如下:

1)单个agent

# Log4j Appender
log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname=hadoop05
log4j.appender.flume.Port=6666
log4j.appender.flume.UnsafeMode=true
log4j.appender.flume.layout = org.apache.log4j.PatternLayout 
# 这里的com.zstax.Log4jPrinter为项目中打印日志类的路径
log4j.logger.com.zstax.Log4jPrinter = INFO,flume

 2)多个agent轮询

log4j.appender.flume2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.flume2.Hosts = hadoop01:6666 hadoop02:6666
log4j.appender.flume2.Selector = ROUND_ROBIN
log4j.appender.flume2.MaxBackoff = 30000
log4j.appender.flume2.UnsafeMode = true
log4j.appender.flume2.layout=org.apache.log4j.PatternLayout
# 这里的com.zstax.Log4jPrinter为项目中打印日志类的路径
log4j.logger.com.zstax.Log4jPrinter = INFO,flume2

问题:在使用的过程中发现flume的log4jappender会出现一些问题,具体可参考:征服flume之三——使用log4j输出日志到flume-pudn.com

四、flume配置

logger.sources = r1
logger.sinks = k1
logger.channels = c1

# Describe/configure the source
logger.sources.r1.type = Avro
logger.sources.r1.bind = 0.0.0.0
logger.sources.r1.port = 6666

# Describe the sink
logger.sinks.k1.channel=c1
logger.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
logger.sinks.k1.brokerList=hadoop03:6667,hadoop04:6667,hadoop05:6667
logger.sinks.k1.topic=buriedLogger
logger.sinks.k1.serializer.class=kafka.serializer.StringEncoder
logger.sinks.k1.serializer.appendNewline=false

#Spillable Memory Channel
logger.channels.c1.type=SPILLABLEMEMORY
logger.channels.c1.checkpointDir = /data/flume/checkpoint
logger.channels.c1.dataDirs = /data/flume

# Bind the source and sink to the channel
logger.sources.r1.channels = c1

五、kafka配置

1)创建主题

bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 2 --partitions 3 --topic buriedLogger

 2)查看所有主题列表

bin/kafka-topics.sh --list --zookeeper hadoop01:2181

[kafka@hadoop05 kafka-broker]$ bin/kafka-topics.sh --list --zookeeper hadoop01:2181
ATLAS_ENTITIES
ATLAS_HOOK
__consumer_offsets
ambari_kafka_service_check
buriedLogger

3)模拟消费者

bin/kafka-console-consumer.sh --from-beginning --topic buriedLogger --bootstrap-server hadoop03:6667,hadoop04:6667,hadoop05:6667

4)运行打印日志

package com.zstax;

/**
 * @author: ndf
 * @date: 2022/3/23 14:55
 * @description:
 */
public class TestApp {
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            Log4jPrinter.printBuriedLog("日志消息产生了:"+i);
        }
        Thread.sleep(10000);
        for (int i = 20; i < 40; i++) {
            Log4jPrinter.printBuriedLog("日志消息产生了:"+i);
        }
        Thread.sleep(10000);
    }
}

 可以看到消费者接收到了消息:

日志消息产生了:0
日志消息产生了:3
日志消息产生了:6
日志消息产生了:9
日志消息产生了:12
日志消息产生了:15
日志消息产生了:18
日志消息产生了:2
日志消息产生了:5
日志消息产生了:8
日志消息产生了:11
日志消息产生了:14
日志消息产生了:17
日志消息产生了:1
日志消息产生了:4
日志消息产生了:7
日志消息产生了:10
日志消息产生了:13
日志消息产生了:16
日志消息产生了:19
日志消息产生了:20
日志消息产生了:23
日志消息产生了:26
日志消息产生了:29
日志消息产生了:32
日志消息产生了:35
日志消息产生了:38
日志消息产生了:22
日志消息产生了:25
日志消息产生了:28
日志消息产生了:31
日志消息产生了:34
日志消息产生了:37
日志消息产生了:21
日志消息产生了:24
日志消息产生了:27
日志消息产生了:30
日志消息产生了:33
日志消息产生了:36
日志消息产生了:39

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐