flink 代码:

注册kafka source 版本:0.10,格式:Json

  

相关依赖,版本为flink1.8.0:

这里要注意flink-json,如果没有此依赖,以上代码块将无法使用Json格式

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
         <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

在本地运行没有出现任何问题,打jar到服务器之运行报:

找不到  org.apache.flink.table.factories.TableSourceFactory

(此处不贴报错截图,有可能报错提示信息会有些出处,但万卷不离其宗,还有的可能是  org.apache.flink.table.factories.TableSinkFactory )

原因:

进入org.apache.flink.table.factories.TableFactoryService源码中你可以找到这个类似匹配器的代码块,异常由这里发出:

查看TableFactoryService的实现,是有KafkaTable的:

而查看此目录下TableFactory配置文件中并没有Kafka Source及Sink相关Factory:

 

解决方法一: 

找到源码目录 flink-table\flink-table-planner\src\main\resources\META-INF\services\org.apache.flink.table.factories.TableFactory进行更改,添加以下内容并重新编译:

org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory
org.apache.flink.formats.json.JsonRowFormatFactory

开始并没有添加 JsonRowFormatFactory,也报出了找不到的异常信息,所以这里直接加上

解决方法二(推荐): 

  copy后添加以上内容

添加后打包到服务器正常运行。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐