cdh5.8.0 flume+kafka用户行为日志数据采集方案详解
一、日志模拟1.1 模拟日志生成java -classpath /data/opt/module/log-produce.jar com.lsl.appclient.AppMain >/data/opt/module/test.log二、flume数据采集2.1 Flume安装2.1.1 日志采集Flume安装1)添加服务2)选择Flume,点击继续3)选择节点4)完成...
组件 | 版本 |
---|---|
flume | 1.6.0+cdh5.8.0 |
kafka | 2.1.0+kafka4.0.0 |
一、日志模拟
1.1 模拟日志生成
java -classpath /data/opt/module/log-produce.jar com.lsl.appclient.AppMain >/data/opt/module/test.log
二、flume数据采集
2.1 Flume安装
2.1.1 日志采集Flume安装
1)添加服务
2)选择Flume,点击继续
3)选择节点
4)完成
2.2.2 日志采集Flume配置
1)Flume配置分析
图片: https://uploader.shimo.im/f/0X5ZXNUeD4clkwtO.png
Flume直接读log日志的数据,log日志的格式是app-yyyy-mm-dd.log。
2)Flume的流程配置如下:
(1)在CM管理页面上点击Flume
(2)在实例页面选择hadoop01上的Agent
(3)在CM管理页面hadoop01上Flume的配置中找到代理名称改为a1
(4)在配置文件如下内容(flume-kafka)
a1.sources=r1 # 定义组件
a1.channels=c1 c2
a1.sinks=k1 k2
# configure source
a1.sources.r1.type = com.wljs.flume.source.taildir.TaildirSource # TAILDIR方式读取数据
a1.sources.r1.positionFile = /data/opt/module/flume/log_position.json # 记录日志读取位置
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+ # 读取日志位置
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2
#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.wljs.flume.interceptor.LogETLInterceptor$Builder #ETL拦截器
a1.sources.r1.interceptors.i2.type = com.wljs.flume.interceptor.LogTypeInterceptor$Builder #日志类型拦截器
# selector
a1.sources.r1.selector.type = multiplexing #多路选择器
a1.sources.r1.selector.header = topic # 根据日志类型分数据
a1.sources.r1.selector.mapping.topic_start = c1 # 根据日志类型分数据
a1.sources.r1.selector.mapping.topic_event = c2
# configure channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=10000 # channel中存储的最大event数,默认值100。
a1.channels.c1.byteCapacityBufferPercentage=20 #缓冲空间占Channel容量(byteCapacity)的百分比,为event中的头信息保留了空间,默认值20(单位:百分比)。
a1.channels.c2.type = memory
a1.channels.c2.capacity=10000
a1.channels.c2.byteCapacityBufferPercentage=20
# configure sink
# start-sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic_start
a1.sinks.k1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sinks.k1.kafka.flumeBatchSize = 2000 #Producer端单次批量发送的消息条数,该值应该根据实际环境适当调整,增大批量发送消息的条数能够在一定程度上提高性能,但是同时也增加了延迟和Producer端数据丢失的风险。默认值为100。
a1.sinks.k1.kafka.producer.acks = 1 #kafka的ack
a1.sinks.k1.channel = c1
# event-sink
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = topic_event
a1.sinks.k2.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sinks.k2.kafka.flumeBatchSize = 2000
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.channel = c2
注意:com.lsl.flume.interceptor.LogETLInterceptor和com.lsl.flume.interceptor.LogTypeInterceptor是自定义的拦截器的全类名。需要根据用户自定义的拦截器做相应修改。
2.2.3 线上配置
a1.sources=r1 # 定义组件
a1.channels=c1 c2
a1.sinks=k1 k2
# configure source
a1.sources.r1.type = com.wljs.flume.source.taildir.TaildirSource # TAILDIR方式读取数据
a1.sources.r1.positionFile = /data/opt/module/flume/log_position.json # 记录日志读取位置
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+ # 读取日志位置
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2
#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.wljs.flume.interceptor.LogETLInterceptor$Builder #ETL拦截器
a1.sources.r1.interceptors.i2.type = com.wljs.flume.interceptor.LogTypeInterceptor$Builder #日志类型拦截器
# selector
a1.sources.r1.selector.type = multiplexing #多路选择器
a1.sources.r1.selector.header = topic # 根据日志类型分数据
a1.sources.r1.selector.mapping.topic_start = c1 # 根据日志类型分数据
a1.sources.r1.selector.mapping.topic_event = c2
# configure channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=10000 # channel中存储的最大event数,默认值100。
a1.channels.c1.byteCapacityBufferPercentage=20 #缓冲空间占Channel容量(byteCapacity)的百分比,为event中的头信息保留了空间,默认值20(单位:百分比)。
a1.channels.c2.type = memory
a1.channels.c2.capacity=10000
a1.channels.c2.byteCapacityBufferPercentage=20
# configure sink
# start-sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic_start
a1.sinks.k1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sinks.k1.kafka.flumeBatchSize = 2000 #Producer端单次批量发送的消息条数,该值应该根据实际环境适当调整,增大批量发送消息的条数能够在一定程度上提高性能,但是同时也增加了延迟和Producer端数据丢失的风险。默认值为100。
a1.sinks.k1.kafka.producer.acks = 1 #kafka的ack
a1.sinks.k1.channel = c1
# event-sink
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = topic_event
a1.sinks.k2.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sinks.k2.kafka.flumeBatchSize = 2000
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.channel = c2
(3)修改/opt/module/flume/log_position.json的读写权限
[root@hadoop01 module]# mkdir -p /opt/module/flume
[root@hadoop01 flume]# touch log_position.json
[root@hadoop01 flume]# chmod 777 log_position.json
[root@hadoop01 module]# xsync /opt/module/flume/
注意:Json文件的父目录一定要创建好,并改好权限
2.2.4 Flume拦截器
自定义了两个拦截器,分别是:ETL拦截器、日志类型区分拦截器。
ETL拦截器主要用于,过滤时间戳不合法和Json数据不完整的日志
日志类型区分拦截器主要用于,将ic日志和iu日志区分开来,方便发往Kafka的不同Topic。
1)创建Maven工程flume-interceptor
2)创建包名:com.wljs.flume.interceptor
3)在pom.xml文件中添加如下配置
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.6.0-cdh5.8.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
4)在com.wljs.flume.interceptor包下创建LogETLInterceptor类名
Flume ETL拦截器LogETLInterceptor
package com.wljs.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
public class LogETLInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
// 1 获取数据
byte[] body = event.getBody();
String log = new String(body, Charset.forName("UTF-8"));
// 2 判断数据类型并向Header中赋值
if (log.contains("start")) {
if (LogUtils.validateStart(log)){
return event;
}
}else {
if (LogUtils.validateEvent(log)){
return event;
}
}
// 3 返回校验结果
return null;
}
@Override
public List<Event> intercept(List<Event> events) {
ArrayList<Event> interceptors = new ArrayList<>();
for (Event event : events) {
Event intercept1 = intercept(event);
if (intercept1 != null){
interceptors.add(intercept1);
}
}
return interceptors;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new LogETLInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
4)Flume日志过滤工具类
package com.lsl.flume.interceptor;
import org.apache.commons.lang.math.NumberUtils;
public class LogUtils {
public static boolean validateEvent(String log) {
// 服务器时间 | json
// 1549696569054 | {"cm":{"ln":"-89.2","sv":"V2.0.4","os":"8.2.0","g":"M67B4QYU@gmail.com","nw":"4G","l":"en","vc":"18","hw":"1080*1920","ar":"MX","uid":"u8678","t":"1549679122062","la":"-27.4","md":"sumsung-12","vn":"1.1.3","ba":"Sumsung","sr":"Y"},"ap":"weather","et":[]}
// 1 切割
String[] logContents = log.split("\\|");
// 2 校验
if(logContents.length != 2){
return false;
}
//3 校验服务器时间
if (logContents[0].length()!=13 || !NumberUtils.isDigits(logContents[0])){
return false;
}
// 4 校验json
if (!logContents[1].trim().startsWith("{") || !logContents[1].trim().endsWith("}")){
return false;
}
return true;
}
public static boolean validateStart(String log) {
if (log == null){
return false;
}
// 校验json
if (!log.trim().startsWith("{") || !log.trim().endsWith("}")){
return false;
}
return true;
}
}
5)Flume日志类型区分拦截器LogTypeInterceptor
package com.lsl.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class LogTypeInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
// 区分日志类型: body header
// 1 获取body数据
byte[] body = event.getBody();
String log = new String(body, Charset.forName("UTF-8"));
// 2 获取header
Map<String, String> headers = event.getHeaders();
// 3 判断数据类型并向Header中赋值
if (log.contains("start")) {
headers.put("topic","topic_start");
}else {
headers.put("topic","topic_event");
}
return event;
}
@Override
public List<Event> intercept(List<Event> events) {
ArrayList<Event> interceptors = new ArrayList<>();
for (Event event : events) {
Event intercept1 = intercept(event);
interceptors.add(intercept1);
}
return interceptors;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new LogTypeInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
6)打包
拦截器打包之后,只需要单独包,不需要将依赖的包上传。打包之后要放入flume的lib文件夹下面。
注意:为什么不需要依赖包?因为依赖包在flume的lib目录下面已经存在了。
7)采用root用户将flume-interceptor-1.0-SNAPSHOT.jar包放入到hadoop01的/data/opt/cloudera/parcels/CDH/lib/flume-ng/lib/文件夹下面。
[root@hadoop01 lib]# ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT.jar
8)分发Flume到hadoop02
[root@hadoop01 lib]$ xsync flume-interceptor-1.0-SNAPSHOT.jar
三、kafka数据
3.1 离线安装
3.1.1 下载
安装jar包:http://archive.cloudera.com/csds/
安装包下载:http://archive.cloudera.com/kafka/parcels/4.0.0/
图片: https://uploader.shimo.im/f/dtHh8T705Xg7xqn2.png
3.1.2 安装
- 上传下载下来的安装包至集群路径/data/opt/cloudera/parcel-repo并修改KAFKA-4.0.0-1.4.0.0.p0.1-precise.parcel.sha1 文件名为KAFKA-4.0.0-1.4.0.0.p0.1-precise.parcel.sha
- 创建/data/opt/cloudera/csd目录导入Kafka-1.2.0.jar
图片: https://uploader.shimo.im/f/fJWmqshX0GkV5tWm.png - 点击安装主页上的主机–>Parcel–>检查新Parcel–>选择操作【分发->激活】图片: https://uploader.shimo.im/f/rVRe9S6KIowEpSlz.png
图片: https://uploader.shimo.im/f/BfQ5jdjVayMJZl2O.png
3.2 kafka消息
3.2.1 查看Kafka Topic列表
kafka-topics --zookeeper hadoop01:2181 --list
3.2.2 创建Kafka Topic
创建:启动日志主题、事件日志主题。
1)创建启动日志主题
kafka-topics --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --create --replication-factor 2 --partitions 5 --topic topic_start
2)创建事件日志主题
kafka-topics --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --create --replication-factor 2 --partitions 5 --topic topic_event
3.2.3 删除Kafka Topic
1)删除启动日志主题
bin/kafka-topics.sh --delete --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --topic topic_start
2)删除事件日志主题
bin/kafka-topics.sh --delete --zookeeper hadoop02:2181,hadoop03:2181,hadoop04:2181 --topic topic_event
3.2.4 Kafka生产消息
kafka-console-producer --broker-list hadoop01:9092 --topic topic_start
3.2.5 Kafka消费消息
kafka-console-consumer --bootstrap-server hadoop01:9092 --from-beginning --topic topic_start
–from-beginning:会把主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。
3.2.6 查看Kafka Topic详情
kafka-topics --zookeeper hadoop01:2181 --describe --topic topic_start
四、消费kafka中的数据
4.1 Flume消费Kafka数据写到HDFS
1)集群规划
2)Flume配置分析
图片: https://uploader.shimo.im/f/D6DxTn1pZ7ox05to.png
3)Flume的具体配置如下:
(1)在CM管理页面hadoop03上Flume的配置中找到代理名称
a1
在配置文件如下内容(kafka-hdfs)
## 组件
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sources.r1.kafka.topics=topic_start
## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000 # 一批写入 channel 的最大消息数
a1.sources.r2.batchDurationMillis = 2000# 一个批次写入 channel 之前的最大等待时间(毫秒)。达到等待时间或者数量达到 batchSize 都会触发写操作。
a1.sources.r2.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sources.r2.kafka.topics=topic_event
## channel1
a1.channels.c1.type=memory
a1.channels.c1.capacity=100000 # 内存中存储 Event 的最大数
a1.channels.c1.transactionCapacity=10000 # source 或者 sink 每个事务中存取 Event 的操作数量(不能比 capacity 大)
## channel2
a1.channels.c2.type=memory
a1.channels.c2.capacity=100000
a1.channels.c2.transactionCapacity=10000
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/wljs/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.round = true #是否应将时间戳向下舍入(如果为true,则影响除 %t 之外的所有基于时间的转义符)
a1.sinks.k1.hdfs.roundValue = 10 # 向下舍入(小于当前时间)的这个值的最高倍(单位取决于下面的 hdfs.roundUnit ) 例子:假设当前时间戳是18:32:01,hdfs.roundUnit = minute 如果roundValue=5,则时间戳会取为:18:30 如果roundValue=7,则时间戳会取为:18:28 如果roundValue=10,则时间戳会取为:18:30
a1.sinks.k1.hdfs.roundUnit = second # 向下舍入的单位,可选值: second 、 minute 、 hour
##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/wljs/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = second
## 不要产生大量小文件
a1.sinks.k1.hdfs.rollInterval = 600 #当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒
a1.sinks.k1.hdfs.rollSize = 134217728 #当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节
a1.sinks.k1.hdfs.rollCount = 0 #当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)
a1.sinks.k1.hdfs.minBlockReplicas = 1 #最小冗余数,让flume感知不到副本的复制
a1.sinks.k2.hdfs.rollInterval = 600
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.minBlockReplicas = 1 #最小冗余数,让flume感知不到副本的复制
## 控制输出文件是原生文件。
## a1.sinks.k1.hdfs.fileType = CompressedStream
## a1.sinks.k2.hdfs.fileType = CompressedStream
## a1.sinks.k1.hdfs.codeC = lzop
## a1.sinks.k2.hdfs.codeC = lzop
## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
4.2 线上配置
## 组件
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 8000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sources.r1.kafka.topics=topic_start
## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 8000 # 一批写入 channel 的最大消息数
a1.sources.r2.batchDurationMillis = 2000# 一个批次写入 channel 之前的最大等待时间(毫秒)。达到等待时间或者数量达到 batchSize 都会触发写操作。
a1.sources.r2.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sources.r2.kafka.topics=topic_event
## channel1
a1.channels.c1.type=memory
a1.channels.c1.capacity=100000 # 内存中存储 Event 的最大数
a1.channels.c1.transactionCapacity=10000 # source 或者 sink 每个事务中存取 Event 的操作数量(不能比 capacity 大)
## channel2
a1.channels.c2.type=memory
a1.channels.c2.capacity=100000
a1.channels.c2.transactionCapacity=10000
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/wljs/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.round = true #是否应将时间戳向下舍入(如果为true,则影响除 %t 之外的所有基于时间的转义符)
a1.sinks.k1.hdfs.roundValue = 10 # 向下舍入(小于当前时间)的这个值的最高倍(单位取决于下面的 hdfs.roundUnit ) 例子:假设当前时间戳是18:32:01,hdfs.roundUnit = minute 如果roundValue=5,则时间戳会取为:18:30 如果roundValue=7,则时间戳会取为:18:28 如果roundValue=10,则时间戳会取为:18:30
a1.sinks.k1.hdfs.roundUnit = minute # 向下舍入的单位,可选值: second 、 minute 、 hour
##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/wljs/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = minute
## 不要产生大量小文件
a1.sinks.k1.hdfs.rollInterval = 120 #当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒
a1.sinks.k1.hdfs.rollSize = 0 #当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节
a1.sinks.k1.hdfs.rollCount = 0 #当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)
a1.sinks.k1.hdfs.minBlockReplicas = 1 #最小冗余数,让flume感知不到副本的复制
a1.sinks.k2.hdfs.rollInterval = 120
a1.sinks.k2.hdfs.rollSize = 0
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.minBlockReplicas = 1 #最小冗余数,让flume感知不到副本的复制
## 控制输出文件
## a1.sinks.k1.hdfs.fileType = CompressedStream
## a1.sinks.k2.hdfs.fileType = CompressedStream
## a1.sinks.k1.hdfs.codeC = lzop
## a1.sinks.k2.hdfs.codeC = lzop
## 控制输出文件是原生文件。
a1.sinks.k2.hdfs.fileType=DataStream #文件格式,目前支持: SequenceFile 、 DataStream 、 CompressedStream 。 1. DataStream 不会压缩文件,不需要设置hdfs.codeC 2. CompressedStream 必须设置hdfs.codeC参数
a1.sinks.k2.hdfs.idleTimeout=65 # 关闭非活动文件的超时时间(0表示禁用自动关闭文件),单位:秒
a1.sinks.k2.hdfs.callTimeout=65000 # 允许HDFS操作文件的时间,比如:open、write、flush、close。如果HDFS操作超时次数增加,应该适当调高这个这个值。(毫秒)
a1.sinks.k2.hdfs.threadsPoolSize=200 #每个HDFS Sink实例操作HDFS IO时开启的线程数(open、write 等)
a1.sinks.k2.hdfs.fileType=DataStream
a1.sinks.k2.hdfs.idleTimeout=65
a1.sinks.k2.hdfs.callTimeout=65000
a1.sinks.k2.hdfs.threadsPoolSize=200
## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
4.3 日志生成数据传输到HDFS
1)将log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar上传都hadoop01的/opt/module目录
2)分发log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar到hadoop02
[root@hadoop02 module]# xsync log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar
3)在/root/bin目录下创建脚本lg.sh
[root@hadoop02 bin]$ vim lg.sh
4)在脚本中编写如下内容
#! /bin/bash
for i in hadoop01 hadoop02
do
ssh $i "java -classpath /data/opt/module/log-produce.jar com.lsl.appclient.AppMain $1 $2 >/data/opt/module/test.log &"
done
5)修改脚本执行权限
[root@hadoop02 bin]$ chmod 777 lg.sh
6)启动脚本
[root@hadoop02 module]$ lg.sh
五、可能出现的问题及解决
5.1 flume
5.1.1 OOM 问题
Flume 启动时的最大堆内存大小默认是 20M,线上环境很容易 OOM,因此需要你在 flume-env.sh 中添加 JVM 启动参数:
JAVA_OPTS="-Xms8192m -Xmx8192m -Xss256k -Xmn2g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"
5.1.2 cdh5.8.0-1.6.0 flume-tairdir问题
taildirSource组件不支持文件改名的。如果文件改名会认为是新文件,就会重新读取,这就导致了日志文件重读!!!
解决:下载1.7flume源码修改编译,上传至flume的lib目录下
更多推荐
所有评论(0)