Spring Boot集成RabbitMQ终极指南:从配置到高级消息处理

发布于:2025-07-27 ⋅ 阅读:(13) ⋅ 点赞:(0)

1. 什么是 RabbitMQ?

RabbitMQ 是一个开源的 消息代理(Message Broker),实现了 AMQP(Advanced Message Queuing Protocol) 协议。它充当应用程序之间的中间人,负责接收、存储和转发消息,实现 异步通信应用解耦 和 流量削峰

https://www.rabbitmq.com/img/tutorials/intro/rabbitmq-exchanges.webp
(生产者发送消息到 Exchange,Exchange 路由到 Queue,消费者从 Queue 获取消息)


1.1. 核心概念
(1) 消息流模型
(2) 关键组件
组件 作用
生产者 发送消息的应用(如订单服务)
消费者 接收消息的应用(如库存服务)
Exchange 接收生产者消息,根据规则路由到队列(核心路由中枢)
Queue 存储消息的缓冲区(FIFO),消息在此等待消费者处理
Binding Exchange 和 Queue 之间的绑定规则(如路由键匹配)
Channel 轻量级连接(复用 TCP 连接),减少资源开销
Virtual Host 虚拟隔离环境(类似命名空间),用于多租户隔
 1.2. 消息生命周期


1.3. 高级特性
特性 说明
消息持久化 消息写入磁盘(防止服务器重启丢失)
ACK 确认机制 消费者处理成功后发送 ACK,否则重新入队
QoS 预取 限制消费者未确认消息数(防止消息积压)
死信队列(DLX) 处理失败的消息转发到特殊队列(用于重试/分析)
TTL(生存时间) 设置消息/队列的过期时间(自动删除超时消息)
插件扩展 支持 MQTT、STOMP 等协议(如物联网场景)

2核心概念与实现方案

1. 核心依赖

pom.xml中添加依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 配置文件

# application.yml
spring:
  application:
    name: rabbitmq-demo
    
  rabbitmq:
    # 基础连接配置
    host: ${RABBITMQ_HOST:localhost}
    port: ${RABBITMQ_PORT:5672}
    username: ${RABBITMQ_USER:guest}
    password: ${RABBITMQ_PASS:guest}
    virtual-host: ${RABBITMQ_VHOST:/}
    
    # 连接池配置
    connection-factory:
      cache:
        mode: CONNECTION
        size: 5
      connection-timeout: 5000
    
    # 发布者确认配置
    publisher-confirm-type: correlated
    publisher-returns: true
    
    # 消费者配置
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 5-10
        prefetch: 10

# 自定义应用配置
app:
  rabbitmq:
    max-retry-count: 3
    
    # 死信队列配置
    dead-letter:
      exchange: dlx.exchange
      routing-key: dlx.routing.key
    
    # 重试配置
    retry:
      enabled: true
      max-attempts: 3
      initial-interval: 1000
      max-interval: 10000
      multiplier: 2.0
    
    # 交换机配置
    exchanges:
      order:
        name: order.exchange
        type: direct
        durable: true
      payment:
        name: payment.exchange
        type: topic
        durable: true
      notification:
        name: notification.exchange
        type: fanout
        durable: true
      dlx:
        name: dlx.exchange
        type: direct
        durable: true
    
    # 队列配置
    queues:
      order:
        name: order.queue
        durable: true
        arguments:
          x-dead-letter-exchange: dlx.exchange
          x-dead-letter-routing-key: dlx.routing.key
      payment:
        name: payment.queue
        durable: true
      email:
        name: email.queue
        durable: true
      sms:
        name: sms.queue
        durable: true
      dlq:
        name: dlq.queue
        durable: true
    
    # 绑定配置
    bindings:
      order:
        queue: order.queue
        exchange: order.exchange
        routing-key: order.routing.key
      payment:
        queue: payment.queue
        exchange: payment.exchange
        routing-key: payment.#
      email:
        queue: email.queue
        exchange: notification.exchange
      sms:
        queue: sms.queue
        exchange: notification.exchange
      dlq:
        queue: dlq.queue
        exchange: dlx.exchange
        routing-key: dlx.routing.key

# 日志配置
logging:
  level:
    org.springframework.amqp: DEBUG
    com.example.rabbitmqdemo: TRACE

# 服务端口
server:
  port: 8080

3. RabbitMQ自定义配置类

package com.example.rabbitmqdemo.config.properties;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.util.Map;

@Data
@Builder
@Component
@NoArgsConstructor
@AllArgsConstructor
@ConfigurationProperties(prefix = "app.rabbitmq")
public class RabbitMQCustomProperties {

    // 最大重试次数
    private int maxRetryCount = 3;
    
    // 死信交换机配置
    private DeadLetterConfig deadLetter;
    
    // 队列配置
    private Map<String, QueueConfig> queues;
    
    // 交换机配置
    private Map<String, ExchangeConfig> exchanges;
    
    // 绑定配置
    private Map<String, BindingConfig> bindings;
    
    // 重试配置
    private RetryConfig retry;

    // 内部配置类
    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public static class DeadLetterConfig {
        private String exchange;
        private String routingKey;
    }

    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public static class QueueConfig {
        private String name;
        private boolean durable;
        private Map<String, Object> arguments;
    }

    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public static class ExchangeConfig {
        private String name;
        private String type;
        private boolean durable;
    }

    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public static class BindingConfig {
        private String queue;
        private String exchange;
        private String routingKey;
    }

    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public static class RetryConfig {
        private boolean enabled;
        private int maxAttempts;
        private long initialInterval;
        private long maxInterval;
        private double multiplier;
    }
}

4. 应用配置绑定类

package com.example.rabbitmqdemo.config;

import com.example.rabbitmqdemo.config.properties.RabbitMQCustomProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableConfigurationProperties(RabbitMQCustomProperties.class)
public class AppConfig {
    // 启用配置属性绑定
}

5. RabbitMQ配置类(使用配置属性)

package com.example.rabbitmqdemo.config;

import com.example.rabbitmqdemo.config.properties.RabbitMQCustomProperties;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    private final RabbitMQCustomProperties customProperties;

    @Autowired
    public RabbitMQConfig(RabbitMQCustomProperties customProperties) {
        this.customProperties = customProperties;
    }

    // 创建交换机
    @Bean
    public DirectExchange orderExchange() {
        RabbitMQCustomProperties.ExchangeConfig exchange = customProperties.getExchanges().get("order");
        return new DirectExchange(
            exchange.getName(),
            exchange.isDurable(),
            false
        );
    }

    @Bean
    public TopicExchange paymentExchange() {
        RabbitMQCustomProperties.ExchangeConfig exchange = customProperties.getExchanges().get("payment");
        return new TopicExchange(
            exchange.getName(),
            exchange.isDurable(),
            false
        );
    }

    @Bean
    public FanoutExchange notificationExchange() {
        RabbitMQCustomProperties.ExchangeConfig exchange = customProperties.getExchanges().get("notification");
        return new FanoutExchange(
            exchange.getName(),
            exchange.isDurable(),
            false
        );
    }

    @Bean
    public DirectExchange dlxExchange() {
        RabbitMQCustomProperties.ExchangeConfig exchange = customProperties.getExchanges().get("dlx");
        return new DirectExchange(
            exchange.getName(),
            exchange.isDurable(),
            false
        );
    }

    // 创建队列
    @Bean
    public Queue orderQueue() {
        RabbitMQCustomProperties.QueueConfig queue = customProperties.getQueues().get("order");
        return QueueBuilder.durable(queue.getName())
                .withArguments(queue.getArguments())
                .build();
    }

    @Bean
    public Queue paymentQueue() {
        RabbitMQCustomProperties.QueueConfig queue = customProperties.getQueues().get("payment");
        return QueueBuilder.durable(queue.getName()).build();
    }

    @Bean
    public Queue emailQueue() {
        RabbitMQCustomProperties.QueueConfig queue = customProperties.getQueues().get("email");
        return QueueBuilder.durable(queue.getName()).build();
    }

    @Bean
    public Queue smsQueue() {
        RabbitMQCustomProperties.QueueConfig queue = customProperties.getQueues().get("sms");
        return QueueBuilder.durable(queue.getName()).build();
    }

    @Bean
    public Queue dlqQueue() {
        RabbitMQCustomProperties.QueueConfig queue = customProperties.getQueues().get("dlq");
        return QueueBuilder.durable(queue.getName()).build();
    }

    // 创建绑定
    @Bean
    public Binding orderBinding() {
        RabbitMQCustomProperties.BindingConfig binding = customProperties.getBindings().get("order");
        return BindingBuilder.bind(orderQueue())
                .to(orderExchange())
                .with(binding.getRoutingKey());
    }

    @Bean
    public Binding paymentBinding() {
        RabbitMQCustomProperties.BindingConfig binding = customProperties.getBindings().get("payment");
        return BindingBuilder.bind(paymentQueue())
                .to(paymentExchange())
                .with(binding.getRoutingKey());
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue())
                .to(notificationExchange());
    }

    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue())
                .to(notificationExchange());
    }

    @Bean
    public Binding dlqBinding() {
        RabbitMQCustomProperties.BindingConfig binding = customProperties.getBindings().get("dlq");
        return BindingBuilder.bind(dlqQueue())
                .to(dlxExchange())
                .with(binding.getRoutingKey());
    }
}

 6. RabbitMQ工具类(使用配置属性)

package com.example.rabbitmqdemo.util;

import com.example.rabbitmqdemo.config.properties.RabbitMQCustomProperties;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

@Component
public class RabbitMQUtil {

    private final RabbitTemplate rabbitTemplate;
    private final RabbitMQCustomProperties customProperties;

    @Autowired
    public RabbitMQUtil(RabbitTemplate rabbitTemplate, 
                        RabbitMQCustomProperties customProperties) {
        this.rabbitTemplate = rabbitTemplate;
        this.customProperties = customProperties;
    }

    /**
     * 发送普通消息
     * @param exchange 交换机名称
     * @param routingKey 路由键
     * @param message 消息对象
     */
    public void sendMessage(String exchange, String routingKey, Object message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
            setCommonMessageProperties(msg);
            return msg;
        });
    }

    /**
     * 发送延迟消息
     * @param exchange 交换机名称
     * @param routingKey 路由键
     * @param message 消息对象
     * @param delay 延迟时间(毫秒)
     */
    public void sendDelayedMessage(String exchange, String routingKey, Object message, int delay) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
            setCommonMessageProperties(msg);
            msg.getMessageProperties().setDelay(delay);
            return msg;
        });
    }

    /**
     * 发送广播消息
     * @param message 消息对象
     */
    public void sendBroadcastMessage(Object message) {
        String exchangeName = customProperties.getExchanges().get("notification").getName();
        rabbitTemplate.convertAndSend(
            exchangeName, 
            "", 
            message,
            msg -> {
                setCommonMessageProperties(msg);
                return msg;
            }
        );
    }

    /**
     * 发送死信消息
     * @param message 消息对象
     */
    public void sendDeadLetterMessage(Object message) {
        RabbitMQCustomProperties.DeadLetterConfig dlConfig = customProperties.getDeadLetter();
        rabbitTemplate.convertAndSend(
            dlConfig.getExchange(), 
            dlConfig.getRoutingKey(), 
            message,
            msg -> {
                setCommonMessageProperties(msg);
                return msg;
            }
        );
    }

    // 设置通用消息属性
    private void setCommonMessageProperties(Message message) {
        message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
    }
}

7. 消息生产者服务(使用配置属性)

package com.example.rabbitmqdemo.service;

import com.example.rabbitmqdemo.config.properties.RabbitMQCustomProperties;
import com.example.rabbitmqdemo.dto.OrderDTO;
import com.example.rabbitmqdemo.util.RabbitMQUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MessageProducer {

    private final RabbitMQUtil rabbitMQUtil;
    private final RabbitMQCustomProperties customProperties;

    @Autowired
    public MessageProducer(RabbitMQUtil rabbitMQUtil, 
                           RabbitMQCustomProperties customProperties) {
        this.rabbitMQUtil = rabbitMQUtil;
        this.customProperties = customProperties;
    }

    /**
     * 发送订单创建消息
     */
    public void sendOrderCreatedMessage(OrderDTO order) {
        RabbitMQCustomProperties.BindingConfig binding = customProperties.getBindings().get("order");
        
        rabbitMQUtil.sendMessage(
            binding.getExchange(), 
            binding.getRoutingKey(), 
            order
        );
        System.out.println("订单创建消息已发送: " + order.getOrderId());
    }

    /**
     * 发送支付成功消息
     */
    public void sendPaymentSuccessMessage(String paymentInfo) {
        RabbitMQCustomProperties.BindingConfig binding = customProperties.getBindings().get("payment");
        
        rabbitMQUtil.sendMessage(
            binding.getExchange(), 
            binding.getRoutingKey(), 
            paymentInfo
        );
        System.out.println("支付成功消息已发送: " + paymentInfo);
    }

    /**
     * 发送系统通知消息(广播)
     */
    public void sendSystemNotification(String notification) {
        rabbitMQUtil.sendBroadcastMessage(notification);
        System.out.println("系统通知已广播: " + notification);
    }

    /**
     * 发送延迟订单取消消息
     */
    public void sendDelayedOrderCancelMessage(OrderDTO order, int delayMillis) {
        RabbitMQCustomProperties.BindingConfig binding = customProperties.getBindings().get("order");
        
        rabbitMQUtil.sendDelayedMessage(
            binding.getExchange(), 
            binding.getRoutingKey(), 
            order, 
            delayMillis
        );
        System.out.println("订单取消延迟消息已发送: " + order.getOrderId() + ", 延迟: " + delayMillis + "ms");
    }
}

8. 消息消费者服务(使用配置属性)

package com.example.rabbitmqdemo.service;

import com.example.rabbitmqdemo.config.properties.RabbitMQCustomProperties;
import com.example.rabbitmqdemo.dto.OrderDTO;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;

@Service
public class MessageConsumer {

    private final RabbitMQCustomProperties customProperties;

    @Autowired
    public MessageConsumer(RabbitMQCustomProperties customProperties) {
        this.customProperties = customProperties;
    }

    /**
     * 处理订单创建消息
     */
    @RabbitListener(queues = "#{@orderQueue.name}")
    public void handleOrderMessage(OrderDTO order, Message message, Channel channel) 
            throws IOException {
        
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        String messageId = message.getMessageProperties().getMessageId();
        
        try {
            System.out.println("收到订单消息: " + messageId);
            System.out.println("订单ID: " + order.getOrderId() + ", 金额: " + order.getAmount());
            
            // 模拟业务处理
            processOrder(order);
            
            // 手动ACK确认
            channel.basicAck(deliveryTag, false);
            System.out.println("订单处理完成: " + order.getOrderId());
            
        } catch (BusinessException e) {
            // 业务异常,不需要重试
            System.err.println("订单处理业务异常: " + e.getMessage());
            channel.basicReject(deliveryTag, false);
            
        } catch (Exception e) {
            // 其他异常,根据重试次数决定是否重新入队
            System.err.println("订单处理异常: " + e.getMessage());
            
            if (shouldRetry(message)) {
                // 重新入队进行重试
                channel.basicNack(deliveryTag, false, true);
                System.out.println("订单消息重新入队: " + messageId);
            } else {
                // 超过最大重试次数,拒绝并不重新入队
                channel.basicNack(deliveryTag, false, false);
                System.out.println("订单消息进入死信队列: " + messageId);
            }
        }
    }

    // 判断是否应该重试
    private boolean shouldRetry(Message message) {
        Integer retryCount = message.getMessageProperties().getHeader("x-retry-count");
        if (retryCount == null) {
            retryCount = 0;
        }
        
        // 使用配置的最大重试次数
        return retryCount < customProperties.getMaxRetryCount();
    }
    
    // 模拟订单处理逻辑
    private void processOrder(OrderDTO order) throws BusinessException {
        // 实际业务逻辑
        if (order.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
            throw new BusinessException("订单金额异常");
        }
        // 其他处理...
    }
    
    // 自定义业务异常
    private static class BusinessException extends Exception {
        public BusinessException(String message) {
            super(message);
        }
    }
}

9. 消息控制器(完整调用示例)

package com.example.rabbitmqdemo.controller;

import com.example.rabbitmqdemo.config.properties.RabbitMQCustomProperties;
import com.example.rabbitmqdemo.dto.OrderDTO;
import com.example.rabbitmqdemo.service.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.math.BigDecimal;

@RestController
@RequestMapping("/api/messages")
public class MessageController {

    private final MessageProducer messageProducer;
    private final RabbitMQCustomProperties customProperties;

    @Autowired
    public MessageController(MessageProducer messageProducer, 
                             RabbitMQCustomProperties customProperties) {
        this.messageProducer = messageProducer;
        this.customProperties = customProperties;
    }

    /**
     * 发送订单创建消息
     */
    @PostMapping("/order")
    public String sendOrderMessage(@RequestBody OrderDTO order) {
        messageProducer.sendOrderCreatedMessage(order);
        return "订单消息已发送";
    }

    /**
     * 发送支付成功消息
     */
    @PostMapping("/payment/{paymentId}")
    public String sendPaymentMessage(@PathVariable String paymentId) {
        messageProducer.sendPaymentSuccessMessage("支付ID: " + paymentId);
        return "支付消息已发送";
    }

    /**
     * 发送系统通知
     */
    @PostMapping("/notification")
    public String sendNotification(@RequestParam String content) {
        messageProducer.sendSystemNotification(content);
        return "系统通知已发送";
    }

    /**
     * 发送延迟订单取消消息
     */
    @PostMapping("/delayed-order")
    public String sendDelayedOrder(@RequestBody OrderDTO order, 
                                  @RequestParam int delaySeconds) {
        // 使用配置中的重试间隔作为基础延迟单位
        long baseDelay = customProperties.getRetry().getInitialInterval();
        long delayMillis = delaySeconds * 1000 + baseDelay;
        
        messageProducer.sendDelayedOrderCancelMessage(order, (int) delayMillis);
        return "延迟订单消息已发送,延迟时间: " + delaySeconds + "秒";
    }
}


网站公告

今日签到

点亮在社区的每一天
去签到