【RabbitMQ】概述

发布于:2024-09-18 ⋅ 阅读:(66) ⋅ 点赞:(0)

前言

Rabbit是一个公司名称,MQ(Message Queue)是消息队列的意思。所以RabbitMQ指的就是Rabbit企业下的一个消息队列产品。

RabbitMQ是一个实现了AMQP协议的消息队列服务,是当前主流的消息中间件之一。

AMQP(Advanced Message Queuing Protocol),高级消息队列协议。

AMQP协议是一个通用的应用层协议,用来提供统一消息服务的协议,为面向消息的中间件设计。AMQP协议定义了一套确定的消息交换功能,包括交换器、队列等,这些组件共同工作,使得生产者能够将消息发送到交换器,然后由队列接收并等待消费者接收。基于此协议的客户端与消息中间件可传递消息,并不受客户端、中间件或开发语言等的限制。

RabbitMQ是遵从AMQP协议的。换句话说,RabbitMQ就是AMQP协议的Erlang实现(RabbitMQ也支持STOMP、MQTT2等协议)。AMQP的模型结构和RabbitMQ的模型结构是一样的。

 在互联网架构中,会经常使用MQ来作为消息通信服务。

什么是MQ

MQ,即Message Queue,可以学到组件部分的同学一定对这个东西不陌生,至少对Queue不陌生。在数据结构中,就肯定学习了队列。谈到队列,最简单的无非就是它先进先出的特性。对于MQ来说,实质就是一个队列,只不过是一个功能更强大的队列而已。在消息队列中,可以存放文本字符串、JSON等简单内容,也可以内嵌对象这样的复杂内容。

MQ多用于分布式系统之间进行通信(作用)。

系统之间的调方式一般有两种:

同步通信,直接调用对方的服务,数据从一段发出后立即到达另一端

异步通信,数据从一端出发之后,现进入一个容器进行临时存储,当到达某种条件之后,再由这个容器发送给另一端。容器的一个具体实现就是消息队列。

 MQ的作用

MQ主要的工作就是收发消息,在不同的应用场景下可以展现不同的作用。

异步解耦

在业务流程中,一些操作可能非常耗时,但并不需求及时返回结果。这时可以借助MQ把这些操作异步化。比如,用户完成注册后发送短信通知或邮件通知,可以作为异步任务处理,而不必等待这些操作完成后才告知用户注册成功。

流量削峰

在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果以能处理这类峰值为标准而投入资源,无疑是最大的浪费。使用MQ能够使关键组件支撑突发访问压力,不会因为突发流量而崩溃。比如秒杀或促销活动,可以使用MQ来控制流量,将请求排队,然后系统根据自己的处理能力逐渐处理这些请求。

异步通信

在很多时候应用不需要立即处理消息,消息队列提供了异步处理机制。允许应用把一些消息放在消息队列中,但是并不立即处理它,在需要的时候慢慢处理。

消息分发

当多个系统需要对同一数据做出相应时,可以使用MQ进行消息分发。比如支付成功后,支付系统可以向MQ发送消息,其他系统订阅消息,而无需轮询数据库。

延迟通知

在需要在特定时间后发送通知的场景中,可以使用MQ的延迟消息功能。比如在电子商务平台中,如果用户下单后一定时间内未支付,可以使用延迟队列在超时后自动取消订单。

不同MQ的区别

目前业界有很多的MQ产品,例如RabbitMQ、Kafka、RocketMQ以及ActiveMQ等,当然直接使用Redis中的消息队列的也有。这些消息队列,各有侧重,没有好坏之分,只有适合不适合。在实际选型时,往往需要结合自身需求以及MQ产品的特性,综合考虑。

Kafka

Kafka一开始的目的就是用于日志收集和传输,追求高吞吐量,性能卓越,单机吞吐可以达到十万级,在日志领域比较成熟,功能较为简单,主要支持简单的MQ功能。如果有日志采集需求,首选肯定是Kafka。

RabbitMQ

采用Erlang语言开发,MQ功能比较完善,且支持所有的主流语言,开源提供的界面也非常友好,性能较好,吞吐量能达到万级,社区活跃度也比较高。主要适合中小型企业,数据量没那么大,且并发没那么高的场景。

RocketMQ

采用Java语言开发,由阿里巴巴开源,后捐赠给Apache。在可用性、可靠性以及稳定性方面都非常出色,吞吐量达到十万级,在阿里巴巴集团内部广泛使用。但是支持的客户端语言不高,产品较新文档较少,且社区活跃度一般。适合大规模分布式系统,可靠性要求高,且并发大的场景,比如互联网金融。

核心概念

  •  Producer:生产者,是RabbitMQ Server的客户端,向RabbitMQ发送消息。
  • Consumer:消费者,是RabbitMQ Server的客户端,从RabbitMQ接收消息。
  • Broker:其实就是RabbitMQ Server,主要是接消息和发消息。
  • Virtual Host:虚拟主机。这是一个虚拟概念,他为消息队列提供了一种逻辑上的隔离机制。对于RabbitMQ而言,一个Broker上可以存在多个虚拟主机。当多个用户使用同一个Broker提供的服务时,可以划分出多个虚拟主机,每个用户在自己的虚拟主机创建交换机和队列。
  • Exchange:交换机。消息到达Broker的第一站,交换机负责接收生产者发送的消息,并根据特定的规则将这些消息路由到一个或者多个队列中。交换机起到了消息路由的作用,它根据类型和规则来确定如何转发接收到的消息。
  • Queue:队列,用于存储消息。队列和消费者之间的关系是多对多的。
  • Connection:连接,是客户端和Broker之间的一个TCP连接,这个连接是建立消息传递的基础,它负责客户端和服务器之间的所有数据和控制信息。
  • Channel:通道,信道。Channel是在Connection之上的一个抽象层。在RabbitMQ中,一个TCP连接可以有多个信道,每个信道都是独立的虚拟连接,消息的收发都是基于信道的。通道的主要作用是将信息的读写操作复用到同一个TCP连接上,这样就可以减少建立和关闭连接的过程,提高性能。

工作流程

  1. 生产消息:生产者生产了一条消息。
  2. 创建连接:生产者连接到Broker,建立一个连接,开启一个信道。
  3. 声明交换机、队列以及绑定规则:生产者声明一个队列,用来存放消息;声明一个交换机,用来路由消息;制定一个绑定规则,使得交换机把消息能路由给队列。
  4. 发送消息:生产者发送消息到Broker。
  5. 消息存储:Broker接收到消息,并根据路由规则存入相应的队列中。如果未找到相应的队列,则根据生产者的配置,选择丢弃或者退回给生产者。
  6. 消费消息:消费者监听队列,当消息到达时,从队列中获取消息。处理后,向Broker发送消息确认。
  7. 删除消息:消息被确认后,RabbitMQ会把消息从队列中删除。

入门案例

引入依赖

<!--rabbitmq依赖-->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

生产者代码编写

// 入门案例的生产者代码编写

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {

        // TODO 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("43.138.108.125"); // IP
        connectionFactory.setPort(5672); // PORT
        connectionFactory.setUsername("admin"); // 用户名
        connectionFactory.setPassword("admin"); // 密码
        connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机

        // TODO 创建连接
        Connection connection = connectionFactory.newConnection();

        // TODO 获取信道
        Channel channel = connection.createChannel();

        // TODO 声明交换机,此处使用的是RabbitMQ内部的交换机,因此不需要声明

        // TODO 声明队列
        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         * queue:队列名称
         * durable:可持久化,当mq接收到消息之后,是否要持久化到硬盘中(防止因宕机等一系列原因导致的信息丢失)
         * exclusive:是否独占,该队列在消费者消费时,是否只能存在一个消费者
         * autoDelete:是否自动删除,该队列没有消费者时,是否将其删除
         */
        channel.queueDeclare("entry", true, false, false, null);

        // TODO 发送消息
        /**
         * basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
         * exchange:交换机名称,此处使用的是内置交换机,因此不需要给出名称
         * routingKey:交换机和队列的绑定关系,由于使用的是内置交换机,所以该属性要和队列名保持一致
         * props:属性配置
         * body:消息内容
         */
        String message = "hello entry";
        channel.basicPublish("", "entry", null, message.getBytes());

        // TODO 资源释放
        channel.close();
        connection.close();

    }

}

当启动该程序之后,就会发现在开源界面的队列中新增了一个entry队列,并且在队列中存在一条消息。

消费者代码编写

// 入门案例的消费者代码编写

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        // TODO 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("43.138.108.125"); // IP
        connectionFactory.setPort(5672); // PORT
        connectionFactory.setUsername("admin"); // 用户名
        connectionFactory.setPassword("admin"); // 密码
        connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机

        // TODO 创建连接
        Connection connection = connectionFactory.newConnection();

        // TODO 获取信道
        Channel channel = connection.createChannel();

        // TODO 声明交换机,此处使用的是RabbitMQ内部的交换机,因此不需要声明

        /* TODO 声明队列,此处声明是可以省略的,不过不建议
           TODO 因为在实际开发者,无法确定生产者和消费者哪个先启动,如果消费者先启动
           TODO 消费则就会去监听该队列,但是却发现队列不存在,就会产生报错 */
        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         * queue:队列名称
         * durable:可持久化,当mq接收到消息之后,是否要持久化到硬盘中(防止因宕机等一系列原因导致的信息丢失)
         * exclusive:是否独占,该队列在消费者消费时,是否只能存在一个消费者
         * autoDelete:是否自动删除,该队列没有消费者时,是否将其删除
         */
        channel.queueDeclare("entry", true, false, false, null);

        // TODO 消费消息
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * basicConsume(String queue, Consumer callback)
         * queue:队列名称
         * autoAck:是否自动确认
         * callback 接收到消息之后,执行的逻辑
         */
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * 在这个方法中,我们可以定义如何处理接收到的消息
             * @param consumerTag 消费者标签,通常是消费者在订阅队列时指定的
             * @param envelope 包含消息的封包消息,如队列名称、交换机等
             * @param properties 一些配置信息
             * @param body 消息的具体内容
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("成功接收到消息");
                System.out.println(new String(body));
            }
        };
        channel.basicConsume("entry", true, consumer);

        // TODO 释放资源
        channel.close();
        connection.close();
    }

}

当启动该程序之后,就会看到entry队列的消息从1变成了0,并且在输出界面上出现了下述内容。

在本篇文章中,主要是对MQ整体进行了一个概述,并且写了一个简单的案例来表示RabbitMQ是如何使用的。在下篇文章中,将主要来介绍MQ的工作模式。