Kafka streams报错:Caused by: java.lang.ClassNotFoundException
1.场景在实时流处理工程里,既有spark代码,又有kafka代码,二者之间的依赖,依赖的依赖,把idea搞蒙了。依赖也正确写了,就是找不到类,清除缓存反复加载,也解决不了。Caused by: java.lang.ClassNotFoundException。。。。2.代码如下import com.tk.dwd.etltools.DateJudge;import org.apache.kafka
·
1.场景
在实时流处理工程里,既有spark代码,又有kafka代码,二者之间的依赖,依赖的依赖,把idea搞蒙了。
依赖也正确写了,就是找不到类,清除缓存反复加载,也解决不了。
Caused by: java.lang.ClassNotFoundException。。。。
2.代码如下
import com.tk.dwd.etltools.DateJudge;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.List;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.text.ParseException;
import java.util.Properties;
public class ODS2DWD_ETL_TOPIC_5T_TCDS_M_RT_EVENT_DATA {
public static void main(String[] args) throws IOException, ParseException {
String str = "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" +
"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" +
"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" +
"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" +
"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" +
"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" +
"!!!!!============!!!!!!!!!!!!=============!!!!!!!!!!!=======!!!!!!!!!!!!\n" +
"!!!!!!!==========!!!!!!!!!!!!===!!!==!!===!!!!!!!!!!!!!===!!!!!!!!!!!!!!\n" +
"!!!!!!!==!!!!!!===!!!!!!!!!!!==!!!!==!!!==!!!!!!!!!!!!!!==!!!!!!!!!!!!!!\n" +
"!!!!!!!==!!!!!!!==!!!!!!!!!!!==!!!!==!!!!=!!!!!!!!!!!!!!==!!!!!!!!!!!!!!\n" +
"!!!!!!!==!!!!!=!!!!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!\n" +
"!!!!!!!==!!!!==!!!!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!\n" +
"!!!!!!!========!!!!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!\n" +
"!!!!!!!==!!!!==!!!!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!\n" +
"!!!!!!!==!!!!!=!!!!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!\n" +
"!!!!!!!==!!!!!=!!!!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!!!!!!==!!!!!!!=!!!!!!\n" +
"!!!!!!!==!!!!!!!==!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!!!!!!==!!!!!!!=!!!!!!\n" +
"!!!!!!!==!!!!!!!==!!!!!!!!!!!!!!!!!==!!!!!!!!!!!!!!!!!!!==!!!!!!==!!!!!!\n" +
"!!!!!!!===!!!!====!!!!!!!!!!!!!!!!===!!!!!!!!!!!!!!!!!!====!!!====!!!!!!\n" +
"!!!!!============!!!!!!!!!!!!!!!=======!!!!!!!!!!!!!!=============!!!!!!\n" +
"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" +
"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" +
"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" +
"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!";
System.out.println(str);
System.out.println("================================================etl beginning ================================================");
// 定义输入的topic
String from = "EVENT_DATA";
// 定义输出的topic
String to = "DWD_EVENT_DATA";
// 设置参数
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop104:9092");
System.out.println("condfig:" + props);
// StreamsConfig config = new StreamsConfig(props);
System.out.println("0001");
// 构建拓扑
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", from)
.addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() {
@Override
public Processor<byte[], byte[]> get() {
// 具体分析处理
return new LogProcessor();
}
}, "SOURCE")
.addSink("SINK", to, "PROCESS");
//
// 创建kafka stream
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
}
}
class LogProcessor implements Processor<byte[], byte[]> {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(byte[] key, byte[] value) {
String input = new String(value);
System.out.println(input);
JSONArray jarr = JSONArray.parseArray(input);
JSONObject json = jarr.getJSONObject(0);
System.out.println(json);
// 如果包含“>>>”则只保留该标记后面的内容
if (input.contains(">>>")) {
input = input.split(">>>")[1].trim();
// 输出到下一个topic
context.forward("logProcessor".getBytes(), input.getBytes());
} else {
context.forward("logProcessor".getBytes(), input.getBytes());
}
}
@Override
public void punctuate(long timestamp) {
}
@Override
public void close() {
}
}
3.解决方案
将kafka streams的代码,移动到单独的模块里,与spark模块独立开来,这样就可以正确的加载各种类了。
更多推荐
已为社区贡献3条内容
所有评论(0)