Learning notes on installing Kafka's Confluent Schema Registry
1. Confluent Schema Registry Installation Tutorial
Reference link: https://www.jianshu.com/p/cd6f413d35b0
Original article: https://blog.csdn.net/weixin_41609807/article/details/103820327
1. [Download links for each released version of Schema Registry](https://www.confluent.io/previous-versions/)
2. Upload the package to a Linux system, then extract and install it.
3. This tutorial uses an external, already-installed Kafka cluster rather than the one bundled with Confluent.
4. Edit the confluent-5.3.1/etc/schema-registry/schema-registry.properties configuration:
#Listen address and port of the registry server
listeners=http://0.0.0.0:8081
#There are two ways to point at the external cluster: 1) via ZooKeeper, 2) directly via the Kafka brokers. This tutorial connects via ZooKeeper.
kafkastore.connection.url=henghe-042:2181
#The name of the topic to store schemas in
kafkastore.topic=_schemas
#If true, API requests that fail will include extra debugging information, including stack traces
debug=false
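The comment above mentions a second way to reach the external cluster: going through the Kafka brokers instead of ZooKeeper. A minimal sketch of that alternative, using the broker address that appears later in this post (192.168.101.42:9092); kafkastore.connection.url can then be left unset:
#Alternative to kafkastore.connection.url: connect through the Kafka brokers directly
kafkastore.bootstrap.servers=PLAINTEXT://192.168.101.42:9092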
Start the registry server:
../../bin/schema-registry-start -daemon ../../etc/schema-registry/schema-registry.properties
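Once the daemon is up, a quick sanity check is to confirm the REST endpoint answers on the port configured in listeners above; on a fresh install, before any schemas are registered, the subject list should be empty:
$ curl -X GET http://localhost:8081/subjects
[]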
REST API of the registry server:
#Register a new version of a schema under the subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/subjects/Kafka-key/versions
{"id":1}
#Register a new version of a schema under the subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/subjects/Kafka-value/versions
{"id":1}
#List all subjects
$ curl -X GET http://localhost:8081/subjects
["Kafka-value","Kafka-key"]
#List all schema versions registered under the subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions
[1]
#Fetch a schema by globally unique id 1
$ curl -X GET http://localhost:8081/schemas/ids/1
{"schema":"\"string\""}
#Fetch version 1 of the schema registered under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/1
{"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}
#Fetch the most recently registered schema under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/latest
{"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}
#Delete version 3 of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/3
3
#Delete all versions of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value
[1, 2, 3, 4, 5]
#Check whether a schema has been registered under subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/subjects/Kafka-key
{"subject":"Kafka-key","version":1,"id":1,"schema":"\"string\""}
#Test compatibility of a schema with the latest schema under subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/compatibility/subjects/Kafka-value/versions/latest
{"is_compatible":true}
#Get top level config
$ curl -X GET http://localhost:8081/config
{"compatibilityLevel":"BACKWARD"}
#Update compatibility requirements globally
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "NONE"}' \
    http://localhost:8081/config
{"compatibility":"NONE"}
#Update compatibility requirements under the subject "Kafka-value"
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "BACKWARD"}' \
    http://localhost:8081/config/Kafka-value
{"compatibility":"BACKWARD"}
2. Confluent Schema Registry Usage Tutorial
Create a Java project with the following pom dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.yss</groupId>
    <artifactId>Kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.3.0</version>
        </dependency>
        <!-- This dependency is resolved from the local repository; search elsewhere for how to install a jar into it. -->
        <!-- In my case the jar was installed into the local repository while building the source, so it is referenced directly. -->
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>5.3.2</version>
        </dependency>
    </dependencies>
</project>
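If you would rather not install the Confluent jar by hand, the kafka-avro-serializer artifact is also published in Confluent's public Maven repository. A sketch of the extra block to add inside the <project> element:
<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>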
Producer example:
package com.registry;
import java.util.Properties;
import java.util.Random;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
 * @description:
 * @author: wangshuai
 * @create: 2020-01-03 14:17
 **/
public class ConfluentProducer {
    public static final String USER_SCHEMA = "{\"type\": \"record\", \"name\": \"User\", " +
            "\"fields\": [{\"name\": \"id\", \"type\": \"int\"}, " +
            "{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}";

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.101.42:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Use Confluent's KafkaAvroSerializer for the record values
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // Address of the Schema Registry service, used to register and fetch schemas
        props.put("schema.registry.url", "http://192.168.101.42:8081");

        Producer<String, GenericRecord> producer = new KafkaProducer<>(props);

        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);

        Random rand = new Random();
        int id = 0;
        while (id < 100) {
            id++;
            String name = "name" + id;
            int age = rand.nextInt(40) + 1;
            GenericRecord user = new GenericData.Record(schema);
            user.put("id", id);
            user.put("name", name);
            user.put("age", age);
            ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("test-topic", user);
            producer.send(record);
            Thread.sleep(1000);
        }
        producer.close();
    }
}
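Before writing the Java consumer, the produced Avro records can be checked from the command line with the kafka-avro-console-consumer tool shipped with Confluent (a sketch; the topic, broker, and registry addresses are the ones used by the producer above):
../../bin/kafka-avro-console-consumer --topic test-topic \
    --bootstrap-server 192.168.101.42:9092 \
    --from-beginning \
    --property schema.registry.url=http://192.168.101.42:8081
Each record should be printed as JSON decoded through the registered schema.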
Consumer example:

package com.registry;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
 * @description:
 * @author: wangshuai
 * @create: 2020-01-03 14:34
 **/
public class ConfluentConsumer {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.101.42:9092");
        props.put("group.id", "test1");
        props.put("enable.auto.commit", "false");
        // Disable auto commit and always consume from the earliest offset, for testing
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Use Confluent's KafkaAvroDeserializer for the record values
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // Address of the Schema Registry service, used to fetch schemas
        props.put("schema.registry.url", "http://192.168.101.42:8081");

        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));

        try {
            while (true) {
                ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    GenericRecord user = record.value();
                    System.out.println("value = [user.id = " + user.get("id") + ", " +
                            "user.name = " + user.get("name") + ", " +
                            "user.age = " + user.get("age") + "], " +
                            "partition = " + record.partition() + ", " +
                            "offset = " + record.offset());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

Consumer output example (shown as a screenshot in the original post).
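Because the producer uses KafkaAvroSerializer with the default subject naming strategy, the User schema is registered automatically under the subject test-topic-value. A quick check with the REST API from section 1, using the registry address configured above:
$ curl -X GET http://192.168.101.42:8081/subjects/test-topic-value/versions/latest
The response should contain the User record schema declared in USER_SCHEMA.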