kafka自定义对象实现
1.异常 16:54:52.512 [main] INFO kafka.utils.VerifiableProperties - Property serializer.class is overridden to com.bpbp.kafka.encoder.OrderEncoder java.lang.ExceptionInInitializerError at com.bpbp.kaf…
·
1.异常
16:54:52.512 [main] INFO kafka.utils.VerifiableProperties - Property serializer.class is overridden to com.bpbp.kafka.encoder.OrderEncoder
java.lang.ExceptionInInitializerError
at com.bpbp.kafka.entity.OrderDataTest.test(OrderDataTest.java:32)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.NoSuchMethodException: com.bpbp.kafka.encoder.OrderEncoder.<init>(kafka.utils.VerifiableProperties)
at java.lang.Class.getConstructor0(Class.java:2892)
at java.lang.Class.getConstructor(Class.java:1723)
at kafka.utils.Utils$.createObject(Utils.scala:436)
at kafka.producer.Producer.<init>(Producer.scala:62)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
at com.bpbp.kafka.Producer.init(Producer.java:44)
at com.bpbp.kafka.Producer.<clinit>(Producer.java:30)
... 28 more
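原因是 OrderEncoder 缺少以 kafka.utils.VerifiableProperties 为参数的构造方法：从上面的堆栈可以看到，Kafka 在创建 Producer 时通过反射（Class.getConstructor）查找该构造方法来实例化 serializer.class 指定的类，找不到就抛出 NoSuchMethodException。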
com.bpbp.kafka.encoder.OrderEncoder
public class OrderEncoder implements kafka.serializer.Encoder<OrderData> {
/**
* 增加构造方法
* @param verifiableProperties
*/
public OrderEncoder(VerifiableProperties verifiableProperties){
}
/**
* 对象转字节
* @param orderData
* @return
*/
public byte[] toBytes(OrderData orderData) {
return BeanUtil.object2Bytes(orderData);
}
附上其他源码:
@Slf4j
public class Producer {
//发送订单数据
private static kafka.javaapi.producer.Producer<String, OrderData> orderProducer = null;
static {
init();
}
/**
* 初始化Producer配置
*/
private static void init() {
Properties orderProp = new Properties();
orderProp.put("metadata.broker.list", PropUtil.getKafkaBroker());
orderProp.put("serializer.class", "com.bpbp.kafka.encoder.OrderEncoder");
//key 的序列化类
orderProp.put("key.serializer.class", "kafka.serializer.StringEncoder");
orderProp.put("request.required.acks", "-1");
//实例化订单发送者
orderProducer = new kafka.javaapi.producer.Producer<String, OrderData>(new ProducerConfig(
orderProp));
}
/**
* 发送订单数据
* @param key
* @param data
*/
public static void sendOrderData(String key, OrderData data) {
log.info("Send order data, orderid:", data.getBpbpOrderNo());
try {
orderProducer.send(new KeyedMessage<String, OrderData>(PropUtil.getOrderTopic(), key,
data));
} catch (Exception e) {
log.error("Send order data [{}] exception:",data.getBpbpOrderNo(), e.getMessage());
}
}
数据对象类定义
/**
 * Order data object. This is the value type sent by the Producer and the type the
 * Consumer casts deserialized messages to. It implements {@link java.io.Serializable}.
 */
@Data
@ToString // marked as required by the original author; NOTE(review): @Data may already generate toString — confirm
public class OrderData implements java.io.Serializable {
// Order number; read via getBpbpOrderNo() by the producer and the consumer.
private String bpbpOrderNo;
/**
 * Channel code.
 */
private String channelCode;
/**
 * Timestamp, generated on every call.
 */
private long timeStamp;
}
消费者
/**
 * Static helper that owns a single Kafka consumer connector and reads
 * {@link OrderData} messages from the order topic.
 */
public class Consumer {

    /** Connector created once by {@link #init()} when the class is initialized. */
    private static ConsumerConnector orderConsumer;

    static {
        init();
    }

    /**
     * Builds the consumer configuration and creates the connector.
     */
    private static void init() {
        Properties prop = new Properties();
        prop.put("zookeeper.connect", PropUtil.getZkUrl());
        prop.put("group.id", "order");
        prop.put("zookeeper.session.timeout.ms", "400");
        prop.put("zookeeper.sync.time.ms", "200");
        prop.put("auto.commit.interval.ms", "1000");
        orderConsumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
    }

    /**
     * Opens one stream on the order topic and prints the order number of every
     * message received. The loop runs for as long as the stream iterator reports
     * more messages.
     */
    public static void receive() {
        // Read the topic once so the subscription and the lookup use the same key.
        String topic = PropUtil.getOrderTopic();

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // One stream for the topic; Integer.valueOf replaces the deprecated constructor.
        topicCountMap.put(topic, Integer.valueOf(1));

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                orderConsumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);

        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            byte[] bytes = it.next().message();
            // Counterpart of BeanUtil.object2Bytes used by OrderEncoder on the producer side.
            OrderData orderData = (OrderData) BeanUtil.bytes2Object(bytes);
            System.out.println(orderData.getBpbpOrderNo());
        }
    }
}
更多推荐
已为社区贡献1条内容
所有评论(0)