RabbitMQ之旅(1)

发布于:2025-03-09 ⋅ 阅读:(24) ⋅ 点赞:(0)

相信自己,终会成功

目录

主流MQ产品

1.kafaka 

2.RocketMQ

3.RabbitMQ

在xshell上安装RabbitMQ

 RabbitMQ七种工作模式

1.简单模式

​编辑

2.工作队列模式

3.发布/订阅模式

4.路由模式

5.通配符模式

6.RPC模式

AMQP.BasicProperties 设置消息属性的类

7.发布确认模式

代码展示(生产者)

常量命名规范

连接工厂 (ConnectionFactory) 

Channel(通道)

channel.queueDeclare 声明队列

channel.basicPublish 发送消息

channel.exchangeDeclare 声明创建交换机

channel.queueBind() 将队列绑定到交换机

channel.basicQos设置消费者预取限制

channel.basicAck 手动确认消息

代码展示(消费者)

DefaultConsumer:

handleDelivery 方法:


主流MQ产品

1.kafaka 

  1. 特点:高吞吐量、分布式、持久化、支持分区和副本。
  2. 适用场景:日志收集、流处理、实时数据分析等大数据场景。
  3. 优势:高吞吐量和低延迟,适合处理大量数据。
  4. 缺点:配置复杂,对小型项目可能过于重量级。

2.RocketMQ

  1. 特点:分布式、高吞吐量、低延迟、支持事务消息。
  2. 适用场景:电商、金融等需要高可靠性和事务支持的场景。
  3. 优势:支持事务消息,适合金融等高可靠性要求的场景。
  4. 缺点:社区相对较小,文档和资源不如Kafka丰富。

3.RabbitMQ

  1. 特点:轻量级、支持多种消息协议、易于使用和部署。
  2. 适用场景:中小型项目、需要快速上手的场景。
  3. 优势:易于使用,支持多种消息协议,社区活跃。
  4. 缺点:在大规模高并发场景下性能不如Kafka和RocketMQ。

在xshell上安装RabbitMQ

1.更新xshell中最新的软件包列表

sudo apt-get update

2.安装erlang

sudo apt-get install erlang

输入erl,出现下图内容表示安装成功  输入halt().退出即可

 3.安装rabbitmq

sudo apt-get install rabbitmq-server

4.确认安装结果

systemctl status rabbitmq-server

 显示running即可

 5.安装RabbitMQ管理界面

rabbitmq-plugins enable rabbiting_management

6.启动服务

sudo service rabbitmq-server start

在浏览器中输入自己云服务器的端口号+15672即可访问页面

添加用户名和密码 

rabbitmqctl add_user 用户名 密码

给用户权限

rabbitmqctl set_user_tags 用户名 权限等级

 RabbitMQ七种工作模式

P:生产者        C:消费者

queue:队列   X:交换机

在使用绑定的时候为 BindingKey

在发送消息的时候为RoutingKey

官方网站:RabbitMQ Tutorials | RabbitMQ

建立连接需要的信息:

1.IP   2.端口号   3.账号   4.密码   5. 虚拟主机

消费者代码:

1.创建连接  2.创建Channel  3.声明一个队列Queue  4.消费信息  5.释放资源

1.简单模式

一个生产者,一个消费者,点到点模式

2.工作队列模式

一个生产者,多个消费者

假如有十条队列消息,C1和C2是共同消费这10条消息,消息不会重复消费

3.发布/订阅模式

4.路由模式

订阅模式的变化形式

5.通配符模式

6.RPC模式

AMQP.BasicProperties 设置消息属性的类
属性名 类型 说明
contentType String 消息内容的 MIME 类型(如 text/plainapplication/json)。
contentEncoding String 消息内容的编码方式(如 UTF-8)。
headers Map<String, Object> 自定义消息头,用于传递额外信息。
deliveryMode Integer 消息的持久化模式:1(非持久化)或 2(持久化)。
priority Integer 消息的优先级(0 到 9,数值越大优先级越高)。
correlationId String 关联 ID,通常用于 RPC 模式,匹配请求和响应。
replyTo String 响应队列的名称,通常用于 RPC 模式。
expiration String 消息的过期时间(以毫秒为单位的字符串)。
messageId String 消息的唯一标识符。
timestamp Date 消息的时间戳。
type String 消息的类型标识符。
userId String 用户 ID,用于验证消息的发送者。
appId String 应用程序 ID,用于标识发送消息的应用程序。
//        AMQP.BasicProperties 是一个不可变类,因此需要通过其内部类 Builder 来创建对象。
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                .correlationId(correlationID)
                .replyTo(Constants.RPC_RESPONSE_QUEUE)
                .build();
  1. 客户端

    • 生成唯一的 correlationId

    • 设置 replyTo 为响应队列的名称。

    • 发送消息到请求队列(Constants.RPC_REQUEST_QUEUE)。

    • 监听响应队列(Constants.RPC_RESPONSE_QUEUE),等待服务器返回结果。

  2. 服务器

    • 监听请求队列(Constants.RPC_REQUEST_QUEUE)。

    • 处理请求后,将结果发送到客户端指定的响应队列(replyTo)。

    • 在响应消息中设置与请求相同的 correlationId

  3. 客户端匹配响应

    • 收到响应后,根据 correlationId 匹配对应的请求。

7.发布确认模式

是RabbitMQ消息可靠性保证的关键 


代码展示(生产者)

下图的代码Constants是自己写的 Java 常量

常量命名规范

1.常量名使用 全大写字母,并用下划线 _ 分隔单词(如 VIRTUALHOST 和 WORK_QUEUE)。

2.这是 Java 中的命名约定,便于区分常量和变量。 

//1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost(Constants.HOST); //云服务器的IP地址
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUALHOST); //虚拟主机


        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明交换机   使用内置的交换机

连接工厂 (ConnectionFactory) 

是一个设计模式中的“工厂类”,它的目的是隐藏创建连接的复杂细节(比如网络配置、认证信息等)。你可以通过这个工厂对象预先设置连接参数(如服务器地址、端口、用户名、密码等),然后通过它来生成具体的连接对象

Channel(通道)

通道 是建立在连接(Connection)之上的一个轻量级逻辑连接。一个连接(Connection)可以创建多个通道,每个通道可以独立地发送和接收消息。通道的设计是为了复用连接,避免频繁创建和销毁连接的开销。创建通道后,通常会用它来声明队列、发送消息或消费消息

channel.queueDeclare 声明队列
//4.声明队列
        
channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);

         /**
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
         *                                  Map<String, Object> arguments)
         *  参数说明:
         *  queue: 队列名称
         *  durable: 可持久化
         *  exclusive: 是否独占
         *  autoDelete: 是否自动删除
         *  arguments: 参数
         */

channel.basicPublish 发送消息
        //5. 发送消息
        /**
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * 参数说明:
         * exchange: 交换机名称
         * routingKey: 内置交换机, routingkey和队列名称保持一致
         * props: 属性配置
         * body: 消息
         */
        for (int i = 0; i < 10; i++) {
            String msg = "hello rabbitmq~"+i;
            channel.basicPublish("","hello", null, msg.getBytes());
        }

 //6. 资源释放
        channel.close();
        connection.close();
channel.exchangeDeclare 声明创建交换机
Exchange.DeclareOk exchangeDeclare(
    String exchange,              // 交换机名称
    String type,                  // 交换机类型(direct、fanout、topic、headers)
    boolean durable,              // 是否持久化
    boolean autoDelete,           // 是否自动删除
    boolean internal,            // 是否内部交换机
    Map<String, Object> arguments // 额外参数
) throws IOException;

channel.queueBind() 将队列绑定到交换机
  • queue:队列名称(如 Constants.PUBLISH_QUEUE1)。

  • exchange:交换机名称(如 Constants.PUBLISH_EXCHANGE)。

  • routingKey:路由键(如 "" 或 "key1"

channel.queueBind(Constants.PUBLISH_QUEUE1, Constants.PUBLISH_EXCHANGE, "");
channel.queueBind(Constants.PUBLISH_QUEUE2, Constants.PUBLISH_EXCHANGE, "");
//作用:将 Constants.PUBLISH_QUEUE1 和 Constants.PUBLISH_QUEUE2 绑定到 //Constants.PUBLISH_EXCHANGE。
//路由键为空字符串(""),表示所有消息都会被路由到这两个队列(前提是交换机类型支持)。
channel.basicQos设置消费者预取限制
参数名 类型 说明
prefetchSize int 预取消息的总大小(以字节为单位),通常设置为 0 表示不限制大小。
prefetchCount int 预取消息的数量限制(即未确认消息的最大数量)。
global boolean 是否全局生效:true 表示对整个 Channel 生效,false 表示对每个消费者生效。
// 设置每个消费者最多预取 10 条未确认消息
channel.basicQos(10);

// 设置整个 Channel 最多预取 100 条未确认消息
channel.basicQos(100, true);

// 设置预取消息的总大小不超过 1MB,数量不超过 10 条
channel.basicQos(1024 * 1024, 10, false);
channel.basicAck 手动确认消息
参数名 类型 说明
deliveryTag long 消息的唯一标识符,由 RabbitMQ 分配。
multiple boolean 是否批量确认:true 表示确认所有比 deliveryTag 小的消息;false 表示仅确认当前消息。

使用场景

  • 手动确认模式:当消费者从队列中拉取消息时,如果启用了手动确认模式(autoAck=false),则必须调用 basicAck 来确认消息。

  • 确保消息可靠性:通过手动确认,可以确保消息在处理成功后才会从队列中删除,避免消息丢失。

  • 批量确认:通过设置 multiple=true,可以一次性确认多条消息,提高效率。


代码展示(消费者)

public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列   使用内置的交换机
        //如果队列不存在, 则创建, 如果队列存在, 则不创建
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
        //4. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //从队列中收到消息, 就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+ new String(body));
            }
        };
        channel.basicConsume(Constants.WORK_QUEUE, true, consumer);

        6. 资源释放
        channel.close();
        connection.close();
    }

DefaultConsumer

RabbitMQ 提供的默认消费者类。

channel:消费者绑定的通道(Channel)。

匿名内部类:通过 new DefaultConsumer(channel) { ... } 创建一个匿名内部类,并写 handleDelivery 方法。

handleDelivery 方法

当队列中有消息时,RabbitMQ 会调用此方法将消息传递给消费者

RabbitMQ 支持两种消息确认模式:

  1. 自动确认

    • 在 basicConsume 方法中将第二个参数设置为 true

    • 消费者接收到消息后,RabbitMQ 会自动将消息标记为已处理。

    • 示例:

      channel.basicConsume(QUEUE_NAME, true, consumer);
  2. 手动确认

    • 在 basicConsume 方法中将第二个参数设置为 false

    • 需要在 handleDelivery 方法中手动调用 channel.basicAck() 确认消息。

    • 示例:

      channel.basicAck(envelope.getDeliveryTag(), false);