flink中间结果写入hdfs并构建hive分区表
flink中间结果写HDFS项目中遇到一个应用场景需要将flink计算的中间结果写入到hdfs中提示:正常的kafka数据还是用flume同步至hdfs吧,用flink写hdfs会有很多问题文章目录flink中间结果写HDFS一、pandas是什么?二、使用步骤1.代码修改1)、写入本地文件中2)、写入至hdfs中2.flink写hdfs目前存在的问题总结一、pandas是什么?流数据写入到hdf
·
flink中间结果写HDFS
项目中遇到一个应用场景需要将flink计算的中间结果写入到hdfs中
提示:正常的kafka数据还是用flume同步至hdfs吧,用flink写hdfs会有很多问题
文章目录
一、数据分桶写入
流数据写入到hdfs中是将数据写入到分桶(bucket)中。默认使用基于系统时间(yyyy-MM-dd–HH)的分桶策略。在分桶中,又根据滚动策略,将输出拆分为 part 文件,具体如下:
上面的文件的问题在于:文件一直处于pending状态,句柄无法关闭
记得配置checkpoint
二、代码编写
代码如下(示例):
1 写入本地文件中
String patha = "F:\\testflinktext";
final StreamingFileSink<String> sinka = StreamingFileSink
.forRowFormat(new Path(patha), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(60))//多长时间运行一个文件
.withInactivityInterval(TimeUnit.MINUTES.toMillis(30))//多长时间没有写入就生成一个文件
.withMaxPartSize(1024 * 1024 * 1024)
.build())
.build();
flowIncreaStream.addSink(sinka);
2 写入至hdfs中
//输出至HDFS
String pathb = "hdfs://fh-node1:8020/hivedata/device_flowdata";
final StreamingFileSink<String> sinkb = StreamingFileSink
.forRowFormat(new Path(pathb), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(30))//多长时间运行一个文件
.withInactivityInterval(TimeUnit.MINUTES.toMillis(10))//多长时间没有写入就生成一个文件
.withMaxPartSize(1024 * 1024 * 1024)
.build())
.build();
flowIncreaStream.addSink(sinkb);
三 创建hive表,并增加分区
1 分区表创建
CREATE EXTERNAL TABLE `ssa_flow_device_increment`(
`fromMac` string COMMENT 'mac',
`region` string COMMENT '区域',
`update_time` string COMMENT '更新时间')
COMMENT '抖音订单行表落地表'
PARTITIONED BY (`statis_time` string COMMENT '分区字段')
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://fh-node1:8020/hivedata/device_flowdata/'
2 定时增加分区(dolphin中调度)
sql:
alter table ssa_flow_device_increment
add partition(statis_time='${statis_time}')
location '/hivedata/device_flowdata/${statis_time}';
调度任务:
hive -hivevar statis_time='${statis_time}' -S -f hdfs://nncluster/data/dolphinscheduler/dolphinscheduler/resources/sql/ssa_flow_device_increment_partition.sql
参数配置:
四 flink写hdfs目前存在的问题
截止目前,Flink 的 Streaming File Sink 仍存在不少问题,如:
它只支持 Hadoop 2.7 以上的版本,因为需要用到高版本文件系统提供的 truncate 方法来实现故障恢复。 不支持写入到
Hive。
写入HDFS时,会产生大量的小文件。
当程序突然停止时,文件仍处于inprogress状态。
默认桶下的文件名是 part-{parallel-task}-{count}。当程序重启时,从上次编号值加1继续开始。前提是程序是正常停止
除了使用StreamingFileSink外,还可以使用BucketingSink.
StreamingFileSink API 见 这里。
更多推荐
已为社区贡献2条内容
所有评论(0)