Flink Source

Data sources in Flink fall roughly into the following categories:

  1. Collection
  2. File
  3. Kafka
  4. UDF (custom source)

The first three cover most use cases; if you need anything else, you can define your own source, i.e. a UDF source.

Collection

public static void main(String[] args) throws Exception {
    // Create the execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Set the global parallelism to 1
    env.setParallelism(1);
    // 1. Build a source from a Java collection with fromCollection
    DataStreamSource<Integer> inputStream = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5));
    // 2. Build a source from literal elements with fromElements
    DataStreamSource<Integer> inputStream1 = env.fromElements(1, 2, 3, 4, 5);
    // Print both streams to stdout
    inputStream.print();
    inputStream1.print();
    // Submit the job
    env.execute();
}
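
Collection sources are bounded and mainly useful for local testing. They also work with composite types; here is a minimal sketch using Flink's Tuple2 (the class name, sensor ids, and readings are made-up sample values):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FromElementsTupleExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // fromElements infers the stream's type from the elements themselves
        DataStreamSource<Tuple2<String, Integer>> sensorStream = env.fromElements(
                Tuple2.of("sensor_1", 35),
                Tuple2.of("sensor_2", 15));
        sensorStream.print();
        env.execute();
    }
}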

File

public static void main(String[] args) throws Exception {
    // Create the execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Set the global parallelism to 1
    env.setParallelism(1);
    // Read the file line by line as a stream of strings
    String filePath = "hello.txt";
    DataStream<String> inputDataStream = env.readTextFile(filePath);
    // Print the stream with the prefix "out"
    inputDataStream.print("out");
    // Submit the job under the name "jobname"
    env.execute("jobname");
}
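
readTextFile scans the file once and the job finishes when the file is exhausted. If the job should keep watching the path for changes, the environment also exposes readFile with a FileProcessingMode. A minimal sketch, assuming the same hello.txt path and a made-up 10-second scan interval (note that in this mode a modified file is reprocessed in full):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ContinuousFileExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        String filePath = "hello.txt";
        // Re-scan the path every 10 seconds; changed files are re-emitted in full
        env.readFile(
                new TextInputFormat(new Path(filePath)),
                filePath,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                10_000L)
           .print("out");
        env.execute("continuous-file-source");
    }
}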

Kafka

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Kafka connection properties
    Properties prop = new Properties();
    prop.put("bootstrap.servers", "hadoop102:9092"); // broker to connect to
    prop.put("zookeeper.connect", "hadoop102:2181"); // only relevant to very old consumers; the modern client ignores it
    prop.put("group.id", "first");
    // A consumer deserializes records, so it needs Deserializer classes
    prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    prop.put("auto.offset.reset", "latest");
    // addSource attaches a general-purpose source.
    // Note: this uses the Flink Kafka connector; if it is missing, add the dependency to pom.xml:
    // <dependency>
    //     <groupId>org.apache.flink</groupId>
    //     <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    //     <version>${flink.version}</version>
    // </dependency>
    DataStreamSource<String> inputStream =
            env.addSource(new FlinkKafkaConsumer<String>("first", new SimpleStringSchema(), prop)); // records arrive as plain strings

    inputStream.print();

    env.execute();
}
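
By default the consumer starts from its group's committed offsets, falling back to auto.offset.reset when none exist. The connector also lets you override the start position on the consumer object itself; a minimal sketch reusing the broker and topic from above (the class and job names are made up):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaStartPositionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties prop = new Properties();
        prop.put("bootstrap.servers", "hadoop102:9092");
        prop.put("group.id", "first");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("first", new SimpleStringSchema(), prop);
        // Override the start position, ignoring committed group offsets
        consumer.setStartFromEarliest();        // read the topic from the beginning
        // consumer.setStartFromLatest();       // or: only records that arrive from now on
        // consumer.setStartFromGroupOffsets(); // the default behavior

        env.addSource(consumer).print();
        env.execute("kafka-start-position");
    }
}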

UDF (custom source)

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // A custom source implements SourceFunction, or extends its rich variant RichSourceFunction
    DataStreamSource<String> inputStream = env.addSource(new MySourceFunction());
    inputStream.print();
    env.execute();
}

public static class MySourceFunction extends RichSourceFunction<String> {
    // Keeps the emit loop alive; cancel() flips it to stop the source cleanly
    private volatile boolean running;

    @Override
    public void open(Configuration parameters) throws Exception {
        running = true;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            ctx.collect("hello world");
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
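
A SourceFunction source always runs with parallelism 1. If the source itself should run in parallel, implement ParallelSourceFunction or extend RichParallelSourceFunction instead; a minimal sketch where each parallel subtask emits its own counter (the class name, parallelism of 2, and 1-second sleep are made-up choices):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class ParallelSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Each of the 2 subtasks runs its own copy of the source
        env.addSource(new MyParallelSource()).setParallelism(2).print();
        env.execute();
    }

    public static class MyParallelSource extends RichParallelSourceFunction<String> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            int subtask = getRuntimeContext().getIndexOfThisSubtask();
            long counter = 0;
            while (running) {
                ctx.collect("subtask " + subtask + " -> " + counter++);
                Thread.sleep(1000); // throttle so the output stays readable
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}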