Spring Boot 整合 RabbitMQ

发布于:2025-07-11 ⋅ 阅读:(20) ⋅ 点赞:(0)

Spring Boot 整合 RabbitMQ

一、概述:RabbitMQ 是什么?

你可以把 RabbitMQ 想象成一个「快递中转站」。
比如你在网上买了一本书,卖家(生产者)把包裹(消息)交给快递站(RabbitMQ),快递站根据包裹上的地址(规则)把包裹分给不同的快递员(消费者),最后送到你家(业务系统)。

RabbitMQ 是一个专门用来「传递消息」的软件(专业叫「消息中间件」),它能让不同的程序、不同的电脑之间高效地「传小纸条」。


二、RabbitMQ 的「快递分类方式」(交换机类型)

快递站分包裹时,可能按「地址」「重量」「紧急程度」分类。RabbitMQ 也有类似的「分类规则」,叫 交换机(Exchange)。常用的有 4 种:

1. 直连交换机(Direct Exchange)

规则:包裹上必须写「精确地址」(路由键 Routing Key),只有地址完全匹配的快递员才能收到。
例子:卖家给「北京-朝阳区」的包裹,只有负责朝阳区的快递员能接。

2. 扇形交换机(Fanout Exchange)

规则:不管地址,「所有快递员」都能收到包裹(广播模式)。
例子:卖家发「双11大促通知」,所有快递员都要知道,一起准备加班。

3. 主题交换机(Topic Exchange)

规则:地址可以用「通配符」(比如 * 代表一个词,# 代表多个词)。
例子:卖家发「北京.*」的包裹,所有地址以「北京」开头的快递员(如北京-朝阳、北京-海淀)都能收到。

4. 头交换机(Headers Exchange)

规则:不看地址,看包裹上的「标签」(Headers 头信息,比如「优先级=高」)。
例子:卖家标「紧急」的包裹,只有关注「紧急」标签的快递员能接。


三、RabbitMQ 的使用场景(为什么需要它?)

1. 异步处理:省时间!

比如你在淘宝下单,系统需要「扣库存+发短信+更新积分」。如果一步步做,可能要等 5 秒;用 RabbitMQ 可以把「发短信」和「更新积分」的任务丢给 RabbitMQ,主流程只需要 1 秒完成下单,剩下的由其他程序慢慢处理。

2. 流量削峰:防崩溃!

双11时,订单像洪水一样涌来,系统直接处理可能被冲垮。RabbitMQ 像「水库」,把订单暂时存起来,系统按自己的速度慢慢处理(比如每秒处理 1000 单),避免被瞬间的高流量冲垮。

3. 系统解耦:不互相拖累!

比如电商系统有「订单模块」「库存模块」「短信模块」。如果订单模块直接调用库存和短信模块,一旦短信模块崩溃,订单也会失败。用 RabbitMQ 后,订单模块只需要把消息发给 RabbitMQ,其他模块自己来取,互不影响。

四、整合Springboot

1. 配置 RabbitMQ 连接

1.Maven

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
</dependency>

2.配置文件,yml和properties选择一个

spring:
  rabbitmq:
    host: 117.185.165.187
    port: 5672
    username: rabbitmq
    password: j8iG3KYs7Wmxxx
# RabbitMQ 服务器地址(默认 localhost:5672)
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
# 登录账号密码(默认 guest/guest,注意:远程连接需要改密码!)
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

2、定义「快递规则」:交换机和队列

RabbitMQ 的消息需要通过「交换机(Exchange)」和「队列(Queue)」传递。我们需要先告诉 Spring Boot 要创建哪些交换机和队列。

新建 RabbitMQConfig.java,用 @Bean 声明交换机、队列和绑定关系。

做一个「电商下单后发通知」的功能,需要:

  • 一个直连交换机(order_exchange)。
  • 一个队列(sms_queue),专门存「需要发短信的订单」。
  • 把队列和交换机绑定,路由键是 send_sms(只有路由键匹配的消息才会进这个队列)。
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    // 1. 声明直连交换机(名字叫 order_exchange)
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange("order_exchange");
    }

    // 2. 声明队列(名字叫 sms_queue,存需要发短信的订单)
    @Bean
    public Queue smsQueue() {
        return new Queue("sms_queue");
    }

    // 3. 把队列和交换机绑定,路由键是 send_sms(只有路由键匹配的消息才会进这个队列)
    @Bean
    public Binding smsBinding(Queue smsQueue, DirectExchange orderExchange) {
        return BindingBuilder.bind(smsQueue)
                .to(orderExchange)
                .with("send_sms");  // 路由键必须和生产者发送时一致
    }
}

如果说是多个队列按照下面的

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    // 1. 声明直连交换机(名字叫 order_exchange)
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange("order_exchange");
    }

    // 2. 声明 3 个队列(短信、积分、日志)
    @Bean
    public Queue smsQueue() {
        return new Queue("sms_queue");  // 存需要发短信的订单
    }

    @Bean
    public Queue scoreQueue() {
        return new Queue("score_queue");  // 存需要更新积分的订单
    }

    @Bean
    public Queue logQueue() {
        return new Queue("log_queue");  // 存需要记录日志的订单
    }
    
  // 3. 绑定 sms_queue(路由键 send_sms)
    @Bean
    public Binding smsBinding(Queue smsQueue, DirectExchange orderExchange) {
        return BindingBuilder.bind(smsQueue)
                .to(orderExchange)
                .with("send_sms");  // 路由键:只有 send_sms 的消息会进 sms_queue
    }
    // 4. 绑定 score_queue(路由键 update_score)
    @Bean
    public Binding scoreBinding(Queue scoreQueue, DirectExchange orderExchange) {
        return BindingBuilder.bind(scoreQueue)
                .to(orderExchange)
                .with("update_score");  // 路由键:只有 update_score 的消息会进 score_queue
    }
    // 5. 绑定 logQueue(路由键 log_order)
    @Bean
    public Binding logBinding(Queue logQueue, DirectExchange orderExchange) {
        return BindingBuilder.bind(logQueue)
                .to(orderExchange)
                .with("log_order");  // 路由键:只有 log_order 的消息会进 log_queue
    }
    
    
}

3、生产者:发送消息(卖家发包裹)

RabbitTemplate(Spring 提供的发消息工具)发送消息到交换机。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderService {

    // 注入 RabbitTemplate(Spring 自动帮我们创建好的发消息工具)
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 用户下单后,发送消息到 RabbitMQ
    public void createOrder(String orderInfo) {
        // 1. 主流程:扣库存、保存订单(这里简化,直接打印)
        System.out.println("主流程:订单已保存,开始扣库存...");

        // 2. 异步任务:发送短信通知(把消息发给 RabbitMQ)
        rabbitTemplate.convertAndSend(
                "order_exchange",  // 交换机名字
                "send_sms",        // 路由键(和队列绑定的路由键一致)
                orderInfo          // 消息内容(比如订单详情)
        );
        System.out.println("已发送短信通知任务到 RabbitMQ");
    }
}

4、消费者:接收消息(快递员收包裹)

@RabbitListener 注解监听队列,自动接收并处理消息。

新建消费者服务类

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SmsConsumer {
    // 监听 sms_queue 队列,有消息就自动触发这个方法
    @RabbitListener(queues = "sms_queue")
    public void sendSms(String orderInfo) {
        System.out.println("收到短信任务,正在发送...");
        // 这里调用短信接口(比如阿里云短信),实际代码需要替换
        System.out.println("已给用户发送短信:" + orderInfo);
    }
}

如果说是多线程处理就多添加一个配置concurrency = "5"

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SmsConsumer {

    // 监听 sms_queue 队列,有消息就自动触发这个方法
    @RabbitListener(queues = "sms_queue",concurrency = "5")
    public void sendSms(String orderInfo) {
        System.out.println("收到短信任务,正在发送...");
        // 这里调用短信接口(比如阿里云短信),实际代码需要替换
        System.out.println("已给用户发送短信:" + orderInfo);
    }
}

1、如何避免消息被重复处理?

如果你的场景是「多个消费者抢着处理同一条消息」(比如并行加速),需要确保 一条消息只被一个消费者处理。RabbitMQ 默认已经帮你实现了这一点!

2、原理:消息确认机制(ACK)
  • 当消费者收到消息后,RabbitMQ 会等待消费者「确认」(ACK)。
  • 如果消费者正常处理完消息并返回 ACK,RabbitMQ 会删除这条消息,不会再发给其他消费者。
  • 如果消费者处理失败(比如崩溃),RabbitMQ 会重新将消息分发给其他消费者。
3、注意事项
1. 消息幂等性(防重复处理)

如果消费者处理消息时,因为网络问题导致 ACK 未成功返回,RabbitMQ 会重新发送消息,可能导致重复处理。
解决方法

  • 消息里加唯一标识(如订单号)。
  • 处理前检查是否已处理过(比如查数据库)。
2. 消费者数量别太多!

concurrency 不是越大越好!如果消费者数量超过服务器 CPU 核心数,反而会因为线程切换浪费资源。
建议:根据业务耗时调整,比如处理耗时 1 秒的任务,消费者数量 = CPU 核心数 × 2 比较合理。

3. 手动确认消息(高级场景)

默认是自动 ACK(auto_ack=true),但如果处理消息可能失败(比如调用外部接口超时),建议用手动 ACK。

@RabbitListener(queues = "order_queue", ackMode = "MANUAL")  // 手动确认
public void processOrder(String orderInfo, Channel channel, Message message) {
    try {
        // 处理消息...
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);  // 手动确认成功
    } catch (Exception e) {
        // 处理失败,重新入队(或发送到死信队列)
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }
}

五、常见问题 & 注意事项

1. 消息丢失怎么办?

  • 开启「消息持久化」:在声明队列和交换机时,设置 durable=true(默认是 true,重启 RabbitMQ 后消息不丢失)。
  • 生产者确认:配置 spring.rabbitmq.publisher-confirm-type=correlated,确保消息成功发到交换机。
  • 消费者确认:默认是 auto_ack=true(自动确认),如果需要手动确认(比如处理消息时可能失败),可以设置 @RabbitListener(ackMode = "MANUAL"),处理完再调用 channel.basicAck()

2. 重复消费怎么办?

  • 消息里加唯一标识(如订单号),消费者处理前检查是否已处理过(比如查数据库)。

3. RabbitMQ 连不上?

  • 检查 application.properties 里的 hostportusernamepassword 是否正确。
  • 远程连接时,RabbitMQ 默认禁止 guest 用户,需要新建用户并授权(管理界面操作)。

六、总结

用 Spring Boot 整合 RabbitMQ 超简单!核心步骤就 4 步:

  1. 配连接:在 application.properties 里填 RabbitMQ 地址。
  2. 定义规则:用 @Bean 声明交换机、队列和绑定关系。
  3. 发消息:用 RabbitTemplate.convertAndSend() 发送。
  4. 收消息:用 @RabbitListener 监听队列。

适合用 Spring Boot + RabbitMQ 的场景

  • 电商、物流等需要「异步任务」的系统。
  • 高并发场景(如双11订单洪峰)。
  • 多个模块需要「松耦合」协作的系统(如订单、短信、积分模块)。