基于 RabbitMQ 优先级队列的订阅推送服务详细设计方案

发布于:2025-04-14 ⋅ 阅读:(23) ⋅ 点赞:(0)

基于 RabbitMQ 优先级队列的订阅推送服务详细设计方案

一、架构设计
  1. 分层架构

    • 订阅管理层(Spring Boot)
    • 消息分发层(RabbitMQ Cluster)
    • 推送执行层(Spring Cloud Stream)
    • 数据存储层(Redis + MySQL)
  2. 核心组件

    +-------------------+       +-------------------+       +-------------------+
    |  订阅配置管理模块  |       |  消息优先级路由器  |       |  推送执行引擎      |
    | (Spring Boot)     |------>| (RabbitMQ Exchange)|------>| (Spring Cloud Stream)
    +-------------------+       +-------------------+       +-------------------+
            |                           |                           |
            v                           v                           v
    +-------------------+       +-------------------+       +-------------------+
    | 订阅规则数据库     |       | 优先级队列集群     |       | 推送状态监控中心    |
    | (MySQL)          |       | (x-max-priority=10)|       | (Prometheus+Grafana)
    +-------------------+       +-------------------+       +-------------------+
    
二、优先级队列实现方案
  1. 队列定义
// 紧急队列(优先级5-10)
Map<String, Object> urgentArgs = new HashMap<>();
urgentArgs.put("x-max-priority", 10); // 支持10级优先级
urgentArgs.put("x-queue-mode", "lazy"); // 惰性队列防止内存溢出
Queue urgentQueue = new Queue("urgent_queue", true, false, false, urgentArgs);

// 普通队列(优先级0-4)
Map<String, Object> normalArgs = new HashMap<>();
normalArgs.put("x-max-priority", 4);
Queue normalQueue = new Queue("normal_queue", true, false, false, normalArgs);
  1. 消息路由策略
public class PriorityMessageRouter {
   
    private static final int URGENT_THRESHOLD = 5;

    // 根据业务规则自动判断优先级
    public String determineRoutingKey(Message message) {
   
        String bidType = message.getHeader("bid_type");
        LocalDateTime deadline = message.getHeader("deadline");
        
        if ("EMERGENCY".equals(bidType) || 
            LocalDateTime.parse(deadline).isBefore(LocalDateTime.now().plusHours(2))) {
   
            return "urgent_queue";
        }
        return "normal_queue";
    }
}
三、消息生产端优化
  1. 消息封装规范
public class PriorityMessageBuilder {
   
    public static Message buildMessage(Object payload, int priority) {
   
        MessageProperties props = new MessageProperties();
        props(priority);
        props.setHeader("retry_count", 0);
        props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        return new 

网站公告

今日签到

点亮在社区的每一天
去签到