一  Kafka 1

1 版本选择 1

2 集群搭建规划 1

3 修改Kafka相关配置文件 1

4 配置全局变量 1

5 启动Kafka集群 2

6 测试数据 3

二 Flume 5

1 版本选择 5

2修改Flume相关配置文件 5

3 配置全局变量 6

4 启动Flume 6

5 测试数据 7

三 Flume + Kafka测试启动命令 10

1 Flume启动测试命令 10

2 Kafaka启动测试命令 10

 

 

一  Kafka

1 版本选择

CDH-3.7.5的组件版本:KAFKA-3.1.0-1.3.1.0.p0.35

2 集群搭建规划

 

Manager

Namenode

Datanode

ZooKeeper

Kafka(broker)

Flume(agent)

---

---

 

【注意:必须配置好主机(免密登录、修改host主机名)】

3 修改Kafka相关配置文件

第一步

cd /opt/cloudera/parcels/KAFKA-3.1.0-1.3.1.0.p0.35/etc/kafka/conf.dist

第二步

vi  server.properties

第三步:(主要修改其中的6个参数)

broker.id=0  //标示符(三台主机的id分别为0,1,2)

  host.name=manager //绑定的主机

  log.dirs=/usr/local/soft/kafka/kafka-logs  //数据保存的位置

  log.retention.hours=168  //数据的保留时间(168 hours=7天)

  zookeeper.connect=manager:2181,namenode:2181,datanode:2181

  delete.topic.enable=true //可以删除已创建主题

4 配置全局变量

(1)vi .bashrc

(2)添加配置

export KAFKA_HOME=/opt/cloudera/parcels/KAFKA-3.1.0-1.3.1.0.p0.35

export PATH=$PATH:$KAFKA_HOME/bin

  1. :退出保存,执行以下命令
  2. :将配置好的.bashrc文件分发给Namenode、DataNode主机

scp -r .bashrc namenode:$PWD

scp -r .bashrc datanode:$PWD

5 启动Kafka集群

(1)启动命令:

kafka-server-start.sh /opt/cloudera/parcels/KAFKA-3.1.0-1.3.1.0.p0.35/etc/kafka/conf.dist/server.properties

(2)启动成功:

  1. 使用jps命令查看

6 测试数据

  1. 查看当前有哪些主题命令(此处选择manager主机的zk):

  kafka-topics.sh --list --zookeeper manager:2181

      2.创建新的主题命令(设置为3个副本,1个分区):

kafka-topics.sh --create --zookeeper manager:2181,namenode:2181,datanode:2181 --replication-factor 2 --partitions 1 --topic test

    在manager上模拟,往kafka的test主题里面发送数据,然后datanode上去消费这个数据:

 

Manager发送数据命令:

kafka-console-producer.sh --broker-list

manager:9092,namenode:9092,datanode:9092  --topic test

 

Datanode消费数据命令:

kafka-console-consumer.sh --zookeeper datanode:2181 --from-beginning --topic test

 

 

(4)删除主题命令:

kafka-topics --delete --topic realtime --zookeeper manager:2181

 

 

 

二 Flume

1 版本选择

CDH-3.7.5的组件版本

2修改Flume相关配置文件

  1. 首先,进入flume的配置目录:

/opt/cloudera/parcels/CDH-5.7.5-1.cdh5.7.5.p0.3/etc/flume-ng/conf.empty

  1. 其次,新建一个文件:

flume-conf.properties

[注意:如果文件中有flume.conf,可以直接删除,新建的文件名字可以随意取,但是后缀必须是.properties]

  1. 在flume-conf.properties文件中添加链接kafka的配置:

a1.sources = r1

a1.sinks = s1

a1.channels = c1

 

#sources 消息生产

a1.sources.r1.type = spooldir   

a1.sources.r1.channels = c1

a1.sources.r1.spoolDir = /usr/local/soft/flume/flume_dir //用于存放收集的日志

a1.sources.r1.fileHeader = false

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = timestamp

 

#channels 消息传递

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

#sinks 消息消费

a1.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.s1.brokerList = manager:9092,namenode:9092,datanode:9092 //链接kafka

a1.sinks.s1.topics = test//flume收集的日志分发给kafka的对应主题名称

a1.sinks.s1.requiredAcks = 1

a1.sinks.s1.batchSize = 20

a1.sinks.s1.channel = c1  //注意这里是channel不是channels

3 配置全局变量

  1. 回到根目录输入命令: 

vi .bashrc

  1. 添加配置:

export FLUME_HOME=/opt/cloudera/parcels/CDH-5.7.5-1.cdh5.7.5.p0.3/lib/flume-ng

export PATH=$PATH:$FLUME_HOME/bin

  1. 保存 --> 退出 --> 更新

source .bashrc

4 启动Flume

  1. 分别启动zookeeper集群、kafka集群:

因为是在Cloudera Manager上直接添加的服务组件,所以可以直接在CM上启动。

 

  1. 因为是在主机manager上配置的flume,因此启动manager主机上的flume,启动命令如下:

1)cd /opt/cloudera/parcels/CDH-5.7.5-1.cdh5.7.5.p0.3

2)bin/flume-ng agent --conf conf --conf-file

etc/flume-ng/conf.empty/flume-conf.properties --name a1

-Dflume.root.logger=INFO,console

 

 

 

5 测试数据

(1)在namenode主机上添加一个kafka.txt文件,然后发送到manager主机上的/usr/local/soft/flume/flume_dir目录中,这时可以发现manager主机上发生如下变化:

(2)随机选择dataNode主机,查看从manager主机上的flume传过来的数据:

1)kafka-console-consumer.sh --zookeeper

manager:2181,namenode:2181,datanode:2181 --from-beginning --topic test

(3)Cloudera Manager上的监控状态

 

三 Flume + Kafka测试启动命令

1 Flume启动测试命令

1)cd /opt/cloudera/parcels/CDH-5.7.5-1.cdh5.7.5.p0.3

2) bin/flume-ng agent --conf conf --conf-file etc/flume-ng/conf.empty/flume-conf.properties --name

a1 -Dflume.root.logger=INFO,console

2 Kafaka启动测试命令

kafka-console-consumer.sh --zookeeper manager:2181,namenode:2181,datanode:2181 --from-beginning --topic test

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐