Spring Boot 项目集成 Redisson 实现延迟队列
延迟队列应用场景
- 订单支付超时:用户下单后30分钟未支付,自动取消订单。
- 订单评价超时:订单签收后7天未评价,系统默认好评。
- 商家接单超时:下单成功后商家5分钟未接单,订单取消。
- 配送超时提醒:配送超时,推送短信提醒。
技术选型分析
针对延迟任务处理机制,主要可选方案有以下四种:定时轮询、Redisson 延迟队列、消息中间件、Redis 过期监听。
1. 定时任务轮询
机制: 通过定时任务如 @Scheduled
或 Quartz),以固定频率轮询数据库或 Redis,查找已到期的任务并处理。
优点:由于springboot原生支持,实现成本低,并且可以任务统一管理
缺点:
- 处理存在延迟非实时,精度取决于轮询间隔。
- 容易空轮询,浪费 CPU 或数据库资源。
- 不适合高并发或对时效要求高的业务场景。
适用场景:小型系统、任务量小、业务容忍较大延迟的场景。
2. Redisson 延迟队列(推荐)
机制说明:
基于 Redis ZSet 和 List 封装的延迟队列,由 Redisson 实现,支持回调消费。
优点:
- 接入简单,Redisson 封装完备。
- 实时性较好,精度可达秒级,满足大多数业务需求。
- 可注册不同处理器,业务扩展方便。
- 支持分布式部署,天然适配 Redis 集群环境。
缺点:实现依赖 Redisson,同时需要保证Redis 崩溃等情况设计补偿保护机制
适用场景:中大型系统、微服务架构下的延迟任务处理。
3. 消息中间件延迟队列( RabbitMQ、Kafka等)
机制:通过消息中间件的 TTL(消息生存时间)和死信队列机制实现延迟任务,例如 RabbitMQ 的 DLX(死信交换机)或 Kafka 的延迟消费。
优点:
- 毫秒级精度,适合高并发和对时效性要求高的业务。
- 高可靠性,天然支持异步解耦与分布式处理。
- 支持大规模任务并发调度。
缺点:
- 配置复杂,需要配置 TTL、DLX 等。
- 引入 MQ 系统,提升系统复杂度与维护成本。
- 对运维能力有一定要求。
适用场景:高并发、高可用要求的核心业务,如订单超时关闭、促销活动控制等。
4. Redis Key 过期监听(不推荐)
最初是通过redis的思路来实现延迟队列功能,但是通过查询资料和官方文档发现,redis并不适合此种场景
机制:
通过启用 Redis 的 Keyspace Notification 功能,监听键过期事件(需设置 notify-keyspace-events
配置项)。
官方文档说明:
Redis 中 Key 的过期事件
expired
有两种触发方式:
- 在访问 Key 时发现其已过期
- 后台线程定期扫描并删除过期 Key
因此,并不能保证在 TTL 恰好归零时立即触发过期事件,也不保证事件一定会触发。
缺点:
- 不可靠,事件触发不精确,且可能丢失。
- 无法支持分布式监听,Redis 集群环境下存在局限。
- 对核心业务流程不具备可控性。
适用场景:临时性、非强一致性场景,如验证码、状态标记清理等。
参考:
选型建议总结
方案 | 实现难度 | 实时性 | 可靠性 | 分布式支持 | 推荐场景 |
---|---|---|---|---|---|
定时任务轮询 | 低 | 低 | 中 | 有限 | 简单、低频业务 |
Redisson 延迟队列 | 中 | 中 | 高 | 好 | 分布式、业务量中等、场景标准 |
MQ 延迟队列 | 高 | 高 | 高 | 极好 | 高并发、大量异步、核心任务场景 |
Redis 过期监听 | 低 | 不确定 | 低 | 差 | 非核心场景,缓存状态变更类任务 |
Redisson 延迟队列实现
项目结构
├── config
│ └── RedissonConfig.java # 配置 Redisson 客户端,创建 RedissonClient Bean
├── controller
│ └── DeliveryController.java # 提供REST 接口模拟订单创建和收货操作,触发延迟任务
├── enums
│ └── DelayQueueEnum.java # 定义延迟队列的业务枚举及其关联的处理类
├── hander
│ ├── DelayQueueHandler.java # 延迟队列处理器接口,定义 execute 方法
│ ├── EvaluationTimeoutHandler.java # 处理评价超时逻辑的具体实现类
│ └── OrderPaymentTimeoutHandler.java # 处理订单支付超时逻辑的具体实现类
├── runner
│ └── RedisDelayQueueRunner.java # 启动后监听并执行延迟队列任务,使用线程池并发处理
├── utils
│ ├── RedisDelayQueueUtil.java # 封装 Redis 延迟队列的操作方法(添加/获取元素)
│ └── SpringUtils.java # 工具类,用于在非 Spring 管理类中获取 Bean
└── SpringbootApplication.java # Spring Boot 主类,包含程序入口 main 方法
Redis延迟队列工具类
package com.zhou.demo.utils;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
/**
* redis延迟队列工具
*/
@Slf4j
@Component
public class RedisDelayQueueUtil {
@Resource
private RedissonClient redissonClient;
/**
* 将元素添加到延迟队列中
*
* @param queueCode 队列键(用于标识不同的队列)
* @param value 要添加到队列中的值(泛型类型)
* @param delay 延迟时间(指定元素在队列中延迟被消费的时间)
* @param timeUnit 时间单位(与延迟时间配合使用,如秒、毫秒等)
*/
public <T> void addDelayQueue(String queueCode, T value, long delay, TimeUnit timeUnit) {
try {
RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);
RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
delayedQueue.offer(value, delay, timeUnit);
log.info("(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}秒", queueCode, value, timeUnit.toSeconds(delay));
} catch (Exception e) {
log.error("(添加延时队列失败) {}", e.getMessage(), e);
throw new RuntimeException("(添加延时队列失败)", e);
}
}
/**
* 获取延迟队列中的元素
*
* @param queueCode 队列键
* @param <T> 元素类型
* @return 队列中的元素
*/
public <T> T getDelayQueue(String queueCode) throws InterruptedException {
RBlockingDeque<T> blockingDeque = redissonClient.getBlockingDeque(queueCode);
return blockingDeque.take();
}
}
Redis延迟队列运行器
用于在Spring Boot启动后监听各个延迟队列,并在线程池中执行对应的业务逻辑。
package com.zhou.demo.runner;
import com.zhou.demo.enums.DelayQueueEnum;
import com.zhou.demo.hander.DelayQueueHandler;
import com.zhou.demo.utils.RedisDelayQueueUtil;
import com.zhou.demo.utils.SpringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Redis延迟队列运行器
* 用于在Spring Boot启动后监听各个延迟队列,并在线程池中执行对应的业务逻辑。
*
* @author zhouquan
*/
@Slf4j
@Component
public class RedisDelayQueueRunner implements CommandLineRunner {
@Resource
private RedisDelayQueueUtil redisDelayQueueUtil;
/**
* 线程池,用于并发监听不同的延迟队列
*/
private final ExecutorService executorService = Executors.newCachedThreadPool();
/**
* 运行状态标志,控制线程是否持续监听队列
*/
private volatile boolean running = true;
/**
* Spring Boot 启动完成后自动运行的方法
* 遍历所有延迟队列枚举,为每个队列创建一个监听线程
*
* @param args 命令行参数
*/
@Override
public void run(String... args) {
for (DelayQueueEnum queueEnum : DelayQueueEnum.values()) {
executorService.execute(() -> {
log.info("启动延迟队列监听线程:{}", queueEnum.getCode());
while (running) {
try {
Object value = redisDelayQueueUtil.getDelayQueue(queueEnum.getCode());
DelayQueueHandler handler = SpringUtils.getBean(queueEnum.getBeanClass());
handler.execute(value);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("线程中断:{}", queueEnum.getCode());
} catch (Exception ex) {
log.error("延迟队列 [{}] 处理异常:{}", queueEnum.getCode(), ex.getMessage(), ex);
}
}
});
}
log.info("所有 Redis 延迟队列监听启动完成");
}
/**
* 在 Bean 销毁前关闭线程池,释放资源
*/
@PreDestroy
public void shutdown() {
log.info("准备关闭 Redis 延迟队列监听线程池");
running = false;
executorService.shutdownNow();
}
}
业务枚举类
package com.zhou.demo.enums;
import com.zhou.demo.hander.EvaluationTimeoutHandler;
import com.zhou.demo.hander.OrderPaymentTimeoutHandler;
import com.zhou.demo.hander.DelayQueueHandler;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
/**
* 延迟队列业务枚举
*
* @author 18324
*/
@Getter
@NoArgsConstructor
@AllArgsConstructor
public enum DelayQueueEnum {
/**
* 订单超时
*/
ORDER_PAYMENT_TIMEOUT("order_payment_timeout", "订单支付超时", OrderPaymentTimeoutHandler.class),
/**
* 评价超时
*/
EVALUATION_TIMEOUT("evaluation_timeout", "评价超时", EvaluationTimeoutHandler.class);
/**
* 延迟队列 Redis Key
*/
private String code;
/**
* 中文描述
*/
private String name;
/**
* 延迟队列具体业务实现的 Bean
* 可通过 Spring 的上下文获取
*/
private Class<? extends DelayQueueHandler<Long>> beanClass;
}
测试接口类
package com.zhou.demo.enums;
import com.zhou.demo.hander.EvaluationTimeoutHandler;
import com.zhou.demo.hander.OrderPaymentTimeoutHandler;
import com.zhou.demo.hander.DelayQueueHandler;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
/**
* 延迟队列业务枚举
*
* @author 18324
*/
@Getter
@NoArgsConstructor
@AllArgsConstructor
public enum DelayQueueEnum {
/**
* 订单超时
*/
ORDER_PAYMENT_TIMEOUT("order_payment_timeout", "订单支付超时", OrderPaymentTimeoutHandler.class),
/**
* 评价超时
*/
EVALUATION_TIMEOUT("evaluation_timeout", "评价超时", EvaluationTimeoutHandler.class);
/**
* 延迟队列 Redis Key
*/
private String code;
/**
* 中文描述
*/
private String name;
/**
* 延迟队列具体业务实现的 Bean
* 可通过 Spring 的上下文获取
*/
private Class<? extends DelayQueueHandler<Long>> beanClass;
}
延迟队列任务测试
源码地址
https://gitee.com/zhouquanstudy/springboot-redisson-delayqueue
参考
[1]. SpringBoot集成Redisson实现延迟队列_redisson delayedqueue spring-CSDN博客
[2]. 请勿过度依赖Redis的过期监听业务