示例一、Routing
exchange类型direct,根据消息的routekey将消息直接转发到指定队列。producer.ts 生产者主要发送消息,consumer.ts负责接收消息,同时也都可以创建exchange交换机,创建队列,为队列绑定exchange,为避免重复简化代码,提高可维护性,队列相关操作移动到消费者端。队列,exchange交换机推荐在启动程序前手动创建好。
producer.ts
import RabbitMQ from 'amqplib/callback_api';
function start() {
RabbitMQ.connect("amqp://admin:admin1234@localhost:5672?heartbeat=60", function (err0, conn) {
if (err0) {
console.error("[AMQP]", err0.message);
return setTimeout(start, 1000);
}
conn.on("error", function (err1) {
if (err1.message !== "Connection closing") {
console.error("[AMQP] conn error", err1.message);
}
});
conn.on("close", function () {
console.error("[AMQP] reconnecting");
return setTimeout(start, 1000);
});
console.log("[AMQP] connected");
conn.createChannel(async (err2, channel) => {
if (err2) {
console.error("[AMQP]", err2.message);
return setTimeout(start, 1000);
}
const exchangeName = 'exchange1';
channel.assertExchange(
exchangeName,
'direct',
{
durable: true
},
(err, ok) => {
if (err) {
console.log('exchange路由转发创建失败', err);
} else {
let args = ['info', 'warn', 'error'];
for (let i = 0; i < 10; ++i) {
// console.log('message send!', channel.sendToQueue(
// queueName,
// Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),
// { persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在
// // (err: any, ok: Replies.Empty)=>{}
// ));
const routeKey = args[Math.floor(Math.random() * 3)];
console.log('消息发送是否成功', channel.publish(
exchangeName,
routeKey,
Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)},${routeKey}`),
{ persistent: true },
));
}
}
}
);
});
setTimeout(() => {
conn.close();
process.exit(0);
}, 1000);
});
}
start();
consumer.ts
import RabbitMQ, { type Replies } from 'amqplib/callback_api';
RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {
if (err0) {
console.error(err0);
return;
}
conn.createChannel(function (err1, channel) {
const queueName = 'queue1';
channel.assertQueue(queueName, { durable: true }, (err2) => {
if (err2) {
console.log('队列创建失败', err2);
return;
}
console.log('[*] waiting...');
// 一次只有一个未确认消息,防止消费者过载
channel.prefetch(1);
channel.bindQueue(queueName, 'exchange1', 'info', {}, (err3, ok) => {
console.log(queueName, '队列绑定结果', err3, ok);
});
channel.bindQueue(queueName, 'exchange1', 'warn', {}, (err3, ok) => {
console.log(queueName, '队列绑定结果', err3, ok);
});
channel.bindQueue(queueName, 'exchange1', 'error', {}, (err3, ok) => {
console.log(queueName, '队列绑定结果', err3, ok);
});
channel.consume(
queueName,
function (msg) {
console.log('接收到的消息', msg?.content.toString());
/*
// 手动确认取消channel.ack(msg); noAck:false,
// 自动确认消息
// if (msg) {
// channel.ack(msg);
// }
*/
},
{
noAck: true, // 是否自动确认消息
// noAck: false
},
(err3: any, ok: Replies.Empty) => {
console.log(err3, ok);
},
);
});
});
conn.on("error", function (err1) {
if (err1.message !== "Connection closing") {
console.error("[AMQP] conn error", err1.message);
}
});
conn.on("close", function () {
console.error("[AMQP] reconnecting");
});
});
consumer2.ts
import RabbitMQ from 'amqplib';
const conn = await RabbitMQ.connect('amqp://admin:admin1234@localhost:5672');
conn.on("error", function (err1) {
if (err1.message !== "Connection closing") {
console.error("[AMQP] conn error", err1.message);
}
});
conn.on("close", function () {
console.error("[AMQP] reconnecting");
});
const channel = await conn.createChannel();
const queueName = 'queue2';
await channel.assertQueue(queueName, { durable: true });
console.log('[*] waiting...');
// 一次只有一个未确认消息,防止消费者过载
await channel.prefetch(1);
await channel.bindQueue(queueName, 'exchange1', 'error', {});
channel.consume(
queueName,
function (msg) {
console.log('接收到的消息', msg?.content.toString());
/*
// 手动确认取消channel.ack(msg); noAck:false,
// 自动确认消息
// if (msg) {
// channel.ack(msg);
// }
*/
},
{
noAck: true, // 是否自动确认消息
// noAck: false
},
);
示例二、Topic
exchange的topic类型和direct类似,使用的仍然是routeKey进行匹配转发,topic支持通过*和#进行模糊查询。*代码一个具体单词,#代码0或多个单词。
producer.ts
import RabbitMQ from 'amqplib';
async function start() {
const conn = await RabbitMQ.connect("amqp://admin:admin1234@localhost:5672?heartbeat=60");
conn.on("error", function (err1) {
if (err1.message !== "Connection closing") {
console.error("[AMQP] conn error", err1.message);
}
});
conn.on("close", function () {
console.error("[AMQP] reconnecting");
return setTimeout(start, 1000);
});
try {
const channel = await conn.createChannel();
console.log("[AMQP] connected");
const exchangeName = 'exchange4';
await channel.assertExchange(exchangeName, 'topic', { durable: true });
let args = ['123.orange.456', '123.456.rabbit', 'lazy', 'lazy.123', 'lazy.123.456'];
for (let i = 0; i < 20; ++i) {
// console.log('message send!', channel.sendToQueue(
// queueName,
// Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),
// { persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在
// // (err: any, ok: Replies.Empty)=>{}
// ));
const routeKey = args[Math.floor(Math.random() * args.length)];
console.log('消息发送是否成功', channel.publish(
exchangeName,
routeKey,
Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)},${routeKey}`),
{ persistent: true },
));
}
} catch (err) {
console.error("[AMQP]", err);
return setTimeout(start, 1000);
}
setTimeout(() => {
conn.close();
process.exit(0);
}, 1000);
}
start();
consumer.ts
import RabbitMQ from 'amqplib';
const conn = await RabbitMQ.connect('amqp://admin:admin1234@localhost:5672');
conn.on("error", function (err1) {
if (err1.message !== "Connection closing") {
console.error("[AMQP] conn error", err1.message);
}
});
conn.on("close", function () {
console.error("[AMQP] reconnecting");
});
const channel = await conn.createChannel();
const queueName = 'queue1';
channel.assertQueue(queueName, { durable: true });
console.log('[*] waiting...');
// 一次只有一个未确认消息,防止消费者过载
await channel.prefetch(1);
// *代码一个具体单词,#代码0或多个单词
await channel.bindQueue(queueName, 'exchange4', '*.orange.*', {});
channel.consume(queueName, function (msg) {
console.log('接收到的消息', msg?.content.toString());
// 手动确认取消channel.ack(msg);设置noAck:false,
// 自动确认消息noAck:true,不需要channel.ack(msg);
try {
if (msg) {
channel.ack(msg);
}
} catch (err) {
if (msg) {
// 第二个参数,false拒绝当前消息
// 第二个参数,true拒绝小于等于当前消息
// 第三个参数,3false从队列中清除
// 第三个参数,4true从新在队列中排队
channel.nack(msg, false, false);
}
console.log(err);
}
}, {
// noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);
noAck: false
});
consumer2.ts
import RabbitMQ, { type Replies } from 'amqplib/callback_api';
RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {
if (err0) {
console.error(err0);
return;
}
conn.createChannel(function (err1, channel) {
const queueName = 'queue2';
channel.assertQueue(queueName, { durable: true });
console.log('[*] waiting...');
// 一次只有一个未确认消息,防止消费者过载
channel.prefetch(1);
channel.bindQueue(queueName, 'exchange4', '*.*.rabbit', {}, (err, ok) => {
console.log(queueName, '队列绑定结果', err, ok);
});
channel.bindQueue(queueName, 'exchange4', 'lazy.#', {}, (err, ok) => {
console.log(queueName, '队列绑定结果', err, ok);
});
channel.consume(queueName, function (msg) {
console.log('接收到的消息', msg?.content.toString());
// 手动确认取消channel.ack(msg);设置noAck:false,
// 自动确认消息noAck:true,不需要channel.ack(msg);
try {
if (msg) {
channel.ack(msg);
}
} catch (err) {
if (msg) {
// 第二个参数,false拒绝当前消息
// 第二个参数,true拒绝小于等于当前消息
// 第三个参数,3false从队列中清除
// 第三个参数,4true从新在队列中排队
channel.nack(msg, false, false);
}
console.log(err);
}
}, {
// noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);
noAck: false
});
// return,error事件不会把消息重新放回队列
channel.on('return', (msg) => {
console.error('消息发送失败:', msg);
});
channel.on('error', (err) => {
console.error('通道发生错误:', err);
});
});
conn.on("error", function (err1) {
if (err1.message !== "Connection closing") {
console.error("[AMQP] conn error", err1.message);
}
});
conn.on("close", function () {
console.error("[AMQP] reconnecting");
});
});
示例三、Headers
exchange类型headers,根据传递的头部信息进行转发,头部信息类型为object对象。在头部信息中要设置x-match属性,'x-match': 'any', any,下方消息匹配上一个就可以。all,下方消息要全部匹配。
producer.ts
import RabbitMQ from 'amqplib/callback_api';
function start() {
RabbitMQ.connect("amqp://admin:admin1234@localhost:5672?heartbeat=60", function (err0, conn) {
if (err0) {
console.error("[AMQP]", err0.message);
return setTimeout(start, 1000);
}
conn.on("error", function (err1) {
if (err1.message !== "Connection closing") {
console.error("[AMQP] conn error", err1.message);
}
});
conn.on("close", function () {
console.error("[AMQP] reconnecting");
return setTimeout(start, 1000);
});
console.log("[AMQP] connected");
conn.createChannel(async (err2, channel) => {
if (err2) {
console.error("[AMQP]", err2.message);
return setTimeout(start, 1000);
}
const exchangeName = 'exchange5';
channel.assertExchange(
exchangeName,
'headers',
{
durable: true
},
(err, ok) => {
if (err) {
console.log('exchange路由转发创建失败', err);
} else {
let args = [
{
// 'x-match': 'any', // any,下方消息匹配上一个就可以; all,下方消息要全部匹配
'loglevel': 'info',
// 'buslevel': 'product',
// 'syslevel': 'admin'
},
{
// 'x-match': 'any', // any,下方消息匹配上一个就可以; all,下方消息要全部匹配
// 'loglevel': 'info',
'buslevel': 'product',
'syslevel': 'admin'
},
{
// 'x-match': 'any', // any,下方消息匹配上一个就可以; all,下方消息要全部匹配
// 'loglevel': 'info',
'buslevel': 'product',
// 'syslevel': 'admin'
},
{
// 'x-match': 'all', // any,下方消息匹配上一个就可以; all,下方消息要全部匹配
'loglevel': 'info',
'buslevel': 'product',
'syslevel': 'admin'
},
];
for (let i = 0; i < 20; ++i) {
// console.log('message send!', channel.sendToQueue(
// queueName,
// Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),
// { persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在
// // (err: any, ok: Replies.Empty)=>{}
// ));
const routeKey = args[Math.floor(Math.random() * args.length)];
console.log('消息发送是否成功', routeKey, channel.publish(
exchangeName,
'',
Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)},${JSON.stringify(routeKey)}`),
{
persistent: true,
headers: routeKey
},
));
}
}
}
);
});
setTimeout(() => {
conn.close();
process.exit(0);
}, 1000);
});
}
start();
consumer.ts
import RabbitMQ, { type Replies } from 'amqplib/callback_api';
RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {
if (err0) {
console.error(err0);
return;
}
conn.createChannel(function (err1, channel) {
const queueName = 'queue1';
channel.assertQueue(queueName, { durable: true });
console.log('[*] waiting...');
// 一次只有一个未确认消息,防止消费者过载
channel.prefetch(1);
// *代码一个具体单词,#代码0或多个单词
channel.bindQueue(
queueName,
'exchange5',
'',
{
'x-match': 'any', // any,下方消息匹配上一个就可以; all,下方消息要全部匹配
'loglevel': 'info',
'buslevel': 'product',
'syslevel': 'admin'
},
(err, ok) => {
console.log(queueName, '队列绑定结果', err, ok);
},
);
channel.consume(queueName, function (msg) {
console.log('接收到的消息', msg?.content.toString());
// 手动确认取消channel.ack(msg);设置noAck:false,
// 自动确认消息noAck:true,不需要channel.ack(msg);
try {
if (msg) {
channel.ack(msg);
}
} catch (err) {
if (msg) {
// 第二个参数,false拒绝当前消息
// 第二个参数,true拒绝小于等于当前消息
// 第三个参数,3false从队列中清除
// 第三个参数,4true从新在队列中排队
channel.nack(msg, false, false);
}
console.log(err);
}
}, {
// noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);
noAck: false
}, (err: any, ok: Replies.Empty) => {
console.log(err, ok);
});
// return,error事件不会把消息重新放回队列
channel.on('return', (msg) => {
console.error('消息发送失败:', msg);
});
channel.on('error', (err) => {
console.error('通道发生错误:', err);
});
});
conn.on("error", function (err1) {
if (err1.message !== "Connection closing") {
console.error("[AMQP] conn error", err1.message);
}
});
conn.on("close", function () {
console.error("[AMQP] reconnecting");
});
});
consumer2.ts
import RabbitMQ, { type Replies } from 'amqplib/callback_api';
RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {
if (err0) {
console.error(err0);
return;
}
conn.createChannel(function (err1, channel) {
const queueName = 'queue2';
channel.assertQueue(queueName, { durable: true });
console.log('[*] waiting...');
// 一次只有一个未确认消息,防止消费者过载
channel.prefetch(1);
channel.bindQueue(
queueName,
'exchange5',
'',
{
'x-match': 'all', // any,下方消息匹配上一个就可以; all,下方消息要全部匹配
'loglevel': 'info',
'buslevel': 'product',
'syslevel': 'admin'
},
(err, ok) => {
console.log(queueName, '队列绑定结果', err, ok);
},
);
channel.consume(
queueName,
function (msg) {
console.log('接收到的消息', msg?.content.toString());
/*
// 手动确认取消channel.ack(msg); noAck:false,
// 自动确认消息
// if (msg) {
// channel.ack(msg);
// }
*/
},
{
noAck: true, // 是否自动确认消息
// noAck: false
},
(err: any, ok: Replies.Empty) => {
console.log(err, ok);
},
);
// return,error事件不会把消息重新放回队列
channel.on('return', (msg) => {
console.error('消息发送失败:', msg);
});
channel.on('error', (err) => {
console.error('通道发生错误:', err);
});
});
conn.on("error", function (err1) {
if (err1.message !== "Connection closing") {
console.error("[AMQP] conn error", err1.message);
}
});
conn.on("close", function () {
console.error("[AMQP] reconnecting");
});
});