目录
一、RabbitMQ 简介
RabbitMQ 是一个基于 AMQP 协议的开源消息代理工具,专为分布式系统设计。它通过解耦生产者和消费者实现异步通信,支持流量削峰、任务队列、服务解耦等场景,是微服务架构中的核心组件之一。其核心优势包括:
•可靠性:支持消息持久化与传输确认机制。
•灵活性:提供多种交换机类型(Direct、Fanout、Topic等)。
•跨平台:支持主流编程语言(Node.js、Python、Java等)。
二、核心概念解析
概念 | 说明 |
---|---|
**生产者** | 发送消息的应用(如订单服务发送支付任务) |
**消费者** | 接收和处理消息的应用(如支付服务处理支付请求) |
**交换机** | 接收生产者消息,根据路由规则分发到队列(类似邮局分拣中心) |
**队列** | 存储消息的缓冲区,确保消息按顺序被消费 |
**绑定键** | 连接交换机与队列的规则(如 `payment.task` 标识支付相关消息) |
三、环境搭建(以 Ubuntu 为例)
1. 安装 RabbitMQ
sudo apt-get update
sudo apt-get install rabbitmq-server
sudo systemctl start rabbitmq-server
2. 启用管理界面
sudo rabbitmq-plugins enable rabbitmqmanagement
# 访问 http://localhost:15672,账号/密码:guest/guest
四、Node.js 实战:生产者与消费者
1. 安装依赖
npm install amqplib
2. 生产者代码(发送消息)
const amqp = require('amqplib');
async function sendMessage() {
try {
const connection = await amqp.connect('amqp://username:password@host:port');
const channel = await connection.createChannel();
// 声明交换机与队列(确保存在)
const exchangeName = 'order_exchange';
const queueName = 'payment_queue';
const routingKey = 'payment.task';
await channel.assertExchange(exchangeName, 'direct', { durable: true });
await channel.assertQueue(queueName, { durable: true });
await channel.bindQueue(queueName, exchangeName, routingKey);
// 发送持久化消息
const message = JSON.stringify({ orderId: 1001, amount: 299.9 });
channel.publish(exchangeName, routingKey, Buffer.from(message), {
persistent: true
});
console.log('✅ 订单消息已发送:', message);
setTimeout(() => {
connection.close();
process.exit(0);
}, 500);
} catch (error) {
console.error('❌ 消息发送失败:', error);
}
}
sendMessage();
3. 消费者代码(处理消息)
const amqp = require('amqplib');
async function consumeMessages() {
try {
const connection = await amqp.connect('amqp://username:password@host:port');
const channel = await connection.createChannel();
const exchangeName = 'order_exchange';
const queueName = 'payment_queue';
const routingKey = 'payment.task';
// 声明交换机与队列(与生产者一致)
await channel.assertExchange(exchangeName, 'direct', { durable: true });
await channel.assertQueue(queueName, { durable: true });
await channel.bindQueue(queueName, exchangeName, routingKey);
console.log('⌛ 等待支付任务...');
channel.consume(queueName, (msg) => {
if (msg) {
const content = msg.content.toString();
console.log('💰 收到支付请求:', content);
// 模拟支付处理逻辑
setTimeout(() => {
console.log('✔️ 支付处理完成');
channel.ack(msg); // 手动确认消息处理完成
}, 2000);
}
}, { noAck: false }); // 关闭自动确认
} catch (error) {
console.error('❌ 消费消息失败:', error);
}
}
consumeMessages();
五、高级配置与最佳实践
1. 消息持久化
- 队列持久化:`channel.assertQueue(queue, { durable: true })`
- 消息持久化:设置 `persistent: true`
channel.publish(exchange, routingKey, Buffer.from(msg), { persistent: true });
2. 公平分发(Prefetch)
channel.prefetch(1); // 每次只处理一个消息,避免消费者过载
3. 死信队列(DLX)
死信队列(Dead Letter Queue,简称DLX)是RabbitMQ中用于处理无法被正常消费的消息的一种机制。当消息满足某些条件时,会被路由到死信队列中,而不是被丢弃。
作用:
- 处理无法被正常消费的消息
- 避免消息丢失
- 提供消息重试机制
- 实现延迟队列功能
成为死信的条件:
- 消费者明确拒绝消息(basic.reject或basic.nack)且不重新入队
- 消息在队列中存活时间超过TTL(Time To Live)
- 队列已满,无法再接收新消息
await channel.assertQueue('deadletterqueue', { durable: true });
await channel.bindQueue('deadletterqueue', 'dlxexchange', '#');
// 主队列绑定死信交换机
await channel.assertQueue('mainqueue', {
durable: true,
deadLetterExchange: 'dlxexchange'
});
六、常见问题与解决方案
问题 | 解决方案 |
---|---|
连接失败 | 检查防火墙是否开放 5672 端口,确认 RabbitMQ 服务状态 |
消息堆积 | 增加消费者数量,优化处理逻辑,设置合理消息 TTL |
重复消费 | 确保消息处理完成后调用 `channel.ack(msg)`,启用幂等性设计 |
队列未声明 | 生产者和消费者均使用 `assertExchange` 和 `assertQueue` 确保资源存在 |
七、总结
通过 RabbitMQ,Node.js 应用可以实现:
•异步任务处理(如邮件发送、文件导出)
•服务解耦(订单服务与库存服务独立运行)
•流量削峰(突发请求存入队列逐步处理)