Flume is a highly available, highly reliable, distributed system from Cloudera for collecting, aggregating, and moving large volumes of log data. It supports pluggable, customizable data senders in the logging pipeline for collecting data, and it can apply simple processing to the data before writing it out to various (customizable) data receivers.

On a recent project we needed to collect logs in one place and then forward them to Kafka. The architecture looks like this:
[architecture diagram: app srv (log4j) → flume srv → Kafka cluster]

On each app srv, the Flume appender in log4j ships the log output to the flume srv. On the flume srv, the source is configured in netcat mode, as follows:

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 10.99.70.51
a1.sources.r1.port = 44444

This configures the Flume agent to run on the server 10.99.70.51 and listen on port 44444. In log4j, all that is needed is to send the log output to port 44444 on 10.99.70.51 and it will reach Flume.
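
For the app srv side, here is a minimal log4j.properties sketch (an assumption for illustration, not taken from the project). It uses the stock flume-ng-log4jappender with the Hostname/Port of the agent above; note that this stock appender speaks Avro RPC and is normally paired with an avro source, so with the netcat source shown above any appender that writes newline-terminated text to 10.99.70.51:44444 would serve the same purpose.

# log4j.properties on the app srv (sketch; requires the flume-ng-log4jappender jar on the classpath)
log4j.rootLogger = INFO, flume
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 10.99.70.51
log4j.appender.flume.Port = 44444
# Do not block the application if the Flume agent is unreachable
log4j.appender.flume.UnsafeMode = true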

In Flume, the sink simply needs to point at Kafka, as follows:

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytest
a1.sinks.k1.kafka.bootstrap.servers = 10.99.70.51:9092,10.99.70.52:9092,10.99.70.53:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

Here kafka.bootstrap.servers must list the addresses of the Kafka cluster, and kafka.topic names the Kafka topic to write to.
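
The acks, linger.ms and compression.type entries above rely on the sink's kafka.producer.* passthrough: any property with that prefix is handed directly to the underlying Kafka producer, so further producer tuning can be added the same way. The two values below are purely illustrative, not settings from the project:

# Any kafka.producer.* property is forwarded verbatim to the Kafka producer
a1.sinks.k1.kafka.producer.retries = 3
a1.sinks.k1.kafka.producer.max.request.size = 1048576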

With this configuration in place, whenever an app srv writes a log line through log4j, the Flume appender sends it to port 44444 on 10.99.70.51. Flume listens on that port, turns each line it receives into an event, and publishes it to the mytest topic on the Kafka cluster.

The complete configuration is as follows (the agent can then be started with the usual flume-ng command, e.g. flume-ng agent --conf conf --conf-file example.conf --name a1):

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 10.99.70.51
a1.sources.r1.port = 44444

# Describe the sink (send events to Kafka; for a quick local test,
# a1.sinks.k1.type = logger can be used instead)
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytest
a1.sinks.k1.kafka.bootstrap.servers = 10.99.70.51:9092,10.99.70.52:9092,10.99.70.53:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
