项目场景:Flink StreamingFileSink写入hdfs,文件一直处于inprogress
使用StreamingFileSink写入hdfs时,文件一直处于inprogress,导致下游无法使用
问题描述
关键代码
//sink
public static StreamingFileSink<String> getHdfsSinkWithFile(String path) {
return StreamingFileSink
.forRowFormat(new Path(path), new SimpleStringEncoder<String>(Constants.CHARSET_NAME))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMinutes(5))
.withInactivityInterval(TimeUnit.MINUTES.toMinutes(1))
.withMaxPartSize(1024 * 1024 * 128)
.build())
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH",
ZoneId.of("Asia/Shanghai")))
.build();
}
//job
...
eventStream.addSink(getHdfsSinkWithFile(path))
.setParallelism(1);
env.execute("DataTohdfsJob");
原因分析:
- 写入文件已存在
- 没有checkpoint或checkpoint失败
- 并行度过大
解决方案:
由于我这是测试环境,我就直接把hdfs写入目录删除了,正式环境慎重考虑
关于checkpoint,官方文档特别指出StreamingFileSink需要做checkpoint,否则就会一直处于inprogress,所以需要开启checkpoint
官方文档: 传送门env的全局并行度需要设为与running的subtasks数一致
kafka source并行度与kafka的partition数保持一致设置的为10,但由于kakfa存在数据倾斜问题,只有三个partition有数据,导致存在七个finish 的 subtask ,影响 checkpoint ,checkpoint 不成功。
修改并行度后成功解决问题
问题二、
解决inprogress后,发现存在很多小文件,导致hadoop集群压力大,namenode查询速度慢
解决方案
主要是由checkpoint间隔太短造成的,试着将checkpoint间隔调大些
总结
文件一直处于inprogress归根结底就是checkpoint不成功。
使用StreamingFileSink 时需要启用 Checkpoint ,每次做 Checkpoint 时写入完成后,桶中临时文件转成正式文件。如果 Checkpoint 被禁用,部分文件(part file)将永远处于 ‘in-progress’ 或 ‘pending’ 状态,下游系统无法安全地读取。