Server-Sent Events 教程

发布于:2025-03-16 ⋅ 阅读:(24) ⋅ 点赞:(0)

一、 SSE介绍

1.1 SSE 的本质

服务器向浏览器推送信息,除了 WebSocket,还有一种方法:Server-Sent Events(以下简称 SSE)。本文介绍它的用法。

在这里插入图片描述
严格地说,HTTP 协议无法做到服务器主动推送信息。但是,有一种变通方法,就是服务器向客户端声明,接下来要发送的是流信息(streaming)。

也就是说,发送的不是一次性的数据包,而是一个数据流,会连续不断地发送过来。这时,客户端不会关闭连接,会一直等着服务器发过来的新的数据流,视频播放就是这样的例子。本质上,这种通信就是以流信息的方式,完成一次用时很长的下载。

SSE 就是利用这种机制,使用流信息向浏览器推送信息。它基于 HTTP 协议,目前除了 IE/Edge,其他浏览器都支持。

1.2 SSE 的特点

SSE 与 WebSocket 作用相似,都是建立浏览器与服务器之间的通信渠道,然后服务器向浏览器推送信息。

总体来说,WebSocket 更强大和灵活。因为它是全双工通道,可以双向通信;SSE 是单向通道,只能服务器向浏览器发送,因为流信息本质上就是下载。如果浏览器向服务器发送信息,就变成了另一次 HTTP 请求。

在这里插入图片描述

但是,SSE 也有自己的优点。

  • SSE 使用 HTTP 协议,现有的服务器软件都支持。WebSocket 是一个独立协议。
  • SSE 属于轻量级,使用简单;WebSocket 协议相对复杂。
  • SSE 默认支持断线重连,WebSocket 需要自己实现。
  • SSE 一般只用来传送文本,二进制数据需要编码后传送,WebSocket 默认支持传送二进制数据。
  • SSE 支持自定义发送的消息类型。

因此,两者各有特点,适合不同的场合。

二、 SSE实现

在Java中实现SSE(Server-Sent Events)接口,可以通过Spring框架的SseEmitter(传统Spring MVC)或响应式Flux(Spring WebFlux)实现。以下是两种方式的详细示例,实现类似ChatGPT的逐字回复效果:

2.1 使用Spring MVC的SseEmitter

  1. 添加依赖(Maven)
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
  1. 控制器实现
package com.example.demospringboot.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.util.Date;
import java.util.concurrent.Executors;

@Slf4j
@RestController
public class SSEController {

    @CrossOrigin(origins = "*") // 允许所有来源
    @GetMapping("/sse-stream")
    public SseEmitter handleSse() {
        SseEmitter emitter = new SseEmitter(60_000L); // 超时时间60秒

        // 使用线程池发送事件
        Executors.newSingleThreadExecutor().submit(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    log.info("SseEmitter {}", i);
                    SseEmitter.SseEventBuilder event = SseEmitter.event()
                            .data("Message " + i + " at " + new Date())
                            .id(String.valueOf(i))
                            .name("message");
                    emitter.send(event);
                    Thread.sleep(1000); // 模拟延迟
                }
                emitter.complete(); // 完成发送
            } catch (Exception ex) {
                emitter.completeWithError(ex); // 发送错误
            }
        });

        // 处理完成和超时
        emitter.onCompletion(() -> System.out.println("SSE completed"));
        emitter.onTimeout(() -> System.out.println("SSE timed out"));
        emitter.onError((ex) -> System.out.println("SSE error: " + ex.getMessage()));

        return emitter;
    }
}

2.2 使用Spring WebFlux的Flux

适用于响应式编程(如Netty服务器)。

  1. 添加依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
  1. 控制器实现
package com.example.demospringboot.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.Date;

@Slf4j
@RestController
public class ReactiveSSEController {

    @CrossOrigin(origins = "*") // 允许所有来源
    @GetMapping(value = "/sse-stream-reactive", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamEvents() {
        log.info("Starting SSE stream");
        return Flux.interval(Duration.ofSeconds(1))
                .map(sequence -> "Event " + sequence + " at " + new Date())
                .take(10); // 发送10次后结束
    }
}

2.3 客户端测试

1,测试命令:

curl -N -H "Accept: text/event-stream" http://localhost:8080/sse-stream

2,test.html测试:

<!DOCTYPE html>
<html>
<head>
    <title>SSE 测试</title>
</head>
<body>
    <h1>SSE 测试页面</h1>
    <div id="output"></div>

    <script>
        // 连接到 SSE 接口
        const eventSource = new EventSource('http://localhost:8080/sse-stream')
        // 监听消息事件
        eventSource.onmessage = (event) => {
            const data = event.data;
            console.log('收到消息:', data);
            document.getElementById('output').innerHTML += `<p>${data}</p>`;
        };

        // 监听自定义事件(如果服务端指定了事件名)
        eventSource.addEventListener('custom-event', (event) => {
            console.log('自定义事件:', event.data);
        });

        // 错误处理
        eventSource.onerror = (error) => {
            console.error('SSE 错误:', error);
            eventSource.close(); // 关闭连接
        };

        // 可选:手动关闭连接的按钮
        document.body.innerHTML += '<button onclick="eventSource.close()">停止连接</button>';
    </script>
</body>
</html>

post调用示例如下:

<!DOCTYPE html>
<html>
<head>
    <title>SSE POST 演示</title>
    <style>
        body { font-family: Arial, sans-serif; max-width: 800px; margin: 20px auto; }
        #output { 
            border: 1px solid #ccc; 
            padding: 15px; 
            margin: 10px 0; 
            min-height: 100px;
            white-space: pre-wrap;
            background: #f9f9f9;
        }
        button { 
            padding: 8px 16px; 
            background: #007bff; 
            color: white; 
            border: none; 
            border-radius: 4px; 
            cursor: pointer;
        }
        button:disabled { background: #6c757d; }
        .error { color: #dc3545; }
    </style>
</head>
<body>
    <h2>SSE 数据流演示</h2>
    <button id="startBtn">开始请求</button>
    <button id="stopBtn" disabled>停止请求</button>
    <div id="output"></div>
    <div id="error" class="error"></div>

    <script>
        class SSEPostClient {
            constructor() {
                this.controller = null;
                this.isStreaming = false;
                
                this.startBtn = document.getElementById('startBtn');
                this.stopBtn = document.getElementById('stopBtn');
                this.output = document.getElementById('output');
                this.errorDiv = document.getElementById('error');

                this.initializeEvents();
            }

            initializeEvents() {
                this.startBtn.addEventListener('click', () => this.startRequest());	
                this.stopBtn.addEventListener('click', () => this.stopRequest());
            }

            async startRequest() {
                if (this.isStreaming) return;

                this.clearMessages();
                this.toggleButtons(true);
                this.controller = new AbortController();

                try {
                    const response = await this.sendRequest();
                    await this.processStream(response);
                } catch (err) {
                    this.handleError(err);
                } finally {
                    this.toggleButtons(false);
                }
            }

            async sendRequest() {
                const url = 'http://localhost:8080/sse-stream-post';
                const payload = {
                    query: "你好",
                    model: "gpt-3.5",
                    temperature: 0.7
                };
                const response = await fetch(url, {
                    method: 'POST',
                    headers: {
                        'Content-Type': 'application/json',
                        'Authorization': 'Bearer your_token_here'
        			},
                    body: JSON.stringify(payload),
                    signal: this.controller.signal
                });

                if (!response.ok) {
                    throw new Error(`HTTP 错误! 状态码: ${response.status}`);
                }

                return response;
            }

            async processStream(response) {
                this.isStreaming = true;
                const reader = response.body.getReader();
                const decoder = new TextDecoder();

                try {
                    while (this.isStreaming) {
                        const { done, value } = await reader.read();
                        if (done) break;
                        
                        const chunk = decoder.decode(value, { stream: true });
                        this.parseAndDisplay(chunk);
                    }
                } finally {
                    reader.releaseLock();
                    this.isStreaming = false;
                }
            }

            parseAndDisplay(chunk) {
                // 假设服务端返回 JSON 格式的流数据
                try {
                    const data = JSON.parse(chunk);
                    this.output.textContent += data.content;
                } catch (err) {
                    console.warn('解析数据失败:', err);
                    this.output.textContent += chunk;  // 显示原始数据
                }
            }

            stopRequest() {
                if (this.controller) {
                    this.controller.abort();
                    this.output.textContent += '\n[请求已中止]';
                    this.isStreaming = false;
                }
            }

            toggleButtons(isLoading) {
                this.startBtn.disabled = isLoading;
                this.stopBtn.disabled = !isLoading;
            }

            handleError(err) {
                this.errorDiv.textContent = `错误: ${err.message}`;
                if (err.name === 'AbortError') {
                    console.log('请求已中止');
                } else {
                    console.error('请求失败:', err);
                }
            }

            clearMessages() {
                this.output.textContent = '';
                this.errorDiv.textContent = '';
            }
        }

        // 初始化客户端
        new SSEPostClient();
    </script>
</body>
</html>

ps:如果服务器未在响应头中设置 Access-Control-Allow-Origin,跨域请求被阻止会报如下错误:

Access to fetch at 'http://localhost:8080/sse-stream-post' from origin 'null' has been blocked by CORS policy: Response to preflight request doesn't pass access control check: No 'Access-Control-Allow-Origin' header is present on the requested resource. If an opaque response serves your needs, set the request's mode to 'no-cors' to fetch the resource with CORS disabled.

可以禁用浏览器安全策略(危险!仅测试用)进行测试:

#  Chrome 启动命令(临时生效)
chrome.exe  --ignore-certificate-errors --allow-running-insecure-content --disable-web-security --user-data-dir="C:/temp-chrome"

2.4 两种方式选择

在这里插入图片描述
在这里插入图片描述