在这里插入图片描述


1.场景

在实时流处理工程里,既有spark代码,又有kafka代码,二者之间的依赖,依赖的依赖,把idea搞蒙了。
依赖也正确写了,就是找不到类,清除缓存反复加载,也解决不了。
Caused by: java.lang.ClassNotFoundException。。。。
在这里插入图片描述


2.代码如下

import com.tk.dwd.etltools.DateJudge;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;


import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import java.util.ArrayList;
import java.util.List;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.text.ParseException;
import java.util.Properties;


public class ODS2DWD_ETL_TOPIC_5T_TCDS_M_RT_EVENT_DATA {
    public static void main(String[] args) throws IOException, ParseException {
        String str = "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" +
                "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" +
                "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" +
                "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" +
                "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" +
                "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" +
                "!!!!!============!!!!!!!!!!!!=============!!!!!!!!!!!=======!!!!!!!!!!!!\n" +
                "!!!!!!!==========!!!!!!!!!!!!===!!!==!!===!!!!!!!!!!!!!===!!!!!!!!!!!!!!\n" +
                "!!!!!!!==!!!!!!===!!!!!!!!!!!==!!!!==!!!==!!!!!!!!!!!!!!==!!!!!!!!!!!!!!\n" +
                "!!!!!!!==!!!!!!!==!!!!!!!!!!!==!!!!==!!!!=!!!!!!!!!!!!!!==!!!!!!!!!!!!!!\n" +
                "!!!!!!!==!!!!!=!!!!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!\n" +
                "!!!!!!!==!!!!==!!!!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!\n" +
                "!!!!!!!========!!!!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!\n" +
                "!!!!!!!==!!!!==!!!!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!\n" +
                "!!!!!!!==!!!!!=!!!!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!\n" +
                "!!!!!!!==!!!!!=!!!!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!!!!!!==!!!!!!!=!!!!!!\n" +
                "!!!!!!!==!!!!!!!==!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!!!!!!==!!!!!!!=!!!!!!\n" +
                "!!!!!!!==!!!!!!!==!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!!!!!!==!!!!!!==!!!!!!\n" +
                "!!!!!!!===!!!!====!!!!!!!!!!!!!!!!===!!!!!!!!!!!!!!!!!!====!!!====!!!!!!\n" +
                "!!!!!============!!!!!!!!!!!!!!!=======!!!!!!!!!!!!!!=============!!!!!!\n" +
                "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" +
                "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" +
                "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" +
                "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!";
        System.out.println(str);
        System.out.println("================================================etl beginning ================================================");

        // 定义输入的topic
        String from = "EVENT_DATA";
        // 定义输出的topic
        String to = "DWD_EVENT_DATA";

        // 设置参数
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop104:9092");
        System.out.println("condfig:" + props);
        // StreamsConfig config = new StreamsConfig(props);
        System.out.println("0001");
        // 构建拓扑

        TopologyBuilder builder = new TopologyBuilder();

            builder.addSource("SOURCE", from)
                    .addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() {

                        @Override
                        public Processor<byte[], byte[]> get() {
                            // 具体分析处理
                            return new LogProcessor();
                        }
                    }, "SOURCE")
                    .addSink("SINK", to, "PROCESS");
        //
            // 创建kafka stream
            KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start();
        }
    }


class LogProcessor implements Processor<byte[], byte[]> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(byte[] key, byte[] value) {
        String input = new String(value);
        System.out.println(input);
        JSONArray jarr = JSONArray.parseArray(input);
        JSONObject json = jarr.getJSONObject(0);
        System.out.println(json);



        // 如果包含“>>>”则只保留该标记后面的内容
        if (input.contains(">>>")) {
            input = input.split(">>>")[1].trim();
            // 输出到下一个topic
            context.forward("logProcessor".getBytes(), input.getBytes());
        } else {
            context.forward("logProcessor".getBytes(), input.getBytes());
        }
    }

    @Override
    public void punctuate(long timestamp) {

    }

    @Override
    public void close() {

    }
}

3.解决方案

将kafka streams的代码,移动到单独的模块里,与spark模块独立开来,这样就可以正确的加载各种类了。

在这里插入图片描述

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐