1. 什么是 SSE? (30秒)
SSE (Server-Sent Events) 是一种允许服务器通过 HTTP 连接主动向客户端发送实时更新的技术。
特点:基于 HTTP,使用简单,单向通信(服务器 -> 客户端),自动重连。
对比 WebSocket:WebSocket 是双向的,更复杂;SSE 是单向的,更轻量,适用于通知、日志流、实时数据更新等场景。
2. 核心依赖与配置 (30秒)
Spring Boot 从 2.2.x 版本开始提供了对 SSE 的专用支持,主要包含在 spring-boot-starter-web
中,无需引入额外依赖。
确保你的 pom.xml
中有:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
3. 三步编写代码 (3分钟)
第一步:创建控制器 (Controller)
创建一个 @RestController
,并定义一个方法来产生 SSE 流。
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@RestController
public class SseController {
// 用于保存所有连接的 SseEmitter,可以根据用户ID等关键字进行存储
private static final Map<String, SseEmitter> EMITTER_MAP = new ConcurrentHashMap<>();
/**
* 用于客户端连接 SSE
* @param clientId 客户端标识,用于区分不同客户端
* @return SseEmitter
*/
@GetMapping(path = "/sse/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter connect(@RequestParam String clientId) {
// 设置超时时间,0表示永不超时。可以根据需要设置,例如 30_000L (30秒)
SseEmitter emitter = new SseEmitter(0L);
// 注册回调函数,当连接完成或出错时,从Map中移除这个Emitter
emitter.onCompletion(() -> EMITTER_MAP.remove(clientId));
emitter.onError((e) -> EMITTER_MAP.remove(clientId));
emitter.onTimeout(() -> EMITTER_MAP.remove(clientId));
// 将新的 emitter 存入 Map
EMITTER_MAP.put(clientId, emitter);
// 可选:发送一个初始连接成功的事件
try {
emitter.send(SseEmitter.event()
.name("INIT") // 事件名称,可选
.data("连接成功 for: " + clientId) // 事件数据
.id("1") // 事件ID,可选,用于重连
.reconnectTime(5000)); // 重连时间,可选
} catch (IOException e) {
e.printStackTrace();
}
return emitter;
}
}
第二步:创建发送消息的方法
在同一个 Controller 中,添加一个 API 来模拟向特定客户端发送消息。
/**
* 向指定客户端发送消息
*/
@GetMapping("/sse/send")
public String sendMessage(@RequestParam String clientId, @RequestParam String message) {
SseEmitter emitter = EMITTER_MAP.get(clientId);
if (emitter != null) {
try {
// 构建并发送事件
emitter.send(SseEmitter.event()
.name("MESSAGE") // 事件类型
.data(message) // 事件数据
.id("msg-id-" + System.currentTimeMillis())); // ID
} catch (IOException e) {
// 发送失败,移除 emitter
EMITTER_MAP.remove(clientId);
return "发送失败,客户端可能已断开";
}
return "发送成功 to: " + clientId;
}
return "客户端不存在";
}
第三步:编写前端页面进行测试 (1分钟)
在 src/main/resources/static
目录下创建一个 sse-demo.html
文件。
<!DOCTYPE html>
<html>
<head>
<title>SSE Demo</title>
</head>
<body>
<h1>SSE 客户端测试</h1>
<label for="clientId">客户端ID: </label>
<input type="text" id="clientId" value="test-client-1">
<button onclick="connectSSE()">连接SSE</button>
<button onclick="closeSSE()">断开连接</button>
<hr>
<label for="message">要发送的消息: </label>
<input type="text" id="message" value="Hello SSE!">
<button onclick="sendMessage()">发送消息</button>
<hr>
<h3>收到的事件:</h3>
<div id="messages"></div>
<script>
let eventSource;
function connectSSE() {
const clientId = document.getElementById('clientId').value;
// 断开现有连接
if (eventSource) {
eventSource.close();
}
// 建立新的 SSE 连接
eventSource = new EventSource(`/sse/connect?clientId=${clientId}`);
// 监听通用消息(没有指定 event name 的消息)
eventSource.onmessage = function (event) {
appendMessage(`[message]: ${event.data}`);
};
// 监听特定名称的事件 (例如:MESSAGE)
eventSource.addEventListener("MESSAGE", function (event) {
appendMessage(`[MESSAGE]: ${event.data}`);
});
// 监听特定名称的事件 (例如:INIT)
eventSource.addEventListener("INIT", function (event) {
appendMessage(`[INIT]: ${event.data}`);
});
eventSource.onerror = function (err) {
console.error("SSE error:", err);
appendMessage('[错误] 连接出错');
};
}
function closeSSE() {
if (eventSource) {
eventSource.close();
appendMessage('[信息] 连接已关闭');
eventSource = null;
}
}
function sendMessage() {
const clientId = document.getElementById('clientId').value;
const message = document.getElementById('message').value;
fetch(`/sse/send?clientId=${clientId}&message=${encodeURIComponent(message)}`)
.then(response => response.text())
.then(data => console.log(data));
}
function appendMessage(text) {
const messageDiv = document.getElementById('messages');
const p = document.createElement('p');
p.textContent = `${new Date().toLocaleTimeString()}: ${text}`;
messageDiv.appendChild(p);
}
</script>
</body>
</html>
4. 运行与测试 (1分钟)
启动应用:运行你的 Spring Boot 应用。
打开页面:访问
http://localhost:8080/sse-demo.html
。进行测试:
输入一个客户端 ID(如
user1
),点击 “连接SSE”。前端会收到[INIT]
事件。在另一个浏览器标签页或使用 Postman 访问 :
http://localhost:8080/sse/send?clientId=user1&message=你好!
。观察第一个标签页,会立即收到
[MESSAGE]: 你好!
的消息。
总结
核心对象:
SseEmitter
关键注解:
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
流程:
客户端连接
/sse/connect
,服务端创建并保存SseEmitter
。服务端通过
emitter.send()
主动推送消息。客户端通过
EventSource
API 监听和处理消息。连接结束时,服务端需要清理
SseEmitter
(通过回调函数)。
现在你已经掌握了 Spring Boot 整合 SSE 的基本方法!在实际项目中,你可能需要将其与业务逻辑、身份认证(如 JWT)以及更强大的连接管理(如使用数据库或 Redis 存储 emitter)相结合。