SSE事件流简单示例

发布于:2025-07-13 ⋅ 阅读:(17) ⋅ 点赞:(0)


SSE(Server-Sent Events,服务器推送事件)是一种基于HTTP的服务器向客户端实时推送数据的技术标准。

1、推送-SseEmitter

SseEmitter用于实现服务器向客户端单向长连接的实时数据推送。
比如用于大模型机器人回答推送给前端(逐字逐词显示)

作为服务端,使用.send()方法发送数据,.complete()方法完成并结束连接,简单示例:

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@GetMapping("/test")
public SseEmitter test() {
    SseEmitter emitter = new SseEmitter(30_000L); // 超时时间设为30秒
    ExecutorService executor = Executors.newSingleThreadExecutor();

    // 模拟异步数据推送
    executor.execute(() -> {
        try {
            for (int i = 0; i < 10; i++) {
                emitter.send("Event " + i); // 发送数据
                Thread.sleep(1000);
            }
            emitter.complete(); // 正常结束
        } catch (Exception e) {
            emitter.completeWithError(e); // 异常终止
        }
    });

    return emitter;
}

2、接收-EventSourceListener

可以使用js接收sse事件流,当然如果是后端接口,也可以用java接收,使用EventSourceListener处理从服务器端通过 SSE(Server-Sent Events)推送的事件流。
在EventSourceListener中覆写onEvent方法,onEvent方法在每次服务器推送新事件时调用;

import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import okhttp3.sse.EventSources;

public void listenTest(String url, String param){
	OkHttpClient client = new OkHttpClient.Builder().connectTimeout(5, TimeUnit.MINUTES).readTimeout(5, TimeUnit.MINUTES).build();
	
	RequestBody formBody = RequestBody.create(okhttp3.MediaType.parse("application/json; charset=utf-8"), param);
	Request request = new Request.Builder().post(formBody).url(url).build();
	
	EventSourceListener listener = new EventSourceListener() {

		// 连接建立时触发,非必须
	    @Override
	    public void onOpen(EventSource eventSource, Response response) {
	        System.out.println("SSE连接已建立");
	        System.out.println("响应头: " + response.headers());
	    }

		// 收到消息时触发
	    @Override
        public void onEvent(EventSource eventSource, String id, String type, String data) {
            //在这里对推送过来的数据做解析和处理
            System.out.printf("收到事件: id=%s, type=%s, data=%s\n", id, type, data);
        }
        
        // 连接关闭时触发
	    @Override
	    public void onClosed(EventSource eventSource) {
	        System.out.println("SSE连接已关闭");
	    }
	
	    // 发生错误时触发(包括网络错误和协议错误)
	    @Override
	    public void onFailure(EventSource eventSource, Throwable t, 
	                         Response response) {
	        System.err.println("SSE错误: " + t.getMessage());
	        if (response != null) {
	            System.err.println("错误响应码: " + response.code());
	        }
	        eventSource.cancel();//关闭连接
	    }
	}
	
	EventSource.Factory factory = EventSources.createFactory(client);
	factory.newEventSource(request, listener);
}


网站公告

今日签到

点亮在社区的每一天
去签到