LangChain4j:基于 SSE 与 Flux 的 AI 流式对话实现方案

发布于:2025-08-17 ⋅ 阅读:(35) ⋅ 点赞:(0)

前言:本文聚焦 SSE Flux 组合,解析二者如何协作:Flux 处理后端 AI 流式输出SSE 将片段推向前端,实现 AI 内容逐段实时展示,为流式交互提供高效解决方案。

流式对话的核心需求

流式对话(如 ChatGPT、豆包等 AI 交互)的核心诉求是 实时、流畅、低延迟地展示动态生成的内容”,具体可拆解为以下需求点:

  1. 单向持续推送
    流式对话的核心数据流向是 "服务器→客户端":用户发送一次提问后,AI 模型在后端逐步生成回答(通常是逐字 / 逐句生成),需要实时将中间结果推送给客户端,而客户端在此过程中无需向服务器发送额外数据,仅需等待结果即可。

  2. 低延迟的片段化传输
    AI 生成回答时,实时返回已生成题目给前端,从而给用户及时的回答,而不是让前端请求一直阻塞等待,最后一起返回。

SSE与Flux结合恰好针对性地满足了上述需求:

Flux 作为 Java 响应式编程的核心类,负责后端流式数据处理 —— 以异步数据流形式封装 AI 逐字生成的内容,支持片段化数据的逐段发射,完美适配 AI “边生成边输出” 的逻辑,是后端处理流式数据的基础工具。

SSE 则专注于网络传输层面的单向推送 —— 基于 HTTP 长连接,客户端一次连接后,服务器可持续推送数据,与 AI 对话中 “后端→客户端” 的单向数据流完全匹配,高效实现实时传输。

两者配合,Flux 在后端接收并处理 AI 生成的片段,SSE 通过长连接将这些片段实时推送给前端,最终实现 “逐字显示” 的流畅体验,在实时性与资源消耗间达到平衡,是流式对话的理想选择。

Flux技术

Flux 是 Java 响应式编程库(如 Project Reactor)中的核心类,专门用于处理异步、流式的数据序列,可以理解为 “能动态产生多个数据元素的数据流容器”。它的设计非常适合需要逐段生成、实时处理数据的场景(如 AI 流式输出、实时日志等)

在 AI 流式场景中的作用

以 AI 对话为例:

当调用 AI 模型生成回答时,模型并非一次性输出完整内容,而是逐字 / 逐句计算(类似人 “边想边说”)。这些实时生成的片段(如 “你”“好”)会被依次 “发射” 到 Flux 中,形成一个持续的数据流。后端可通过 Flux 的 map 操作将片段转换为 SSE 格式,再推送给前端,实现 “逐字显示” 的效果。

简单说,Flux 是后端 “接住” 并处理 AI 流式输出的 “传送带”,让数据能按生成顺序实时流动、加工,为后续的传输(如通过 SSE 推给前端)提供基础。

我们可以对 Flux 对象进行下列操作:

SSE 技术

基本概念

服务器发送事件(Server-Sent Events)是一种用于从服务器到客户端的 单向、实时 数据传输技术,基于 HTTP协议实现。

它有几个重要的特点:

  1. 单向通信:SSE 只支持服务器向客户端的单向通信,客户端不能向服务器发送数据。
  2. 文本格式:SSE 使用 纯文本格式 传输数据,使用 HTTP 响应的 text/event-stream MIME 类型。
  3. 保持连接:SSE 通过保持一个持久的 HTTP 连接,实现服务器向客户端推送更新,而不需要客户端频繁轮询。
  4. 自动重连:如果连接中断,浏览器会自动尝试重新连接,确保数据流的连续性。

SSE 数据格式

SSE 数据流的格式非常简单,使用 event 指定事件名称,用于区分不同类型的消息。每个事件使用 data 字段,作为消息主体,事件以两个换行符结束。还可以使用 id 字段来标识事件,并且 retry 字段可以设置重新连接的时间间隔。

event: 事件名\n    // 可选,用于区分不同类型的消息
data: 消息内容\n    // 必选,消息主体(可多行,每行以 data: 开头)
id: 消息ID\n       // 可选,用于客户端记录最后接收的消息ID(重连时可通过 Last-Event-ID 头传递)
retry: 重连时间(毫秒)\n  // 可选,指定客户端重连间隔
\n  // 空行表示一条消息结束

示例格式如下:

data: Third message\n
id: 3\n
\n
retry: 10000\n
data: Fourth message\n
\n

SSE 与 Flux 的结合实现AI输出流式对话

后端代码

@GetMapping(value = "/chat/gen/code", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> chatToGenCode(@RequestParam Long appId,
                                                   @RequestParam String message,
                                                   HttpServletRequest request) {
    // 参数校验
    ThrowUtils.throwIf(appId == null || appId <= 0, ErrorCode.PARAMS_ERROR, "应用ID无效");
    ThrowUtils.throwIf(StrUtil.isBlank(message), ErrorCode.PARAMS_ERROR, "用户消息不能为空");
    // 获取当前登录用户
    User loginUser = userService.getLoginUser(request);
    // 调用服务生成代码(流式)
    Flux<String> contentFlux = appService.chatToGenCode(appId, message, loginUser);
    // 转换为 ServerSentEvent 格式
    return contentFlux
            .map(chunk -> {
                // 将内容包装成JSON对象
                Map<String, String> wrapper = Map.of("d", chunk);
                String jsonData = JSONUtil.toJsonStr(wrapper);
                return ServerSentEvent.<String>builder()
                        .data(jsonData)
                        .build();
            });
}

后端用 Flux 处理 AI 流式输出,通过 SSE 协议推送

重难点解析:

1  @GetMapping(value = "/chat/gen/code", produces = MediaType.TEXT_EVENT_STREAM_VALUE)

produces = MediaType.TEXT_EVENT_STREAM_VALUE声明接口返回的数据类型为 text/event-stream(SSE 协议的标准媒体类型),告诉浏览器:这是一个流式响应,需要保持连接并持续接收数据,而非一次性响应。

2 public Flux<ServerSentEvent<String>> chatToGenCode(...){}

将 AI 生成的流式代码片段,包装成符合 SSE协议的格式,持续推送给前端

设计原因:

  • 如果只返回 Flux<String>:数据是原始字符串,不符合 SSE 协议格式,前端 EventSource 无法解析,会认为是无效数据。
  • 如果返回单个 ServerSentEvent:只能推送一次数据,无法实现 “持续流式输出”(失去了流式的核心意义)。
  • 只有 Flux<ServerSentEvent<String>> 能同时满足:“持续发射数据”(Flux 的能力) 和 “数据符合 SSE 协议”(ServerSentEvent 的作用),从而实现前端实时接收流式数据的需求。

3 Flux<String> contentFlux = appService.chatToGenCode(appId, message, loginUser);

通过 Flux 实现 "边生成边返回",而非等待 AI 生成完整代码后一次性返回

4    chunk -> {
                // 将内容包装成JSON对象
                Map<String, String> wrapper = Map.of("d", chunk);
                String jsonData = JSONUtil.toJsonStr(wrapper);
  • chunk 是 AI 生成的单个代码片段(如 <div class="login">)。
  • 用 Map.of("d", chunk) 将片段包装成 {"d": "片段内容"} 的Map结构,再将片段转为 JSON 字符串(键 d 可自定义,需与前端解析逻辑对应),方便前端统一解析。

5    return ServerSentEvent.<String>builder()
                        .data(jsonData)
                        .build();

使用ServerSentEvent 类,将 JSON 格式的代码片段包装为符合 SSE 协议的事件对象。

6    .concatWith(Mono.just(
        // 发送结束事件
        ServerSentEvent.<String>builder()
                .event("done")
                .data("")
                .build()
    ))

在原始流的所有数据发送完毕后,额外追加一个自定义事件名为 done的SSE事件

前端代码

  // 开始生成
  isGenerating.value = true
  generationProgress.value = 0
  await generateCode(message, aiMessageIndex)
}

// 生成代码 - 使用 EventSource 处理流式响应
const generateCode = async (userMessage: string, aiMessageIndex: number) => {
  let eventSource: EventSource | null = null
  let streamCompleted = false

  try {
    // 获取 axios 配置的 baseURL
    const baseURL = request.defaults.baseURL || API_BASE_URL

    // 构建URL参数
    const params = new URLSearchParams({
      appId: appId.value || '',
      message: userMessage,
      stream: 'true',
    })

    const url = `${baseURL}/app/chat/gen/code?${params}`

    // 创建 EventSource 连接
    eventSource = new EventSource(url, {
      withCredentials: true,
    })

    let fullContent = ''

    // 处理接收到的消息
    eventSource.onmessage = function (event) {
      if (streamCompleted) return

      try {
        // 解析JSON包装的数据
        const parsed = JSON.parse(event.data)
        const content = parsed.d

        // 拼接内容
        if (content !== undefined && content !== null) {
          fullContent += content
          messages.value[aiMessageIndex].content = fullContent
          messages.value[aiMessageIndex].loading = false
          scrollToBottom()

          // 更新进度
          generationProgress.value = Math.min(90, generationProgress.value + 5)
        }
      } catch (error) {
        console.error('解析消息失败:', error)
        handleError(error, aiMessageIndex)
      }
    }

    // 处理done事件
    eventSource.addEventListener('done', function () {
      if (streamCompleted) return

      streamCompleted = true
      isGenerating.value = false
      generationProgress.value = 100

      // 延迟更新预览,确保后端已完成处理
      setTimeout(async () => {
        await fetchAppInfo()
        updatePreview()
      }, 1000)
    })

前端代码通过 EventSource 与后端 SSE 接口建立连接,通过 onmessage 实时接收并解析后端推送的代码片段(与后端 d 键对应),通过 done 事件监听生成完成信号

重点代码解析

1    eventSource = new EventSource(url, {
     withCredentials: true, // 携带 cookies(如登录凭证),与后端用户认证对应
     });

创建 EventSource 连接(与后端 SSE 接口建立长连接)

2    eventSource.onmessage = function (event) {
      if (streamCompleted) return;

      try {
    // 解析后端返回的 JSON 数据(与后端 Map.of("d", chunk) 对应)
    const parsed = JSON.parse(event.data);
    const content = parsed.d; // 键 "d" 与后端一致

    // 拼接代码片段,实时更新界面(实现流式显示效果)
    if (content !== undefined && content !== null) {
      fullContent += content;
      messages.value[aiMessageIndex].content = fullContent; // 更新 UI
      scrollToBottom(); // 滚动到最新内容
      generationProgress.value = Math.min(90, generationProgress.value + 5); // 更新进度
        }
      } catch (error) {
        console.error('解析消息失败:', error);
        handleError(error, aiMessageIndex);
      }
    };

处理后端推送的流式数据(与后端 map 操作生成的 SSE 事件对应)

3    eventSource.addEventListener('done', function () {
      if (streamCompleted) return;

      streamCompleted = true; // 标记流已完成
      isGenerating.value = false; // 关闭生成状态
      generationProgress.value = 100; // 进度设为 100%

      // 延迟更新预览(确保后端处理完成)
      setTimeout(async () => {
        await fetchAppInfo();
        updatePreview(); // 生成完成后执行后续操作(如预览代码)
      }, 1000);
    });

处理后端发送的结束事件(与后端 concatWith 中的 done 事件对应)

对应关系

角色 技术实现 作用
后端数据处理 Flux<String> 接收 AI 生成的流式片段(如代码片段)
后端传输协议 Flux<ServerSentEvent<String>> 将 Flux 片段包装为 SSE 格式
前端接收协议 EventSource 建立 SSE 连接,接收后端推送的流式数据

完整工作流程

1.后端用 Flux 处理流式数据

  • appService.chatToGenCode(...) 调用 AI 模型,返回 Flux<String>—— 这个 Flux 会 “逐段” 发射 AI 生成的代码(比如先返回 <html>, 再返回 <body>, 等等)。
  • 后端通过 map 操作,将每个代码片段 chunk 包装成 SSE 协议要求的格式(ServerSentEvent 对象,包含 data 字段),确保符合 SSE 数据规范。

2.后端通过 SSE 协议推送数据

  • 接口标注 produces = MediaType.TEXT_EVENT_STREAM_VALUE,告诉浏览器:这是一个 SSE 流,需要保持连接并接收持续推送。
  • 每个 ServerSentEvent 会被转换为 SSE 格式的文本(如 data: {"d": "<html>"}),通过长连接推送给前端。

3.前端用 EventSource 接收并处理

  • 前端创建 EventSource 连接到后端接口,建立 SSE 长连接。
  • 每收到一个 SSE 消息(eventSource.onmessage),就解析出代码片段 content,并实时更新到页面(messages.value[aiMessageIndex].content = fullContent),实现 “逐段显示” 的流式效果。
  • 额外处理 done 事件(生成完成)和 business-error 事件(业务错误),完善交互逻辑。

核心配合点

  • Flux 负责 “内部流式处理”:在后端接收 AI 生成的片段,通过响应式编程实现高效的异步处理。
  • SSE 负责 “外部流式传输”:后端将 Flux 中的片段包装为 SSE 格式,通过 HTTP 长连接推给前端;前端用 EventSource 接收,完成从 “后端数据” 到 “用户界面” 的实时展示。
  • 两者结合,既利用了 Flux 对后端流式数据的处理能力,又通过 SSE 协议实现了前端的实时接收,最终达成 “AI 生成内容逐段显示” 的效果。

大功告成!


网站公告

今日签到

点亮在社区的每一天
去签到