1. Write the Flume agent configuration file

vi eventsattend-flume-kafka.conf

eventattend.sources=eventAttendSource
eventattend.channels=eventAttendChannel
eventattend.sinks=eventAttendSink

# Spooling-directory source: picks up the daily eventAttend CSV files
eventattend.sources.eventAttendSource.type=spooldir
eventattend.sources.eventAttendSource.spoolDir=/opt/flume160/conf/jobkb09/dataSourceFile/eventAttend
eventattend.sources.eventAttendSource.deserializer=LINE
eventattend.sources.eventAttendSource.deserializer.maxLineLength=320000
eventattend.sources.eventAttendSource.includePattern=eventAttend_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
# Interceptor: drop the CSV header line (event,yes,maybe,invited,no)
eventattend.sources.eventAttendSource.interceptors=head_filter
eventattend.sources.eventAttendSource.interceptors.head_filter.type=regex_filter
eventattend.sources.eventAttendSource.interceptors.head_filter.regex=^event,yes*
eventattend.sources.eventAttendSource.interceptors.head_filter.excludeEvents=true

# File channel: buffers events on disk between source and sink
eventattend.channels.eventAttendChannel.type=file
eventattend.channels.eventAttendChannel.checkpointDir=/opt/flume160/conf/jobkb09/checkPointFile/eventAttend
eventattend.channels.eventAttendChannel.dataDirs=/opt/flume160/conf/jobkb09/dataChannelFile/eventAttend

# Kafka sink: publishes events to the eventAttend_raw topic
eventattend.sinks.eventAttendSink.type=org.apache.flume.sink.kafka.KafkaSink
eventattend.sinks.eventAttendSink.batchSize=640
eventattend.sinks.eventAttendSink.brokerList=192.168.116.60:9092
eventattend.sinks.eventAttendSink.topic=eventAttend_raw

# Wire the source and sink to the channel
eventattend.sources.eventAttendSource.channels=eventAttendChannel
eventattend.sinks.eventAttendSink.channel=eventAttendChannel
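
The source only picks up files named like eventAttend_2020-12-18.csv. Illustrative file content (the header is the line the interceptor drops; the data row is made-up sample data):

event,yes,maybe,invited,no
123,111 222,333,,444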

For the execution steps, see this post: https://blog.csdn.net/weixin_43434273/article/details/110880886
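
A minimal launch command (a sketch: it assumes the conf file above is saved under /opt/flume160/conf/jobkb09 and that you run it from /opt/flume160):

./bin/flume-ng agent --name eventattend --conf ./conf --conf-file ./conf/jobkb09/eventsattend-flume-kafka.conf -Dflume.root.logger=INFO,console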

2. Use Kafka Streams to apply business logic to the messages in one topic and write the result to another topic for consumption (splitting each event ID's attendance relations into separate records)

2.1 Consume the records in the format 12112,1211,yes
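
For example, a raw record with the value 123,111 222,333,,444 (the illustrative data row above) is split into one output record per attendee; the empty invited column produces nothing:

123,111,yes
123,222,yes
123,333,maybe
123,444,no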

2.2 Code:

package cn.bright.kafka;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

/**
 * @Author Bright
 * @Date 2020/12/18
 * @Description Splits eventAttend_raw records into (event,user,status) triples and writes them to event_attendees
 */
public class EventAttendStream {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"eventattendapp1");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.116.60:9092");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Raw records look like: event,yes,maybe,invited,no
        // where each attendance column holds space-separated user IDs.
        KStream<String, String> ear = builder.stream("eventAttend_raw");

        String[] statuses = {"yes", "maybe", "invited", "no"};
        KStream<String, String> eventKStream = ear.flatMap((k, v) -> {
            System.out.println(k + " " + v);   // debug output
            String[] split = v.split(",");
            List<KeyValue<String, String>> list = new ArrayList<>();
            // Walk the four attendance columns, skipping records like "121,"
            // where a column is missing or empty.
            for (int i = 0; i < statuses.length; i++) {
                int col = i + 1;
                if (split.length > col && split[col].trim().length() > 0) {
                    // trim first so a leading space does not yield an empty token
                    for (String userId : split[col].trim().split("\\s+")) {
                        // one output record per (event, user, status) triple, e.g. 12112,1211,yes
                        list.add(new KeyValue<>(null, split[0] + "," + userId + "," + statuses[i]));
                    }
                }
            }
            return list;
        });

        // Write the flattened records to the output topic
        eventKStream.to("event_attendees");

        final Topology topo = builder.build();
        final KafkaStreams streams = new KafkaStreams(topo, prop);

        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("stream"){

            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        streams.start();
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.exit(0);
    }
}
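
To verify the result, read the output topic with Kafka's console consumer (the script name and path depend on your Kafka installation; if topic auto-creation is disabled, create eventAttend_raw and event_attendees with kafka-topics.sh first):

kafka-console-consumer.sh --bootstrap-server 192.168.116.60:9092 --topic event_attendees --from-beginning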
