Flume配置案例——Flume传输数据给Kafka
相关知识Flume是一个海量日志采集、聚合和传输的日志收集系统。Kafka是一个可持久化的分布式的消息队列。由于采集和处理数据的速度不一定同步,所以使用Kafka这个消息中间件来缓冲,如果你收集了日志后,想输出到多个业务方也可结合Kafka,Kafka支持多个业务来读取数据。系统环境Linux Ubuntu 14.04jdk-7u75-linux-x64kafka_2.10-0.8....
相关知识
Flume是一个海量日志采集、聚合和传输的日志收集系统。
Kafka是一个可持久化的分布式的消息队列。
由于采集和处理数据的速度不一定同步,所以使用Kafka这个消息中间件来缓冲,如果你收集了日志后,想输出到多个业务方也可结合Kafka,Kafka支持多个业务来读取数据。
系统环境
Linux Ubuntu 14.04
jdk-7u75-linux-x64
kafka_2.10-0.8.2.2
hadoop-2.6.0-cdh5.4.5
flume-ng-1.5.0-cdh5.4.5
zookeeper-3.4.5-cdh5.4.5
实验内容
说明:
1、使用Flume采集syslog端口的数据。
2、Channel1和Channel2都使用mem。
3、sink1输出数据到HDFS,作持久化存储;sink2输出数据到Kafka,由Kafka端启用console-consumer来消费数据,并输出到屏幕上。
实验步骤
1、启动Hadoop的相关进程,启动zookeeper服务。
jps
cd /apps/hadoop/sbin
./start-all.sh
jps
cd /apps/zookeeper/bin
./zkServer.sh start
./zkServer.sh status
2、创建Flume配置文件,名为:syslogtcp_file_hdfsandkafka.conf,使用vim编辑该conf文件,
cd /apps/flume/conf
cp ./flume-conf.properties.template ./syslogtcp_mem_hdfsandkafka.conf
vim syslogtcp_mem_hdfsandkafka.conf
插入如下内容:
#定义各个组件
agent1.sources = src
agent1.channels = ch_hdfs ch_kafka
agent1.sinks = des_hdfs des_kafka
#配置source
agent1.sources.src.type = syslogtcp
agent1.sources.src.bind = localhost
agent1.sources.src.port = 6666
#配置channel
agent1.channels.ch_hdfs.type = memory
agent1.channels.ch_kafka.type = memory
##配置sink
#配置hdfs sink
agent1.sinks.des_hdfs.type = hdfs
agent1.sinks.des_hdfs.hdfs.path = hdfs://localhost:9000/myflume/syslog_mem_hdfsandkafka/
agent1.sinks.des_hdfs.hdfs.useLocalTimeStamp = true
#设置flume临时文件的前缀为 . 或 _ 在hive加载时,会忽略此文件。
agent1.sinks.des_hdfs.hdfs.inUsePrefix=_
#设置flume写入文件的前缀是什么
agent1.sinks.des_hdfs.hdfs.filePrefix = q7
agent1.sinks.des_hdfs.hdfs.fileType = DataStream
agent1.sinks.des_hdfs.hdfs.writeFormat = Text
#hdfs创建多久会新建一个文件,0为不基于时间判断,单位为秒
agent1.sinks.des_hdfs.hdfs.rollInterval = 20
#hdfs写入的文件达到多大时,创建新文件 0为不基于空间大小,单位B
agent1.sinks.des_hdfs.hdfs.rollSize = 10
#hdfs有多少条消息记录时,创建文件,0为不基于条数判断
agent1.sinks.des_hdfs.hdfs.rollCount = 5
#hdfs空闲多久就新建一个文件,单位秒
#配置kafka sink
agent1.sinks.des_kafka.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.des_kafka.brokerList = localhost:9092
agent1.sinks.des_kafka.topic = flumekafka
agent1.sinks.des_kafka.batchSize=100
agent1.sinks.des_kafka.requiredAcks=1
##下面是把上面设置的组件关联起来(把点用线连起来)
agent1.sources.src.channels = ch_hdfs ch_kafka
agent1.sinks.des_hdfs.channel = ch_hdfs
agent1.sinks.des_kafka.channel = ch_kafka
3、启动kakfa-server。
cd /apps/kafka/bin
./kafka-server-start.sh config/server.properties
4、另起一窗口,在Kafka中创建topic并命名为flumekafka。
cd /apps/kafka
bin/kafka-topics.sh \
--create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--topic flumekafka \
--partitions 1
5、启动flume。
cd /apps/flume/bin
./flume-ng agent -c /conf -f /apps/flume/conf/syslogtcp_mem_hdfsandkafka.conf -n agent1 -D flume.root.logger=DEBUG,console
6、另起一窗口,启动Kafka的console consumer来消费数据。
cd /apps/kafka/bin
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic flumekafka --from-beginning
7、另起一窗口,执行nc
echo "good morning" | nc localhost 6666
Flume-ng的输出:
kafka-server的变化:
kafka-console-consumer的输出:
查看hdfs中的数据:
更多推荐
所有评论(0)