Spring WebFlux SSE(服务器发送事件)的正确用法

发布于:2024-11-28 ⋅ 阅读:(97) ⋅ 点赞:(0)

在SpringBoot2下SSE实现是返回一个SseEmitter,然后通过SseEmitter的send方法来发送事件.

在SpringBoot3的WebFlux 下SSE实现是返回一个Flux<ServerSentEvent<?>>,但是怎么手动向客户端发送SSE事件搜遍全网也没有看到一个讲清楚的.网上的例子一般都是这样的:

  @GetMapping("/stream-sse")
  public Flux<ServerSentEvent<String>> streamEvents() {
    return Flux.interval(Duration.ofSeconds(1))
        .map(sequence -> {
          ServerSentEvent<String> serverSentEvent = ServerSentEvent.<String> builder()
              .id(String.valueOf(sequence))
              .event("periodic-event")
              .data("SSE - " + LocalTime.now().toString())
              .build();

          log.info("stream-sse: " + serverSentEvent);
          return serverSentEvent;
        })
        .doOnCancel(() -> log.warn("stream-sse canceled"))
        .doOnError(e -> log.error("stream-sse error", e));
  }

经过半天的摸索,终于找到解决方案,原来是通过Sinks.Many<ServerSentEvent<?>>这个类的tryEmitNext方法来手动发送事件!
下面是代码例子:

  // 使用 Sinks.Many<ServerSentEvent<String>> 对应非反应式的SseEmitter
  @GetMapping("/stream-sse-sink")
  public Flux<ServerSentEvent<String>> streamSseMvc() {
    Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().unicast().onBackpressureError();

    Flux<ServerSentEvent<String>> flux = sink.asFlux();
    
    Scheduler single = Schedulers.boundedElastic();
    single.schedule(() -> {
      try {
        for (int i = 0; i < 50; i++) {
          ServerSentEvent<String> serverSentEvent = ServerSentEvent.<String> builder()
              .id(String.valueOf(i))
              .event("periodic-event")
              .data("SSE - " + LocalTime.now().toString())
              .build();

          log.info("stream-sse-sink: " + serverSentEvent);

          if(sink.tryEmitNext(serverSentEvent).isFailure()) {
            log.error("sink.tryEmitNext isFailure");
            break;
          }

          Thread.sleep(1000);
        }
      } catch (Exception ex) {
        sink.tryEmitError(ex);
      } finally {
        sink.tryEmitComplete();
      }
    },3,TimeUnit.SECONDS);

    
    return flux;
  }