[AI-video] 任务编排 | 任务状态管理 | task.py-->state.py | memo&redis

发布于:2025-07-17 ⋅ 阅读:(13) ⋅ 点赞:(0)

第三章:任务编排

欢迎来到MoneyPrinterTurbo的第三章!

在前两章中,我们学习了网页用户界面(UI)——这是告知系统需求的交互界面,以及应用配置——系统参数与凭证的存储机制。

现在,让我们探讨连接用户请求与最终视频输出的核心机制:任务编排

什么是任务编排?

想象我们正在执导一部电影。剧组包含编剧团队、配音演员、素材设计师(负责场景素材)和剪辑师。要完成影片制作,需要协调这些团队:首先向编剧传达故事创意,生成剧本;接着将剧本交给配音演员录制音频;同时素材团队根据剧本寻找场景素材;最后由剪辑师整合所有元素(音频、视频片段、剧本)成片。

MoneyPrinterTurbo的任务编排系统正如同电影导演!它是中枢调度器,接收用户请求(例如"制作关于’阅读益处’的视频"),协调各个专项"团队"(即服务模块)逐步构建视频。

当我们在UI中点击"生成视频"按钮时,实质上是向任务编排器发出指令。其核心职责包括:

  1. 确保步骤按正确顺序执行
  2. 使每个步骤正确使用前序步骤的输出
  3. 应用UI和配置文件中的参数
  4. 处理步骤执行失败等异常情况

缺乏编排系统将导致混乱——系统无法判断应先生成剧本还是先检索视频素材,更不知如何整合所有元素。

任务编排_相关前文传送:
[es自动化更新] Updatecli编排配置.yaml | dockerfilePath值文件.yml
[Data Pipeline] Airflow DAG | 数据质量检查PyDeequ

视频生成工作流

以下是任务编排器处理视频请求时的主要步骤:

  1. 获取创意:接收视频主题或用户提供的完整剧本
  2. 生成剧本:若仅提供主题,调用大语言模型生成详细剧本
  3. 提取关键词:基于剧本内容提取视频素材检索关键词
  4. 生成音频:通过文本转语音(TTS)服务将剧本转为语音,同时获取时间轴信息
  5. 生成字幕:根据剧本与音频时间轴生成字幕文件(.srt格式
  6. 获取素材:使用关键词从Pexels、Pixabay等源检索视频素材,或使用本地文件
  7. 整合输出:将音频、字幕与视频素材剪辑合成最终视频文件
  8. 完成交付:生成可供下载的最终视频文件

任务编排器确保这些步骤按逻辑顺序执行。

实现原理:start函数

任务编排的核心逻辑位于app/services/task.py文件。其中负责启动整个流程的start函数是关键入口。

当UI中的"生成视频"被点击时,界面收集所有参数并封装为数据结构(详见第五章:数据模型与架构),随后调用该start函数——通常以独立后台进程运行。(类似docker运行的容器)

以下是start函数的简化逻辑:

# 摘自app/services/task.py的简化代码
from loguru import logger
# 导入编排所需的各服务模块
from app.services import llm, voice, material, video
from app.services import state as sm  # 任务状态管理模块(第四章详解)
from app.models.schema import VideoParams  # 输入数据结构定义(第五章)
from app.models import const  # 任务状态常量(第四章)
from app.utils import utils  # 工具函数

def start(task_id, params: VideoParams, stop_at: str = "video"):
    """启动视频生成流程
    
    Args:
        task_id: 任务唯一标识
        params: 来自UI的生成参数
        stop_at: 可选的流程终止节点(用于测试)
    """
    logger.info(f"任务{task_id}启动,终止节点:{stop_at}")

    # 更新任务状态为"处理中"(参见第四章)
    sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=5)

    # --- 步骤1:生成剧本 ---
    logger.info("## 步骤1:生成剧本中...")
    video_script = generate_script(task_id, params)  # 调用大语言模型服务
    if not video_script:  # 剧本生成失败检测
        sm.state.update_task(task_id, state=const.TASK_STATE_FAILED)
        return  # 终止流程

    # 更新进度(参见第四章)
    sm.state.update_task(task_id, progress=10)
    if stop_at == "script":  # 可在此阶段终止(测试用途)
        sm.state.update_task(task_id, state=const.TASK_STATE_COMPLETE, progress=100)
        return {"script": video_script}  # 返回生成的剧本

    # --- 步骤2:生成关键词 ---
    # 当视频源非本地时执行
    if params.video_source != "local":
        logger.info("## 步骤2:生成关键词中...")
        video_terms = generate_terms(task_id, params, video_script)  # 调用大语言模型
        if not video_terms:
            sm.state.update_task(task_id, state=const.TASK_STATE_FAILED)
            return

    # 保存剧本数据(工具函数)
    save_script_data(task_id, video_script, video_terms, params)

    if stop_at == "terms":
        sm.state.update_task(task_id, state=const.TASK_STATE_COMPLETE, progress=100)
        return {"script": video_script, "terms": video_terms}

    sm.state.update_task(task_id, progress=20)

    # --- 步骤3:生成音频 ---
    logger.info("## 步骤3:生成音频中...")
    # 调用语音服务(如Azure、Edge)
    audio_file, audio_duration, sub_maker = generate_audio(task_id, params, video_script)
    if not audio_file:
        sm.state.update_task(task_id, state=const.TASK_STATE_FAILED)
        return

    sm.state.update_task(task_id, progress=30)
    if stop_at == "audio":
        sm.state.update_task(task_id, state=const.TASK_STATE_COMPLETE, progress=100)
        return {"audio_file": audio_file, "audio_duration": audio_duration}

    # --- 步骤4:生成字幕 ---
    # 调用字幕服务(或使用语音服务提供的信息)
    subtitle_path = generate_subtitle(task_id, params, video_script, sub_maker, audio_file)

    if stop_at == "subtitle":
        sm.state.update_task(task_id, state=const.TASK_STATE_COMPLETE, progress=100)
        return {"subtitle_path": subtitle_path}

    sm.state.update_task(task_id, progress=40)

    # --- 步骤5:获取视频素材 ---
    logger.info("## 步骤5:获取视频素材中...")
    # 调用素材服务(Pexels、Pixabay或本地)
    downloaded_videos = get_video_materials(task_id, params, video_terms, audio_duration)
    if not downloaded_videos:
        sm.state.update_task(task_id, state=const.TASK_STATE_FAILED)
        return

    if stop_at == "materials":
        sm.state.update_task(task_id, state=const.TASK_STATE_COMPLETE, progress=100)
        return {"materials": downloaded_videos}

    sm.state.update_task(task_id, progress=50)

    # --- 步骤6:生成最终视频 ---
    logger.info("## 步骤6:生成最终视频中...")
    # 调用视频服务(FFmpeg)
    final_video_paths, combined_video_paths = generate_final_videos(
        task_id, params, downloaded_videos, audio_file, subtitle_path
    )

    if not final_video_paths:
        sm.state.update_task(task_id, state=const.TASK_STATE_FAILED)
        return

    logger.success(f"任务{task_id}成功完成!")
    # 更新状态为完成并返回结果(参见第四章)
    sm.state.update_task(task_id, state=const.TASK_STATE_COMPLETE, progress=100, videos=final_video_paths)

    return {"videos": final_video_paths}  # 返回最终视频路径

# start函数调用的辅助函数(简化版):
# def generate_script(...): ... 调用llm.generate_script(...) ...
# def generate_terms(...): ... 调用llm.generate_terms(...) ...
# def generate_audio(...): ... 调用voice.tts(...) ...
# def generate_subtitle(...): ... 调用subtitle.create(...) ...
# def get_video_materials(...): ... 调用material.download_videos(...)或预处理本地素材...
# def generate_final_videos(...): ... 调用video.combine_videos(...)和video.generate_video(...) ...
# def save_script_data(...): ... 保存剧本/关键词/参数至文件...

此简化代码展示了start函数如何协调各步骤。它调用辅助函数(generate_scriptgenerate_audio等),这些辅助函数进而调用具体服务模块(如llm.generate_scriptvoice.ttsmaterial.download_videosvideo.generate_video)。

关键机制在于:每个主要步骤完成后检查执行结果,并更新任务状态(详见第四章:任务状态管理)。若某步骤失败,编排器终止流程并标记任务失败;若成功则更新进度并进入下一阶段。

运作流程:序列图

以下是从UI发起视频生成任务的简化流程图示:

task.py–>state.py

在这里插入图片描述

此图示展示了任务编排器(由task.py实现)作为中枢调度器的角色:接收初始请求,按顺序调用各服务模块,等待响应,并通过任务状态管理器实时更新进度

编排器的核心职责

总结app/services/task.py中任务编排器的主要职责:

职责维度 实现方式
步骤顺序控制 按固定顺序调用辅助函数(generate_script、generate_audio等)
数据流管理 将前序步骤输出(如video_script)作为输入传递给后续步骤(如generate_audio)
参数传递 接收用户/配置参数(params),分发给各服务模块
异常处理 检查每个步骤的执行结果,失败时终止流程
进度上报 调用任务状态管理器(第四章)更新状态
结果交付 收集最终输出(如视频路径)并在任务完成时返回

该模块如同粘合剂,将各专项服务(大语言模型服务语音合成服务视频素材服务视频生成服务)整合为完整自动化流程。

总结

任务编排是MoneyPrinterTurbo视频生成流程的"大脑"。

接收用户请求,解析必要步骤,协调各服务模块按序执行,管理模块间信息流动,处理异常并上报进度。app/services/task.py中的start函数是这一机制的核心入口。

现在我们已经理解整体工作流,但界面如何实时显示进度(如"生成剧本中…"、“获取素材中…”)?如何判断任务是否完成或失败?这需要任务状态管理机制的支撑。

下一章我们将深入探讨任务状态管理,解析系统如何追踪每个任务的状态与进度。

下一章:任务状态管理


第四章:任务状态管理

在前几章中,我们学习了网页用户界面(UI)(控制面板)和应用配置(系统参数设置),并深入探讨了任务编排——构建视频的核心流程。

现在假设我们已经点击"生成视频"按钮,任务编排器开始调用各服务模块。但我们如何得知当前进度?系统是在生成剧本?检索素材?是否卡顿?是否完成?

这正是任务状态管理的职责所在。

什么是任务状态管理?

任务状态管理是系统追踪每次视频生成请求的机制。每次点击"生成视频"都会创建新"任务",由于视频生成涉及多步骤且耗时,系统需要记录:

  1. 任务标识(需唯一ID
  2. 当前状态等待中/处理中/完成/失败)
  3. 进度百分比(整体完成度
  4. 关联数据(如完成时的视频路径

任务状态管理模块承载这些信息的存储与更新,是UI显示进度、通知结果的核心组件。

状态追踪的重要性

  • 用户反馈:我们需要实时查看进度,避免无响应等待
  • 异常处理:任务失败时记录错误详情以便排查
  • 持久化:应用重启后可通过Redis等存储恢复任务状态
  • 多任务处理:支持潜在的多任务并行管理(当前UI主要处理单任务)

状态管理核心要素

1. 任务ID

每个任务分配唯一标识符,类似视频请求的"身份证"。当UI查询任务xyz-123时,系统能准确定位对应请求。

2. 任务状态

系统预定义四种标准状态:

状态名称 描述 数值常量(来自app/models/const.py
TASK_STATE_PENDING 任务等待处理 0
TASK_STATE_PROCESSING 任务正在执行 1
TASK_STATE_COMPLETE 任务成功完成 2
TASK_STATE_FAILED 任务执行失败 3

状态值随任务流程动态变更。

3. 进度指示

0%-100%的数值,反映整体完成度。编排器在完成关键步骤(如剧本生成、音频合成)时更新该值。

4. 附加数据

存储任务关联信息:

  • 完成状态包含最终视频路径
  • 失败状态记录错误日志
  • 处理中状态可携带中间数据

状态存储机制

系统提供两种存储方案:

  1. 内存存储(MemoryState)

    • 实现方式:使用Python字典临时存储
    • 优势:默认配置,零部署成本
    • 局限:应用重启后状态丢失
  2. Redis存储(RedisState)

    • 实现方式:连接Redis数据库持久化存储
    • 优势:支持应用重启后状态恢复
    • 要求:需配置Redis服务器参数

存储方式由config.tomlenable_redis参数控制(第二章)。

实现解析:state.py模块

核心代码位于app/services/state.py,结构如下:

# 摘自app/services/state.py的简化代码
from abc import ABC, abstractmethod
from app.models import const
from app.config import config

# 状态管理抽象基类
class BaseState(ABC):
    @abstractmethod
    def update_task(self, task_id: str, state: int, progress: int = 0, **kwargs):
        pass  # 更新任务状态、进度及其他数据

    @abstractmethod
    def get_task(self, task_id: str):
        pass  # 获取任务当前状态

# 内存存储实现
class MemoryState(BaseState):
    def __init__(self):
        self._tasks = {}  # 内存字典存储

    def update_task(self, task_id, state=const.TASK_STATE_PROCESSING, progress=0, **kwargs):
        self._tasks[task_id] = {
            "task_id": task_id,
            "state": state,
            "progress": progress,
            **kwargs  # 扩展字段(如videos/error)
        }

# Redis存储实现
class RedisState(BaseState):
    def __init__(self, host="localhost", port=6379, db=0, password=None):
        import redis
        self._redis = redis.StrictRedis(host=host, port=port, db=db, password=password)

    def update_task(self, task_id, state=const.TASK_STATE_PROCESSING, progress=0, **kwargs):
        # 使用Redis哈希表存储
        fields = {"task_id": task_id, "state": state, "progress": progress, **kwargs}
        for field, value in fields.items():
            self._redis.hset(task_id, field, str(value))

# 全局状态管理器实例
_enable_redis = config.app.get("enable_redis", False)
state = (
    RedisState(config.app.get("redis_host"), ...) 
    if _enable_redis 
    else MemoryState()
)

实现了一个任务状态管理系统,可以跟踪任务的执行状态和进度,支持两种存储方式:内存存储和Redis存储

核心组件

BaseState抽象基类
定义了两个必须实现的方法:

  • update_task():更新任务状态、进度和额外数据
  • get_task():获取任务当前状态信息

MemoryState实现
使用Python字典在内存中存储任务数据:

  • 通过_tasks字典保存所有任务
  • 每个任务以task_id为键,存储状态、进度及其他自定义字段

RedisState实现
使用Redis数据库存储任务数据:

  • 通过Redis的哈希表结构存储每个任务
  • 每个字段(state/progress等)作为哈希表的field-value对

运行逻辑

根据配置文件决定使用哪种存储方式:

  • 如果enable_redis为True,则创建RedisState实例,连接配置的Redis服务器
  • 如果为False或未配置,则使用MemoryState实例(内存存储)

最终通过全局变量state提供统一的状态管理接口,无论底层是内存还是Redis存储,调用方式完全相同。

sum:

该模块通过抽象基类定义接口,提供两种具体实现。(多态)

全局state对象根据配置动态选择存储方式,对其他模块透明。

编排器中的状态更新

任务编排器(task.py)在关键节点调用状态更新:

# 摘自app/services/task.py的简化代码
from app.services import state as sm
from app.models import const

def start(task_id, params):
    # 任务启动
    sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=5)
    
    # 生成剧本后
    sm.state.update_task(task_id, progress=10)
    
    # 音频生成失败
    sm.state.update_task(task_id, state=const.TASK_STATE_FAILED)
    
    # 最终完成
    sm.state.update_task(
        task_id, 
        state=const.TASK_STATE_COMPLETE,
        progress=100,
        videos=final_video_paths  # 扩展字段存储结果
    )

UI状态查询机制

Web UI通过轮询获取状态更新:

# Web UI状态查询逻辑示意
def update_ui_status(task_id):
    task_info = sm.state.get_task(task_id)
    
    if task_info["state"] == const.TASK_STATE_PROCESSING:
        show_progress_bar(task_info["progress"])
    elif task_info["state"] == const.TASK_STATE_COMPLETE:
        show_download_links(task_info["videos"])

状态流转示意图

在这里插入图片描述

总结

任务状态管理通过唯一ID预定义状态进度指示扩展数据字段,为MoneyPrinterTurbo提供全流程监控能力。

app/services/state.py模块通过内存/REDIS双存储方案,实现状态信息的临时或持久化存储。任务编排器负责状态更新,Web UI通过状态查询实现用户反馈。

理解状态管理机制后,我们将进一步探讨系统如何结构化处理任务参数、状态数据等复杂信息。

下一章:数据模型与架构


网站公告

今日签到

点亮在社区的每一天
去签到