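I ran into this error while practicing on my local machine. The full setup is described in this blog post: a simple Flume replicating and multiplexing example (monitoring a log file and delivering to HDFS, Kafka, and the local file system).
The error is as follows: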

19/09/17 03:59:34 ERROR hdfs.HDFSEventSink: process failed
java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
	at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
	at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:256)
	at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:465)
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:368)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
	at java.lang.Thread.run(Thread.java:748)
19/09/17 03:59:34 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
	at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
	at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:256)
	at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:465)
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:368)
	... 3 more

The key line is:

org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null

This is a timestamp problem; see this blog post: https://www.cnblogs.com/mingfengshan/p/6853615.html

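The cause: when writing to HDFS, the sink uses a timestamp to build the directory structure (the stack trace shows BucketPath.replaceShorthand resolving the path). When the event arrives, Flume finds no timestamp entry in its header, so the sink throws this error. There are three ways to fix it:
1. agent1.sources.source1.interceptors = t1
agent1.sources.source1.interceptors.t1.type = timestamp adds an interceptor to the source that writes a timestamp into the header of every event (somewhat slower).
2. agent1.sinks.sink1.hdfs.useLocalTimeStamp = true sets this parameter to true on the sink (if the clocks of the client and the Flume cluster differ, the time recorded for the data will be inaccurate).
3. When sending an event to the source, add the timestamp to the event header yourself. The header is a map, and the key to use is timestamp (recommended).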
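
To make the cause and the first fix concrete, here is a minimal configuration sketch. The component names (a2, r1, k1) and the HDFS URL are assumptions for illustration and are not taken from the original setup. It shows a sink path with time escape sequences, which is what requires the timestamp header, together with the built-in timestamp interceptor.

```properties
# Sketch only: a2, r1, k1 and the HDFS URL below are assumed names, adjust to your agent.
# Time escapes such as %Y-%m-%d in hdfs.path are resolved from the event's "timestamp" header.
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://namenode:8020/flume/events/%Y-%m-%d/%H

# Method 1: let the source stamp every event with the built-in timestamp interceptor.
a2.sources.r1.interceptors = t1
a2.sources.r1.interceptors.t1.type = timestamp
# Optional: keep a timestamp that is already in the header instead of overwriting it.
a2.sources.r1.interceptors.t1.preserveExisting = true
```

With preserveExisting set to true, a timestamp supplied by the sender (method 3) is kept, and the interceptor only fills in events that lack one.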

I simply added the following to the corresponding configuration file (the second method above):

#using local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true

Problem solved.
The third method means setting the timestamp yourself in code, for example in a custom interceptor.
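
If you go the custom interceptor route, the agent wiring could look roughly like the sketch below. The class com.example.flume.TimestampHeaderInterceptor and the names a2 and r1 are hypothetical. The class stands for an interceptor you would write yourself that puts the current time, as epoch milliseconds in string form, under the header key timestamp.

```properties
# Sketch only: a2, r1 and the interceptor class are hypothetical placeholders.
a2.sources.r1.interceptors = i1
# For a custom interceptor, type is the fully qualified name of its Builder class.
a2.sources.r1.interceptors.i1.type = com.example.flume.TimestampHeaderInterceptor$Builder
```

The jar containing the interceptor has to be on the Flume agent's classpath for this to load.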

Some people online say this is a Flume bug; see https://issues.apache.org/jira/browse/FLUME-1419
