在nodejs中使用RabbitMQ(一)安装,使用

发布于:2025-02-12 ⋅ 阅读:(10) ⋅ 点赞:(0)

安装 

1、安装RabbitMQ,推荐直接使用docker安装。

docker container run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v ./data:/var/lib/rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin1234 rabbitmq:4.0-management

5672端口,rabbitmq服务

15672端口,rabbitmq可视化管理界面 

2、windows,linux不用容器,可根据官网教程按步骤进行安装。Installing on RPM-based Linux | RabbitMQ

RabbitMQ 介绍

RabbitMQ 是一个开源的消息代理软件(也称为消息队列),它实现了高级消息队列协议(AMQP)。RabbitMQ 提供了可靠的消息传递机制,适用于构建分布式系统、微服务架构以及需要解耦组件的应用程序。它支持多种消息传递模式,并且具有高度的可扩展性和灵活性。官方还提供了多语言支持:Python,Java,Ruby,PHP,C#,JavaScript,Go。

RabbitMQ 的基本概念

1.生产者(Producer):左侧绿色方框代表消息的生产者,生产者将消息发送到 RabbitMQ 服务器。

2.连接(Connection)和通道(Channel):消息首先通过一个连接进入 RabbitMQ,连接内部包含多个通道。每个通道是一个轻量级的连接,用于减少开销并进行通信。

3.RabbitMQ 服务(RabbitMQ Server)和虚拟主机(Virtual Host):中央部分展示了 RabbitMQ 服务器及其内部结构。RabbitMQ 服务器中,可以创建多个虚拟主机(Virtual Host),每个虚拟主机是一个独立的消息命名空间。

4.交换器(Exchange):在每个虚拟主机内,有多个交换器。交换器负责接收来自生产者的消息,并根据预定的路由规则将消息分发到不同的队列。常见的交换机类型包括:

Direct:基于精确匹配的路由键进行消息路由。
Fanout:广播消息到所有绑定的队列。
Topic:基于通配符匹配的路由键进行消息路由。
Headers:基于消息头中的键值对进行消息路由。

5.队列(Queue):消息根据路由规则被分发到对应的队列中。队列用于存储和管理消息,等待消费者来获取消息。

6.消费者(Consumer):右侧黄色方框代表消费者。通过连接和通道,消费者从 RabbitMQ 服务器的队列中获取和处理消息。

 示例

一个生产者对应一个消费者。

 一个生产者对应多个消费者。

如何防止数据丢失

1、优先处理每一步错误,如,队列创建,exchange路由创建,消息是否发送成功。

2、持久化队列数据 durable: true,持久化队列中消息 persistent: true。

3、手动确认消息是否接收,在数据处理完后确认,channel.ack(msg)。 (注:要防止数据重复处理)
 

producer.ts 在代码中没有创建exchange,会使用默认exchange。

import RabbitMQ from 'amqplib/callback_api';


function start() {
  RabbitMQ.connect("amqp://admin:admin1234@localhost:5672?heartbeat=60", function (err0, conn) {
    if (err0) {
      console.error("[AMQP]", err0.message);
      return setTimeout(start, 1000);
    }

    conn.on("error", function (err1) {
      if (err1.message !== "Connection closing") {
        console.error("[AMQP] conn error", err1.message);
      }
    });

    conn.on("close", function () {
      console.error("[AMQP] reconnecting");
      return setTimeout(start, 1000);
    });

    console.log("[AMQP] connected");

    conn.createChannel(async (err2, channel) => {
      if (err2) {
        console.error("[AMQP]", err2.message);
        return setTimeout(start, 1000);
      }

      const queueName = 'queue1';

      // 创建一个队列
      channel.assertQueue(queueName, {
        durable: true, //队列持久化
      }, (err, ok) => {
        if (err) {
          console.log('队列创建失败!');
        }
        console.log(err, ok);
      });

      for (let i = 0; i < 30; ++i) {
        console.log('message send!', channel.sendToQueue(
          queueName,
          Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),
          { persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在
          // (err: any, ok: Replies.Empty)=>{}
        ));
      }
    });

    setTimeout(() => {
      conn.close();
      process.exit(0);
    }, 1000);
  });
}

start();

consumer.ts 消费者可以启动零到多个,因为设置了持久化如果没有消费者,数据会保存等待消费。

import RabbitMQ, { type Replies } from 'amqplib/callback_api';

RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {
  if (err0) {
    console.error(err0);
    return;
  }

  conn.createChannel(function (err1, channel) {
    const queueName = 'queue1';

    channel.assertQueue(queueName, { durable: true });

    console.log('[*] waiting...');

    // 一次只有一个未确认消息,防止消费者过载
    channel.prefetch(1);

    channel.consume(
      queueName,
      function (msg) {
        console.log('接收到的消息', msg, msg?.content.toString());

        /*  
         // 手动确认取消channel.ack(msg); noAck:false,
   
         // 自动确认消息
         // if (msg) {
         //   channel.ack(msg);
         // } 
        */
      },
      {
        noAck: true, // 是否自动确认消息
        // noAck: false
      },
      (err: any, ok: Replies.Empty) => {
        console.log(err, ok);
      },
    );

  });

  conn.on("error", function (err1) {
    if (err1.message !== "Connection closing") {
      console.error("[AMQP] conn error", err1.message);
    }
  });

  conn.on("close", function () {
    console.error("[AMQP] reconnecting");
  });
});