【Easylive】视频删除方法详解:重点分析异步线程池使用

发布于:2025-04-08 ⋅ 阅读:(19) ⋅ 点赞:(0)

【Easylive】项目常见问题解答(自用&持续更新中…) 汇总版

方法整体功能

这个deleteVideo方法是一个综合性的视频删除操作,主要完成以下功能:

  1. 权限验证:检查视频是否存在及用户是否有权限删除
  2. 核心数据删除:删除视频主信息、投稿信息
  3. 经济系统调整:扣除用户发布视频获得的硬币
  4. 搜索索引清理:从Elasticsearch中移除文档
  5. 异步清理关联数据:使用线程池异步删除分P视频、弹幕、评论等关联数据及物理文件

重点:异步线程池部分详解

1. 线程池初始化

private static ExecutorService executorService = Executors.newFixedThreadPool(10);

线程池类型:固定大小线程池(10个线程)
特点
• 池中线程数量固定不变
• 适合已知并发量的稳定负载场景
• 超出线程数的任务会在队列中等待
潜在问题
• 使用无界队列(默认LinkedBlockingQueue),可能导致OOM
• 静态变量生命周期与应用一致,可能造成线程泄漏

2. 异步任务执行逻辑

executorService.execute(() -> {
    // 异步任务代码块
});

任务封装:使用Lambda表达式封装Runnable任务
执行方式execute()方法提交任务到线程池
与事务的关系
• 异步任务在新线程中执行
不受主方法@Transactional注解影响,形成独立的事务上下文
• 若异步操作需要事务,需在任务内部添加事务注解

3. 异步任务具体操作

(1) 查询和删除分P视频
VideoInfoFileQuery videoInfoFileQuery = new VideoInfoFileQuery();
videoInfoFileQuery.setVideoId(videoId);
List<VideoInfoFile> videoInfoFileList = this.videoInfoFileMapper.selectList(videoInfoFileQuery);
videoInfoFileMapper.deleteByParam(videoInfoFileQuery);

操作顺序:先查询后删除
目的:获取文件路径用于后续物理删除

(2) 删除关联投稿信息
VideoInfoFilePostQuery videoInfoFilePostQuery = new VideoInfoFilePostQuery();
videoInfoFilePostQuery.setVideoId(videoId);
videoInfoFilePostMapper.deleteByParam(videoInfoFilePostQuery);

直接删除:无需查询,根据videoId直接删除

(3) 删除弹幕数据
VideoDanmuQuery videoDanmuQuery = new VideoDanmuQuery();
videoDanmuQuery.setVideoId(videoId);
videoDanmuMapper.deleteByParam(videoDanmuQuery);

批量删除:通过videoId一次性删除所有关联弹幕

(4) 删除评论数据
VideoCommentQuery videoCommentQuery = new VideoCommentQuery();
videoCommentQuery.setVideoId(videoId);
videoCommentMapper.deleteByParam(videoCommentQuery);

级联删除:通常需要确保评论的关联数据(回复、点赞等)也被清理

(5) 物理文件删除
for (VideoInfoFile item : videoInfoFileList) {
    try {
        FileUtils.deleteDirectory(new File(appConfig.getProjectFolder() + item.getFilePath()));
    } catch (IOException e) {
        log.error("删除文件失败,文件路径:{}", item.getFilePath());
    }
}

关键点
• 使用deleteDirectory删除整个目录
• 捕获并记录IO异常,避免任务中断
• 文件路径拼接了项目基础目录(appConfig.getProjectFolder())

4. 异步设计的优缺点分析

优点
  1. 响应速度:主线程快速返回,用户体验好
  2. 资源隔离:IO密集型操作不影响核心业务
  3. 错误隔离:文件删除失败不影响主流程
缺点及风险
  1. 事务不一致

    // 主事务提交后异步任务才执行
    // 若异步任务失败,系统处于不一致状态
    
  2. 错误处理缺失

    // 当前实现没有记录任务执行结果
    // 无法知道异步操作是否成功
    
  3. 资源竞争

    // 固定10个线程可能在高并发时成为瓶颈
    // 文件删除操作可能阻塞其他异步任务
    

5. 改进建议

(1) 增强型线程池配置
private static ExecutorService executorService = new ThreadPoolExecutor(
    5, // 核心线程数
    20, // 最大线程数
    60, TimeUnit.SECONDS, // 空闲线程存活时间
    new ArrayBlockingQueue<>(1000), // 有界队列
    new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
(2) 添加任务结果处理
Future<?> future = executorService.submit(() -> {
    // 任务代码
});

// 可选:通过Future跟踪任务状态
future.get(10, TimeUnit.SECONDS); // 带超时的等待
(3) 事务补偿机制
@TransactionalEventListener(phase = AFTER_COMMIT)
public void handleAfterCommit(VideoDeleteEvent event) {
    // 主事务提交后执行异步清理
    asyncCleanService.cleanVideoResources(event.getVideoId());
}
(4) 完善日志监控
executorService.execute(() -> {
    MDC.put("traceId", UUID.randomUUID().toString());
    try {
        // 任务代码
        log.info("视频资源清理完成: {}", videoId);
    } catch (Exception e) {
        log.error("视频资源清理失败: {}", videoId, e);
        // 发送告警或记录失败状态
    } finally {
        MDC.clear();
    }
});

总结

这个删除方法通过线程池实现了:

  1. 核心数据同步删除:保证关键数据立即清除
  2. 资源异步清理:提升响应速度
  3. 物理文件删除:释放存储空间

关键改进方向
• 线程池参数优化
• 完善错误处理和状态跟踪
• 考虑引入事务事件机制
• 增加监控和告警能力

这种设计适合对实时性要求高但允许最终一致性的场景,是典型的"快速响应+后台清理"架构模式。

异步线程池及executorService.execute详解

一、异步线程池基础

1. 线程池核心概念

线程池是一种线程管理机制,它维护着多个线程,避免频繁创建和销毁线程带来的性能开销。在Java中,主要通过ExecutorService接口及其实现类来使用线程池。

2. 线程池关键参数

参数 说明 示例值
corePoolSize 核心线程数 10
maximumPoolSize 最大线程数 50
keepAliveTime 空闲线程存活时间 60秒
workQueue 任务队列 new LinkedBlockingQueue(1000)
threadFactory 线程创建工厂 Executors.defaultThreadFactory()
handler 拒绝策略 AbortPolicy

3. 线程池工作流程

  1. 提交任务时,优先使用核心线程处理
  2. 核心线程全忙时,任务进入队列
  3. 队列满时,创建新线程(不超过maxPoolSize)
  4. 线程数达最大值且队列满时,触发拒绝策略

二、executorService.execute方法详解

1. 方法签名

void execute(Runnable command)

2. 核心特点

异步执行:立即返回,不阻塞调用线程
无返回值:适用于不需要获取结果的场景
异常处理:任务异常会传递给未捕获异常处理器

3. 执行流程

Caller Executor Queue Worker execute(task) 立即执行 放入队列 队列非空时取出执行 alt [有可用核心线程] [无可用核心线程] 任务完成 Caller Executor Queue Worker

4. 在示例代码中的使用

executorService.execute(() -> {
    // 1. 查询和删除分P视频
    VideoInfoFileQuery videoInfoFileQuery = new VideoInfoFileQuery();
    videoInfoFileQuery.setVideoId(videoId);
    List<VideoInfoFile> videoInfoFileList = this.videoInfoFileMapper.selectList(videoInfoFileQuery);
    videoInfoFileMapper.deleteByParam(videoInfoFileQuery);
    
    // 2. 删除其他关联数据...
    
    // 3. 删除物理文件
    for (VideoInfoFile item : videoInfoFileList) {
        try {
            FileUtils.deleteDirectory(new File(appConfig.getProjectFolder() + item.getFilePath()));
        } catch (IOException e) {
            log.error("删除文件失败,文件路径:{}", item.getFilePath());
        }
    }
});

5. 为什么使用execute而不是submit?

对比项 execute submit
返回值 Future对象
异常处理 直接抛出 封装在Future中
适用场景 简单异步任务 需要获取结果的任务
示例代码 当前场景适合 需要结果时使用

在当前场景下:
• 不需要获取清理操作的结果
• 简单的日志记录已足够
• 更轻量级的执行方式

三、线程池配置优化建议

1. 当前实现的潜在问题

private static ExecutorService executorService = Executors.newFixedThreadPool(10);

• 使用无界队列(默认LinkedBlockingQueue),可能导致OOM
• 固定线程数无法应对突发流量
• 缺少合理的拒绝策略

2. 推荐改进方案

private static ExecutorService executorService = new ThreadPoolExecutor(
    5,                              // 核心线程数
    20,                             // 最大线程数
    60, TimeUnit.SECONDS,           // 空闲线程存活时间
    new ArrayBlockingQueue<>(1000), // 有界队列
    new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);

3. 各参数说明

  1. corePoolSize=5:保持5个常驻线程
  2. maxPoolSize=20:突发流量时可扩展到20线程
  3. keepAliveTime=60s:空闲线程60秒后回收
  4. 有界队列(1000):防止资源耗尽
  5. CallerRunsPolicy:队列满时由调用线程执行任务

四、异常处理机制

1. 当前实现的异常处理

try {
    FileUtils.deleteDirectory(...);
} catch (IOException e) {
    log.error("删除文件失败...");
}

• 仅记录日志,无恢复机制
• 异常不会传播到主线程

2. 增强型异常处理方案

方案1:全局异常处理器
executorService = new ThreadPoolExecutor(
    // ...其他参数
    new ThreadPoolExecutor.AbortPolicy() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // 记录被拒绝的任务
            log.warn("Task rejected: {}", r.toString());
            super.rejectedExecution(r, e);
        }
    }
);

// 设置未捕获异常处理器
Thread.setDefaultUncaughtExceptionHandler((t, e) -> {
    log.error("Uncaught exception in thread: {}", t.getName(), e);
});
方案2:封装任务
public class SafeRunnable implements Runnable {
    private final Runnable task;
    
    public SafeRunnable(Runnable task) {
        this.task = task;
    }
    
    @Override
    public void run() {
        try {
            task.run();
        } catch (Exception e) {
            log.error("Task execution failed", e);
            // 可添加重试或补偿逻辑
        }
    }
}

// 使用方式
executorService.execute(new SafeRunnable(() -> {
    // 任务代码
}));

五、性能监控建议

1. 添加线程池监控

// 定时打印线程池状态
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(() -> {
    ThreadPoolExecutor tpe = (ThreadPoolExecutor) executorService;
    log.info(
        "Pool stats: active={}, queue={}/{}, completed={}",
        tpe.getActiveCount(),
        tpe.getQueue().size(),
        tpe.getQueue().remainingCapacity(),
        tpe.getCompletedTaskCount()
    );
}, 1, 1, TimeUnit.MINUTES);

2. 关键监控指标

指标 说明 健康值参考
activeCount 活动线程数 < maxPoolSize
queueSize 队列大小 < queueCapacity * 0.8
completedTaskCount 已完成任务 持续增长
rejectedCount 被拒绝任务 = 0

六、实际应用场景分析

1. 当前视频删除场景特点

耗时操作:文件删除可能很慢
非关键路径:不影响主业务流程
允许延迟:最终一致性即可
可能失败:文件可能被占用等

2. 为什么适合使用线程池?

  1. 解耦:将清理操作与主业务分离
  2. 提速:主线程快速返回
  3. 可控:通过线程池限制资源使用
  4. 可扩展:方便添加重试等机制

3. 潜在风险及应对

风险 应对措施
线程泄漏 使用有界队列,合理配置存活时间
任务丢失 添加持久化队列或任务记录
资源竞争 监控和动态调整线程池参数
异常传播 完善任务级别的异常处理

七、总结最佳实践

  1. 选择合适的线程池类型:根据场景选择fixed/cached/custom
  2. 使用有界队列:防止资源耗尽
  3. 配置合理的拒绝策略:如CallerRunsPolicy
  4. 完善异常处理:任务级别和全局级别
  5. 添加监控:实时了解线程池状态
  6. 考虑任务重要性:关键任务建议使用带返回值的submit

在视频删除场景中,通过线程池异步处理清理任务是一种合理的设计,但需要注意:
• 线程池参数的合理配置
• 异常情况的妥善处理
• 重要操作的日志记录
• 系统资源的监控告警


网站公告

今日签到

点亮在社区的每一天
去签到