怎么做到这一点:让 Agent 可以像人类一样 边听边想、边说,而不是“等一句话 → 一次性返回”

发布于:2025-09-05 ⋅ 阅读:(16) ⋅ 点赞:(0)

要实现“边听边想、边说”,核心是把整条链路做成全双工、分片流式、可中断的流水线:
ASR 连续吐字 →(短缓冲)→ LLM 连续出 token(可抢断)→ TTS 连续合成并播放(可打断/续播)。
下面给你一份可落地的技术设计清单 + 关键参数 + 事件流示例 + 最小骨架。

  1. 架构总览(低时延流水线)

I/O 通道(推荐)
• 端到端:WebSocket / WebRTC(浏览器端)
• 服务间:FastMCP 2.0(工具/模型调用),内部可选 NATS 作为事件总线
• 控制流:MCP 事件(session、tool.call、response.delta…)
• 数据流:音频与文本分道并行(audio.* 与 text.* 两条流)

模块流水线(并行且可抢断)
1. VAD/端点检测(前端或边缘):10–30ms 帧
2. 流式 ASR(CTC/RNNT):每 50–200ms 输出 partial 文本
3. 意图/路由器:对 partial 增量判断(关键词/小模型)
4. LLM 推理:
• 接受 prefix 的增量 prompt
• 流式 token 输出(server-side generation with streaming)
• 支持抢断(barge-in)与提前输出“开场 token”
• 可加推测式解码(spec decoding)与prefix 缓存
5. 流式 TTS:
• 子句粒度 150–300ms 合成/播放块
• 回放端抖动缓冲 100–200ms + 无损打断(丢弃未播块)
6. 回声消除/自回授(AEC + Ducking):边播边录,避免自激

  1. 关键机制(必须具备)

A. 增量传递(partial → commit)
• ASR:持续发送 asr.partial,端点后补发 asr.final(带稳定时间戳)
• LLM:对 asr.partial 采用 防抖(debounce 80–150ms)+ 最小片段长度(>= 3–6 个字 / 2–4 个词)
• TTS:只对 llm.delta 的已成句或子句(中文逗号/句号、英文 punctuation)做合成,减少回退

B. 抢断(barge-in)
• 前端:一旦检测到用户讲话(VAD=active),立刻:
1. 发送 barge_in.start
2. 暂停播放;3) 服务端取消当前 LLM/TTS(cancel token)
• 服务端:
• LLM/ TTS 监听 cancel_id,立即停止/丢弃队尾片段
• 维护会话版本号(gen_id);仅播放/回答最新 gen_id

C. 迟滞控制(latency budget)
• 端到端可感延迟目标 ≤ 300–500ms
• ASR 输出间隔:50–150ms(多一层 80–150ms 防抖)
• LLM 首 token:150–350ms(可用模板热身 & prefix cache)
• TTS 首音:120–250ms(短句优先,文本分块长度 20–40 字)
• 尾音裁剪:TTS 播放块≤ 300ms,避免“长块堵塞”

D. 可靠性与回退
• 弱网:WebRTC/HTTP3(QUIC)优先;WebSocket 备用
• 重连:幂等 session_id + offset 恢复
• 丢包/乱序:块序号(seq),播放端按 seq 重排
• 回声:前端 AEC,或服务端播放音轨镜像做自适应回声抵消

  1. 事件流(FastMCP 2.0 语义示例)
session.created {session_id}
audio.input.delta {pcm_chunk, ts}
audio.input.vad {active=true/false}
asr.partial {text: "我想…", range:{start,end}}
router.intent.partial {name: "qa", conf:0.62}
llm.output.delta {text: "好的,", gen_id:12, seq:1}
tts.output.delta {audio_chunk, gen_id:12, seq:1}
barge_in.start {reason:"user_speaking", gen_id:12}
cancel {target_gen_id:12}
asr.final {text:"我想订明天早上的票", ...}
llm.output.delta {text:"已经为你查到…", gen_id:13, seq:1}
tts.output.delta {audio_chunk, gen_id:13, seq:1}

要点:所有可播放/可回复内容都带 gen_id;前端只消费最大 gen_id 的流。

  1. 具体参数建议(起步即用)
    • ASR:RNNT/CTC 流式,分帧 20–30ms,合帧 200–400ms 出 partial
    • 防抖:ASR→LLM 文本增量 debounce=120ms,最小增量=6–10字
    • LLM:
    • system prompt 固定模板,prefix-caching
    • 首 token 预算 200ms(服务端热身、就近副本、推测式解码)
    • 工具调用:遇到明显意图 并行预取(RAG/DB)
    • TTS:
    • 句法切分:逗号/顿号/句号;英文用 spaCy/sentencizer
    • 合成块:200–300ms;播放缓冲 120–200ms
    • 无点击音打断:跨块零交叉点截断 + 短淡出(5–15ms)
    • 前端音频:
    • 采样 16k/24k 单声道;AEC/NS/AGC 开启;
    • 抖动缓冲 120–180ms;播放速率 1.0–1.05 自适应

  1. 最小骨架(Python/asyncio,伪代码)

说明:示意“边听边想、边说”的并行与可抢断。实际对接你的 ASR/LLM/TTS/MCP。

import asyncio
from contextlib import suppress

class GenState:
    def __init__(self):
        self.gen_id = 0
        self.cancel = asyncio.Event()

state = GenState()

async def asr_stream(audio_in):
    async for partial in audio_in.partials():   # 50-150ms一段
        yield {"type":"asr.partial", "text": partial.text}
    yield {"type":"asr.final", "text": audio_in.final_text()}

async def llm_stream(text_iter):
    async for chunk in llm.generate_stream(text_iter):  # 首token~200ms
        if state.cancel.is_set(): break
        yield {"type":"llm.delta", "text": chunk}

async def tts_stream(text_iter, cur_gen):
    async for seg in tts.synthesize_segments(text_iter, max_ms=300):
        if state.gen_id != cur_gen: break
        yield {"type":"tts.delta", "audio": seg.audio}

async def handle_session(ws):
    while True:
        # 1) 接收音频 → ASR partial
        asr_iter = asr_stream(ws.audio())
        # 2) 组装增量文本(防抖 + 最小增量)
        text_iter = debounce_and_chunk(asr_iter, debounce_ms=120, min_chars=8)
        # 3) 新一轮生成:版本号+取消
        state.gen_id += 1
        cur_gen = state.gen_id
        state.cancel.clear()

        # 4) 并行LLM与TTS
        llm_iter = llm_stream(text_iter)
        tts_iter = tts_stream(sentencize(llm_iter), cur_gen)

        async def pump_llm():
            async for m in llm_iter:
                await ws.send_json(m)
        async def pump_tts():
            async for a in tts_iter:
                await ws.send_audio(a["audio"])

        # 5) 监听barge-in:用户再讲话→取消当前生成
        async def watch_barge_in():
            async for e in ws.events("barge_in.start"):
                state.cancel.set()

        tasks = [asyncio.create_task(p) for p in (pump_llm(), pump_tts(), watch_barge_in())]
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
        for t in pending:
            with suppress(asyncio.CancelledError): t.cancel()

要点:
• 版本号 gen_id + 取消事件确保只播放最新一轮的语音
• ASR→LLM 有防抖+最小增量,减少“回退”
• LLM→TTS 先句法切分再合成,块更自然;任一处触发 barge-in 立刻打断

  1. 体验优化招式(锦上添花)
    • “先答框架,后补细节”:LLM先给 1–2 个开场 token(例如“好的,”/“Okay,”),TTS马上播;随后细节边出边播
    • 并行预取:一旦意图>阈值,并行启动 RAG/DB 查询,LLM侧采用工具结果晚绑定
    • 短回应策略:长句拆分为短子句播报,每句≤1.5s,让用户随时能插话
    • 音量压制(ducking):用户说话瞬时,把播放音量降到 10–20%
    • 多模态提示:UI 上对“正在思考/说话/可插话”的状态做清晰指示

  1. 观测与SLO(必做)
    • 指标:ASR_partial_latency、LLM_first_token_latency、TTS_first_packet_latency、BargeIn_to_Silence、Cancel_to_StopPlayback
    • 目标:端到端可感延迟 ≤ 400ms、barge-in 静音 ≤ 120ms、取消到停止播报 ≤ 80ms

  1. FastMCP 2.0 如何串起来
    • 统一事件:audio.input.delta、asr.partial/final、llm.delta、tts.delta、barge_in.start、cancel
    • 统一工具:ASR/TTS/RAG/DB 都通过 MCP tool schema 暴露;Agent 只认协议与事件
    • 统一流控:MCP 层携带 gen_id、seq、cancel_id、ts,服务间无感替换底层实现(Qwen→GPT、不同 TTS/ASR)