引言:WebSocket在现代应用中的重要性
在当今实时交互应用盛行的时代,WebSocket协议已成为实现双向通信的核心技术。相比传统的HTTP轮询,WebSocket提供了:
- 真正的全双工通信
- 极低的延迟(毫秒级)
- 高效的连接管理
- 减少不必要的网络流量
本文将介绍如何使用netty-websocket-spring-boot-starter
构建高性能WebSocket服务,实现消息收发功能。
一、Netty-WebSocket框架简介
Netty作为高性能NIO框架,是构建WebSocket服务的理想选择。netty-websocket-spring-boot-starter
封装了Netty的复杂配置,提供Spring Boot风格的开发体验:
核心优势:
- 高性能:基于Netty的Reactor模型,支持百万级并发
- 简化开发:注解驱动,类似Spring MVC
- 无缝集成:与Spring生态完美融合
- 可扩展性:支持自定义编解码器和拦截器
<!-- Maven依赖 -->
<dependency>
<groupId>org.yeauty</groupId>
<artifactId>netty-websocket-spring-boot-starter</artifactId>
<version>0.11.0</version>
</dependency>
二、构建WebSocket服务端
1. 基础服务端实现
@ServerEndpoint(path = "/chat", port = "8080")
@Component
public class ChatServer {
private static final Map<String, Session> sessions = new ConcurrentHashMap<>();
@OnOpen
public void onOpen(Session session) {
String clientId = session.id().asShortText();
sessions.put(clientId, session);
System.out.println("客户端连接: " + clientId);
}
@OnClose
public void onClose(Session session) {
String clientId = session.id().asShortText();
sessions.remove(clientId);
System.out.println("客户端断开: " + clientId);
}
@OnMessage
public void onMessage(Session session, String message) {
System.out.println("收到消息: " + message);
// 处理消息逻辑
processMessage(session, message);
}
// 发送消息给指定客户端
public static void sendToClient(String clientId, String message) {
Session session = sessions.get(clientId);
if (session != null && session.isOpen()) {
session.sendText(message);
}
}
// 广播消息
public static void broadcast(String message) {
sessions.values().forEach(session -> {
if (session.isOpen()) {
session.sendText(message);
}
});
}
}
2. 核心注解解析
注解 | 说明 | 示例 |
---|---|---|
@ServerEndpoint |
定义服务端点 | @ServerEndpoint(path="/ws", port="8080") |
@OnOpen |
连接建立时触发 | public void onOpen(Session session) |
@OnClose |
连接关闭时触发 | public void onClose(Session session) |
@OnMessage |
收到消息时触发 | public void onMessage(String message) |
@OnError |
发生错误时触发 | public void onError(Throwable error) |
三、消息收发实战
1. 接收客户端消息
@OnMessage
public void onMessage(Session session, String message) {
try {
// 解析JSON消息
JsonNode json = new ObjectMapper().readTree(message);
// 消息路由
switch (json.get("type").asText()) {
case "TEXT":
handleTextMessage(session, json);
break;
case "IMAGE":
handleImageMessage(session, json);
break;
case "COMMAND":
handleCommand(session, json);
break;
default:
sendError(session, "未知消息类型");
}
} catch (Exception e) {
sendError(session, "消息格式错误");
}
}
private void handleTextMessage(Session session, JsonNode json) {
String content = json.get("content").asText();
String sender = json.get("sender").asText();
// 业务处理逻辑
MessageEntity message = messageService.saveMessage(sender, content);
// 回复客户端
session.sendText("{\"status\":\"SUCCESS\",\"messageId\":" + message.getId() + "}");
}
2. 发送消息给客户端
// 发送文本消息
public void sendTextMessage(String clientId, String content) {
Session session = sessions.get(clientId);
if (session != null && session.isOpen()) {
JsonObject message = new JsonObject();
message.addProperty("type", "TEXT");
message.addProperty("content", content);
message.addProperty("timestamp", System.currentTimeMillis());
session.sendText(message.toString());
}
}
// 发送二进制数据(如图片)
public void sendImage(String clientId, byte[] imageData) {
Session session = sessions.get(clientId);
if (session != null && session.isOpen()) {
session.sendBinary(imageData);
}
}
// 带回调的异步发送
public void sendWithCallback(String clientId, String message) {
Session session = sessions.get(clientId);
if (session != null && session.isOpen()) {
session.sendText(message, new FutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
log.info("消息发送成功");
}
@Override
public void onFailure(Throwable t) {
log.error("消息发送失败", t);
// 重试逻辑
}
});
}
}
四、高级功能实现
1. 心跳检测机制
@OnEvent
public void onEvent(Session session, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleEvent = (IdleStateEvent) evt;
if (idleEvent.state() == IdleState.READER_IDLE) {
// 30秒无读操作,发送心跳
session.sendText("{\"type\":\"HEARTBEAT\"}");
} else if (idleEvent.state() == IdleState.WRITER_IDLE) {
// 60秒无写操作,关闭连接
session.close();
}
}
}
2. 消息压缩传输
@OnMessage
public void onBinaryMessage(Session session, byte[] compressedData) {
try {
// 解压缩消息
String message = decompress(compressedData);
// 处理消息...
} catch (IOException e) {
log.error("解压缩失败", e);
}
}
private String decompress(byte[] compressed) throws IOException {
ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
GZIPInputStream gis = new GZIPInputStream(bis);
return new String(gis.readAllBytes(), StandardCharsets.UTF_8);
}
3. 分布式会话管理
@Service
public class RedisSessionStore {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public void saveSession(String sessionId, SessionInfo info) {
redisTemplate.opsForValue().set(
"ws:session:" + sessionId,
info,
1, TimeUnit.HOURS
);
}
public SessionInfo getSessionInfo(String sessionId) {
return (SessionInfo) redisTemplate.opsForValue().get("ws:session:" + sessionId);
}
}
// 会话信息类
@Data
public class SessionInfo {
private String userId;
private String deviceId;
private String nodeId;
private long lastActiveTime;
}
五、最佳实践建议
连接管理优化
- 设置合理的最大连接数
- 实现连接数监控和告警
@Bean public ServerEndpointConfig config() { return ServerEndpointConfig.builder() .port(8080) .bossEventLoopGroup(2) // boss线程数 .workerEventLoopGroup(16) // worker线程数 .maxFramePayloadLength(1048576) // 1MB .build(); }
安全防护措施
- 实现WSS(WebSocket Secure)
- 添加身份验证
- 防止DDoS攻击
@BeforeHandshake public void handshake(Session session, @RequestParam String token) { if (!authService.validate(token)) { session.close(); } }
性能监控指标
指标 说明 健康值 活动连接数 当前在线连接 < 80% 最大容量 消息吞吐量 消息/秒 根据业务调整 平均延迟 消息处理时间 < 100ms 错误率 失败消息比例 < 0.1%
六、客户端实现示例
// WebSocket客户端
const socket = new WebSocket('wss://yourserver.com/chat');
// 连接建立
socket.onopen = () => {
console.log('连接已建立');
// 发送文本消息
socket.send(JSON.stringify({
type: 'TEXT',
content: '你好服务器!'
}));
};
// 接收消息
socket.onmessage = (event) => {
const message = JSON.parse(event.data);
console.log('收到消息:', message);