WebSocket 结合消息中间件的实时通知架构设计

发布于:2025-04-19 ⋅ 阅读:(19) ⋅ 点赞:(0)

WebSocket 结合消息中间件的实时通知架构设计


1. 架构总览
+----------------+       +----------------+       +---------------------+
|  前端客户端     |       |  WebSocket网关  |       |  消息中间件          |
| (浏览器/APP)    |<---->| (集群部署)      |<---->| (RabbitMQ/Kafka)   |
+----------------+       +----------------+       +---------------------+
                          |           |
                          |           v
                          |     +---------------------+
                          |     |  业务服务           |
                          |     | (生成通知消息)       |
                          |     +---------------------+
                          v
                   +---------------------+
                   |  Redis/共享存储     |
                   | (管理在线状态/会话)  |
                   +---------------------+

2. 核心组件与职责
组件 职责
前端客户端 建立WebSocket长连接,接收服务端推送的实时消息。
WebSocket网关 维护客户端连接,路由消息到中间件,处理上下线事件,管理会话状态。
消息中间件 解耦业务服务与推送服务,存储待推送消息,支持削峰填谷、消息广播和顺序保证。
业务服务 生成业务消息(如订单支付成功),发布到消息中间件。
Redis 存储用户与连接的映射关系,支持水平扩展时的会话同步。

3. 详细设计流程
3.1 连接建立与认证
  1. 前端连接
    客户端通过 wss://gateway.example.com/ws?token=xxx 发起WebSocket连接,携带JWT Token。
  2. 网关认证
    网关验证Token有效性,解析用户ID,记录到Redis(user:1001:connections -> [conn1, conn2])。
  3. 订阅主题
    客户端发送订阅请求(如SUBSCRIBE /user/notifications),网关绑定用户ID与消息队列主题。

代码示例(Spring WebSocket + STOMP)

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")
                .setAllowedOrigins("*")
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/topic", "/queue")
                .setRelayHost("rabbitmq-host")
                .setRelayPort(61613);
        registry.setApplicationDestinationPrefixes("/app");
    }
}

3.2 消息生产与路由
  1. 业务服务生产消息
    业务逻辑完成后,向消息中间件发送事件(如订单支付成功通知):
@Service
public class OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void completeOrder(Order order) {
        // 发布消息到MQ
        rabbitTemplate.convertAndSend("notifications.exchange", 
            "user." + order.getUserId(), 
            new Notification(..., order.getId()));
    }
}
  1. 中间件路由规则
    • RabbitMQ:使用Topic Exchange,按 user.{userId} 路由到用户专属队列。
    • Kafka:按用户ID分区,确保同一用户的消息顺序性。

3.3 消息推送
  1. 网关消费消息
    WebSocket网关订阅中间件队列,接收消息后查找在线用户连接:
@RabbitListener(queues = "user.notifications.${userId}")
public void handleNotification(Notification message) {
    String userId = message.getUserId();
    Set<String> connections = redisTemplate.opsForSet().members("user:" + userId + ":connections");
    connections.forEach(connId -> {
        messagingTemplate.convertAndSendToUser(connId, "/queue/notifications", message);
    });
}
  1. 多网关同步
    使用Redis Pub/Sub 广播用户上下线事件,确保集群内各网关状态一致。

3.4 连接维护
  1. 心跳检测
    客户端定期发送心跳帧(PING),服务端回复PONG,超时则主动断开。
  2. 断线重连
    客户端检测到连接中断后,按指数退避策略重连,恢复订阅。
  3. 离线存储
    用户离线期间的消息持久化到数据库,待上线后通过/notifications/unread接口拉取。

4. 关键优化策略
4.1 性能优化
  • 连接复用:同一用户多设备共享TCP连接,通过虚拟通道(STOMP SUBSCRIPTION)隔离消息流。
  • 消息压缩:对大型通知(如富文本)使用gzip压缩,减少带宽占用。
  • 批量推送:累积窗口期内多条消息合并发送,降低网络开销。
4.2 可靠性保障
  • 消息确认:使用中间件的ACK机制,确保消息至少送达一次。
  • 死信队列:处理无法推送的消息(如用户长期离线),转储后人工介入。
  • 幂等消费:消息携带唯一ID,避免重复处理。
4.3 安全控制
  • TLS加密:WebSocket连接使用wss协议,防止中间人攻击。
  • 权限隔离:校验用户对消息的访问权限(如订单号归属校验)。
  • 限流防护:按用户/IP限制连接数和推送频率,防止DoS攻击。

5. 监控与运维
5.1 监控指标
指标 采集方式 告警阈值
在线连接数 Prometheus + Gateway暴露/metrics端点 > 80% 最大容量
消息吞吐量 RabbitMQ Management API/Kafka Metrics 突发增长 > 100%/分钟
平均推送延迟 分布式追踪(SkyWalking) > 500ms
离线消息堆积数 数据库查询 > 10,000条
5.2 运维工具
  • 连接管理:开发内部控制台,强制踢除异常连接。
  • 消息追溯:基于TraceID查询消息全链路状态。
  • 灰度发布:按用户分批次升级网关,观察稳定性。

6. 技术选型对比
需求场景 RabbitMQ + STOMP Kafka + WebSocket
消息顺序 单队列保证顺序 分区内顺序保证
吞吐量 中等(万级/秒) 高(十万级/秒)
协议支持 原生支持STOMP,与WebSocket集成简单 需自定义适配层
离线消息 通过TTL+死信队列实现 依赖Kafka持久化存储
适用规模 中小型系统,需快速搭建 大型高并发系统,需水平扩展

7. 示例:支付成功实时通知
  1. 用户下单支付,支付服务处理成功后,发布事件到RabbitMQ:
// 支付服务代码
public void onPaymentSuccess(Payment payment) {
    NotificationMessage msg = new NotificationMessage(
        payment.getUserId(), 
        "支付成功,订单号:" + payment.getOrderId()
    );
    rabbitTemplate.convertAndSend("notifications", "user." + userId, msg);
}
  1. WebSocket网关消费消息,推送至在线客户端:
// 前端监听
const socket = new SockJS('/ws');
const client = Stomp.over(socket);
client.connect({}, () => {
  client.subscribe('/user/queue/notifications', (message) => {
    showToast(JSON.parse(message.body).content);
  });
});
  1. 用户离线时,消息持久化到MySQL,待下次登录后拉取未读消息。

总结

通过WebSocket + 消息中间件的架构,实现了低延迟、高可靠的实时通知系统。该设计解耦了业务逻辑与消息推送,利用中间件的持久化和路由能力保障消息可达性,结合集群化部署和监控体系,适用于电商、IM、物联网等实时交互场景。


网站公告

今日签到

点亮在社区的每一天
去签到