Node.js 中使用 RabbitMQ

发布于:2025-03-20 ⋅ 阅读:(17) ⋅ 点赞:(0)

目录

一、RabbitMQ 简介

二、核心概念解析

三、环境搭建(以 Ubuntu 为例)

四、Node.js 实战:生产者与消费者

1. 安装依赖

2. 生产者代码(发送消息)

3. 消费者代码(处理消息)

五、高级配置与最佳实践

六、常见问题与解决方案

七、总结


一、RabbitMQ 简介

RabbitMQ 是一个基于 AMQP 协议的开源消息代理工具,专为分布式系统设计。它通过解耦生产者和消费者实现异步通信,支持流量削峰、任务队列、服务解耦等场景,是微服务架构中的核心组件之一。其核心优势包括:

可靠性:支持消息持久化与传输确认机制。

灵活性:提供多种交换机类型(Direct、Fanout、Topic等)。

跨平台:支持主流编程语言(Node.js、Python、Java等)。

官网:https://www.rabbitmq.com/


二、核心概念解析
概念 说明
**生产者** 发送消息的应用(如订单服务发送支付任务)
**消费者** 接收和处理消息的应用(如支付服务处理支付请求)
**交换机** 接收生产者消息,根据路由规则分发到队列(类似邮局分拣中心)
**队列** 存储消息的缓冲区,确保消息按顺序被消费
**绑定键** 连接交换机与队列的规则(如 `payment.task` 标识支付相关消息)

三、环境搭建(以 Ubuntu 为例)

1. 安装 RabbitMQ

sudo apt-get update

sudo apt-get install rabbitmq-server

sudo systemctl start rabbitmq-server

2. 启用管理界面

sudo rabbitmq-plugins enable rabbitmqmanagement

# 访问 http://localhost:15672,账号/密码:guest/guest

四、Node.js 实战:生产者与消费者
1. 安装依赖
npm install amqplib
2. 生产者代码(发送消息)
const amqp = require('amqplib');
async function sendMessage() {
  try {
    const connection = await amqp.connect('amqp://username:password@host:port');
    const channel = await connection.createChannel();
    
    // 声明交换机与队列(确保存在)
    const exchangeName = 'order_exchange';
    const queueName = 'payment_queue';
    const routingKey = 'payment.task';
    
    await channel.assertExchange(exchangeName, 'direct', { durable: true });
    await channel.assertQueue(queueName, { durable: true });
    await channel.bindQueue(queueName, exchangeName, routingKey);
    
    // 发送持久化消息
    const message = JSON.stringify({ orderId: 1001, amount: 299.9 });
    channel.publish(exchangeName, routingKey, Buffer.from(message), {
      persistent: true
    });
    
    console.log('✅ 订单消息已发送:', message);
    
    setTimeout(() => {
      connection.close();
      process.exit(0);
    }, 500);
  } catch (error) {
    console.error('❌ 消息发送失败:', error);
  }
}
sendMessage();
3. 消费者代码(处理消息)
const amqp = require('amqplib');
async function consumeMessages() {
  try {
    const connection = await amqp.connect('amqp://username:password@host:port');
    const channel = await connection.createChannel();
    
    const exchangeName = 'order_exchange';
    const queueName = 'payment_queue';
    const routingKey = 'payment.task';
    
    // 声明交换机与队列(与生产者一致)
    await channel.assertExchange(exchangeName, 'direct', { durable: true });
    await channel.assertQueue(queueName, { durable: true });
    await channel.bindQueue(queueName, exchangeName, routingKey);
    
    console.log('⌛ 等待支付任务...');
    
    channel.consume(queueName, (msg) => {
      if (msg) {
        const content = msg.content.toString();
        console.log('💰 收到支付请求:', content);
        
        // 模拟支付处理逻辑
        setTimeout(() => {
          console.log('✔️ 支付处理完成');
          channel.ack(msg); // 手动确认消息处理完成
        }, 2000);
      }
    }, { noAck: false }); // 关闭自动确认
  } catch (error) {
    console.error('❌ 消费消息失败:', error);
  }
}
consumeMessages();

五、高级配置与最佳实践

1. 消息持久化

- 队列持久化:`channel.assertQueue(queue, { durable: true })`

- 消息持久化:设置 `persistent: true`

channel.publish(exchange, routingKey, Buffer.from(msg), { persistent: true });

2. 公平分发(Prefetch)

channel.prefetch(1); // 每次只处理一个消息,避免消费者过载

3. 死信队列(DLX)

死信队列(Dead Letter Queue,简称DLX)是RabbitMQ中用于处理无法被正常消费的消息的一种机制。当消息满足某些条件时,会被路由到死信队列中,而不是被丢弃。

作用:

  • 处理无法被正常消费的消息
  • 避免消息丢失
  • 提供消息重试机制
  • 实现延迟队列功能

成为死信的条件:

  • 消费者明确拒绝消息(basic.reject或basic.nack)且不重新入队
  • 消息在队列中存活时间超过TTL(Time To Live)
  • 队列已满,无法再接收新消息
await channel.assertQueue('deadletterqueue', { durable: true });
await channel.bindQueue('deadletterqueue', 'dlxexchange', '#');


// 主队列绑定死信交换机
await channel.assertQueue('mainqueue', {
    durable: true,
    deadLetterExchange: 'dlxexchange'
});

六、常见问题与解决方案
问题 解决方案
连接失败 检查防火墙是否开放 5672 端口,确认 RabbitMQ 服务状态
消息堆积 增加消费者数量,优化处理逻辑,设置合理消息 TTL
重复消费 确保消息处理完成后调用 `channel.ack(msg)`,启用幂等性设计
队列未声明 生产者和消费者均使用 `assertExchange` 和 `assertQueue` 确保资源存在

七、总结

通过 RabbitMQ,Node.js 应用可以实现:

异步任务处理(如邮件发送、文件导出)

服务解耦(订单服务与库存服务独立运行)

流量削峰(突发请求存入队列逐步处理)