一、基于前面kafka部署

大数据-玩转数据-Kafka安装

二、FLINK中编写代码

package com.lyh.flink04;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class flink04_fromkafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","hadoop100:9092");
        properties.setProperty("group.id","test");
        env.addSource(new FlinkKafkaConsumer<>("wordsendertest",new SimpleStringSchema(),properties))
                .print();
        env.execute();


    }
}

三、运行测试

运行本段代码,等待kafka产生数据进行消费。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐