Flink Table: Appending Kafka Stream Data to a CSV File

The Flink Table API can write streaming data directly to the file system. Two examples follow.

Code Example 1
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.QueryConfig;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

import java.util.Properties;

public class SqlSinkFileSystemStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        Schema schema = new Schema()
                .field("userId", Types.STRING)
                .field("name", Types.STRING)
                .field("age", Types.STRING)
                .field("sex", Types.STRING)
                .field("createTime", Types.BIG_DEC)
                .field("updateTime", Types.BIG_DEC);

        TableSchema tableSchema = new TableSchema.Builder()
                .field("userId", Types.STRING)
                .field("name", Types.STRING)
                .field("age", Types.STRING)
                .field("sex", Types.STRING)
                .field("createTime", Types.BIG_DEC)
                .field("updateTime", Types.BIG_DEC)
                .build();

        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092");
        p.setProperty("group.id", "test");
        Kafka kafka = new Kafka().properties(p).topic("user").version("0.10");

        tableEnv.connect(kafka)
                .withSchema(schema)
                .withFormat(new Json().deriveSchema())
                .inAppendMode()
                .registerTableSource("Users");

        Table table = tableEnv.sqlQuery("select * from Users");

        // Print the rows locally for debugging
        tableEnv.toAppendStream(table, TypeInformation.of(Row.class)).print("row:");

        FileSystem fileSystem = new FileSystem().path("data/user.csv");
        tableEnv.connect(fileSystem)
                .withSchema(schema)
                // new Csv() does not work well here; it does not handle the schema parameters properly
                .withFormat(new OldCsv().schema(tableSchema).fieldDelimiter(","))
                .inAppendMode()
                .registerTableSink("Users2");

        // Insert the query result into the file system sink
        QueryConfig conf = new StreamQueryConfig();
        tableEnv.insertInto(table, "Users2", conf);

        env.execute("SqlSinkFileSystemStream");
    }
}
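Since the source above uses new Json().deriveSchema(), each Kafka record on the user topic is expected to be a JSON object whose field names match the registered schema (string fields plus numeric createTime/updateTime). The following is a minimal sketch of such a message; the class name, helper method, and all field values are hypothetical, chosen only for illustration:

```java
public class UserMessageSample {
    // Builds one JSON record in the shape the "user" topic is assumed to carry.
    // Field names must match the Schema registered in the example above.
    static String buildMessage(String userId, String name, String age,
                               String sex, long createTime, long updateTime) {
        return String.format(
                "{\"userId\":\"%s\",\"name\":\"%s\",\"age\":\"%s\",\"sex\":\"%s\","
                        + "\"createTime\":%d,\"updateTime\":%d}",
                userId, name, age, sex, createTime, updateTime);
    }

    public static void main(String[] args) {
        // Hypothetical values; real messages come from the upstream producer.
        System.out.println(buildMessage("u001", "alice", "30", "F",
                1561023040000L, 1561023040000L));
    }
}
```

A message in any other shape (wrong field names, or a non-JSON payload) will fail deserialization in the Json format.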
Code Example 2

Flink ships with its own CsvTableSink class, which can be used directly:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.types.Row;

public class SqlSinkCsvFileStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        Schema schema = new Schema()
                .field("userId", Types.STRING)
                .field("name", Types.STRING)
                .field("age", Types.STRING)
                .field("sex", Types.STRING)
                .field("createTime", Types.BIG_DEC)
                .field("updateTime", Types.BIG_DEC);

        tableEnv
                .connect(
                        new Kafka().version("0.10").topic("user").property("bootstrap.servers", "localhost:9092")
                )
                .withSchema(schema)
                .withFormat(new Json().deriveSchema())
                .inAppendMode()
                .registerTableSource("Users");

        Table table = tableEnv.sqlQuery("select userId,name,age,sex,createTime from Users");
        tableEnv.toAppendStream(table, TypeInformation.of(Row.class)).print();

        CsvTableSink sink = new CsvTableSink("data/users.csv", ",", 1, FileSystem.WriteMode.NO_OVERWRITE);

        tableEnv.registerTableSink("Result",
                new String[]{"userId", "name", "age", "sex", "createTime"},
                new TypeInformation[]{Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.BIG_DEC},
                sink);

        tableEnv.insertInto(table, "Result", new StreamQueryConfig());

        env.execute("SqlSinkCsvFileStream");
    }
}
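The sink above is constructed with delimiter "," and numFiles 1, so it writes a single file where each row of the query becomes one delimiter-joined line. A minimal sketch of that line shape (class name, helper, and row values are hypothetical, for illustration only):

```java
public class CsvRowSample {
    // Joins row fields with the sink's delimiter, mirroring the
    // one-line-per-row text that CsvTableSink writes to data/users.csv.
    static String toCsvLine(String delimiter, String... fields) {
        return String.join(delimiter, fields);
    }

    public static void main(String[] args) {
        // Hypothetical values for userId, name, age, sex, createTime.
        System.out.println(toCsvLine(",", "u001", "alice", "30", "F", "1561023040000"));
    }
}
```

Note that WriteMode.NO_OVERWRITE makes the job fail if data/users.csv already exists; use WriteMode.OVERWRITE to replace an existing file.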
