flink 增量快照同步文件引用关系和恢复分析

发布于:2025-04-08 ⋅ 阅读:(20) ⋅ 点赞:(0)

文件引用分析

每次生成的checkpoint 里都会有所有文件的引用信息

  • 问题,引用分析里如何把f1,f2去掉了,可以参考下面的代码,每次生成引用分析时,会先list 本地的文件目录,因为f1,f2已经合并了,所以不会出现在后续里了
//保存引用的是个map,每个checkpoint 是一个
SortedMap<Long, Collection<HandleAndLocalPath>> uploadedSstFiles
uploadedSstFiles.put(checkpointId, Collections.unmodifiableList(sstFiles));

在这里插入图片描述

相关代码分析


  List<HandleAndLocalPath> sstFilesUploadResult =
                        stateUploader.uploadFilesToCheckpointFs(
                                sstFilePaths,
                                checkpointStreamFactory,
                                stateScope,
                                snapshotCloseableRegistry,
                                tmpResourcesRegistry);
                uploadedSize +=
                        sstFilesUploadResult.stream().mapToLong(e -> e.getStateSize()).sum();
                 //这里加上本次上传的
                sstFiles.addAll(sstFilesUploadResult);

 private void createUploadFilePaths(
                Path[] files,
                List<HandleAndLocalPath> sstFiles,
                List<Path> sstFilePaths,
                List<Path> miscFilePaths) {
            for (Path filePath : files) {
                final String fileName = filePath.getFileName().toString();

                if (fileName.endsWith(SST_FILE_SUFFIX)) {
                    Optional<StreamStateHandle> uploaded = previousSnapshot.getUploaded(fileName);
                    if (uploaded.isPresent()) {
                    //这里加上已经上传的
                        sstFiles.add(HandleAndLocalPath.of(uploaded.get(), fileName));
                    } else {
                        sstFilePaths.add(filePath); // re-upload
                    }
                } else {
                    miscFilePaths.add(filePath);
                }
            }
        }
    }

从state 恢复,以rocksdb为例

不修改并行度

大致是这三步

在这里插入图片描述

@SuppressWarnings("unchecked")
    private void restoreWithoutRescaling(KeyedStateHandle keyedStateHandle) throws Exception {
        logger.info(
                "Starting to restore from state handle: {} without rescaling.", keyedStateHandle);
        if (keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) {
            IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle =
                    (IncrementalRemoteKeyedStateHandle) keyedStateHandle;
            restorePreviousIncrementalFilesStatus(incrementalRemoteKeyedStateHandle);
            restoreBaseDBFromRemoteState(incrementalRemoteKeyedStateHandle);
        } else if (keyedStateHandle instanceof IncrementalLocalKeyedStateHandle) {
            IncrementalLocalKeyedStateHandle incrementalLocalKeyedStateHandle =
                    (IncrementalLocalKeyedStateHandle) keyedStateHandle;
            restorePreviousIncrementalFilesStatus(incrementalLocalKeyedStateHandle);
            restoreBaseDBFromLocalState(incrementalLocalKeyedStateHandle);
        } else {
            throw unexpectedStateHandleException(
                    new Class[] {
                        IncrementalRemoteKeyedStateHandle.class,
                        IncrementalLocalKeyedStateHandle.class
                    },
                    keyedStateHandle.getClass());
        }
        logger.info(
                "Finished restoring from state handle: {} without rescaling.", keyedStateHandle);
    }
修改并行度
keyGroupRange

先理解keyGroupRange
定义 :
KeyGroupRange 是一组连续的 KeyGroup 的集合。它表示某个任务实例(subtask)负责处理哪些 KeyGroup。
作用 :
KeyGroupRange 用于定义每个并行子任务(Task)需要处理的 KeyGroup 范围。
每个 Task 都会分配到一个或多个 KeyGroupRange,从而明确该 Task 应该处理哪些 KeyGroup 的数据和状态。
特点 :
KeyGroupRange 是一个范围的概念,可以包含多个 KeyGroup。
当任务的并行度发生变化时,KeyGroupRange 的分配也会相应调整
假设:

总共有 16 个 KeyGroup(编号从 0 到 15)。
并行度为 2。
分配方式
Task 1 负责 KeyGroupRange [0, 8),即 KeyGroup 0 到 7。
Task 2 负责 KeyGroupRange [8, 16),即 KeyGroup 8 到 15。
如果并行度增加到 4:

Task 1 负责 KeyGroupRange [0, 4),即 KeyGroup 0 到 3。
Task 2 负责 KeyGroupRange [4, 8),即 KeyGroup 4 到 7。
Task 3 负责 KeyGroupRange [8, 12),即 KeyGroup 8 到 11。
Task 4 负责 KeyGroupRange [12, 16),即 KeyGroup 12 到 15。

过程
  1. 核心挑战
    当并行度变化时,每个子任务负责的 数据分片(KeyGroup) 范围会发生变化。例如:

旧并行度 :2 个子任务,每个子任务负责 KeyGroup 0-1 和 KeyGroup 2-3。
新并行度 :4 个子任务,每个子任务负责更小的 KeyGroup 范围(如 0-0, 1-1, 2-2, 3-3)。
此时,需要将旧子任务的 状态数据 按新分片规则重新分配到新子任务中,同时避免数据丢失或重复。

  1. 恢复流程步骤
    步骤 1:选择“最佳”检查点
    目标 :找到一个与新子任务的 KeyGroup 范围 重叠最多的检查点增量句柄,作为恢复的基础。
    原因 :减少需要迁移的数据量,提高效率。
    示例 :
    如果新子任务的 KeyGroup 范围是 0-0,则优先选择旧检查点中覆盖 KeyGroup 0 的数据。
    步骤 2:下载所有相关状态数据
    操作 :将所有相关检查点的数据(如 SST 文件)从远程存储(如 HDFS/S3)下载到本地临时目录。
    原因 :确保所有可能需要的数据都可用,以便后续合并。
    步骤 3:初始化基础数据库
    操作 :
    加载“最佳”检查点的数据 :将选中的检查点数据加载到一个新的 RocksDB 实例中。
    裁剪数据 :仅保留属于新子任务 KeyGroup 范围 的键(通过键的前缀过滤)。
    示例 :
    新子任务负责 KeyGroup 0-0,则删除所有不属于该范围的键。
    步骤 4:合并其他检查点数据
    操作 :
    逐个处理其他检查点 :对剩余的检查点,逐个加载到临时 RocksDB 实例。
    遍历键值对 :逐条检查键是否属于新子任务的 KeyGroup 范围 。
    复制有效数据 :将符合条件的键值对写入主 RocksDB 实例。
    示例 :
    检查点 A 包含 KeyGroup 0-3 的数据,新子任务只保留 KeyGroup 0 的部分。
    步骤 5:清理临时数据
    操作 :删除临时文件和资源,释放存储空间。
问题

为什么选择重叠最多的增量检查点句柄?
减少数据合并工作量 :
初始检查点覆盖的 KeyGroup 越多,后续需要从其他检查点合并的数据越少。
提高恢复效率 :
减少临时 RocksDB 实例的数量和迭代次数,从而加速恢复过程。
降低资源消耗 :
减少网络传输和磁盘 I/O 开销。


网站公告

今日签到

点亮在社区的每一天
去签到