MQ中的RabbitMQ

发布于:2025-04-02 ⋅ 阅读:(21) ⋅ 点赞:(0)

一、简述

MQ本质是一个队列,遵循先进先出的原则,但队列中存放的内容是消息。它是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。

优势:
1.MQ可以实现异步处理,提升用户体验和系统吞吐量。例如,用户注册后,系统可以异步发送注册邮件和短信通知,而无需用户等待这些操作完成
2.应用解耦‌:通过MQ,系统间的依赖关系得以解除,提高了系统的容错性和可维护性。
3.流量削峰:在高并发场景下,消息队列可以缓冲大量请求,防止系统过载,确保系统稳定运行。
4.消息持久化:确保即使在系统故障的情况下,消息也不会丢失。
 

核心概念:
生产者(Producer):负责生成并向消息队列发送消息的应用程序或组件。
消费者(Consumer):从消息队列中接收并处理消息的应用程序或组件。
消息(Message):在应用之间传送的数据单位,可以包含文本、对象等。
队列(Queue):存储消息的数据结构,遵循先进先出(FIFO)原则。
交换器(Exchange):在RabbitMQ等系统中,负责接收生产者发送的消息,并根据路由规则将消息路由到相应的队列。
绑定(Binding):用于将队列与交换器关联起来,定义消息如何从交换器路由到队列
 

常见产品:
1.RabbitMQ:基于AMQP协议的开源消息队列,支持多种消息传递模式,如点对点和发布订阅。
2.Apache Kafka:一个分布式流处理平台,支持高吞吐量和低延迟,适用于实时数据处理和日志聚合。
3.ActiveMQ:支持多种传输协议和消息模型的开源消息和集成模式服务器。
 

二、RabbitMQ

RabbitMQ是一个基于AMQP(高级消息队列协议)的开源消息队列中间件。它主要用于在分布式系统中传递消息,并提供可靠性、扩展性和可恢复性。

2.1 RabbitMQ的工作原理

‌1、消息生产者发送消息‌:
消息生产者创建消息并将其发送到RabbitMQ服务器。在发送消息时,生产者需要指定交换机(Exchange)和路由键。

2、交换机路由消息‌
交换机是RabbitMQ的核心组件之一,它接收生产者发送的消息,并根据一定的路由规则将消息转发到一个或多个队列中。
RabbitMQ提供了多种类型的交换机,如Direct、Fanout、Topic和Headers,以满足不同场景的需求。

3‌、队列存储消息‌:
队列用于存储消息,直到它们被消费者接收和处理。消息在队列中按照先进先出的原则进行排序。

4‌、消费者接收消息‌:
消费者通过订阅队列来接收消息。
在订阅队列之前,消费者需要先向RabbitMQ发送一条绑定指令,将自己的队列绑定到交换机上,并指定路由键

当交换机收到匹配路由键的消息时,就会将其发送到消费者的队列中。消费者可以通过拉取(Pull)或推送(Push)的方式从队列中获取消息并进行处理。

5‌、消息确认机制(ACK)‌:
RabbitMQ支持多种消息确认机制,如自动确认、显式确认和事务确认等。
消费者在接收到消息后,可以选择合适的确认机制向RabbitMQ发送确认消息(ACK),以通知RabbitMQ该消息已经被成功处理。RabbitMQ会根据消费者的确认消息来决定是否从队列中删除该消息。

6‌、消息持久化‌:
为了确保消息的可靠性,RabbitMQ支持消息持久化。
通过将消息、交换机和队列持久化存储在磁盘上,即使在RabbitMQ服务器重启后,消息也不会丢失。

7‌、消息模式‌:
RabbitMQ支持多种消息模式,如点对点模式和发布订阅模式,以满足不同场景的需求。

2.2 RabbitMQ的部署

1.安装Erlang
RabbitMQ是基于Erlang语言编写的,因此在所有服务器上都需要安装Erlang

sudo yum install erlang

2.在所有服务器上安装RabbitMQ。

sudo yum install rabbitmq-server

3.编辑/etc/rabbitmq/rabbitmq.config文件,解除以下注释行

{loopback_users,[]}

4.在第一个节点启动RabbitMQ服务

sudo systemctl start rabbitmq-server
sudo rabbitmqctl stop_app
sudo rabbitmqctl reset
sudo rabbitmqctl start_app

4. 初始化集群;在第一台服务器上,将RabbitMQ设置为集群中的第一个节点

sudo rabbitmqctl stop_app
sudo rabbitmqctl reset
sudo rabbitmqctl join_cluster rabbit@<第一台节点的ip或主机名>
sudo rabbitmqctl start_app

5.其他服务器执行以下操作,加入集群

sudo rabbitmqctl stop_app
sudo rabbitmqctl join_cluster rabbit@第一节点的服务器ip
sudo rabbitmqctl start_app

6.验证集群状态

sudo rabbitmqctl cluster_status

7.创建用户,并授权,任意节点都可

sudo rabbitmqctl add_user <username> <password>
sudo rabbitmqctl set_user_tags <username> administrator
sudo rabbitmqctl set_permissions -p / <username> ".*" ".*" ".*"

8.导入RabbitMQ管理界面插件,在每台服务器上都执行

sudo rabbitmq-plugins enable rabbitmq_management
sudo systemctl restart rabbitmq-server

2.3 RbbitMQ的使用

1、添加依赖
Spring Boot 会自动配置 RabbitMQ 的连接工厂、模板类(RabbitTemplate)和监听容器。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置 RabbitMQ 连接;在 application.yml 中配置 RabbitMQ 服务器信息

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /  # 默认虚拟主机
    listener:
      simple:
        acknowledge-mode: manual  # 手动确认消息(可选)

3、通过配置类声明队列、交换机和绑定关系

@Configuration
public class RabbitMQConfig {
    // Direct 模式 (消息通过 routing key 精确匹配队列)
    public static final String DIRECT_QUEUE = "direct_queue";
    public static final String DIRECT_EXCHANGE = "direct_exchange";
    public static final String ROUTING_KEY = "routing.key";

    @Bean
    public Queue directQueue() {
        return new Queue(DIRECT_QUEUE, true);  // 持久化队列
    }

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE, true, false); // 持久化非自动删除的交换机
    }

    @Bean
    public Binding bindingDirect(Queue directQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with(ROUTING_KEY);
    }

    // Fanout 模式(广播消息到所有绑定队列,忽略 routing key)
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout_exchange");
    }

    @Bean
    public Queue fanoutQueueA() { return new Queue("fanout_queue_A"); }

    @Bean
    public Binding bindingFanoutA(Queue fanoutQueueA, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueueA).to(fanoutExchange);
    }
}

4、生产者示例

使用 RabbitTemplate 发送消息到交换机

@Component
public class MessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Direct 模式发送
    public void sendDirectMessage(String message) {
        // convertAndSend 方法支持自动序列化对象
        rabbitTemplate.convertAndSend(
            RabbitMQConfig.DIRECT_EXCHANGE, 
            RabbitMQConfig.ROUTING_KEY, 
            message
        );
    }

    // Fanout 模式发送(无需 routing key)
    public void sendFanoutMessage(String message) {
        rabbitTemplate.convertAndSend(
            "fanout_exchange", 
            "",  // routing key 为空
            message
        );
    }
}

5、消费者示例

@Component
public class MessageConsumer {
    // 监听 Direct 队列
    @RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE)
    public void handleDirectMessage(String message) {
        System.out.println("[Direct] Received: " + message);
    }

    // 监听 Fanout 队列 A
    @RabbitListener(queues = "fanout_queue_A")
    public void handleFanoutMessageA(String message) {
        System.out.println("[Fanout-A] Received: " + message);
    }
}

6、测试

@SpringBootTest
public class RabbitMQTest {
    @Autowired
    private MessageProducer producer;

    @Test
    public void testDirect() {
        producer.sendDirectMessage("Hello Direct!");
    }

    @Test
    public void testFanout() {
        producer.sendFanoutMessage("Hello Fanout!");
    }

    @Test
    public void testTopic() {
        producer.sendTopicMessage("Hello Topic!");
    }
}

7、Topic模式示例

配置类

@Configuration
public class TopicConfig {
    public static final String TOPIC_EXCHANGE = "topic_exchange";
    public static final String TOPIC_QUEUE = "topic_queue";
    public static final String ROUTING_PATTERN = "*.example.#"; // 通配符匹配规则

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    @Bean
    public Queue topicQueue() {
        return new Queue(TOPIC_QUEUE);
    }

    @Bean
    public Binding bindingTopic(Queue topicQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue).to(topicExchange).with(ROUTING_PATTERN);
    }
}

发送和接收

// 生产者发送消息
rabbitTemplate.convertAndSend("topic_exchange", "user.example.action", "Topic Message");

// 消费者监听
@RabbitListener(queues = "topic_queue")
public void handleTopicMessage(String message) {
    System.out.println("[Topic] Received: " + message);
}

三、RbbitMQ注意

3.1 RbbitMQ消费者的异常处理

3.1.1 处理简单业务

使用局部 Try-Catch + 手动确认

1、开启手动确认消息

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual  # 手动确认消息

2、局部 Try-Catch + 手动确认

@Component
public class MessageConsumer {
    
    @RabbitListener(queues = "direct_queue")
    public void handleMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            // 业务逻辑处理
            processMessage(message);
            // 成功处理,手动确认
            channel.basicAck(deliveryTag, false);
        } catch (BusinessException e) {
            // 业务异常,记录日志并拒绝消息(不重新入队)
            log.error("业务异常,消息丢弃: {}", message, e);
            channel.basicNack(deliveryTag, false, false);
        } catch (Exception e) {
            // 系统异常,拒绝消息并重新入队(重试)
            log.error("系统异常,消息重试: {}", message, e);
            channel.basicNack(deliveryTag, false, true);
        }
    }

    private void processMessage(String message) throws BusinessException {
        // 模拟业务处理
        if ("invalid".equals(message)) {
            throw new BusinessException("消息内容非法");
        }
    }
}
3.1.2 处理复杂业务

使用重试机制 + 死信队列;配置消息重试策略,当达到最大重试次数后,将消息自动路由到死信队列

1、声明死信交换机和队列

@Configuration
public class DeadLetterConfig {
    
    // 主队列
    @Bean
    public Queue mainQueue() {
        return QueueBuilder.durable("main_queue")
                .deadLetterExchange("dlx_exchange") // 绑定死信交换机
                .deadLetterRoutingKey("dlx.routing.key")
                .build();
    }

    // 死信交换机
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange("dlx_exchange");
    }

    // 死信队列
    @Bean
    public Queue dlxQueue() {
        return new Queue("dlx_queue");
    }

    // 绑定死信路由
    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(dlxQueue())
                .to(dlxExchange())
                .with("dlx.routing.key");
    }
}

2、配置重试策略

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true          # 启用重试
          max-attempts: 3       # 最大重试次数(含首次)
          initial-interval: 1000 # 重试间隔(毫秒)
          multiplier: 2.0       # 间隔倍数(下次间隔 = initial-interval * multiplier^(attempt-1))

3、消费者处理

@Component
public class DlxConsumer {
    
    @RabbitListener(queues = "dlx_queue")
    public void handleDlxMessage(String message) {
        log.warn("死信队列消息: {}", message);
        // 记录日志、人工干预或持久化到数据库
    }
}

处理流程
生产者 → 主队列 → 消费者(处理失败)→ 重试 → 达到最大重试次数 → 死信队列(DLQ)
                                                                                                                 ↓
                                                                                                              监控告警 → 人工处理

监控方案:
1、使用 Prometheus + Grafana。
2、通过日志分析工具(如 ELK、Splunk)监控死信队列日志。

3.2 交换机的选择

队列可以同时绑定到多个交换机,每个绑定可以有不同的路由规则。

1、直连交换机(Direct)
完全匹配路由键(Routing Key),只有消息的路由键与队列绑定键(Binding Key)完全一致时才会分发消息。

2、扇型交换机(Fanout)
将消息广播到所有绑定的队列,忽略路由键。

3、主题交换机(Topic)
支持通配符匹配路由键,* 匹配单个单词,# 匹配零个或多个单词。

4、头部交换机(Headers)
根据消息头部(Headers)属性匹配,支持 x-match=any(任意匹配)或 x-match=all(全匹配)。

5、 默认交换机(Default)
无名直连交换机(""),所有队列自动绑定到它,路由键与队列名相同。

6、死信交换机(Dead Letter )
处理无法正常消费的消息(如过期、被拒绝的消息)。


网站公告

今日签到

点亮在社区的每一天
去签到