Node.js 中基于请求 ID 实现简单队列(即时阻止策略/排队等待策略)

发布于:2025-07-22 ⋅ 阅读:(16) ⋅ 点赞:(0)

在Node.js 中基于请求 ID 实现简单队列

下面示例演示两种策略,以同一个请求 ID 为单位:

  1. 即时阻止策略:如果已有相同 ID 的请求在处理,直接报错并返回。

  2. 排队等待策略:后续相同 ID 的请求不报错,而是挂起,直到首个请求完成后一起接收相同的处理结果。

Powered by Moshow@https://zhengkai.blog.csdn.net/

一、核心思路

  1. 用一个 Map 存储「正在处理」的条目,key 为请求 ID,value 包含:

    • 当前正在执行的 Promise

    • 一个用于挂起后续请求的队列数组

  2. 新请求到来时,检查 Map:

    • 如果不存在对应条目:将当前请求注册为「首个处理」,并执行异步业务逻辑

    • 如果已存在对应条目:

      • 即时阻止策略:直接抛错

      • 排队等待策略:返回一个挂起的 Promise,入队等待

  3. 首个请求完成(resolve 或 reject)后:

    • 删除 Map 中的条目

    • 将结果(成功或失败)广播给队列中的所有挂起请求

二、代码示例

队列管理类

// queue-manager.js
class QueueManager {
  constructor() {
    this.map = new Map();  // key: requestId, value: { promise, queue }
  }

  /**
   * 注册请求
   * @param {string} id 请求 ID
   * @param {() => Promise<any>} handler 首次请求的异步逻辑
   * @param {Object}   options
   * @param {boolean}  options.rejectOnDuplicate 是否即时拒绝重复请求
   * @returns {Promise<any>}
   * @author Moshow@https://zhengkai.blog.csdn.net/
   */
  enqueue(id, handler, options = { rejectOnDuplicate: true }) {
    // 未在处理队列中,直接执行 handler
    if (!this.map.has(id)) {
      const queue = [];
      const entry = { queue };
      this.map.set(id, entry);

      // 执行核心业务逻辑
      entry.promise = handler()
        .then(result => {
          // 广播成功结果给所有挂起请求
          queue.forEach(({ resolve }) => resolve(result));
          return result;
        })
        .catch(err => {
          // 广播错误给所有挂起请求
          queue.forEach(({ reject }) => reject(err));
          throw err;
        })
        .finally(() => {
          // 清理
          this.map.delete(id);
        });

      return entry.promise;
    }

    // 已有正在处理的相同 ID
    const entry = this.map.get(id);
    // ****** 即时阻止策略:直接抛错
    if (options.rejectOnDuplicate) {
      return Promise.reject(new Error(`Request ${id} 正在处理中,请稍后再试`));
    }

    // ****** 排队等待策略:返回挂起的 Promise,结果由首个请求结束时统一广播
    return new Promise((resolve, reject) => {
      entry.queue.push({ resolve, reject });
    });
  }
}

module.exports = new QueueManager();

排队等待策略:返回一个挂起的 Promise,入队等待

即时阻止策略:直接抛错

// app.js
const express = require('express');
const queueManager = require('./queue-manager');
const app = express();

app.get('/process', async (req, res) => {
  const requestId = req.query.id;
  if (!requestId) {
    return res.status(400).send('缺少 id');
  }

  try {
    // Powered by Moshow@https://zhengkai.blog.csdn.net/
    // strategy 1: 即时拒绝 rejectOnDuplicate: true
    // const result = await queueManager.enqueue(requestId, () => doWork(requestId), { rejectOnDuplicate: true });

    // strategy 2: 排队等待并共享结果 rejectOnDuplicate: false
    const result = await queueManager.enqueue(requestId, () => doWork(requestId), { rejectOnDuplicate: false });

    res.json({ status: 'ok', data: result });
  } catch (err) {
    res.status(429).json({ status: 'error', message: err.message });
  }
});

async function doWork(id) {
  // 模拟耗时操作
  await new Promise(r => setTimeout(r, 2000));
  return { id, timestamp: Date.now() };
}

app.listen(3000, () => console.log('Server started on port 3000'));

三、使用说明与扩展

  • 即时阻止

    • 优点:后续请求瞬间反馈,不占用额外内存。

    • 缺点:用户需自行决定重试时机,可能客户端不停打重试包。

  • 排队等待并共享结果

    • 优点:后续请求无需重试,直接拿到同样的处理结果。

    • 缺点:高并发时可能积压大量挂起请求,需监控内存和队列长度。(或者配置超过30分钟则清空队列+终端业务重新来,根据业务调整)

  • 异步错误处理

    • 在实际业务中,记得对 handler 内部的异常超时做额外保护,避免永久挂起。

  • 更细粒度的队列

    • 如果想要限制同时处理的总量(不只是同 ID),可在 QueueManager 里再加一个全局计数器或池子。

  • 分布式场景

    • 若服务水平扩展到多台机器,需要引入分布式锁(如 Redis 的 Redlock)或共享队列(如 Kafka、RabbitMQ)来保证同一 ID 只被一台机器处理。

这样我们就能在 Node.js/Express 中灵活地基于请求 ID 实现:要么即时拒绝重复请求,要么统一排队等待并共用第一个请求的处理结果。根据业务场景,选择最合适的策略即可。


网站公告

今日签到

点亮在社区的每一天
去签到