../bin/schema-registry-start ../etc/schema-registry/schema-registry.properties 
Confluent Schema Registry failed to start, with the error:
ERROR Server died unexpectedly:  (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:51)

Because the log output was so sparse, I googled around a lot, and none of the answers matched my problem. The only close match was a GitHub issue, where the owner's solution was that it was a version-compatibility problem, so switching versions would fix it.

But the Kafka I was running (apache kafka kafka_2.11-1.0.0-cp1.jar, not the one bundled with Confluent) and the version bundled with Confluent (kafka_2.12-0.10.2.1.jar) are not that far apart, and switching versions would mean downloading everything again.

Worse, I couldn't pinpoint where the failure actually happened (the source code logs far too little). With no other option, I downloaded the source from GitHub.

I then added logging to the source myself, rebuilt kafka-schema-registry-4.0.0.jar, and dropped it into

...confluent-4.0.0 (the Confluent install directory)/share/java/schema-registry
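The rebuild-and-swap step looks roughly like this; the repository is Confluent's public schema-registry repo, but the tag name, module path, and install directory below are assumptions for this setup:

```shell
# Rebuild the schema-registry jar (after adding extra logging to the source)
# and swap it into the Confluent installation. Paths here are assumptions.
git clone https://github.com/confluentinc/schema-registry.git
cd schema-registry
git checkout v4.0.0
mvn -DskipTests package
cp core/target/kafka-schema-registry-4.0.0.jar \
   /path/to/confluent-4.0.0/share/java/schema-registry/
```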

Then I debugged again, and finally found the cause:

[2018-04-08 03:44:15,637] INFO Validating schemas topic _schemas (io.confluent.kafka.schemaregistry.storage.KafkaStore:231)
[2018-04-08 03:44:15,649] INFO Validating schemas :_schema,partitions:1 (io.confluent.kafka.schemaregistry.storage.KafkaStore:238)
[2018-04-08 03:44:15,655] ERROR UnsupportedVersionException:org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support DESCRIBE_CONFIGS (io.confluent.kafka.schemaregistry.storage.KafkaStore:272)
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support DESCRIBE_CONFIGS
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:225)
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.verifySchemaTopic(KafkaStore.java:257)
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.createOrVerifySchemaTopic(KafkaStore.java:167)
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.init(KafkaStore.java:113)
        at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:199)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:64)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:1)
        at io.confluent.rest.Application.createServer(Application.java:157)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:43)
Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support DESCRIBE_CONFIGS
[2018-04-08 03:44:15,659] INFO verifySchemaTopic ok! (io.confluent.kafka.schemaregistry.storage.KafkaStore:274)

Sure enough, it was a version-compatibility problem after all: the DescribeConfigs API was only added to the broker in Kafka 0.11.0, so an older broker rejects the AdminClient's describeConfigs request.
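You can confirm this without patching anything: kafka-broker-api-versions.sh (shipped with Kafka 0.10.2 and later) lists the API versions a broker advertises. The bootstrap address is an assumption for your environment:

```shell
# Ask the broker which API versions it supports. Against a pre-0.11 broker,
# DescribeConfigs may be reported as UNSUPPORTED or be missing entirely.
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092 \
  | grep -i DescribeConfigs
```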

Here is io.confluent.kafka.schemaregistry.storage.KafkaStore (#227) with my patch applied:

 private void verifySchemaTopic(AdminClient admin) throws StoreInitializationException,
                                                           InterruptedException,
                                                           ExecutionException,
                                                           TimeoutException {
    log.info("Validating schemas topic {}", topic);

    Set<String> topics = Collections.singleton(topic);
    Map<String, TopicDescription> topicDescription = admin.describeTopics(topics)
        .all().get(initTimeout, TimeUnit.MILLISECONDS);
    TopicDescription description = topicDescription.get(topic);
    final int numPartitions = description.partitions().size();
    log.info("Validating schemas :_schema,partitions:"+ numPartitions );
    if (numPartitions != 1) {
      throw new StoreInitializationException("The schema topic " + topic + " should have only 1 "
                                             + "partition but has " + numPartitions);
    }

    if (description.partitions().get(0).replicas().size() < desiredReplicationFactor) {
      log.warn("The replication factor of the schema topic "
               + topic
               + " is less than the desired one of "
               + desiredReplicationFactor
               + ". If this is a production environment, it's crucial to add more brokers and "
               + "increase the replication factor of the topic.");
    }
    
    ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
    try { // Catch the exception here: this block only checks the retention policy.
          // My config uses the default retention policy, so skipping the check is safe.
      Map<ConfigResource, Config> configs =
          admin.describeConfigs(Collections.singleton(topicResource)).all()
              .get(initTimeout, TimeUnit.MILLISECONDS);
      Config topicConfigs = configs.get(topicResource);
      String retentionPolicy = topicConfigs.get(TopicConfig.CLEANUP_POLICY_CONFIG).value();
      if (retentionPolicy == null || !TopicConfig.CLEANUP_POLICY_COMPACT.equals(retentionPolicy)) {
        log.error("The retention policy of the schema topic " + topic + " is incorrect. "
                  + "You must configure the topic to 'compact' cleanup policy to avoid Kafka "
                  + "deleting your schemas after a week. "
                  + "Refer to Kafka documentation for more details on cleanup policies");

        throw new StoreInitializationException("The retention policy of the schema topic " + topic
                                               + " is incorrect. Expected cleanup.policy to be "
                                               + "'compact' but it is " + retentionPolicy);
      }
    } catch (Exception e) { // UnsupportedVersionException: The broker does not support DESCRIBE_CONFIGS
      log.error("UnsupportedVersionException:" + e.getLocalizedMessage(), e);
    }
    log.info("verifySchemaTopic ok!");
  }
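The patch is an instance of a general pattern: demote an optional capability check from a fatal error to a logged warning. A minimal, self-contained sketch of that pattern (the classes and method names below are simplified stand-ins, not the real Kafka AdminClient API):

```java
// Sketch of the "best-effort capability check" pattern used in the patch:
// a validation step the broker may not support is demoted from a fatal
// error to a warning. These classes are stand-ins, not the Kafka API.
public class BestEffortCheck {

    // Stand-in for org.apache.kafka.common.errors.UnsupportedVersionException
    static class UnsupportedVersionException extends RuntimeException {
        UnsupportedVersionException(String msg) { super(msg); }
    }

    // Stand-in for admin.describeConfigs(...) against an old broker.
    static String describeCleanupPolicy(boolean brokerSupportsDescribeConfigs) {
        if (!brokerSupportsDescribeConfigs) {
            throw new UnsupportedVersionException(
                "The broker does not support DESCRIBE_CONFIGS");
        }
        return "compact";
    }

    // Mirrors the patched verifySchemaTopic: the optional check is wrapped
    // in try/catch so startup can proceed on older brokers.
    static String verify(boolean brokerSupportsDescribeConfigs) {
        try {
            String policy = describeCleanupPolicy(brokerSupportsDescribeConfigs);
            if (!"compact".equals(policy)) {
                throw new IllegalStateException("cleanup.policy must be 'compact'");
            }
            return "verified: cleanup.policy=" + policy;
        } catch (UnsupportedVersionException e) {
            // Old broker: skip the retention-policy check instead of dying.
            return "skipped: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(verify(true));   // verified: cleanup.policy=compact
        System.out.println(verify(false));  // skipped: The broker does not support DESCRIBE_CONFIGS
    }
}
```

One caveat about the actual patch: catching the broad `Exception` also swallows the `StoreInitializationException` thrown for a wrong cleanup policy; catching `UnsupportedVersionException` (or its `ExecutionException` wrapper) specifically would be safer.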



