大数据之 Flume 对 接 Kafka 完整使用 (第六章)
大数据之 Flume 对 接 Kafka 完整使用一、Flume 对 接 Kafka1)配置 flume(flume-kafka.conf)2) 启动 kafkaIDEA 消费者3) 进入 flume 根目录下,启动 flume4) 向 /opt/module/data/flume.log 里追加数据,查看 kafka 消费者消费情况一、Flume 对 接 Kafka1)配置 flume(flum
·
大数据之 Flume 对 接 Kafka 完整使用
一、Flume 对 接 Kafka
1)配置 flume(flume-kafka.conf)
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/module/data/flume.log
a1.sources.r1.shell = /bin/bash -c
# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers =
hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2) 启动 kafkaIDEA 消费者
3) 进入 flume 根目录下,启动 flume
$ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf
4) 向 /opt/module/data/flume.log 里追加数据,查看 kafka 消费者消费情况
$ echo hello >> /opt/module/data/flume.log
二、为什么要kafka对接Flume
1、问题
采集日志给多个人使用
如果使用flume、那就的再多加一个channel、不能动态加业务线
增加业务线动态增加(类似消费者可以动态增加、副本数不变)
三、kafka对接Flume (数据分类)
1、编码
监听头部信息 headers.put(“topic”, “first”);
package org.example.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* 使用flume 判断 卡法卡 走那个主题 自定义拦截器
* flum 自定义拦截器
* 定义类型拦截器
*/
public class TypeInterceptor implements Interceptor {
//声明一个集合、用于存放拦截器处理后的事件
private List<Event> addHeaderEvents;
@Override
public void initialize() {
//初始化集合用于存放拦截器处理后的事件
addHeaderEvents = new ArrayList<>();
}
/**
* 单个事件处理方法
* event 包含body 和header
* https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
*
* @param event
* @return
*/
@Override
public Event intercept(Event event) {
//1、获取事件中的头部信息 header & body
Map<String, String> headers = event.getHeaders();
//获取事件中的body信息
String body = new String(event.getBody());
//根据body中是否有hello 来决定添加怎样的头部信息
if (body.contains("hello")) {
headers.put("topic", "first");
} else {
headers.put("topic", "second");
}
//返回数据
return event;
}
/**
* 批量事件处理方法
*
* @param list
* @return
*/
@Override
public List<Event> intercept(List<Event> list) {
//清空集合
addHeaderEvents.clear();
for (Event event : list) {
//交给单个Event 处理
addHeaderEvents.add(intercept(event));
}
//返回数据
return addHeaderEvents;
}
@Override
public void close() {
}
/**
* a1.sources = r1
* a1.sinks = k1
* a1.channels = c1
* a1.sources.r1.interceptors = i1 i2
* a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
* a1.sources.r1.interceptors.i1.preserveExisting = false
* a1.sources.r1.interceptors.i1.hostHeader = hostname
* a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
* a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
* a1.sinks.k1.channel = c1
* 帮助构建拦截器对象 $Builder
*/
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new TypeInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
2、丢到服务器
位置是flume的lib下
3、在flume 的job里面新增分类文件如下
新增配置属性
#Name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.example.interceptor.TypeInterceptor$Builder
#Chabbel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
#事务容量
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4、启动两个消费者
bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first
bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic second
5、启动flume
bin/flume-ng agent -c conf/ -f job/type_kafka.conf -n a1
6、开启发送数据端口
nc localhost 44444
查看效果
更多推荐
已为社区贡献2条内容
所有评论(0)