1. 添加依赖 (pom.xml)
<dependencies>
    <!-- Spring MVC web stack; provides the SseEmitter returned by the controller -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- WebFlux; provides the reactive WebClient (HTTP client) and Reactor's Flux -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <!-- Jackson JSON support (ObjectMapper) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-json</artifactId>
    </dependency>
    <!-- Lombok; required by the @Data annotations on the request/response DTOs.
         No version is given, like the entries above: it is assumed to be managed
         by the Spring Boot parent/BOM - confirm for your build. -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
2. 配置类 (WebClientConfig.java)
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;
@Configuration
public class WebClientConfig {
@Bean
public WebClient webClient() {
return WebClient.builder()
.baseUrl("https://api.deepseek.com/v1")
.defaultHeader("Authorization", "Bearer YOUR_API_KEY") // 替换为你的API密钥
.build();
}
}
3. 请求/响应DTO
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import java.util.List;
import lombok.Data;
/**
 * Request payload for a chat completion.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}. A fresh instance already targets
 * the {@code deepseek-chat} model and has streaming switched on; only the messages need
 * to be supplied.
 */
@Data
public class DeepSeekRequest {

    /** Model identifier; {@code deepseek-chat} unless overridden. */
    private String model = "deepseek-chat";

    /** Conversation messages to send. */
    private List<Message> messages;

    /** Streaming flag; {@code true} unless overridden. */
    private boolean stream = true;

    /**
     * One conversation message: who is speaking and what was said.
     */
    @Data
    public static class Message {

        /** Speaker role, e.g. {@code "user"}. */
        private String role;

        /** Text of the message. */
        private String content;

        /**
         * @param role    speaker role
         * @param content message text
         */
        public Message(String role, String content) {
            this.content = content;
            this.role = role;
        }
    }
}
/**
 * Minimal model of one streamed completion chunk: only the incremental text under
 * {@code choices[].delta.content} is mapped.
 *
 * <p>Unknown JSON properties are ignored at every level, so fields that are not modelled
 * here do not make deserialization fail on a default {@code ObjectMapper}.
 */
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class DeepSeekResponse {

    /** Choices contained in this chunk. */
    private List<Choice> choices;

    /** A single choice; only its incremental delta is mapped. */
    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class Choice {
        private Delta delta;
    }

    /** Incremental fragment of the reply. */
    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class Delta {
        /** Text fragment carried by this chunk; may be absent. */
        private String content;
    }
}
4. SSE服务实现 (DeepSeekService.java)
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import java.util.Collections;
/**
 * Calls the DeepSeek chat-completions endpoint in streaming mode and exposes the reply
 * as a stream of plain-text fragments.
 */
@Service
public class DeepSeekService {

    /** Sentinel payload that marks the end of the upstream event stream. */
    private static final String DONE_MARKER = "[DONE]";

    /** Shared mapper, created once instead of once per chunk. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private final WebClient webClient;

    public DeepSeekService(WebClient webClient) {
        this.webClient = webClient;
    }

    /**
     * Streams the model's reply to a single user message.
     *
     * <p>The returned {@link Flux} is cold: the HTTP request is issued when it is subscribed,
     * and cancelling the subscription cancels the request. Each element is the text found at
     * {@code choices[0].delta.content} of one chunk; chunks without text are skipped and the
     * stream completes when the {@code [DONE]} sentinel arrives. A chunk that is not valid
     * JSON terminates the stream with the parsing exception.
     *
     * @param userMessage text sent as the single {@code user} message
     * @return the reply as incremental text fragments
     */
    public Flux<String> streamCompletion(String userMessage) {
        DeepSeekRequest request = new DeepSeekRequest();
        request.setMessages(Collections.singletonList(
                new DeepSeekRequest.Message("user", userMessage)));

        // NOTE(review): assumes bodyToFlux(String.class) on a text/event-stream response
        // yields the data payload of each event without the "data:" prefix - confirm.
        return webClient.post()
                .uri("/chat/completions")
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(request)
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .bodyToFlux(String.class)
                .map(String::trim)
                .filter(chunk -> !chunk.isEmpty())
                .takeWhile(chunk -> !DONE_MARKER.equals(chunk))
                .<String>handle((chunk, sink) -> {
                    try {
                        String content = extractContent(chunk);
                        if (!content.isEmpty()) {
                            sink.next(content);
                        }
                    } catch (JsonProcessingException e) {
                        sink.error(e);
                    }
                });
    }

    /**
     * Reads {@code choices[0].delta.content} from one JSON chunk.
     *
     * @param chunk JSON text of a single stream chunk
     * @return the text fragment, or an empty string when the field is missing or null
     * @throws JsonProcessingException if {@code chunk} is not valid JSON
     */
    private static String extractContent(String chunk) throws JsonProcessingException {
        return OBJECT_MAPPER.readTree(chunk)
                .path("choices")
                .path(0)
                .path("delta")
                .path("content")
                .asText("");
    }
}
5. SSE控制器 (SseController.java)
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import reactor.core.publisher.Flux;
@RestController
@RequestMapping("/sse")
public class SseController {
private final DeepSeekService deepSeekService;
public SseController(DeepSeekService deepSeekService) {
this.deepSeekService = deepSeekService;
}
@GetMapping(path = "/deepseek", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamDeepSeekResponse(@RequestParam String message) {
SseEmitter emitter = new SseEmitter(60 * 1000L); // 60秒超时
Flux<String> responseStream = deepSeekService.streamCompletion(message);
responseStream.subscribe(
content -> {
try {
// 发送SSE事件
emitter.send(SseEmitter.event()
.data(content)
.name("message"));
} catch (Exception e) {
emitter.completeWithError(e);
}
},
emitter::completeWithError,
emitter::complete
);
return emitter;
}
}
6. 前端实现 (HTML + JavaScript)
<!DOCTYPE html>
<html>
<head>
    <!-- Declare the encoding explicitly: the page contains non-ASCII text. -->
    <meta charset="UTF-8">
    <title>DeepSeek SSE Demo</title>
</head>
<body>
    <input type="text" id="message" placeholder="输入你的问题">
    <button onclick="startSSE()">开始对话</button>
    <div id="output" style="white-space: pre-wrap; margin-top: 20px;"></div>
    <script>
        // The currently open SSE connection, if any.
        let eventSource;

        /**
         * Opens an SSE connection for the question in the input box and appends every
         * received "message" event to the output area. Any previous connection is closed
         * first.
         */
        function startSSE() {
            const message = document.getElementById('message').value;
            const outputDiv = document.getElementById('output');
            outputDiv.textContent = ''; // clear the previous answer

            if (eventSource) eventSource.close();

            // Keep a local reference so the handlers always act on the connection that
            // raised the event, not on whatever the global points to later.
            const source = new EventSource(`/sse/deepseek?message=${encodeURIComponent(message)}`);
            eventSource = source;

            source.addEventListener("message", (event) => {
                // Append as a text node: streamed output must never be parsed as HTML,
                // and the existing content is not re-parsed on every event.
                outputDiv.append(event.data);
            });

            source.addEventListener("error", (err) => {
                console.error("SSE error:", err);
                outputDiv.append("\n\n[连接已关闭]");
                // Close explicitly so the connection is not re-opened after this error.
                source.close();
            });
        }
    </script>
</body>
</html>
关键点说明:
SSE流式传输:
使用SseEmitter实现服务端推送
通过text/event-stream内容类型保持长连接
DeepSeek API集成:
设置stream=true启用流式响应
处理data: [DONE]结束标记
解析JSON响应中的content字段
响应式编程:
使用WebClient处理HTTP流
以Flux<String>形式将上游流式数据传递给控制器(示例代码并未使用Sinks)
Flux实现响应式流处理
前端实现:
使用EventSource API接收SSE
实时追加内容到DOM
处理连接错误和关闭
测试步骤:
1.启动Spring Boot应用
2.访问前端页面(默认端口8080)
3.输入问题并点击按钮
4.查看实时输出的思考过程
注意事项:
1.替换YOUR_API_KEY为实际的DeepSeek API密钥
2.生产环境建议:
- 使用JSON解析库(如Jackson)处理响应
- 增加错误处理和重试机制
- 添加API速率限制
- 实现更健壮的SSE连接管理
此实现能让前端实时接收并显示DeepSeek API返回的流式响应,实现"思考过程"的逐字显示效果。