1. Introduction

      Using Flink SQL to read Kafka data in Avro format ('value.format' = 'avro-confluent'), the job failed at startup with ValidationException: Could not find any factory for identifier 'avro-confluent'.
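For context, a table definition of the kind that triggers this error looks roughly as follows. Topic name, columns, and addresses are placeholders, not from the original post; note that the Schema Registry option name depends on the Flink version (older versions use 'value.avro-confluent.schema-registry.url', newer ones 'value.avro-confluent.url'):

```sql
CREATE TABLE user_events (
    user_id STRING,
    event_time TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_events',                            -- placeholder topic
    'properties.bootstrap.servers' = 'localhost:9092',  -- placeholder broker
    'value.format' = 'avro-confluent',                  -- the format that fails to load
    'value.avro-confluent.schema-registry.url' = 'http://localhost:8081'
);
```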

2. Error

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'avro-confluent' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath.

Available factory identifiers are:

canal-json
csv
debezium-json
json
maxwell-json
raw
	at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
	at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalFormatFactory(FactoryUtil.java:751)
	at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalDecodingFormat(FactoryUtil.java:649)
	at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverDecodingFormat(FactoryUtil.java:633)
	at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.lambda$getValueDecodingFormat$2(KafkaDynamicTableFactory.java:279)
	at java.util.Optional.orElseGet(Optional.java:267)
	at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.getValueDecodingFormat(KafkaDynamicTableFactory.java:277)
	at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:142)
	at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:134)
	... 40 more

3. Solution

   1. Register the avro-confluent plugin, i.e., make sure the format factory is on the job classpath.
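For a Maven-built job, registering the format means adding the corresponding format dependency; a sketch along these lines (the artifact name is the usual one for the SQL avro-confluent format — verify it against your Flink distribution and pin the version to match):

```xml
<!-- Provides the 'avro-confluent' DeserializationFormatFactory -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-avro-confluent-registry</artifactId>
    <version>${flink.version}</version>  <!-- must match your Flink version -->
</dependency>
```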

      

2. Inspecting the packaged jar showed that it contained no META-INF/services entries. Flink discovers table factories via the Java SPI mechanism, and the shade plugin does not merge these service files into the jar by default.

3. Add the following transformers to the maven-shade-plugin configuration so the service files are merged into the shaded jar:


<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
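In context, these transformers sit inside the shade plugin's execution configuration; a minimal sketch (plugin version is illustrative):

```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.2.4</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- Merges META-INF/services files so SPI factory lookups keep working -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
```

The ServicesResourceTransformer is the one that fixes this error: when several dependencies ship a file with the same name under META-INF/services, it concatenates their contents instead of letting one overwrite the others.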
