Integrating Flume and Kafka
For the collection layer we use the highly modular Flume-ng; for the transport layer we use the high-throughput Kafka. Together they form the base data-ingestion component of a distributed computing cluster.
Required materials:
Zookeeper - 3.4.6
Flume - 1.5.0
kafka_2.10-0.8.1.1.tgz
Flume-Kafka plugin
First, obtain the flume-kafka plugin and build it from source.
Download address: https://github.com/Polaris-zlf/flume-ng-kafka-sink.git
Prerequisites:
Java 1.6 or higher
Apache Maven 3
Apache Flume 1.5.0 or above
Apache Kafka 0.8.1.1 or above
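Before building, you can confirm that the Java prerequisite is met:
[root@master ~]# java -version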
1. Download and install Maven
[root@master ~]# wget https://archive.apache.org/dist/maven/maven-3/3.0.5/binaries/apache-maven-3.0.5-bin.tar.gz
[root@master ~]# tar -zxvf apache-maven-3.0.5-bin.tar.gz
[root@master ~]# ln -s apache-maven-3.0.5 maven
Add the environment variables:
[root@master maven]# vim /etc/profile
export MAVEN_HOME=/root/maven
export PATH=$PATH:$MAVEN_HOME/bin
Reload the configuration:
[root@master maven]# source /etc/profile
Verify the installation; output like the following means Maven has been installed correctly.
[root@master maven]# mvn
[INFO] Scanning for projects...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 0.275s
[INFO] Finished at: Thu Oct 06 22:04:28 CST 2016
[INFO] Final Memory: 2M/15M
[INFO] ------------------------------------------------------------------------
[ERROR] No goals have been specified for this build. You must specify a valid lifecycle phase or a goal in the format <plugin-prefix>:<goal> or <plugin-group-id>:<plugin-artifact-id>[:<plugin-version>]:<goal>. Available lifecycle phases are: validate, initialize, generate-sources, process-sources, generate-resources, process-resources, compile, process-classes, generate-test-sources, process-test-sources, generate-test-resources, process-test-resources, test-compile, process-test-classes, test, prepare-package, package, pre-integration-test, integration-test, post-integration-test, verify, install, deploy, pre-clean, clean, post-clean, pre-site, site, post-site, site-deploy. -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/NoGoalSpecifiedException
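The BUILD FAILURE above is expected, since no goal was specified. A cleaner check that avoids the error is:
[root@master maven]# mvn -v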
2. Download the plugin
[root@master ~]# yum install -y git
[root@master ~]# git clone https://github.com/Polaris-zlf/flume-ng-kafka-sink.git
3. Build
The relevant settings in the pom.xml under flume-ng-kafka-sink:
<flume-version>1.5.0</flume-version>
<kafka-version>0.8.1.1</kafka-version>
<artifactId>kafka_2.10</artifactId>
<version>${kafka-version}</version>
[root@master ~]# cd flume-ng-kafka-sink/
[root@master flume-ng-kafka-sink]# mvn clean install
The following error may appear while the build's tests run; it can safely be ignored:
ERROR Closing socket for /xxx.xxx.xxx.xxx because of error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487)
at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:122)
at kafka.network.MultiSend.writeTo(Transmission.scala:101)
at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:219)
at kafka.network.Processor.write(SocketServer.scala:375)
at kafka.network.Processor.run(SocketServer.scala:247)
at java.lang.Thread.run(Thread.java:745)
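If this noise from the test run actually fails the build on your machine, a common workaround is to skip the tests with a standard Maven flag:
[root@master flume-ng-kafka-sink]# mvn clean install -DskipTests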
After the build finishes, flume-kafka-sink-dist-0.5.0-bin.zip is generated under dist/target. Unzip it; the lib directory inside contains four dependency jars:
flume-kafka-sink-impl-0.5.0.jar
kafka_2.10-0.8.1.1.jar
metrics-core-2.2.0.jar
scala-library-2.10.1.jar
Deploy the dependency jars:
Under the Flume installation directory, create a plugins.d folder; inside it, create a kafka-sink folder; and inside that, create two folders, lib and libext.
Copy flume-kafka-sink-impl-0.5.0.jar into lib and the other three jars into libext. The resulting directory structure looks like this (a shell sketch follows the tree):
${FLUME_HOME}
|-- plugins.d
    |-- kafka-sink
        |-- lib
        |   |-- flume-kafka-sink-impl-x.x.x.jar
        |-- libext
            |-- kafka_x.x-x.x.x.x.jar
            |-- metrics-core-x.x.x.jar
            |-- scala-library-x.x.x.jar
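A minimal shell sketch of this deployment, assuming Flume is installed at /root/flume and that unzipping the archive creates a flume-kafka-sink-dist-0.5.0 directory (adjust both paths to your layout):
[root@master flume-ng-kafka-sink]# cd dist/target
[root@master target]# unzip flume-kafka-sink-dist-0.5.0-bin.zip
[root@master target]# mkdir -p /root/flume/plugins.d/kafka-sink/lib /root/flume/plugins.d/kafka-sink/libext
[root@master target]# cp flume-kafka-sink-dist-0.5.0/lib/flume-kafka-sink-impl-0.5.0.jar /root/flume/plugins.d/kafka-sink/lib/
[root@master target]# cd flume-kafka-sink-dist-0.5.0/lib
[root@master lib]# cp kafka_2.10-0.8.1.1.jar metrics-core-2.2.0.jar scala-library-2.10.1.jar /root/flume/plugins.d/kafka-sink/libext/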
4. Install ZooKeeper
See: http://blog.csdn.net/u012689336/article/details/52745571
5. Install Flume-ng 1.5.0
See: http://blog.csdn.net/u012689336/article/details/52687956
6. Install Kafka 0.8.1.1
See: http://blog.csdn.net/u012689336/article/details/52739042
Replace the kafka_2.9.2-0.8.1.1.tgz used in that article with kafka_2.10-0.8.1.1.tgz.
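If you prefer to fetch the tarball directly, the Apache archive hosts it (URL assumed from the standard archive layout):
[root@master ~]# wget https://archive.apache.org/dist/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
[root@master ~]# tar -zxvf kafka_2.10-0.8.1.1.tgz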
7. Configure Flume
vi conf/a1.conf
# Define the agent
a1.sources = src1
a1.channels = ch1
a1.sinks = k1
# Define the source
a1.sources.src1.type = spooldir
a1.sources.src1.spoolDir = /root/data
a1.sources.src1.channels = ch1
# Define the sink
a1.sinks.k1.type = com.polaris.flume.sink.KafkaSink
# Name of the Kafka topic to write to
# Note: if this topic does not exist (i.e. it has not been created in the Kafka cluster), the sink falls back to a topic named "default-flume-topic"
a1.sinks.k1.custom-topic = flumeTopic
a1.sinks.k1.preprocessor = com.polaris.flume.sink.SimpleMessagePreprocessor
# Address and port of the Kafka broker(s) to connect to
a1.sinks.k1.kafka.metadata.broker.list = master:9092
a1.sinks.k1.kafka.serializer.class = kafka.serializer.StringEncoder
a1.sinks.k1.kafka.request.required.acks = 1
a1.sinks.k1.channel = ch1
# Define the channel
a1.channels.ch1.type = memory
a1.channels.ch1.capacity = 1000
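Note that the spooldir source requires the spooling directory to exist before the agent starts, so create it up front:
[root@master ~]# mkdir -p /root/data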
The fallback topic "default-flume-topic" mentioned in the configuration comments above is actually defined inside the flume-ng-kafka-sink project. To change the default name or related properties, edit the Constants class at flume-ng-kafka-sink/impl/src/main/java/com/polaris/flume/sink/Constants.java:
public static final String DEFAULT_TOPIC = "default-flume-topic";
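If you change this constant, rebuild the plugin and replace the implementation jar deployed under plugins.d/kafka-sink/lib (the jar's location inside the build tree is assumed here):
[root@master flume-ng-kafka-sink]# mvn clean install
[root@master flume-ng-kafka-sink]# cp impl/target/flume-kafka-sink-impl-0.5.0.jar /root/flume/plugins.d/kafka-sink/lib/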
8. Run
Start the ZooKeeper server:
[root@master bin]# ./zkServer.sh start
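You can confirm that ZooKeeper came up correctly:
[root@master bin]# ./zkServer.sh status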
Start the Kafka server (the broker that each Flume agent will connect to):
[root@master kafka]# bin/kafka-server-start.sh config/server.properties &
Create a topic named "flumeTopic" with a single partition and a single replica:
[root@master kafka]# bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic flumeTopic
Created topic "flumeTopic".
Use the list command to view the created topic:
[root@master kafka]# bin/kafka-topics.sh --list --zookeeper master:2181
flumeTopic
Start a Kafka consumer:
bin/kafka-console-consumer.sh --zookeeper master:2181 --topic flumeTopic --from-beginning
Start the Flume agent
If you want to monitor the agent, add the parameters -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 to the start command, then open http://master:34545 to view the agent's metrics.
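A typical launch command, assuming Flume's installation directory and the a1.conf written in step 7 (adjust paths as needed):
[root@master flume]# bin/flume-ng agent --conf conf --conf-file conf/a1.conf --name a1 -Dflume.root.logger=INFO,console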
Send data to the Flume source:
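If you do not already have a data.txt on hand, a one-liner creates a small test file first (the file name simply matches the copy command below):
[root@master ~]# echo "hello flume kafka" > data.txt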
[root@master ~]# cp data.txt data/
The contents of data.txt will then be printed on the Kafka consumer console.
Once the data has been fully consumed, data.txt in the data directory is renamed to data.txt.COMPLETED.
This completes the Flume and Kafka integration. If you run into problems, feel free to leave a comment or contact me on QQ: 850188633.