Flume自定义Sink根据日志信息拆分日志成不同的Topic发送到Kafka中
前言:最近因为疫情的原因嘛,就被一直关在了家里,也就有了大把的时间可以搞自己的事情,反正闲着也是闲着,就在家里在做一个项目,这个项目就是一个推荐系统,(后面我应该会专门来写一些博客来记录这一次的这个项目经历),在这个项目中呢有一个环节就是需要对HttpServer端产生的日志进行实时采集,然后再通过Spark 进行实时计算分析,关于日志的采集在本专题的前一篇文章中已经提到了,那样虽然也能把日志..
前言:
- 最近因为疫情的原因嘛,就被一直关在了家里,也就有了大把的时间可以搞自己的事情,反正闲着也是闲着,就在家里在做一个项目,这个项目就是一个推荐系统,(后面我应该会专门来写一些博客来记录这一次的这个项目经历),在这个项目中呢有一个环节就是需要对HttpServer端产生的日志进行实时采集,然后再通过Spark 进行实时计算分析,关于日志的采集在本专题的前一篇文章中已经提到了,那样虽然也能把日志采集到Kafka中,但是后面发现了一个问题就是,我把日志都丢在了一个主题里,其中很多日志都是我不要的,但还是加载进来了.
- 是这样的在我这个项目中Spark这边是做的一个窗口分析,也就是说每个窗口我设置了一个固定的 Duration .然后对日志分析,也只是需要一部分的Http请求日志,例如:搜索商品的请求,获取商品列表的请求,购物车的一系列动作,获取商品详情等,基本上在线分析这块也就重点分析前说的这些请求.其他的那些登录日志这些就可要可不要.
- 如果将全部的日志都放在一个主题中,那么我Spark在消费的时候,一个窗口内就会有很多无关紧要的日志信息占用着空间.也不利于分析.
- 然后我就想了个办法,当然博主还是一位大三的学生,也没太多的项目经验,也不知道这样的思路对不对,如果有大佬看见,嘿嘿嘿,还请多指教.
- 下面就来说所我的想法把,也正如标题这样,我的想法就是在Flume中Sink的这个环节这里,我通过自定义一个Sink读取日志数据,然后将日志中的Http请求地址信息读取出来,然后根据这个地址分类,分别发送到相应的KafkaTopic中去,一些无关紧要的日志信息就丢弃掉.整个的思路就是下图这样:
- 图很简陋,能明白意思就行,其中这个BreakLogToKafkaSink就是我们要自定义的这个Sink,就是通过它将我们的日志根据内容进行拆分,下面就来看一下整个的实现流程.
一. 准备
- 看了一下官网的开发文档,要想自定义一个Sink也很简单,只需要继承一个抽象类 AbstractSink 和一个用于接收配置参数的接口 Configurable 即可.
- 然后呢就需要实现两个方法
- 一个就是public Status process() throws EventDeliveryException {}这个方法会被多次调用,反复执行,也就是通过它来实时的获取Channel流出来的数据;
- 第二个就是public void configure(Context context) {} 这个方法主要是通过传入的这个Contex上下文对象.来个获取配置文件中的参数,一些初始化的工作可以写在这个方法里面.
上面简单阐述了一下自定义Sink实现的一些原理,然后下面就要来准备一下开发环境:
这里博主使用的idea作为编辑器,maven进行环境配置
这是我的pom.xml配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>BreakLogToKafka</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!--Flume 依赖-->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-configuration</artifactId>
<version>1.9.0</version>
</dependency>
<!--Kafka 依赖-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
</project>
二.编写代码
这里就主要编写了两个java文件,一个就是上面说的继承自AbstractSink的自定义Sink文件,还有一个呢就是用于日志分类的一个java文件.
BreakLogToKafka.java 代码:
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import java.util.Properties;
public class BreakLogToKafka extends AbstractSink implements Configurable {
private MessageClassifier messageClassifier;
public Status process() throws EventDeliveryException {
Status status = null;
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
transaction.begin();
try{
Event event = channel.take();
if (event == null){
// 回滚事务
transaction.rollback();
status = Status.BACKOFF;
return status;
}
// 获取消息体
byte[] body = event.getBody();
// 将body 转换成字符串
final String msg = new String(body);
// 使用自定义的消息分发器对日志进行选择分发到Kafka中
status = messageClassifier.startClassifier(msg) ;
// 提交事务
transaction.commit();
}catch (Exception e){
transaction.rollback();
e.printStackTrace();
status = Status.BACKOFF;
}finally {
transaction.close();
}
return status;
}
public void configure(Context context) {
Properties properties = new Properties();
// 读取配置文件中的文件并配置Kafka相应的参数
properties.put("bootstrap.servers",context.getString("bootstrap.servers","localhost:9092"));
properties.put("acks",context.getString("acks","all"));
properties.put("retries",Integer.parseInt(context.getString("retries","0")));
properties.put("batch.size",Integer.parseInt(context.getString("batch.size","16384")));
properties.put("linger.ms",Integer.parseInt(context.getString("linger.ms","1")));
properties.put("buffer.memory", Integer.parseInt(context.getString("buffer.memory","33554432")));
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
messageClassifier = new MessageClassifier(properties);
}
}
MessageClassifier.java代码:
import org.apache.flume.Sink;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.regex.Pattern;
public class MessageClassifier {
/** "/product/search/get_items.action" 日志的模式串*/
private static final String GET_PRODUCT_ITEMS_ACTION_P = ".*/product/search/get_items\\.action.*";
/** "/product/search/get_items.action" 需要分发到的kafka对应的主题名*/
private static final String GET_PRODUCT_ITEMS_ACTION_T = "product_items_info";
/** "/product/detail_info.action" 日志的模式串*/
private static final String GET_PRODUCT_DETAIL_INFO_P = ".*/product/detail_info\\.action.*";
/** "/product/detail_info.action" 需要分发到的kafka对应的主题名*/
private static final String GET_PRODUCT_DETAIL_INFO_T = "get_product_detail_info";
/** "/product/car/*.action 日志的模式串 */
private static final String SHOP_CAR_OPERATION_INFO_P = ".*/product/car/.*";
/** "/product/car/*.action 需要分发到的kafka对应的主题名 */
private static final String SHOP_CAR_OPERATION_INFO_T = "shop_car_opt_info";
private final KafkaProducer<String,String> producer;
public MessageClassifier(Properties kafkaConf){
producer = new KafkaProducer<>(kafkaConf);
}
public Sink.Status startClassifier(String msg){
try{
if(Pattern.matches(GET_PRODUCT_ITEMS_ACTION_P,msg)){
producer.send(new ProducerRecord<>(GET_PRODUCT_ITEMS_ACTION_T,msg));
}else if (Pattern.matches(GET_PRODUCT_DETAIL_INFO_P,msg)){
producer.send(new ProducerRecord<>(GET_PRODUCT_DETAIL_INFO_T,msg));
}else if (Pattern.matches(SHOP_CAR_OPERATION_INFO_P,msg)){
producer.send(new ProducerRecord<>(SHOP_CAR_OPERATION_INFO_T,msg));
}
}catch (Exception e){
e.printStackTrace();
return Sink.Status.BACKOFF;
}
return Sink.Status.READY;
}
}
三.将上面的工程项目打包成 jar格式
会弹出一个对话框啥也不用管 直接选择 OK就行,最后Build项目
然后你就会发现在你项目中文件夹中会出现一个out文件夹,在里面找到jar包即可.
最后我们要把这个jar包放在flume的环境中来
这里提供两方法
- 方法一 (简单但不推荐,这样不好管理jar包):
找到我们刚在构建的这个jar包然后将它拷贝在$FLUME_HOME/lib 目录中去就大功告成 - 方法二 (也是官方认可的方法)
进入$FLUME_HOME目录中,然后新建一个 plugins.d 目录,然后在在这下面建立一个文件夹,怎么命名都可以,然后再进入你新建的这个目录,在这个目录中再新建两个目录,一个是lib目录(这里就是你存放自定义jar包的位置),还有一个目录就是libext (这个目录主要是用来存放自定义jar包的一些依赖,我们这里就不用管它了建好就行),建好之后你的目录结构应该是这样的:
plugins.d/
└── custom
├── lib
│ └── BreakLogToKafka.jar
└── libext
四.配置flume配置文件
进入$FLUME_HOME/conf 然后新建一个 custom_test.conf 文件用来存放配置信息,在这个文件中填入:
# agent的名称为a1
a1.sources=source1
a1.channels=channel1
a1.sinks=sink1
# set source
a1.sources.source1.type=TAILDIR
a1.sources.source1.filegroups=f1
a1.sources.source1.filegroups.f1=/home/zh123/PycharmProjects/RS_HttpServer/Test/logTest/.*log
a1sources.source1.fileHeader=flase
# set sink
a1.sinks.sink1.type=BreakLogToKafka
# set channel
a1.channels.channel1.type=memory
a1.channels.channel1.capacity=1000
a1.channels.channel1.transactionCapacity=1000
# bind
a1.sources.source1.channels=channel1
a1.sinks.sink1.channel=channel1
五.启动相应组件
- 首先启动zookeeper
zkServer.sh start
- 然后启动Kafka
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
在这里启动完Kafka后还需要先把前面用到的Kafka主题进行创建,这里就不细说了.
- 启动消费者程序
- 启动日志模拟脚本
(关于消费者端的程序和日志模拟的脚本在上一篇文章可以查看) - 最后启动Flume
flume-ng agent --conf-file $FLUME_HOME/conf/custom_test.conf --name a1 -Dfume.root.logger=INFO,console
六.查看运行情况
Flume 运行截图:
消费者端运行截图 (这里的消费者只消费了一个主题):
在这里就可以看到红框就是Request请求地址(全是当前主题对应的请求地址),这就实现了根据请求地址划分Topic的需求.
写在最后的话:
不知不觉进入大学都已经过去三年了,我也从一个刚入学什么都不懂的小白成长成了入门级的小白(这其中还是蛮辛的,刚进学校那会儿我可以说是一个真真切切的小白,说出来你可能都不信,那时的我连盲打都不会,哈哈哈 QWER 倒是很熟,可以说对计算机是一窍不通吧,在后面的学习过程当中发现编程还是挺有意思(在这里还是要感谢一位老师,就不说是谁了,算是他把我领进门的吧),后面发现编程有点儿类似于打怪升级的这种,越玩越上瘾,技术就相当于游戏里面的装备吧,出于对技术的渴求,经常就钻研一些这计算机相关的书籍,看一看学习视频,也是通过这样慢慢的有了一定的基础,有了一些基础后就参加各种各样的比赛,ACM参加过也得过市级的奖状,程序设计大赛也参加过是得的一个国奖,还有蓝桥杯…,然后呢也做过很多项目,通过这样就慢慢成长起来有了现在的自己,在这里我也想告诫一才刚刚踏入校门小伙伴门,既然选择了这一行,你就真的只有负重前行,没有等出来的辉煌,你不努力凭什么和别人比,凭运气吗?对不起这一行只有硬实力才行,你再怎么包装也躲不过面试的环节,还请珍重),这学期一结束就该去实习了,唉~今年又受这个疫情的影响,经常就听见哪里哪里在裁员,听着还听慌的,暂时还没有计划取哪儿实习,嘿嘿嘿如果各位小伙伴有资源的话,还请推荐哦.
更多推荐
所有评论(0)