Spring WebFlux WebSocket 连接保持策略

发布于:2025-03-04 ⋅ 阅读:(15) ⋅ 点赞:(0)
摘要

在 Spring WebFlux 的 WebSocket 处理器 (WebSocketHandler) 中,handle(WebSocketSession session) 方法用于管理 WebSocket 连接。其返回的 Mono<Void> 表示 WebSocket 处理的生命周期,Mono 终止时,WebSocket 连接将被关闭,WebSocketSession 也会释放资源。
由于 session.receive()Flux,它本身不会结束,但 session.send() 可能会在数据流完成后终止 Mono<Void>,导致 WebSocket 连接关闭。为了确保 WebSocket 连接保持开放,需要避免 Mono<Void> 过早完成。本文探讨了 Mono<Void> 终止的影响,并提供了两种解决方案:

  1. 心跳机制Flux.interval() 定期发送心跳消息)。
  2. 阻止 Mono<Void> 结束Flux.never() 保持 WebSocket 连接)。

1. handle(WebSocketSession session) 方法解析

Spring WebFlux 提供的 WebSocketHandler 接口定义了 WebSocket 处理逻辑:

Mono<Void> handle(WebSocketSession session);

作用

  • handle() 负责处理 WebSocket 连接的生命周期,WebSocketSession 提供 WebSocket 消息的 收发接口
  • 该方法返回一个 Mono<Void>,表示 WebSocket 处理流程的 异步执行,直到 Mono<Void> 终止。

Mono<Void> 终止的影响

  • 一旦 handle() 返回的 Mono<Void> 终止,WebSocket 连接会自动关闭
  • session 也会被关闭,底层 资源(如网络连接、缓冲区等)会被释放
  • 因此,要保持 WebSocket 连接,必须确保 Mono<Void> 不会过早完成

2. 为什么 session.send() 可能会导致 WebSocket 关闭?

错误示例

return session.send(session.receive()
    .map(msg -> session.textMessage("Echo: " + msg.getPayloadAsText())));

潜在问题

  1. session.receive()Flux,会持续监听消息,但它不会主动结束。
  2. 如果客户端不发送消息,session.send() 可能会完成,导致 Mono<Void> 终止,最终 WebSocket 连接关闭。

原因

  • session.send(Publisher<WebSocketMessage>) 需要一个持续的 Flux<WebSocketMessage>
  • 如果 session.receive() 没有数据,session.send() 可能会完成,从而关闭 WebSocket 连接。

3. 解决方案

方案 1:使用 Flux.interval() 发送心跳

核心思路

  • 定期发送 “ping” 消息,保证 session.send() 始终有数据,避免连接关闭。

实现方式

public class KeepAliveWebSocketHandler implements WebSocketHandler {
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // 处理客户端消息流
        Flux<WebSocketMessage> incomingMessages = session.receive()
            .map(msg -> session.textMessage("Echo: " + msg.getPayloadAsText()));

        // 发送心跳消息,每 30 秒发送一次 "ping"
        Flux<WebSocketMessage> heartbeats = Flux.interval(Duration.ofSeconds(30))
            .map(tick -> session.textMessage("ping"));

        // 合并消息流 & 心跳,保持 WebSocket 连接
        Flux<WebSocketMessage> combinedFlux = Flux.merge(incomingMessages, heartbeats);

        return session.send(combinedFlux);
    }
}

优势
session.send() 始终有数据,不会因 Flux 结束而关闭连接。
✅ 服务器定期发送心跳,防止客户端超时断开。


方案 2:使用 Flux.never() 阻止 Mono<Void> 结束

如果不想发送心跳消息,但仍想保持 WebSocket 连接,可以使用 Flux.never()

return session.send(session.receive()
    .map(msg -> session.textMessage("Echo: " + msg.getPayloadAsText())))
    .then(Flux.never().then());  // 保持连接,避免 Mono<Void> 结束

问题

  • 这种方式 不会主动发送心跳,可能导致某些 WebSocket 服务器 因长时间无数据而关闭连接

4. WebSocket 关闭后,资源是否会被释放?

是的,WebSocket 关闭后 session 会被释放,相关资源也会清理

  1. WebSocketSession 关闭后,底层 TCP 连接会断开
  2. session.receive()session.send() 会停止订阅,释放 Reactor 资源
  3. 缓冲区、事件循环线程、内存等 也会被回收。

如果 Mono<Void> 终止,Spring WebFlux 会 自动释放 WebSocketSession 相关资源,开发者 无需手动关闭 session


5. 结论

问题

  • session.receive() 本身不会结束,但 session.send() 可能因数据流完成而关闭 WebSocket 连接。
  • 需要保证 session.send() 始终有数据要发送,避免 Mono<Void> 提前完成。

推荐方案
方案 1(心跳机制):使用 Flux.interval() 发送 “ping” 消息,保持连接(推荐)。
方案 2(Flux.never():阻止 Mono<Void> 结束,但不发送心跳(适用于特定场景)。

关于资源释放

  • Mono<Void> 终止后,WebSocket 连接会关闭,session 及其相关资源(TCP 连接、缓冲区、事件循环等)会被 Spring WebFlux 自动释放

通过这些方法,可以确保 WebSocket 连接 持续保持开放,并避免 session.send() 过早完成导致的连接关闭问题。