minio大文件断点续传

发布于:2025-09-10 ⋅ 阅读:(18) ⋅ 点赞:(0)

1. 功能要点概述

将大文件按分片直传至 MinIO,支持断点续传、进度可查、校验合并与随时取消。

功能展开:

  • 分片与断点续传
    • 客户端按固定 chunkSize 切片;每片独立上传,失败可重传
    • 通过 uploadId + partNumber 唯一定位分片;已传分片可直接跳过
  • 直传 MinIO
    • 服务端生成预签名 URL;浏览器使用 HTTP PUT 将分片直接上传到 MinIO
    • 后端仅负责发号、校验、合并与进度管理
  • 状态与进度管理
    • 以 Redis 记录任务元数据(uploadId/totalChunks/status)与已传分片集合
    • 查询接口返回已完成分片数与百分比,用于前端 UI 渲染
  • 完整性校验与合并
    • 合并前通过 MinIO listMultipart 校验分片数量、连续性与 ETag 合法性
    • 校验通过后触发 mergeMultipartUpload 生成最终对象
  • 失败重试与幂等
    • 前端对单片设置重试(指数退避);服务端以“是否已存在分片”保障幂等
    • 支持随时取消任务并清理残留分片/进度

2. 功能时间序列流程图

用户 浏览器(resumable-upload.html/JS) 控制器(FileController) 服务(MinioUtil, UploadProgressService) MinIO 选择大文件 1 计算chunkSize与totalChunks 2 POST /file/initResumableUpload 3 initResumableUpload(fileName,fileSize,chunkSize,bucket) 4 initMultiPartUpload 获取uploadId 5 返回uploadId 6 计算totalChunks 生成ResumableUploadDto 7 返回uploadDto 8 返回uploadId与已存在分片列表 9 渲染分片UI, 标记已上传分片 10 POST /file/uploadChunk (uploadId,chunkNumber,chunk,fileName,bucket) 11 uploadChunk(dto) 12 生成预签名URL(含uploadId,partNumber) 13 HTTP PUT 分片数据 14 返回状态码与ETag 15 返回成功/失败 16 成功则前端标记分片已完成 17 重试至N次, 仍失败则标记失败 18 alt [失败] 可选 GET /file/getUploadProgress 19 calculateProgress(uploadId) 20 返回进度(已传分片/总分片) 21 返回进度用于UI更新 22 loop [对于每个未上传分片] POST /file/completeResumableUpload (uploadId,fileName,bucket) 23 validateChunks(uploadId,fileName,bucket,totalChunks) 24 listMultipart 查询所有Part 25 返回Part清单 26 校验数量 连续性 ETag 27 mergeMultipartUpload 合并 28 返回ObjectWriteResponse 29 返回合并结果 30 返回成功 完成上传 31 POST /file/cancelResumableUpload (uploadId,fileName,bucket) 32 cancelResumableUpload(uploadId,fileName,bucket) 33 cancelMultipartUpload 取消任务 34 返回成功 35 清理进度状态 36 返回已取消 37 用户 浏览器(resumable-upload.html/JS) 控制器(FileController) 服务(MinioUtil, UploadProgressService) MinIO

说明:

  • 泳道分为 客户端浏览器 前端JS 控制器 服务层 MinIO 五个层面 展示全链路交互
  • 初始化阶段 获取 uploadId 并在 Redis 存储任务与分片元数据
  • 上传阶段 每片生成预签名 URL 通过 HTTP PUT 直传到 MinIO 成功后更新 Redis
  • 进度阶段 通过 Redis 统计已传分片并返回给前端渲染
  • 完成阶段 校验分片完整性与连续性 后向 MinIO 发送合并分片请求
  • 取消阶段 请求 MinIO 取消上传并清理 Redis 记录

3. 关键代码与详细解释

3.1 初始化断点续传上传
// File: com/wangfugui/apprentice/controller/FileController.java
@ApiOperation("初始化断点续传上传")
@PostMapping("/initResumableUpload")
public ResponseUtils initResumableUpload(@RequestParam String fileName,
                                         @RequestParam Long fileSize,
                                         @RequestParam(required = false, defaultValue = "5242880") Long chunkSize,
                                         @RequestParam String bucketName) {
    try {
        ResumableUploadDto uploadDto = minioUtil.initResumableUpload(fileName, fileSize, chunkSize, bucketName);
        uploadProgressService.saveUploadProgress(uploadDto);
        List<Integer> existingChunks = minioUtil.getExistingChunks(uploadDto.getUploadId(), fileName, bucketName);
        uploadDto.setUploadedChunks(existingChunks);
        for (Integer chunkNumber : existingChunks) {
            uploadProgressService.updateUploadedChunk(uploadDto.getUploadId(), chunkNumber);
        }
        return ResponseUtils.success(uploadDto);
    } catch (Exception e) {
        return ResponseUtils.error("初始化断点续传上传失败: " + e.getMessage());
    }
}

要点:

  • 计算 totalChunks 并生成 uploadId 保存到 Redis
  • 返回当前任务的 uploadId 以及已存在的分片编号 用于断点续传续点
// File: com/wangfugui/apprentice/common/util/MinioUtil.java
public ResumableUploadDto initResumableUpload(String fileName, Long fileSize, Long chunkSize, String bucketName) throws Exception {
    createBucket(bucketName);
    String uploadId = getUploadId(fileName, bucketName);
    int totalChunks = (int) Math.ceil((double) fileSize / chunkSize);
    ResumableUploadDto uploadDto = new ResumableUploadDto();
    uploadDto.setUploadId(uploadId);
    uploadDto.setFileName(fileName);
    uploadDto.setBucketName(bucketName);
    uploadDto.setFileSize(fileSize);
    uploadDto.setChunkSize(chunkSize);
    uploadDto.setTotalChunks(totalChunks);
    uploadDto.setStatus("INIT");
    uploadDto.setCreateTime(System.currentTimeMillis());
    uploadDto.setUpdateTime(System.currentTimeMillis());
    return uploadDto;
}

解释:

  • getUploadId 内部通过 MinIO initMultiPartUpload 获取上传任务 ID
  • totalChunks 用于前端 UI 与后续完整性校验
3.2 分片上传与断点续传
// File: com/wangfugui/apprentice/controller/FileController.java
@ApiOperation("上传分片")
@PostMapping("/uploadChunk")
public ResponseUtils uploadChunk(@RequestParam String uploadId,
                                 @RequestParam Integer chunkNumber,
                                 @RequestParam MultipartFile chunk,
                                 @RequestParam String fileName,
                                 @RequestParam String bucketName) {
    try {
        if (uploadProgressService.isChunkUploaded(uploadId, chunkNumber)) {
            return ResponseUtils.success("分片已存在,跳过上传");
        }
        ChunkUploadDto dto = new ChunkUploadDto();
        dto.setUploadId(uploadId);
        dto.setChunkNumber(chunkNumber);
        dto.setChunk(chunk);
        dto.setFileName(fileName);
        dto.setBucketName(bucketName);
        boolean success = minioUtil.uploadChunk(dto);
        if (success) {
            uploadProgressService.updateUploadedChunk(uploadId, chunkNumber);
            return ResponseUtils.success("分片上传成功");
        } else {
            return ResponseUtils.error("分片上传失败");
        }
    } catch (Exception e) {
        return ResponseUtils.error("分片上传失败: " + e.getMessage());
    }
}
// File: com/wangfugui/apprentice/common/util/MinioUtil.java
public boolean uploadChunk(ChunkUploadDto chunkUploadDto) throws Exception {
    CloseableHttpClient httpClient = null;
    try {
        byte[] chunkData = toByteArray(chunkUploadDto.getChunk().getInputStream());
        Map<String, String> reqParams = new HashMap<>();
        reqParams.put("uploadId", chunkUploadDto.getUploadId());
        reqParams.put("partNumber", String.valueOf(chunkUploadDto.getChunkNumber()));
        String uploadUrl = getPresignedObjectUrl(chunkUploadDto.getFileName(), reqParams, chunkUploadDto.getBucketName());
        httpClient = HttpClients.createDefault();
        HttpPut httpPut = new HttpPut(uploadUrl);
        httpPut.setHeader("Content-Type", "application/octet-stream");
        httpPut.setEntity(new ByteArrayEntity(chunkData));
        CloseableHttpResponse response = httpClient.execute(httpPut);
        int statusCode = response.getStatusLine().getStatusCode();
        if (statusCode >= 200 && statusCode < 300) {
            String etag = response.getFirstHeader("ETag") != null ? response.getFirstHeader("ETag").getValue() : null;
            if (etag != null) { etag = etag.replace("\"", ""); }
            return true;
        } else {
            return false;
        }
    } finally {
        if (httpClient != null) { try { httpClient.close(); } catch (IOException ignore) {} }
    }
}

解释:

  • 分片上传直接 PUT 到 MinIO 对应对象的分段地址 上行链路不经由服务端转存
  • 使用 uploadIdpartNumber 绑定具体分片
  • 成功后在 Redis 标记该分片完成 便于断点续传跳过
3.3 进度查询
// File: com/wangfugui/apprentice/controller/FileController.java
@ApiOperation("获取上传进度")
@GetMapping("/getUploadProgress")
public ResponseUtils getUploadProgress(@RequestParam String uploadId) {
    try {
        UploadProgressDto progress = uploadProgressService.calculateProgress(uploadId);
        if (progress == null) { return ResponseUtils.error("未找到上传进度信息"); }
        return ResponseUtils.success(progress);
    } catch (Exception e) {
        return ResponseUtils.error("获取上传进度失败: " + e.getMessage());
    }
}

说明:

  • calculateProgress 统计已上传分片数量 与 totalChunks 计算百分比 并返回前端
3.4 完成上传与合并
// File: com/wangfugui/apprentice/controller/FileController.java
@ApiOperation("完成断点续传上传")
@PostMapping("/completeResumableUpload")
public ResponseUtils completeResumableUpload(@RequestParam String uploadId,
                                             @RequestParam String fileName,
                                             @RequestParam String bucketName) {
    try {
        uploadProgressService.updateUploadStatus(uploadId, "UPLOADING");
        ResumableUploadDto uploadDto = uploadProgressService.getUploadProgress(uploadId);
        if (uploadDto == null) { return ResponseUtils.error("未找到上传进度信息"); }
        boolean isValid = minioUtil.validateChunks(uploadId, fileName, bucketName, uploadDto.getTotalChunks());
        if (!isValid) { uploadProgressService.updateUploadStatus(uploadId, "FAILED");
            return ResponseUtils.error("分片验证失败,请重新上传"); }
        ObjectWriteResponse result = minioUtil.completeResumableUpload(uploadId, fileName, bucketName);
        uploadProgressService.updateUploadStatus(uploadId, "COMPLETED");
        JSONObject response = new JSONObject();
        response.put("objectName", result.object());
        response.put("bucketName", result.bucket());
        response.put("uploadId", uploadId);
        return ResponseUtils.success(response);
    } catch (Exception e) {
        uploadProgressService.updateUploadStatus(uploadId, "FAILED");
        return ResponseUtils.error("完成断点续传上传失败: " + e.getMessage());
    }
}
// File: com/wangfugui/apprentice/common/util/MinioUtil.java
public boolean validateChunks(String uploadId, String fileName, String bucketName, int expectedChunks) throws Exception {
    ListPartsResponse partResult = minioClient.listMultipart(bucketName, null, fileName, 1000, 0, uploadId, null, null);
    List<Part> partList = partResult.result().partList();
    if (partList.size() != expectedChunks) { return false; }
    List<Part> sortedParts = new ArrayList<>(partList);
    sortedParts.sort((p1, p2) -> Integer.compare(p1.partNumber(), p2.partNumber()));
    for (int i = 0; i < sortedParts.size(); i++) {
        Part part = sortedParts.get(i);
        if (part.partNumber() != i + 1) { return false; }
        if (part.etag() == null || part.etag().isEmpty()) { return false; }
    }
    return true;
}

public ObjectWriteResponse completeResumableUpload(String uploadId, String fileName, String bucketName) throws Exception {
    return mergeMultipartUpload(fileName, uploadId, bucketName);
}

解释:

  • 完成前强制校验数量 连续性 以及 ETag 的存在性 保证合并正确
  • 调用 MinIO merge 完成分段拼接 返回最终对象信息
3.5 取消上传
// File: com/wangfugui/apprentice/common/util/MinioUtil.java
public void cancelResumableUpload(String uploadId, String fileName, String bucketName) throws Exception {
    HashMultimap<String, String> headers = HashMultimap.create();
    minioClient.cancelMultipartUpload(bucketName, null, fileName, uploadId, headers, null);
}

解释:

  • 调用 MinIO 的取消接口并清理服务端 Redis 侧的任务记录 即可释放未完成的上传资源
3.6 前端断点续传关键代码
// File: src/main/resources/static/resumable-upload.html (片段)
let currentFile = null;
let uploadId = null;
let chunkSize = 5 * 1024 * 1024; // 5MB
let totalChunks = 0;
let uploadedChunks = new Set(); // 已上传分片集合
let isUploading = false;
let isPaused = false;

// 1. 文件选择与分片计算
function selectFile() {
    const fileInput = document.getElementById('fileInput');
    if (fileInput.files.length > 0) {
        currentFile = fileInput.files[0];
        totalChunks = Math.ceil(currentFile.size / chunkSize);
        showMessage(`已选择文件: ${currentFile.name} (${formatFileSize(currentFile.size)})`, 'success');
        showMessage(`总分片数: ${totalChunks}`, 'success');
        document.getElementById('startBtn').disabled = false;
        renderChunks(); // 渲染分片UI
    }
}

// 2. 初始化上传
async function startUpload() {
    if (!currentFile) {
        showMessage('请先选择文件', 'error');
        return;
    }

    try {
        const response = await fetch('/file/initResumableUpload', {
            method: 'POST',
            headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
            body: new URLSearchParams({
                fileName: currentFile.name,
                fileSize: currentFile.size,
                chunkSize: chunkSize,
                bucketName: 'test-bucket'
            })
        });

        const result = await response.json();
        if (result.code === "0000") {
            uploadId = result.data.uploadId;
            // 断点续传:设置已上传分片
            if (result.data.uploadedChunks) {
                result.data.uploadedChunks.forEach(chunkNum => {
                    uploadedChunks.add(chunkNum);
                });
            }
            showMessage('上传初始化成功', 'success');
            await uploadChunks(); // 开始上传分片
        }
    } catch (error) {
        showMessage('上传初始化失败: ' + error.message, 'error');
    }
}

// 3. 分片上传核心逻辑
async function uploadChunks() {
    isUploading = true;
    isPaused = false;
    document.getElementById('statusText').textContent = '正在上传...';

    for (let i = 1; i <= totalChunks; i++) {
        // 暂停检查
        if (isPaused) {
            document.getElementById('statusText').textContent = '已暂停';
            return;
        }

        // 断点续传:跳过已上传分片
        if (uploadedChunks.has(i)) {
            updateChunkStatus(i, 'uploaded');
            continue;
        }

        // 重试机制
        let retryCount = 0;
        const maxRetries = 3;
        let success = false;

        while (retryCount < maxRetries && !success) {
            try {
                updateChunkStatus(i, 'uploading');
                
                // 文件切片
                const start = (i - 1) * chunkSize;
                const end = Math.min(start + chunkSize, currentFile.size);
                const chunk = currentFile.slice(start, end);

                // 构建表单数据
                const formData = new FormData();
                formData.append('uploadId', uploadId);
                formData.append('chunkNumber', i);
                formData.append('chunk', chunk);
                formData.append('fileName', currentFile.name);
                formData.append('bucketName', 'test-bucket');

                // 上传分片
                const response = await fetch('/file/uploadChunk', {
                    method: 'POST',
                    body: formData
                });

                const result = await response.json();
                if (result.code === "0000") {
                    uploadedChunks.add(i);
                    updateChunkStatus(i, 'uploaded');
                    updateProgress();
                    success = true;
                    showMessage(`分片 ${i} 上传成功`, 'success');
                } else {
                    throw new Error(result.msg || '上传失败');
                }
            } catch (error) {
                retryCount++;
                if (retryCount < maxRetries) {
                    showMessage(`分片 ${i} 上传失败,正在重试 (${retryCount}/${maxRetries}): ${error.message}`, 'error');
                    // 指数退避延迟
                    await new Promise(resolve => setTimeout(resolve, 1000 * retryCount));
                } else {
                    updateChunkStatus(i, 'failed');
                    showMessage(`分片 ${i} 上传失败,已达到最大重试次数: ${error.message}`, 'error');
                }
            }
        }
    }

    // 检查是否全部完成
    if (uploadedChunks.size === totalChunks) {
        await completeUpload();
    } else {
        showMessage(`上传完成,但存在失败的分片。成功: ${uploadedChunks.size}/${totalChunks}`, 'error');
    }
}

// 4. 完成上传
async function completeUpload() {
    try {
        const response = await fetch('/file/completeResumableUpload', {
            method: 'POST',
            headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
            body: new URLSearchParams({
                uploadId: uploadId,
                fileName: currentFile.name,
                bucketName: 'test-bucket'
            })
        });

        const result = await response.json();
        if (result.code === "0000") {
            showMessage('文件上传完成!', 'success');
            document.getElementById('statusText').textContent = '上传完成';
        } else {
            showMessage('完成上传失败: ' + result.msg, 'error');
        }
    } catch (error) {
        showMessage('完成上传失败: ' + error.message, 'error');
    }
}

// 5. 暂停与恢复
function pauseUpload() {
    isPaused = true;
    isUploading = false;
    document.getElementById('statusText').textContent = '已暂停';
    document.getElementById('pauseBtn').disabled = true;
    document.getElementById('resumeBtn').disabled = false;
}

async function resumeUpload() {
    document.getElementById('resumeBtn').disabled = true;
    await uploadChunks(); // 从断点继续
}

// 6. 取消上传
function cancelUpload() {
    if (uploadId) {
        fetch('/file/cancelResumableUpload', {
            method: 'POST',
            headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
            body: new URLSearchParams({
                uploadId: uploadId,
                fileName: currentFile.name,
                bucketName: 'test-bucket'
            })
        });
    }
    resetUpload();
    showMessage('上传已取消', 'error');
}

前端关键点说明:

  • 文件切片:使用 File.slice(start, end) 按固定大小切分文件,最后一片可能小于 chunkSize
  • 断点续传:通过 uploadedChunks Set 记录已成功分片,初始化时从服务端获取已存在分片列表
  • 重试机制:每片失败后最多重试 3 次,采用指数退避延迟(1s、2s、3s)
  • 状态管理:通过 isUploadingisPaused 控制上传流程,支持暂停/恢复/取消
  • 进度更新:实时更新分片状态 UI 与整体进度百分比
  • 错误处理:区分网络错误、服务端错误,提供用户友好的错误提示

网站公告

今日签到

点亮在社区的每一天
去签到