flink中间结果写HDFS

项目中遇到一个应用场景需要将flink计算的中间结果写入到hdfs中


提示:正常的kafka数据还是用flume同步至hdfs吧,用flink写hdfs会有很多问题


一、数据分桶写入

流数据写入到hdfs中是将数据写入到分桶(bucket)中。默认使用基于系统时间(yyyy-MM-dd–HH)的分桶策略。在分桶中,又根据滚动策略,将输出拆分为 part 文件,具体如下:
在这里插入图片描述
上面的文件的问题在于:文件一直处于pending状态,句柄无法关闭
记得配置checkpoint

二、代码编写

代码如下(示例):

1 写入本地文件中

       String patha = "F:\\testflinktext";
        final StreamingFileSink<String> sinka = StreamingFileSink
                .forRowFormat(new Path(patha), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(60))//多长时间运行一个文件
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(30))//多长时间没有写入就生成一个文件
                                .withMaxPartSize(1024 * 1024 * 1024)
                                .build())
                .build();
        flowIncreaStream.addSink(sinka);

2 写入至hdfs中

 //输出至HDFS
        String pathb = "hdfs://fh-node1:8020/hivedata/device_flowdata";
        final StreamingFileSink<String> sinkb = StreamingFileSink
                .forRowFormat(new Path(pathb), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(30))//多长时间运行一个文件
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(10))//多长时间没有写入就生成一个文件
                                .withMaxPartSize(1024 * 1024 * 1024)
                                .build())
                .build();
        flowIncreaStream.addSink(sinkb);

三 创建hive表,并增加分区

1 分区表创建

CREATE EXTERNAL TABLE `ssa_flow_device_increment`(
`fromMac` string COMMENT 'mac',
`region` string COMMENT '区域',
`update_time` string COMMENT '更新时间')
COMMENT '抖音订单行表落地表'
PARTITIONED BY (`statis_time` string COMMENT '分区字段')
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://fh-node1:8020/hivedata/device_flowdata/'

2 定时增加分区(dolphin中调度)

sql:

alter table ssa_flow_device_increment
add partition(statis_time='${statis_time}') 
location '/hivedata/device_flowdata/${statis_time}';

调度任务:

hive -hivevar statis_time='${statis_time}' -S -f hdfs://nncluster/data/dolphinscheduler/dolphinscheduler/resources/sql/ssa_flow_device_increment_partition.sql 

参数配置:
在这里插入图片描述

四 flink写hdfs目前存在的问题

截止目前,Flink 的 Streaming File Sink 仍存在不少问题,如:

它只支持 Hadoop 2.7 以上的版本,因为需要用到高版本文件系统提供的 truncate 方法来实现故障恢复。 不支持写入到
Hive。
写入HDFS时,会产生大量的小文件。
当程序突然停止时,文件仍处于inprogress状态。
默认桶下的文件名是 part-{parallel-task}-{count}。当程序重启时,从上次编号值加1继续开始。前提是程序是正常停止
除了使用StreamingFileSink外,还可以使用BucketingSink.
StreamingFileSink API 见 这里。
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐