1. You can simply create a new main class in the previous project and it will work.

There is still one pitfall here, so take note.

[Important tip]:

When your own class implements SourceFunction, the generic type parameter must be specified! If you omit it, you will get the following error.

public static class SimpleStringGenerator implements SourceFunction<String> {

 

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'Custom Source' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
    at org.apache.flink.streaming.api.transformations.StreamTransformation.getOutputType(StreamTransformation.java:420)
    at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1184)
    at Streaming.WriteToKafka.main(WriteToKafka.java:29)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The types of the interface org.apache.flink.streaming.api.functions.source.SourceFunction could not be inferred. Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point
    at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:1239)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterTypeFromGenericType(TypeExtractor.java:1263)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:1226)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:789)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:769)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1461)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1416)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1398)
    at Streaming.WriteToKafka.main(WriteToKafka.java:27)

Process finished with exit code 1
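
As the error message itself suggests, the other way to fix this is to give a type hint with returns(...) on the result of addSource, instead of declaring the generic. A minimal sketch of that variant, assuming the same env and SimpleStringGenerator as in the code below (DataStream comes from org.apache.flink.streaming.api.datastream.DataStream):

// Type hint: tells Flink the source emits Strings, so type extraction
// no longer has to recover the erased generic parameter.
DataStream<String> stream = env
        .addSource(new SimpleStringGenerator())
        .returns(String.class);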

The main code is as follows:

package Streaming;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;

import java.util.Properties;

/**
 * Created with IntelliJ IDEA.
 * User: @ziyu  freedomziyua@gmail.com
 * Date: 2018-09-10
 * Time: 11:31
 * Description: Streaming.WriteToKafka
 */
public class WriteToKafka {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.2.41:9092");

        // The generic on SimpleStringGenerator is what lets Flink infer String here.
        DataStreamSource<String> stream = env.addSource(new SimpleStringGenerator());

        stream.addSink(new FlinkKafkaProducer09<String>("flink-demo", new SimpleStringSchema(), properties));
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * A simple source class that generates data.
     */
    public static class SimpleStringGenerator implements SourceFunction<String> {
        private static final long serialVersionUID = 119007289730474249L;
        boolean running = true;
        long i = 0;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                ctx.collect("Flink----" + (i++));
                Thread.sleep(100);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
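
The error message also names a second option: let the source implement ResultTypeQueryable, so Flink can ask the function for its produced type instead of inferring it from the generic. A hedged sketch of the same generator with that interface added (imports: org.apache.flink.api.java.typeutils.ResultTypeQueryable, org.apache.flink.api.common.typeinfo.TypeInformation, org.apache.flink.api.common.typeinfo.BasicTypeInfo):

// Variant of SimpleStringGenerator that declares its output type explicitly.
public static class SimpleStringGenerator
        implements SourceFunction<String>, ResultTypeQueryable<String> {
    private volatile boolean running = true;
    private long i = 0;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            ctx.collect("Flink----" + (i++));
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    // Flink calls this instead of relying on generic-type inference.
    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}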

 
