一、 SSE介绍
1.1 SSE 的本质
服务器向浏览器推送信息,除了 WebSocket,还有一种方法:Server-Sent Events(以下简称 SSE)。本文介绍它的用法。
严格地说,HTTP 协议无法做到服务器主动推送信息。但是,有一种变通方法,就是服务器向客户端声明,接下来要发送的是流信息(streaming)。
也就是说,发送的不是一次性的数据包,而是一个数据流,会连续不断地发送过来。这时,客户端不会关闭连接,会一直等着服务器发过来的新的数据流,视频播放就是这样的例子。本质上,这种通信就是以流信息的方式,完成一次用时很长的下载。
SSE 就是利用这种机制,使用流信息向浏览器推送信息。它基于 HTTP 协议,目前除了 IE/Edge,其他浏览器都支持。
1.2 SSE 的特点
SSE 与 WebSocket 作用相似,都是建立浏览器与服务器之间的通信渠道,然后服务器向浏览器推送信息。
总体来说,WebSocket 更强大和灵活。因为它是全双工通道,可以双向通信;SSE 是单向通道,只能服务器向浏览器发送,因为流信息本质上就是下载。如果浏览器向服务器发送信息,就变成了另一次 HTTP 请求。
但是,SSE 也有自己的优点。
- SSE 使用 HTTP 协议,现有的服务器软件都支持。WebSocket 是一个独立协议。
- SSE 属于轻量级,使用简单;WebSocket 协议相对复杂。
- SSE 默认支持断线重连,WebSocket 需要自己实现。
- SSE 一般只用来传送文本,二进制数据需要编码后传送,WebSocket 默认支持传送二进制数据。
- SSE 支持自定义发送的消息类型。
因此,两者各有特点,适合不同的场合。
二、 SSE实现
在Java中实现SSE(Server-Sent Events)接口,可以通过Spring框架的SseEmitter(传统Spring MVC)或响应式Flux(Spring WebFlux)实现。以下是两种方式的详细示例,实现类似ChatGPT的逐字回复效果:
2.1 使用Spring MVC的SseEmitter
- 添加依赖(Maven)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
- 控制器实现
package com.example.demospringboot.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Date;
import java.util.concurrent.Executors;
@Slf4j
@RestController
public class SSEController {
@CrossOrigin(origins = "*") // 允许所有来源
@GetMapping("/sse-stream")
public SseEmitter handleSse() {
SseEmitter emitter = new SseEmitter(60_000L); // 超时时间60秒
// 使用线程池发送事件
Executors.newSingleThreadExecutor().submit(() -> {
try {
for (int i = 0; i < 10; i++) {
log.info("SseEmitter {}", i);
SseEmitter.SseEventBuilder event = SseEmitter.event()
.data("Message " + i + " at " + new Date())
.id(String.valueOf(i))
.name("message");
emitter.send(event);
Thread.sleep(1000); // 模拟延迟
}
emitter.complete(); // 完成发送
} catch (Exception ex) {
emitter.completeWithError(ex); // 发送错误
}
});
// 处理完成和超时
emitter.onCompletion(() -> System.out.println("SSE completed"));
emitter.onTimeout(() -> System.out.println("SSE timed out"));
emitter.onError((ex) -> System.out.println("SSE error: " + ex.getMessage()));
return emitter;
}
}
2.2 使用Spring WebFlux的Flux
适用于响应式编程(如Netty服务器)。
- 添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
- 控制器实现
package com.example.demospringboot.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.Date;
@Slf4j
@RestController
public class ReactiveSSEController {
@CrossOrigin(origins = "*") // 允许所有来源
@GetMapping(value = "/sse-stream-reactive", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamEvents() {
log.info("Starting SSE stream");
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> "Event " + sequence + " at " + new Date())
.take(10); // 发送10次后结束
}
}
2.3 客户端测试
1,测试命令:
curl -N -H "Accept: text/event-stream" http://localhost:8080/sse-stream
2,test.html测试:
<!DOCTYPE html>
<html>
<head>
<title>SSE 测试</title>
</head>
<body>
<h1>SSE 测试页面</h1>
<div id="output"></div>
<script>
// 连接到 SSE 接口
const eventSource = new EventSource('http://localhost:8080/sse-stream')
// 监听消息事件
eventSource.onmessage = (event) => {
const data = event.data;
console.log('收到消息:', data);
document.getElementById('output').innerHTML += `<p>${data}</p>`;
};
// 监听自定义事件(如果服务端指定了事件名)
eventSource.addEventListener('custom-event', (event) => {
console.log('自定义事件:', event.data);
});
// 错误处理
eventSource.onerror = (error) => {
console.error('SSE 错误:', error);
eventSource.close(); // 关闭连接
};
// 可选:手动关闭连接的按钮
document.body.innerHTML += '<button onclick="eventSource.close()">停止连接</button>';
</script>
</body>
</html>
post调用示例如下:
<!DOCTYPE html>
<html>
<head>
<title>SSE POST 演示</title>
<style>
body { font-family: Arial, sans-serif; max-width: 800px; margin: 20px auto; }
#output {
border: 1px solid #ccc;
padding: 15px;
margin: 10px 0;
min-height: 100px;
white-space: pre-wrap;
background: #f9f9f9;
}
button {
padding: 8px 16px;
background: #007bff;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
}
button:disabled { background: #6c757d; }
.error { color: #dc3545; }
</style>
</head>
<body>
<h2>SSE 数据流演示</h2>
<button id="startBtn">开始请求</button>
<button id="stopBtn" disabled>停止请求</button>
<div id="output"></div>
<div id="error" class="error"></div>
<script>
class SSEPostClient {
constructor() {
this.controller = null;
this.isStreaming = false;
this.startBtn = document.getElementById('startBtn');
this.stopBtn = document.getElementById('stopBtn');
this.output = document.getElementById('output');
this.errorDiv = document.getElementById('error');
this.initializeEvents();
}
initializeEvents() {
this.startBtn.addEventListener('click', () => this.startRequest());
this.stopBtn.addEventListener('click', () => this.stopRequest());
}
async startRequest() {
if (this.isStreaming) return;
this.clearMessages();
this.toggleButtons(true);
this.controller = new AbortController();
try {
const response = await this.sendRequest();
await this.processStream(response);
} catch (err) {
this.handleError(err);
} finally {
this.toggleButtons(false);
}
}
async sendRequest() {
const url = 'http://localhost:8080/sse-stream-post';
const payload = {
query: "你好",
model: "gpt-3.5",
temperature: 0.7
};
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer your_token_here'
},
body: JSON.stringify(payload),
signal: this.controller.signal
});
if (!response.ok) {
throw new Error(`HTTP 错误! 状态码: ${response.status}`);
}
return response;
}
async processStream(response) {
this.isStreaming = true;
const reader = response.body.getReader();
const decoder = new TextDecoder();
try {
while (this.isStreaming) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value, { stream: true });
this.parseAndDisplay(chunk);
}
} finally {
reader.releaseLock();
this.isStreaming = false;
}
}
parseAndDisplay(chunk) {
// 假设服务端返回 JSON 格式的流数据
try {
const data = JSON.parse(chunk);
this.output.textContent += data.content;
} catch (err) {
console.warn('解析数据失败:', err);
this.output.textContent += chunk; // 显示原始数据
}
}
stopRequest() {
if (this.controller) {
this.controller.abort();
this.output.textContent += '\n[请求已中止]';
this.isStreaming = false;
}
}
toggleButtons(isLoading) {
this.startBtn.disabled = isLoading;
this.stopBtn.disabled = !isLoading;
}
handleError(err) {
this.errorDiv.textContent = `错误: ${err.message}`;
if (err.name === 'AbortError') {
console.log('请求已中止');
} else {
console.error('请求失败:', err);
}
}
clearMessages() {
this.output.textContent = '';
this.errorDiv.textContent = '';
}
}
// 初始化客户端
new SSEPostClient();
</script>
</body>
</html>
ps:如果服务器未在响应头中设置 Access-Control-Allow-Origin,跨域请求被阻止会报如下错误:
Access to fetch at 'http://localhost:8080/sse-stream-post' from origin 'null' has been blocked by CORS policy: Response to preflight request doesn't pass access control check: No 'Access-Control-Allow-Origin' header is present on the requested resource. If an opaque response serves your needs, set the request's mode to 'no-cors' to fetch the resource with CORS disabled.
可以禁用浏览器安全策略(危险!仅测试用)进行测试:
# Chrome 启动命令(临时生效)
chrome.exe --ignore-certificate-errors --allow-running-insecure-content --disable-web-security --user-data-dir="C:/temp-chrome"
2.4 两种方式选择