SSE(Server-Sent Events,服务器推送事件)是一种基于HTTP的服务器向客户端实时推送数据的技术标准。
1、推送-SseEmitter
SseEmitter用于实现服务器向客户端单向、长连接的实时数据推送。
比如用于大模型机器人回答推送给前端(逐字逐词显示)
作为服务端,使用.send()
方法发送数据,.complete()
方法完成并结束连接,简单示例:
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@GetMapping("/test")
public SseEmitter test() {
SseEmitter emitter = new SseEmitter(30_000L); // 超时时间设为30秒
ExecutorService executor = Executors.newSingleThreadExecutor();
// 模拟异步数据推送
executor.execute(() -> {
try {
for (int i = 0; i < 10; i++) {
emitter.send("Event " + i); // 发送数据
Thread.sleep(1000);
}
emitter.complete(); // 正常结束
} catch (Exception e) {
emitter.completeWithError(e); // 异常终止
}
});
return emitter;
}
2、接收-EventSourceListener
可以使用js接收sse事件流,当然如果是后端接口,也可以用java接收,使用EventSourceListener
处理从服务器端通过 SSE(Server-Sent Events)推送的事件流。
在EventSourceListener中覆写onEvent方法,onEvent方法在每次服务器推送新事件时调用;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import okhttp3.sse.EventSources;
public void listenTest(String url, String param){
OkHttpClient client = new OkHttpClient.Builder().connectTimeout(5, TimeUnit.MINUTES).readTimeout(5, TimeUnit.MINUTES).build();
RequestBody formBody = RequestBody.create(okhttp3.MediaType.parse("application/json; charset=utf-8"), param);
Request request = new Request.Builder().post(formBody).url(url).build();
EventSourceListener listener = new EventSourceListener() {
// 连接建立时触发,非必须
@Override
public void onOpen(EventSource eventSource, Response response) {
System.out.println("SSE连接已建立");
System.out.println("响应头: " + response.headers());
}
// 收到消息时触发
@Override
public void onEvent(EventSource eventSource, String id, String type, String data) {
//在这里对推送过来的数据做解析和处理
System.out.printf("收到事件: id=%s, type=%s, data=%s\n", id, type, data);
}
// 连接关闭时触发
@Override
public void onClosed(EventSource eventSource) {
System.out.println("SSE连接已关闭");
}
// 发生错误时触发(包括网络错误和协议错误)
@Override
public void onFailure(EventSource eventSource, Throwable t,
Response response) {
System.err.println("SSE错误: " + t.getMessage());
if (response != null) {
System.err.println("错误响应码: " + response.code());
}
eventSource.cancel();//关闭连接
}
}
EventSource.Factory factory = EventSources.createFactory(client);
factory.newEventSource(request, listener);
}