Spring Boot WebFlux 中的 WebSocket 提供了一种高效、异步的方式来处理客户端与服务器之间的双向通信。WebSocket 连接的生命周期包括连接建立、消息传输、连接关闭以及资源清理等过程。此外,为了确保 WebSocket 连接的稳定性和可靠性,我们可以加入重试机制,以处理断开或网络问题时自动重新连接。
1. WebSocket 连接建立
WebSocket 的连接是通过 HTTP 的 Upgrade 机制从普通的 HTTP/HTTPS 请求升级而来的。具体流程如下:
1.1 客户端请求 WebSocket 连接
客户端通过 ws://
或 wss://
协议来访问 WebSocket 服务器,并发送 HTTP Upgrade 请求头,要求服务器将连接升级为 WebSocket 协议:
GET /ws HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: random-generated-key
Sec-WebSocket-Version: 13
1.2 服务器端处理 WebSocket 连接
Spring WebFlux 通过 WebSocketHandler
来处理 WebSocket 请求。以下是一个简单的 WebSocketHandler 实现:
@Component
public class MyWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.receive()
.doOnNext(message -> System.out.println("Received: " + message.getPayloadAsText()))
.then();
}
}
当服务器收到 HTTP Upgrade 请求后,它会检查 Sec-WebSocket-Key
并返回 Sec-WebSocket-Accept
进行握手,建立连接。
1.3 握手成功,连接建立
如果握手成功,服务器会返回 101 Switching Protocols
响应,表示 WebSocket 连接已建立:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: (calculated key)
2. WebSocket 消息处理
连接建立后,WebSocket 进入消息传输阶段,包括消息的接收和发送。
2.1 消息接收
服务器端可以通过 WebSocketSession.receive()
方法来接收客户端发送的消息:
session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnNext(msg -> System.out.println("Received: " + msg))
.then();
session.receive()
返回一个 Flux<WebSocketMessage>
,可以处理流式消息,每次接收到新消息时执行 doOnNext()
中的处理逻辑。
2.2 消息发送
服务器端可以通过 WebSocketSession.send()
方法发送消息给客户端:
Flux<String> messages = Flux.interval(Duration.ofSeconds(1))
.map(i -> "Message " + i);
return session.send(messages.map(session::textMessage));
send()
方法接收一个 Publisher<WebSocketMessage>
,可以使用 Flux
来生成消息流。textMessage()
方法用于创建文本消息。
3. WebSocket 连接关闭
WebSocket 连接可以由客户端、服务器或网络异常等原因主动关闭。连接关闭的主要方式如下:
3.1 正常关闭
- 客户端主动关闭:客户端可以通过调用
WebSocket.close()
发送 Close Frame,服务器接收到后会关闭连接。 - 服务器主动关闭:服务器通过
WebSocketSession.close()
关闭连接:session.close(CloseStatus.NORMAL);
3.2 异常关闭
- 网络异常:如网络断开或客户端崩溃等,连接会被强制关闭。
- 心跳超时:如果使用 ping/pong 机制检测 WebSocket 是否存活,超时未收到 pong 响应时,连接会关闭。
session.send(Flux.just(session.pingMessage(ByteBuffer.wrap(new byte[0]))));
3.3 连接关闭后的处理
服务器可以使用 session.receive().doOnTerminate()
监听连接关闭事件,执行清理操作:
session.receive()
.doOnTerminate(() -> System.out.println("WebSocket connection closed"))
.then();
4. WebSocket 生命周期总结
阶段 | 说明 |
---|---|
连接建立 | 客户端发起 WebSocket 连接请求,服务器接受并返回 101 Switching Protocols 响应,连接建立。 |
消息传输 | 服务器和客户端可以双向传输文本或二进制消息。 |
连接关闭 | 连接可由客户端、服务器、网络异常等原因关闭。 |
资源清理 | 连接关闭后需要进行资源清理操作,如取消订阅、清理状态等。 |
5. 完整示例:WebFlux WebSocket 服务器
以下是一个完整的 WebSocket 服务器配置示例,展示了如何在 Spring Boot WebFlux 中配置 WebSocket:
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerMapping;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.Map;
@Configuration
public class WebSocketConfig {
@Bean
public WebSocketHandler webSocketHandler() {
return session -> {
Flux<String> output = Flux.interval(Duration.ofSeconds(1))
.map(time -> "Server time: " + time);
return session.send(output.map(session::textMessage));
};
}
@Bean
public WebSocketHandlerMapping handlerMapping(WebSocketHandler handler) {
return new WebSocketHandlerMapping(Map.of("/ws", handler));
}
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
说明:
WebSocketHandler
处理 WebSocket 连接,发送定时消息。WebSocketHandlerMapping
将/ws
端点映射到 WebSocketHandler。WebSocketHandlerAdapter
用于适配 WebSocket 处理器。
6. 服务器端发起 WebSocket 连接
如果你希望服务器主动连接到其他 WebSocket 服务器,可以使用 WebSocketClient
。Spring WebFlux 提供了 ReactorNettyWebSocketClient
来发起 WebSocket 连接。
6.1 示例:服务器端发起 WebSocket 连接
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import reactor.core.publisher.Mono;
import java.net.URI;
@Service
public class WebSocketClientService {
private final ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();
public Mono<Void> connectToWebSocketServer() {
return client.execute(URI.create("ws://example.com/socket"), session -> {
Mono<Void> sendMessage = session.send(Mono.just(session.textMessage("Hello Server!")));
session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnNext(System.out::println)
.subscribe();
return sendMessage;
});
}
}
6.2 在 Spring Boot 启动时自动连接
通过在 @PostConstruct
中调用连接方法,可以确保 WebSocket 客户端在 Spring Boot 启动时自动连接:
import jakarta.annotation.PostConstruct;
import org.springframework.stereotype.Component;
@Component
public class WebSocketClientInitializer {
private final WebSocketClientService webSocketClientService;
public WebSocketClientInitializer(WebSocketClientService webSocketClientService) {
this.webSocketClientService = webSocketClientService;
}
@PostConstruct
public void init() {
webSocketClientService.connectToWebSocketServer()
.subscribe();
}
}
7. WebSocket 连接重试机制
在 WebSocket 的生命周期中,由于网络问题或服务器错误,WebSocket 连接可能会中断。为了提高 WebSocket 连接的可靠性,我们可以为 WebSocket 客户端添加重试机制,以确保断开后能够重新连接。
7.1 使用 retry()
方法重试连接
WebFlux 提供了 retry()
方法来自动重试操作。以下是一个简单的重试机制示例:
import reactor.core.publisher.Mono;
public class WebSocketClientService {
private final ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();
public Mono<Void> connectToWebSocketServer() {
return client.execute(URI.create("ws://example.com/socket"), session -> {
Mono<Void> sendMessage = session.send(Mono.just(session.textMessage("Hello Server!")));
session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnNext(System.out::println)
.subscribe();
return sendMessage;
}).retry(5); // 最大重试5次
}
}
在这个例子中,retry(5)
表示如果 WebSocket 连接失败,最多会重试 5 次。
7.2 使用 retryWhen()
实现自定义重试逻辑
我们还可以通过 retryWhen()
来实现更复杂的重试策略,例如设置重试间隔时间或根据错误类型决定是否重试:
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import java.time.Duration;
public class WebSocketClientService {
private final ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();
public Mono<Void> connectToWebSocketServer() {
return client.execute(URI.create("ws://example.com/socket"), session -> {
Mono<Void> sendMessage = session.send(Mono.just(session.textMessage("Hello Server!")));
session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnNext(System.out::println)
.subscribe();
return sendMessage;
}).retryWhen(errors ->
errors.zipWith(Flux.range(1, 5), (error, count) -> count) // 重试次数
.flatMap(retryCount -> Mono.delay(Duration.ofSeconds(retryCount))) // 增加重试间隔
);
}
}
在这个例子中,retryWhen()
会根据错误进行自定义重试逻辑,设置每次重试间隔递增。
8. 连接关闭后的重试机制
为了确保连接在关闭后重新建立,我们可以监听连接关闭事件并尝试重试:
session.receive()
.doOnTerminate(() -> {
System.out.println("WebSocket connection closed");
reconnect(); // 重新连接
})
.then();
private void reconnect() {
connectToWebSocketServer()
.retry(3) // 重试3次
.subscribe();
}
8.1 完整的客户端重试代码
public Mono<Void> connectWithRetry() {
return client.execute(URI.create("ws://example.com/socket"), session -> {
Mono<Void> sendMessage = session.send(Mono.just(session.textMessage("Hello Server!")));
session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnNext(System.out::println)
.doOnTerminate(() -> reconnect()) // 连接关闭后重试
.subscribe();
return sendMessage;
}).retryWhen(errors ->
errors.zipWith(Flux.range(1, 5), (error, count) -> count)
.flatMap(retryCount -> Mono.delay(Duration.ofSeconds(retryCount)))
);
}
9. 结论
Spring Boot WebFlux 中 WebSocket 的生命周期包括:
- 连接建立:通过 HTTP Upgrade 握手建立 WebSocket 连接。
- 消息收发:服务器和客户端之间通过
receive()
和send()
方法进行消息交换。 - 连接关闭:连接可以通过正常关闭、异常关闭或主动关闭的方式结束。
- 资源清理:连接关闭后需要进行资源清理操作,确保系统稳定。
- 重试机制:通过
retry()
和retryWhen()
方法为 WebSocket 连接添加自动重试机制,提高连接的可靠性。
通过 WebSocket,Spring Boot WebFlux 提供了高效的异步通信方式,特别适合用于实时数据流应用。