flume主要是做日志数据(离线或实时)地采集

在这里插入图片描述

flume采集完毕数据之后,进行的离线处理和实时处理两条业务线,这里我们做的下面的这一条线

整合前提条件:

  • 保证zookeeper开启
  • 保证kafka服务开启
1.创建topic
kafka-topics.sh --create \
--topic flume-kafka \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka \
--partitions 3 \
--replication-factor 3
2.创建flume-agent配置文件

这里一般在flume的conf下新建

vi /home/hadoop/apps/apache-flume-1.8.0-bin/conf/flume-kafka-sink.conf

配置信息

  • 别名
  • source的配置信息
  • kafka的sink的配置
  • channel的配置
  • 绑定
## Flume agent's name
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop01
a1.sources.r1.port = 44444

# kafka's sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sinks.k1.kafka.topic = flume-kafka
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3.开启消费者

这里我在hadoop02上启动

kafka-console-consumer.sh --topic flume-kafka \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--from-beginning
4.后台启动flume-agent

在hadoop01上

cd /home/hadoop/apps/apache-flume-1.8.0-bin

启动

nohup bin/flume-ng agent -n a1 -c conf -f conf/flume-kafka-sink.conf >/dev/null 2>&1 &

在这里插入图片描述
在这里插入图片描述

5.测试

向44444端口发送消息

telnet hadoop01 44444

结果如图

在这里插入图片描述

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐