【SpringBoot】Spring Boot实现SSE实时推送实战

发布于:2025-06-21 ⋅ 阅读:(13) ⋅ 点赞:(0)

以下是一个完整的基于 Spring Boot 的 Server-Sent Events (SSE) 示例,包括服务端和客户端的实现。

一、服务端实现

1. 创建 Spring Boot 项目

首先,创建一个基本的 Spring Boot 项目,并添加 spring-boot-starter-web 依赖。在 pom.xml 中添加以下内容:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>
2. 创建 SSE 控制器

创建一个控制器来处理 SSE 连接并推送实时消息。

SseController.java

package com.example.sse;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@RestController
public class SseController {

    private final ExecutorService executorService = Executors.newCachedThreadPool();

    @GetMapping("/sse")
    public SseEmitter handleSse() {
        SseEmitter emitter = new SseEmitter();
        executorService.execute(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    emitter.send("Message " + i, MediaType.TEXT_PLAIN);
                    TimeUnit.SECONDS.sleep(1);
                }
                emitter.complete();
            } catch (IOException | InterruptedException e) {
                emitter.completeWithError(e);
            }
        });
        return emitter;
    }
}
3. 配置跨域(可选)

如果前端和后端运行在不同端口上,需要配置跨域。

CorsConfig.java

package com.example.sse;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
public class CorsConfig implements WebMvcConfigurer {
    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/**")
                .allowedOriginPatterns("*")
                .allowedMethods("GET", "POST", "PUT", "DELETE")
                .allowedHeaders("*")
                .allowCredentials(true);
    }
}

二、客户端实现

在前端页面中,使用 EventSource 来订阅 SSE。

index.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>SSE Example</title>
</head>
<body>
    <h1>Server-Sent Events Example</h1>
    <div id="events"></div>

    <script>
        const eventSource = new EventSource('/sse');

        eventSource.onmessage = function(event) {
            const newElement = document.createElement("div");
            newElement.innerHTML = "Message: " + event.data;
            document.getElementById("events").appendChild(newElement);
        };

        eventSource.onerror = function(event) {
            eventSource.close();
            alert("EventSource failed: " + event);
        };
    </script>
</body>
</html>

三、运行和测试

  1. 启动 Spring Boot 应用。
  2. 在浏览器中访问 http://localhost:8080,即可看到服务端每秒推送的消息。

四、扩展功能

1. 动态推送消息

可以通过维护一个 SseEmitter 的映射来动态推送消息。

SseController.java(动态推送版本)

package com.example.sse;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@RestController
public class SseController {

    private final Map<String, SseEmitter> emitterMap = new ConcurrentHashMap<>();

    @GetMapping("/sse/{userId}")
    public SseEmitter connect(@PathVariable String userId) {
        SseEmitter emitter = new SseEmitter();
        emitterMap.put(userId, emitter);
        emitter.onCompletion(() -> emitterMap.remove(userId));
        emitter.onTimeout(() -> emitterMap.remove(userId));
        emitter.onError(e -> emitterMap.remove(userId));
        return emitter;
    }

    @GetMapping("/push/{userId}")
    public void push(@PathVariable String userId, @RequestParam String message) {
        SseEmitter emitter = emitterMap.get(userId);
        if (emitter != null) {
            try {
                emitter.send(message);
            } catch (IOException e) {
                emitter.completeWithError(e);
                emitterMap.remove(userId);
            }
        }
    }
}
2. 使用 WebFlux 实现 SSE

如果需要更高效的响应式编程支持,可以使用 Spring WebFlux。

SseController.java(WebFlux 版本)

package com.example.sse;

import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

import java.time.Duration;

@RestController
public class SseController {

    @GetMapping("/sse/stream")
    public Flux<ServerSentEvent<String>> streamSse() {
        return Flux.interval(Duration.ofSeconds(1))
                .map(sequence -> ServerSentEvent.<String>builder()
                        .id(String.valueOf(sequence))
                        .event("periodic-event")
                        .data("Current time: " + java.time.LocalTime.now())
                        .build());
    }
}

通过以上步骤,你可以实现一个完整的基于 Spring Boot 的 SSE 应用。


网站公告

今日签到

点亮在社区的每一天
去签到