1 、Confluent Schema Registry 安装教程
参考连接:https://www.jianshu.com/p/cd6f413d35b0
原文链接:https://blog.csdn.net/weixin_41609807/article/details/103820327
1.S[chema Registry的各个发现行版本的下载链接](https://www.confluent.io/previous-versions/)
2.上传到linux系统进行解压安装。
3.本教程使用外部以安装好的Kafka集群不使用内部默认的。
4.修改confluent-5.3.1/etc/schema-registry/schema-registry.properties配置

#注册服务器的监听地址及其端口号
listeners=http://0.0.0.0:8081

#有关连接外部集群的地址有两种方式:1 通过zk连接 2 通过kafka的控制器 。 本教程采用zk连接
kafkastore.connection.url=henghe-042:2181

#The name of the topic to store schemas in
kafkastore.topic=_schemas

#If true, API requests that fail will include extra debugging information, including stack traces
debug=false

注册服务器的启动…/…/bin/schema-registry-start -daemon …/…/etc/schema-registry/schema-registry.properties
注册服务器的API接口

#Register a new version of a schema under the subject “Kafka-key”
$ curl -X POST -H “Content-Type: application/vnd.schemaregistry.v1+json”
–data ‘{“schema”: “{“type”: “string”}”}’
http://localhost:8081/subjects/Kafka-key/versions
{“id”:1}

#Register a new version of a schema under the subject “Kafka-value”
$ curl -X POST -H “Content-Type: application/vnd.schemaregistry.v1+json”
–data ‘{“schema”: “{“type”: “string”}”}’
http://localhost:8081/subjects/Kafka-value/versions
{“id”:1}

#List all subjects
$ curl -X GET http://localhost:8081/subjects
[“Kafka-value”,“Kafka-key”]

#List all schema versions registered under the subject “Kafka-value”
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions
[1]

#Fetch a schema by globally unique id 1
$ curl -X GET http://localhost:8081/schemas/ids/1
{“schema”:"“string”"}

#Fetch version 1 of the schema registered under subject “Kafka-value”
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/1
{“subject”:“Kafka-value”,“version”:1,“id”:1,“schema”:"“string”"}

#Fetch the most recently registered schema under subject “Kafka-value”
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/latest
{“subject”:“Kafka-value”,“version”:1,“id”:1,“schema”:"“string”"}

#Delete version 3 of the schema registered under subject “Kafka-value”
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/3
3

#Delete all versions of the schema registered under subject “Kafka-value”
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value
[1, 2, 3, 4, 5]

#Check whether a schema has been registered under subject “Kafka-key”
$ curl -X POST -H “Content-Type: application/vnd.schemaregistry.v1+json”
–data ‘{“schema”: “{“type”: “string”}”}’
http://localhost:8081/subjects/Kafka-key
{“subject”:“Kafka-key”,“version”:1,“id”:1,“schema”:"“string”"}

#Test compatibility of a schema with the latest schema under subject “Kafka-value”
$ curl -X POST -H “Content-Type: application/vnd.schemaregistry.v1+json”
–data ‘{“schema”: “{“type”: “string”}”}’
http://localhost:8081/compatibility/subjects/Kafka-value/versions/latest
{“is_compatible”:true}

#Get top level config
$ curl -X GET http://localhost:8081/config
{“compatibilityLevel”:“BACKWARD”}

#Update compatibility requirements globally
$ curl -X PUT -H “Content-Type: application/vnd.schemaregistry.v1+json”
–data ‘{“compatibility”: “NONE”}’
http://localhost:8081/config
{“compatibility”:“NONE”}

#Update compatibility requirements under the subject “Kafka-value”
$ curl -X PUT -H “Content-Type: application/vnd.schemaregistry.v1+json”
–data ‘{“compatibility”: “BACKWARD”}’
http://localhost:8081/config/Kafka-value
{“compatibility”:“BACKWARD”}

2、Confluent Schema Registry 使用教程

创建java工程的pom依赖
<?xml version="1.0" encoding="UTF-8"?>


4.0.0

<groupId>com.yss</groupId>
<artifactId>Kafka</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.3.0</version>
    </dependency>
    <!--此依赖是通过本地依赖库导入的,有关如何把jar放入本地依赖库自行搜索-->
    <!--本人的jar文件是在编译源码时自动到依赖库中的所以直接引用-->
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>5.3.2</version>
    </dependency>
</dependencies>
生产者示例:

package com.registry;

import java.util.Properties;
import java.util.Random;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**

  • @description:

  • @author: wangshuai

  • @create: 2020-01-03 14:17
    **/
    public class ConfluentProducer {
    public static final String USER_SCHEMA = "{“type”: “record”, “name”: “User”, " +
    "“fields”: [{“name”: “id”, “type”: “int”}, " +
    “{“name”: “name”, “type”: “string”}, {“name”: “age”, “type”: “int”}]}”;

    public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put(“bootstrap.servers”, “192.168.101.42:9092”);
    props.put(“key.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);
    // 使用Confluent实现的KafkaAvroSerializer
    props.put(“value.serializer”, “io.confluent.kafka.serializers.KafkaAvroSerializer”);
    // 添加schema服务的地址,用于获取schema
    props.put(“schema.registry.url”, “http://192.168.101.42:8081”);
    Producer<String, GenericRecord> producer = new KafkaProducer<>(props);
    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(USER_SCHEMA);
    Random rand = new Random();
    int id = 0;
    while (id < 100) {
    id++;
    String name = “name” + id;
    int age = rand.nextInt(40) + 1;
    GenericRecord user = new GenericData.Record(schema);
    user.put(“id”, id);
    user.put(“name”, name);
    user.put(“age”, age);
    ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(“test-topic”, user);

         producer.send(record);
         Thread.sleep(1000);
     }
    
     producer.close();
    

    }
    }

    消费者示例:

package com.registry;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**

  • @description:

  • @author: wangshuai

  • @create: 2020-01-03 14:34
    **/
    public class ConfluentConsumer {

    public static void main(String[] args) throws Exception {

     Properties props = new Properties();
     props.put("bootstrap.servers", "192.168.101.42:9092");
     props.put("group.id", "test1");
     props.put("enable.auto.commit", "false");
     // 配置禁止自动提交,每次从头消费供测试使用
     props.put("auto.offset.reset", "earliest");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     // 使用Confluent实现的KafkaAvroDeserializer
     props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
     // 添加schema服务的地址,用于获取schema
     props.put("schema.registry.url", "http://192.168.101.42:8081");
     KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
    
     consumer.subscribe(Collections.singletonList("test-topic"));
    
     try {
         while (true) {
             ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(1000));
             for (ConsumerRecord<String, GenericRecord> record : records) {
                 GenericRecord user = record.value();
                 System.out.println("value = [user.id = " + user.get("id") + ", " + "user.name = "
                         + user.get("name") + ", " + "user.age = " + user.get("age") + "], "
                         + "partition = " + record.partition() + ", " + "offset = " + record.offset());
             }
         }
     } finally {
         consumer.close();
     }
    

    }
    }

    消费者消费结果示例。
    结果示例
    在这里插入图片描述

这里是引用

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐