1.Flume是啥?

Flume是一种分布式,大数据日志收集系统。可以定制数据源、数据终点、数据传输通道,过程中可以简单的对数据进行一些处理。而且可以搭配消息队列或者数据仓库使用,比如kafka、Hive。

2.架构

flume.png
Flume很简单,主要有三个方面:Source、Channel、Sink。其中Source是源、Channel是通道、Sink是数据下沉,也就是数据目的地。基本上我们要围绕这仨进行操作。

3.安装

我是用虚拟机,实验的场合是CentOs7,需要有java环境。
1.在官方下载安装包: http://flume.apache.org/download.html
2.解压至目录:/opt/soft/ (自行选择后续不提)
3.在 /etc/profile 里配置好flume的环境变量(和配Java或者Hadoop啥的差不多)。
4.执行source /etc/profile
5.进入到conf目录下,有一个 flume-env.sh.template 文件,修改一下内容

export JAVA_HOME=你java的路径

保存,并改名把后面的template去掉。
这个时候尝试一下 flume-ng version
如果有版本提示,则安装完成。如果没有,检查环境变量。实在不行,去flume的bin目录下用flume-ng也可以!

4.用法

其实Flume支持的用法有很多!这里从最基础的开始。
Flume是靠配置文件启动的,我们可以打开 flume-conf.properties.template 文件看一下:

# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'

agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = loggerSink

# For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = seq

# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.loggerSink.type = logger

#Specify the channel the sink should use
agent.sinks.loggerSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100

里面密密麻麻的,去掉不重要的注释,我们只拆开慢慢的边改边说:
首先这里面有一个 agent,如他的名字,他是代理,我们每创建一个新的配置,就明确一个新的代理,比如这里改成a1,代理主要配置三个属性:sources、channel、sinks。
前三行:

a1.sources=r1
a1.channels=c1
a1.sinks=k1

这三个后面可以自己定义名字,选择比较简单容易记的就可以,这里分别命名为r1、c1、k1
紧接着就是对source进行定义:
souce的配置主要是由type属性确定的,模板里没有给出,这里可以先简单展示一个avro。

a1.sources.r1.type=avro
a1.sources.r1.bind=你的ip地址
a1.sources.r1.port=你定义个端口号

这里就是用来avro类型的数据源。后续我们可以通过avro客户端来发消息,从而测试。

忽略一行我们继续看sinks:
这里我们简单改写。

a1.sinks.k1.type=logger

同理,这里type也有很多,这选择最简单的logger来展示,即直接打印到控制台。

忽略一行继续看channel:

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity = 100

channel是通道,默认最简单的方式是直接使用内存存储并传输。简单解释一下,type是选择类型为内存。capacity是默认通道里最大可以存储的event数量,trasactionCapacity是每次最多可以从source中送往sink的event数量。

最后不要忘记!!!把source和sink绑定到channel上

a1.sources.r1.channels= c1
a1.sinks.k1.channels= c1

最后是长这样的:

#定义名称
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#描述source
a1.sources.r1.type=avro
a1.sources.r1.bind=你的ip地址
a1.sources.r1.port=你定义个端口号

#定义sink
a1.sinks.k1.type=logger

#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity = 100

#source和sink绑定到channel
a1.sources.r1.channels= c1
a1.sinks.k1.channels= c1

这样,我们的第一个配置文件就配置好了,可以改个名,比如叫他 example.conf ,现在来使用它!!

我们可以来到bin目录下,执行指令开启flume,比如:

flume-ng agent -c ../conf/ -f ../conf/example.conf -n a1 -Dflume.root.logger=INFO,console

指令后面的选项很多,解释一下

  • 首先指令分四个:agent是启动一个flume代理、help是展示help目录、avro-client是运行一个avro客户端、version是看版本。
  • -c 是指定配置文件的目录,可以找log4j.properties记录flume的日志。
  • -f 是指定flume使用哪个配置文件。
  • -n是agent代理的名称,这个是必填项!

这样就输出到控制台了,如果想后台启动可以执行:

nohup flume-ng agent -c ../conf/ -f ../conf/example.conf -n a1 -Dflume.root.logger=INFO,console&

然后根据nohup查看。

执行 之后可以查看一下,如果没有报错可以调试一下,主要就是通过avro客户端发消息过去。比如我们创建一个文件:
/opt/soft/apache-flume-1.8.0-bin/test_file/test.log
里面写上hello flume然后保存。

用avro客户端发消息:

flume-ng avro-client -c . -H 你设置的ip地址 -p 你设置的端口号 -F /opt/soft/apache-flume-1.8.0-bin/test_file/test.log

发送之后可以看控制台,后台打印的去看生成的nohup.out。可以发现hello flume已经被打印出来了。

我们可以拷贝一个配置文件,修改一下sink,让它不打印在控制台,而是生成文件:

#定义名称
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#描述source
a1.sources.r1.type=avro
a1.sources.r1.bind=你的ip地址
a1.sources.r1.port=你定义个端口号

#定义sink
a1.sinks.k1.type=file_roll
a1.sinks.k1.sink.directory=/opt/soft/apache-flume-1.8.0-bin/collect


#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity = 100

#source和sink绑定到channel
a1.sources.r1.channels= c1
a1.sinks.k1.channels= c1

这里把sink的type改成了file_roll
然后加上了一个文件夹,地址可以自定义。
这个我们在使用的时候会发现有很多文件,每30秒就多一个文件,就算不创建日志文件也会出现!所以这里我们需要设置一个参数:

a1.sinks.k1.rollInterval=0

由于是自己调试,所以这样会更清晰一些,官方文档的解释是,每30秒生成一个文件去记录,如果我们设置为0,则不会分隔文件,接受的会全部记录在同一个文件中。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐