以下是一个完整的基于 Spring Boot 的 Server-Sent Events (SSE) 示例,包括服务端和客户端的实现。
一、服务端实现
1. 创建 Spring Boot 项目
首先,创建一个基本的 Spring Boot 项目,并添加 spring-boot-starter-web
依赖。在 pom.xml
中添加以下内容:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
2. 创建 SSE 控制器
创建一个控制器来处理 SSE 连接并推送实时消息。
SseController.java
package com.example.sse;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@RestController
public class SseController {
private final ExecutorService executorService = Executors.newCachedThreadPool();
@GetMapping("/sse")
public SseEmitter handleSse() {
SseEmitter emitter = new SseEmitter();
executorService.execute(() -> {
try {
for (int i = 0; i < 10; i++) {
emitter.send("Message " + i, MediaType.TEXT_PLAIN);
TimeUnit.SECONDS.sleep(1);
}
emitter.complete();
} catch (IOException | InterruptedException e) {
emitter.completeWithError(e);
}
});
return emitter;
}
}
3. 配置跨域(可选)
如果前端和后端运行在不同端口上,需要配置跨域。
CorsConfig.java
package com.example.sse;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@Configuration
public class CorsConfig implements WebMvcConfigurer {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**")
.allowedOriginPatterns("*")
.allowedMethods("GET", "POST", "PUT", "DELETE")
.allowedHeaders("*")
.allowCredentials(true);
}
}
二、客户端实现
在前端页面中,使用 EventSource
来订阅 SSE。
index.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>SSE Example</title>
</head>
<body>
<h1>Server-Sent Events Example</h1>
<div id="events"></div>
<script>
const eventSource = new EventSource('/sse');
eventSource.onmessage = function(event) {
const newElement = document.createElement("div");
newElement.innerHTML = "Message: " + event.data;
document.getElementById("events").appendChild(newElement);
};
eventSource.onerror = function(event) {
eventSource.close();
alert("EventSource failed: " + event);
};
</script>
</body>
</html>
三、运行和测试
- 启动 Spring Boot 应用。
- 在浏览器中访问
http://localhost:8080
,即可看到服务端每秒推送的消息。
四、扩展功能
1. 动态推送消息
可以通过维护一个 SseEmitter
的映射来动态推送消息。
SseController.java(动态推送版本)
package com.example.sse;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@RestController
public class SseController {
private final Map<String, SseEmitter> emitterMap = new ConcurrentHashMap<>();
@GetMapping("/sse/{userId}")
public SseEmitter connect(@PathVariable String userId) {
SseEmitter emitter = new SseEmitter();
emitterMap.put(userId, emitter);
emitter.onCompletion(() -> emitterMap.remove(userId));
emitter.onTimeout(() -> emitterMap.remove(userId));
emitter.onError(e -> emitterMap.remove(userId));
return emitter;
}
@GetMapping("/push/{userId}")
public void push(@PathVariable String userId, @RequestParam String message) {
SseEmitter emitter = emitterMap.get(userId);
if (emitter != null) {
try {
emitter.send(message);
} catch (IOException e) {
emitter.completeWithError(e);
emitterMap.remove(userId);
}
}
}
}
2. 使用 WebFlux 实现 SSE
如果需要更高效的响应式编程支持,可以使用 Spring WebFlux。
SseController.java(WebFlux 版本)
package com.example.sse;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import java.time.Duration;
@RestController
public class SseController {
@GetMapping("/sse/stream")
public Flux<ServerSentEvent<String>> streamSse() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> ServerSentEvent.<String>builder()
.id(String.valueOf(sequence))
.event("periodic-event")
.data("Current time: " + java.time.LocalTime.now())
.build());
}
}
通过以上步骤,你可以实现一个完整的基于 Spring Boot 的 SSE 应用。