Flume自定义拦截器
需求:定义两个拦截器,一个用于过滤不合法数据,一个用于区分日志类型。ETL拦截器主要用于过滤时间戳不合法和Json数据不完整的日志。日志类型区分拦截器主要用于将启动日志和事件日志区分开来,方便发往Kafka的不同Topic。导入依赖:<dependencies><dependency><groupId>org.apache.flume</groupId>……
·
需求
- 定义两个拦截器,一个用于过滤不合法数据,一个用于区分日志类型。
- ETL拦截器主要用于过滤时间戳不合法和Json数据不完整的日志。
- 日志类型区分拦截器主要用于将启动日志和事件日志区分开来,方便发往Kafka的不同Topic。
导入依赖
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
</dependencies>
ETL拦截器
package com.aura.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
/**
 * Flume interceptor that performs a light ETL pass over log events.
 *
 * <p>An event whose body contains the text {@code "start"} is checked with
 * {@link LogUtils#validateStart(String)}; every other event is checked with
 * {@link LogUtils#validateEvent(String)}. Events that fail their check are
 * filtered out.
 */
public class LogETLInterceptor implements Interceptor {

    /** Charset used to decode the raw event body. */
    private static final Charset BODY_CHARSET = Charset.forName("UTF-8");

    /** No resources to set up. */
    @Override
    public void initialize() {
    }

    /**
     * Validates a single event.
     *
     * @param event the event to inspect
     * @return the same event instance when its body passes validation,
     *         otherwise {@code null}
     */
    @Override
    public Event intercept(Event event) {
        // Decode the body so it can be inspected as text.
        String line = new String(event.getBody(), BODY_CHARSET);

        // Pick the validator according to the kind of log line.
        boolean valid = line.contains("start")
                ? LogUtils.validateStart(line)
                : LogUtils.validateEvent(line);

        return valid ? event : null;
    }

    /**
     * Validates a batch of events.
     *
     * @param events the incoming batch
     * @return a new list holding only the events that passed validation,
     *         in their original order
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> accepted = new ArrayList<>();
        for (Event candidate : events) {
            Event checked = intercept(candidate);
            if (checked == null) {
                // Rejected by the single-event check: leave it out of the result.
                continue;
            }
            accepted.add(checked);
        }
        return accepted;
    }

    /** No resources to release. */
    @Override
    public void close() {
    }

    /** Builder referenced from the Flume configuration to create this interceptor. */
    public static class Builder implements Interceptor.Builder {

        /** @return a new {@link LogETLInterceptor} */
        @Override
        public Interceptor build() {
            return new LogETLInterceptor();
        }

        /** This interceptor reads no configuration properties. */
        @Override
        public void configure(Context context) {
        }
    }
}
LogUtils工具类
package com.aura.flume.interceptor;
import org.apache.commons.lang.math.NumberUtils;
/**
 * Static helpers that perform cheap structural validation of raw log lines.
 *
 * <p>The checks only look at the shape of the text (segment count, timestamp
 * length/digits, surrounding braces); the JSON payload is never parsed.
 */
public class LogUtils {

    /** Number of digits expected in the server timestamp, e.g. {@code 1549696569054}. */
    private static final int TIMESTAMP_LENGTH = 13;

    /**
     * Validates an event log line of the form {@code <server time> | <json>}.
     *
     * <p>Example:
     * {@code 1549696569054 | {"cm":{"ln":"-89.2","uid":"u8678"},"ap":"weather","et":[]}}
     *
     * @param log the raw log line; may be {@code null}
     * @return {@code true} when the line consists of a 13-digit timestamp, a pipe,
     *         and a payload that starts with <code>{</code> and ends with <code>}</code>
     *         (surrounding whitespace on either segment is ignored);
     *         {@code false} otherwise, including for {@code null}
     */
    public static boolean validateEvent(String log) {
        if (log == null) {
            return false;
        }

        // Split on the FIRST pipe only: the timestamp can never contain one,
        // but string values inside the JSON payload legitimately may.
        String[] logContents = log.split("\\|", 2);
        if (logContents.length != 2) {
            return false;
        }

        // Trim before measuring so "1549696569054 | {...}" is accepted,
        // matching the whitespace tolerance applied to the JSON segment.
        String serverTime = logContents[0].trim();
        if (serverTime.length() != TIMESTAMP_LENGTH || !isAllDigits(serverTime)) {
            return false;
        }

        return looksLikeJsonObject(logContents[1]);
    }

    /**
     * Validates a start log line, which is a bare JSON object.
     *
     * <p>Example:
     * {@code {"action":"1","en":"start","t":"1559551922019","uid":"993"}}
     *
     * @param log the raw log line; may be {@code null}
     * @return {@code true} when the trimmed line starts with <code>{</code> and ends
     *         with <code>}</code>; {@code false} otherwise, including for {@code null}
     */
    public static boolean validateStart(String log) {
        return log != null && looksLikeJsonObject(log);
    }

    /** Returns whether the trimmed text is delimited by an opening and a closing brace. */
    private static boolean looksLikeJsonObject(String text) {
        String trimmed = text.trim();
        return trimmed.startsWith("{") && trimmed.endsWith("}");
    }

    /** Returns whether the text is non-empty and every character is a digit. */
    private static boolean isAllDigits(String text) {
        if (text.isEmpty()) {
            return false;
        }
        for (int i = 0; i < text.length(); i++) {
            if (!Character.isDigit(text.charAt(i))) {
                return false;
            }
        }
        return true;
    }
}
日志类型拦截器
package com.aura.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Flume interceptor that tags every event with a {@code topic} header.
 *
 * <p>Events whose body contains the text {@code "start"} get
 * {@code topic=topic_start}; all other events get {@code topic=topic_event}.
 * No event is ever dropped by this interceptor.
 */
public class LogTypeInterceptor implements Interceptor {

    /** No resources to set up. */
    @Override
    public void initialize() {
    }

    /**
     * Writes the {@code topic} header of a single event based on its body.
     *
     * @param event the event to tag; its header map is modified in place
     * @return the same event instance
     */
    @Override
    public Event intercept(Event event) {
        // Decode the body so the log type can be detected from its text.
        String line = new String(event.getBody(), Charset.forName("UTF-8"));

        String topic = line.contains("start") ? "topic_start" : "topic_event";

        Map<String, String> headerMap = event.getHeaders();
        headerMap.put("topic", topic);
        return event;
    }

    /**
     * Tags every event of a batch.
     *
     * @param events the incoming batch
     * @return a new list containing each tagged event, in the original order
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> tagged = new ArrayList<>();
        for (Event current : events) {
            tagged.add(intercept(current));
        }
        return tagged;
    }

    /** No resources to release. */
    @Override
    public void close() {
    }

    /** Builder referenced from the Flume configuration to create this interceptor. */
    public static class Builder implements Interceptor.Builder {

        /** @return a new {@link LogTypeInterceptor} */
        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        /** This interceptor reads no configuration properties. */
        @Override
        public void configure(Context context) {
        }
    }
}
Flume配置文件
# Tail log files from a directory into two Kafka channels:
# one channel holds start logs, the other holds event (behaviour) logs.
# Declare the source and the channels
a1.sources = r1
a1.channels = c1 c2
# Source configuration
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1 c2
# Position file used to resume tailing where it left off
a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
# Fixed: the key previously read "a1.sources.ri...", which does not match source r1
a1.sources.r1.maxBatchCount = 200
# Interceptors: i1 filters invalid records, i2 writes the "topic" header
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.aura.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.aura.flume.interceptor.LogTypeInterceptor$Builder
# Channel selector: route on the "topic" header set by i2
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
# Map each header value to its channel
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2
# Channel c1: start logs
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = topic_start
# Write the plain body only, without Flume header information
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer
# Channel c2: event logs
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c2.kafka.topic = topic_event
# Fixed: this line previously set c1 a second time, leaving c2 on its default
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer
更多推荐
已为社区贡献1条内容
所有评论(0)