WEB3全栈开发——面试专业技能点P5中间件

发布于:2025-06-10 ⋅ 阅读:(26) ⋅ 点赞:(0)

一、使用 Express 操作 Redis 的完整示例

涵盖 Redis 各种核心数据类型(String、Hash、List、Set、Sorted Set)及常见操作


1️⃣ 概念介绍

Redis 是一个开源的高性能键值对数据库,支持多种数据结构,非常适合缓存、排行榜、会话管理等场景。

Redis 支持的数据类型包括:

类型 简介
String 最基本的数据类型,支持数字/字符串存取
Hash 类似对象,适合存储用户信息、对象结构
List 链表结构,可做消息队列
Set 无序唯一集合,常用于去重
Sorted Set 有序唯一集合,常用于排行榜(带分数排序)

Node.js 下最常用的 Redis 客户端是 ioredis,支持连接池、集群等高级功能。


2️⃣ 安装依赖

npm install express ioredis

3️⃣ 初始化 Express + Redis 应用结构

project/
├── index.js
├── redisClient.js
└── package.json

4️⃣ 创建 Redis 客户端 redisClient.js

const Redis = require('ioredis');

// 默认连接本地 Redis,如果你有密码或远程地址可以传入配置对象
const redis = new Redis({
  host: '127.0.0.1',
  port: 6379,
  // password: 'your-password', // 如果有密码
});

module.exports = redis;

5️⃣ Express 应用 index.js:演示各种 Redis 数据类型操作

const express = require('express');
const redis = require('./redisClient');
const app = express();
app.use(express.json());

/**
 * ============ String 类型 ============
 */

// 设置字符串
app.post('/string/set', async (req, res) => {
  const { key, value } = req.body;
  await redis.set(key, value);
  res.send(`设置成功:${key}=${value}`);
});

// 获取字符串
app.get('/string/get/:key', async (req, res) => {
  const value = await redis.get(req.params.key);
  res.send(`值为:${value}`);
});

/**
 * ============ Hash 类型 ============
 */

// 设置哈希字段
app.post('/hash/set', async (req, res) => {
  const { key, field, value } = req.body;
  await redis.hset(key, field, value);
  res.send(`哈希字段设置成功:${key}.${field}=${value}`);
});

// 获取哈希字段
app.get('/hash/get', async (req, res) => {
  const { key, field } = req.query;
  const value = await redis.hget(key, field);
  res.send(`哈希字段值为:${value}`);
});

// 获取整个哈希
app.get('/hash/all/:key', async (req, res) => {
  const hash = await redis.hgetall(req.params.key);
  res.json(hash);
});

/**
 * ============ List 类型 ============
 */

// 从左侧推入列表
app.post('/list/leftpush', async (req, res) => {
  const { key, value } = req.body;
  await redis.lpush(key, value);
  res.send(`左侧推入成功:${value}`);
});

// 从右侧弹出列表
app.get('/list/rightpop/:key', async (req, res) => {
  const value = await redis.rpop(req.params.key);
  res.send(`右侧弹出值:${value}`);
});

/**
 * ============ Set 类型 ============
 */

// 添加到 Set 集合
app.post('/set/add', async (req, res) => {
  const { key, value } = req.body;
  await redis.sadd(key, value);
  res.send(`添加到集合成功:${value}`);
});

// 获取 Set 所有成员
app.get('/set/members/:key', async (req, res) => {
  const members = await redis.smembers(req.params.key);
  res.json(members);
});

/**
 * ============ Sorted Set 类型 ============
 */

// 添加成员和分数
app.post('/zset/add', async (req, res) => {
  const { key, score, member } = req.body;
  await redis.zadd(key, score, member);
  res.send(`添加到有序集合成功:${member}(分数${score})`);
});

// 获取排行榜
app.get('/zset/range/:key', async (req, res) => {
  const list = await redis.zrevrange(req.params.key, 0, 9, 'WITHSCORES');
  res.json(list);
});

const PORT = 3000;
app.listen(PORT, () => {
  console.log(`服务器启动:http://localhost:${PORT}`);
});

6️⃣ Postman 测试示例

使用 POSTGET 请求调用以下接口:

  • POST /string/set + JSON { "key": "user:1:name", "value": "Alice" }

  • GET /string/get/user:1:name

  • POST /hash/set + { "key": "user:2", "field": "email", "value": "a@example.com" }

  • GET /hash/all/user:2

  • POST /list/leftpush + { "key": "messages", "value": "hi" }

  • GET /list/rightpop/messages

  • POST /set/add + { "key": "tags", "value": "web3" }

  • GET /set/members/tags

  • POST /zset/add + { "key": "ranking", "score": 98, "member": "Bob" }

  • GET /zset/range/ranking


✅ 总结

数据类型 操作方法(部分) 使用场景
String set, get, incr 缓存数据、计数器
Hash hset, hget, hgetall 用户信息、配置项
List lpush, rpop, lrange 消息队列、时间轴
Set sadd, smembers, sismember 去重、用户标签
Sorted Set zadd, zrange, zrevrange 排行榜、权重排序

如需进一步加上缓存失效时间(EX)、分布式锁、订阅发布等高级操作,也可以继续告诉我,我可以补充完整!

二、Redis 在 Express 项目中实现 缓存失效时间(EX)分布式锁发布/订阅机制

1️⃣ 缓存失效时间(EX)

概念介绍

Redis 支持为 key 设置生存时间(TTL),超时后自动删除该键,常用于缓存数据。


示例代码

// 设置字符串并指定过期时间(单位:秒)
app.post('/cache/setex', async (req, res) => {
  const { key, value, ttl } = req.body;
  await redis.set(key, value, 'EX', ttl); // 'EX' 表示秒级过期时间
  res.send(`设置缓存成功:${key}=${value}(过期时间:${ttl}秒)`);
});

// 查询 key 的剩余时间
app.get('/cache/ttl/:key', async (req, res) => {
  const ttl = await redis.ttl(req.params.key);
  res.send(`剩余时间:${ttl} 秒`);
});

2️⃣ 分布式锁

概念介绍

Redis 提供 SET key value NX EX ttl 命令可用于实现简易分布式锁。

  • NX: 仅当 key 不存在时设置,保证只有一个客户端加锁成功。

  • EX ttl: 设置锁过期时间,防止死锁。


示例代码

// 尝试获取锁
app.post('/lock/acquire', async (req, res) => {
  const { lockKey, lockValue, ttl } = req.body;
  const success = await redis.set(lockKey, lockValue, 'NX', 'EX', ttl);
  if (success) {
    res.send('✅ 成功获取锁');
  } else {
    res.status(423).send('❌ 获取锁失败,资源已被占用');
  }
});

// 释放锁(注意:必须验证持有者)
app.post('/lock/release', async (req, res) => {
  const { lockKey, lockValue } = req.body;

  // 用 Lua 脚本确保只有锁持有者可以释放
  const luaScript = `
    if redis.call("get", KEYS[1]) == ARGV[1] then
      return redis.call("del", KEYS[1])
    else
      return 0
    end
  `;
  const result = await redis.eval(luaScript, 1, lockKey, lockValue);

  res.send(result === 1 ? '✅ 成功释放锁' : '⚠️ 无权释放锁');
});

3️⃣ 发布/订阅(Pub/Sub)

概念介绍

Redis 提供消息发布订阅功能,常用于:

  • 服务间通信

  • 实时通知

  • 消息驱动架构


示例代码

消费端(订阅频道)

// 单独开一个 Redis 订阅连接
const Redis = require('ioredis');
const subClient = new Redis();

subClient.subscribe('news', () => {
  console.log('📡 订阅了 news 频道');
});

subClient.on('message', (channel, message) => {
  console.log(`📨 接收到频道 ${channel} 的消息: ${message}`);
});

生产端(发布消息)

// 发布消息到 news 频道
app.post('/publish/news', async (req, res) => {
  const { message } = req.body;
  await redis.publish('news', message);
  res.send('📤 消息已发布');
});

✅ 总结表

功能 核心命令 应用场景
缓存失效 SET key value EX ttl 缓存自动清理,热点数据缓存
分布式锁 SET key val NX EX ttl + Lua 控制资源访问,避免并发冲突
发布订阅 PUBLISH/SUBSCRIBE 实时通信、系统内事件通知


如需更高级用法(如 Redlock、Redis Stream 消息队列、异步任务持久化等),欢迎继续提问!也可以让我帮你把这些整合进一个生产级项目模板。

三、Express实现Redlock、Redis Stream 消息队列、异步任务持久化

下面详细讲解 Redlock 分布式锁算法Redis Streams 消息队列异步任务持久化 的概念、使用场景与详细代码示例(基于 Express + Node.js + ioredis):


1️⃣ Redlock 分布式锁算法

概念介绍

Redlock 是 Redis 官方推荐的分布式锁算法,适用于多 Redis 实例构建高可靠性锁。

  • 使用多个 Redis 节点写入同一锁键;

  • 获取大多数实例同意才算加锁成功;

  • 自动设置超时,防止死锁。

示例代码(基于 redlock 库)

npm install redlock ioredis
const Redis = require('ioredis');
const Redlock = require('redlock');

const clients = [new Redis(), new Redis(), new Redis()]; // 模拟多个实例
const redlock = new Redlock(clients, {
  retryCount: 3,
  retryDelay: 200, // ms
});

app.post('/redlock', async (req, res) => {
  try {
    const lock = await redlock.acquire(['locks:resource'], 10000); // 10秒锁
    console.log('✅ 获取Redlock成功');
    setTimeout(async () => {
      await lock.release();
      console.log('🔓 Redlock释放成功');
    }, 5000);
    res.send('Redlock锁定成功');
  } catch (err) {
    res.status(500).send('🔒 Redlock锁获取失败');
  }
});

2️⃣ Redis Streams 消息队列

概念介绍

Redis Stream 是 Redis 5.0+ 引入的持久化消息队列,支持:

  • 消息追加(XADD

  • 消费者组(XGROUP

  • 消费与确认(XREADGROUP, XACK

适合实现 可靠队列、异步任务处理、事件流系统


示例代码(任务生产+消费)

const streamKey = 'mystream';
const consumerGroup = 'mygroup';
const consumerName = 'consumer1';

(async () => {
  try {
    await redis.xgroup('CREATE', streamKey, consumerGroup, '0', 'MKSTREAM');
  } catch (e) {
    if (!e.message.includes('BUSYGROUP')) throw e;
  }
})();

// 生产消息
app.post('/stream/produce', async (req, res) => {
  const { orderId } = req.body;
  await redis.xadd(streamKey, '*', 'orderId', orderId);
  res.send('⏩ 消息已推入流');
});

// 消费消息(自动应答)
setInterval(async () => {
  const result = await redis.xreadgroup(
    'GROUP', consumerGroup, consumerName,
    'BLOCK', 2000,
    'COUNT', 1,
    'STREAMS', streamKey, '>'
  );

  if (result) {
    const [[stream, messages]] = result;
    for (const [id, fields] of messages) {
      const orderId = fields[1];
      console.log('✅ 消费订单:', orderId);
      await redis.xack(streamKey, consumerGroup, id); // 标记已处理
    }
  }
}, 3000);

3️⃣ 异步任务持久化(基于队列)

概念介绍

将任务加入 Redis 队列并异步处理,可实现:

  • 防止阻塞主线程

  • 保证任务持久化存储

  • 支持失败重试、延时执行等

推荐使用 bullbullmq,内置持久化、并发控制、失败重试等机制。


示例代码(使用 bull

npm install bull
const Queue = require('bull');
const jobQueue = new Queue('emailQueue', { redis: { port: 6379, host: '127.0.0.1' } });

// 添加任务
app.post('/async/send-email', async (req, res) => {
  const { to, content } = req.body;
  await jobQueue.add({ to, content });
  res.send('📨 已加入异步发送任务队列');
});

// 处理任务
jobQueue.process(async (job) => {
  console.log(`发送邮件给:${job.data.to},内容:${job.data.content}`);
  // 真实场景:集成 nodemailer、Mailgun、Sendgrid 等发送
});

// 失败重试配置
jobQueue.on('failed', (job, err) => {
  console.error(`❌ 邮件发送失败:${job.id}`, err.message);
});

📌 总结对比表

技术 类型 特点与应用场景
Redlock 分布式锁 多节点锁协调,防止并发冲突与死锁
Redis Stream 消息队列 原生持久化、消费者组、高吞吐异步任务
Bull/BullMQ 队列框架 丰富功能,失败重试、延迟队列、状态可视化


如你需要我将三种方式整合成完整的 Express 微服务框架示例,也可以继续提问!

四、Express 使用布隆过滤器

下面是关于 Express 使用布隆过滤器(Bloom Filter) 的概念、详细代码示例和讲解,帮助你理解它在实际 Web 应用中的应用场景:


1️⃣ 概念介绍

布隆过滤器(Bloom Filter)是一种空间效率极高的概率型数据结构,用于判断一个元素是否存在于集合中:

  • 优点:节省内存、查询速度快,适用于大规模去重、黑名单校验等场景。

  • 缺点:可能误判为存在(假阳性),但不会漏判(不会漏掉存在的元素)。

常用于:

  • 防止缓存穿透(查询不存在数据频繁打数据库)

  • 用户是否已提交过表单、请求

  • 黑名单过滤、URL 去重


2️⃣ 安装依赖

我们使用 bloom-filters 库:

npm install bloom-filters

3️⃣ 示例代码(Express + BloomFilter)

以下示例演示如何使用布隆过滤器拦截重复请求或缓存穿透:

// 引入布隆过滤器
const express = require('express');
const { BloomFilter } = require('bloom-filters');
const app = express();

app.use(express.json());

// 创建布隆过滤器,参数: 预计插入数量、可接受误差率
const bloom = new BloomFilter(1000, 0.01);

// 示例:用户请求时检查是否访问过该资源
app.post('/api/resource', (req, res) => {
  const { userId } = req.body;

  // 检查是否已经请求过
  if (bloom.has(userId)) {
    return res.status(429).json({ message: '该用户已处理过请求,拒绝重复操作。' });
  }

  // 第一次访问,插入布隆过滤器
  bloom.add(userId);

  // 模拟处理业务逻辑
  console.log(`处理用户 ${userId} 的请求`);

  res.json({ message: '请求处理成功' });
});

4️⃣ 与 Redis 配合使用(持久化布隆过滤器)

如需跨服务共享布隆过滤器状态,可使用 Redis 存储:

npm install redis
const redis = require('redis');
const { BloomFilter } = require('bloom-filters');
const client = redis.createClient();

// 将布隆过滤器序列化为 JSON 保存
const saveToRedis = async (key, filter) => {
  const json = JSON.stringify(filter.saveAsJSON());
  await client.set(key, json);
};

// 从 Redis 读取布隆过滤器
const loadFromRedis = async (key) => {
  const json = await client.get(key);
  if (json) {
    return BloomFilter.fromJSON(JSON.parse(json));
  }
  return new BloomFilter(1000, 0.01);
};

5️⃣ 使用场景举例

使用场景 示例
防止缓存穿透 数据库中没有的 key 被频繁查询
请求重复提交拦截 用户提交重复订单、重复点赞
URL 去重 爬虫系统中防止重复抓取
黑名单判断 IP、邮箱、手机号是否在黑名单中


✅ 小结

  • 布隆过滤器适合在高并发系统中快速过滤“不存在”的请求

  • 它不能代替数据库查重,但能大幅减少无效请求打到数据库/缓存

  • 结合 Redis 可以实现布隆过滤器的持久化与多节点共享。

五、Express 使用 RabbitMQ

以下是关于 Express 使用 RabbitMQ 的完整指南,包括:

  • 概念介绍

  • 安装依赖

  • 完整代码示例(发送、消费、连接关闭、持久化、确认机制等)

  • 实战讲解与注释


1️⃣ 概念介绍

RabbitMQ 是一个高性能的消息中间件(消息队列),用于实现异步通信、解耦模块、削峰填谷等。

它基于 AMQP(Advanced Message Queuing Protocol)协议,主要组件包括:

组件 说明
Producer 消息生产者,发送消息到队列
Queue 消息队列,缓存待处理消息
Consumer 消费者,监听并处理队列中的消息
Exchange 交换机,决定消息路由到哪个队列(direct、fanout、topic、headers)
Routing Key 消息携带的路由标识


2️⃣ 安装依赖

我们使用 amqplib 模块:

npm install amqplib

3️⃣ Express 集成 RabbitMQ:发送与消费消息(全流程)

项目结构:

/rabbitmq-demo
├── producer.js        # 发送消息
├── consumer.js        # 消费消息
└── app.js             # Express 接口入口

3.1 配置 RabbitMQ 地址

// config.js
module.exports = {
  RABBITMQ_URL: 'amqp://localhost', // 默认端口5672
  QUEUE_NAME: 'task_queue'
};

3.2 消息发送(Producer)

// producer.js
const amqp = require('amqplib');
const { RABBITMQ_URL, QUEUE_NAME } = require('./config');

async function sendToQueue(msg) {
  const conn = await amqp.connect(RABBITMQ_URL);
  const channel = await conn.createChannel();

  // 保证队列存在(幂等操作)
  await channel.assertQueue(QUEUE_NAME, { durable: true });

  // 发送消息(Buffer)
  channel.sendToQueue(QUEUE_NAME, Buffer.from(msg), { persistent: true });

  console.log(`[x] 发送消息: ${msg}`);

  // 延迟关闭连接(避免连接还没写完就关闭)
  setTimeout(() => {
    channel.close();
    conn.close();
  }, 500);
}

module.exports = sendToQueue;

3.3 消息消费(Consumer)

// consumer.js
const amqp = require('amqplib');
const { RABBITMQ_URL, QUEUE_NAME } = require('./config');

async function startConsumer() {
  const conn = await amqp.connect(RABBITMQ_URL);
  const channel = await conn.createChannel();

  // 保证队列存在
  await channel.assertQueue(QUEUE_NAME, { durable: true });

  // 每次只处理一个消息(限流)
  channel.prefetch(1);

  console.log('[*] 等待接收消息...');

  channel.consume(QUEUE_NAME, async (msg) => {
    const content = msg.content.toString();
    console.log(`[x] 收到消息: ${content}`);

    // 模拟处理耗时
    await new Promise(res => setTimeout(res, 2000));

    // 确认消息处理完毕
    channel.ack(msg);
    console.log(`[✓] 处理完成: ${content}`);
  });
}

// 启动消费者
startConsumer().catch(console.error);

3.4 Express 中调用 Producer

// app.js
const express = require('express');
const sendToQueue = require('./producer');
const app = express();

app.use(express.json());

app.post('/send', async (req, res) => {
  const { message } = req.body;
  if (!message) return res.status(400).json({ error: 'message is required' });

  await sendToQueue(message);
  res.json({ status: 'Message sent to queue' });
});

app.listen(3000, () => {
  console.log('Server running at http://localhost:3000');
});

示例请求(用 Postman 或 curl):

curl -X POST http://localhost:3000/send -H "Content-Type: application/json" -d '{"message": "Hello, RabbitMQ!"}'

4️⃣ 进阶功能(操作补全)

✅ 持久化消息

  • 使用 { durable: true } 创建队列

  • 使用 { persistent: true } 发送消息
    可以防止 RabbitMQ 重启丢失消息。

✅ 消息确认机制(ack)

  • 如果 channel.ack(msg) 不执行,消息会保留在队列,确保消息不丢失。

✅ 消费者限流(prefetch)

channel.prefetch(1); // 每次只处理一个消息,避免过载

✅ 多消费者场景(水平扩展)

可以开启多个 consumer.js 实例,共享队列并均衡消费消息。


5️⃣ 常见应用场景

场景 示例
异步任务处理 用户上传图片,异步压缩处理
邮件通知 注册后异步发送邮件
流量削峰 高并发请求写入队列,逐步处理
订单系统 下单后异步库存扣减、发货处理
区块链事件队列 DApp 收到链上事件后异步处理与持久化


✅ 总结

优点 说明
解耦模块 发送方不关心处理逻辑
提升系统性能 异步处理、削峰填谷
提高可靠性 可持久化、失败重试、限流
适用于微服务架构 服务之间通过消息队列通信


如你想进一步实现 Topic 交换机、死信队列(DLX)、延迟队列等,也可以告诉我,我将继续补充对应场景。

六、Express实现Topic 交换机、死信队列(DLX)、延迟队列

下面是关于 RabbitMQ 的高级用法:Topic 交换机、死信队列(DLX)、延迟队列 的完整实现与讲解,包括 概念 + 示例代码(Node.js + amqplib)+ 使用方式


🧩 1️⃣ Topic 交换机

🔷 概念介绍

Topic 交换机(topic)可以根据通配符匹配路由键,适合复杂的消息路由规则。

  • *:匹配一个单词

  • #:匹配零个或多个单词

示例路由键:

路由键 匹配规则
order.created 匹配 order.*
order.us.created 匹配 order.#
user.deleted 不匹配 order.*


✅ 示例代码(Producer + Consumer)

Producer:发送带有路由键的消息
// topicProducer.js
const amqp = require('amqplib');
const exchange = 'topic_logs';

async function sendMessage(routingKey, msg) {
  const conn = await amqp.connect('amqp://localhost');
  const channel = await conn.createChannel();

  await channel.assertExchange(exchange, 'topic', { durable: true });
  channel.publish(exchange, routingKey, Buffer.from(msg));
  console.log(`[x] Sent '${msg}' with key '${routingKey}'`);

  setTimeout(() => {
    channel.close();
    conn.close();
  }, 500);
}

sendMessage('order.created', '订单已创建');

Consumer:监听带通配符的 key
// topicConsumer.js
const amqp = require('amqplib');
const exchange = 'topic_logs';

async function startConsumer(pattern) {
  const conn = await amqp.connect('amqp://localhost');
  const channel = await conn.createChannel();

  await channel.assertExchange(exchange, 'topic', { durable: true });

  const q = await channel.assertQueue('', { exclusive: true });
  await channel.bindQueue(q.queue, exchange, pattern);

  console.log(`[x] Waiting for messages with pattern: ${pattern}`);

  channel.consume(q.queue, msg => {
    console.log(`[✓] Received (${msg.fields.routingKey}): ${msg.content.toString()}`);
  }, { noAck: true });
}

startConsumer('order.#');

🧨 2️⃣ 死信队列(DLX)

🔷 概念介绍

死信队列(Dead Letter Exchange) 用于接收未被正常消费的消息,比如:

  • 消费失败未 ack

  • 队列 TTL 超时

  • 队列满

  • 拒绝(nack/reject 且不 requeue)


✅ 示例代码(TTL + DLX)

Producer:声明带 TTL 的主队列 + DLX
// dlxProducer.js
const amqp = require('amqplib');

const DLX_EXCHANGE = 'dlx-ex';
const NORMAL_EXCHANGE = 'normal-ex';
const QUEUE = 'normal-queue';
const DLX_QUEUE = 'dead-letter-queue';

async function setup() {
  const conn = await amqp.connect('amqp://localhost');
  const ch = await conn.createChannel();

  // 1. 声明死信交换机与队列
  await ch.assertExchange(DLX_EXCHANGE, 'fanout', { durable: true });
  await ch.assertQueue(DLX_QUEUE, { durable: true });
  await ch.bindQueue(DLX_QUEUE, DLX_EXCHANGE, '');

  // 2. 声明主交换机和带 DLX 属性的队列
  await ch.assertExchange(NORMAL_EXCHANGE, 'direct', { durable: true });
  await ch.assertQueue(QUEUE, {
    durable: true,
    deadLetterExchange: DLX_EXCHANGE, // 设置死信交换机
    messageTtl: 5000                   // 设置消息 TTL 为 5s
  });
  await ch.bindQueue(QUEUE, NORMAL_EXCHANGE, 'task');

  // 3. 发送消息
  ch.publish(NORMAL_EXCHANGE, 'task', Buffer.from('This will expire!'));
  console.log('[x] Sent message with TTL');

  setTimeout(() => {
    ch.close();
    conn.close();
  }, 1000);
}

setup();

Consumer:不消费,让其过期 → 死信队列接收
// dlxConsumer.js
const amqp = require('amqplib');

async function startDLXConsumer() {
  const conn = await amqp.connect('amqp://localhost');
  const ch = await conn.createChannel();

  await ch.assertQueue('dead-letter-queue', { durable: true });

  ch.consume('dead-letter-queue', msg => {
    console.log(`[DLX] Received expired message: ${msg.content.toString()}`);
  }, { noAck: true });
}

startDLXConsumer();

⏳ 3️⃣ 延迟队列(基于 TTL + DLX 实现)

🔷 概念介绍

RabbitMQ 本身不支持“精确到某时间点的延迟消息”,但可以组合:

消息 TTL + DLX 死信交换机 模拟延迟队列。


✅ 延迟队列示例

// delayQueue.js
const amqp = require('amqplib');

const DELAY_EX = 'delay-ex';
const DELAY_QUEUE = 'delay-queue';
const TARGET_EX = 'real-task-ex';
const TARGET_QUEUE = 'real-task-queue';

async function setupDelayQueue() {
  const conn = await amqp.connect('amqp://localhost');
  const ch = await conn.createChannel();

  // 1. 目标真实处理交换机/队列
  await ch.assertExchange(TARGET_EX, 'fanout', { durable: true });
  await ch.assertQueue(TARGET_QUEUE, { durable: true });
  await ch.bindQueue(TARGET_QUEUE, TARGET_EX, '');

  // 2. 延迟队列,消息过期后转发到目标交换机
  await ch.assertExchange(DELAY_EX, 'direct', { durable: true });
  await ch.assertQueue(DELAY_QUEUE, {
    durable: true,
    messageTtl: 10000, // 延迟10秒
    deadLetterExchange: TARGET_EX
  });
  await ch.bindQueue(DELAY_QUEUE, DELAY_EX, 'delay');

  // 3. 发送消息
  ch.publish(DELAY_EX, 'delay', Buffer.from('Hello after 10s'));
  console.log('[x] Message sent to delay queue');

  setTimeout(() => {
    ch.close();
    conn.close();
  }, 1000);
}

setupDelayQueue();

目标消费(真正业务执行)

// taskConsumer.js
const amqp = require('amqplib');

async function consumeRealTask() {
  const conn = await amqp.connect('amqp://localhost');
  const ch = await conn.createChannel();

  await ch.assertQueue('real-task-queue', { durable: true });

  ch.consume('real-task-queue', msg => {
    console.log(`[✓] 延迟后收到任务: ${msg.content.toString()}`);
  }, { noAck: true });
}

consumeRealTask();

🧠 总结对比

功能 用途 技术实现
Topic 交换机 多维匹配路由键 topic 类型交换机 + 通配符
死信队列 异常消息托底 设置 x-dead-letter-exchange
延迟队列 定时消息、延时执行 TTL + DLX 模拟,或用插件实现精确延时


如需:
✅ 多级死信队列(死信消息再次进入延迟)
✅ RabbitMQ 插件方式实现精确延迟(rabbitmq_delayed_message_exchange
✅ NestJS 集成 RabbitMQ 的完整封装

七、RabbitMQ 的可靠性投递消息的幂等性设计

🧩 1️⃣ 可靠性投递(Reliability Delivery)

🟦 概念介绍

可靠投递的目标是:确保消息从生产者 → RabbitMQ → 消费者 三段都不丢失、不重复

涉及三段关键机制:

阶段 机制/方法
生产者 → RabbitMQ 事务机制、Confirm 模式
RabbitMQ → 队列 消息持久化、队列持久化
队列 → 消费者 ack 确认消费 + nack 补偿


✅ 关键配置与代码说明(生产者 Confirm 模式)

// producer_confirm.js
const amqp = require('amqplib');

async function sendReliableMessage() {
  const conn = await amqp.connect('amqp://localhost');
  const ch = await conn.createConfirmChannel(); // 👈 Confirm 模式

  const exchange = 'reliable-ex';
  await ch.assertExchange(exchange, 'direct', { durable: true });

  const routingKey = 'reliable';
  const message = 'Hello with confirm!';

  ch.publish(exchange, routingKey, Buffer.from(message), { persistent: true }, (err, ok) => {
    if (err) {
      console.error('消息发送失败', err);
    } else {
      console.log('[✓] 消息成功投递到交换机');
    }
    ch.close();
    conn.close();
  });
}

sendReliableMessage();

📝 persistent: true 确保消息写入磁盘
📝 createConfirmChannel() 可确认 RabbitMQ 是否真正收到了消息(比事务高效)


🧠 2️⃣ 消息的幂等性(Idempotency)

🟦 概念介绍

幂等性是指:无论接收同一条消息多少次,结果都不变,避免重复消费导致的数据错误

✅ 常用方法

方法 说明
全局唯一 ID(msgId 每条消息带唯一 ID,消费前判断是否处理过
Redis Set/Hash 缓存 存储 msgId 已处理记录
数据库唯一约束 / 乐观锁 保证写入唯一性或控制版本


✅ 消费者示例(Redis 保证幂等性)

// consumer_idempotent.js
const amqp = require('amqplib');
const Redis = require('ioredis');
const redis = new Redis(); // 默认连接 127.0.0.1:6379

async function consumeWithIdempotency() {
  const conn = await amqp.connect('amqp://localhost');
  const ch = await conn.createChannel();

  const queue = 'task-queue';
  await ch.assertQueue(queue, { durable: true });

  ch.consume(queue, async msg => {
    const msgId = msg.properties.messageId; // 👈 消息唯一标识
    const key = `processed:${msgId}`;

    const alreadyProcessed = await redis.get(key);
    if (alreadyProcessed) {
      console.log(`[⚠️] 已处理跳过 msgId: ${msgId}`);
      ch.ack(msg);
      return;
    }

    const content = msg.content.toString();
    console.log(`[✓] 处理消息: ${content}`);

    // TODO: 执行业务逻辑...

    // 标记为已处理,设置过期防止 Redis 爆炸
    await redis.set(key, '1', 'EX', 86400); // 1天过期
    ch.ack(msg);
  }, { noAck: false });
}

consumeWithIdempotency();

✅ Producer 设置 messageId

// producer_with_id.js
const amqp = require('amqplib');
const { v4: uuidv4 } = require('uuid');

async function sendMessage() {
  const conn = await amqp.connect('amqp://localhost');
  const ch = await conn.createChannel();

  await ch.assertQueue('task-queue', { durable: true });

  const msg = 'Buy 1 BTC';
  const msgId = uuidv4(); // 👈 唯一 ID

  ch.sendToQueue('task-queue', Buffer.from(msg), {
    persistent: true,
    messageId: msgId
  });

  console.log(`[x] Sent with msgId: ${msgId}`);
  ch.close();
  conn.close();
}

sendMessage();

✅ 小结一览

项目 目的 推荐技术
投递可靠性 消息不丢 Confirm 模式 + 持久化
幂等性处理 消息不重复处理 Redis + msgId 唯一标识
消费异常重试/补偿 消息不丢 & 不乱处理 nack + 死信/重投机制


如果你需要结合 NestJS、BullMQ、Kafka 或使用 Redlock 进行分布式幂等处理,也可以继续问我,我可以为你生成分布式架构模板方案。


网站公告

今日签到

点亮在社区的每一天
去签到