在 Coze 平台处理长时间(5分钟)图生视频任务并确保后续流程正确执行,
需要采用 同步 或者异步工作流架构。以下是具体解决方案:
一:同步工作流解决方案:简单高效的图生视频超时处理
核心方案:分阶段轮询 + 状态保持
具体实现步骤
1. 创建视频生成服务(需要独立部署)
# 简单Flask示例 (部署在单独服务器)
from flask import Flask, request, jsonify
import threading
import time
import uuid
app = Flask(__name__)
tasks = {}
def video_generation_task(task_id, image_data):
"""模拟5分钟的视频生成任务"""
# 实际项目中调用图生视频模型
print(f"开始生成视频任务: {task_id}")
time.sleep(300) # 模拟5分钟处理时间
tasks[task_id] = {
'status': 'completed',
'video_url': f'https://storage.example.com/videos/{task_id}.mp4'
}
print(f"视频生成完成: {task_id}")
@app.route('/generate', methods=['POST'])
def generate_video():
"""启动视频生成任务"""
task_id = str(uuid.uuid4())
image_data = request.json['image']
# 存储初始状态
tasks[task_id] = {'status': 'processing'}
# 在新线程中启动任务
thread = threading.Thread(target=video_generation_task, args=(task_id, image_data))
thread.start()
return jsonify({'task_id': task_id})
@app.route('/status/<task_id>', methods=['GET'])
def check_status(task_id):
"""检查任务状态"""
task = tasks.get(task_id, {})
return jsonify(task)
2. Coze 工作流配置
节点1: 启动视频生成
- 节点类型: HTTP 请求
- 方法: POST
- URL:
https://your-video-service/generate
- 请求体:
{ "image": "{{input.image}}" }
- 保存响应:
task_response
节点2: 提取任务ID
- 节点类型: JavaScript 代码
- 代码:
return { task_id: task_response.task_id };
节点3: 状态轮询循环
- 节点类型: 循环
- 循环条件:
// 最多尝试30次(5分钟) attempt < 30 && status !== "completed"
- 循环内容:
- HTTP 请求 (GET):
- URL:
https://your-video-service/status/{{task_id}}
- 保存为:
status_response
- URL:
- 条件分支:
- 如果
status_response.status === "completed"
: 跳出循环 - 否则: 继续
- 如果
- 等待节点: 10秒
- HTTP 请求 (GET):
节点4: 获取视频结果
- 节点类型: JavaScript 代码
- 代码:
return { video_url: status_response.video_url };
节点5: 后续处理
- 使用视频URL执行后续步骤
关键技术点说明
状态轮询机制:
- 每60秒检查一次任务状态
- 最大轮询5次(5分钟)
- 避免单次请求超时
轻量级HTTP检查:
- 状态检查接口设计为轻量级操作
- 只返回简单状态信息,不返回大文件
异步任务处理:
- 视频生成在服务器后台线程运行
- 主工作流通过轮询获取状态
超时保护:
// 在循环条件中添加超时保护 const maxAttempts = 30; // 5分钟(30*10秒) let attempt = 0; while (attempt < maxAttempts) { attempt++; const status = await checkStatus(task_id); if (status === 'completed') break; await sleep(10000); // 等待10秒 } if (attempt >= maxAttempts) { throw new Error('视频生成超时'); }
此方案通过简单的轮询机制解决了长时任务在同步工作流中的超时问题,保持了工作流的线性执行流程,同时确保在视频生成完成后继续后续步骤。
二:异步工作流架构(核心方案)
具体实施步骤:
1. 创建异步视频生成服务
# 伪代码:视频生成服务API
@app.post("/generate-video")
async def generate_video(image: UploadFile):
task_id = str(uuid4())
# 存储任务到队列
redis.set(f"task:{task_id}", "processing")
# 异步执行任务
asyncio.create_task(process_video(task_id, image))
return {"task_id": task_id, "status_url": f"/status/{task_id}"}
async def process_video(task_id, image):
# 实际调用视频模型(耗时5分钟)
video_url = video_model.generate(image)
# 存储结果
redis.set(f"task:{task_id}", json.dumps({"status": "completed", "video_url": video_url}))
2. Coze 主工作流配置
步骤 | 操作 | 关键配置 |
---|---|---|
1 | 调用视频服务 | 使用 HTTP 请求节点Method: POST , URL: https://your-service/generate-video |
2 | 获取任务ID | 解析响应中的 task_id |
3 | 存储任务状态 | 使用 Coze 内置数据库 存储 task_id 和状态 |
4 | 返回等待提示 | 输出:“视频生成中,完成后将自动通知…” |
5 | 结束工作流 | 不等待结果,直接结束 |
3. 回调服务实现(关键组件)
# 回调服务伪代码
@app.get("/check-status/{task_id}")
def check_status(task_id):
status_data = redis.get(f"task:{task_id}")
if status_data["status"] == "completed":
# 触发后续工作流
coze_client.trigger_workflow(
workflow_id="downstream-flow",
input_data={"video_url": status_data["video_url"]}
)
return {"status": "completed"}
return {"status": "processing"}
4. 后续工作流配置
- 触发方式:通过 Coze OpenAPI 触发
- 输入参数:接收
video_url
- 执行操作:
- 视频二次处理
- 发送结果通知
- 数据存储等
⚙️ 关键技术保障
1. 状态监控与重试
# 轮询服务添加重试机制
def poll_status(task_id, max_retries=30):
for _ in range(max_retries):
status = requests.get(f"/status/{task_id}").json()
if status["completed"]:
return status
time.sleep(10) # 每10秒检查一次
raise TimeoutError("Video generation timeout")
2. **Coze 平台集成要点
- 使用数据库插件:存储任务状态
// Coze 数据库操作示例 await cozeDB.set(`videoTask_${taskId}`, { status: "processing", startTime: Date.now() });
- 配置 Webhook 接收器:
# coze.yaml 配置片段 webhooks: video_callback: path: /video-callback handler: handleVideoCompletion
3. 超时处理方案
实施示例
- 创建视频生成服务(AWS Lambda + S3)
- 配置Coze数据库:存储任务状态
- 部署回调服务(Vercel Serverless)
- 主工作流:
# Coze工作流伪代码 def main_workflow(image): response = http_post("https://video-api/generate", json={"image": image}) task_id = response['task_id'] db.save("pending_tasks", task_id, status="processing") return "视频生成启动成功!完成后您将收到通知"
- 后续工作流:
def downstream_workflow(video_url): # 添加水印 watermarked = add_watermark(video_url) # 发送给用户 user.send_message(f"视频已生成:{watermarked}")
通过此方案,工作流执行时间从5分钟缩短到秒级,同时确保视频生成完成后自动触发后续操作,完美解决超时问题。实际部署时应根据业务需求添加:
三:最后
还是同步的比较简单,直接在coze 工作流里面就可以搞定了,也不用写代码之类的