RabbitMQ 核心原理与Spring Boot整合实战

发布于:2025-05-27 ⋅ 阅读:(46) ⋅ 点赞:(0)

RabbitMQ 核心原理与Spring Boot整合实战

一、RabbitMQ 核心架构

1.1 AMQP 协议模型

组件 作用描述
Producer 消息生产者,发送消息到Exchange
Consumer 消息消费者,从队列获取消息处理
Exchange 接收消息并根据规则路由到队列
Queue 存储消息的缓冲区
Binding 定义Exchange和Queue之间的关系规则

1.2 交换机类型对比

类型 路由规则 典型应用场景
Direct 精确匹配Routing Key 点对点精确路由
Fanout 广播到所有绑定队列 发布/订阅模式
Topic 通配符匹配Routing Key 多条件复杂路由
Headers 根据Header属性匹配 非路由键匹配场景

二、Spring Boot 整合配置

2.1 基础依赖配置

<!-- pom.xml -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.2 配置文件示例

# application.yml
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        prefetch: 10 # 消费者预取数量
        concurrency: 5 # 最小消费者数
        max-concurrency: 10 # 最大消费者数

三、消息生产消费实战

3.1 生产者配置模板

@Configuration
public class RabbitProducerConfig {

    // 声明直连交换机
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange("order.exchange");
    }

    // 声明持久化队列
    @Bean
    public Queue orderQueue() {
        return new Queue("order.queue", true);
    }

    // 绑定队列到交换机
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
            .to(orderExchange())
            .with("order.routingKey");
    }
}

@Component
public class OrderProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendOrder(Order order) {
        rabbitTemplate.convertAndSend(
            "order.exchange",
            "order.routingKey",
            order,
            message -> {
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                return message;
            }
        );
    }
}

3.2 消费者监听实现

@Component
public class OrderConsumer {

    // 注解式监听方法
    @RabbitListener(queues = "order.queue")
    @RabbitHandler
    public void processOrder(Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            // 业务处理逻辑
            handleOrder(order);
            channel.basicAck(tag, false);
        } catch (Exception e) {
            channel.basicNack(tag, false, true); // 重新入队
        }
    }

    // 手动确认示例
    @RabbitListener(queues = "dead.letter.queue")
    public void handleDeadLetter(Message message, Channel channel) throws IOException {
        // 处理死信消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

四、高级消息模式

4.1 延迟消息实现

// 配置死信交换机
@Bean
public DirectExchange delayExchange() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");
    return new DirectExchange("delay.exchange", true, false, args);
}

// 发送延迟消息
public void sendDelayMessage(String message, int delayTime) {
    rabbitTemplate.convertAndSend(
        "delay.exchange",
        "delay.routingKey",
        message,
        msg -> {
            msg.getMessageProperties().setHeader("x-delay", delayTime);
            return msg;
        }
    );
}

4.2 消息可靠性保证

Producer Broker Disk Consumer 发送消息(Confirm模式) 确认收到 持久化消息 投递消息 手动ACK 删除消息 Producer Broker Disk Consumer

五、监控与维护

5.1 常用监控指标

指标名称 描述 健康阈值
queue_messages 队列中待处理消息数 <1000
message_ack_rate 消息确认率 >99%
consumer_utilization 消费者利用率 40%-80%
deliver_get 每秒投递消息数 根据硬件配置调整

5.2 管理命令示例

# 查看队列状态
rabbitmqctl list_queues name messages_ready messages_unacknowledged

# 查看消费者信息
rabbitmqctl list_consumers

# 清除队列
rabbitmqctl purge_queue order.queue

六、最佳实践指南

6.1 消息设计规范

  1. 消息体大小:单条消息建议不超过1MB
  2. 序列化格式:优先使用JSON格式
  3. 幂等处理:消费端需要保证重复消息处理安全
  4. 过期时间:设置合理的TTL(Time-To-Live)

6.2 集群配置建议

客户端
HAProxy
节点1
节点2
节点3
镜像队列

扩展学习