The trouble caused by Avro-serializing the Date data type in Kafka Connect with the officially recommended encoding
Preface
At work I was wrapping the DataX component as the sink side that writes into Kafka, and ran into this problem when using the Schema Registry. The approach used in another post of mine has a very serious bug in this setup: Avro only supports primitive types, so date-like data types could not be serialized at all, which eventually forced me to refactor code I had already finished. On top of that, the Date data type comes back inconsistent after deserialization when used in China (UTC+8); see my other post for the fix.
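Before the demo, a minimal sketch of my own (not part of the original code) showing how Kafka Connect's Date logical type converts values; the conversion is anchored to UTC, which is the root of the mismatch described above. fromLogical() turns a java.util.Date into an int32 counting days since the epoch, and toLogical() turns that int back into midnight UTC of the same day; a "yyyy-MM-dd" string parsed with the default China timezone (UTC+8) is not a UTC midnight and therefore does not line up.
import java.text.SimpleDateFormat;
import java.util.TimeZone;
import org.apache.kafka.connect.data.Date;
public class ConnectDateSketch {
    public static void main(String[] args) throws Exception {
        // Parse the calendar date at UTC midnight, the form the Date logical type expects
        SimpleDateFormat utcFormat = new SimpleDateFormat("yyyy-MM-dd");
        utcFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
        java.util.Date utcMidnight = utcFormat.parse("2020-05-14");
        // Date.SCHEMA carries the logical name "org.apache.kafka.connect.data.Date";
        // fromLogical() encodes the value as days since the epoch (int32)
        int days = Date.fromLogical(Date.SCHEMA, utcMidnight);
        // toLogical() restores midnight UTC of that day
        java.util.Date restored = Date.toLogical(Date.SCHEMA, days);
        System.out.println(days + " -> " + utcFormat.format(restored)); // prints the same calendar day
        // By contrast, new SimpleDateFormat("yyyy-MM-dd").parse("2020-05-14") in a
        // UTC+8 JVM yields 2020-05-13T16:00:00Z, which is not a UTC-midnight value
        // and therefore does not round-trip cleanly through the converter.
    }
}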
Straight to the demo:
- Producer
package org.apache.kafka.connect.runtime;
import io.confluent.connect.avro.AvroConverter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.data.*;
import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Properties;
/**
* @description:
* @author: wangshuai
* @create: 2020-05-14 16:35
**/
public class StructSchema {
public static void main(String[] args) throws InterruptedException, ParseException {
Properties props = new Properties();
props.put("bootstrap.servers", "master:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Confluent's KafkaAvroSerializer (commented out below; this demo serializes with AvroConverter and sends raw bytes instead)
// props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
// Schema Registry address, used to register and fetch schemas
props.put("schema.registry.url", "http://slaver:8081");
Producer<String, byte[]> producer = new KafkaProducer<>(props);
HashMap<String, Object> converterConfig = new HashMap<String, Object>() {{
put("schemas.enable", true);
put("value.converter", "io.confluent.connect.avro.AvroConverter");
put("schema.registry.url", "http://slaver:8081");
}};
AvroConverter avroConverter = new AvroConverter();
avroConverter.configure(converterConfig, false);
int id = 0;
Schema schema = SchemaBuilder.struct().name("testId")
.field("the_byte", Schema.OPTIONAL_INT8_SCHEMA)
.field("the_short", Schema.OPTIONAL_INT16_SCHEMA)
.field("the_int", Schema.OPTIONAL_INT32_SCHEMA)
.field("the_long", Schema.OPTIONAL_INT64_SCHEMA)
.field("the_float", Schema.OPTIONAL_FLOAT32_SCHEMA)
.field("the_double", Schema.OPTIONAL_FLOAT64_SCHEMA)
.field("the_bool", Schema.OPTIONAL_BOOLEAN_SCHEMA)
.field("the_string", Schema.OPTIONAL_STRING_SCHEMA)
.field("the_bytes", Schema.OPTIONAL_BYTES_SCHEMA)
.field("the_decimal", Decimal.builder(3).parameter("connect.decimal.precision", "38").optional().schema()) //配置成38是因为再给hive集成时最大只支持38
.field("the_date", Date.builder().optional().schema())
.field("the_time", Time.builder().optional().schema())
.field("the_timestamp", Timestamp.builder().optional().schema());
final java.util.Date instant = new java.util.Date(System.currentTimeMillis());
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
String date = sdf.format(instant);
final Struct struct = new Struct(schema)
.put("the_byte", (byte) -32)
.put("the_short", (short) 1234)
.put("the_int", 31)
.put("the_long", 12425436L)
.put("the_float", 2356.3f)
.put("the_double", -2436546.56457d)
.put("the_bool", true)
.put("the_string", "foo")
.put("the_bytes", new byte[]{-32, 124})
.put("the_decimal", new BigDecimal("1234.567"))
.put("the_date", sdf.parse(date))
.put("the_time", instant)
.put("the_timestamp", instant);
while (id < 100) {
id++;
byte[] bytes = avroConverter.fromConnectData("schema-test-topic4", schema, struct);
ProducerRecord<String, byte[]> record = new ProducerRecord<>("schema-test-topic4", bytes);
producer.send(record);
Thread.sleep(100);
}
producer.close();
}
}
The idea behind the code above is a wrapper built after working out how Kafka Connect operates under the hood: the producer effectively masquerades as the Source side of Kafka Connect, registering the record's schema with the Schema Registry and sending the data itself (a sketch of the equivalent SourceTask follows).
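For comparison, here is a hypothetical sketch of mine (the class name DemoSourceTask and the partition/offset keys are made up, and the schema is shortened) of what a real Kafka Connect SourceTask would hand to the framework for this kind of record; the Connect worker then runs the configured AvroConverter over each SourceRecord, registering the schema under the <topic>-value subject and producing the bytes, which is exactly what the demo producer does by hand.
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
public class DemoSourceTask extends SourceTask {
    // Same kind of Struct schema as in the producer demo above (shortened here)
    private final Schema schema = SchemaBuilder.struct().name("testId")
            .field("the_int", Schema.OPTIONAL_INT32_SCHEMA)
            .field("the_string", Schema.OPTIONAL_STRING_SCHEMA)
            .build();
    @Override
    public String version() {
        return "1.0";
    }
    @Override
    public void start(Map<String, String> props) {
        // connector configuration would be read here
    }
    @Override
    public List<SourceRecord> poll() {
        Struct value = new Struct(schema)
                .put("the_int", 31)
                .put("the_string", "foo");
        // The source partition/offset maps are how Connect tracks progress;
        // the worker converts the Struct with the configured value converter
        // (AvroConverter) before producing it to "schema-test-topic4".
        return Collections.singletonList(new SourceRecord(
                Collections.singletonMap("source", "demo"),
                Collections.singletonMap("position", 0L),
                "schema-test-topic4",
                schema,
                value));
    }
    @Override
    public void stop() {
    }
}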
- Consumer
package com.registry;
import io.confluent.connect.avro.AvroConverter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
/**
* @description:
* @author: wangshuai
* @create: 2020-01-03 14:34
**/
public class ConfluentConsumer {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "master:9092");
props.put("group.id", "test1");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Confluent's KafkaAvroDeserializer (commented out below; this demo consumes raw bytes and decodes them with AvroConverter instead)
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
// props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
// props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Schema Registry address, used to fetch schemas
// props.put("schema.registry.url", "http://master:8081");
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
HashMap<String, Object> converterConfig = new HashMap<String, Object>() {{
put("schemas.enable", true);
put("value.converter", "io.confluent.connect.avro.AvroConverter");
put("schema.registry.url", "http://master:8081");
}};
AvroConverter avroConverter = new AvroConverter();
avroConverter.configure(converterConfig, false);
consumer.subscribe(Collections.singletonList("schema-test-topic4"));
try {
while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, byte[]> record : records) {
SchemaAndValue schemaAndValue = avroConverter.toConnectData("schema-test-topic4", record.value());
// System.out.println(schemaAndValue.value());
Struct struct = (Struct) schemaAndValue.value();
// if (struct.toString().contains("the_date")){
// System.out.println(struct);
// Object the_date = struct.get("the_date");
// Object the_timestamp = struct.get("the_timestamp");
// SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
// System.out.println(sdf.format(the_date)+"=============="+sdf.format(the_timestamp));
// }
System.out.println(struct);
// System.out.println(record.value().getSchema());
// break;
// System.out.println("value = [user.id = " + user.get("id") + ", " + "user.name = "
// + user.get("name") + ", " + "user.age = " + user.get("age") + "], "
// + "partition = " + record.partition() + ", " + "offset = " + record.offset());
}
}
} finally {
consumer.close();
}
}
}
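As a follow-up to the commented-out date check in the loop above: one way to keep the decoded the_date on the expected calendar day is to format it with a UTC-anchored SimpleDateFormat, since the Date logical value comes back from toConnectData as midnight UTC. This is a hypothetical helper of my own, not necessarily the fix described in my other post:
import java.text.SimpleDateFormat;
import java.util.TimeZone;
import org.apache.kafka.connect.data.Struct;
public class DateFormatHelper {
    // Formats the decoded logical "the_date" field in UTC so the calendar day
    // is not shifted by the local UTC+8 offset (the value arrives as midnight UTC).
    public static String formatTheDate(Struct struct) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
        java.util.Date theDate = (java.util.Date) struct.get("the_date");
        return sdf.format(theDate);
    }
}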
- POM file
<dependency>
    <!-- this dependency has to be built from source yourself -->
    <groupId>io.confluent</groupId>
    <artifactId>kafka-connect-avro-converter</artifactId>
    <version>5.3.2</version>
</dependency>
<dependency>
    <!-- this in-house API needs to be replaced with the Kafka Connect API -->
    <groupId>com.yss</groupId>
    <artifactId>api</artifactId>
    <version>1.0</version>
</dependency>
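For reference, one possible way to fill in the rest of the build, under my assumption that Kafka 2.3.x is used to match Confluent 5.3.2: the in-house com.yss:api artifact would be swapped for Kafka's connect-api, and the Confluent artifacts can also be pulled from the Confluent Maven repository instead of being built by hand.
<!-- Kafka Connect API (replaces the in-house com.yss:api artifact); 2.3.1 is an assumed version -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>2.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.1</version>
</dependency>
<!-- Confluent artifacts are published here, if you prefer not to build them yourself -->
<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>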