在Node.js 中基于请求 ID 实现简单队列
下面示例演示两种策略,以同一个请求 ID 为单位:
即时阻止策略:如果已有相同 ID 的请求在处理,直接报错并返回。
排队等待策略:后续相同 ID 的请求不报错,而是挂起,直到首个请求完成后一起接收相同的处理结果。
Powered by Moshow@https://zhengkai.blog.csdn.net/
一、核心思路
用一个
Map
存储「正在处理」的条目,key 为请求 ID,value 包含:当前正在执行的 Promise
一个用于挂起后续请求的队列数组
新请求到来时,检查 Map:
如果不存在对应条目:将当前请求注册为「首个处理」,并执行异步业务逻辑
如果已存在对应条目:
即时阻止策略:直接抛错
排队等待策略:返回一个挂起的 Promise,入队等待
首个请求完成(resolve 或 reject)后:
删除 Map 中的条目
将结果(成功或失败)广播给队列中的所有挂起请求
二、代码示例
队列管理类
// queue-manager.js
class QueueManager {
constructor() {
this.map = new Map(); // key: requestId, value: { promise, queue }
}
/**
* 注册请求
* @param {string} id 请求 ID
* @param {() => Promise<any>} handler 首次请求的异步逻辑
* @param {Object} options
* @param {boolean} options.rejectOnDuplicate 是否即时拒绝重复请求
* @returns {Promise<any>}
* @author Moshow@https://zhengkai.blog.csdn.net/
*/
enqueue(id, handler, options = { rejectOnDuplicate: true }) {
// 未在处理队列中,直接执行 handler
if (!this.map.has(id)) {
const queue = [];
const entry = { queue };
this.map.set(id, entry);
// 执行核心业务逻辑
entry.promise = handler()
.then(result => {
// 广播成功结果给所有挂起请求
queue.forEach(({ resolve }) => resolve(result));
return result;
})
.catch(err => {
// 广播错误给所有挂起请求
queue.forEach(({ reject }) => reject(err));
throw err;
})
.finally(() => {
// 清理
this.map.delete(id);
});
return entry.promise;
}
// 已有正在处理的相同 ID
const entry = this.map.get(id);
// ****** 即时阻止策略:直接抛错
if (options.rejectOnDuplicate) {
return Promise.reject(new Error(`Request ${id} 正在处理中,请稍后再试`));
}
// ****** 排队等待策略:返回挂起的 Promise,结果由首个请求结束时统一广播
return new Promise((resolve, reject) => {
entry.queue.push({ resolve, reject });
});
}
}
module.exports = new QueueManager();
排队等待策略:返回一个挂起的 Promise,入队等待
即时阻止策略:直接抛错
// app.js
const express = require('express');
const queueManager = require('./queue-manager');
const app = express();
app.get('/process', async (req, res) => {
const requestId = req.query.id;
if (!requestId) {
return res.status(400).send('缺少 id');
}
try {
// Powered by Moshow@https://zhengkai.blog.csdn.net/
// strategy 1: 即时拒绝 rejectOnDuplicate: true
// const result = await queueManager.enqueue(requestId, () => doWork(requestId), { rejectOnDuplicate: true });
// strategy 2: 排队等待并共享结果 rejectOnDuplicate: false
const result = await queueManager.enqueue(requestId, () => doWork(requestId), { rejectOnDuplicate: false });
res.json({ status: 'ok', data: result });
} catch (err) {
res.status(429).json({ status: 'error', message: err.message });
}
});
async function doWork(id) {
// 模拟耗时操作
await new Promise(r => setTimeout(r, 2000));
return { id, timestamp: Date.now() };
}
app.listen(3000, () => console.log('Server started on port 3000'));
三、使用说明与扩展
即时阻止
优点:后续请求瞬间反馈,不占用额外内存。
缺点:用户需自行决定重试时机,可能客户端不停打重试包。
排队等待并共享结果
优点:后续请求无需重试,直接拿到同样的处理结果。
缺点:高并发时可能积压大量挂起请求,需监控内存和队列长度。(或者配置超过30分钟则清空队列+终端业务重新来,根据业务调整)
异步错误处理
在实际业务中,记得对
handler
内部的异常或超时做额外保护,避免永久挂起。
更细粒度的队列
如果想要限制同时处理的总量(不只是同 ID),可在
QueueManager
里再加一个全局计数器或池子。
分布式场景
若服务水平扩展到多台机器,需要引入分布式锁(如 Redis 的 Redlock)或共享队列(如 Kafka、RabbitMQ)来保证同一 ID 只被一台机器处理。
这样我们就能在 Node.js/Express 中灵活地基于请求 ID 实现:要么即时拒绝重复请求,要么统一排队等待并共用第一个请求的处理结果。根据业务场景,选择最合适的策略即可。