java项目集成springboot使用kafka消费者,启动失败报错 Failed to construct kafka consumer
前言:之前博客里面提到本公司为物联网项目。项目中使用mqtt+kafka进行与设备端的通讯,之前的协议格式为json格式,现在改成字节数组byte[]格式进行通信。集成springboot后,具体的demo网上很多,接下来有时间会出一份kafka的demo。报错信息如下:Failed to start bean 'org.springframework.kafka.config.interna..
·
前言:之前博客里面提到本公司为物联网项目。项目中使用mqtt+kafka进行与设备端的通讯,之前的协议格式为json格式,现在改成字节数组byte[]格式进行通信。集成springboot后,具体的demo网上很多,接下来有时间会出一份kafka的demo。
- 报错信息如下:
Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry';
nested exception is org.apache.kafka.common.KafkaException:Failed to construct kafka consumer
-
原因分析:
之前json格式通信时候,构建kafka消费工厂的时候,其中ConcurrentMessageListenerContainer的key为String类型,而value现在为byte[]类型,所以构建消费者工厂的时候需要指定正确的value类型。 -
代码如下:
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, byte[]>> kafkaListenerContainerByteFactory() {
ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<String, byte[]>();
factory.setConsumerFactory(consumerByteFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}
整体kafka生产者+kafka消费者的demo会在接下来的博客中陆续整理。
更多推荐
已为社区贡献3条内容
所有评论(0)