组件版本
flume1.6.0+cdh5.8.0
kafka2.1.0+kafka4.0.0

一、日志模拟

1.1 模拟日志生成

java -classpath /data/opt/module/log-produce.jar com.lsl.appclient.AppMain >/data/opt/module/test.log

二、flume数据采集

2.1 Flume安装

2.1.1 日志采集Flume安装

1)添加服务
2)选择Flume,点击继续
3)选择节点
4)完成

2.2.2 日志采集Flume配置

1)Flume配置分析
图片: https://uploader.shimo.im/f/0X5ZXNUeD4clkwtO.png
Flume直接读log日志的数据,log日志的格式是app-yyyy-mm-dd.log。
2)Flume的流程配置如下:
(1)在CM管理页面上点击Flume
(2)在实例页面选择hadoop01上的Agent
(3)在CM管理页面hadoop01上Flume的配置中找到代理名称改为a1
(4)在配置文件如下内容(flume-kafka)

a1.sources=r1          # 定义组件
a1.channels=c1 c2
a1.sinks=k1 k2

# configure source   
a1.sources.r1.type = com.wljs.flume.source.taildir.TaildirSource  # TAILDIR方式读取数据
a1.sources.r1.positionFile = /data/opt/module/flume/log_position.json # 记录日志读取位置
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+  # 读取日志位置
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2
 
#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.wljs.flume.interceptor.LogETLInterceptor$Builder  #ETL拦截器
a1.sources.r1.interceptors.i2.type = com.wljs.flume.interceptor.LogTypeInterceptor$Builder #日志类型拦截器

# selector
a1.sources.r1.selector.type = multiplexing #多路选择器
a1.sources.r1.selector.header = topic # 根据日志类型分数据
a1.sources.r1.selector.mapping.topic_start = c1 # 根据日志类型分数据
a1.sources.r1.selector.mapping.topic_event = c2

# configure channel  
a1.channels.c1.type = memory
a1.channels.c1.capacity=10000 # channel中存储的最大event数,默认值100。
a1.channels.c1.byteCapacityBufferPercentage=20 #缓冲空间占Channel容量(byteCapacity)的百分比,为event中的头信息保留了空间,默认值20(单位:百分比)。
a1.channels.c2.type = memory
a1.channels.c2.capacity=10000
a1.channels.c2.byteCapacityBufferPercentage=20

# configure sink
# start-sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic_start
a1.sinks.k1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sinks.k1.kafka.flumeBatchSize = 2000 #Producer端单次批量发送的消息条数,该值应该根据实际环境适当调整,增大批量发送消息的条数能够在一定程度上提高性能,但是同时也增加了延迟和Producer端数据丢失的风险。默认值为100。
a1.sinks.k1.kafka.producer.acks = 1 #kafka的ack
a1.sinks.k1.channel = c1
# event-sink
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = topic_event
a1.sinks.k2.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sinks.k2.kafka.flumeBatchSize = 2000
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.channel = c2

         注意:com.lsl.flume.interceptor.LogETLInterceptor和com.lsl.flume.interceptor.LogTypeInterceptor是自定义的拦截器的全类名。需要根据用户自定义的拦截器做相应修改。
2.2.3 线上配置
a1.sources=r1          # 定义组件
a1.channels=c1 c2
a1.sinks=k1 k2

# configure source   
a1.sources.r1.type = com.wljs.flume.source.taildir.TaildirSource  # TAILDIR方式读取数据
a1.sources.r1.positionFile = /data/opt/module/flume/log_position.json # 记录日志读取位置
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+  # 读取日志位置
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2
 
#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.wljs.flume.interceptor.LogETLInterceptor$Builder  #ETL拦截器
a1.sources.r1.interceptors.i2.type = com.wljs.flume.interceptor.LogTypeInterceptor$Builder #日志类型拦截器

# selector
a1.sources.r1.selector.type = multiplexing #多路选择器
a1.sources.r1.selector.header = topic # 根据日志类型分数据
a1.sources.r1.selector.mapping.topic_start = c1 # 根据日志类型分数据
a1.sources.r1.selector.mapping.topic_event = c2

# configure channel  
a1.channels.c1.type = memory
a1.channels.c1.capacity=10000 # channel中存储的最大event数,默认值100。
a1.channels.c1.byteCapacityBufferPercentage=20 #缓冲空间占Channel容量(byteCapacity)的百分比,为event中的头信息保留了空间,默认值20(单位:百分比)。
a1.channels.c2.type = memory
a1.channels.c2.capacity=10000
a1.channels.c2.byteCapacityBufferPercentage=20

# configure sink
# start-sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic_start
a1.sinks.k1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sinks.k1.kafka.flumeBatchSize = 2000 #Producer端单次批量发送的消息条数,该值应该根据实际环境适当调整,增大批量发送消息的条数能够在一定程度上提高性能,但是同时也增加了延迟和Producer端数据丢失的风险。默认值为100。
a1.sinks.k1.kafka.producer.acks = 1 #kafka的ack
a1.sinks.k1.channel = c1
# event-sink
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = topic_event
a1.sinks.k2.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sinks.k2.kafka.flumeBatchSize = 2000
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.channel = c2

(3)修改/opt/module/flume/log_position.json的读写权限

[root@hadoop01 module]# mkdir -p /opt/module/flume
[root@hadoop01 flume]# touch log_position.json
[root@hadoop01 flume]# chmod 777 log_position.json
[root@hadoop01 module]# xsync /opt/module/flume/
注意:Json文件的父目录一定要创建好,并改好权限

2.2.4 Flume拦截器

自定义了两个拦截器,分别是:ETL拦截器、日志类型区分拦截器。
ETL拦截器主要用于,过滤时间戳不合法和Json数据不完整的日志
日志类型区分拦截器主要用于,将ic日志和iu日志区分开来,方便发往Kafka的不同Topic。

1)创建Maven工程flume-interceptor
2)创建包名:com.wljs.flume.interceptor
3)在pom.xml文件中添加如下配置

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.6.0-cdh5.8.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

4)在com.wljs.flume.interceptor包下创建LogETLInterceptor类名
Flume ETL拦截器LogETLInterceptor

package com.wljs.flume.interceptor;
 
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.interceptor.Interceptor;
 
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 
 public class LogETLInterceptor implements Interceptor {
 
     @Override
     public void initialize() {
 
     }
 
     @Override
     public Event intercept(Event event) {
 
         // 1 获取数据
         byte[] body = event.getBody();
         String log = new String(body, Charset.forName("UTF-8"));
 
         // 2 判断数据类型并向Header中赋值
         if (log.contains("start")) {
             if (LogUtils.validateStart(log)){
                 return event;
             }
         }else {
             if (LogUtils.validateEvent(log)){
                 return event;
             }
         }
         // 3 返回校验结果
         return null;
     }
 
     @Override
     public List<Event> intercept(List<Event> events) {
 
         ArrayList<Event> interceptors = new ArrayList<>();
 
         for (Event event : events) {
             Event intercept1 = intercept(event);
             if (intercept1 != null){
                 interceptors.add(intercept1);
             }
         }
         return interceptors;
     }
 
     @Override
     public void close() {
 
     }
 
     public static class Builder implements Interceptor.Builder{
 
         @Override
         public Interceptor build() {
             return new LogETLInterceptor();
         }
 
         @Override
         public void configure(Context context) {
 
         }
     }
 }

4)Flume日志过滤工具类

package com.lsl.flume.interceptor;
 import org.apache.commons.lang.math.NumberUtils;
 
 public class LogUtils {
 
     public static boolean validateEvent(String log) {
         // 服务器时间 | json
         // 1549696569054 | {"cm":{"ln":"-89.2","sv":"V2.0.4","os":"8.2.0","g":"M67B4QYU@gmail.com","nw":"4G","l":"en","vc":"18","hw":"1080*1920","ar":"MX","uid":"u8678","t":"1549679122062","la":"-27.4","md":"sumsung-12","vn":"1.1.3","ba":"Sumsung","sr":"Y"},"ap":"weather","et":[]}
 
         // 1 切割
         String[] logContents = log.split("\\|");
 
         // 2 校验
         if(logContents.length != 2){
             return false;
         }
 
         //3 校验服务器时间
         if (logContents[0].length()!=13 || !NumberUtils.isDigits(logContents[0])){
             return false;
         }
 
         // 4 校验json
         if (!logContents[1].trim().startsWith("{") || !logContents[1].trim().endsWith("}")){
             return false;
         }
 
         return true;
     }
 
     public static boolean validateStart(String log) {
 
         if (log == null){
             return false;
         }
 
         // 校验json
         if (!log.trim().startsWith("{") || !log.trim().endsWith("}")){
             return false;
         }
         return true;
     }
 }

5)Flume日志类型区分拦截器LogTypeInterceptor

package com.lsl.flume.interceptor;
 
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.interceptor.Interceptor;
 
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 public class LogTypeInterceptor implements Interceptor {
     @Override
     public void initialize() {
 
     }
 
     @Override
     public Event intercept(Event event) {
 
         // 区分日志类型:   body  header
         // 1 获取body数据
         byte[] body = event.getBody();
         String log = new String(body, Charset.forName("UTF-8"));
 
         // 2 获取header
         Map<String, String> headers = event.getHeaders();
 
         // 3 判断数据类型并向Header中赋值
         if (log.contains("start")) {
             headers.put("topic","topic_start");
         }else {
             headers.put("topic","topic_event");
         }
 
         return event;
     }
 
     @Override
     public List<Event> intercept(List<Event> events) {
 
         ArrayList<Event> interceptors = new ArrayList<>();
 
         for (Event event : events) {
             Event intercept1 = intercept(event);
 
             interceptors.add(intercept1);
         }

         return interceptors;
     }
 
     @Override
     public void close() {
 
     }
 
     public static class Builder implements  Interceptor.Builder{
 
         @Override
         public Interceptor build() {
             return new LogTypeInterceptor();
         }
 
         @Override
         public void configure(Context context) {
 
         }
     }
 }

6)打包
拦截器打包之后,只需要单独包,不需要将依赖的包上传。打包之后要放入flume的lib文件夹下面。

注意:为什么不需要依赖包?因为依赖包在flume的lib目录下面已经存在了。

7)采用root用户将flume-interceptor-1.0-SNAPSHOT.jar包放入到hadoop01的/data/opt/cloudera/parcels/CDH/lib/flume-ng/lib/文件夹下面。

[root@hadoop01 lib]# ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT.jar

8)分发Flume到hadoop02

[root@hadoop01 lib]$ xsync flume-interceptor-1.0-SNAPSHOT.jar

三、kafka数据

3.1 离线安装

3.1.1 下载

安装jar包:http://archive.cloudera.com/csds/
安装包下载:http://archive.cloudera.com/kafka/parcels/4.0.0/
图片: https://uploader.shimo.im/f/dtHh8T705Xg7xqn2.png

3.1.2 安装

  1. 上传下载下来的安装包至集群路径/data/opt/cloudera/parcel-repo并修改KAFKA-4.0.0-1.4.0.0.p0.1-precise.parcel.sha1 文件名为KAFKA-4.0.0-1.4.0.0.p0.1-precise.parcel.sha
  2. 创建/data/opt/cloudera/csd目录导入Kafka-1.2.0.jar
    图片: https://uploader.shimo.im/f/fJWmqshX0GkV5tWm.png
  3. 点击安装主页上的主机–>Parcel–>检查新Parcel–>选择操作【分发->激活】图片: https://uploader.shimo.im/f/rVRe9S6KIowEpSlz.png
    图片: https://uploader.shimo.im/f/BfQ5jdjVayMJZl2O.png

3.2 kafka消息

3.2.1 查看Kafka Topic列表

kafka-topics --zookeeper hadoop01:2181 --list

3.2.2 创建Kafka Topic

创建:启动日志主题、事件日志主题。
1)创建启动日志主题
kafka-topics --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --create --replication-factor 2 --partitions 5 --topic topic_start
2)创建事件日志主题
kafka-topics --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --create --replication-factor 2 --partitions 5 --topic topic_event

3.2.3 删除Kafka Topic

1)删除启动日志主题
bin/kafka-topics.sh --delete --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --topic topic_start
2)删除事件日志主题
bin/kafka-topics.sh --delete --zookeeper hadoop02:2181,hadoop03:2181,hadoop04:2181 --topic topic_event

3.2.4 Kafka生产消息

kafka-console-producer --broker-list hadoop01:9092 --topic topic_start

3.2.5 Kafka消费消息

kafka-console-consumer --bootstrap-server hadoop01:9092 --from-beginning --topic topic_start
–from-beginning:会把主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。

3.2.6 查看Kafka Topic详情

kafka-topics --zookeeper hadoop01:2181 --describe --topic topic_start

四、消费kafka中的数据

4.1 Flume消费Kafka数据写到HDFS

1)集群规划
2)Flume配置分析
图片: https://uploader.shimo.im/f/D6DxTn1pZ7ox05to.png
3)Flume的具体配置如下:
(1)在CM管理页面hadoop03上Flume的配置中找到代理名称

a1
在配置文件如下内容(kafka-hdfs)
## 组件
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sources.r1.kafka.topics=topic_start

## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000  # 一批写入 channel 的最大消息数
a1.sources.r2.batchDurationMillis = 2000# 一个批次写入 channel 之前的最大等待时间(毫秒)。达到等待时间或者数量达到 batchSize 都会触发写操作。
a1.sources.r2.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sources.r2.kafka.topics=topic_event

## channel1
a1.channels.c1.type=memory
a1.channels.c1.capacity=100000 # 内存中存储 Event 的最大数
a1.channels.c1.transactionCapacity=10000 # source 或者 sink 每个事务中存取 Event 的操作数量(不能比 capacity 大)

## channel2
a1.channels.c2.type=memory
a1.channels.c2.capacity=100000
a1.channels.c2.transactionCapacity=10000

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/wljs/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.round = true #是否应将时间戳向下舍入(如果为true,则影响除 %t 之外的所有基于时间的转义符)
a1.sinks.k1.hdfs.roundValue = 10 # 向下舍入(小于当前时间)的这个值的最高倍(单位取决于下面的 hdfs.roundUnit ) 例子:假设当前时间戳是18:32:01,hdfs.roundUnit = minute 如果roundValue=5,则时间戳会取为:18:30 如果roundValue=7,则时间戳会取为:18:28 如果roundValue=10,则时间戳会取为:18:30
a1.sinks.k1.hdfs.roundUnit = second # 向下舍入的单位,可选值: second 、 minute 、 hour

##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/wljs/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = second

## 不要产生大量小文件
a1.sinks.k1.hdfs.rollInterval = 600 #当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒
a1.sinks.k1.hdfs.rollSize = 134217728 #当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节
a1.sinks.k1.hdfs.rollCount = 0 #当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)
a1.sinks.k1.hdfs.minBlockReplicas = 1 #最小冗余数,让flume感知不到副本的复制

a1.sinks.k2.hdfs.rollInterval = 600
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.minBlockReplicas = 1 #最小冗余数,让flume感知不到副本的复制

## 控制输出文件是原生文件。
## a1.sinks.k1.hdfs.fileType = CompressedStream 
## a1.sinks.k2.hdfs.fileType = CompressedStream 

## a1.sinks.k1.hdfs.codeC = lzop
## a1.sinks.k2.hdfs.codeC = lzop

## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
4.2 线上配置
## 组件
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 8000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sources.r1.kafka.topics=topic_start

## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 8000  # 一批写入 channel 的最大消息数
a1.sources.r2.batchDurationMillis = 2000# 一个批次写入 channel 之前的最大等待时间(毫秒)。达到等待时间或者数量达到 batchSize 都会触发写操作。
a1.sources.r2.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sources.r2.kafka.topics=topic_event

## channel1
a1.channels.c1.type=memory
a1.channels.c1.capacity=100000 # 内存中存储 Event 的最大数
a1.channels.c1.transactionCapacity=10000 # source 或者 sink 每个事务中存取 Event 的操作数量(不能比 capacity 大)

## channel2
a1.channels.c2.type=memory
a1.channels.c2.capacity=100000
a1.channels.c2.transactionCapacity=10000

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/wljs/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.round = true #是否应将时间戳向下舍入(如果为true,则影响除 %t 之外的所有基于时间的转义符)
a1.sinks.k1.hdfs.roundValue = 10 # 向下舍入(小于当前时间)的这个值的最高倍(单位取决于下面的 hdfs.roundUnit ) 例子:假设当前时间戳是18:32:01,hdfs.roundUnit = minute 如果roundValue=5,则时间戳会取为:18:30 如果roundValue=7,则时间戳会取为:18:28 如果roundValue=10,则时间戳会取为:18:30
a1.sinks.k1.hdfs.roundUnit = minute # 向下舍入的单位,可选值: second 、 minute 、 hour

##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/wljs/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = minute

## 不要产生大量小文件
a1.sinks.k1.hdfs.rollInterval = 120 #当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒
a1.sinks.k1.hdfs.rollSize = 0 #当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节
a1.sinks.k1.hdfs.rollCount = 0 #当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)
a1.sinks.k1.hdfs.minBlockReplicas = 1 #最小冗余数,让flume感知不到副本的复制

a1.sinks.k2.hdfs.rollInterval = 120
a1.sinks.k2.hdfs.rollSize = 0
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.minBlockReplicas = 1 #最小冗余数,让flume感知不到副本的复制

## 控制输出文件
## a1.sinks.k1.hdfs.fileType = CompressedStream 
## a1.sinks.k2.hdfs.fileType = CompressedStream 

## a1.sinks.k1.hdfs.codeC = lzop
## a1.sinks.k2.hdfs.codeC = lzop

## 控制输出文件是原生文件。
a1.sinks.k2.hdfs.fileType=DataStream #文件格式,目前支持: SequenceFile 、 DataStream 、 CompressedStream 。 1. DataStream 不会压缩文件,不需要设置hdfs.codeC 2. CompressedStream 必须设置hdfs.codeC参数
a1.sinks.k2.hdfs.idleTimeout=65 # 关闭非活动文件的超时时间(0表示禁用自动关闭文件),单位:秒
a1.sinks.k2.hdfs.callTimeout=65000 # 允许HDFS操作文件的时间,比如:open、write、flush、close。如果HDFS操作超时次数增加,应该适当调高这个这个值。(毫秒)
a1.sinks.k2.hdfs.threadsPoolSize=200 #每个HDFS Sink实例操作HDFS IO时开启的线程数(open、write 等)

a1.sinks.k2.hdfs.fileType=DataStream
a1.sinks.k2.hdfs.idleTimeout=65
a1.sinks.k2.hdfs.callTimeout=65000
a1.sinks.k2.hdfs.threadsPoolSize=200

## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2

4.3 日志生成数据传输到HDFS

1)将log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar上传都hadoop01的/opt/module目录
2)分发log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar到hadoop02

[root@hadoop02 module]# xsync log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar

3)在/root/bin目录下创建脚本lg.sh

[root@hadoop02 bin]$ vim lg.sh

4)在脚本中编写如下内容

#! /bin/bash
for i in hadoop01 hadoop02 
do
   ssh $i "java -classpath /data/opt/module/log-produce.jar com.lsl.appclient.AppMain $1 $2 >/data/opt/module/test.log &"
done

5)修改脚本执行权限

[root@hadoop02 bin]$ chmod 777 lg.sh

6)启动脚本

[root@hadoop02 module]$ lg.sh 

五、可能出现的问题及解决

5.1 flume

5.1.1 OOM 问题

Flume 启动时的最大堆内存大小默认是 20M,线上环境很容易 OOM,因此需要你在 flume-env.sh 中添加 JVM 启动参数:

JAVA_OPTS="-Xms8192m -Xmx8192m -Xss256k -Xmn2g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"

5.1.2 cdh5.8.0-1.6.0 flume-tairdir问题

taildirSource组件不支持文件改名的。如果文件改名会认为是新文件,就会重新读取,这就导致了日志文件重读!!!

解决:下载1.7flume源码修改编译,上传至flume的lib目录下

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐