flume上报日志到kafka
1.flume安装参考:http://blog.csdn.net/lnho2015/article/details/520351451. 系统需求Flume需要Java 1.6及以上(推荐1.7),对Agent监控目录的读写权限。2. 下载软件包到Flume官网上http://flume.apache.org/download.html下载软件包,例如:wget "
1.flume安装
参考:http://blog.csdn.net/lnho2015/article/details/52035145
1. 系统需求
Flume需要Java 1.6及以上(推荐1.7),对Agent监控目录的读写权限。
2. 下载软件包
到Flume官网上http://flume.apache.org/download.html下载软件包,例如:
wget "http://mirrors.cnnic.cn/apache/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz"
tar -xzvf apache-flume-1.6.0-bin.tar.gz
mv flume-1.6.0 /opt
3. 简单示例
3.1 修改配置文件
vi /opt/flume-1.6.0/conf/flume.conf
输入以下内容
# 指定Agent的组件名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 指定Flume source(要监听的路径)
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/path
# 指定Flume sink
a1.sinks.k1.type = logger
# 指定Flume channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 绑定source和sink到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3.2 启动flume agent
cd /opt/flume-1.6.0
bin/flume-ng agent --conf conf --conf-file conf/flume.conf --name a1 -Dflume.root.logger=INFO,console
参数 作用 举例
–conf 或 -c 指定配置文件夹,包含flume-env.sh和log4j的配置文件 –conf conf
–conf-file 或 -f 配置文件地址 –conf-file conf/flume.conf
–name 或 -n agent名称 –name a1
-z zookeeper连接字符串 -z zkhost:2181,zkhost1:2181
-p zookeeper中的存储路径前缀 -p /flume
3.3 写入日志内容
vi 1.log
写入Hello Flume.作为测试内容。然后拷贝到flume监听路径。
cp 1.log /root/path/
接着就可以在前一个终端看到刚刚采集的内容了,如下:
2016-06-27 10:02:58,322 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 0D Hello Flume. }
至此flume已经能够正常运行。
2.kafka安装
1.1 Zookeeper的安装与启动
(1)登录tc-host云主机,执行以下命令,下载并解压Zookeeper
$ cd /opt
$ wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
$ gzip -d zookeeper-3.4.6.tar.gz
$ tar -xvf zookeeper-3.4.6.tar
(2)进入conf目录,将示例的配置文件zoo_sample.cfg改名为zoo.cfg,作为我们的配置文件使用,命令如下
$ cd zookeeper-3.4.6/conf
$ mv zoo_sample.cfg zoo.cfg
(3)进入bin目录,执行zkServer.sh的start命令启动Zookeeper服务
$ cd ..
$ cd bin
$ ./zkServer.sh start
(4)提示以下内容,启动成功
JMX enabled by default
Using config: /opt/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
这时我们以默认的配置启动了Zookeeper服务,使用的是2181端口。
有关Zookeeper的相关知识,这里不做赘述。
1.2 Kafka的安装与启动
(1)登录tc-newhost云主机,执行以下命令,下载并解压Kafka
$ cd /opt
$ wget http://mirrors.cnnic.cn/apache/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
$ gtar xvzf kafka_2.11-0.9.0.1.tgz
(2)修改kafka_2.11-0.9.0.1/config目录下的server.properties文件,找到zookeeper.connect一项,修改为tc-host的地址,如下:
zookeeper.connect=172.26.6.246:2181
注意,默认是访问本机上部署的Zookeeper,由于我们是将zookeeper部署在另一台主机上,所以要修改地址。
(3)执行bin目录下的kafka-server-start.sh命令启动Kafka,以server.properties文件作为参数,启动Kafka
$ cd kafka_2.11-0.9.0.1
$ ./bin/kafka-server-start.sh ./config/server.properties &
命令后面的&符号是将启动的Kafka服务设置为后台进程,方便我们进一步的操作。
启动命令是以配置文件为参数,按照相关的配置来启动的。server.properties是默认的配置文件,几个比较常用的配置项包括:
(1)broker.id broker的id号
(2)port 端口
(3)zookeeper.connect zookeeper的连接地址
(4)log.dirs 日志的目录
......
[2016-05-25 19:53:06,410] INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(tc-newhost.office.mos,9092,PLAINTEXT) (kafka.utils.ZkUtils)
[2016-05-25 19:53:06,423] INFO Kafka version : 0.9.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2016-05-25 19:53:06,423] INFO Kafka commitId : 23c69d62a0cabf06 (org.apache.kafka.common.utils.AppInfoParser)
[2016-05-25 19:53:06,424] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2016-05-25 19:53:06,713] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0],[test2,0],[tes,0] (kafka.server.ReplicaFetcherManager)
[2016-05-25 19:53:06,763] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0],[test2,0],[tes,0] (kafka.server.ReplicaFetcherManager)
1.3 简单功能验证
Kafka成功启动后,可以通过一些简单的命令来验证一下功能。
(1)创建一个名为test的topic
$./bin/kafka-topics.sh --create --zookeeper 172.26.6.246:2181 --replication-factor 1 --partitions 1 --topic test
create命令的replication-factor是设置该topic在多少个broker上存储。
(2)查询topic的属性
$./bin/kafka-topics.sh --describe --zookeeper 172.26.6.246:2181 --topic test
describe命令的返回信息中,罗列了所有partition的信息,其中:
(1)Partition是编号
(2)Leader是一个broker的编号,该broker存储了当前partition,并且被选举为broker列表中的Leader。在Kafka中,只有Leader节点会负责消息的读和写,其他broker只是做备份
(3)Replicas是存储了该partition的broker列表
(4)Isr是当前可用的broker列表
(3)生产者连接broker发送消息
$./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
在终端执行该命令后,会进入到一个连续输入的状态,输入一条消息后,按回车换行,可以直接输入下一条消息,直到按下Control+C结束这个状态。
(4)消费者获取消息
$./bin/kafka-console-consumer.sh --zookeeper 172.26.6.246:2181 --from-beginning --topic test
hello
消费者获取消息成功,至此,说明一个基本的Kafka环境安装并启动成功了。
3.flume写入kafka
修改conf/flume.conf
# 指定Flume sink
#a1.sinks.k1.type = logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100
更多推荐
所有评论(0)