Spring Boot 项目集成 Redisson 实现延迟队列

发布于:2025-06-25 ⋅ 阅读:(16) ⋅ 点赞:(0)

延迟队列应用场景

  1. 订单支付超时:用户下单后30分钟未支付,自动取消订单。
  2. 订单评价超时:订单签收后7天未评价,系统默认好评。
  3. 商家接单超时:下单成功后商家5分钟未接单,订单取消。
  4. 配送超时提醒:配送超时,推送短信提醒。

技术选型分析

针对延迟任务处理机制,主要可选方案有以下四种:定时轮询、Redisson 延迟队列、消息中间件、Redis 过期监听。

1. 定时任务轮询

机制: 通过定时任务如 @Scheduled 或 Quartz),以固定频率轮询数据库或 Redis,查找已到期的任务并处理。

优点:由于springboot原生支持,实现成本低,并且可以任务统一管理

缺点

  • 处理存在延迟非实时,精度取决于轮询间隔。
  • 容易空轮询,浪费 CPU 或数据库资源。
  • 不适合高并发或对时效要求高的业务场景。

适用场景:小型系统、任务量小、业务容忍较大延迟的场景。

2. Redisson 延迟队列(推荐)

机制说明
基于 Redis ZSet 和 List 封装的延迟队列,由 Redisson 实现,支持回调消费。

优点

  • 接入简单,Redisson 封装完备。
  • 实时性较好,精度可达秒级,满足大多数业务需求。
  • 可注册不同处理器,业务扩展方便。
  • 支持分布式部署,天然适配 Redis 集群环境。

缺点:实现依赖 Redisson,同时需要保证Redis 崩溃等情况设计补偿保护机制

适用场景:中大型系统、微服务架构下的延迟任务处理。

3. 消息中间件延迟队列( RabbitMQ、Kafka等)

机制:通过消息中间件的 TTL(消息生存时间)和死信队列机制实现延迟任务,例如 RabbitMQ 的 DLX(死信交换机)或 Kafka 的延迟消费。

优点

  • 毫秒级精度,适合高并发和对时效性要求高的业务。
  • 高可靠性,天然支持异步解耦与分布式处理。
  • 支持大规模任务并发调度。

缺点

  • 配置复杂,需要配置 TTL、DLX 等。
  • 引入 MQ 系统,提升系统复杂度与维护成本。
  • 对运维能力有一定要求。

适用场景:高并发、高可用要求的核心业务,如订单超时关闭、促销活动控制等。

4. Redis Key 过期监听(不推荐)

最初是通过redis的思路来实现延迟队列功能,但是通过查询资料和官方文档发现,redis并不适合此种场景

机制
通过启用 Redis 的 Keyspace Notification 功能,监听键过期事件(需设置 notify-keyspace-events 配置项)。

官方文档说明

Redis 中 Key 的过期事件 expired 有两种触发方式:

  • 在访问 Key 时发现其已过期
  • 后台线程定期扫描并删除过期 Key

因此,并不能保证在 TTL 恰好归零时立即触发过期事件,也不保证事件一定会触发。

缺点

  • 不可靠,事件触发不精确,且可能丢失。
  • 无法支持分布式监听,Redis 集群环境下存在局限。
  • 对核心业务流程不具备可控性。

适用场景:临时性、非强一致性场景,如验证码、状态标记清理等。

参考

选型建议总结

方案 实现难度 实时性 可靠性 分布式支持 推荐场景
定时任务轮询 有限 简单、低频业务
Redisson 延迟队列 分布式、业务量中等、场景标准
MQ 延迟队列 极好 高并发、大量异步、核心任务场景
Redis 过期监听 不确定 非核心场景,缓存状态变更类任务

Redisson 延迟队列实现

项目结构

├── config
│   └── RedissonConfig.java                # 配置 Redisson 客户端,创建 RedissonClient Bean
├── controller
│   └── DeliveryController.java            # 提供REST 接口模拟订单创建和收货操作,触发延迟任务
├── enums
│   └── DelayQueueEnum.java                # 定义延迟队列的业务枚举及其关联的处理类
├── hander
│   ├── DelayQueueHandler.java             # 延迟队列处理器接口,定义 execute 方法
│   ├── EvaluationTimeoutHandler.java      # 处理评价超时逻辑的具体实现类
│   └── OrderPaymentTimeoutHandler.java    # 处理订单支付超时逻辑的具体实现类
├── runner
│   └── RedisDelayQueueRunner.java         # 启动后监听并执行延迟队列任务,使用线程池并发处理
├── utils
│   ├── RedisDelayQueueUtil.java           # 封装 Redis 延迟队列的操作方法(添加/获取元素)
│   └── SpringUtils.java                   # 工具类,用于在非 Spring 管理类中获取 Bean
└── SpringbootApplication.java             # Spring Boot 主类,包含程序入口 main 方法

Redis延迟队列工具类

package com.zhou.demo.utils;

import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

/**
 * redis延迟队列工具
 */
@Slf4j
@Component
public class RedisDelayQueueUtil {

    @Resource
    private RedissonClient redissonClient;

    /**
     * 将元素添加到延迟队列中
     *
     * @param queueCode 队列键(用于标识不同的队列)
     * @param value     要添加到队列中的值(泛型类型)
     * @param delay     延迟时间(指定元素在队列中延迟被消费的时间)
     * @param timeUnit  时间单位(与延迟时间配合使用,如秒、毫秒等)
     */
    public <T> void addDelayQueue(String queueCode, T value, long delay, TimeUnit timeUnit) {
        try {
            RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);
            RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
            delayedQueue.offer(value, delay, timeUnit);
            log.info("(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}秒", queueCode, value, timeUnit.toSeconds(delay));
        } catch (Exception e) {
            log.error("(添加延时队列失败) {}", e.getMessage(), e);
            throw new RuntimeException("(添加延时队列失败)", e);
        }
    }

    /**
     * 获取延迟队列中的元素
     *
     * @param queueCode 队列键
     * @param <T>       元素类型
     * @return 队列中的元素
     */
    public <T> T getDelayQueue(String queueCode) throws InterruptedException {
        RBlockingDeque<T> blockingDeque = redissonClient.getBlockingDeque(queueCode);
        return blockingDeque.take();
    }

}

Redis延迟队列运行器

用于在Spring Boot启动后监听各个延迟队列,并在线程池中执行对应的业务逻辑。

package com.zhou.demo.runner;

import com.zhou.demo.enums.DelayQueueEnum;
import com.zhou.demo.hander.DelayQueueHandler;
import com.zhou.demo.utils.RedisDelayQueueUtil;
import com.zhou.demo.utils.SpringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Redis延迟队列运行器
 * 用于在Spring Boot启动后监听各个延迟队列,并在线程池中执行对应的业务逻辑。
 *
 * @author zhouquan
 */
@Slf4j
@Component
public class RedisDelayQueueRunner implements CommandLineRunner {

    @Resource
    private RedisDelayQueueUtil redisDelayQueueUtil;

    /**
     * 线程池,用于并发监听不同的延迟队列
     */
    private final ExecutorService executorService = Executors.newCachedThreadPool();

    /**
     * 运行状态标志,控制线程是否持续监听队列
     */
    private volatile boolean running = true;

    /**
     * Spring Boot 启动完成后自动运行的方法
     * 遍历所有延迟队列枚举,为每个队列创建一个监听线程
     *
     * @param args 命令行参数
     */
    @Override
    public void run(String... args) {
        for (DelayQueueEnum queueEnum : DelayQueueEnum.values()) {
            executorService.execute(() -> {
                log.info("启动延迟队列监听线程:{}", queueEnum.getCode());
                while (running) {
                    try {
                        Object value = redisDelayQueueUtil.getDelayQueue(queueEnum.getCode());
                        DelayQueueHandler handler = SpringUtils.getBean(queueEnum.getBeanClass());
                        handler.execute(value);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        log.warn("线程中断:{}", queueEnum.getCode());
                    } catch (Exception ex) {
                        log.error("延迟队列 [{}] 处理异常:{}", queueEnum.getCode(), ex.getMessage(), ex);
                    }
                }
            });
        }
        log.info("所有 Redis 延迟队列监听启动完成");
    }

    /**
     * 在 Bean 销毁前关闭线程池,释放资源
     */
    @PreDestroy
    public void shutdown() {
        log.info("准备关闭 Redis 延迟队列监听线程池");
        running = false;
        executorService.shutdownNow();
    }
}

业务枚举类

package com.zhou.demo.enums;

import com.zhou.demo.hander.EvaluationTimeoutHandler;
import com.zhou.demo.hander.OrderPaymentTimeoutHandler;
import com.zhou.demo.hander.DelayQueueHandler;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;

/**
 * 延迟队列业务枚举
 *
 * @author 18324
 */
@Getter
@NoArgsConstructor
@AllArgsConstructor
public enum DelayQueueEnum {

    /**
     * 订单超时
     */
    ORDER_PAYMENT_TIMEOUT("order_payment_timeout", "订单支付超时", OrderPaymentTimeoutHandler.class),

    /**
     * 评价超时
     */
    EVALUATION_TIMEOUT("evaluation_timeout", "评价超时", EvaluationTimeoutHandler.class);

    /**
     * 延迟队列 Redis Key
     */
    private String code;

    /**
     * 中文描述
     */
    private String name;

    /**
     * 延迟队列具体业务实现的 Bean
     * 可通过 Spring 的上下文获取
     */
    private Class<? extends DelayQueueHandler<Long>> beanClass;

}

测试接口类

package com.zhou.demo.enums;

import com.zhou.demo.hander.EvaluationTimeoutHandler;
import com.zhou.demo.hander.OrderPaymentTimeoutHandler;
import com.zhou.demo.hander.DelayQueueHandler;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;

/**
 * 延迟队列业务枚举
 *
 * @author 18324
 */
@Getter
@NoArgsConstructor
@AllArgsConstructor
public enum DelayQueueEnum {

    /**
     * 订单超时
     */
    ORDER_PAYMENT_TIMEOUT("order_payment_timeout", "订单支付超时", OrderPaymentTimeoutHandler.class),

    /**
     * 评价超时
     */
    EVALUATION_TIMEOUT("evaluation_timeout", "评价超时", EvaluationTimeoutHandler.class);

    /**
     * 延迟队列 Redis Key
     */
    private String code;

    /**
     * 中文描述
     */
    private String name;

    /**
     * 延迟队列具体业务实现的 Bean
     * 可通过 Spring 的上下文获取
     */
    private Class<? extends DelayQueueHandler<Long>> beanClass;

}

延迟队列任务测试

image-20250624145034682

image-20250624145154627

源码地址

https://gitee.com/zhouquanstudy/springboot-redisson-delayqueue

参考

[1]. SpringBoot集成Redisson实现延迟队列_redisson delayedqueue spring-CSDN博客

[2]. 请勿过度依赖Redis的过期监听业务


网站公告

今日签到

点亮在社区的每一天
去签到