ERROR Server died unexpectedly: (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:51)
../bin/schema-registry-start ../etc/schema-registry/schema-registry.properties Confluent schema-registry启动失败,报错:ERROR Server died unexpectedly:(io.confluent.kafka.schemaregistry.rest.SchemaRegistry...
·
../bin/schema-registry-start ../etc/schema-registry/schema-registry.properties
Confluent schema-registry启动失败,报错:
ERROR Server died unexpectedly: (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:51)
由于日志打印不全,google了好多,看到的答案也不是我的问题,唯一一个github上的issue比较像,owner给出的solution是版本兼容性问题,所以换个版本就好了。
但是我用的卡夫卡(apache kafka kafka_2.11-1.0.0-cp1.jar,非confluent自带的)版本和confluent(kafka_2.12-0.10.2.1.jar)的版本相差不是很大。 而且换版本还得重新下载。
最主要的是找不出到底是哪里的问题(源码中打出的日志实在是太少了),无奈,只能下载源码。github
然后自己对源码加日志然后打包kafka-schema-registry-4.0.0.jar,放到
...confluent-4.0.0(为confluent安装目录))/share/java/schema-registry
然后调试,最后发现是:
[2018-04-08 03:44:15,637] INFO Validating schemas topic _schemas (io.confluent.kafka.schemaregistry.storage.KafkaStore:231)
[2018-04-08 03:44:15,649] INFO Validating schemas :_schema,partitions:1 (io.confluent.kafka.schemaregistry.storage.KafkaStore:238)
[2018-04-08 03:44:15,655] ERROR UnsupportedVersionException:org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support DESCRIBE_CONFIGS (io.confluent.kafka.schemaregistry.storage.KafkaStore:272)
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support DESCRIBE_CONFIGS
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:225)
at io.confluent.kafka.schemaregistry.storage.KafkaStore.verifySchemaTopic(KafkaStore.java:257)
at io.confluent.kafka.schemaregistry.storage.KafkaStore.createOrVerifySchemaTopic(KafkaStore.java:167)
at io.confluent.kafka.schemaregistry.storage.KafkaStore.init(KafkaStore.java:113)
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:199)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:64)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:1)
at io.confluent.rest.Application.createServer(Application.java:157)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:43)
Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support DESCRIBE_CONFIGS
[2018-04-08 03:44:15,659] INFO verifySchemaTopic ok! (io.confluent.kafka.schemaregistry.storage.KafkaStore:274)
果然还是版本兼容性问题。
io.confluent.kafka.schemaregistry.storage.KafkaStore(#227)
private void verifySchemaTopic(AdminClient admin) throws StoreInitializationException,
InterruptedException,
ExecutionException,
TimeoutException {
log.info("Validating schemas topic {}", topic);
Set<String> topics = Collections.singleton(topic);
Map<String, TopicDescription> topicDescription = admin.describeTopics(topics)
.all().get(initTimeout, TimeUnit.MILLISECONDS);
TopicDescription description = topicDescription.get(topic);
final int numPartitions = description.partitions().size();
log.info("Validating schemas :_schema,partitions:"+ numPartitions );
if (numPartitions != 1) {
throw new StoreInitializationException("The schema topic " + topic + " should have only 1 "
+ "partition but has " + numPartitions);
}
if (description.partitions().get(0).replicas().size() < desiredReplicationFactor) {
log.warn("The replication factor of the schema topic "
+ topic
+ " is less than the desired one of "
+ desiredReplicationFactor
+ ". If this is a production environment, it's crucial to add more brokers and "
+ "increase the replication factor of the topic.");
}
ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
try { //catch主异常,此处只是用来判断retentionPolicy,配置文件用的默认retentionPolicy,所以即使跳过也没问题。
Map<ConfigResource, Config> configs =
admin.describeConfigs(Collections.singleton(topicResource)).all()
.get(initTimeout, TimeUnit.MILLISECONDS);
Config topicConfigs = configs.get(topicResource);
String retentionPolicy = topicConfigs.get(TopicConfig.CLEANUP_POLICY_CONFIG).value();
if (retentionPolicy == null || !TopicConfig.CLEANUP_POLICY_COMPACT.equals(retentionPolicy)) {
log.error("The retention policy of the schema topic " + topic + " is incorrect. "
+ "You must configure the topic to 'compact' cleanup policy to avoid Kafka "
+ "deleting your schemas after a week. "
+ "Refer to Kafka documentation for more details on cleanup policies");
throw new StoreInitializationException("The retention policy of the schema topic " + topic
+ " is incorrect. Expected cleanup.policy to be "
+ "'compact' but it is " + retentionPolicy);
}
} catch (Exception e) { // UnsupportedVersionException: The broker does not support DESCRIBE_CONFIGS
log.error("UnsupportedVersionException:"+e.getLocalizedMessage(),e);
}
log.info("verifySchemaTopic ok!" );
}
更多推荐
已为社区贡献1条内容
所有评论(0)