背景:
前段时间写了个 Flume实时采集日志到 Kafka(极简版),其中我们是使用 exec source执行 tail命令来监控采集日志的,但这样做会存在一些问题:如果agent进程突然挂了,下次重启采集任务,会导致日志文件内容重复采集,虽然进程挂了这种事情不常发生,当我们还是要尽量避免因此带来的负面影响!


一、方案选择

和一些朋友交流过Flume断点续传问题,他们往往是自己修改source源码,写一个自定义的source,继承 AbstractSource 、实现 EventDrivenSource,Configurable接口;这种方案还不错,可以达到目的,但存在两个不便之处:一是自己造轮子需要开发、维护成本,二是如果团队水平不足可能导致后续各种bug。

那么Flume有没有提供现成的轮子可以解决此问题呢?答案是有的,那就是 Taildir Source

我们进入Flume官网,可以在Version 1.7.0Changes 里看到:
在这里插入图片描述
那么 Taildir Source有什么特点呢?

在这里插入图片描述
翻译如下:

注意:此source作为预览功能提供。它不适用于Windows。

观察指定的文件,并在检测到新行被添加到每个文件后能几乎实时地tail它们。如果正在写入新行,则此source将重试读取它们以等待写入完成。

此source是可靠的,即使tail的文件轮替也不会丢失数据。它定期以JSON格式写入给定位置文件上每个文件的最后读取位置。如果Flume由于某种原因stop或down,它可以从文件position处重新开始tail。

在其他用法中,此source也可以通过给定的position文件从每个文件的任意位置开始tail。当指定路径上没有position文件时,默认情况下它将从每个文件的第一行开始tail。

文件将按修改时间顺序使用。将首先使用具有最早修改时间的文件。

此source不会重命名或删除或对正在tail的文件执行任何修改。目前此source不支持tail二进制文件。它只能逐行读取文本文件。

我们可以发现,其功能的重点就在于有一个记录采集文件position记录,每次重新采集都可以从该记录中获取上一次的position,接着上次往后采集,也就是能解决断点续传的问题!


二、具体配置

在Flume的conf目录下创建配置文件:kafka-producer-moercredit.conf,内容如下:

pro.sources = s1
pro.channels = c1
pro.sinks = k1

pro.sources.s1.type = TAILDIR
pro.sources.s1.positionFile = /home/dev/flume/flume-1.8.0/log/taildir_position.json
pro.sources.s1.filegroups = f1
pro.sources.s1.filegroups.f1 = /home/dev/log/moercredit/logstash.log
pro.sources.s1.headers.f1.headerKey1 = aaa
pro.sources.s1.fileHeader = true

pro.channels.c1.type = memory
pro.channels.c1.capacity = 1000
pro.channels.c1.transactionCapacity = 100

pro.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
pro.sinks.k1.kafka.topic = moercredit_log_test
pro.sinks.k1.kafka.bootstrap.servers = cdh1:9092,cdh2:9092,cdh3:9092
pro.sinks.k1.kafka.flumeBatchSize = 20
pro.sinks.k1.kafka.producer.acks = 1
pro.sinks.k1.kafka.producer.linger.ms = 1
pro.sinks.k1.kafka.producer.compression.type = snappy

pro.sources.s1.channels = c1
pro.sinks.k1.channel = c1

对比上篇博客只修改了source部分,这个应该能一眼看懂意思。

要注意的是 filegroups是一组文件,可以以空格分隔,也支持正则表达式。

该source具体参数含义可以看官网:
在这里插入图片描述


三、使用测试及深入理解

建议看完,会理解更深刻一些。

bin目录下执行命令:

nohup ./flume-ng agent -n pro -c ../conf/ -f ../conf/kafka-producer-moercredit.conf >/dev/null 2>&1 &

执行后发现在当前目录下产生了一个logs目录,里面有一个flume.log文件,部分内容如下:

19 Apr 2019 14:41:03,800 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration.validateConfiguration:140)  - Post-validation flume configuration contains configuration for agents: [pro]
19 Apr 2019 14:41:03,800 INFO  [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadChannels:147)  - Creating channels
19 Apr 2019 14:41:03,807 INFO  [conf-file-poller-0] (org.apache.flume.channel.DefaultChannelFactory.create:42)  - Creating instance of channel c1 type memory
19 Apr 2019 14:41:03,816 INFO  [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadChannels:201)  - Created channel c1
19 Apr 2019 14:41:03,817 INFO  [conf-file-poller-0] (org.apache.flume.source.DefaultSourceFactory.create:41)  - Creating instance of source s1, type TAILDIR
19 Apr 2019 14:41:03,908 INFO  [conf-file-poller-0] (org.apache.flume.sink.DefaultSinkFactory.create:42)  - Creating instance of sink: k1, type: org.apache.flume.sink.kafka.KafkaSink
19 Apr 2019 14:41:03,916 INFO  [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSink.configure:314)  - Using the static topic moercredit_log_test. This may be overridden by event headers
19 Apr 2019 14:41:03,929 INFO  [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.getConfiguration:116)  - Channel c1 connected to [s1, k1]
19 Apr 2019 14:41:03,937 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:137)  - Starting new configuration:{ sourceRunners:{s1=PollableSourceRunner: { source:Taildir source: { positionFile: /home/dev/flume/flume-1.8.0/log/taildir_position.json, skipToEnd: false, byteOffsetHeader: false, idleTimeout: 120000, writePosInterval: 3000 } counterGroup:{ name:null counters:{} } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@30010525 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
19 Apr 2019 14:41:03,938 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:144)  - Starting Channel c1
19 Apr 2019 14:41:04,011 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.register:119)  - Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
19 Apr 2019 14:41:04,011 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:95)  - Component type: CHANNEL, name: c1 started
19 Apr 2019 14:41:04,011 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:171)  - Starting Sink k1
19 Apr 2019 14:41:04,012 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:182)  - Starting Source s1
19 Apr 2019 14:41:04,014 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.source.taildir.TaildirSource.start:92)  - s1 TaildirSource source starting with directory: {f1=/home/dev/log/moercredit/logstash.log}
19 Apr 2019 14:41:04,018 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>:83)  - taildirCache: [{filegroup='f1', filePattern='/home/dev/log/moercredit/logstash.log', cached=true}]
19 Apr 2019 14:41:04,024 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>:84)  - headerTable: {f1={headerKey1=aaa}}
19 Apr 2019 14:41:04,029 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.source.taildir.ReliableTaildirEventReader.openFile:283)  - Opening file: /home/dev/log/moercredit/logstash.log, inode: 807943550, pos: 0
19 Apr 2019 14:41:04,031 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>:94)  - Updating position from position file: /home/dev/flume/flume-1.8.0/log/taildir_position.json
19 Apr 2019 14:41:04,031 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.source.taildir.ReliableTaildirEventReader.loadPositionFile:144)  - File not found: /home/dev/flume/flume-1.8.0/log/taildir_position.json, not updating position
19 Apr 2019 14:41:04,033 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.register:119)  - Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.
19 Apr 2019 14:41:04,033 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:95)  - Component type: SOURCE, name: s1 started

通过该日志我们可以详细看到flume运行过程,我们重点关注这几行:

Opening file: /home/dev/log/moercredit/logstash.log, inode: 807943550, pos: 0
Updating position from position file: /home/dev/flume/flume-1.8.0/log/taildir_position.json
File not found: /home/dev/flume/flume-1.8.0/log/taildir_position.json, not updating position

第一次执行该flume agent进程,先找到待采集的日志文件inode为807943550,然后会创建taildir_position.json文件将pos更新其中,进程运行后会马上采集完该日志文件,并更新position,我们此时查看下taildir_position.json文件内容:

[{"inode":807943550,"pos":579077,"file":"/home/dev/log/moercredit/logstash.log"}]

其实就是个json array,每个采集文件对应一个数组元素,每个元素包含三个属性:inode(文件唯一标识号码)、pos(被采集文件的最后采集位置,也就是文件的byte字节数)、file(被采集文件的绝对路径)

扩展知识:

除了文件名以外的所有文件元信息,都存在inode之中,每个inode都有一个号码,操作系统用inode号码来识别不同的文件。

这里值得重复一遍,Unix/linux系统内部不使用文件名,而使用inode号码来识别文件。对于系统来说,文件名只是inode号码便于识别的别称或者绰号。
表面上,用户通过文件名,打开文件。实际上,系统内部这个过程分成三步:首先,系统找到这个文件名对应的inode号码;其次,通过inode号码,获取inode信息;最后,根据inode信息,找到文件数据所在的block,读出数据。

使用ls -i命令,可以看到文件名对应的inode号码:
[dev@localhost log]$ ls -i
taildir_position.json 542922380 taildir_position.json

或者通过stat命令查看inode元信息:
[dev@localhost log]$ stat taildir_position.json
File: ‘taildir_position.json’ Size: 81 Blocks: 8 IO
Block: 4096 regular file Device: fd02h/64770d Inode: 542922380
Links: 1 Access: (0664/-rw-rw-r–) Uid: ( 1000/ dev) Gid: (
1000/ dev) Context: unconfined_u:object_r:user_home_t:s0 Access:
2019-04-19 15:18:19.034511139 +0800 Modify: 2019-04-19
15:19:24.806511139 +0800 Change: 2019-04-19 15:19:24.806511139 +0800
Birth: -

由于inode号码与文件名分离,这种机制导致了一些Unix/Linux系统特有的现象。
  1. 有时,文件名包含特殊字符,无法正常删除。这时,直接删除inode节点,就能起到删除文件的作用。
  2. 移动文件或重命名文件,只是改变文件名,不影响inode号码。
  3. 打开一个文件以后,系统就以inode号码来识别这个文件,不再考虑文件名。

因此,通常来说,系统无法从inode号码得知文件名。 第3点使得软件更新变得简单,可以在不关闭软件的情况下进行更新,不需要重启。因为系统通过inode号码,识别运行中的文件,不通过文件名。更新的时候,新版文件以同样的文件名,生成一个新的inode,不会影响到运行中的文件。等到下一次运行这个软件的时候,文件名就自动指向新版文件,旧版文件的inode则被回收。

我们测试下,如果flume进程down了,重启是否会重复消费:
目前topic数据为1661条。
在这里插入图片描述

[{"inode":807943550,"pos":585006,"file":"/home/dev/log/moercredit/logstash.log"}

重启后:
topic数据为1663条,并未重复消费(这两条是操作时新增的数据),达到了断点续传的目的!
在这里插入图片描述
查看flume.log:

Updating position from position file: /home/dev/flume/flume-1.8.0/log/taildir_position.json
Updated position, file: /home/dev/log/moercredit/logstash.log, inode: 807943550, pos: 585006

这时问题来了:

1.该source是根据文件名还是inode采集对应文件呢?
2.读取taildir_position.json文件中既有inode也有filepath,到底以谁为主

先看第一个问题,因为conf中的source配置的是文件路径:
pro.sources.s1.filegroups.f1 = /home/dev/log/moercredit/logstash.log
猜测是根据文件名来采集文件的,即文件名改了,会导致采集中断,再新建一个文件名和原来一样的文件,会采集新的文件

测试:

[dev@localhost moercredit]$ mv logstash.log logstash.log.bak

此时观察到flume.log发现改名后的文件被关闭,而且taildir_position.json中记录消失:

Closed file: /home/dev/log/moercredit/logstash.log, inode: 807943550, pos: 593777

然后我创建一个新文件logstash.log:

touch logstash.log

发现flume.log新增两条日志:

Opening file: /home/dev/log/moercredit/logstash.log, inode: 809248997, pos: 0
Closed file: /home/dev/log/moercredit/logstash.log, inode: 809248997, pos: 0

open了新的文件(新的inode),但是马上又close了。

我再在新文件里写入几条数据,发现又open了,kafka也新增了几条消息,现在采集新的文件了,说明确实是通过文件名而不是inode来采集文件

Opening file: /home/dev/log/moercredit/logstash.log, inode: 809248999, pos: 0

观察到,2分钟没写入数据,文件又被close了:

Closed file: /home/dev/log/moercredit/logstash.log, inode: 809248999, pos: 21

这个时间是由source的idleTimeout属性控制的,默认120s,在此期间文件没新增行,则自动关闭文件,这也解决了防止文件资源一直占用的问题
在这里插入图片描述
这是我们查看taildir_position.json,发现原来的inode被新文件的inode覆盖了!这也解决了第二个问题,断点续传也是根据文件名来记录position的

[{"inode":809248999,"pos":21,"file":"/home/dev/log/moercredit/logstash.log"}

注意的是,虽然老文件(inode=807943550)此时不在taildir_position.json文件中记录,但不意味这此时把新logstash.log删除、logstash.log.bak改回logstash.log,flume又会从头消费!
事实是依旧能断点续传,从上次position接着消费,为啥呢?taildir_position.json不是记录被覆盖了么?

看源码可以知道,内存中存在所有tairdir的详细信息:

private Map<Long, TailFile> tailFiles = Maps.newHashMap();
public class TailFile {
  ......
  private RandomAccessFile raf;
  private final String path;
  private final long inode;
  private long pos;
  private long lastUpdated;
  private boolean needTail;
  private final Map<String, String> headers;
  private byte[] buffer;
  private byte[] oldBuffer;
  private int bufferPos;
  private long lineReadPos;
}

ReliableTaildirEventReaderloadPositionFile方法中会首先读取taildir_position.json文件内容,将其inode、pos、file更新到tailFiles这个Map之中,如果taildir_position.json文件没有内容,自然使用的是内存中的tairdir pos了:

TailFile tf = tailFiles.get(inode);
// 此处 path、inode、pos是taildir_position.json文件内容
if (tf != null && tf.updatePos(path, inode, pos)) {  // 更新到tailFiles中
  tailFiles.put(inode, tf);    
} else {
  logger.info("Missing file: " + path + ", inode: " + inode + ", pos: " + pos);
}

故其实是内存中一直存在老文件的消费pos,故只要操作中间进程不挂,依旧没问题。那进程挂了再重启还能续传么?答案是不能,因为重启后只能从磁盘文件taildir_position.json中读取pos!


我们的采集需求恰好是文件名不变(按时间滚动),所以无需改动源码扩展需求!直接就能用了。


Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐