将项目中的日志使用log4j打印,然后使用avro方式,收集到flume,最后输出到kafka。flume官方提供了两种方式接受log4j输入源的方式:Log4J Appender 和 Load Balancing Log4J Appender,flume详情可查看官网:Welcome to Apache Flume — Apache Flume

一、appender

1)Log4J Appender参数解释

Property Name Default Description
Hostname 使用avro源的flume agent主机名(必填)
Port flume agent的avro源的监听端口(必填)
UnsafeMode false 如果为true,则添加程序不会在发送事件失败时引发异常
AvroReflectionEnabled false 使用Avro反射来序列化Log4j事件。(当用户记录字符串时不要使用)
AvroSchemaUrl avro schema的url地址

2) Load Balancing Log4J Appender参数解释

Property Name Default Description
Hosts 使用avro源的flume agent主机名加端口,多个用空格分隔,如:hadoop01:6666 hadoop02:6666
Selector ROUND_ROBIN 选择机制。ROUND_ROBIN(轮询)、RANDOM(随机)或自定义FQDN,但必须是从LoadBalancingSelector继承的类。
MaxBackoff 一个long型数值,表示负载平衡客户端将从未能使用事件的节点回退的最长时间(毫秒)。默认为无回退
UnsafeMode false 如果为true,则添加程序不会在发送事件失败时引发异常
AvroReflectionEnabled false 使用Avro反射来序列化Log4j事件。(当用户记录字符串时不要使用)
AvroSchemaUrl avro schema的url地址

生产环境建议使用此种appender,类似这种架构:

二、日志打印类

1)引入pom

<!-- https://mvnrepository.com/artifact/org.apache.flume.flume-ng-clients/flume-ng-log4jappender -->
    <dependency>
      <groupId>org.apache.flume.flume-ng-clients</groupId>
      <artifactId>flume-ng-log4jappender</artifactId>
      <version>1.9.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-sdk -->
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
      <version>1.9.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/log4j/log4j -->
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.25</version>
      <scope>test</scope>
    </dependency>

2)工具类

package com.zstax;

import org.apache.log4j.Logger;

/**
 * @author: ndf
 * @date: 2022/3/23 14:20
 * @description:
 */
public class Log4jPrinter {
    private static final Logger logger= Logger.getLogger(Log4jPrinter.class);

    /**
     * 打印埋点日志
     * @param buriedLog 埋点日志
     */
    public static void printBuriedLog(String buriedLog) {
        logger.info(buriedLog);
    }

}

三、log4j.properties配置如下:

1)单个agent

# Log4j Appender
log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname=hadoop05
log4j.appender.flume.Port=6666
log4j.appender.flume.UnsafeMode=true
log4j.appender.flume.layout = org.apache.log4j.PatternLayout 
# 这里的com.zstax.Log4jPrinter为项目中打印日志类的路径
log4j.logger.com.zstax.Log4jPrinter = INFO,flume

 2)多个agent轮询

log4j.appender.flume2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.flume2.Hosts = hadoop01:6666 hadoop02:6666
log4j.appender.flume2.Selector = ROUND_ROBIN
log4j.appender.flume2.MaxBackoff = 30000
log4j.appender.flume2.UnsafeMode = true
log4j.appender.flume2.layout=org.apache.log4j.PatternLayout
# 这里的com.zstax.Log4jPrinter为项目中打印日志类的路径
log4j.logger.com.zstax.Log4jPrinter = INFO,flume2

问题:在使用的过程中发现flume的log4jappender会出现一些问题,具体可参考:征服flume之三——使用log4j输出日志到flume-pudn.com

四、flume配置

logger.sources = r1
logger.sinks = k1
logger.channels = c1

# Describe/configure the source
logger.sources.r1.type = Avro
logger.sources.r1.bind = 0.0.0.0
logger.sources.r1.port = 6666

# Describe the sink
logger.sinks.k1.channel=c1
logger.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
logger.sinks.k1.brokerList=hadoop03:6667,hadoop04:6667,hadoop05:6667
logger.sinks.k1.topic=buriedLogger
logger.sinks.k1.serializer.class=kafka.serializer.StringEncoder
logger.sinks.k1.serializer.appendNewline=false

#Spillable Memory Channel
logger.channels.c1.type=SPILLABLEMEMORY
logger.channels.c1.checkpointDir = /data/flume/checkpoint
logger.channels.c1.dataDirs = /data/flume

# Bind the source and sink to the channel
logger.sources.r1.channels = c1

五、kafka配置

1)创建主题

bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 2 --partitions 3 --topic buriedLogger

 2)查看所有主题列表

bin/kafka-topics.sh --list --zookeeper hadoop01:2181

[kafka@hadoop05 kafka-broker]$ bin/kafka-topics.sh --list --zookeeper hadoop01:2181
ATLAS_ENTITIES
ATLAS_HOOK
__consumer_offsets
ambari_kafka_service_check
buriedLogger

3)模拟消费者

bin/kafka-console-consumer.sh --from-beginning --topic buriedLogger --bootstrap-server hadoop03:6667,hadoop04:6667,hadoop05:6667

4)运行打印日志

package com.zstax;

/**
 * @author: ndf
 * @date: 2022/3/23 14:55
 * @description:
 */
public class TestApp {
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            Log4jPrinter.printBuriedLog("日志消息产生了:"+i);
        }
        Thread.sleep(10000);
        for (int i = 20; i < 40; i++) {
            Log4jPrinter.printBuriedLog("日志消息产生了:"+i);
        }
        Thread.sleep(10000);
    }
}

 可以看到消费者接收到了消息:

日志消息产生了:0
日志消息产生了:3
日志消息产生了:6
日志消息产生了:9
日志消息产生了:12
日志消息产生了:15
日志消息产生了:18
日志消息产生了:2
日志消息产生了:5
日志消息产生了:8
日志消息产生了:11
日志消息产生了:14
日志消息产生了:17
日志消息产生了:1
日志消息产生了:4
日志消息产生了:7
日志消息产生了:10
日志消息产生了:13
日志消息产生了:16
日志消息产生了:19
日志消息产生了:20
日志消息产生了:23
日志消息产生了:26
日志消息产生了:29
日志消息产生了:32
日志消息产生了:35
日志消息产生了:38
日志消息产生了:22
日志消息产生了:25
日志消息产生了:28
日志消息产生了:31
日志消息产生了:34
日志消息产生了:37
日志消息产生了:21
日志消息产生了:24
日志消息产生了:27
日志消息产生了:30
日志消息产生了:33
日志消息产生了:36
日志消息产生了:39

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐