RabbitMQ 核心原理与Spring Boot整合实战
一、RabbitMQ 核心架构
1.1 AMQP 协议模型
组件 |
作用描述 |
Producer |
消息生产者,发送消息到Exchange |
Consumer |
消息消费者,从队列获取消息处理 |
Exchange |
接收消息并根据规则路由到队列 |
Queue |
存储消息的缓冲区 |
Binding |
定义Exchange和Queue之间的关系规则 |
1.2 交换机类型对比
类型 |
路由规则 |
典型应用场景 |
Direct |
精确匹配Routing Key |
点对点精确路由 |
Fanout |
广播到所有绑定队列 |
发布/订阅模式 |
Topic |
通配符匹配Routing Key |
多条件复杂路由 |
Headers |
根据Header属性匹配 |
非路由键匹配场景 |
二、Spring Boot 整合配置
2.1 基础依赖配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.2 配置文件示例
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
prefetch: 10
concurrency: 5
max-concurrency: 10
三、消息生产消费实战
3.1 生产者配置模板
@Configuration
public class RabbitProducerConfig {
@Bean
public DirectExchange orderExchange() {
return new DirectExchange("order.exchange");
}
@Bean
public Queue orderQueue() {
return new Queue("order.queue", true);
}
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue())
.to(orderExchange())
.with("order.routingKey");
}
}
@Component
public class OrderProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendOrder(Order order) {
rabbitTemplate.convertAndSend(
"order.exchange",
"order.routingKey",
order,
message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
);
}
}
3.2 消费者监听实现
@Component
public class OrderConsumer {
@RabbitListener(queues = "order.queue")
@RabbitHandler
public void processOrder(Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
handleOrder(order);
channel.basicAck(tag, false);
} catch (Exception e) {
channel.basicNack(tag, false, true);
}
}
@RabbitListener(queues = "dead.letter.queue")
public void handleDeadLetter(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
四、高级消息模式
4.1 延迟消息实现
@Bean
public DirectExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new DirectExchange("delay.exchange", true, false, args);
}
public void sendDelayMessage(String message, int delayTime) {
rabbitTemplate.convertAndSend(
"delay.exchange",
"delay.routingKey",
message,
msg -> {
msg.getMessageProperties().setHeader("x-delay", delayTime);
return msg;
}
);
}
4.2 消息可靠性保证
五、监控与维护
5.1 常用监控指标
指标名称 |
描述 |
健康阈值 |
queue_messages |
队列中待处理消息数 |
<1000 |
message_ack_rate |
消息确认率 |
>99% |
consumer_utilization |
消费者利用率 |
40%-80% |
deliver_get |
每秒投递消息数 |
根据硬件配置调整 |
5.2 管理命令示例
rabbitmqctl list_queues name messages_ready messages_unacknowledged
rabbitmqctl list_consumers
rabbitmqctl purge_queue order.queue
六、最佳实践指南
6.1 消息设计规范
- 消息体大小:单条消息建议不超过1MB
- 序列化格式:优先使用JSON格式
- 幂等处理:消费端需要保证重复消息处理安全
- 过期时间:设置合理的TTL(Time-To-Live)
6.2 集群配置建议
扩展学习: