日志收集系统Flume笔记(基础版)
1.Flume是啥?Flume是一种分布式,大数据日志收集系统。可以定制数据源、数据终点、数据传输通道,过程中可以简单的对数据进行一些处理。而且可以搭配消息队列或者数据仓库使用,比如kafka、Hive。2.架构Flume很简单,主要有三个方面:Source、Channel、Sink。其中Source是源、Channel是通道、Sink是数据下沉,也就是数据目的地。基本上我们要围绕这仨进行...
1.Flume是啥?
Flume是一种分布式,大数据日志收集系统。可以定制数据源、数据终点、数据传输通道,过程中可以简单的对数据进行一些处理。而且可以搭配消息队列或者数据仓库使用,比如kafka、Hive。
2.架构
Flume很简单,主要有三个方面:Source、Channel、Sink。其中Source是源、Channel是通道、Sink是数据下沉,也就是数据目的地。基本上我们要围绕这仨进行操作。
3.安装
我是用虚拟机,实验的场合是CentOs7,需要有java环境。
1.在官方下载安装包: http://flume.apache.org/download.html
2.解压至目录:/opt/soft/ (自行选择后续不提)
3.在 /etc/profile 里配置好flume的环境变量(和配Java或者Hadoop啥的差不多)。
4.执行source /etc/profile
5.进入到conf目录下,有一个 flume-env.sh.template 文件,修改一下内容
export JAVA_HOME=你java的路径
保存,并改名把后面的template去掉。
这个时候尝试一下 flume-ng version
如果有版本提示,则安装完成。如果没有,检查环境变量。实在不行,去flume的bin目录下用flume-ng也可以!
4.用法
其实Flume支持的用法有很多!这里从最基础的开始。
Flume是靠配置文件启动的,我们可以打开 flume-conf.properties.template 文件看一下:
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = loggerSink
# For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = seq
# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = memoryChannel
# Each sink's type must be defined
agent.sinks.loggerSink.type = logger
#Specify the channel the sink should use
agent.sinks.loggerSink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100
里面密密麻麻的,去掉不重要的注释,我们只拆开慢慢的边改边说:
首先这里面有一个 agent,如他的名字,他是代理,我们每创建一个新的配置,就明确一个新的代理,比如这里改成a1,代理主要配置三个属性:sources、channel、sinks。
前三行:
a1.sources=r1
a1.channels=c1
a1.sinks=k1
这三个后面可以自己定义名字,选择比较简单容易记的就可以,这里分别命名为r1、c1、k1
紧接着就是对source进行定义:
souce的配置主要是由type属性确定的,模板里没有给出,这里可以先简单展示一个avro。
a1.sources.r1.type=avro
a1.sources.r1.bind=你的ip地址
a1.sources.r1.port=你定义个端口号
这里就是用来avro类型的数据源。后续我们可以通过avro客户端来发消息,从而测试。
忽略一行我们继续看sinks:
这里我们简单改写。
a1.sinks.k1.type=logger
同理,这里type也有很多,这选择最简单的logger来展示,即直接打印到控制台。
忽略一行继续看channel:
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity = 100
channel是通道,默认最简单的方式是直接使用内存存储并传输。简单解释一下,type是选择类型为内存。capacity是默认通道里最大可以存储的event数量,trasactionCapacity是每次最多可以从source中送往sink的event数量。
最后不要忘记!!!把source和sink绑定到channel上
a1.sources.r1.channels= c1
a1.sinks.k1.channels= c1
最后是长这样的:
#定义名称
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#描述source
a1.sources.r1.type=avro
a1.sources.r1.bind=你的ip地址
a1.sources.r1.port=你定义个端口号
#定义sink
a1.sinks.k1.type=logger
#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity = 100
#source和sink绑定到channel
a1.sources.r1.channels= c1
a1.sinks.k1.channels= c1
这样,我们的第一个配置文件就配置好了,可以改个名,比如叫他 example.conf ,现在来使用它!!
我们可以来到bin目录下,执行指令开启flume,比如:
flume-ng agent -c ../conf/ -f ../conf/example.conf -n a1 -Dflume.root.logger=INFO,console
指令后面的选项很多,解释一下
- 首先指令分四个:agent是启动一个flume代理、help是展示help目录、avro-client是运行一个avro客户端、version是看版本。
- -c 是指定配置文件的目录,可以找log4j.properties记录flume的日志。
- -f 是指定flume使用哪个配置文件。
- -n是agent代理的名称,这个是必填项!
这样就输出到控制台了,如果想后台启动可以执行:
nohup flume-ng agent -c ../conf/ -f ../conf/example.conf -n a1 -Dflume.root.logger=INFO,console&
然后根据nohup查看。
执行 之后可以查看一下,如果没有报错可以调试一下,主要就是通过avro客户端发消息过去。比如我们创建一个文件:
/opt/soft/apache-flume-1.8.0-bin/test_file/test.log
里面写上hello flume然后保存。
用avro客户端发消息:
flume-ng avro-client -c . -H 你设置的ip地址 -p 你设置的端口号 -F /opt/soft/apache-flume-1.8.0-bin/test_file/test.log
发送之后可以看控制台,后台打印的去看生成的nohup.out。可以发现hello flume已经被打印出来了。
我们可以拷贝一个配置文件,修改一下sink,让它不打印在控制台,而是生成文件:
#定义名称
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#描述source
a1.sources.r1.type=avro
a1.sources.r1.bind=你的ip地址
a1.sources.r1.port=你定义个端口号
#定义sink
a1.sinks.k1.type=file_roll
a1.sinks.k1.sink.directory=/opt/soft/apache-flume-1.8.0-bin/collect
#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity = 100
#source和sink绑定到channel
a1.sources.r1.channels= c1
a1.sinks.k1.channels= c1
这里把sink的type改成了file_roll
然后加上了一个文件夹,地址可以自定义。
这个我们在使用的时候会发现有很多文件,每30秒就多一个文件,就算不创建日志文件也会出现!所以这里我们需要设置一个参数:
a1.sinks.k1.rollInterval=0
由于是自己调试,所以这样会更清晰一些,官方文档的解释是,每30秒生成一个文件去记录,如果我们设置为0,则不会分隔文件,接受的会全部记录在同一个文件中。
更多推荐
所有评论(0)