Hands-On Project: Streaming LLM Output with Spring WebFlux and LangChain4j

Published: 2025-04-05

1. Background

In LLM applications, models such as GPT-4 often take several seconds to tens of seconds to generate a response. With a traditional synchronous request the user stares at a blank page the whole time, which is a poor experience. This post combines Spring WebFlux reactive programming with SSE server push to produce a typewriter-style, token-by-token streaming effect, and uses the LangChain4j framework for the AI integration, noticeably improving the user experience.

2. Technology Choices

  1. Spring WebFlux: an asynchronous, non-blocking web framework built on Reactor
  2. SSE (Server-Sent Events): a lightweight server-push technology
  3. LLM framework: LangChain4j
  4. LLM API: OpenAI's GPT-4-style interface (the model actually behind it is DeepSeek)
  5. Tooling: IntelliJ IDEA + JDK 17

3. A Word on Spring WebFlux

Reference: the Spring WebFlux tutorial on spring 中文网 (the Chinese Spring community site)

I won't go into detail here; there are plenty of tutorials online.
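Still, a minimal reference point helps anchor the rest of the post. The sketch below streams one item per second as SSE frames; the /demo path and DemoController name are made up for illustration only.

import java.time.Duration;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class DemoController {

    // Emits one element per second; WebFlux writes each as an SSE "data:" frame
    @GetMapping(value = "/demo", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> demo() {
        return Flux.interval(Duration.ofSeconds(1))
                   .map(i -> "tick " + i);
    }
}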

4. Overall Design
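In outline, the flow implemented in the next section is:

    browser --POST /chat, Accept: text/event-stream--> controller returns a Flux
    controller --> service: create a Sinks.Many and kick off the streaming model call
    LangChain4j handler.onNext(token) --> sink.tryEmitNext --> one SSE frame to the browser
    handler.onComplete / onError --> sink completes or errors --> the stream closes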

5. Implementation Steps

Step 1: pom dependencies

        <dependency>
            <groupId>io.milvus</groupId>
            <artifactId>milvus-sdk-java</artifactId>
            <version>2.5.1</version>
        </dependency>

        <dependency>
            <groupId>dev.langchain4j</groupId>
            <artifactId>langchain4j-milvus</artifactId>
            <version>0.36.2</version>
        </dependency>

        <dependency>
            <groupId>dev.langchain4j</groupId>
            <artifactId>langchain4j-embeddings-all-minilm-l6-v2</artifactId>
            <version>0.36.2</version>
        </dependency>

        <dependency>
            <groupId>dev.langchain4j</groupId>
            <artifactId>langchain4j-open-ai</artifactId>
            <version>0.36.2</version>
        </dependency>

        <dependency>
            <groupId>dev.langchain4j</groupId>
            <artifactId>langchain4j-open-ai-spring-boot-starter</artifactId>
            <version>0.36.2</version>
        </dependency>

        <dependency>
            <groupId>dev.langchain4j</groupId>
            <artifactId>langchain4j-reactor</artifactId>
            <version>0.36.2</version>
        </dependency>
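One thing the listing above omits: the reactive web stack itself. Assuming the project uses the Spring Boot parent to manage versions, the WebFlux starter also needs to be on the classpath:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>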

Step 2: Controller layer

The response Content-Type is text/event-stream:

@ApiOperation(value = "streaming chat")
@PostMapping(value = "", produces = TEXT_EVENT_STREAM_VALUE)
// element type matches the service's Sinks.Many<ApiResponse<String>> (see step 3)
public Flux<ApiResponse<String>> chat(@RequestBody @Validated ChatReq chatReq) {
    log.info("--streaming chat request: {}--", chatReq);
    return chatService.chat(chatReq);
}
@ApiModel(value = "chat request")
public class ChatReq {

    @ApiModelProperty(value = "chat id")
    private Long chatId;

    @ApiModelProperty(value = "chat type")
    private Integer type;

    @ApiModelProperty(value = "question")
    private String question;

    @ApiModelProperty(value = "ids of the external reference documents")
    private List<Long> externalIds;

    @ApiModelProperty(value = "vector retrieval score threshold", example = "0.5")
    @DecimalMin(value = "0") // @Min/@Max do not support floating-point types
    @DecimalMax(value = "1")
    private Double retrievalThreshold;

    @ApiModelProperty(value = "number of vector matches to return", example = "5")
    @Min(value = 1)
    private Integer topK;

    ....

}
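To see the stream from the consumer's side, here is a minimal sketch using Spring's WebClient; the /chat path, the port, and the request-body values are assumptions to be adjusted to your actual mapping.

import java.util.Map;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;

public class ChatClientDemo {
    public static void main(String[] args) {
        WebClient client = WebClient.create("http://localhost:8080");
        client.post()
              .uri("/chat") // path assumed; match your controller mapping
              .contentType(MediaType.APPLICATION_JSON)
              .bodyValue(Map.of("type", 1, "question", "hello"))
              .accept(MediaType.TEXT_EVENT_STREAM)
              .retrieve()
              .bodyToFlux(String.class)       // one element per SSE data payload
              .doOnNext(System.out::println)  // here, the JSON-serialized ApiResponse
              .blockLast();                   // block only so the demo JVM stays alive
    }
}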

Step 3: Service layer

(1) Main request flow

public Flux<ApiResponse<String>> chat(ChatReq chatReq) {
        // the sink that feeds the returned Flux; element type matches the controller's
        Sinks.Many<ApiResponse<String>> sink = Sinks.many().multicast().onBackpressureBuffer();
        // flag that tells the streaming callback to stop once the client cancels
        AtomicBoolean isCancelled = new AtomicBoolean(false);
        // distinguish new vs. existing chats
        if (isNewChat(chatReq.getChatId())) { // new chat; business details omitted
            chatReq.setHasHistory(false);
            chatModelHandle(chatReq, sink, isCancelled);
        } else { // existing chat
            // look up the chat type and history by chatId
            chatReq.setHasHistory(true);
            chatModelHandle(chatReq, sink, isCancelled);
        }
        return sink.asFlux().doOnCancel(() -> {
            log.info("stream cancelled by client");
            isCancelled.set(true);  // stop emitting further tokens
            sink.tryEmitComplete(); // close the stream
        });
    }
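A note on the sink choice: Sinks.many().multicast().onBackpressureBuffer() buffers tokens emitted before the subscriber attaches (the model callback can start pushing before WebFlux subscribes to the Flux), and doOnCancel gives us the hook to stop the handler when the client disconnects mid-answer.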

(2) Building the request

If the chat has history, load it (each previous question and answer)

and package it as a list of ChatMessages: questions become UserMessage, answers become AiMessage. A sketch of that packaging follows.
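A minimal sketch of getHistory under these assumptions; ChatRecord and chatRecordRepository are hypothetical persistence types standing in for whatever storage the project actually uses.

private List<ChatMessage> getHistory(Long chatId) {
    List<ChatMessage> history = new ArrayList<>();
    // chatRecordRepository is a hypothetical DAO returning Q/A rows in chronological order
    for (ChatRecord record : chatRecordRepository.findByChatId(chatId)) {
        history.add(UserMessage.from(record.getQuestion())); // what the user asked
        history.add(AiMessage.from(record.getAnswer()));     // what the model replied
    }
    return history;
}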

private void chatModelHandle(ChatReq chatReq, Sinks.Many<ApiResponse<String>> sink, AtomicBoolean isCancelled) {
        List<ChatMessage> history = new ArrayList<>();
        if (chatReq.getHasHistory()) {
            // assemble the history: stored questions become UserMessage, answers become AiMessage
            history = getHistory(chatReq.getChatId());
        }
        Integer chatType = chatReq.getType();
        // ids of the reference documents, if any
        List<Long> externalIds = chatReq.getExternalIds();
        ChatStreamingResponseHandler handler = new ChatStreamingResponseHandler(sink, chatReq, isCancelled);
        StreamingChatLanguageModel model = ChatModelClient.getStreamingChatLanguageModel(chatReq.getTemperature());
        // dispatch on chat type
        if (ChatType.NORMAL.getCode().equals(chatType)) { // plain chat
            if (chatReq.getHasHistory()) {
                history.add(UserMessage.from(chatReq.getQuestion()));
                model.generate(history, handler);
            } else {
                model.generate(chatReq.getQuestion(), handler);
            }
        } else if (ChatType.DOCUMENT_DB.getCode().equals(chatType)) { // document (RAG) chat
            Prompt prompt = geneRagPrompt(chatReq);
            if (chatReq.getHasHistory()) {
                history.add(UserMessage.from(prompt.text()));
                model.generate(history, handler);
            } else {
                model.generate(prompt.text(), handler);
            }
        } else {
            throw new BizException("not implemented yet");
        }
    }


(3) Retrieving the reference text, if any

In the vector store, look up the top-K reference segments using the reference document ids and the retrieval score threshold:

    private List<PPid> search(ChatReq chatReq, MilvusClientV2 client, MilvusConfig config, EmbeddingModel model) {

        // embed the question, then search constrained to the reference document ids
        TextSegment segment = TextSegment.from(chatReq.getQuestion());
        Embedding queryEmbedding = model.embed(segment).content();
        SearchResp searchResp = client.search(SearchReq.builder()
                .collectionName(config.getCollectionName())
                .data(Collections.singletonList(new FloatVec(queryEmbedding.vector())))
                // match segments whose documentIdList contains any of the requested ids;
                // List#toString happens to render as a Milvus array literal, e.g. [1, 2]
                .filter(String.format("ARRAY_CONTAINS_ANY(documentIdList, %s)", chatReq.getExternalIds()))
                .topK(chatReq.getTopK() == null ? config.getTopK() : chatReq.getTopK())
                .outputFields(Arrays.asList("pid", "documentId"))
                .build());
        // drop results below the score threshold, best matches first
        List<SearchResp.SearchResult> searchResults = searchResp.getSearchResults().get(0);
        Double minScore = chatReq.getRetrievalThreshold() == null ? config.getMinScore() : chatReq.getRetrievalThreshold();
        return searchResults.stream()
                .filter(item -> item.getScore() >= minScore)
                .sorted((item1, item2) -> Double.compare(item2.getScore(), item1.getScore()))
                .map(item -> new PPid(
                        (Long) item.getEntity().get("documentId"),
                        (Long) item.getEntity().get("pid")
                ))
                .toList();
    }

After obtaining the reference segment ids, fetch the corresponding text and wrap it into the prompt template:

private Prompt genePrompt(String context) {
        ...
}
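The body of genePrompt is elided above. Purely as an illustration, a RAG-style template built with LangChain4j's PromptTemplate might look like the sketch below; the template wording and the two-argument signature are assumptions.

private static final PromptTemplate RAG_TEMPLATE = PromptTemplate.from(
        "Answer the question using only the reference text below.\n"
                + "Reference:\n{{context}}\n\nQuestion:\n{{question}}");

private Prompt genePrompt(String context, String question) {
    // fills {{context}} and {{question}} and returns a ready-to-send Prompt
    Map<String, Object> variables = Map.of("context", context, "question", question);
    return RAG_TEMPLATE.apply(variables);
}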


(4) Connecting the model client

// takes the per-request temperature, since the service calls it with one
public static StreamingChatLanguageModel getStreamingChatLanguageModel(Double temperature) {
        ChatModelConfig config = ChatConfig.getInstance().getChatModelConfig();
        return OpenAiStreamingChatModel.builder()
                .baseUrl(config.getBaseUrl())
                .modelName(config.getModelName())
                .apiKey(config.getApiKey())
                .temperature(temperature)
                .maxTokens(config.getMaxTokens())
                .timeout(Duration.ofSeconds(config.getTimeout()))
                .build();
}
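Since the model actually behind this is DeepSeek, which exposes an OpenAI-compatible API, the same OpenAiStreamingChatModel works unchanged: point baseUrl at the provider's endpoint (for DeepSeek, something like https://api.deepseek.com/v1) and set modelName accordingly (e.g. deepseek-chat). Treat these values as illustrative and check the provider's documentation.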

(5) Handling the model's streamed output


@Slf4j
@Data
@NoArgsConstructor
public class ChatStreamingResponseHandler implements StreamingResponseHandler<AiMessage> {

    private Sinks.Many<ApiResponse<String>> sink;
    private ChatReq chatReq;
    private AtomicBoolean isCancelled;

    public ChatStreamingResponseHandler(Sinks.Many<ApiResponse<String>> sink, ChatReq chatReq, AtomicBoolean isCancelled) {
        this.sink = sink;
        this.chatReq = chatReq;
        this.isCancelled = isCancelled;
    }

    @Override
    public void onNext(String answer) {
        // don't emit anything once the client has cancelled
        if (isCancelled.get()) {
            return;
        }
        sink.tryEmitNext(BaseController.success(answer));
    }

    @Override
    public void onComplete(Response<AiMessage> response) {
        if (!isCancelled.get()) {
            // the sink carries ApiResponse<String>, so the end marker is wrapped too
            sink.tryEmitNext(BaseController.success("end-of-stream marker"));
            sink.tryEmitComplete();
        }
        // business-specific handling (persisting the answer, etc.)
    }

    @Override
    public void onError(Throwable error) {
        if (!isCancelled.get()) {
            sink.tryEmitError(error);
        }
        // business-specific handling
    }

}
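One hardening worth considering (not in the original): tryEmitNext reports failure through a Sinks.EmitResult instead of throwing, so emissions that fail, e.g. after the client has disconnected, are silently dropped above. A sketch of onNext that at least logs them:

@Override
public void onNext(String answer) {
    if (isCancelled.get()) {
        return;
    }
    Sinks.EmitResult result = sink.tryEmitNext(BaseController.success(answer));
    if (result.isFailure()) {
        // e.g. FAIL_TERMINATED or FAIL_OVERFLOW; stop pushing and investigate
        log.warn("token emission failed: {}", result);
    }
}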

6. Results

7. Closing Notes

The above is only a brief walkthrough of the implementation steps; feel free to leave a comment if you want to dig deeper.

Plenty of the experience still needs polish. Taking Doubao as a reference:

1. Let the user manually stop a response

2. Stop streaming automatically when the page is refreshed or closed, and resume the stream after reconnecting

3. Batch several tokens into each SSE frame to reduce the frame count (see the sketch below)
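For item 3, Reactor's bufferTimeout is a natural fit: it flushes either when the batch fills or when a short timeout elapses, so latency stays low. A minimal sketch, where tokenFlux stands for a Flux<String> of raw tokens before they are wrapped into ApiResponse, and the 20-token / 100 ms numbers are arbitrary:

Flux<String> batched = tokenFlux
        .bufferTimeout(20, Duration.ofMillis(100)) // flush at 20 tokens or after 100 ms
        .map(tokens -> String.join("", tokens));   // one SSE frame per batch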

