摘要
在 Spring WebFlux 的 WebSocket 处理器 (WebSocketHandler
) 中,handle(WebSocketSession session)
方法用于管理 WebSocket 连接。其返回的 Mono<Void>
表示 WebSocket 处理的生命周期,Mono
终止时,WebSocket 连接将被关闭,WebSocketSession
也会释放资源。
由于 session.receive()
是 Flux
,它本身不会结束,但 session.send()
可能会在数据流完成后终止 Mono<Void>
,导致 WebSocket 连接关闭。为了确保 WebSocket 连接保持开放,需要避免 Mono<Void>
过早完成。本文探讨了 Mono<Void>
终止的影响,并提供了两种解决方案:
- 心跳机制(
Flux.interval()
定期发送心跳消息)。 - 阻止
Mono<Void>
结束(Flux.never()
保持 WebSocket 连接)。
1. handle(WebSocketSession session)
方法解析
Spring WebFlux 提供的 WebSocketHandler
接口定义了 WebSocket 处理逻辑:
Mono<Void> handle(WebSocketSession session);
作用:
handle()
负责处理 WebSocket 连接的生命周期,WebSocketSession
提供 WebSocket 消息的 收发接口。- 该方法返回一个
Mono<Void>
,表示 WebSocket 处理流程的 异步执行,直到Mono<Void>
终止。
Mono<Void>
终止的影响:
- 一旦
handle()
返回的Mono<Void>
终止,WebSocket 连接会自动关闭。 session
也会被关闭,底层 资源(如网络连接、缓冲区等)会被释放。- 因此,要保持 WebSocket 连接,必须确保
Mono<Void>
不会过早完成。
2. 为什么 session.send()
可能会导致 WebSocket 关闭?
错误示例
return session.send(session.receive()
.map(msg -> session.textMessage("Echo: " + msg.getPayloadAsText())));
潜在问题:
session.receive()
是Flux
,会持续监听消息,但它不会主动结束。- 如果客户端不发送消息,
session.send()
可能会完成,导致Mono<Void>
终止,最终 WebSocket 连接关闭。
原因
session.send(Publisher<WebSocketMessage>)
需要一个持续的Flux<WebSocketMessage>
。- 如果
session.receive()
没有数据,session.send()
可能会完成,从而关闭 WebSocket 连接。
3. 解决方案
方案 1:使用 Flux.interval()
发送心跳
核心思路:
- 定期发送 “ping” 消息,保证
session.send()
始终有数据,避免连接关闭。
实现方式:
public class KeepAliveWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
// 处理客户端消息流
Flux<WebSocketMessage> incomingMessages = session.receive()
.map(msg -> session.textMessage("Echo: " + msg.getPayloadAsText()));
// 发送心跳消息,每 30 秒发送一次 "ping"
Flux<WebSocketMessage> heartbeats = Flux.interval(Duration.ofSeconds(30))
.map(tick -> session.textMessage("ping"));
// 合并消息流 & 心跳,保持 WebSocket 连接
Flux<WebSocketMessage> combinedFlux = Flux.merge(incomingMessages, heartbeats);
return session.send(combinedFlux);
}
}
优势:
✅ session.send()
始终有数据,不会因 Flux
结束而关闭连接。
✅ 服务器定期发送心跳,防止客户端超时断开。
方案 2:使用 Flux.never()
阻止 Mono<Void>
结束
如果不想发送心跳消息,但仍想保持 WebSocket 连接,可以使用 Flux.never()
:
return session.send(session.receive()
.map(msg -> session.textMessage("Echo: " + msg.getPayloadAsText())))
.then(Flux.never().then()); // 保持连接,避免 Mono<Void> 结束
问题:
- 这种方式 不会主动发送心跳,可能导致某些 WebSocket 服务器 因长时间无数据而关闭连接。
4. WebSocket 关闭后,资源是否会被释放?
是的,WebSocket 关闭后 session
会被释放,相关资源也会清理:
WebSocketSession
关闭后,底层 TCP 连接会断开。session.receive()
和session.send()
会停止订阅,释放 Reactor 资源。- 缓冲区、事件循环线程、内存等 也会被回收。
如果 Mono<Void>
终止,Spring WebFlux 会 自动释放 WebSocketSession
相关资源,开发者 无需手动关闭 session
。
5. 结论
问题:
session.receive()
本身不会结束,但session.send()
可能因数据流完成而关闭 WebSocket 连接。- 需要保证
session.send()
始终有数据要发送,避免Mono<Void>
提前完成。
推荐方案:
✅ 方案 1(心跳机制):使用 Flux.interval()
发送 “ping” 消息,保持连接(推荐)。
✅ 方案 2(Flux.never()
):阻止 Mono<Void>
结束,但不发送心跳(适用于特定场景)。
关于资源释放:
Mono<Void>
终止后,WebSocket 连接会关闭,session
及其相关资源(TCP 连接、缓冲区、事件循环等)会被 Spring WebFlux 自动释放。
通过这些方法,可以确保 WebSocket 连接 持续保持开放,并避免 session.send()
过早完成导致的连接关闭问题。