I. Flume to Kafka

1) Configure Flume (flume-kafka.conf)

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/module/data/flume.log
a1.sources.r1.shell = /bin/bash -c
# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2) Start a Kafka consumer (for example, in IDEA)
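If you would rather not run a consumer from IDEA, the console consumer works just as well for verification; a sketch assuming a recent Kafka version (older versions use --zookeeper hadoop102:2181 instead):

$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first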

3) From the Flume root directory, start Flume

$ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf
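
Optionally, add -Dflume.root.logger=INFO,console so the agent's log output is printed to the terminal while you test:

$ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf -Dflume.root.logger=INFO,console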

4) Append data to /opt/module/data/flume.log and watch the Kafka consumer pick it up

$ echo hello >> /opt/module/data/flume.log

II. Why integrate Kafka with Flume

1. The problem

The collected logs need to be consumed by several downstream users.
With Flume alone, every additional consumer means adding another channel, and new business lines cannot be added dynamically.
With Kafka in between, business lines can be added on the fly: consumers can be added dynamically while the replica count stays unchanged (see the sketch below).
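
For example, a new business line only needs to start its own consumer group on the existing topic; nothing changes on the Flume side (the group name below is illustrative, and the --group flag is available on recent Kafka versions):

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --group new-business-line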

III. Kafka with Flume (classifying data by topic)

1. The interceptor code

The interceptor sets the destination topic in the event headers with headers.put("topic", "first"); the Kafka sink reads this header and sends the event to that topic instead of the one configured on the sink.

package org.example.interceptor;


import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Custom Flume interceptor that decides which Kafka topic each event is routed to
 * by setting the "topic" event header.
 */
public class TypeInterceptor implements Interceptor {


    // Collection holding the events after the interceptor has processed them
    private List<Event> addHeaderEvents;

    @Override
    public void initialize() {
        // Initialize the collection for processed events
        addHeaderEvents = new ArrayList<>();
    }


    /**
     * Single-event handler. An event consists of headers and a body.
     * https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
     *
     * @param event the incoming event
     * @return the event with a "topic" header added
     */
    @Override
    public Event intercept(Event event) {
        // 1. Get the event headers
        Map<String, String> headers = event.getHeaders();

        // 2. Get the event body as a string
        String body = new String(event.getBody());

        // Add a topic header based on whether the body contains "hello"
        if (body.contains("hello")) {
            headers.put("topic", "first");

        } else {
            headers.put("topic", "second");
        }

        // Return the event with the topic header set
        return event;
    }


    /**
     * Batch handler: delegates each event to the single-event method.
     *
     * @param list the incoming batch of events
     * @return the processed events
     */
    @Override
    public List<Event> intercept(List<Event> list) {
        // Clear events left over from the previous batch
        addHeaderEvents.clear();
        for (Event event : list) {
            // Process each event with the single-event method
            addHeaderEvents.add(intercept(event));
        }

        // Return the processed events
        return addHeaderEvents;
    }

    @Override
    public void close() {


    }


    /**
     * Builder that Flume uses to construct the interceptor; it is referenced in the
     * agent configuration as org.example.interceptor.TypeInterceptor$Builder.
     *
     * Example of wiring interceptors in an agent configuration:
     * a1.sources = r1
     * a1.sinks = k1
     * a1.channels = c1
     * a1.sources.r1.interceptors = i1 i2
     * a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
     * a1.sources.r1.interceptors.i1.preserveExisting = false
     * a1.sources.r1.interceptors.i1.hostHeader = hostname
     * a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
     * a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
     * a1.sinks.k1.channel = c1
     */
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }

}

2. Deploy it to the server

Package the interceptor as a jar and place it in Flume's lib directory.
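
A minimal sketch of building and deploying the jar, assuming a Maven project and a Flume installation at /opt/module/flume (the jar name and paths are illustrative):

$ mvn clean package
$ cp target/type-interceptor-1.0-SNAPSHOT.jar /opt/module/flume/lib/   # jar name depends on your pom.xml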

3. Under Flume's job directory, add a new job file for the classification agent (type_kafka.conf, matching the startup command below)

New configuration:

#Name
a1.sources = r1
a1.channels = c1
a1.sinks = k1


# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.example.interceptor.TypeInterceptor$Builder


# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
# transaction capacity
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
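# Note: when the interceptor sets a "topic" event header, the Kafka sink sends the event
# to that topic instead of this default, so "hello" events go to first and the rest to second.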
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


4. Start two consumers

bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first
bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic second
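
The --zookeeper form applies to older Kafka releases; on Kafka 2.x and later the console consumer connects to the brokers directly:

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic second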

5. Start Flume

bin/flume-ng agent -c conf/ -f job/type_kafka.conf -n a1

6. Send data through the netcat port

nc localhost 44444
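
For example, lines containing "hello" should show up on the first consumer and everything else on the second (the netcat source replies OK to each line by default):

hello kafka
OK
goodbye flume
OK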

Check the output in the two consumer consoles.
