第三章:任务编排
欢迎来到MoneyPrinterTurbo的第三章!
在前两章中,我们学习了网页用户界面(UI)——这是告知系统需求的交互界面,以及应用配置——系统参数与凭证的存储机制。
现在,让我们探讨连接用户请求与最终视频输出
的核心机制:任务编排。
什么是任务编排?
想象我们正在执导一部电影。剧组包含编剧团队、配音演员、素材设计师(负责场景素材)和剪辑师。要完成影片制作,需要协调这些团队:首先向编剧传达故事创意,生成剧本;接着将剧本交给配音演员录制音频;同时素材团队根据剧本寻找场景素材;最后由剪辑师整合所有元素(音频、视频片段、剧本)成片。
MoneyPrinterTurbo的任务编排系统正如同电影导演!它是中枢调度器,接收用户请求(例如"制作关于’阅读益处’的视频"),协调各个专项"团队"(即服务模块)逐步构建视频。
当我们在UI中点击"生成视频"按钮时,实质上是向任务编排器发出指令。其核心职责包括:
- 确保步骤按正确顺序执行
- 使每个步骤正确使用前序步骤的输出
- 应用UI和配置文件中的参数
- 处理步骤执行失败等异常情况
缺乏编排系统将导致混乱——系统无法判断应先生成剧本还是先检索视频素材,更不知如何整合所有元素。
任务编排_相关前文传送:
[es自动化更新] Updatecli编排配置.yaml | dockerfilePath值文件.yml
[Data Pipeline] Airflow DAG | 数据质量检查PyDeequ
视频生成工作流
以下是任务编排器处理视频请求时的主要步骤:
获取
创意:接收视频主题或用户提供的完整剧本生成
剧本:若仅提供主题,调用大语言模型生成详细剧本提取关键词
:基于剧本内容提取视频素材检索关键词- 生成
音频
:通过文本转语音(TTS
)服务将剧本转为语音,同时获取时间轴信息 - 生成
字幕
:根据剧本与音频时间轴生成字幕文件(.srt格式
) 获取
素材:使用关键词从Pexels、Pixabay等源检索视频素材,或使用本地文件- 整合
输出
:将音频、字幕与视频素材剪辑合成最终视频文件 - 完成交付:生成可供下载的最终视频文件
任务编排器确保这些步骤按逻辑顺序执行。
实现原理:start
函数
任务编排的核心逻辑位于app/services/task.py
文件。其中负责启动整个流程的start
函数是关键入口。
当UI中的"生成视频"被点击时,界面收集所有参数并封装为数据结构(详见第五章:数据模型与架构),随后调用该start
函数——通常以独立后台进程运行。(类似docker运行的容器)
以下是start
函数的简化逻辑:
# 摘自app/services/task.py的简化代码
from loguru import logger
# 导入编排所需的各服务模块
from app.services import llm, voice, material, video
from app.services import state as sm # 任务状态管理模块(第四章详解)
from app.models.schema import VideoParams # 输入数据结构定义(第五章)
from app.models import const # 任务状态常量(第四章)
from app.utils import utils # 工具函数
def start(task_id, params: VideoParams, stop_at: str = "video"):
"""启动视频生成流程
Args:
task_id: 任务唯一标识
params: 来自UI的生成参数
stop_at: 可选的流程终止节点(用于测试)
"""
logger.info(f"任务{task_id}启动,终止节点:{stop_at}")
# 更新任务状态为"处理中"(参见第四章)
sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=5)
# --- 步骤1:生成剧本 ---
logger.info("## 步骤1:生成剧本中...")
video_script = generate_script(task_id, params) # 调用大语言模型服务
if not video_script: # 剧本生成失败检测
sm.state.update_task(task_id, state=const.TASK_STATE_FAILED)
return # 终止流程
# 更新进度(参见第四章)
sm.state.update_task(task_id, progress=10)
if stop_at == "script": # 可在此阶段终止(测试用途)
sm.state.update_task(task_id, state=const.TASK_STATE_COMPLETE, progress=100)
return {"script": video_script} # 返回生成的剧本
# --- 步骤2:生成关键词 ---
# 当视频源非本地时执行
if params.video_source != "local":
logger.info("## 步骤2:生成关键词中...")
video_terms = generate_terms(task_id, params, video_script) # 调用大语言模型
if not video_terms:
sm.state.update_task(task_id, state=const.TASK_STATE_FAILED)
return
# 保存剧本数据(工具函数)
save_script_data(task_id, video_script, video_terms, params)
if stop_at == "terms":
sm.state.update_task(task_id, state=const.TASK_STATE_COMPLETE, progress=100)
return {"script": video_script, "terms": video_terms}
sm.state.update_task(task_id, progress=20)
# --- 步骤3:生成音频 ---
logger.info("## 步骤3:生成音频中...")
# 调用语音服务(如Azure、Edge)
audio_file, audio_duration, sub_maker = generate_audio(task_id, params, video_script)
if not audio_file:
sm.state.update_task(task_id, state=const.TASK_STATE_FAILED)
return
sm.state.update_task(task_id, progress=30)
if stop_at == "audio":
sm.state.update_task(task_id, state=const.TASK_STATE_COMPLETE, progress=100)
return {"audio_file": audio_file, "audio_duration": audio_duration}
# --- 步骤4:生成字幕 ---
# 调用字幕服务(或使用语音服务提供的信息)
subtitle_path = generate_subtitle(task_id, params, video_script, sub_maker, audio_file)
if stop_at == "subtitle":
sm.state.update_task(task_id, state=const.TASK_STATE_COMPLETE, progress=100)
return {"subtitle_path": subtitle_path}
sm.state.update_task(task_id, progress=40)
# --- 步骤5:获取视频素材 ---
logger.info("## 步骤5:获取视频素材中...")
# 调用素材服务(Pexels、Pixabay或本地)
downloaded_videos = get_video_materials(task_id, params, video_terms, audio_duration)
if not downloaded_videos:
sm.state.update_task(task_id, state=const.TASK_STATE_FAILED)
return
if stop_at == "materials":
sm.state.update_task(task_id, state=const.TASK_STATE_COMPLETE, progress=100)
return {"materials": downloaded_videos}
sm.state.update_task(task_id, progress=50)
# --- 步骤6:生成最终视频 ---
logger.info("## 步骤6:生成最终视频中...")
# 调用视频服务(FFmpeg)
final_video_paths, combined_video_paths = generate_final_videos(
task_id, params, downloaded_videos, audio_file, subtitle_path
)
if not final_video_paths:
sm.state.update_task(task_id, state=const.TASK_STATE_FAILED)
return
logger.success(f"任务{task_id}成功完成!")
# 更新状态为完成并返回结果(参见第四章)
sm.state.update_task(task_id, state=const.TASK_STATE_COMPLETE, progress=100, videos=final_video_paths)
return {"videos": final_video_paths} # 返回最终视频路径
# start函数调用的辅助函数(简化版):
# def generate_script(...): ... 调用llm.generate_script(...) ...
# def generate_terms(...): ... 调用llm.generate_terms(...) ...
# def generate_audio(...): ... 调用voice.tts(...) ...
# def generate_subtitle(...): ... 调用subtitle.create(...) ...
# def get_video_materials(...): ... 调用material.download_videos(...)或预处理本地素材...
# def generate_final_videos(...): ... 调用video.combine_videos(...)和video.generate_video(...) ...
# def save_script_data(...): ... 保存剧本/关键词/参数至文件...
此简化代码展示了start
函数如何协调各步骤。它调用辅助函数(generate_script
、generate_audio
等),这些辅助函数进而调用具体服务模块(如llm.generate_script
、voice.tts
、material.download_videos
、video.generate_video
)。
关键机制在于:每个主要步骤完成后检查执行结果,并更新任务状态(详见第四章:任务状态管理)。若某步骤失败,编排器终止流程并标记任务失败;若成功则更新进度并进入下一阶段。
运作流程:序列图
以下是从UI发起视频生成任务
的简化流程图示:
task.py–>state.py
此图示展示了任务编排器(由
task.py
实现)作为中枢调度器的角色:接收
初始请求,按顺序调用
各服务模块,等待响应
,并通过任务状态管理器实时更新进度
。
编排器的核心职责
总结app/services/task.py
中任务编排器的主要职责:
职责维度 | 实现方式 |
---|---|
步骤顺序控制 | 按固定顺序调用辅助函数(generate_script、generate_audio等) |
数据流管理 | 将前序步骤输出(如video_script)作为输入传递给后续步骤(如generate_audio) |
参数传递 | 接收用户/配置参数(params),分发给各服务模块 |
异常处理 | 检查每个步骤的执行结果,失败时终止流程 |
进度上报 | 调用任务状态管理器(第四章)更新状态 |
结果交付 | 收集最终输出(如视频路径)并在任务完成时返回 |
该模块如同粘合剂,将各专项服务(大语言模型服务、语音合成服务、视频素材服务、视频生成服务)整合为完整自动化流程。
总结
任务编排是MoneyPrinterTurbo视频生成流程的"大脑"。
它接收
用户请求,解析
必要步骤,协调
各服务模块按序执行,管理
模块间信息流动,处理
异常并上报进度。app/services/task.py
中的start
函数是这一机制的核心入口。
现在我们已经理解整体工作流,但界面如何实时显示进度(如"生成剧本中…"、“获取素材中…”)?如何判断任务是否完成或失败?这需要任务状态管理机制的支撑。
下一章我们将深入探讨任务状态管理,解析系统如何追踪每个任务的状态与进度。
第四章:任务状态管理
在前几章中,我们学习了网页用户界面(UI)(控制面板)和应用配置(系统参数设置),并深入探讨了任务编排——构建视频的核心流程。
现在假设我们已经点击"生成视频
"按钮,任务编排器开始调用各服务模块。但我们如何得知当前进度?系统是在生成剧本?检索素材?是否卡顿?是否完成?
这正是任务状态管理的职责所在。
什么是任务状态管理?
任务状态管理是系统追踪每次视频生成请求的机制。每次点击"生成视频"都会创建新"任务",由于视频生成涉及多步骤且耗时,系统需要记录:
- 任务标识(需
唯一ID
) - 当前状态(
等待中
/处理中/完成/失败) - 进度百分比(整体
完成度
) - 关联数据(如完成时的视频
路径
)
任务状态管理模块承载这些信息的存储与更新,是UI显示进度、通知结果的核心组件。
状态追踪的重要性
- 用户反馈:我们需要实时查看进度,避免无响应等待
- 异常处理:任务失败时记录错误详情以便排查
- 持久化:应用重启后可通过Redis等存储恢复任务状态
- 多任务处理:支持潜在的多任务并行管理(当前UI主要处理单任务)
状态管理核心要素
1. 任务ID
每个任务分配唯一标识符,类似视频请求的"身份证"。当UI查询任务xyz-123
时,系统能准确定位对应请求。
2. 任务状态
系统预定义四种标准状态:
状态名称 | 描述 | 数值常量(来自app/models/const.py ) |
---|---|---|
TASK_STATE_PENDING |
任务等待处理 | 0 |
TASK_STATE_PROCESSING |
任务正在执行 | 1 |
TASK_STATE_COMPLETE |
任务成功完成 | 2 |
TASK_STATE_FAILED |
任务执行失败 | 3 |
状态值随任务流程动态变更。
3. 进度指示
0%-100%的数值,反映整体完成度。编排器在完成关键步骤(如剧本生成、音频合成)时更新该值。
4. 附加数据
存储任务关联信息:
- 完成状态包含最终视频路径
- 失败状态记录错误日志
- 处理中状态可携带中间数据
状态存储机制
系统提供两种存储方案:
内存存储(MemoryState)
- 实现方式:使用
Python字典临时存储
- 优势:默认配置,零部署成本
- 局限:应用重启后
状态丢失
- 实现方式:使用
Redis存储(RedisState)
- 实现方式:连接Redis数据库持久化存储
- 优势:支持应用重启后状态恢复
- 要求:需配置
Redis服务器参数
存储方式由config.toml
的enable_redis
参数控制(第二章)。
实现解析:state.py
模块
核心代码位于app/services/state.py
,结构如下:
# 摘自app/services/state.py的简化代码
from abc import ABC, abstractmethod
from app.models import const
from app.config import config
# 状态管理抽象基类
class BaseState(ABC):
@abstractmethod
def update_task(self, task_id: str, state: int, progress: int = 0, **kwargs):
pass # 更新任务状态、进度及其他数据
@abstractmethod
def get_task(self, task_id: str):
pass # 获取任务当前状态
# 内存存储实现
class MemoryState(BaseState):
def __init__(self):
self._tasks = {} # 内存字典存储
def update_task(self, task_id, state=const.TASK_STATE_PROCESSING, progress=0, **kwargs):
self._tasks[task_id] = {
"task_id": task_id,
"state": state,
"progress": progress,
**kwargs # 扩展字段(如videos/error)
}
# Redis存储实现
class RedisState(BaseState):
def __init__(self, host="localhost", port=6379, db=0, password=None):
import redis
self._redis = redis.StrictRedis(host=host, port=port, db=db, password=password)
def update_task(self, task_id, state=const.TASK_STATE_PROCESSING, progress=0, **kwargs):
# 使用Redis哈希表存储
fields = {"task_id": task_id, "state": state, "progress": progress, **kwargs}
for field, value in fields.items():
self._redis.hset(task_id, field, str(value))
# 全局状态管理器实例
_enable_redis = config.app.get("enable_redis", False)
state = (
RedisState(config.app.get("redis_host"), ...)
if _enable_redis
else MemoryState()
)
实现了一个任务状态管理系统,可以跟踪任务的执行状态和进度,支持两种存储方式:内存存储和Redis存储。
核心组件
BaseState抽象基类
定义了两个必须实现的方法:
update_task()
:更新任务状态、进度和额外数据get_task()
:获取任务当前状态信息
MemoryState实现
使用Python字典在内存中存储任务数据:
- 通过
_tasks
字典保存所有任务 - 每个任务以task_id为键,存储状态、进度及其他自定义字段
RedisState实现
使用Redis数据库存储任务数据:
- 通过Redis的哈希表结构存储每个任务
- 每个字段(state/progress等)作为哈希表的field-value对
运行逻辑
根据配置文件决定使用哪种存储方式:
- 如果
enable_redis
为True,则创建RedisState实例,连接配置的Redis服务器- 如果为False或未配置,则使用MemoryState实例(内存存储)
最终通过全局变量state
提供统一的状态管理接口,无论底层是内存还是Redis存储,调用方式完全相同。
sum:
该模块通过抽象基类
定义接口,提供两种具体实现。(多态)
全局state
对象根据配置动态选择存储方式,对其他模块透明。
编排器中的状态更新
任务编排器(task.py
)在关键节点调用状态更新:
# 摘自app/services/task.py的简化代码
from app.services import state as sm
from app.models import const
def start(task_id, params):
# 任务启动
sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=5)
# 生成剧本后
sm.state.update_task(task_id, progress=10)
# 音频生成失败
sm.state.update_task(task_id, state=const.TASK_STATE_FAILED)
# 最终完成
sm.state.update_task(
task_id,
state=const.TASK_STATE_COMPLETE,
progress=100,
videos=final_video_paths # 扩展字段存储结果
)
UI状态查询机制
Web UI通过轮询获取状态更新:
# Web UI状态查询逻辑示意
def update_ui_status(task_id):
task_info = sm.state.get_task(task_id)
if task_info["state"] == const.TASK_STATE_PROCESSING:
show_progress_bar(task_info["progress"])
elif task_info["state"] == const.TASK_STATE_COMPLETE:
show_download_links(task_info["videos"])
状态流转示意图
总结
任务状态管理通过唯一ID
、预定义状态
、进度指示
和扩展数据字段
,为MoneyPrinterTurbo提供全流程监控能力。
app/services/state.py
模块通过内存/REDIS双存储方案
,实现状态信息的临时或持久化存储。任务编排器负责状态更新
,Web UI通过状态查询实现用户反馈。
理解状态管理机制后,我们将进一步探讨系统如何结构化处理任务参数、状态数据等复杂信息。