WebSocket集成方案对比与实战
架构选型全景图
一、Javax原生WebSocket API
核心实现代码
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
@ServerEndpoint("/ws/javax")
public class JavaxWebSocketEndpoint {
private static final Set<Session> sessions = new CopyOnWriteArraySet<>();
@OnOpen
public void onOpen(Session session) {
sessions.add(session);
System.out.println("New connection: " + session.getId());
}
@OnMessage
public void onMessage(String message, Session sender) {
sessions.parallelStream()
.filter(Session::isOpen)
.forEach(session -> {
try {
session.getBasicRemote().sendText("Echo: " + message);
} catch (IOException e) {
e.printStackTrace();
}
});
}
@OnClose
public void onClose(Session session) {
sessions.remove(session);
System.out.println("Connection closed: " + session.getId());
}
}
技术特点
✅ 原生JavaEE标准支持(JSR-356)
✅ 无需额外依赖
⚠️ 需手动处理线程安全
⚠️ 不支持协议自动降级(客户端不支持WebSocket时无法回退到HTTP轮询)
二、Spring WebMVC集成方案
Maven依赖
<!-- Spring Boot WebSocket starter. No <version> is declared here, so the version
     is expected to come from the build's dependency management (e.g. the Spring
     Boot parent POM or BOM) - confirm against the project's pom.xml. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
配置与实现
@Configuration
@EnableWebSocket
public class WebMvcWebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(webSocketHandler(), "/ws/spring")
.addInterceptors(new HttpSessionHandshakeInterceptor())
.setAllowedOrigins("*");
}
@Bean
public WebSocketHandler webSocketHandler() {
return new TextWebSocketHandler() {
private final List<WebSocketSession> sessions = new CopyOnWriteArrayList<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) {
sessions.add(session);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
sessions.forEach(s -> {
try {
s.sendMessage(new TextMessage("Processed: " + message.getPayload()));
} catch (IOException e) {
// 异常处理
}
});
}
};
}
}
进阶特性
- 消息转换器(JSON/Protobuf)
- STOMP子协议支持
- 与Spring Security集成
三、Spring WebFlux响应式方案
响应式端点
/**
 * Spring WebFlux WebSocket configuration exposing a reactive endpoint at
 * {@code /ws/business}. Each inbound text message is validated, pushed onto a
 * Redis list and answered with an {@code ACK:} or {@code ERROR:} reply.
 */
@Configuration
@Slf4j
public class BusinessWebSocketConfig {

    /** Custom business processor: message validation, Redis storage and reply generation. */
    @Component
    public static class BusinessProcessor {
        private final ReactiveRedisTemplate<String, String> redisTemplate;

        public BusinessProcessor(ReactiveRedisTemplate<String, String> redisTemplate) {
            this.redisTemplate = redisTemplate;
        }

        /**
         * Processes a WebSocket message by extracting its text payload and
         * delegating to {@link #processPayload(String)}.
         *
         * <p>The payload is read immediately, so the caller must invoke this while
         * the message's buffer is still valid (or after calling {@code retain()}).
         *
         * @param message the inbound WebSocket message
         * @return the reply text; failures are mapped to an {@code "ERROR: ..."} reply
         */
        public Mono<String> processMessage(WebSocketMessage message) {
            return processPayload(message.getPayloadAsText());
        }

        /**
         * Validates the payload, stores it in the {@code ws:message:queue} Redis
         * list and produces the reply.
         *
         * @param payload the already-extracted text payload
         * @return {@code "ACK: <payload>"} on success, or {@code "ERROR: <reason>"}
         *         for a blank payload, a Redis failure or a 2-second timeout
         */
        public Mono<String> processPayload(String payload) {
            return Mono.just(payload)
                    .filter(msg -> !msg.isBlank()) // reject blank messages
                    .switchIfEmpty(Mono.error(new IllegalArgumentException("空消息")))
                    .flatMap(msg ->
                            redisTemplate.opsForList().leftPush("ws:message:queue", msg) // enqueue in Redis
                                    .thenReturn("ACK: " + msg)) // build the reply
                    .timeout(Duration.ofSeconds(2)) // bound the processing time
                    .onErrorResume(ex -> {
                        log.error("处理失败: {}", ex.getMessage());
                        return Mono.just("ERROR: " + ex.getMessage());
                    });
        }
    }

    /**
     * Registers the {@code /ws/business} handler.
     *
     * @param processor the business processor applied to every inbound message
     * @return a URL handler mapping with order {@code -1}
     *         (presumably so it is consulted before annotated controllers - confirm)
     */
    @Bean
    public HandlerMapping handlerMapping(BusinessProcessor processor) {
        Map<String, WebSocketHandler> handlers = new HashMap<>();
        handlers.put("/ws/business", session -> {
            // Extract the text BEFORE buffering or switching threads. On runtimes with
            // pooled buffers (e.g. Netty) the payload buffer may be released once the
            // message has been emitted, so holding WebSocketMessage objects across an
            // asynchronous boundary without retain() risks reading a released buffer.
            Flux<String> inputStream = session.receive()
                    .map(WebSocketMessage::getPayloadAsText)
                    // Back-pressure: keep at most 2000 pending payloads, dropping the oldest.
                    .onBackpressureBuffer(2000,
                            BufferOverflowStrategy.DROP_OLDEST)
                    .doOnNext(msg ->
                            Metrics.counter("websocket.receive.count").increment())
                    .publishOn(Schedulers.boundedElastic()); // hand off to the elastic pool

            // Processing pipeline. No retry operator is applied here: a retry would
            // re-subscribe to session.receive(), which supports a single subscriber,
            // and processPayload already turns failures into "ERROR: ..." replies.
            return session.send(
                    inputStream
                            .delayElements(Duration.ofMillis(50)) // rate limiting
                            .concatMap(processor::processPayload) // sequential, order-preserving
                            .map(session::textMessage)
                            .doOnError(ex -> log.error("发送异常", ex))
            );
        });
        return new SimpleUrlHandlerMapping(handlers, -1);
    }
}
选择策略建议
- 实时聊天系统:采用DROP_OLDEST策略+500ms延迟均衡体验
- 金融交易系统:使用ERROR策略+重试队列保证数据完整性
- 物联网数据采集:结合publishOn与delayElements实现阶梯式降速
四、Java-WebSocket独立库
服务端实现
import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;
/**
 * Standalone broadcast server built on the Java-WebSocket library: every text
 * message received from any client is re-sent to all connected clients with a
 * {@code "Broadcast: "} prefix.
 */
public class JavaWebSocketServer extends WebSocketServer {

    /**
     * Creates a server bound to the given port.
     *
     * @param port the TCP port to listen on
     */
    public JavaWebSocketServer(int port) {
        // Fully qualified: the surrounding snippet does not import InetSocketAddress.
        super(new java.net.InetSocketAddress(port));
    }

    @Override
    public void onOpen(WebSocket conn, ClientHandshake handshake) {
        System.out.println("New client: " + conn.getRemoteSocketAddress());
    }

    /** Required callback of the abstract base class; logs the disconnect. */
    @Override
    public void onClose(WebSocket conn, int code, String reason, boolean remote) {
        System.out.println("Client closed: " + conn.getRemoteSocketAddress()
                + " (code=" + code + ", reason=" + reason + ", remote=" + remote + ")");
    }

    @Override
    public void onMessage(WebSocket conn, String message) {
        broadcast("Broadcast: " + message);
    }

    /**
     * Required callback of the abstract base class; logs the failure.
     *
     * @param conn the affected connection; guarded against {@code null} because an
     *             error may not belong to one specific connection
     * @param ex   the reported error
     */
    @Override
    public void onError(WebSocket conn, Exception ex) {
        String origin = (conn == null) ? "server" : String.valueOf(conn.getRemoteSocketAddress());
        System.err.println("WebSocket error (" + origin + "): " + ex);
    }

    /**
     * Required callback in current Java-WebSocket releases; logs the bound port.
     * NOTE(review): very old library versions do not declare this method - confirm
     * the version in use.
     */
    @Override
    public void onStart() {
        System.out.println("Server started on port " + getPort());
    }

    /** Runs the server on port 9001 on the calling thread. */
    public static void main(String[] args) {
        new JavaWebSocketServer(9001).run();
    }
}
客户端连接
// Browser client: connect to the standalone server and log each incoming message.
const ws = new WebSocket('ws://localhost:9001');
ws.onmessage = function handleMessage(event) {
  console.log('Received:', event.data);
};
五、Socket.IO集成方案
服务端配置(基于Netty)
@Configuration
public class SocketIOConfig {
@Bean
public SocketIOServer socketIOServer() {
Configuration config = new Configuration();
config.setHostname("localhost");
config.setPort(9092);
SocketIOServer server = new SocketIOServer(config);
server.addConnectListener(client -> {
client.sendEvent("welcome", "Connected to Socket.IO");
});
server.addEventListener("chat", String.class,
(client, data, ack) -> server.getBroadcastOperations().sendEvent("message", data));
return server;
}
}
客户端适配
import { io } from "socket.io-client";

// Connect to the Socket.IO server, print the greeting it sends, then post one chat event.
const socket = io("http://localhost:9092");
socket.on("welcome", (data) => {
  console.log(data);
});
socket.emit("chat", "Hello Socket.IO");
六、Netty原生实现
完整服务端代码
/**
 * Minimal Netty WebSocket server: listens on port 8080, serves the WebSocket
 * path {@code /ws}, and answers every text frame with the same text prefixed by
 * {@code "NETTY: "}.
 */
public class NettyWebSocketServer {

    /**
     * Starts the server and blocks until the server channel is closed; both
     * event loop groups are shut down on the way out.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting on
     *                              the bind or close future
     */
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup acceptorGroup = new NioEventLoopGroup(1);
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            // Per-connection pipeline: HTTP codec, chunked writes, aggregation up to
            // 8192 bytes, the WebSocket protocol handler, then the echo handler.
            ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ch.pipeline()
                            .addLast(new HttpServerCodec())
                            .addLast(new ChunkedWriteHandler())
                            .addLast(new HttpObjectAggregator(8192))
                            .addLast(new WebSocketServerProtocolHandler("/ws"))
                            .addLast(new TextWebSocketFrameHandler());
                }
            };

            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptorGroup, ioGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(initializer);

            // Bind, then wait until the server channel is closed.
            bootstrap.bind(8080).sync().channel().closeFuture().sync();
        } finally {
            acceptorGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }

    /** Replies to each inbound text frame with a "NETTY: "-prefixed copy of its text. */
    private static class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
            String reply = "NETTY: " + msg.text();
            ctx.writeAndFlush(new TextWebSocketFrame(reply));
        }
    }
}
技术方案对比矩阵
特性 | Javax | WebMVC | WebFlux | Java-WebSocket | Socket.IO | Netty |
---|---|---|---|---|---|---|
协议支持 | WS | WS/STOMP | WS/RSocket | WS | WS+Polling | 自定义 |
最大连接数 | 1万 | 5万 | 10万+ | 3万 | 5万 | 100万+ |
内存消耗 | 中 | 中 | 低 | 中 | 高 | 极低 |
学习曲线 | 简单 | 中等 | 较高 | 简单 | 中等 | 陡峭 |
集群支持 | 需扩展 | 需扩展 | 原生支持 | 需扩展 | 需扩展 | 需扩展 |
生产就绪度 | ☆☆ | ☆☆☆☆ | ☆☆☆☆ | ☆☆☆ | ☆☆☆☆ | ☆☆☆☆☆ |
最佳实践指南
- 中小型项目:优先选择Spring WebMVC方案
- 高并发场景:WebFlux或Netty方案
- 多协议需求:Socket.IO支持降级通信
- 资源受限环境:Java-WebSocket轻量级方案
- 需要精细控制:直接使用Netty底层API
通过本文,您可以快速掌握不同场景下的WebSocket技术选型;建议结合实际业务需求进行性能测试后,再确定最终方案。