Preface

At work I wrapped DataX's sink side to write into Kafka and ran into problems when using the Schema Registry. The approach described in my other blog post has a serious bug in this setup: plain Avro only handles the primitive types, so date-like columns could not be serialized at all, and I ended up refactoring code I had already finished. On top of that, date values deserialized in a non-UTC time zone (e.g. in China) can come back inconsistent; see my other blog post for that fix.
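To make the date issue concrete, here is a minimal sketch of my own (not part of the original project) showing how Kafka Connect's Date logical type behaves: the value is stored as an INT32 counting days since the Unix epoch, and the stock org.apache.kafka.connect.data.Date conversion expects the java.util.Date to be exactly midnight UTC, which is why a date parsed at local midnight in Asia/Shanghai gets rejected or shifted by a day.

import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Schema;

import java.util.Calendar;
import java.util.TimeZone;

public class DateLogicalTypeSketch {

    public static void main(String[] args) {
        // Connect's Date logical type: base type INT32, value = days since the Unix epoch
        Schema dateSchema = Date.SCHEMA;

        // fromLogical expects midnight UTC; non-zero UTC time fields cause a DataException
        Calendar utcMidnight = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        utcMidnight.set(2020, Calendar.MAY, 14, 0, 0, 0);
        utcMidnight.set(Calendar.MILLISECOND, 0);

        int days = Date.fromLogical(dateSchema, utcMidnight.getTime());
        System.out.println("days since epoch = " + days);

        // Round-trips back to midnight UTC; a local-midnight date (e.g. Asia/Shanghai)
        // is 16:00 UTC of the previous day and would not survive this round trip.
        System.out.println(Date.toLogical(dateSchema, days));
    }
}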

Let's go straight to the demo.

  1. Producer
package org.apache.kafka.connect.runtime;

import io.confluent.connect.avro.AvroConverter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.data.*;

import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Properties;

/**
 * @description:
 * @author: wangshuai
 * @create: 2020-05-14 16:35
 **/
public class StructSchema {

    public static void main(String[] args) throws InterruptedException, ParseException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Alternative: use Confluent's KafkaAvroSerializer
//        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Schema Registry address, used to register and fetch schemas
        props.put("schema.registry.url", "http://slaver:8081");
        Producer<String, byte[]> producer = new KafkaProducer<>(props);
        HashMap<String, Object> converterConfig = new HashMap<String, Object>() {{
            put("schemas.enable", true);
            put("value.converter", "io.confluent.connect.avro.AvroConverter");
            put("schema.registry.url", "http://slaver:8081");
        }};
        AvroConverter avroConverter = new AvroConverter();
        avroConverter.configure(converterConfig, false);
        int id = 0;

        Schema schema = SchemaBuilder.struct().name("testId")
                .field("the_byte", Schema.OPTIONAL_INT8_SCHEMA)
                .field("the_short", Schema.OPTIONAL_INT16_SCHEMA)
                .field("the_int", Schema.OPTIONAL_INT32_SCHEMA)
                .field("the_long", Schema.OPTIONAL_INT64_SCHEMA)
                .field("the_float", Schema.OPTIONAL_FLOAT32_SCHEMA)
                .field("the_double", Schema.OPTIONAL_FLOAT64_SCHEMA)
                .field("the_bool", Schema.OPTIONAL_BOOLEAN_SCHEMA)
                .field("the_string", Schema.OPTIONAL_STRING_SCHEMA)
                .field("the_bytes", Schema.OPTIONAL_BYTES_SCHEMA)
                .field("the_decimal", Decimal.builder(3).parameter("connect.decimal.precision", "38").optional().schema()) //配置成38是因为再给hive集成时最大只支持38
                .field("the_date", Date.builder().optional().schema())
                .field("the_time", Time.builder().optional().schema())
                .field("the_timestamp", Timestamp.builder().optional().schema());
        final java.util.Date instant = new java.util.Date(System.currentTimeMillis());
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        String date = sdf.format(instant);
        final Struct struct = new Struct(schema)
                .put("the_byte", (byte) -32)
                .put("the_short", (short) 1234)
                .put("the_int", 31)
                .put("the_long", 12425436L)
                .put("the_float", 2356.3f)
                .put("the_double", -2436546.56457d)
                .put("the_bool", true)
                .put("the_string", "foo")
                .put("the_bytes", new byte[]{-32, 124})
                .put("the_decimal", new BigDecimal("1234.567"))
                .put("the_date", sdf.parse(date))
                .put("the_time", instant)
                .put("the_timestamp", instant);
        while (id < 100) {
            id++;
            byte[] bytes = avroConverter.fromConnectData("schema-test-topic4", schema, struct);

            ProducerRecord<String, byte[]> record = new ProducerRecord<>("schema-test-topic4", bytes);

            producer.send(record);
            Thread.sleep(100);
        }

        producer.close();
    }

}

The core idea of this post is that, after understanding how Kafka Connect works under the hood, I wrapped that machinery myself: the producer effectively impersonates the source side of Kafka Connect, registering the data's schema with the Schema Registry and sending the records.
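For context, the byte[] returned by avroConverter.fromConnectData follows Confluent's Schema Registry wire format: one magic byte (0), a 4-byte big-endian schema id, and then the Avro-encoded payload, which is the same layout KafkaAvroSerializer would produce. Below is a small helper of my own (purely illustrative, not part of the original code) that extracts the schema id from such a record:

import java.nio.ByteBuffer;

public class WireFormatPeek {

    /**
     * Reads the schema id out of a Confluent wire-format record:
     * byte 0 is the magic byte (0), bytes 1-4 are the schema id, the rest is Avro data.
     */
    public static int schemaId(byte[] value) {
        ByteBuffer buffer = ByteBuffer.wrap(value);
        byte magic = buffer.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        return buffer.getInt();
    }
}

Calling WireFormatPeek.schemaId(bytes) inside the producer loop above would show the id the Schema Registry assigned when fromConnectData registered the schema.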

  2. Consumer
package com.registry;

import io.confluent.connect.avro.AvroConverter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;

/**
 * @description:
 * @author: wangshuai
 * @create: 2020-01-03 14:34
 **/
public class ConfluentConsumer {


    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092");
        props.put("group.id", "test1");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Alternative: use Confluent's KafkaAvroDeserializer
//        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
//        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Schema Registry address, used to fetch schemas
//        props.put("schema.registry.url", "http://master:8081");
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);

        HashMap<String, Object> converterConfig = new HashMap<String, Object>() {{
            put("schemas.enable", true);
            put("value.converter", "io.confluent.connect.avro.AvroConverter");
            put("schema.registry.url", "http://master:8081");
        }};
        AvroConverter avroConverter = new AvroConverter();
        avroConverter.configure(converterConfig, false);


        consumer.subscribe(Collections.singletonList("jdbc-sourcetest"));

        try {
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, byte[]> record : records) {
                    SchemaAndValue schemaAndValue = avroConverter.toConnectData("schema-test-topic4", record.value());
                    Struct struct = (Struct) schemaAndValue.value();
//                    if (struct.toString().contains("the_date")) {
//                        Object the_date = struct.get("the_date");
//                        Object the_timestamp = struct.get("the_timestamp");
//                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
//                        System.out.println(sdf.format(the_date) + "==============" + sdf.format(the_timestamp));
//                    }
                    System.out.println(struct);
                }
            }
        } finally {
            consumer.close();
        }
    }
}

(Screenshot of the consumer output omitted here.)
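Once toConnectData has rebuilt the Struct, the logical-type fields should come back as real Java objects: Decimal as BigDecimal, and Date/Time/Timestamp all as java.util.Date. A small sketch of my own (field names taken from the producer above) showing typed access:

import org.apache.kafka.connect.data.Struct;

import java.math.BigDecimal;
import java.util.Date;

public class StructFieldAccess {

    // Illustrative helper: could be called as printTypedFields(struct) inside the consumer loop above.
    static void printTypedFields(Struct struct) {
        BigDecimal decimal = (BigDecimal) struct.get("the_decimal");
        Date date = (Date) struct.get("the_date");
        Date time = (Date) struct.get("the_time");
        Date timestamp = (Date) struct.get("the_timestamp");
        System.out.println(decimal + " | " + date + " | " + time + " | " + timestamp);
    }
}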

  3. POM file
        <!-- This dependency needs to be built from source yourself -->
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-connect-avro-converter</artifactId>
            <version>5.3.2</version>
        </dependency>
        <!-- Replace this internal artifact with the Kafka Connect API dependency -->
        <dependency>
            <groupId>com.yss</groupId>
            <artifactId>api</artifactId>
            <version>1.0</version>
        </dependency>
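If you would rather not compile the converter yourself, the io.confluent artifacts should also be resolvable from Confluent's public Maven repository (assuming your build can reach it), for example:

        <repositories>
            <repository>
                <id>confluent</id>
                <url>https://packages.confluent.io/maven/</url>
            </repository>
        </repositories>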