基于cdh的Kafka+Flume配置(详细,成功运行)
目 录一 Kafka 11 版本选择 12 集群搭建规划 13 修改Kafka相关配置文件 14 配置全局变量 15 启动Kafka集群 26 测试数据 3二 Flume 51 版本选择 52修改Flume相关配置文件 53 配置全局变量 64 启动Flume 65 测试数据 7三 Flume + Kafka测试启动命令 101 Flu...
目 录
1 版本选择
CDH-3.7.5的组件版本:KAFKA-3.1.0-1.3.1.0.p0.35
2 集群搭建规划
| Manager | Namenode | Datanode |
ZooKeeper | 是 | 是 | 是 |
Kafka(broker) | 是 | 是 | 是 |
Flume(agent) | 是 | --- | --- |
【注意:必须配置好主机(免密登录、修改host主机名)】
3 修改Kafka相关配置文件
第一步:
cd /opt/cloudera/parcels/KAFKA-3.1.0-1.3.1.0.p0.35/etc/kafka/conf.dist
第二步:
vi server.properties
第三步:(主要修改其中的6个参数)
broker.id=0 //标示符(三台主机的id分别为0,1,2)
host.name=manager //绑定的主机
log.dirs=/usr/local/soft/kafka/kafka-logs //数据保存的位置
log.retention.hours=168 //数据的保留时间(168 hours=7天)
zookeeper.connect=manager:2181,namenode:2181,datanode:2181
delete.topic.enable=true //可以删除已创建主题
4 配置全局变量
(1)vi .bashrc
(2)添加配置
export KAFKA_HOME=/opt/cloudera/parcels/KAFKA-3.1.0-1.3.1.0.p0.35
export PATH=$PATH:$KAFKA_HOME/bin
- :退出保存,执行以下命令
- :将配置好的.bashrc文件分发给Namenode、DataNode主机
scp -r .bashrc namenode:$PWD
scp -r .bashrc datanode:$PWD
5 启动Kafka集群
(1)启动命令:
kafka-server-start.sh /opt/cloudera/parcels/KAFKA-3.1.0-1.3.1.0.p0.35/etc/kafka/conf.dist/server.properties
(2)启动成功:
- 使用jps命令查看
6 测试数据
- 查看当前有哪些主题命令(此处选择manager主机的zk):
kafka-topics.sh --list --zookeeper manager:2181
2.创建新的主题命令(设置为3个副本,1个分区):
kafka-topics.sh --create --zookeeper manager:2181,namenode:2181,datanode:2181 --replication-factor 2 --partitions 1 --topic test
在manager上模拟,往kafka的test主题里面发送数据,然后datanode上去消费这个数据:
kafka-console-producer.sh --broker-list
manager:9092,namenode:9092,datanode:9092 --topic test
kafka-console-consumer.sh --zookeeper datanode:2181 --from-beginning --topic test
(4)删除主题命令:
kafka-topics --delete --topic realtime --zookeeper manager:2181
二 Flume
1 版本选择
CDH-3.7.5的组件版本
2修改Flume相关配置文件
/opt/cloudera/parcels/CDH-5.7.5-1.cdh5.7.5.p0.3/etc/flume-ng/conf.empty
flume-conf.properties
[注意:如果文件中有flume.conf,可以直接删除,新建的文件名字可以随意取,但是后缀必须是.properties]
a1.sources = r1 a1.sinks = s1 a1.channels = c1
#sources 消息生产 a1.sources.r1.type = spooldir a1.sources.r1.channels = c1 a1.sources.r1.spoolDir = /usr/local/soft/flume/flume_dir //用于存放收集的日志 a1.sources.r1.fileHeader = false a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = timestamp
#channels 消息传递 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100
#sinks 消息消费 a1.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.s1.brokerList = manager:9092,namenode:9092,datanode:9092 //链接kafka a1.sinks.s1.topics = test//flume收集的日志分发给kafka的对应主题名称 a1.sinks.s1.requiredAcks = 1 a1.sinks.s1.batchSize = 20 a1.sinks.s1.channel = c1 //注意这里是channel不是channels |
3 配置全局变量
- 回到根目录输入命令:
vi .bashrc
- 添加配置:
export FLUME_HOME=/opt/cloudera/parcels/CDH-5.7.5-1.cdh5.7.5.p0.3/lib/flume-ng
export PATH=$PATH:$FLUME_HOME/bin
- 保存 --> 退出 --> 更新
source .bashrc
4 启动Flume
- 分别启动zookeeper集群、kafka集群:
因为是在Cloudera Manager上直接添加的服务组件,所以可以直接在CM上启动。
- 因为是在主机manager上配置的flume,因此启动manager主机上的flume,启动命令如下:
1)cd /opt/cloudera/parcels/CDH-5.7.5-1.cdh5.7.5.p0.3
2)bin/flume-ng agent --conf conf --conf-file
etc/flume-ng/conf.empty/flume-conf.properties --name a1
-Dflume.root.logger=INFO,console
5 测试数据
(1)在namenode主机上添加一个kafka.txt文件,然后发送到manager主机上的/usr/local/soft/flume/flume_dir目录中,这时可以发现manager主机上发生如下变化:
(2)随机选择dataNode主机,查看从manager主机上的flume传过来的数据:
1)kafka-console-consumer.sh --zookeeper
manager:2181,namenode:2181,datanode:2181 --from-beginning --topic test
(3)Cloudera Manager上的监控状态
三 Flume + Kafka测试启动命令
1)cd /opt/cloudera/parcels/CDH-5.7.5-1.cdh5.7.5.p0.3
2) bin/flume-ng agent --conf conf --conf-file etc/flume-ng/conf.empty/flume-conf.properties --name
a1 -Dflume.root.logger=INFO,console
2 Kafaka启动测试命令
kafka-console-consumer.sh --zookeeper manager:2181,namenode:2181,datanode:2181 --from-beginning --topic test
更多推荐
所有评论(0)