数据流模型

Event是Flume定义的一个数据流传输的最小单元。Agent就是一个Flume的实例,本质是一个JVM进程,该JVM进程控制Event数据流从外部日志生产者那里传输到目的地(或者是下一个Agent)。

学习Flume必须明白这几个概念,Event英文直译是事件,但是在Flume中表示数据传输的一个最小单位。参照下图可以看得出Agent就是Flume的一个部署实例, 一个完整的Agent中包含了三个组件Source、Channel和Sink,Source是指数据的来源和方式,Channel是一个数据的缓冲池,Sink定义了数据输出的方式和目的地。

Agent component diagram

Source消耗由外部(如Web服务器)传递给它的Event。外部以Flume Source识别的格式向Flume发送Event。例如,Avro Flume Source可接收从Avro客户端(或其他FlumeSink)接收Avro Event。用Thrift Flume Source也可以实现类似的流程,接收的Event数据可以是任何语言编写的只要符合Thrift协议即可。

当Source接收Event时,它将其存储到一个或多个channel。该channel是一个被动存储器,可以保持Event直到它被Sink消耗。『文件channel』就是一个例子 - 它由本地文件系统支持。sink从channel中移除Event并将其放入外部存储库(如HDFS,通过 Flume的 HDFS Sink 实现)或将其转发到流中下一个Flume Agent(下一跳)的Flume Source。

Agent中的source和sink与channel存取Event是异步的。

Flume的Source负责消费外部传递给它的数据(比如web服务器的日志)。外部的数据生产方以Flume Source识别的格式向Flume发送Event。

复杂流

Flume可以设置多级Agent连接的方式传输Event数据。也支持扇入和扇出的部署方式,类似于负载均衡方式或多点同时备份的方式。

第一句的意思是可以部署多个Agent组成一个数据流的传输链。第二句要知道扇入(多对一)和扇出(一对多)的概念,就是说Agent可以将数据流发到多个下级Agent,也可以从多个Agent发到一个Agent中。

可靠性

Event会在每个Agent的Channel上进行缓存,随后Event将会传递到流中的下一个Agent或目的地(比如HDFS)。只有成功地发送到下一个Agent或目的地后Event才会从Channel中删除。这一步保证了Event数据流在Flume Agent中传输时端到端的可靠性。

Flume的这个channel最重要的功能是用来保证数据的可靠传输的。其实另外一个重要的功能也不可忽视,就是实现了数据流入和流出的异步执行。

Flume使用事务来保证Event的 可靠传输。Source和Sink对channel提供的每个Event数据分别封装一个事务用于存储和恢复,这样就保证了数据流的集合在点对点之间的可靠传输。在多层架构的情况下,来自前一层的sink和来自下一层的Source 都会有事务在运行以确保数据安全地传输到下一层的Channel中。

可恢复性

Event数据会缓存在Channel中用来在失败的时候恢复出来。Flume支持保存在本地文件系统中的『文件channel』,也支持保存在内存中的『内存Channel』,『内存Channel』显然速度会更快,缺点是万一Agent挂掉『内存Channel』中缓存的数据也就丢失了。

详细介绍Flume的source、sink、channel三大组件和其他几个组件channel selector、sink processor、serializer、interceptor的配置、使用方法和各自的适用范围。 如果硬要翻译这些组件的话,三大组件分别是数据源(source)、数据目的地(sink)和缓冲池(channel)。其他几个分别是Event多路复用的channel选择器(channel selector), Sink组逻辑处理器(sink processor)、序列化器(serializer)、拦截器(interceptor)。

source

1、RPC

2、 系统命令:Exec Source

3、网络流:Avro、Thrift、Syslog、Netcat

4、从文件获取数据:Spooling Directory Source 和 Taildir Source。可以用它们来监控应用服务产生的日志并进行收集。

Spooling Directory Source

这个Source允许你把要收集的文件放入磁盘上的某个指定目录。它会将监视这个目录中产生的新文件,并在新文件出现时从新文件中解析数据出来。数据解析逻辑是可配置的。在新文件被完全读入Channel之后会重命名该文件以示完成(也可以配置成读完后立即删除)。

与Exec Source不同,Spooling Directory Source是可靠的,即使Flume重新启动或被kill,也不会丢失数据。同时作为这种可靠性的代价,指定目录中的文件必须是不可变的、唯一命名的。Flume会自动检测避免这种情况发生,如果发现问题,则会抛出异常:

  1. 如果文件在写入完成后又被再次写入新内容,Flume将向其日志文件(这是指Flume自己logs目录下的日志文件)打印错误并停止处理。
  2. 如果在以后重新使用以前的文件名,Flume将向其日志文件打印错误并停止处理。

为了避免上述问题,生成新文件的时候文件名加上时间戳是个不错的办法。

尽管有这个Source的可靠性保证,但是仍然存在这样的情况,某些下游故障发生时会出现重复Event的情况。这与其他Flume组件提供的保证是一致的。

属性名默认值解释
channels与Source绑定的channel,多个用空格分开
type组件类型,这个是: spooldir.
spoolDirFlume Source监控的文件夹目录,该目录下的文件会被Flume收集
fileSuffix.COMPLETED被Flume收集完成的文件被重命名的后缀。1.txt被Flume收集完成后会重命名为1.txt.COMPLETED
deletePolicynever是否删除已完成收集的文件,可选值: neverimmediate
fileHeaderfalse是否添加文件的绝对路径名(绝对路径+文件名)到header中。
fileHeaderKeyfile添加绝对路径名到header里面所使用的key(配合上面的fileHeader一起使用)
basenameHeaderfalse是否添加文件名(只是文件名,不包括路径)到header 中
basenameHeaderKeybasename添加文件名到header里面所使用的key(配合上面的basenameHeader一起使用)
includePattern^.*$指定会被收集的文件名正则表达式,它跟下面的ignorePattern不冲突,可以一起使用。如果一个文件名同时被这两个正则匹配到,则会被忽略,换句话说ignorePattern的优先级更高
ignorePattern^$指定要忽略的文件名称正则表达式。它可以跟 includePattern 一起使用,如果一个文件被 ignorePatternincludePattern 两个正则都匹配到,这个文件会被忽略。
trackerDir.flumespool用于存储与文件处理相关的元数据的目录。如果配置的是相对目录地址,它会在spoolDir中开始创建
consumeOrderoldest设定收集目录内文件的顺序。默认是“先来先走”(也就是最早生成的文件最先被收集),可选值有: oldestyoungestrandom 。当使用oldest和youngest这两种选项的时候,Flume会扫描整个文件夹进行对比排序,当文件夹里面有大量的文件的时候可能会运行缓慢。 当使用random时候,如果一直在产生新的文件,有一部分老文件可能会很久才会被收集
pollDelay500Flume监视目录内新文件产生的时间间隔,单位:毫秒
recursiveDirectorySearchfalse是否收集子目录下的日志文件
maxBackoff4000等待写入channel的最长退避时间,如果channel已满实例启动时会自动设定一个很低的值,当遇到ChannelException异常时会自动以指数级增加这个超时时间,直到达到设定的这个最大值为止。
batchSize100每次批量传输到channel时的size大小
inputCharsetUTF-8解析器读取文件时使用的编码(解析器会把所有文件当做文本读取)
decodeErrorPolicyFAIL当从文件读取时遇到不可解析的字符时如何处理。 FAIL :抛出异常,解析文件失败; REPLACE :替换掉这些无法解析的字符,通常是用U+FFFD; IGNORE :忽略无法解析的字符。
deserializerLINE指定一个把文件中的数据行解析成Event的解析器。默认是把每一行当做一个Event进行解析,所有解析器必须实现EventDeserializer.Builder接口
deserializer.*解析器的相关属性,根据解析器不同而不同
bufferMaxLines(已废弃)
bufferMaxLineLength5000(已废弃)每行的最大长度。改用 deserializer.maxLineLength 代替
selector.typereplicating可选值:replicatingmultiplexing ,分别表示: 复制、多路复用
selector.*channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors该source所使用的拦截器,多个用空格分开
interceptors.*拦截器相关的属性配置

20210312155526400

配置范例:

a1.channels = ch-1
a1.sources = src-1

a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true
Event反序列化器

下面是Flume内置的一些反序列化工具

LINE

这个反序列化器会把文本数据的每行解析成一个Event

属性默认值解释
deserializer.maxLineLength2048每个Event数据所包含的最大字符数,如果一行文本字符数超过这个配置就会被截断,剩下的字符会出现再后面的Event数据里
deserializer.outputCharsetUTF-8解析Event所使用的编码

提示deserializer.maxLineLength 的默认值是2048,这个数值对于日志行来说有点小,如果实际使用中日志每行字符数可能超过2048,超出的部分会被截断,千万记得根据自己的日志长度调大这个值。

AVRO

这个反序列化器能够读取avro容器文件,并在文件中为每个Avro记录生成一个Event。每个Event都会在header中记录它的模式。Event的body是二进制的avro记录内容,不包括模式和容器文件元素的其余部分。

注意如果Spooling Directory Source发生了重新把一个Event放入channel的情况(比如,通道已满导致重试),则它将重置并从最新的Avro容器文件同步点重试。 为了减少此类情况下的潜在Event重复,请在Avro输入文件中更频繁地写入同步标记。

属性名默认值解释
deserializer.schemaTypeHASH如何表示模式。 默认或者指定为 HASH 时,会对Avro模式进行哈希处理,并将哈希值存储在Event header中以“flume.avro.schema.hash”这个key。 如果指定为 LITERAL ,则会以JSON格式的模式存储在Event header中以“flume.avro.schema.literal”这个key。 与HASH模式相比,使用LITERAL模式效率相对较低。
BlobDeserializer

这个反序列化器可以反序列化一些大的二进制文件,一个文件解析成一个Event,例如pdf或者jpg文件等。注意这个解析器不太适合解析太大的文件,因为被反序列化的操作是在内存里面进行的

属性默认值解释
deserializer这个解析器没有别名缩写,需要填类的全限定名: org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
deserializer.maxBlobLength100000000每次请求的最大读取和缓冲的字节数,默认这个值大概是95.36MB
Kafka Source

Kafka Source就是一个Apache Kafka消费者,它从Kafka的topic中读取消息。 如果运行了多个Kafka Source,则可以把它们配置到同一个消费者组,以便每个source都读取一组唯一的topic分区。

属性名默认值解释
channels与Source绑定的channel,多个用空格分开
type组件类型,这个是: org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.serversSource使用的Kafka集群实例列表
kafka.consumer.group.idflume消费组的唯一标识符。如果有多个source或者Agent设定了相同的ID,表示它们是同一个消费者组
kafka.topics将要读取消息的目标 Kafka topic 列表,多个用逗号分隔
kafka.topics.regex会被Kafka Source订阅的 topic 集合的正则表达式。这个参数比 kafka.topics 拥有更高的优先级,如果这两个参数同时存在,则会覆盖kafka.topics的配置。
batchSize1000一批写入 channel 的最大消息数
batchDurationMillis1000一个批次写入 channel 之前的最大等待时间(毫秒)。达到等待时间或者数量达到 batchSize 都会触发写操作。
backoffSleepIncrement1000当Kafka topic 显示为空时触发的初始和增量等待时间(毫秒)。等待时间可以避免对Kafka topic的频繁ping操作。默认的1秒钟对于获取数据比较合适, 但是对于使用拦截器时想达到更低的延迟可能就需要配置更低一些。
maxBackoffSleep5000Kafka topic 显示为空时触发的最长等待时间(毫秒)。默认的5秒钟对于获取数据比较合适,但是对于使用拦截器时想达到更低的延迟可能就需要配置更低一些。
useFlumeEventFormatfalse默认情况下,从 Kafka topic 里面读取到的内容直接以字节数组的形式赋值给Event。如果设置为true,会以Flume Avro二进制格式进行读取。与Kafka Sink上的同名参数或者 Kafka channel 的parseAsFlumeEvent参数相关联,这样以对象的形式处理能使生成端发送过来的Event header信息得以保留。
setTopicHeadertrue当设置为true时,会把存储Event的topic名字存储到header中,使用的key就是下面的 topicHeader 的值。
topicHeadertopic如果 setTopicHeader 设置为 true ,则定义用于存储接收消息的 topic 使用header key。注意如果与 Kafka Sink 的 topicHeader 参数一起使用的时候要小心,避免又循环将消息又发送回 topic。
migrateZookeeperOffsetstrue如果找不到Kafka存储的偏移量,去Zookeeper中查找偏移量并将它们提交给 Kafka 。 它应该设置为true以支持从旧版本的FlumeKafka客户端无缝迁移。 迁移后,可以将其设置为false,但通常不需要这样做。 如果在Zookeeper未找到偏移量,则可通过kafka.consumer.auto.offset.reset配置如何处理偏移量。可以从 Kafka documentation 查看更多详细信息。
kafka.consumer.security.protocolPLAINTEXT设置使用哪种安全协议写入Kafka。可选值:SASL_PLAINTEXT、SASL_SSL 和 SSL有关安全设置的其他信息,请参见下文。
more consumer security props如果使用了SASL_PLAINTEXT、SASL_SSL 或 SSL 等安全协议,参考 Kafka security 来为消费者增加安全相关的参数配置
Other Kafka Consumer Properties其他一些 Kafka 消费者配置参数。任何 Kafka 支持的消费者参数都可以使用。唯一的要求是使用“kafka.consumer.”这个前缀来配置参数,比如: kafka.consumer.auto.offset.reset

20210312154813527

注解:Kafka Source 覆盖了两个Kafka 消费者的参数:auto.commit.enable 这个参数被设置成了false,Kafka Source 会提交每一个批处理。Kafka Source 保证至少一次消息恢复策略。 Source 启动时可以存在重复项。Kafka Source 还提供了key.deserializer(org.apache.kafka.common.serialization.StringSerializer) 和 value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)的默认值,不建议修改这些参数。

通过逗号分隔的 topic 列表进行 topic 订阅的示例:

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id

正则表达式 topic 订阅的示例:

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used

安全与加密: Flume 和 Kafka 之间通信渠道是支持安全认证和数据加密的。对于身份安全验证,可以使用 Kafka 0.9.0版本中的 SASL、GSSAPI (Kerberos V5) 或 SSL (虽然名字是SSL,实际是TLS实现)。

截至目前,数据加密仅由SSL / TLS提供。

当你把 kafka.consumer.security.protocol 设置下面任何一个值的时候意味着:

  • SASL_PLAINTEXT - 无数据加密的 Kerberos 或明文认证
  • SASL_SSL - 有数据加密的 Kerberos 或明文认证
  • SSL - 基于TLS的加密,可选的身份验证。

警告:启用SSL时性能会下降,影响大小取决于 CPU 和 JVM 实现。参考 Kafka security overviewKAFKA-2561

使用TLS:

请阅读 Configuring Kafka Clients SSL SSL 中描述的步骤来了解用于微调的其他配置设置,例如下面的几个例子:启用安全策略、密码套件、启用协议、信任库或秘钥库类型。

服务端认证和数据加密的一个配置实例:

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SSL
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>

注意,默认情况下 ssl.endpoint.identification.algorithm 这个参数没有被定义,因此不会执行主机名验证。如果要启用主机名验证,请加入以下配置:

a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS

开启后,客户端将根据以下两个字段之一验证服务器的完全限定域名(FQDN):

  1. Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
  2. Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6

如果还需要客户端身份验证,则还应在 Flume 配置中添加以下内容。 每个Flume 实例都必须拥有其客户证书,来被Kafka 实例单独或通过其签名链来信任。 常见示例是由 Kafka 信任的单个根CA签署每个客户端证书。

a1.sources.source1.kafka.consumer.ssl.keystore.location=/path/to/client.keystore.jks
a1.sources.source1.kafka.consumer.ssl.keystore.password=<password to access the keystore>

如果密钥库和密钥使用不同的密码保护,则 ssl.key.password 属性将为消费者密钥库提供所需的额外密码:

a1.sources.source1.kafka.consumer.ssl.key.password=<password to access the key>

Kerberos安全配置:

要将Kafka Source 与使用Kerberos保护的Kafka群集一起使用,请为消费者设置上面提到的consumer.security.protocol 属性。 与Kafka实例一起使用的Kerberos keytab和主体在JAAS文件的“KafkaClient”部分中指定。 “客户端”部分描述了Zookeeper连接信息(如果需要)。 有关JAAS文件内容的信息,请参阅 Kafka doc 。 可以通过flume-env.sh中的JAVA_OPTS指定此JAAS文件的位置以及系统范围的 kerberos 配置:

JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

使用 SASL_PLAINTEXT 的示例安全配置:

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka

使用 SASL_SSL 的安全配置范例:

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_SSL
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>

JAAS 文件配置示例。有关其内容的参考,请参阅Kafka文档 SASL configuration 中关于所需认证机制(GSSAPI/PLAIN)的客户端配置部分。由于Kafka Source 也可以连接 Zookeeper 以进行偏移迁移, 因此“Client”部分也添加到此示例中。除非您需要偏移迁移,否则不必要这样做,或者您需要此部分用于其他安全组件。 另外,请确保Flume进程的操作系统用户对 JAAS 和 keytab 文件具有读权限。

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
NetCat TCP Source

这个source十分像nc -k -l [host] [port]这个命令,监听一个指定的端口,把从该端口收到的TCP协议的文本数据按行转换为Event,它能识别的是带换行符的文本数据,同其他Source一样,解析成功的Event数据会发送到channel中。

提示:常见的系统日志都是逐行输出的,Flume的各种Source接收数据也基本上以行为单位进行解析和处理。不论是 NetCat TCP Source ,还是其他的读取文本类型的Source比如:Spooling Directory SourceTaildir SourceExec Source 等也都是一样的。

属性默认值解释
channels与Source绑定的channel,多个用空格分开
type组件类型,这个是: netcat
bind要监听的 hostname 或者IP地址
port监听的端口
max-line-length512每行解析成Event 消息体的最大字节数
ack-every-eventtrue对收到的每一行数据用“OK”做出响应
selector.typereplicating可选值:replicatingmultiplexing ,分别表示: 复制、多路复用
selector.*channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors该source所使用的拦截器,多个用空格分开
interceptors.*拦截器相关的属性配置

20210312160920006

配置范例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
Syslog Sources
Syslog TCP Source

提示:这个Syslog TCP Source在源码里面已经被@deprecated了,推荐使用 Multiport Syslog TCP Source 来代替。

属性默认值解释
channels与Source绑定的channel,多个用空格分开
type组件类型,这个是: syslogtcp
host要监听的hostname或者IP地址
port要监听的端口
eventSize2500每行数据的最大字节数
keepFieldsnone是否保留syslog消息头中的一些属性到Event中,可选值 allnone 或自定义指定保留的字段。如果设置为all,则会保留Priority, Timestamp 和Hostname三个属性到Event中。 也支持单独指定保留哪些属性(支持的属性有:priority, version, timestamp, hostname),用空格分开即可。现在已经不建议使用 truefalse,建议改用 allnone 了。
selector.typereplicating可选值:replicatingmultiplexing ,分别表示: 复制、多路复用
selector.*channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors该source所使用的拦截器,多个用空格分开
interceptors.*拦截器相关的属性配

20210312161312946

Multiport Syslog TCP Source

这是一个增强版的 Syslog TCP Source ,它更新、更快、支持监听多个端口。因为支持了多个端口,port参数已经改为了ports。这个Source使用了Apache mina(一个异步通信的框架,同netty类似)来实现。 提供了对RFC-3164和许多常见的RFC-5424格式消息的支持。 支持每个端口配置不同字符集。

属性默认值解释
channels与Source绑定的channel,多个用空格分开
type组件类型,这个是:multiport_syslogtcp
host要监听的hostname或者IP地址
ports一个或多个要监听的端口,多个用空格分开
eventSize2500解析成Event的每行数据的最大字节数
keepFieldsnone是否保留syslog消息头中的一些属性到Event中,可选值 allnone 或自定义指定保留的字段,如果设置为all,则会保留Priority, Timestamp 和Hostname三个属性到Event中。 也支持单独指定保留哪些属性(支持的属性有:priority, version, timestamp, hostname),用空格分开即可。现在已经不建议使用 truefalse ,建议改用 allnone 了。
portHeader如果配置了这个属性值,端口号会被存到每个Event的header里面用这个属性配置的值当key。这样就可以在拦截器或者channel选择器里面根据端口号来自定义路由Event的逻辑。
charset.defaultUTF-8解析syslog使用的默认编码
charset.port.针对具体某一个端口配置编码
batchSize100每次请求尝试处理的最大Event数量,通常用这个默认值就很好。
readBufferSize1024内部Mina通信的读取缓冲区大小,用于性能调优,通常用默认值就很好。
numProcessors(自动分配)处理消息时系统使用的处理器数量。 默认是使用Java Runtime API自动检测CPU数量。 Mina将为每个检测到的CPU核心生成2个请求处理线程,这通常是合理的。
selector.typereplicating可选值:replicatingmultiplexing ,分别表示: 复制、多路复用
selector.*channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors该source所使用的拦截器,多个用空格分开
interceptors.*拦截器相关的属性配

20210312161730638

配置范例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.channels = c1
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.ports = 10001 10002 10003
a1.sources.r1.portHeader = port
Syslog UDP Source
属性默认值解释
channels与Source绑定的channel,多个用空格分开
type组件类型,这个是: syslogudp
host要监听的hostname或者IP地址
port要监听的端口
keepFieldsfalse设置为true后,解析syslog时会保留Priority, Timestamp and Hostname这些属性到Event的消息体中(查看源码发现,实际上保留了priority、version、timestamp、hostname这四个字段在消息体的前面)
selector.typereplicating可选值:replicatingmultiplexing ,分别表示: 复制、多路复用
selector.*channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors该source所使用的拦截器,多个用空格分开
interceptors.*拦截器相关的属性配

20210312161914880

配置范例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
Avro Source

Avro Source监听Avro端口接收从外部Avro客户端发送来的数据流。

属性默认值解释
channels与Source绑定的channel,多个用空格分开
type组件类型,这个是: avro
bind监听的服务器名hostname或者ip
port监听的端口
threads生成的最大工作线程数量
selector.type可选值:replicatingmultiplexing ,分别表示: 复制、多路复用
selector.*channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors该source所使用的拦截器,多个用空格分开
interceptors.*拦截器的相关属性
compression-typenone可选值: nonedeflate 。这个类型必须跟Avro Source相匹配
sslfalse设置为 true 可启用SSL加密,如果为true必须指定下面的 keystorekeystore-password
keystoreSSL加密使用的Java keystore文件路径
keystore-passwordJava keystore的密码
keystore-typeJKSJava keystore的类型. 可选值有 JKSPKCS12
exclude-protocolsSSLv3指定不支持的协议,多个用空格分开,SSLv3不管是否配置都会被强制排除
ipFilterfalse设置为true可启用ip过滤(netty方式的avro)
ipFilterRulesnetty ipFilter的配置。ipFilterRules 可以配置一些允许或者禁止的ip规则,它的配置格式是:allow/deny:ip/name:pattern

在这里插入图片描述
配置范例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*
Thrift Source

监听Thrift 端口,从外部的Thrift客户端接收数据流。可以通过配置让它以安全模式(kerberos authentication)运行。

同Avro Source十分类似,不同的是支持了 kerberos 认证。

属性默认值解释
channels与Source绑定的channel,多个用空格分开
type组件类型,这个是: thrift
bind监听的 hostname 或 IP 地址
port监听的端口
threads生成的最大工作线程数量
selector.type可选值:replicatingmultiplexing ,分别表示: 复制、多路复用
selector.*channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors该source所使用的拦截器,多个用空格分开
interceptors.*拦截器的相关属性
sslfalse设置为true可启用SSL加密,如果为true必须指定下面的keystore和keystore-password。
keystoreSSL加密使用的Java keystore文件路径
keystore-passwordJava keystore的密码
keystore-typeJKSJava keystore的类型. 可选值有 JKSPKCS12
exclude-protocolsSSLv3排除支持的协议,多个用空格分开,SSLv3不管是否配置都会被强制排除
kerberosfalse设置为 true ,开启kerberos 身份验证。在kerberos模式下,成功进行身份验证需要 agent-principalagent-keytab 。 安全模式下的Thrift仅接受来自已启用kerberos且已成功通过kerberos KDC验证的Thrift客户端的连接。
agent-principal指定Thrift Source使用的kerberos主体用于从kerberos KDC进行身份验证。
agent-keytab—-Thrift Source与Agent主体结合使用的keytab文件位置,用于对kerberos KDC进行身份验证。

20210312160034228
配置范例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
Exec Source

这个source在启动时运行给定的Unix命令,并期望该进程在标准输出上连续生成数据(stderr 信息会被丢弃,除非属性 logStdErr 设置为 true )。 如果进程因任何原因退出, 则source也会退出并且不会继续生成数据。 综上来看cat [named pipe]或tail -F [file]这两个命令符合要求可以产生所需的结果,而date这种命令可能不会,因为前两个命令(tail 和 cat)能产生持续的数据流,而后者(date这种命令)只会产生单个Event并退出。

cat [named pipe]和tail -F [file]都能持续地输出内容,那些不能持续输出内容的命令不可以。这里注意一下cat命令后面接的参数是命名管道(named pipe)不是文件。

属性默认值解释
channels与Source绑定的channel,多个用空格分开
type组件类型,这个是: exec
command所使用的系统命令,一般是cat 或者tail
shell设置用于运行命令的shell。 例如 / bin / sh -c。 仅适用于依赖shell功能的命令,如通配符、后退标记、管道等。
restartThrottle10000尝试重新启动之前等待的时间(毫秒)
restartfalse如果执行命令线程挂掉,是否重启
logStdErrfalse是否会记录命令的stderr内容
batchSize20读取并向channel发送数据时单次发送的最大数量
batchTimeout3000向下游推送数据时,单次批量发送Event的最大等待时间(毫秒),如果等待了batchTimeout毫秒后未达到一次批量发送数量,则仍然执行发送操作。
selector.typereplicating可选值:replicatingmultiplexing ,分别表示: 复制、多路复用
selector.*channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors该source所使用的拦截器,多个用空格分开
interceptors.*拦截器相关的属性配置

20210312155902110

警告:ExecSource相比于其他异步source的问题在于,如果无法将Event放入Channel中,ExecSource无法保证客户端知道它。在这种情况下数据会丢失。例如,最常见的用法是用tail -F [file]这种,应用程序负责向磁盘写入日志文件, Flume 会用tail命令从日志文件尾部读取,将每行作为一个Event发送。这里有一个明显的问题:如果channel满了然后无法继续发送Event,会发生什么?由于种种原因,Flume无法向输出日志文件的应用程序指示它需要保留日志或某些Event尚未发送。 总之你需要知道:当使用ExecSource等单向异步接口时,您的应用程序永远无法保证数据已经被成功接收!作为此警告的延伸,此source传递Event时没有交付保证。为了获得更强的可靠性保证,请考虑使用 Spooling Directory Source, Taildir Source 或通过SDK直接与Flume集成。

配置范例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

shell 属性是用来配置执行命令的shell(比如Bash或者Powershell)。command 会作为参数传递给 shell 执行,这使得command可以使用shell中的特性,例如通配符、后退标记、管道、循环、条件等。如果没有 shell 配置, 将直接调用 command 配置的命令。shell 通常配置的值有:“/bin/sh -c”、“/bin/ksh -c”、“cmd /c”、“powershell -Command”等。

a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = /bin/bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
NetCat UDP Source

看名字也看得出,跟 NetCat TCP Source 是一对亲兄弟,区别是监听的协议不同。这个source就像是 nc -u -k -l [host] [port]命令一样, 监听一个端口然后接收来自于这个端口上UDP协议发送过来的文本内容,逐行转换为Event发送到channel。

属性默认值解释
channels与Source绑定的channel,多个用空格分开
type组件类型,这个是:netcatudp
bind要监听的 hostname 或者IP地址
port监听的端口
remoteAddressHeaderUDP消息源地址(或IP)被解析到Event的header里面时所使用的key名称
selector.typereplicating可选值:replicatingmultiplexing ,分别表示: 复制、多路复用
selector.*channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors该source所使用的拦截器,多个用空格分开
interceptors.*拦截器相关的属性配

20210312161041701

配置范例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

Interceptor

拦截器

Flume支持在运行时对Event进行修改或丢弃,可以通过拦截器来实现。Flume里面的拦截器是实现了 org.apache.flume.interceptor.Interceptor 接口的类。拦截器可以根据开发者的意图随意修改甚至丢弃Event, Flume也支持链式的拦截器执行方式,在配置文件里面配置多个拦截器就可以了。拦截器的顺序取决于它们被初始化的顺序(实际也就是配置的顺序),Event就这样按照顺序经过每一个拦截器,如果想在拦截器里面丢弃Event, 在传递给下一级拦截器的list里面把它移除就行了。如果想丢弃所有的Event,返回一个空集合就行了。拦截器也是通过命名配置的组件,下面就是通过配置文件来创建拦截器的例子。

提示:Event在拦截器之间流动的时候是以集合的形式,并不是逐个Event传输的,这样就能理解上面所说的“从list里面移除”、“返回一个空集合”了。做过Java web开发的同学应该很容易理解拦截器,Flume拦截器与spring MVC、struts2等框架里面的拦截器思路十分相似。

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
a1.sinks.k1.channel = c1

拦截器构建器配置在type参数上。 拦截器是可配置的,就像其他可配置的组件一样。 在上面的示例中,Event首先传递给HostInterceptor,然后HostInterceptor返回的Event传递给TimestampInterceptor。 配置拦截器时你可以指定完全限定的类名(FQCN)或别名(timestamp)。 如果你有多个收集器写入相同的HDFS路径下,那么HostInterceptor是很有用的。

Timestamp

时间戳添加拦截器

这个拦截器会向每个Event的header中添加一个时间戳属性进去,key默认是“timestamp ”(也可以通过下面表格中的header来自定义),value就是当前的毫秒值(其实就是用System.currentTimeMillis()方法得到的)。 如果Event已经存在同名的属性,可以选择是否保留原始的值。

属性默认值解释
type组件类型,这个是: timestamp
headertimestamp向Event header中添加时间戳键值对的key
preserveExistingfalse是否保留Event header中已经存在的同名(上面header设置的key,默认是timestamp)时间戳

配置范例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
Host

Host添加拦截器

这个拦截器会把当前Agent的hostname或者IP地址写入到Event的header中,key默认是“host”(也可以通过配置自定义key),value可以选择使用hostname或者IP地址。

属性默认值解释
type组件类型,这个是: host
preserveExistingfalse如果header中已经存在同名的属性是否保留
useIPtruetrue:使用IP地址;false:使用hostname
hostHeaderhost向Event header中添加host键值对的key

配置范例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
Static

静态属性写入拦截器

静态拦截器可以向Event header中写入一个固定的键值对属性。

这个拦截器目前不支持写入多个属性,但是你可以通过配置多个静态属性写入拦截器来实现。

属性默认值解释
type组件类型,这个是: static
preserveExistingtrue如果header中已经存在同名的属性是否保留
keykey写入header的key
valuevalue写入header的值

配置范例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK
UUID

添加唯一ID拦截器

此拦截器在所有截获的Event上设置通用唯一标识符。 比如UUID可以是b5755073-77a9-43c1-8fad-b7a586f89757,它是一个128-bit的值。

Event如果没有可用的应用级唯一ID,就可以考虑使用添加唯一ID拦截器自动为Event分配UUID。 Event数据只要进入Flume网络中就给其分配一个UUID是非常重要的,Event进入Flume网络的第一个节点通常就是Flume的第一个source。 这样可以在Flume网络中进行复制和重新传输以及Event的后续重复数据删除可以实现高可用性和高性能。 如果在应用层有唯一ID的话要比这种自动生成UUID要好一些,因为应用层分配的ID能方便我们在后续的数据存储中心对Event进行集中的更新和删除等操作。

属性默认值解释
type组件类型,这个是:org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
headerNameid将要添加或者修改的id名称
preserveExistingtrue如果header中已经存在同名的属性是否保留
prefix“”UUID值的固定前缀(每个生成的uuid会在前面拼上这个固定前缀)
Regex Filtering

正则过滤拦截器

这个拦截器会把Event的body当做字符串来处理,并用配置的正则表达式来匹配。可以配置指定被匹配到的Event丢弃还是没被匹配到的Event丢弃。

属性默认值解释
type组件类型,这个是: regex_filter
regex“.*”用于匹配Event内容的正则表达式
excludeEventsfalse如果为true,被正则匹配到的Event会被丢弃;如果为false,不被正则匹配到的Event会被丢弃
Morphline

Morphline 实时清洗拦截器

此拦截器通过 morphline配置文件 过滤Event,配置文件定义了一系列转换命令,用于将记录从一个命令传递到另一个命令。 例如,morphline可以忽略某些Event或通过基于正则表达式的模式匹配来更改或插入某些Event header, 或者它可以通过Apache Tika在截获的Event上自动检测和设置MIME类型。 例如,这种数据包嗅探可用于Flume拓扑中基于内容的动态路由。 Morphline 实时清洗拦截器还可以帮助实现到多个Apache Solr集合的动态路由(例如,用于multi-tenancy)。

目前存在一个限制,这个拦截器不能输入一个Event然后产生多个Event出来,它不适用于重型的ETL处理,如果有需要,请考虑将ETL操作从Flume source转移到Flume sink中,比如:MorphlineSolrSink

属性默认值解释
type组件类型,这个是: org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
morphlineFilemorphline配置文件在本地文件系统的绝对目录。比如:/etc/flume-ng/conf/morphline.conf
morphlineIdnull如果在morphline 配置文件里有多个morphline ,可以配置这个名字来加以区分

配置范例:

a1.sources.avroSrc.interceptors = morphlineinterceptor
a1.sources.avroSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1
字段解密
字段筛选

channel selector

如果没有手动配置,source的默认channel选择器类型是replicating(复制),当然这个选择器只针对source配置了多个channel的时候。

提示:既然叫做channel选择器,很容易猜得到这是source才有的配置。前面介绍过,一个souce可以向多个channel同时写数据,所以也就产生了以何种方式向多个channel写的问题(比如自带的 复制选择器 ,会 把数据完整地发送到每一个channel,而 多路复用选择器 就可以通过配置来按照一定的规则进行分发,听起来很像负载均衡),channel选择器也就应运而生。

replicating

复制选择器

复制模式下,source中的Event会被发送到与source连接的所有channel上。

属性默认值解释
selector.typereplicatingreplicating
selector.optional指定哪些channel是可选的,多个用空格分开

配置范例:

a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3
multiplexing

多路复用选择器

多路复用模式下,Event仅被发送到 部分channel上。为了分散流量,需要指定好source的所有channel和Event分发的策略。

属性默认值解释
selector.typereplicating组件类型,这个是: multiplexing
selector.headerflume.selector.header想要进行匹配的header属性的名字
selector.default指定一个默认的channel。如果没有被规则匹配到,默认会发到这个channel上
selector.mapping.*一些匹配规则,具体参考下面的例子

20210315105114824

配置范例:

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state        #以每个Event的header中的state这个属性的值作为选择channel的依据
a1.sources.r1.selector.mapping.CZ = c1       #如果state=CZ,则选择c1这个channel
a1.sources.r1.selector.mapping.US = c2 c3    #如果state=US,则选择c2 和 c3 这两个channel
a1.sources.r1.selector.default = c4          #默认使用c4这个channel

channel

channel 是在 Agent 上暂存 Event 的缓冲池。 Event由source添加,由sink消费后删除。

Memory

内存 channel 是把 Event 队列存储到内存上,队列的最大数量就是 capacity 的设定值。它非常适合对吞吐量有较高要求的场景,但也是有代价的,当发生故障的时候会丢失当时内存中的所有 Event。

属性默认值解释
type组件类型,这个是: memory
capacity100内存中存储 Event 的最大数
transactionCapacity100source 或者 sink 每个事务中存取 Event 的操作数量(不能比 capacity 大)
keep-alive3添加或删除一个 Event 的超时时间(秒)
byteCapacityBufferPercentage20指定 Event header 所占空间大小与 channel 中所有 Event 的总大小之间的百分比
byteCapacityChannel 中最大允许存储所有 Event 的总字节数(bytes)。默认情况下会使用JVM可用内存的80%作为最大可用内存(就是JVM启动参数里面配置的-Xmx的值)。 计算总字节时只计算 Event 的主体,这也是提供 byteCapacityBufferPercentage 配置参数的原因。注意,当你在一个 Agent 里面有多个内存 channel 的时候, 而且碰巧这些 channel 存储相同的物理 Event(例如:这些 channel 通过复制机制( 复制选择器 )接收同一个 source 中的 Event), 这时候这些 Event 占用的空间是累加的,并不会只计算一次。如果这个值设置为0(不限制),就会达到200G左右的内部硬件限制。

提示:举2个例子来帮助理解最后两个参数吧:

两个例子都有共同的前提,假设JVM最大的可用内存是100M(或者说JVM启动时指定了-Xmx=100m)。

例子1: byteCapacityBufferPercentage 设置为20, byteCapacity 设置为52428800(就是50M),此时内存中所有 Event body 的总大小就被限制为50M *(1-20%)=40M,内存channel可用内存是50M。

例子2: byteCapacityBufferPercentage 设置为10, byteCapacity 不设置,此时内存中所有 Event body 的总大小就被限制为100M * 80% *(1-10%)=72M,内存channel可用内存是80M。

配置范例:

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
File
属性默认值解释
type组件类型,这个是: file.
checkpointDir~/.flume/file-channel/checkpoint记录检查点的文件的存储目录
useDualCheckpointsfalse是否备份检查点文件。如果设置为 truebackupCheckpointDir 参数必须设置。
backupCheckpointDir备份检查点的目录。 此目录不能与数据目录或检查点目录 checkpointDir 相同
dataDirs~/.flume/file-channel/data逗号分隔的目录列表,用于存储日志文件。 在不同物理磁盘上使用多个目录可以提高文件channel的性能
transactionCapacity10000channel支持的单个事务最大容量
checkpointInterval30000检查点的时间间隔(毫秒)
maxFileSize2146435071单个日志文件的最大字节数。这个默认值约等于2047MB
minimumRequiredSpace524288000最小空闲空间的字节数。为了避免数据损坏,当空闲空间低于这个值的时候,文件channel将拒绝一切存取请求
capacity1000000channel的最大容量
keep-alive3存入Event的最大等待时间(秒)
use-log-replay-v1false(专家)是否使用老的回放逻辑 (Flume默认是使用v2版本的回放方法,但是如果v2版本不能正常工作可以考虑通过这个参数改为使用v1版本,v1版本是从Flume1.2开始启用的,回放是指系统关闭或者崩溃前执行的校验检查点文件和文件channel记录是否一致程序)
use-fast-replayfalse(专家)是否开启快速回放(不适用队列)
checkpointOnClosetruechannel关闭时是否创建检查点文件。开启次功能可以避免回放提高下次文件channel启动的速度
encryption.activeKey加密数据所使用的key名称
encryption.cipherProvider加密类型,目前只支持:AESCTRNOPADDING
encryption.keyProviderkey类型,目前只支持:JCEKSFILE
encryption.keyProvider.keyStoreFilekeystore 文件路径
encrpytion.keyProvider.keyStorePasswordFilekeystore 密码文件路径
encryption.keyProvider.keys所有key的列表,包含所有使用过的加密key名称
encyption.keyProvider.keys.*.passwordFile可选的秘钥密码文件路径

20210312170305300

注解:默认情况下,文件channel使用默认的用户主目录内的检查点和数据目录的路径(说的就是上面的checkpointDir参数的默认值)。 如果一个Agent中有多个活动的文件channel实例,而且都是用了默认的检查点文件, 则只有一个实例可以锁定目录并导致其他channel初始化失败。 因此,这时候有必要为所有已配置的channel显式配置不同的检查点文件目录,最好是在不同的磁盘上。 此外,由于文件channel将在每次提交后会同步到磁盘,因此将其与将Event一起批处理的sink/source耦合可能是必要的,以便在多个磁盘不可用于检查点和数据目录时提供良好的性能。

配置范例:

a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
Spillable Memory

这个channel会将Event存储在内存队列和磁盘上。 内存队列充当主存储,内存装满之后会存到磁盘。 磁盘存储使用嵌入的文件channel进行管理。 当内存队列已满时,其他传入Event将存储在文件channel中。 这个channel非常适用于需要高吞吐量存储器channel的流,但同时需要更大容量的文件channel,以便更好地容忍间歇性目的地侧(sink)中断或消费速率降低。 在这种异常情况下,吞吐量将大致降低到文件channel速度。 如果Agent程序崩溃或重新启动,只有存储在磁盘上的Event能恢复。 这个channel目前是实验性的,不建议用于生产环境

提示:这个channel的机制十分像Windows系统里面的「虚拟内存」。兼顾了内存channel的高吞吐量和文件channel的可靠、大容量优势。

属性默认值解释
type组件类型,这个是: SPILLABLEMEMORY
memoryCapacity10000内存队列存储的Event最大数量。如果设置为0,则会禁用内存队列。
overflowCapacity100000000磁盘(比如文件channel)上存储Event的最大数量,如果设置为0,则会禁用磁盘存储
overflowTimeout3当内存占满时启用磁盘存储之前等待的最大秒数
byteCapacityBufferPercentage20指定Event header所占空间大小与channel中所有Event的总大小之间的百分比
byteCapacity内存中最大允许存储Event的总字节数。 默认情况下会使用JVM可用内存的80%作为最大可用内存(就是JVM启动参数里面配置的-Xmx的值)。 计算总字节时只计算Event的主体,这也是提供 byteCapacityBufferPercentage 配置参数的原因。 注意,当你在一个Agent里面有多个内存channel的时候,而且碰巧这些channel存储相同的物理Event(例如:这些channel通过复制机制(复制选择器)接收同一个source中的Event), 这时候这些Event占用的空间是累加的,并不会只计算一次。如果这个值设置为0(不限制),就会达到200G左右的内部硬件限制。
avgEventSize500估计进入channel的Event的平均大小(单位:字节)
see file channel可以使用除“keep-alive”和“capacity”之外的任何文件channel属性。 文件channel的“keep-alive”由Spillable Memory Channel管理, 而channel容量则是通过使用 overflowCapacity 来设置。

如果达到 memoryCapacitybyteCapacity 限制,则内存队列被视为已满。

配置范例:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 10000
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

禁用内存channel,只使用磁盘存储(就像文件channel那样)的例子:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 0
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

禁用掉磁盘存储,只使用内存channel的例子:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 100000
a1.channels.c1.overflowCapacity = 0

sinkgroup

Sink组逻辑处理器

你可以把多个sink分成一个组, 这时候 Sink组逻辑处理器 可以对这同一个组里的几个sink进行负载均衡或者其中一个sink发生故障后将输出Event的任务转移到其他的sink上。

提示:说的直白一些,这N个sink本来是要将Event输出到对应的N个目的地的,通过 Sink组逻辑处理器 就可以把这N个sink配置成负载均衡或者故障转移的工作方式(暂时还不支持自定义的)。 负载均衡就方式是把channel里面的Event按照配置的负载机制(比如轮询)分别发送到sink各自对应的目的地;故障转移就是这N个sink同一时间只有一个在工作,其余的作为备用,工作的sink挂掉之后备用的sink顶上。

属性默认值解释
sinks这一组的所有sink名,多个用空格分开
processor.typedefault这个sink组的逻辑处理器类型,可选值 default (默认一对一的) 、 failover (故障转移) 、 load_balance (负载均衡)
failover

故障转移

故障转移组逻辑处理器维护了一个发送Event失败的sink的列表,保证有一个sink是可用的来发送Event。

故障转移机制的工作原理是将故障sink降级到一个池中,在池中为它们分配冷却期(超时时间),在重试之前随顺序故障而增加。 Sink成功发送事件后,它将恢复到实时池。sink具有与之相关的优先级,数值越大,优先级越高。 如果在发送Event时Sink发生故障,会继续尝试下一个具有最高优先级的sink。 例如,在优先级为80的sink之前激活优先级为100的sink。如果未指定优先级,则根据配置中的顺序来选取。

要使用故障转移选择器,不仅要设置sink组的选择器为failover,还有为每一个sink设置一个唯一的优先级数值。 可以使用 maxpenalty 属性设置故障转移时间的上限(毫秒)。

属性默认值解释
sinks这一组的所有sink名,多个用空格分开
processor.typedefault组件类型,这个是: failover
processor.priority.组内sink的权重值,必须是当前组关联的sink之一。数值(绝对值)越高越早被激活
processor.maxpenalty30000发生异常的sink最大故障转移时间(毫秒)

配置范例:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
load_balance

负载均衡

负载均衡Sink 选择器提供了在多个sink上进行负载均衡流量的功能。 它维护一个活动sink列表的索引来实现负载的分配。 默认支持了轮询(round_robin)和随机(random)两种选择机制分配负载。 默认是轮询,可以通过配置来更改。也可以从 AbstractSinkSelector 继承写一个自定义的选择器。

工作时,此选择器使用其配置的选择机制选择下一个sink并调用它。 如果所选sink无法正常工作,则处理器通过其配置的选择机制选择下一个可用sink。 此实现不会将失败的Sink列入黑名单,而是继续乐观地尝试每个可用的Sink。

如果所有sink调用都失败了,选择器会将故障抛给sink的运行器。

如果backoff设置为true则启用了退避机制,失败的sink会被放入黑名单,达到一定的超时时间后会自动从黑名单移除。 如从黑名单出来后sink仍然失败,则再次进入黑名单而且超时时间会翻倍,以避免在无响应的sink上浪费过长时间。 如果没有启用退避机制,在禁用此功能的情况下,发生sink传输失败后,会将本次负载传给下一个sink继续尝试,因此这种情况下是不均衡的。

属性默认值解释
processor.sinks这一组的所有sink名,多个用空格分开
processor.typedefault组件类型,这个是: load_balance
processor.backofffalse失败的sink是否成倍地增加退避它的时间。 如果设置为false,负载均衡在某一个sink发生异常后,下一次选择sink的时候仍然会将失败的这个sink加入候选队列; 如果设置为true,某个sink连续发生异常时会成倍地增加它的退避时间,在退避的时间内是无法参与负载均衡竞争的。退避机制只统计1个小时发生的异常,超过1个小时没有发生异常就会重新计算
processor.selectorround_robin负载均衡机制,可选值:round_robin (轮询)、 random (随机选择)、「自定义选择器的全限定类名」:自定义的负载器要继承 AbstractSinkSelector
processor.selector.maxTimeOut30000发生异常的sink最长退避时间(毫秒) 如果设置了processor.backoff=true,某一个sink发生异常的时候就会触发自动退避它一段时间,这个 maxTimeOut 就是退避一个sink的最长时间

20210312173206706

配置范例:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = truechu
a1.sinkgroups.g1.processor.selector = random

sink

Kafka

这个 Sink 可以把数据发送到 Kafka topic上。目的就是将 Flume 与 Kafka 集成,以便基于拉的处理系统可以处理来自各种 Flume Source 的数据。目前支持 Kafka 0.9.x 发行版。

属性默认值解释
type组件类型,这个是: org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.serversKafka Sink 使用的 Kafka 集群的实例列表,可以是实例的部分列表。但是更建议至少两个用于高可用(HA)支持。格式为 hostname:port,多个用逗号分隔
kafka.topicdefault-flume-topic用于发布消息的 Kafka topic 名称 。如果这个参数配置了值,消息就会被发布到这个 topic 上。如果Event header中包含叫做“topic”的属性, Event 就会被发布到 header 中指定的 topic 上,而不会发布到 kafka.topic 指定的 topic 上。支持任意的 header 属性动态替换, 比如%{lyf}就会被 Event header 中叫做“lyf”的属性值替换(如果使用了这种动态替换,建议将 Kafka 的 auto.create.topics.enable 属性设置为 true )。
flumeBatchSize100一批中要处理的消息数。设置较大的值可以提高吞吐量,但是会增加延迟。
kafka.producer.acks1在考虑成功写入之前,要有多少个副本必须确认消息。可选值, 0 :(从不等待确认); 1 :只等待leader确认; -1 :等待所有副本确认。 设置为-1可以避免某些情况 leader 实例失败的情况下丢失数据。
useFlumeEventFormatfalse默认情况下,会直接将 Event body 的字节数组作为消息内容直接发送到 Kafka topic 。如果设置为true,会以 Flume Avro 二进制格式进行读取。 与 Kafka Source 上的同名参数或者 Kafka channel 的 parseAsFlumeEvent 参数相关联,这样以对象的形式处理能使生成端发送过来的 Event header 信息得以保留。
defaultPartitionId指定所有 Event 将要发送到的 Kafka 分区ID,除非被 partitionIdHeader 参数的配置覆盖。 默认情况下,如果没有设置此参数,Event 会被 Kafka 生产者的分发程序分发,包括 key(如果指定了的话),或者被 kafka.partitioner.class 指定的分发程序来分发
partitionIdHeader设置后,Sink将使用 Event header 中使用此属性的值命名的字段的值,并将消息发送到 topic 的指定分区。 如果该值表示无效分区,则将抛出 EventDeliveryException。 如果存在标头值,则此设置将覆盖 defaultPartitionId 。假如这个参数设置为“lyf”,这个 Sink 就会读取 Event header 中的 lyf 属性的值,用该值作为分区ID
allowTopicOverridetrue如果设置为 true,会读取 Event header 中的名为 topicHeader 的的属性值,用它作为目标 topic。
topicHeadertopic与上面的 allowTopicOverride 一起使用,allowTopicOverride 会用当前参数配置的名字从 Event header 获取该属性的值,来作为目标 topic 名称
kafka.producer.security.protocolPLAINTEXT设置使用哪种安全协议写入 Kafka。可选值:SASL_PLAINTEXTSASL_SSLSSL, 有关安全设置的其他信息,请参见下文。
more producer security props如果使用了 SASL_PLAINTEXTSASL_SSLSSL 等安全协议,参考 Kafka security 来为生产者增加安全相关的参数配置
Other Kafka Producer Properties其他一些 Kafka 生产者配置参数。任何 Kafka 支持的生产者参数都可以使用。唯一的要求是使用“kafka.producer.”这个前缀来配置参数,比如:kafka.producer.linger.ms

20210315100547639

注解:Kafka Sink使用 Event header 中的 topic 和其他关键属性将 Event 发送到 Kafka。 如果 header 中存在 topic,则会将Event发送到该特定 topic,从而覆盖为Sink配置的 topic。 如果 header 中存在指定分区相关的参数,则Kafka将使用相关参数发送到指定分区。 header中特定参数相同的 Event 将被发送到同一分区。 如果为空,则将 Event 会被发送到随机分区。 Kafka Sink 还提供了key.deserializer(org.apache.kafka.common.serialization.StringSerializer) 和value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)的默认值,不建议修改这些参数。

安全与加密

Flume 和 Kafka 之间通信渠道是支持安全认证和数据加密的。对于身份安全验证,可以使用 Kafka 0.9.0版本中的 SASL、GSSAPI (Kerberos V5) 或 SSL (虽然名字是SSL,实际是TLS实现)。

截至目前,数据加密仅由SSL / TLS提供。

当你把 kafka.producer.security.protocol 设置下面任何一个值的时候意味着:

  • SASL_PLAINTEXT - 无数据加密的 Kerberos 或明文认证
  • SASL_SSL - 有数据加密的 Kerberos 或明文认证
  • SSL - 基于TLS的加密,可选的身份验证

警告:启用 SSL 时性能会下降,影响大小取决于 CPU 和 JVM 实现。参考 Kafka security overviewKAFKA-2561

使用TLS

请阅读 Configuring Kafka Clients SSL 中描述的步骤来了解用于微调的其他配置设置,例如下面的几个例子:启用安全策略、密码套件、启用协议、信任库或秘钥库类型。

服务端认证和数据加密的一个配置实例:

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SSL
a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>

注意,默认情况下 ssl.endpoint.identification.algorithm 这个参数没有被定义,因此不会执行主机名验证。如果要启用主机名验证,请加入以下配置:

a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS

开启后,客户端将根据以下两个字段之一验证服务器的完全限定域名(FQDN):

  1. Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
  2. Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6

如果还需要客户端身份验证,则还应在 Flume 配置中添加以下内容。 每个Flume 实例都必须拥有其客户证书,来被Kafka 实例单独或通过其签名链来信任。 常见示例是由 Kafka 信任的单个根CA签署每个客户端证书。

a1.sinks.sink1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
a1.sinks.sink1.kafka.producer.ssl.keystore.password = <password to access the keystore>

如果密钥库和密钥使用不同的密码保护,则 ssl.key.password 属性将为生产者密钥库提供所需的额外密码:

a1.sinks.sink1.kafka.producer.ssl.key.password = <password to access the key>

Kerberos安全配置:

  • 要将Kafka Sink 与使用 Kerberos 保护的Kafka群集一起使用,请为生产者设置上面提到的 producer.security.protocol 属性。 与 Kafka 实例一起使用的 Kerberos keytab 和主体在 JAAS 文件的“KafkaClient”部分中指定。
  • “客户端”部分描述了 Zookeeper 连接信息(如果需要)。 有关 JAAS 文件内容的信息,请参阅 Kafka doc。 可以通过 flume-env.sh 中的 JAVA_OPTS 指定此 JAAS 文件的位置以及系统范围的 kerberos 配置:
JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

使用 SASL_PLAINTEXT 的示例安全配置:

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka

使用 SASL_SSL 的安全配置范例:

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL
a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>

JAAS 文件配置示例。有关其内容的参考,请参阅Kafka文档 SASL configuration 中关于所需认证机制(GSSAPI/PLAIN)的客户端配置部分。 与 Kafka Source 和 Kafka Channel 不同,“Client”部分并不是必须的,除非其他组件需要它,否则不必要这样做。 另外,请确保 Flume 进程的操作系统用户对 JAAS 和 keytab 文件具有读权限。

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
Null

丢弃所有从 channel 读取到的 Event。

属性默认值解释
channel与 Sink 绑定的 channel
type组件类型,这个是: null.
batchSize100每次批处理的 Event 数量

配置范例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = null
a1.sinks.k1.channel = c1
Logger

使用INFO级别把Event内容输出到日志中,一般用来测试、调试使用。这个 Sink 是唯一一个不需要额外配置就能把 Event 的原始内容输出的Sink。

提示:通常在Flume的运行日志里面输出数据流中的原始的数据内容是非常不可取的,所以 Flume 的组件默认都不会这么做。但是总有特殊的情况想要把 Event 内容打印出来,就可以借助这个Logger Sink了。

属性默认值解释
channel与 Sink 绑定的 channel
type组件类型,这个是: logger
maxBytesToLog16Event body 输出到日志的最大字节数,超出的部分会被丢弃

配置范例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
File Roll

把 Event 存储到本地文件系统。

属性默认值解释
channel与 Sink 绑定的 channel
type组件类型,这个是: file_roll.
sink.directoryEvent 将要保存的目录
sink.pathManagerDEFAULT配置使用哪个路径管理器,这个管理器的作用是按照规则生成新的存储文件名称,可选值有: defaultrolltime。default规则:prefix+当前毫秒值+“-”+文件序号+“.”+extension;rolltime规则:prefix+yyyyMMddHHmmss+“-”+文件序号+“.”+extension;注:prefix 和 extension 如果没有配置则不会附带
sink.pathManager.extension如果上面的 pathManager 使用默认的话,可以用这个属性配置存储文件的扩展名
sink.pathManager.prefix如果上面的 pathManager 使用默认的话,可以用这个属性配置存储文件的文件名的固定前缀
sink.rollInterval30表示每隔30秒创建一个新文件进行存储。如果设置为0,表示所有 Event 都会写到一个文件中。
sink.serializerTEXT配置 Event 序列化器,可选值有:textheader_and_textavro_event 或者自定义实现了 EventSerializer.Builder 接口的序列化器的全限定类名.。 text 只会把 Event 的 body 的文本内容序列化; header_and_text 会把 header 和 body 内容都序列化。
batchSize100每次请求批处理的 Event 数

20210315101942258

配置范例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
Hive

此Sink将包含分隔文本或JSON数据的 Event 直接流式传输到 Hive表或分区上。 Event 使用 Hive事务进行写入, 一旦将一组 Event 提交给Hive,它们就会立即显示给Hive查询。 即将写入的目标分区既可以预先自己创建,也可以选择让 Flume 创建它们,如果没有的话。 写入的 Event 数据中的字段将映射到 Hive表中的相应列。

属性默认值解释
channel与 Sink 连接的 channel
type组件类型,这个是: hive
hive.metastoreHive metastore URI (eg thrift://a.b.com:9083 )
hive.databaseHive 数据库名
hive.tableHive表名
hive.partition逗号分隔的要写入的分区信息。 比如hive表的分区是(continent: string, country :string, time : string), 那么“Asia,India,2014-02-26-01-21”就表示数据会写入到continent=Asia,country=India,time=2014-02-26-01-21这个分区。
hive.txnsPerBatchAsk100Hive从Flume等客户端接收数据流会使用多次事务来操作,而不是只开启一个事务。这个参数指定处理每次请求所开启的事务数量。来自同一个批次中所有事务中的数据最终都在一个文件中。 Flume会向每个事务中写入 batchSize 个 Event,这个参数和 batchSize 一起控制着每个文件的大小,请注意,Hive最终会将这些文件压缩成一个更大的文件。
heartBeatInterval240发送到 Hive 的连续心跳检测间隔(秒),以防止未使用的事务过期。设置为0表示禁用心跳。
autoCreatePartitionstrueFlume 会自动创建必要的 Hive分区以进行流式传输
batchSize15000写入一个 Hive事务中最大的 Event 数量
maxOpenConnections500允许打开的最大连接数。如果超过此数量,则关闭最近最少使用的连接。
callTimeout10000Hive、HDFS I/O操作的超时时间(毫秒),比如:开启事务、写数据、提交事务、取消事务。
serializer序列化器负责解析 Event 中的字段并把它们映射到 Hive表中的列,选择哪种序列化器取决于 Event 中的数据格式,支持的序列化器有:DELIMITEDJSON
roundfalse是否启用时间戳舍入机制
roundUnitminute舍入值的单位,可选值:secondminutehour
roundValue1舍入到小于当前时间的最高倍数(使用 roundUnit 配置的单位) 例子1:roundUnit=second,roundValue=10,则14:31:18这个时间戳会被舍入到14:31:10; 例子2:roundUnit=second,roundValue=30,则14:31:18这个时间戳会被舍入到14:31:00,14:31:42这个时间戳会被舍入到14:31:30;
timeZoneLocal Time应用于解析分区中转义序列的时区名称,比如:America/Los_Angeles、Asia/Shanghai、Asia/Tokyo等
useLocalTimeStampfalse替换转义序列时是否使用本地时间戳(否则使用Event header中的timestamp )****

20210315102446907

下面介绍Hive Sink的两个序列化器:

JSON :处理UTF8编码的 Json 格式(严格语法)Event,不需要配置。 JSON中的对象名称直接映射到Hive表中具有相同名称的列。 内部使用 org.apache.hive.hcatalog.data.JsonSerDe ,但独立于 Hive表的 Serde 。 此序列化程序需要安装 HCatalog。

DELIMITED: 处理简单的分隔文本 Event。 内部使用 LazySimpleSerde,但独立于 Hive表的 Serde。

属性默认值解释
serializer.delimiter,(类型:字符串)传入数据中的字段分隔符。 要使用特殊字符,请用双引号括起来,例如“\t”
serializer.fieldnames从输入字段到Hive表中的列的映射。 指定为Hive表列名称的逗号分隔列表(无空格),按顺序标识输入字段。 要跳过字段,请保留未指定的列名称。 例如, ‘time,ip,message’表示输入映射到hive表中的 time,ip 和 message 列的第1,第3和第4个字段。
serializer.serdeSeparatorCtrl-A(类型:字符)自定义底层序列化器的分隔符。如果 serializer.fieldnames 中的字段与 Hive表列的顺序相同,则 serializer.delimiterserializer.serdeSeparator 相同, 并且 serializer.fieldnames 中的字段数小于或等于表的字段数量,可以提高效率,因为传入 Event 正文中的字段不需要重新排序以匹配 Hive表列的顺序。 对于’\t’这样的特殊字符使用单引号,要确保输入字段不包含此字符。 注意:如果 serializer.delimiter 是单个字符,最好将本参数也设置为相同的字符。

以下是支持的转义符:

转义符解释
%{host}Event header中 key 为 host 的值。这个 host 可以是任意的 key,只要 header 中有就能读取,比如%{aabc}将读取 header 中 key 为 aabc 的值
%t毫秒值的时间戳(同 System.currentTimeMillis() 方法)
%a星期的缩写(Mon、Tue等)
%A星期的全拼(Monday、 Tuesday等)
%b月份的缩写(Jan、 Feb等)
%B月份的全拼(January、February等)
%c日期和时间(Thu Feb 14 23:05:25 2019)
%d月份中的天(00到31)
%D日期,与%m/%d/%y相同 ,例如:02/09/19
%H小时(00到23)
%I小时(01到12)
%j年中的天数(001到366)
%k小时(0到23),注意跟 %H 的区别
%m月份(01到12)
%M分钟(00到59)
%pam 或者 pm
%sunix时间戳,是秒值。比如:2019/4/1 15:12:47 的unix时间戳是:1554102767
%S秒(00到59)
%y一年中的最后两位数(00到99),比如1998年的%y就是98
%Y年(2010这种格式)
%z数字时区(比如:-0400)

注解:对于所有与时间相关的转义字符,Event header 中必须存在带有“timestamp”键的属性(除非 useLocalTimeStamp 设置为 true )。快速添加此时间戳的一种方法是使用 时间戳添加拦截器 ( TimestampInterceptor)。

假设Hive表如下:

create table weblogs ( id int , msg string )
    partitioned by (continent string, country string, time string)
    clustered by (id) into 5 buckets
    stored as orc;

配置范例:

a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames =id,,msg

以上配置会将时间戳向下舍入到最后10分钟。 例如,将时间戳标头设置为2019年4月1日下午15:21:34且“country”标头设置为“india”的Event将评估为分区(continent =’asia’,country =’india’,time =’2019-04-01-15-20’。序列化程序配置为接收包含三个字段的制表符分隔的输入并跳过第二个字段。

Elasticsearch

这个Sink把数据写入到 elasticsearch 集群,就像 logstash 一样把 Event 写入以便 Kibana 图形接口可以查询并展示。

必须将环境所需的 elasticsearch 和 lucene-core jar 放在 Flume 安装的 lib 目录中。 Elasticsearch 要求客户端 JAR 的主要版本与服务器的主要版本匹配,并且两者都运行相同的 JVM 次要版本。如果版本不正确,会报 SerializationExceptions 异常。 要选择所需的版本,请首先确定 elasticsearch 的版本以及目标群集正在运行的 JVM 版本。然后选择与主要版本匹配的 elasticsearch 客户端库。 0.19.x客户端可以与0.19.x群集通信; 0.20.x可以与0.20.x对话,0.90.x可以与0.90.x对话。确定 elasticsearch 版本后, 读取 pom.xml 文件以确定要使用的正确 lucene-core JAR 版本。运行 ElasticSearchSink 的 Flume 实例程序也应该与目标集群运行的次要版本的 JVM 相匹配。

所有的 Event 每天会被写入到新的索引,名称是-yyyy-MM-dd的格式,其中可以自定义配置。Sink将在午夜 UTC 开始写入新索引。

默认情况下,Event 会被 ElasticSearchLogStashEventSerializer 序列化器进行序列化。可以通过 serializer 参数配置来更改序和自定义列化器。这个参数可以配置 org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializerorg.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory 接口的实现类,ElasticSearchEventSerializer 现在已经不建议使用了,推荐使用更强大的后者。

属性默认值解释
channel与 Sink 绑定的 channel
type组件类型,这个是: org.apache.flume.sink.elasticsearch.ElasticSearchSink
hostNames逗号分隔的hostname:port列表,如果端口不存在,则使用默认的9300端口
indexNameflume指定索引名称的前缀。比如:默认是“flume”,使用的索引名称就是 flume-yyyy-MM-dd 这种格式。也支持 header 属性替换的方式,比如%{lyf}就会用 Event header 中的属性名为 lyf 的值。
indexTypelogs文档的索引类型。默认为 log,也支持 header 属性替换的方式,比如%{lyf}就会用 Event header 中的属性名为 lyf 的值。
clusterNameelasticsearch要连接的 ElasticSearch 集群名称
batchSize100每个事务写入的 Event 数量
ttlTTL 以天为单位,设置了会导致过期文档自动删除,如果没有设置,文档将永远不会被自动删除。 TTL 仅以较早的整数形式被接受, 例如 a1.sinks.k1.ttl = 5并且还具有限定符 ms (毫秒), s (秒), m (分钟), h (小时), d (天)和 w (星期)。 示例a1.sinks.k1.ttl = 5d 表示将TTL设置为5天。 点击 http://www.elasticsearch.org/guide/reference/mapping/ttl-field/ 了解更多信息。
serializerorg.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer序列化器必须实现 ElasticSearchEventSerializerElasticSearchIndexRequestBuilderFactory 接口,推荐使用后者。
serializer.*序列化器的一些属性配置

20210315103333352

注解:使用 header 替换可以方便地通过 header 中的值来动态地决定存储 Event 时要时候用的 indexName 和 indexType。使用此功能时应谨慎,因为 Event 提交者可以控制 indexName 和 indexType。 此外,如果使用 elasticsearch REST 客户端,则 Event 提交者可以控制所使用的URL路径。

配置范例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = foo_index
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1
Hbase

此Sink将数据写入 HBase。 Hbase 配置是从classpath中遇到的第一个 hbase-site.xml 中获取的。 配置指定的 HbaseEventSerializer 接口的实现类用于将 Event 转换为 HBase put 或 increments。 然后将这些 put 和 increments 写入 HBase。 该Sink提供与 HBase 相同的一致性保证,HBase 是当前行的原子性。 如果 Hbase 无法写入某些 Event,则Sink将重试该事务中的所有 Event。

这个Sink支持以安全的方式把数据写入到 HBase。为了使用安全写入模式,运行 Flume 实例的用户必须有写入 HBase 目标表的写入权限。可以在配置中指定用于对 KDC 进行身份验证的主体和密钥表。 Flume 的 classpath 中的 hbase-site.xml 必须将身份验证设置为 kerberos。

Flume提供了两个序列化器。第一个序列化器是 SimpleHbaseEventSerializer ( org.apache.flume.sink.hbase.SimpleHbaseEventSerializer ) ,它把 Event body 原样写入到HBase,并可选增加HBase列,这个实现主要就是提供个例子。 第二个序列化器是 RegexHbaseEventSerializer ( org.apache.flume.sink.hbase.RegexHbaseEventSerializer ) ,它把 Event body 按照给定的正则进行分割然后写入到不同的列中。

属性默认值解释
channel与 Sink 绑定的 channel
type组件类型,这个是: hbase
table要写入的 Hbase 表名
columnFamily要写入的 Hbase 列族
zookeeperQuorumZookeeper 节点(host:port格式,多个用逗号分隔),hbase-site.xml 中属性 hbase.zookeeper.quorum 的值
znodeParent/hbaseZooKeeper 中 HBase 的 Root ZNode 路径,hbase-site.xml中 zookeeper.znode.parent 的值。
batchSize100每个事务写入的 Event 数量
coalesceIncrementsfalse每次提交时,Sink是否合并多个 increment 到一个 cell。如果有限数量的 cell 有多个 increment ,这样可能会提供更好的性能。
serializerorg.apache.flume.sink.hbase.SimpleHbaseEventSerializer指定序列化器。默认的increment column = “iCol”, payload column = “pCol”。
serializer.*序列化器的属性
kerberosPrincipal以安全方式访问 HBase 的 Kerberos 用户主体
kerberosKeytab以安全方式访问 HBase 的 Kerberos keytab 文件目录

20210315103653008

配置范例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
HDFS

这个Sink将Event写入Hadoop分布式文件系统(也就是HDFS)。 目前支持创建文本和序列文件。 它支持两种文件类型的压缩。 可以根据写入的时间、文件大小或Event数量定期滚动文件(关闭当前文件并创建新文件)。 它还可以根据Event自带的时间戳或系统时间等属性对数据进行分区。 存储文件的HDFS目录路径可以使用格式转义符,会由HDFS Sink进行动态地替换,以生成用于存储Event的目录或文件名。 使用此Sink需要安装hadoop, 以便Flume可以使用Hadoop的客户端与HDFS集群进行通信。 注意, 需要使用支持sync() 调用的Hadoop版本

以下是支持的转义符:

转义符解释
%{host}Event header中key为host的值。这个host可以是任意的key,只要header中有就能读取,比如%{aabc}将读取header中key为aabc的值
%t毫秒值的时间戳(同 System.currentTimeMillis() 方法)
%a星期的缩写(Mon、Tue等)
%A星期的全拼(Monday、 Tuesday等)
%b月份的缩写(Jan、 Feb等)
%B月份的全拼(January、February等)
%c日期和时间(Thu Feb 14 23:05:25 2019)
%d月份中的天(00到31)
%e月份中的天(1到31)
%D日期,与%m/%d/%y相同 ,例如:02/09/19
%H小时(00到23)
%I小时(01到12)
%j年中的天数(001到366)
%k小时(0到23),注意跟 %H的区别
%m月份(01到12)
%n月份(1到12)
%M分钟(00到59)
%pam或者pm
%sunix时间戳,是秒值。比如2019/2/14 18:15:49的unix时间戳是:1550139349
%S秒(00到59)
%y一年中的最后两位数(00到99),比如1998年的%y就是98
%Y年(2010这种格式)
%z数字时区(比如:-0400)
%[localhost]Agent实例所在主机的hostname
%[IP]Agent实例所在主机的IP
%[FQDN]Agent实例所在主机的规范hostname

注意,%[localhost], %[IP] 和 %[FQDN]这三个转义符实际上都是用java的API来获取的,在一些网络环境下可能会获取失败。

正在打开的文件会在名称末尾加上“.tmp”的后缀。文件关闭后,会自动删除此扩展名。这样容易排除目录中的那些已完成的文件。

对于所有与时间相关的转义字符,Event header中必须存在带有“timestamp”键的属性(除非 hdfs.useLocalTimeStamp 设置为 true )。快速自动添加此时间戳的一种方法是使用 时间戳添加拦截器

属性名默认值解释
channel与 Sink 连接的 channel
type组件类型,这个是: hdfs
hdfs.pathHDFS目录路径(例如:hdfs://namenode/flume/webdata/)
hdfs.filePrefixFlumeDataFlume在HDFS文件夹下创建新文件的固定前缀
hdfs.fileSuffixFlume在HDFS文件夹下创建新文件的后缀(比如:.avro,注意这个“.”不会自动添加,需要显式配置)
hdfs.inUsePrefixFlume正在写入的临时文件前缀,默认没有
hdfs.inUseSuffix.tmpFlume正在写入的临时文件后缀
hdfs.rollInterval30当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒
hdfs.rollSize1024当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节
hdfs.rollCount10当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)
hdfs.idleTimeout0关闭非活动文件的超时时间(0表示禁用自动关闭文件),单位:秒
hdfs.batchSize100向 HDFS 写入内容时每次批量操作的 Event 数量
hdfs.codeC压缩算法。可选值:gzipbzip2lzolzop 、 ``snappy`
hdfs.fileTypeSequenceFile文件格式,目前支持: SequenceFileDataStreamCompressedStream 。 1. DataStream 不会压缩文件,不需要设置hdfs.codeC 2. CompressedStream 必须设置hdfs.codeC参数
hdfs.maxOpenFiles5000允许打开的最大文件数,如果超过这个数量,最先打开的文件会被关闭
hdfs.minBlockReplicas指定每个HDFS块的最小副本数。 如果未指定,则使用 classpath 中 Hadoop 的默认配置。
hdfs.writeFormatWritable文件写入格式。可选值: TextWritable 。在使用 Flume 创建数据文件之前设置为 Text,否则 Apache Impala(孵化)或 Apache Hive 无法读取这些文件。
hdfs.callTimeout10000允许HDFS操作文件的时间,比如:open、write、flush、close。如果HDFS操作超时次数增加,应该适当调高这个这个值。(毫秒)
hdfs.threadsPoolSize10每个HDFS Sink实例操作HDFS IO时开启的线程数(open、write 等)
hdfs.rollTimerPoolSize1每个HDFS Sink实例调度定时文件滚动的线程数
hdfs.kerberosPrincipal用于安全访问 HDFS 的 Kerberos 用户主体
hdfs.kerberosKeytab用于安全访问 HDFS 的 Kerberos keytab 文件
hdfs.proxyUser代理名
hdfs.roundfalse是否应将时间戳向下舍入(如果为true,则影响除 %t 之外的所有基于时间的转义符)
hdfs.roundValue1向下舍入(小于当前时间)的这个值的最高倍(单位取决于下面的 hdfs.roundUnit ) 例子:假设当前时间戳是18:32:01,hdfs.roundUnit = minute 如果roundValue=5,则时间戳会取为:18:30 如果roundValue=7,则时间戳会取为:18:28 如果roundValue=10,则时间戳会取为:18:30
hdfs.roundUnitsecond向下舍入的单位,可选值: secondminutehour
hdfs.timeZoneLocal Time解析存储目录路径时候所使用的时区名,例如:America/Los_Angeles、Asia/Shanghai
hdfs.useLocalTimeStampfalse使用日期时间转义符时是否使用本地时间戳(而不是使用 Event header 中自带的时间戳)
hdfs.closeTries0开始尝试关闭文件时最大的重命名文件的尝试次数(因为打开的文件通常都有个.tmp的后缀,写入结束关闭文件时要重命名把后缀去掉)。如果设置为1,Sink在重命名失败(可能是因为 NameNode 或 DataNode 发生错误)后不会重试,这样就导致了这个文件会一直保持为打开状态,并且带着.tmp的后缀;如果设置为0,Sink会一直尝试重命名文件直到成功为止;关闭文件操作失败时这个文件可能仍然是打开状态,这种情况数据还是完整的不会丢失,只有在Flume重启后文件才会关闭。
hdfs.retryInterval180连续尝试关闭文件的时间间隔(秒)。 每次关闭操作都会调用多次 RPC 往返于 Namenode ,因此将此设置得太低会导致 Namenode 上产生大量负载。 如果设置为0或更小,则如果第一次尝试失败,将不会再尝试关闭文件,并且可能导致文件保持打开状态或扩展名为“.tmp”。
serializerTEXTEvent 转为文件使用的序列化器。其他可选值有: avro_event 或其他 EventSerializer.Builderinterface 接口的实现类的全限定类名。
serializer.*根据上面 serializer 配置的类型来根据需要添加序列化器的参数

20210315104025251

配置范例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

上面的例子中时间戳会向前一个整10分钟取整。比如,一个 Event 的 header 中带的时间戳是11:54:34 AM, June 12, 2012,它会保存的 HDFS 路径就是/flume/events/2012-06-12/1150/00。

Avro

这个Sink可以作为 Flume 分层收集特性的下半部分。发送到此Sink的 Event 将转换为Avro Event发送到指定的主机/端口上。Event 从 channel 中批量获取,数量根据配置的 batch-size 而定。

属性默认值解释
channel与 Sink 绑定的 channel
type组件类型,这个是: avro.
hostname监听的服务器名(hostname)或者 IP
port监听的端口
batch-size100每次批量发送的 Event 数
connect-timeout20000第一次连接请求(握手)的超时时间,单位:毫秒
request-timeout20000请求超时时间,单位:毫秒
reset-connection-intervalnone重置连接到下一跳之前的时间量(秒)。 这将强制 Avro Sink 重新连接到下一跳。 这将允许Sink在添加了新的主机时连接到硬件负载均衡器后面的主机,而无需重新启动 Agent。
compression-typenone压缩类型。可选值: nonedeflate 。压缩类型必须与上一级Avro Source 配置的一致
compression-level6Event的压缩级别 0:不压缩,1-9:进行压缩,数字越大,压缩率越高
sslfalse设置为 true 表示Sink开启 SSL 下面的 truststoretruststore-passwordtruststore-type 就是开启SSL后使用的参数,并且可以指定是否信任所有证书( trust-all-certs
trust-all-certsfalse如果设置为true, 不会检查远程服务器(Avro Source)的SSL服务器证书。不要在生产环境开启这个配置,因为它使攻击者更容易执行中间人攻击并在加密的连接上进行“监听”。
truststore自定义 Java 信任库文件的路径。 Flume 使用此文件中的证书颁发机构信息来确定是否应该信任远程 Avro Source 的 SSL 身份验证凭据。 如果未指定,将使用缺省 Java JSSE 证书颁发机构文件(通常为Oracle JRE中的“jssecacerts”或“cacerts”)。
truststore-password上面配置的信任库的密码
truststore-typeJKSJava 信任库的类型。可以配成 JKS 或者其他支持的 Java 信任库类型
exclude-protocolsSSLv3要排除的以空格分隔的 SSL/TLS 协议列表。 SSLv3 协议不管是否配置都会被排除掉。
maxIoWorkers2 * 机器上可用的处理器核心数量I/O工作线程的最大数量。这个是在 NettyAvroRpcClient 的 NioClientSocketChannelFactory 上配置的。

20210315104142614

配置范例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
Thrift

这个Sink可以作为 Flume 分层收集特性的下半部分。发送到此Sink的 Event 将转换为 Thrift Event 发送到指定的主机/端口上。Event 从 channel 中获取批量获取,数量根据配置的 batch-size 而定。 可以通过启用 kerberos 身份验证将 Thrift Sink 以安全模式启动。如果想以安全模式与 Thrift Source 通信,那么 Thrift Sink 也必须以安全模式运行。 client-principalclient-keytab 是 Thrift Sink 用于向 kerberos KDC 进行身份验证的配置参数。 server-principal 表示此Sink将要以安全模式连接的 Thrift Source 的主体。

属性默认值解释
channel与 Sink 绑定的 channel
type组件类型,这个是: thrift.
hostname远程 Thrift 服务的主机名或 IP
port远程 Thrift 的端口
batch-size100一起批量发送 Event 数量
connect-timeout20000第一次连接请求(握手)的超时时间,单位:毫秒
request-timeout20000请求超时时间,单位:毫秒
reset-connection-intervalnone重置连接到下一跳之前的时间量(秒)。 这将强制 Thrift Sink 重新连接到下一跳。 允许Sink在添加了新的主机时连接到硬件负载均衡器后面的主机,而无需重新启动 Agent。
sslfalse设置为 true 表示Sink开启 SSL。下面的 truststoretruststore-passwordtruststore-type 就是开启 SSL 后使用的参数
truststore自定义 Java 信任库文件的路径。 Flume 使用此文件中的证书颁发机构信息来确定是否应该信任远程 Avro Source 的 SSL 身份验证凭据。 如果未指定,将使用缺省 Java JSSE 证书颁发机构文件(通常为 Oracle JRE 中的“jssecacerts”或“cacerts”)。
truststore-password上面配置的信任库的密码
truststore-typeJKSJava 信任库的类型。可以配成 JKS 或者其他支持的 Java 信任库类型
exclude-protocolsSSLv3要排除的以空格分隔的 SSL/TLS 协议列表
kerberosfalse设置为 true 开启 kerberos 身份验证。在 kerberos 模式下,需要 client-principalclient-keytabserver-principal 才能成功进行身份验证并与启用了 kerberos 的 Thrift Source 进行通信。
client-principal—-Thrift Sink 用来向 kerberos KDC 进行身份验证的 kerberos 主体。
client-keytab—-Thrift Sink 与 client-principal 结合使用的 keytab 文件路径,用于对 kerberos KDC 进行身份验证。
server-principalThrift Sink 将要连接到的 Thrift Source 的 kerberos 主体。

20210315104320110

配置范例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐