1.flume安装

参考:http://blog.csdn.net/lnho2015/article/details/52035145

1. 系统需求
Flume需要Java 1.6及以上(推荐1.7),对Agent监控目录的读写权限。

2. 下载软件包

到Flume官网上http://flume.apache.org/download.html下载软件包,例如:

wget "http://mirrors.cnnic.cn/apache/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz"
tar -xzvf apache-flume-1.6.0-bin.tar.gz
mv flume-1.6.0 /opt

3. 简单示例
3.1 修改配置文件

vi /opt/flume-1.6.0/conf/flume.conf
输入以下内容
# 指定Agent的组件名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1


# 指定Flume source(要监听的路径)
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/path


# 指定Flume sink
a1.sinks.k1.type = logger


# 指定Flume channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# 绑定source和sink到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.2 启动flume agent
cd /opt/flume-1.6.0
bin/flume-ng agent --conf conf --conf-file conf/flume.conf --name a1 -Dflume.root.logger=INFO,console

参数 作用 举例
–conf 或 -c 指定配置文件夹,包含flume-env.sh和log4j的配置文件 –conf conf
–conf-file 或 -f 配置文件地址 –conf-file conf/flume.conf
–name 或 -n agent名称 –name a1
-z zookeeper连接字符串 -z zkhost:2181,zkhost1:2181
-p zookeeper中的存储路径前缀 -p /flume

3.3 写入日志内容

vi 1.log
写入Hello Flume.作为测试内容。然后拷贝到flume监听路径。

cp 1.log  /root/path/

接着就可以在前一个终端看到刚刚采集的内容了,如下:

2016-06-27 10:02:58,322 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 0D             Hello Flume. }

至此flume已经能够正常运行。

2.kafka安装

1.1  Zookeeper的安装与启动
(1)登录tc-host云主机,执行以下命令,下载并解压Zookeeper

$ cd /opt
$ wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz 
$ gzip -d zookeeper-3.4.6.tar.gz 
$ tar -xvf zookeeper-3.4.6.tar
(2)进入conf目录,将示例的配置文件zoo_sample.cfg改名为zoo.cfg,作为我们的配置文件使用,命令如下
$ cd zookeeper-3.4.6/conf
$ mv zoo_sample.cfg zoo.cfg
(3)进入bin目录,执行zkServer.sh的start命令启动Zookeeper服务
$ cd ..
$ cd bin
$ ./zkServer.sh start
(4)提示以下内容,启动成功
JMX enabled by default
Using config: /opt/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
这时我们以默认的配置启动了Zookeeper服务,使用的是2181端口。
有关Zookeeper的相关知识,这里不做赘述。
1.2  Kafka的安装与启动
(1)登录tc-newhost云主机,执行以下命令,下载并解压Kafka
$ cd /opt
$ wget http://mirrors.cnnic.cn/apache/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
$ gtar xvzf kafka_2.11-0.9.0.1.tgz
(2)修改kafka_2.11-0.9.0.1/config目录下的server.properties文件,找到zookeeper.connect一项,修改为tc-host的地址,如下:
zookeeper.connect=172.26.6.246:2181
注意,默认是访问本机上部署的Zookeeper,由于我们是将zookeeper部署在另一台主机上,所以要修改地址。
(3)执行bin目录下的kafka-server-start.sh命令启动Kafka,以server.properties文件作为参数,启动Kafka
$ cd kafka_2.11-0.9.0.1
$ ./bin/kafka-server-start.sh ./config/server.properties &

命令后面的&符号是将启动的Kafka服务设置为后台进程,方便我们进一步的操作。

启动命令是以配置文件为参数,按照相关的配置来启动的。server.properties是默认的配置文件,几个比较常用的配置项包括:
(1)broker.id  broker的id号
(2)port  端口
(3)zookeeper.connect  zookeeper的连接地址
(4)log.dirs  日志的目录

终端输出一系列的日志以后,启动成功。
......
[2016-05-25 19:53:06,410] INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(tc-newhost.office.mos,9092,PLAINTEXT) (kafka.utils.ZkUtils)
[2016-05-25 19:53:06,423] INFO Kafka version : 0.9.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2016-05-25 19:53:06,423] INFO Kafka commitId : 23c69d62a0cabf06 (org.apache.kafka.common.utils.AppInfoParser)
[2016-05-25 19:53:06,424] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2016-05-25 19:53:06,713] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0],[test2,0],[tes,0] (kafka.server.ReplicaFetcherManager)
[2016-05-25 19:53:06,763] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0],[test2,0],[tes,0] (kafka.server.ReplicaFetcherManager)
1.3  简单功能验证
Kafka成功启动后,可以通过一些简单的命令来验证一下功能。
(1)创建一个名为test的topic
$./bin/kafka-topics.sh --create --zookeeper 172.26.6.246:2181 --replication-factor 1 --partitions 1 --topic test

create命令的replication-factor是设置该topic在多少个broker上存储。

(2)查询topic的属性

$./bin/kafka-topics.sh --describe --zookeeper 172.26.6.246:2181 --topic test

describe命令的返回信息中,罗列了所有partition的信息,其中:
(1)Partition是编号
(2)Leader是一个broker的编号,该broker存储了当前partition,并且被选举为broker列表中的Leader。在Kafka中,只有Leader节点会负责消息的读和写,其他broker只是做备份
(3)Replicas是存储了该partition的broker列表
(4)Isr是当前可用的broker列表

(3)生产者连接broker发送消息

$./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
在终端执行该命令后,会进入到一个连续输入的状态,输入一条消息后,按回车换行,可以直接输入下一条消息,直到按下Control+C结束这个状态。
(4)消费者获取消息
$./bin/kafka-console-consumer.sh --zookeeper 172.26.6.246:2181 --from-beginning --topic test
hello
消费者获取消息成功,至此,说明一个基本的Kafka环境安装并启动成功了。

3.flume写入kafka

修改conf/flume.conf

# 指定Flume sink
#a1.sinks.k1.type = logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100





Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐