log4j结合apache flume输出日志到apache kafka
flume是cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。在最近的项目上,需要统一收集日志,然后将日志发送到kafka,架构流程如下:在每个srv中,通过log4j中flume appender,将日志输出到kafka
flume是cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
在最近的项目上,需要统一收集日志,然后将日志发送到kafka,架构流程如下:
在每个srv中,通过log4j中flume appender,将日志输出到kafka srv上,在flume srv上配置redource为netcat
模式,如下:
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 10.99.70.51
a1.sources.r1.port = 44444
上面配置了flume srv运行在10.99.70.51的服务器上,并且监听的端口为44444,在log4j中只需要将日志发送到10.99.70.51的44444端口就能成功的发动到flume上。
在flume中,只需要将sink指定到kafka,如下:
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytest
a1.sinks.k1.kafka.bootstrap.servers = 10.99.70.51:9092,10.99.70.52:9092,10.99.70.53:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type = snappy
其中kafka.bootstrap.servers
需要指定kafka的集群地址,kafka.topic
需要指定kafka中的topic。
如上配置,当app srv中通过log4j输出一条日志的时候,会通过flume appender发送到10.9.70.51机器44444端口上,flume会监听并收集该端口上的数据信息,然后将他转化成kafka event,并发送到kafka集群mytest topic下。
完整配置,如下
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 10.99.70.51
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#a1.sinks.k1.kafka.topic = mytest
#a1.sinks.k1.kafka.bootstrap.servers = 10.99.70.51:9092,10.99.70.52:9092,10.99.70.53:9092
#a1.sinks.k1.kafka.flumeBatchSize = 20
#a1.sinks.k1.kafka.producer.acks = 1
#a1.sinks.k1.kafka.producer.linger.ms = 1
#a1.sinks.ki.kafka.producer.compression.type = snappy
==手机QQ扫描下方二维码,快速加入Java架构师交流群==
更多推荐
所有评论(0)