注意:轻量级队列可以使用工具类,重量级数据量 请使用 MQ
本文章基于redis使用redisson客户端实现轻量级队列,以及代码、执行结果演示
一、常见队列了解
- 普通队列:先进先出(FIFO),只能在一端添加元素,在另一端移除元素。
- 循环队列:利用数组和取模运算实现队尾连接队首。
- 双端队列:两端都可以添加和移除元素。
- 优先级队列:根据元素的优先级顺序处理元素。
- 阻塞队列:在多线程中使用,队空时取元素会等待,队满时加元素会等待。
- 有限队列:队列长度固定,队满时新元素加入会导致队头元素自动移除。
二、工具类
基于redisson 实现的分布式工具类,copy即用
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class QueueUtils {
private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);
/**
* 获取客户端实例
*/
public static RedissonClient getClient() {
return CLIENT;
}
/**
* 添加普通队列数据
*
* @param queueName 队列名
* @param data 数据
*/
public static <T> boolean addQueueObject(String queueName, T data) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
return queue.offer(data);
}
/**
* 通用获取一个队列数据 没有数据返回 null(不支持延迟队列)
*
* @param queueName 队列名
*/
public static <T> T getQueueObject(String queueName) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
return queue.poll();
}
/**
* 通用删除队列数据(不支持延迟队列)
*/
public static <T> boolean removeQueueObject(String queueName, T data) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
return queue.remove(data);
}
/**
* 通用销毁队列 所有阻塞监听 报错(不支持延迟队列)
*/
public static <T> boolean destroyQueue(String queueName) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
return queue.delete();
}
/**
* 添加延迟队列数据 默认毫秒
*
* @param queueName 队列名
* @param data 数据
* @param time 延迟时间
*/
public static <T> void addDelayedQueueObject(String queueName, T data, long time) {
addDelayedQueueObject(queueName, data, time, TimeUnit.MILLISECONDS);
}
/**
* 添加延迟队列数据
*
* @param queueName 队列名
* @param data 数据
* @param time 延迟时间
* @param timeUnit 单位
*/
public static <T> void addDelayedQueueObject(String queueName, T data, long time, TimeUnit timeUnit) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
delayedQueue.offer(data, time, timeUnit);
}
/**
* 获取一个延迟队列数据 没有数据返回 null
*
* @param queueName 队列名
*/
public static <T> T getDelayedQueueObject(String queueName) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
return delayedQueue.poll();
}
/**
* 删除延迟队列数据
*/
public static <T> boolean removeDelayedQueueObject(String queueName, T data) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
return delayedQueue.remove(data);
}
/**
* 销毁延迟队列 所有阻塞监听 报错
*/
public static <T> void destroyDelayedQueue(String queueName) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
delayedQueue.destroy();
}
/**
* 添加优先队列数据
*
* @param queueName 队列名
* @param data 数据
*/
public static <T> boolean addPriorityQueueObject(String queueName, T data) {
RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
return priorityBlockingQueue.offer(data);
}
/**
* 优先队列获取一个队列数据 没有数据返回 null(不支持延迟队列)
*
* @param queueName 队列名
*/
public static <T> T getPriorityQueueObject(String queueName) {
RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);
return queue.poll();
}
/**
* 优先队列删除队列数据(不支持延迟队列)
*/
public static <T> boolean removePriorityQueueObject(String queueName, T data) {
RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);
return queue.remove(data);
}
/**
* 优先队列销毁队列 所有阻塞监听 报错(不支持延迟队列)
*/
public static <T> boolean destroyPriorityQueue(String queueName) {
RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);
return queue.delete();
}
/**
* 尝试设置 有界队列 容量 用于限制数量
*
* @param queueName 队列名
* @param capacity 容量
*/
public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity) {
RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
return boundedBlockingQueue.trySetCapacity(capacity);
}
/**
* 尝试设置 有界队列 容量 用于限制数量
*
* @param queueName 队列名
* @param capacity 容量
* @param destroy 已存在是否销毁
*/
public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity, boolean destroy) {
RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
if (boundedBlockingQueue.isExists() && destroy) {
destroyQueue(queueName);
}
return boundedBlockingQueue.trySetCapacity(capacity);
}
/**
* 添加有界队列数据
*
* @param queueName 队列名
* @param data 数据
* @return 添加成功 true 已达到界限 false
*/
public static <T> boolean addBoundedQueueObject(String queueName, T data) {
RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
return boundedBlockingQueue.offer(data);
}
/**
* 有界队列获取一个队列数据 没有数据返回 null(不支持延迟队列)
*
* @param queueName 队列名
*/
public static <T> T getBoundedQueueObject(String queueName) {
RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);
return queue.poll();
}
/**
* 有界队列删除队列数据(不支持延迟队列)
*/
public static <T> boolean removeBoundedQueueObject(String queueName, T data) {
RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);
return queue.remove(data);
}
/**
* 有界队列销毁队列 所有阻塞监听 报错(不支持延迟队列)
*/
public static <T> boolean destroyBoundedQueue(String queueName) {
RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);
return queue.delete();
}
/**
* 订阅阻塞队列(可订阅所有实现类 例如: 延迟 优先 有界 等)
*/
public static <T> void subscribeBlockingQueue(String queueName, Consumer<T> consumer, boolean isDelayed) {
RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
if (isDelayed) {
// 订阅延迟队列
CLIENT.getDelayedQueue(queue);
}
queue.subscribeOnElements(consumer);
}
}
三、普通队列代码测试
3.1 添加进入队列
QueueUtils.addQueueObject 方法添加数据进入队列 test
@RestController
@SaIgnore
public class QueueTestController {
@GetMapping("addQueue")
public void addQueue() {
TestDemo testDemo = new TestDemo();
testDemo.setTestKey("testKey");
testDemo.setCreateTime(new Date());
QueueUtils.addQueueObject("test", testDemo);
}
}
redis中查询加入队列数据:
3.2 获取队列
获取上面添加的 test队列 数据
@GetMapping("getQueue")
public void getQueue() {
TestDemo testDemo = new TestDemo();
testDemo.setTestKey("testKey");
testDemo.setCreateTime(new Date());
Object test = QueueUtils.getQueueObject("test");
Console.log("test->{}", test);
}
按照先进先出的规则,创建时间最早一条被获取,剩下2条为后添加数据
3.3 删除数据
删除test队列数据
@GetMapping("removeQueue")
public R<Void> removeQueue() throws ParseException {
TestDemo testDemo = new TestDemo();
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
testDemo.setCreateTime(simpleDateFormat.parse("2024-09-04 10:40:53"));
testDemo.setTestKey("testKey");
boolean test = QueueUtils.removeQueueObject("test", testDemo);
return R.ok(test ? "成功":"失败");
}
如上代码,删除时间为:2024-09-04 10:40:53 这条数据,剩下一条
3.4 销毁队列
@GetMapping("destoryQueue")
public R<Void> destoryQueue() throws ParseException {
boolean test = QueueUtils.destroyQueue("test");
return R.ok(test ? "成功":"失败");
}
如图销毁队列后,刷新,则提示键不存在
3.5 订阅队列
开启订阅队列:
- 一般是在程序启动时候开启,比如使用 @PostConstruct 注解
- 或者实现 ApplicationRunner 接口来实现
@Component public class RuoYiSubcribeInitializer implements ApplicationRunner { @Override public void run(ApplicationArguments args) throws Exception { QueueUtils.subscribeBlockingQueue("test",(Consumer<TestDemo>) testDemo ->{ Console.log("testDemo->{}", testDemo); },false); } }
@PostConstruct
public void subscribeQueue() {
QueueUtils.subscribeBlockingQueue("test",(Consumer<TestDemo>) testDemo ->{
Console.log("testDemo->{}", testDemo);
},false);
}
每次执行添加操作时候,订阅队列都会获取到数据
订阅队列会监听队列数据,知道队列数据为空
2024-09-04 11:37:49 [XNIO-1 task-1] INFO c.r.f.i.PlusWebInvokeTimeInterceptor
- [PLUS]开始请求 => URL[GET /addQueue],无参数
testDemo->TestDemo(id=null, deptId=null, userId=null, orderNum=null, testKey=testKey, value=null, version=null, delFlag=null)
2024-09-04 11:37:49 [XNIO-1 task-1] INFO c.r.f.i.PlusWebInvokeTimeInterceptor
- [PLUS]结束请求 => URL[GET /addQueue],耗时:[15]毫秒
2024-09-04 11:38:12 [XNIO-1 task-1] INFO c.r.f.i.PlusWebInvokeTimeInterceptor
- [PLUS]开始请求 => URL[GET /addQueue],无参数
2024-09-04 11:38:12 [XNIO-1 task-1] INFO c.r.f.i.PlusWebInvokeTimeInterceptor
- [PLUS]结束请求 => URL[GET /addQueue],耗时:[13]毫秒
testDemo->TestDemo(id=null, deptId=null, userId=null, orderNum=null, testKey=testKey, value=null, version=null, delFlag=null)
四、有界队列代码测试
4.1设置队列容量
QueueUtils.trySetBoundedQueueCapacity("test", 5); 设置队列容量
@GetMapping("addBoundedQueue")
public void addBoundedQueue() {
//销毁队列
QueueUtils.destroyQueue("test");
//设置队列容量
QueueUtils.trySetBoundedQueueCapacity("test", 5);
}
如下图bps,则是记录容量
4.2 添加队列
@GetMapping("addBounded")
public R<Void> addBounded() {
//设置队列容量
boolean test = QueueUtils.addBoundedQueueObject("test", "vlue111");
return R.ok(test ? "成功":"失败");
}
- 如果未设置容量,添加失败
- 超出容量,添加也会失败
4.3获取数据
获取队列数据,会同时改变容量大小
getBoundedQueueObject,会正确计算容量的大小。
getQueueObject 获取导数据,容量会为0.后面无法添加
@GetMapping("getBounded")
public R<Void> getBounded() {
//设置队列容量
Object test = QueueUtils.getBoundedQueueObject("test");
return R.ok(test.toString());
}
底层逻辑,如果取出一个数据,容量则会加 1
{
"code": 200,
"msg": "vlue111",
"data": null
}
<Response body is empty>Response code: 200 (OK); Time: 26ms (26 ms); Content length: 40 bytes (40 B)
五、延时队列代码测试
5.1 延时队列数据流转流程
延时队列数据到期后会存入到普通队列,如下图流程:
+-------------------+
| 添加任务到 |
| 延时队列 |---------------------------------------
+-------------------+
| |
v v
+-------------------+
| 定时检查到期 |
| 任务 | 获取数据
+-------------------+
|
v |
+-------------------+
| 延时队列 |---------------------------------------
| -> 普通队列 |
+------------------->
所以拿数据是从延时队列拿数据,还是从普通队列拿数据,考虑下业务场景
5.2 脚本的实现过程:
简单了解地底层:
struct.pack('dLc0', tonumber(ARGV[1]), string.len(ARGV[2]), ARGV[2])
:将过期时间、对象长度和对象本身打包成一个二进制字符串,便于在 Redis 中存储。redis.call('zadd', KEYS[2], ARGV[1], value)
:将打包后的值value
添加到有序集合(延时队列)中,其中ARGV[1]
是过期时间。redis.call('rpush', KEYS[3], value)
:将打包后的值value
添加到列表(待处理队列)中。local v = redis.call('zrange', KEYS[2], 0, 0)
:获取有序集合的第一个元素。if v[1] == value then redis.call('publish', KEYS[4], ARGV[1]) end
:如果添加的新元素是有序集合的第一个元素,则通过 Redis 的发布订阅机制通知其他消费者。Lua 脚本: local value = struct.pack('dLc0', tonumber(ARGV[1]), string.len(ARGV[2]), ARGV[2]); redis.call('zadd', KEYS[2], ARGV[1], value); redis.call('rpush', KEYS[3], value); local v = redis.call('zrange', KEYS[2], 0, 0); if v[1] == value then redis.call('publish', KEYS[4], ARGV[1]); end;
5.3 测试延时队列:
场景:
- 用于不是立即执行的任务场景:
- 比如用户创建订单但是不付款,时间到后取消订单
如图先订阅队列 test,手动开启:
/**
* <简述>订阅延时队列
* <详细描述>
* @author syf
* @date 2024/9/6 14:53
*/
@GetMapping("subscribeDelayQueue")
public R<Void> subscribeDelayQueue() {
Console.log("开启监听。。。。。。。。。。。。。。。。。。。。。。。。。。。");
QueueUtils.subscribeBlockingQueue("test",(Consumer<TestDemo>) testDemo ->{
Console.log("接受到订单->{}", testDemo);
Console.log("关闭订单");
},false);
return R.ok();
}
添加延时队列到test:
@GetMapping("addDelayQueue")
public void addDelayQueue() throws ParseException {
Console.log("创建订单。。。。。。。。。。。。。。。。。。。");
TestDemo testDemo = new TestDemo();
testDemo.setValue("订单编号");
QueueUtils.addDelayedQueueObject("test", testDemo, 10, TimeUnit.SECONDS);
Console.log("等待10秒。。。。。。。。。。。。。。。。。。。");
}
如图 10秒后,订阅队列监听到订单并关闭
开启监听。。。。。。。。。。。。。。。。。。。。。。。。。。。
2024-09-05 19:47:51 [XNIO-1 task-1] INFO c.r.f.i.PlusWebInvokeTimeInterceptor
- [PLUS]结束请求 => URL[GET /subscribeDelayQueue],耗时:[51]毫秒
2024-09-05 19:47:54 [XNIO-1 task-1] INFO c.r.f.i.PlusWebInvokeTimeInterceptor
- [PLUS]开始请求 => URL[GET /addDelayQueue],无参数
创建订单。。。。。。。。。。。。。。。。。。。
等待10秒。。。。。。。。。。。。。。。。。。。
2024-09-05 19:47:54 [XNIO-1 task-1] INFO c.r.f.i.PlusWebInvokeTimeInterceptor
- [PLUS]结束请求 => URL[GET /addDelayQueue],耗时:[57]毫秒
接受到订单->TestDemo(id=null, deptId=null, userId=null, orderNum=null, testKey=null, value=订单编号, version=null, delFlag=null)
关闭订单
六、优先队列代码测试
场景:
vip 用户按照OrderNum,随机生成等级进行排队
添加vip用户进入队列:
插入数据时候会按照OrderNum 大小找到位置,就像list索引一样
/**
* 添加队列数据
*
* @param queueName 队列名
*/
@GetMapping("/add")
public R<Void> add(String queueName) {
// 用完了一定要销毁 否则会一直存在
boolean b = QueueUtils.destroyPriorityQueue(queueName);
log.info("通道: {} , 删除: {}", queueName, b);
for (int i = 0; i < 10; i++) {
int randomNum = RandomUtil.randomInt(10);
PriorityDemo data = new PriorityDemo();
data.setName("data-" + i);
data.setOrderNum(randomNum);
if (QueueUtils.addPriorityQueueObject(queueName, data)) {
log.info("通道: {} , 发送数据: {}", queueName, data);
} else {
log.info("通道: {} , 发送数据: {}, 发送失败", queueName, data);
}
}
return R.ok("操作成功");
}
按照等级获取vip用户:
@GetMapping("/get")
public R<Void> get(String queueName) {
PriorityDemo data;
do {
data = QueueUtils.getPriorityQueueObject(queueName);
log.info("通道: {} , 获取数据: {}", queueName, data);
} while (data != null);
return R.ok("操作成功");
}
如图orderNum从 0 到7依次被打印
2024-09-06 11:06:57 [XNIO-1 task-1] INFO c.r.f.i.PlusWebInvokeTimeInterceptor
- [PLUS]结束请求 => URL[GET /demo/queue/priority/get],耗时:[11]毫秒
2024-09-06 11:07:50 [XNIO-1 task-1] INFO c.r.f.i.PlusWebInvokeTimeInterceptor
- [PLUS]开始请求 => URL[GET /demo/queue/priority/get],参数类型[param],参数:[{"queueName":["test"]}]
2024-09-06 11:07:50 [XNIO-1 task-1] INFO c.r.d.c.q.PriorityQueueController
- 通道: test , 获取数据: PriorityDemo(name=data-9, orderNum=0)
2024-09-06 11:07:50 [XNIO-1 task-1] INFO c.r.d.c.q.PriorityQueueController
- 通道: test , 获取数据: PriorityDemo(name=data-1, orderNum=2)
2024-09-06 11:07:50 [XNIO-1 task-1] INFO c.r.d.c.q.PriorityQueueController
- 通道: test , 获取数据: PriorityDemo(name=data-2, orderNum=2)
2024-09-06 11:07:50 [XNIO-1 task-1] INFO c.r.d.c.q.PriorityQueueController
- 通道: test , 获取数据: PriorityDemo(name=data-3, orderNum=3)
2024-09-06 11:07:50 [XNIO-1 task-1] INFO c.r.d.c.q.PriorityQueueController
- 通道: test , 获取数据: PriorityDemo(name=data-4, orderNum=3)
2024-09-06 11:07:50 [XNIO-1 task-1] INFO c.r.d.c.q.PriorityQueueController
- 通道: test , 获取数据: PriorityDemo(name=data-8, orderNum=3)
2024-09-06 11:07:51 [XNIO-1 task-1] INFO c.r.d.c.q.PriorityQueueController
- 通道: test , 获取数据: PriorityDemo(name=data-0, orderNum=5)
2024-09-06 11:07:51 [XNIO-1 task-1] INFO c.r.d.c.q.PriorityQueueController
- 通道: test , 获取数据: PriorityDemo(name=data-7, orderNum=6)
2024-09-06 11:07:51 [XNIO-1 task-1] INFO c.r.d.c.q.PriorityQueueController
- 通道: test , 获取数据: PriorityDemo(name=data-5, orderNum=7)
2024-09-06 11:07:51 [XNIO-1 task-1] INFO c.r.d.c.q.PriorityQueueController
- 通道: test , 获取数据: PriorityDemo(name=data-6, orderNum=7)
2024-09-06 11:07:51 [XNIO-1 task-1] INFO c.r.d.c.q.PriorityQueueController
- 通道: test , 获取数据: null
2024-09-06 11:07:51 [XNIO-1 task-1] INFO c.r.f.i.PlusWebInvokeTimeInterceptor
- [PLUS]结束请求 => URL[GET /demo/queue/priority/get],耗时:[488]毫秒
博主精心整理专栏,CV大法即可用,感谢您小手点一点 手动跪拜:
1- SpringBoot框架常用配置(若依),代码解读:
2- java常用工具类整理,示例演示:
3- CompletableFuture 异步编排与实际代码展示
4- XXL-JOB 详细学习,手把手带入门