WebSocket 结合消息中间件的实时通知架构设计
1. 架构总览
+----------------+ +----------------+ +---------------------+
| 前端客户端 | | WebSocket网关 | | 消息中间件 |
| (浏览器/APP) |<---->| (集群部署) |<---->| (RabbitMQ/Kafka) |
+----------------+ +----------------+ +---------------------+
| |
| v
| +---------------------+
| | 业务服务 |
| | (生成通知消息) |
| +---------------------+
v
+---------------------+
| Redis/共享存储 |
| (管理在线状态/会话) |
+---------------------+
2. 核心组件与职责
组件 |
职责 |
前端客户端 |
建立WebSocket长连接,接收服务端推送的实时消息。 |
WebSocket网关 |
维护客户端连接,路由消息到中间件,处理上下线事件,管理会话状态。 |
消息中间件 |
解耦业务服务与推送服务,存储待推送消息,支持削峰填谷、消息广播和顺序保证。 |
业务服务 |
生成业务消息(如订单支付成功),发布到消息中间件。 |
Redis |
存储用户与连接的映射关系,支持水平扩展时的会话同步。 |
3. 详细设计流程
3.1 连接建立与认证
- 前端连接
客户端通过 wss://gateway.example.com/ws?token=xxx
发起WebSocket连接,携带JWT Token。
- 网关认证
网关验证Token有效性,解析用户ID,记录到Redis(user:1001:connections -> [conn1, conn2]
)。
- 订阅主题
客户端发送订阅请求(如SUBSCRIBE /user/notifications
),网关绑定用户ID与消息队列主题。
代码示例(Spring WebSocket + STOMP):
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws")
.setAllowedOrigins("*")
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/topic", "/queue")
.setRelayHost("rabbitmq-host")
.setRelayPort(61613);
registry.setApplicationDestinationPrefixes("/app");
}
}
3.2 消息生产与路由
- 业务服务生产消息
业务逻辑完成后,向消息中间件发送事件(如订单支付成功通知):
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void completeOrder(Order order) {
rabbitTemplate.convertAndSend("notifications.exchange",
"user." + order.getUserId(),
new Notification(..., order.getId()));
}
}
- 中间件路由规则
- RabbitMQ:使用Topic Exchange,按
user.{userId}
路由到用户专属队列。
- Kafka:按用户ID分区,确保同一用户的消息顺序性。
3.3 消息推送
- 网关消费消息
WebSocket网关订阅中间件队列,接收消息后查找在线用户连接:
@RabbitListener(queues = "user.notifications.${userId}")
public void handleNotification(Notification message) {
String userId = message.getUserId();
Set<String> connections = redisTemplate.opsForSet().members("user:" + userId + ":connections");
connections.forEach(connId -> {
messagingTemplate.convertAndSendToUser(connId, "/queue/notifications", message);
});
}
- 多网关同步
使用Redis Pub/Sub 广播用户上下线事件,确保集群内各网关状态一致。
3.4 连接维护
- 心跳检测
客户端定期发送心跳帧(PING),服务端回复PONG,超时则主动断开。
- 断线重连
客户端检测到连接中断后,按指数退避策略重连,恢复订阅。
- 离线存储
用户离线期间的消息持久化到数据库,待上线后通过/notifications/unread
接口拉取。
4. 关键优化策略
4.1 性能优化
- 连接复用:同一用户多设备共享TCP连接,通过虚拟通道(STOMP SUBSCRIPTION)隔离消息流。
- 消息压缩:对大型通知(如富文本)使用gzip压缩,减少带宽占用。
- 批量推送:累积窗口期内多条消息合并发送,降低网络开销。
4.2 可靠性保障
- 消息确认:使用中间件的ACK机制,确保消息至少送达一次。
- 死信队列:处理无法推送的消息(如用户长期离线),转储后人工介入。
- 幂等消费:消息携带唯一ID,避免重复处理。
4.3 安全控制
- TLS加密:WebSocket连接使用wss协议,防止中间人攻击。
- 权限隔离:校验用户对消息的访问权限(如订单号归属校验)。
- 限流防护:按用户/IP限制连接数和推送频率,防止DoS攻击。
5. 监控与运维
5.1 监控指标
指标 |
采集方式 |
告警阈值 |
在线连接数 |
Prometheus + Gateway暴露/metrics端点 |
> 80% 最大容量 |
消息吞吐量 |
RabbitMQ Management API/Kafka Metrics |
突发增长 > 100%/分钟 |
平均推送延迟 |
分布式追踪(SkyWalking) |
> 500ms |
离线消息堆积数 |
数据库查询 |
> 10,000条 |
5.2 运维工具
- 连接管理:开发内部控制台,强制踢除异常连接。
- 消息追溯:基于TraceID查询消息全链路状态。
- 灰度发布:按用户分批次升级网关,观察稳定性。
6. 技术选型对比
需求场景 |
RabbitMQ + STOMP |
Kafka + WebSocket |
消息顺序 |
单队列保证顺序 |
分区内顺序保证 |
吞吐量 |
中等(万级/秒) |
高(十万级/秒) |
协议支持 |
原生支持STOMP,与WebSocket集成简单 |
需自定义适配层 |
离线消息 |
通过TTL+死信队列实现 |
依赖Kafka持久化存储 |
适用规模 |
中小型系统,需快速搭建 |
大型高并发系统,需水平扩展 |
7. 示例:支付成功实时通知
- 用户下单支付,支付服务处理成功后,发布事件到RabbitMQ:
public void onPaymentSuccess(Payment payment) {
NotificationMessage msg = new NotificationMessage(
payment.getUserId(),
"支付成功,订单号:" + payment.getOrderId()
);
rabbitTemplate.convertAndSend("notifications", "user." + userId, msg);
}
- WebSocket网关消费消息,推送至在线客户端:
const socket = new SockJS('/ws');
const client = Stomp.over(socket);
client.connect({}, () => {
client.subscribe('/user/queue/notifications', (message) => {
showToast(JSON.parse(message.body).content);
});
});
- 用户离线时,消息持久化到MySQL,待下次登录后拉取未读消息。
总结
通过WebSocket + 消息中间件的架构,实现了低延迟、高可靠的实时通知系统。该设计解耦了业务逻辑与消息推送,利用中间件的持久化和路由能力保障消息可达性,结合集群化部署和监控体系,适用于电商、IM、物联网等实时交互场景。