在nodejs中使用RabbitMQ(三)Routing、Topics、Headers

发布于:2025-02-12 ⋅ 阅读:(8) ⋅ 点赞:(0)

示例一、Routing

exchange类型direct,根据消息的routekey将消息直接转发到指定队列。producer.ts 生产者主要发送消息,consumer.ts负责接收消息,同时也都可以创建exchange交换机,创建队列,为队列绑定exchange,为避免重复简化代码,提高可维护性,队列相关操作移动到消费者端。队列,exchange交换机推荐在启动程序前手动创建好。

producer.ts 

import RabbitMQ from 'amqplib/callback_api';


function start() {
  RabbitMQ.connect("amqp://admin:admin1234@localhost:5672?heartbeat=60", function (err0, conn) {
    if (err0) {
      console.error("[AMQP]", err0.message);
      return setTimeout(start, 1000);
    }

    conn.on("error", function (err1) {
      if (err1.message !== "Connection closing") {
        console.error("[AMQP] conn error", err1.message);
      }
    });

    conn.on("close", function () {
      console.error("[AMQP] reconnecting");
      return setTimeout(start, 1000);
    });

    console.log("[AMQP] connected");

    conn.createChannel(async (err2, channel) => {
      if (err2) {
        console.error("[AMQP]", err2.message);
        return setTimeout(start, 1000);
      }

      const exchangeName = 'exchange1';
      channel.assertExchange(
        exchangeName,
        'direct',
        {
          durable: true
        },
        (err, ok) => {
          if (err) {
            console.log('exchange路由转发创建失败', err);
          } else {
            let args = ['info', 'warn', 'error'];
            for (let i = 0; i < 10; ++i) {
              // console.log('message send!', channel.sendToQueue(
              //   queueName,
              //   Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),
              //   { persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在
              //   // (err: any, ok: Replies.Empty)=>{}
              // ));
              const routeKey = args[Math.floor(Math.random() * 3)];
              console.log('消息发送是否成功', channel.publish(
                exchangeName,
                routeKey,
                Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)},${routeKey}`),
                { persistent: true },
              ));
            }
          }
        }
      );
    });

    setTimeout(() => {
      conn.close();
      process.exit(0);
    }, 1000);
  });
}

start();

consumer.ts

import RabbitMQ, { type Replies } from 'amqplib/callback_api';


RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {
  if (err0) {
    console.error(err0);
    return;
  }

  conn.createChannel(function (err1, channel) {
    const queueName = 'queue1';

    channel.assertQueue(queueName, { durable: true }, (err2) => {
      if (err2) {
        console.log('队列创建失败', err2);
        return;
      }
      console.log('[*] waiting...');

      // 一次只有一个未确认消息,防止消费者过载
      channel.prefetch(1);

      channel.bindQueue(queueName, 'exchange1', 'info', {}, (err3, ok) => {
        console.log(queueName, '队列绑定结果', err3, ok);
      });
      channel.bindQueue(queueName, 'exchange1', 'warn', {}, (err3, ok) => {
        console.log(queueName, '队列绑定结果', err3, ok);
      });
      channel.bindQueue(queueName, 'exchange1', 'error', {}, (err3, ok) => {
        console.log(queueName, '队列绑定结果', err3, ok);
      });

      channel.consume(
        queueName,
        function (msg) {
          console.log('接收到的消息', msg?.content.toString());

          /*
           // 手动确认取消channel.ack(msg); noAck:false,
     
           // 自动确认消息
           // if (msg) {
           //   channel.ack(msg);
           // } 
          */
        },
        {
          noAck: true, // 是否自动确认消息
          // noAck: false
        },
        (err3: any, ok: Replies.Empty) => {
          console.log(err3, ok);
        },
      );
    });

  });

  conn.on("error", function (err1) {
    if (err1.message !== "Connection closing") {
      console.error("[AMQP] conn error", err1.message);
    }
  });

  conn.on("close", function () {
    console.error("[AMQP] reconnecting");
  });
});

consumer2.ts

import RabbitMQ from 'amqplib';


const conn = await RabbitMQ.connect('amqp://admin:admin1234@localhost:5672');

conn.on("error", function (err1) {
  if (err1.message !== "Connection closing") {
    console.error("[AMQP] conn error", err1.message);
  }
});

conn.on("close", function () {
  console.error("[AMQP] reconnecting");
});

const channel = await conn.createChannel();

const queueName = 'queue2';

await channel.assertQueue(queueName, { durable: true });

console.log('[*] waiting...');

// 一次只有一个未确认消息,防止消费者过载
await channel.prefetch(1);

await channel.bindQueue(queueName, 'exchange1', 'error', {});

channel.consume(
  queueName,
  function (msg) {
    console.log('接收到的消息', msg?.content.toString());

    /*
      // 手动确认取消channel.ack(msg); noAck:false,

      // 自动确认消息
      // if (msg) {
      //   channel.ack(msg);
      // } 
    */
  },
  {
    noAck: true, // 是否自动确认消息
    // noAck: false
  },
);

示例二、Topic

exchange的topic类型和direct类似,使用的仍然是routeKey进行匹配转发,topic支持通过*和#进行模糊查询。*代码一个具体单词,#代码0或多个单词。

producer.ts

import RabbitMQ from 'amqplib';


async function start() {
  const conn = await RabbitMQ.connect("amqp://admin:admin1234@localhost:5672?heartbeat=60");

  conn.on("error", function (err1) {
    if (err1.message !== "Connection closing") {
      console.error("[AMQP] conn error", err1.message);
    }
  });

  conn.on("close", function () {
    console.error("[AMQP] reconnecting");
    return setTimeout(start, 1000);
  });

  try {
    const channel = await conn.createChannel();

    console.log("[AMQP] connected");

    const exchangeName = 'exchange4';
    await channel.assertExchange(exchangeName, 'topic', { durable: true });

    let args = ['123.orange.456', '123.456.rabbit', 'lazy', 'lazy.123', 'lazy.123.456'];
    for (let i = 0; i < 20; ++i) {
      // console.log('message send!', channel.sendToQueue(
      //   queueName,
      //   Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),
      //   { persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在
      //   // (err: any, ok: Replies.Empty)=>{}
      // ));

      const routeKey = args[Math.floor(Math.random() * args.length)];
      console.log('消息发送是否成功', channel.publish(
        exchangeName,
        routeKey,
        Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)},${routeKey}`),
        { persistent: true },
      ));
    }
  } catch (err) {
    console.error("[AMQP]", err);
    return setTimeout(start, 1000);
  }

  setTimeout(() => {
    conn.close();
    process.exit(0);
  }, 1000);
}

start();

consumer.ts

import RabbitMQ from 'amqplib';


const conn = await RabbitMQ.connect('amqp://admin:admin1234@localhost:5672');

conn.on("error", function (err1) {
  if (err1.message !== "Connection closing") {
    console.error("[AMQP] conn error", err1.message);
  }
});

conn.on("close", function () {
  console.error("[AMQP] reconnecting");
});

const channel = await conn.createChannel();

const queueName = 'queue1';

channel.assertQueue(queueName, { durable: true });

console.log('[*] waiting...');

// 一次只有一个未确认消息,防止消费者过载
await channel.prefetch(1);

// *代码一个具体单词,#代码0或多个单词
await channel.bindQueue(queueName, 'exchange4', '*.orange.*', {});

channel.consume(queueName, function (msg) {
  console.log('接收到的消息', msg?.content.toString());

  // 手动确认取消channel.ack(msg);设置noAck:false,
  // 自动确认消息noAck:true,不需要channel.ack(msg);
  try {
    if (msg) {
      channel.ack(msg);
    }
  } catch (err) {
    if (msg) {
      // 第二个参数,false拒绝当前消息
      // 第二个参数,true拒绝小于等于当前消息
      // 第三个参数,3false从队列中清除
      // 第三个参数,4true从新在队列中排队
      channel.nack(msg, false, false);
    }
    console.log(err);
  }
}, {
  // noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);
  noAck: false
});

consumer2.ts

import RabbitMQ, { type Replies } from 'amqplib/callback_api';


RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {
  if (err0) {
    console.error(err0);
    return;
  }

  conn.createChannel(function (err1, channel) {
    const queueName = 'queue2';

    channel.assertQueue(queueName, { durable: true });

    console.log('[*] waiting...');

    // 一次只有一个未确认消息,防止消费者过载
    channel.prefetch(1);

    channel.bindQueue(queueName, 'exchange4', '*.*.rabbit', {}, (err, ok) => {
      console.log(queueName, '队列绑定结果', err, ok);
    });
    channel.bindQueue(queueName, 'exchange4', 'lazy.#', {}, (err, ok) => {
      console.log(queueName, '队列绑定结果', err, ok);
    });

    channel.consume(queueName, function (msg) {
      console.log('接收到的消息', msg?.content.toString());

      // 手动确认取消channel.ack(msg);设置noAck:false,
      // 自动确认消息noAck:true,不需要channel.ack(msg);
      try {
        if (msg) {
          channel.ack(msg);
        }
      } catch (err) {
        if (msg) {
          // 第二个参数,false拒绝当前消息
          // 第二个参数,true拒绝小于等于当前消息
          // 第三个参数,3false从队列中清除
          // 第三个参数,4true从新在队列中排队
          channel.nack(msg, false, false);
        }
        console.log(err);
      }
    }, {
      // noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);
      noAck: false
    });

    // return,error事件不会把消息重新放回队列
    channel.on('return', (msg) => {
      console.error('消息发送失败:', msg);
    });

    channel.on('error', (err) => {
      console.error('通道发生错误:', err);
    });
  });

  conn.on("error", function (err1) {
    if (err1.message !== "Connection closing") {
      console.error("[AMQP] conn error", err1.message);
    }
  });

  conn.on("close", function () {
    console.error("[AMQP] reconnecting");
  });
});

示例三、Headers

 exchange类型headers,根据传递的头部信息进行转发,头部信息类型为object对象。在头部信息中要设置x-match属性,'x-match': 'any', any,下方消息匹配上一个就可以。all,下方消息要全部匹配。

producer.ts

import RabbitMQ from 'amqplib/callback_api';


function start() {
  RabbitMQ.connect("amqp://admin:admin1234@localhost:5672?heartbeat=60", function (err0, conn) {
    if (err0) {
      console.error("[AMQP]", err0.message);
      return setTimeout(start, 1000);
    }

    conn.on("error", function (err1) {
      if (err1.message !== "Connection closing") {
        console.error("[AMQP] conn error", err1.message);
      }
    });

    conn.on("close", function () {
      console.error("[AMQP] reconnecting");
      return setTimeout(start, 1000);
    });

    console.log("[AMQP] connected");

    conn.createChannel(async (err2, channel) => {
      if (err2) {
        console.error("[AMQP]", err2.message);
        return setTimeout(start, 1000);
      }

      const exchangeName = 'exchange5';
      channel.assertExchange(
        exchangeName,
        'headers',
        {
          durable: true
        },
        (err, ok) => {
          if (err) {
            console.log('exchange路由转发创建失败', err);
          } else {
            let args = [
              {
                // 'x-match': 'any', // any,下方消息匹配上一个就可以; all,下方消息要全部匹配
                'loglevel': 'info',
                // 'buslevel': 'product',
                // 'syslevel': 'admin'
              },
              {
                // 'x-match': 'any', // any,下方消息匹配上一个就可以; all,下方消息要全部匹配
                // 'loglevel': 'info',
                'buslevel': 'product',
                'syslevel': 'admin'
              },
              {
                // 'x-match': 'any', // any,下方消息匹配上一个就可以; all,下方消息要全部匹配
                // 'loglevel': 'info',
                'buslevel': 'product',
                // 'syslevel': 'admin'
              },
              {
                // 'x-match': 'all', // any,下方消息匹配上一个就可以; all,下方消息要全部匹配
                'loglevel': 'info',
                'buslevel': 'product',
                'syslevel': 'admin'
              },
            ];
            for (let i = 0; i < 20; ++i) {
              // console.log('message send!', channel.sendToQueue(
              //   queueName,
              //   Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),
              //   { persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在
              //   // (err: any, ok: Replies.Empty)=>{}
              // ));
              
              const routeKey = args[Math.floor(Math.random() * args.length)];
              console.log('消息发送是否成功', routeKey, channel.publish(
                exchangeName,
                '',
                Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)},${JSON.stringify(routeKey)}`),
                {
                  persistent: true,
                  headers: routeKey
                },
              ));
            }
          }
        }
      );
    });

    setTimeout(() => {
      conn.close();
      process.exit(0);
    }, 1000);
  });
}

start();

consumer.ts

import RabbitMQ, { type Replies } from 'amqplib/callback_api';


RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {
  if (err0) {
    console.error(err0);
    return;
  }

  conn.createChannel(function (err1, channel) {
    const queueName = 'queue1';

    channel.assertQueue(queueName, { durable: true });

    console.log('[*] waiting...');

    // 一次只有一个未确认消息,防止消费者过载
    channel.prefetch(1);

    // *代码一个具体单词,#代码0或多个单词
    channel.bindQueue(
      queueName,
      'exchange5',
      '',
      {
        'x-match': 'any', // any,下方消息匹配上一个就可以; all,下方消息要全部匹配
        'loglevel': 'info',
        'buslevel': 'product',
        'syslevel': 'admin'
      },
      (err, ok) => {
        console.log(queueName, '队列绑定结果', err, ok);
      },
    );

    channel.consume(queueName, function (msg) {
      console.log('接收到的消息', msg?.content.toString());

      // 手动确认取消channel.ack(msg);设置noAck:false,
      // 自动确认消息noAck:true,不需要channel.ack(msg);
      try {
        if (msg) {
          channel.ack(msg);
        }
      } catch (err) {
        if (msg) {
          // 第二个参数,false拒绝当前消息
          // 第二个参数,true拒绝小于等于当前消息
          // 第三个参数,3false从队列中清除
          // 第三个参数,4true从新在队列中排队
          channel.nack(msg, false, false);
        }
        console.log(err);
      }
    }, {
      // noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);
      noAck: false
    }, (err: any, ok: Replies.Empty) => {
      console.log(err, ok);
    });

    // return,error事件不会把消息重新放回队列
    channel.on('return', (msg) => {
      console.error('消息发送失败:', msg);
    });

    channel.on('error', (err) => {
      console.error('通道发生错误:', err);
    });
  });

  conn.on("error", function (err1) {
    if (err1.message !== "Connection closing") {
      console.error("[AMQP] conn error", err1.message);
    }
  });

  conn.on("close", function () {
    console.error("[AMQP] reconnecting");
  });
});

consumer2.ts

import RabbitMQ, { type Replies } from 'amqplib/callback_api';


RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {
  if (err0) {
    console.error(err0);
    return;
  }

  conn.createChannel(function (err1, channel) {
    const queueName = 'queue2';

    channel.assertQueue(queueName, { durable: true });

    console.log('[*] waiting...');

    // 一次只有一个未确认消息,防止消费者过载
    channel.prefetch(1);

    channel.bindQueue(
      queueName,
      'exchange5',
      '',
      {
        'x-match': 'all', // any,下方消息匹配上一个就可以; all,下方消息要全部匹配
        'loglevel': 'info',
        'buslevel': 'product',
        'syslevel': 'admin'
      },
      (err, ok) => {
        console.log(queueName, '队列绑定结果', err, ok);
      },
    );

    channel.consume(
      queueName,
      function (msg) {
        console.log('接收到的消息', msg?.content.toString());

        /*
         // 手动确认取消channel.ack(msg); noAck:false,
   
         // 自动确认消息
         // if (msg) {
         //   channel.ack(msg);
         // } 
        */
      },
      {
        noAck: true, // 是否自动确认消息
        // noAck: false
      },
      (err: any, ok: Replies.Empty) => {
        console.log(err, ok);
      },
    );

    // return,error事件不会把消息重新放回队列
    channel.on('return', (msg) => {
      console.error('消息发送失败:', msg);
    });

    channel.on('error', (err) => {
      console.error('通道发生错误:', err);
    });
  });

  conn.on("error", function (err1) {
    if (err1.message !== "Connection closing") {
      console.error("[AMQP] conn error", err1.message);
    }
  });

  conn.on("close", function () {
    console.error("[AMQP] reconnecting");
  });
});

网站公告

今日签到

点亮在社区的每一天
去签到