1. 什么是 RabbitMQ?
RabbitMQ 是一个开源的 消息代理(Message Broker),实现了 AMQP(Advanced Message Queuing Protocol) 协议。它充当应用程序之间的中间人,负责接收、存储和转发消息,实现 异步通信、应用解耦 和 流量削峰。
https://www.rabbitmq.com/img/tutorials/intro/rabbitmq-exchanges.webp
(生产者发送消息到 Exchange,Exchange 路由到 Queue,消费者从 Queue 获取消息)
1.1. 核心概念
(1) 消息流模型
(2) 关键组件
组件 | 作用 |
---|---|
生产者 | 发送消息的应用(如订单服务) |
消费者 | 接收消息的应用(如库存服务) |
Exchange | 接收生产者消息,根据规则路由到队列(核心路由中枢) |
Queue | 存储消息的缓冲区(FIFO),消息在此等待消费者处理 |
Binding | Exchange 和 Queue 之间的绑定规则(如路由键匹配) |
Channel | 轻量级连接(复用 TCP 连接),减少资源开销 |
Virtual Host | 虚拟隔离环境(类似命名空间),用于多租户隔离 |
1.2. 消息生命周期
1.3. 高级特性
特性 | 说明 |
---|---|
消息持久化 | 消息写入磁盘(防止服务器重启丢失) |
ACK 确认机制 | 消费者处理成功后发送 ACK,否则重新入队 |
QoS 预取 | 限制每个消费者同时持有的未确认消息数(避免单个消费者过载,使消息在多个消费者之间更均衡地分发) |
死信队列(DLX) | 处理失败的消息转发到特殊队列(用于重试/分析) |
TTL(生存时间) | 设置消息/队列的过期时间(自动删除超时消息) |
插件扩展 | 支持 MQTT、STOMP 等协议(如物联网场景) |
2. 核心概念与实现方案
1. 核心依赖
在 pom.xml 中添加依赖:
<!-- Spring Boot starter for AMQP. No <version> is declared here, so it is presumably
     managed by the Spring Boot parent POM / BOM of the project - confirm. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. 配置文件
# application.yml
spring:
  application:
    name: rabbitmq-demo
  rabbitmq:
    # Basic connection settings; every value can be overridden by an environment variable.
    host: ${RABBITMQ_HOST:localhost}
    port: ${RABBITMQ_PORT:5672}
    username: ${RABBITMQ_USER:guest}
    password: ${RABBITMQ_PASS:guest}
    virtual-host: ${RABBITMQ_VHOST:/}
    # Connection timeout in milliseconds.
    connection-timeout: 5000
    # Connection factory cache.
    # The former "connection-factory.cache.mode/size" keys are not Spring Boot properties and
    # were silently ignored. The real keys live under "spring.rabbitmq.cache". The CONNECTION
    # cache mode is deliberately NOT enabled: in that mode Spring AMQP does not auto-declare
    # the Queue/Exchange/Binding beans this application relies on. The channel cache is kept
    # at the framework default so the effective behaviour is unchanged.
    cache:
      channel:
        size: 25
    # Publisher confirms and returns.
    publisher-confirm-type: correlated
    publisher-returns: true
    # Listener (consumer) settings.
    listener:
      simple:
        acknowledge-mode: manual
        # Minimum / maximum number of concurrent consumers. These are two separate integer
        # properties; a "5-10" range string cannot be bound to "concurrency".
        concurrency: 5
        max-concurrency: 10
        prefetch: 10

# Custom application settings, bound to RabbitMQCustomProperties (prefix "app.rabbitmq").
app:
  rabbitmq:
    max-retry-count: 3
    # Dead-letter target.
    dead-letter:
      exchange: dlx.exchange
      routing-key: dlx.routing.key
    # Retry settings.
    retry:
      enabled: true
      max-attempts: 3
      initial-interval: 1000
      max-interval: 10000
      multiplier: 2.0
    # Exchanges.
    exchanges:
      order:
        name: order.exchange
        type: direct
        durable: true
      payment:
        name: payment.exchange
        type: topic
        durable: true
      notification:
        name: notification.exchange
        type: fanout
        durable: true
      dlx:
        name: dlx.exchange
        type: direct
        durable: true
    # Queues.
    queues:
      order:
        name: order.queue
        durable: true
        arguments:
          x-dead-letter-exchange: dlx.exchange
          x-dead-letter-routing-key: dlx.routing.key
      payment:
        name: payment.queue
        durable: true
      email:
        name: email.queue
        durable: true
      sms:
        name: sms.queue
        durable: true
      dlq:
        name: dlq.queue
        durable: true
    # Bindings.
    bindings:
      order:
        queue: order.queue
        exchange: order.exchange
        routing-key: order.routing.key
      payment:
        queue: payment.queue
        exchange: payment.exchange
        # Quoted so the '#' wildcard can never be mistaken for the start of a YAML comment.
        routing-key: "payment.#"
      email:
        queue: email.queue
        exchange: notification.exchange
      sms:
        queue: sms.queue
        exchange: notification.exchange
      dlq:
        queue: dlq.queue
        exchange: dlx.exchange
        routing-key: dlx.routing.key

# Logging.
logging:
  level:
    org.springframework.amqp: DEBUG
    com.example.rabbitmqdemo: TRACE

# HTTP port.
server:
  port: 8080
3. RabbitMQ自定义配置类
package com.example.rabbitmqdemo.config.properties;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Builder
@Component
@NoArgsConstructor
@AllArgsConstructor
@ConfigurationProperties(prefix = "app.rabbitmq")
public class RabbitMQCustomProperties {
// 最大重试次数
private int maxRetryCount = 3;
// 死信交换机配置
private DeadLetterConfig deadLetter;
// 队列配置
private Map<String, QueueConfig> queues;
// 交换机配置
private Map<String, ExchangeConfig> exchanges;
// 绑定配置
private Map<String, BindingConfig> bindings;
// 重试配置
private RetryConfig retry;
// 内部配置类
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public static class DeadLetterConfig {
private String exchange;
private String routingKey;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public static class QueueConfig {
private String name;
private boolean durable;
private Map<String, Object> arguments;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public static class ExchangeConfig {
private String name;
private String type;
private boolean durable;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public static class BindingConfig {
private String queue;
private String exchange;
private String routingKey;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public static class RetryConfig {
private boolean enabled;
private int maxAttempts;
private long initialInterval;
private long maxInterval;
private double multiplier;
}
}
4. 应用配置绑定类
package com.example.rabbitmqdemo.config;
import com.example.rabbitmqdemo.config.properties.RabbitMQCustomProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
 * Configuration class whose only job is to carry
 * {@code @EnableConfigurationProperties(RabbitMQCustomProperties.class)}.
 */
@Configuration
@EnableConfigurationProperties(RabbitMQCustomProperties.class)
public class AppConfig {
// Intentionally empty: the annotations above enable configuration-properties binding.
}
5. RabbitMQ配置类(使用配置属性)
package com.example.rabbitmqdemo.config;
import com.example.rabbitmqdemo.config.properties.RabbitMQCustomProperties;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the exchanges, queues and bindings configured in
 * {@link RabbitMQCustomProperties}.
 *
 * <p>Every bean looks up its definition by a fixed logical key ({@code order},
 * {@code payment}, {@code notification}, {@code email}, {@code sms}, {@code dlx},
 * {@code dlq}). A missing definition fails with an {@link IllegalStateException} that
 * names the absent property path instead of an anonymous {@code NullPointerException}.
 */
@Configuration
public class RabbitMQConfig {

    private final RabbitMQCustomProperties customProperties;

    @Autowired
    public RabbitMQConfig(RabbitMQCustomProperties customProperties) {
        this.customProperties = customProperties;
    }

    // ---------------------------------------------------------------- exchanges

    /** Direct exchange for order messages (non auto-delete). */
    @Bean
    public DirectExchange orderExchange() {
        RabbitMQCustomProperties.ExchangeConfig exchange = exchangeConfig("order");
        return new DirectExchange(exchange.getName(), exchange.isDurable(), false);
    }

    /** Topic exchange for payment messages (non auto-delete). */
    @Bean
    public TopicExchange paymentExchange() {
        RabbitMQCustomProperties.ExchangeConfig exchange = exchangeConfig("payment");
        return new TopicExchange(exchange.getName(), exchange.isDurable(), false);
    }

    /** Fanout exchange for notifications (non auto-delete). */
    @Bean
    public FanoutExchange notificationExchange() {
        RabbitMQCustomProperties.ExchangeConfig exchange = exchangeConfig("notification");
        return new FanoutExchange(exchange.getName(), exchange.isDurable(), false);
    }

    /** Direct exchange that receives dead-lettered messages (non auto-delete). */
    @Bean
    public DirectExchange dlxExchange() {
        RabbitMQCustomProperties.ExchangeConfig exchange = exchangeConfig("dlx");
        return new DirectExchange(exchange.getName(), exchange.isDurable(), false);
    }

    // ------------------------------------------------------------------- queues

    @Bean
    public Queue orderQueue() {
        return buildQueue("order");
    }

    @Bean
    public Queue paymentQueue() {
        return buildQueue("payment");
    }

    @Bean
    public Queue emailQueue() {
        return buildQueue("email");
    }

    @Bean
    public Queue smsQueue() {
        return buildQueue("sms");
    }

    @Bean
    public Queue dlqQueue() {
        return buildQueue("dlq");
    }

    // ----------------------------------------------------------------- bindings

    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
                .to(orderExchange())
                .with(bindingConfig("order").getRoutingKey());
    }

    @Bean
    public Binding paymentBinding() {
        return BindingBuilder.bind(paymentQueue())
                .to(paymentExchange())
                .with(bindingConfig("payment").getRoutingKey());
    }

    /** Fanout binding; no routing key is used. */
    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(notificationExchange());
    }

    /** Fanout binding; no routing key is used. */
    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(notificationExchange());
    }

    @Bean
    public Binding dlqBinding() {
        return BindingBuilder.bind(dlqQueue())
                .to(dlxExchange())
                .with(bindingConfig("dlq").getRoutingKey());
    }

    // ------------------------------------------------------------------ helpers

    /**
     * Builds a queue from its configuration entry. Unlike a hard-coded
     * {@code QueueBuilder.durable(..)}, this honours the configured {@code durable} flag
     * and applies the configured arguments for every queue, skipping them when absent so
     * that {@code withArguments(null)} is never called.
     */
    private Queue buildQueue(String key) {
        RabbitMQCustomProperties.QueueConfig queue =
                lookup(customProperties.getQueues(), "queues", key);
        QueueBuilder builder = queue.isDurable()
                ? QueueBuilder.durable(queue.getName())
                : QueueBuilder.nonDurable(queue.getName());
        if (queue.getArguments() != null && !queue.getArguments().isEmpty()) {
            builder.withArguments(queue.getArguments());
        }
        return builder.build();
    }

    /** Returns the exchange definition stored under {@code key}. */
    private RabbitMQCustomProperties.ExchangeConfig exchangeConfig(String key) {
        return lookup(customProperties.getExchanges(), "exchanges", key);
    }

    /** Returns the binding definition stored under {@code key}. */
    private RabbitMQCustomProperties.BindingConfig bindingConfig(String key) {
        return lookup(customProperties.getBindings(), "bindings", key);
    }

    /**
     * Fetches a required entry from one configuration section.
     *
     * @throws IllegalStateException if the section or the entry is missing
     */
    private static <T> T lookup(java.util.Map<String, T> section, String sectionName, String key) {
        T entry = (section == null) ? null : section.get(key);
        if (entry == null) {
            throw new IllegalStateException(
                    "Missing RabbitMQ configuration: app.rabbitmq." + sectionName + "." + key);
        }
        return entry;
    }
}
6. RabbitMQ工具类(使用配置属性)
package com.example.rabbitmqdemo.util;
import com.example.rabbitmqdemo.config.properties.RabbitMQCustomProperties;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
 * Convenience wrapper around {@link RabbitTemplate}. Every message sent through this
 * class gets a fresh random UUID as its message id and the persistent delivery mode.
 */
@Component
public class RabbitMQUtil {

    private final RabbitTemplate rabbitTemplate;
    private final RabbitMQCustomProperties customProperties;

    @Autowired
    public RabbitMQUtil(RabbitTemplate rabbitTemplate,
                        RabbitMQCustomProperties customProperties) {
        this.rabbitTemplate = rabbitTemplate;
        this.customProperties = customProperties;
    }

    /**
     * Converts and sends a message.
     *
     * @param exchange   exchange name
     * @param routingKey routing key
     * @param message    payload object, converted by the template
     */
    public void sendMessage(String exchange, String routingKey, Object message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message, this::stampCommonProperties);
    }

    /**
     * Converts and sends a message with a delay set on its properties.
     *
     * <p>NOTE(review): the delay is presumably only honoured when the target exchange is
     * declared as a delayed exchange on the broker - confirm.
     *
     * @param exchange   exchange name
     * @param routingKey routing key
     * @param message    payload object, converted by the template
     * @param delay      delay in milliseconds
     */
    public void sendDelayedMessage(String exchange, String routingKey, Object message, int delay) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message, outgoing -> {
            Message stamped = stampCommonProperties(outgoing);
            stamped.getMessageProperties().setDelay(delay);
            return stamped;
        });
    }

    /**
     * Sends a message to the configured {@code notification} exchange with an empty
     * routing key.
     *
     * @param message payload object, converted by the template
     */
    public void sendBroadcastMessage(Object message) {
        String fanoutExchange = customProperties.getExchanges().get("notification").getName();
        sendMessage(fanoutExchange, "", message);
    }

    /**
     * Sends a message to the configured dead-letter exchange and routing key.
     *
     * @param message payload object, converted by the template
     */
    public void sendDeadLetterMessage(Object message) {
        RabbitMQCustomProperties.DeadLetterConfig deadLetter = customProperties.getDeadLetter();
        sendMessage(deadLetter.getExchange(), deadLetter.getRoutingKey(), message);
    }

    /**
     * Applies the properties shared by all outgoing messages and returns the same
     * instance, so the method can be used directly as a message post-processor.
     */
    private Message stampCommonProperties(Message message) {
        message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        return message;
    }
}
7. 消息生产者服务(使用配置属性)
package com.example.rabbitmqdemo.service;
import com.example.rabbitmqdemo.config.properties.RabbitMQCustomProperties;
import com.example.rabbitmqdemo.dto.OrderDTO;
import com.example.rabbitmqdemo.util.RabbitMQUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Publishes the application's business messages through {@link RabbitMQUtil}, taking
 * exchange names and routing keys from {@link RabbitMQCustomProperties}.
 */
@Service
public class MessageProducer {

    /** Event name substituted for the wildcard words of the payment binding pattern. */
    private static final String PAYMENT_SUCCESS_EVENT = "success";

    private final RabbitMQUtil rabbitMQUtil;
    private final RabbitMQCustomProperties customProperties;

    @Autowired
    public MessageProducer(RabbitMQUtil rabbitMQUtil,
                           RabbitMQCustomProperties customProperties) {
        this.rabbitMQUtil = rabbitMQUtil;
        this.customProperties = customProperties;
    }

    /**
     * Sends an order-created message using the {@code order} binding's exchange and
     * routing key.
     */
    public void sendOrderCreatedMessage(OrderDTO order) {
        RabbitMQCustomProperties.BindingConfig binding = customProperties.getBindings().get("order");
        rabbitMQUtil.sendMessage(
                binding.getExchange(),
                binding.getRoutingKey(),
                order
        );
        System.out.println("订单创建消息已发送: " + order.getOrderId());
    }

    /**
     * Sends a payment-success message to the {@code payment} binding's exchange.
     *
     * <p>The configured routing key of that binding is a topic pattern (for example
     * {@code payment.#}). A pattern is meant for binding, not for publishing, so a
     * concrete key matching the pattern (for example {@code payment.success}) is derived
     * and used instead of publishing the wildcard characters literally.
     */
    public void sendPaymentSuccessMessage(String paymentInfo) {
        RabbitMQCustomProperties.BindingConfig binding = customProperties.getBindings().get("payment");
        rabbitMQUtil.sendMessage(
                binding.getExchange(),
                concreteRoutingKey(binding.getRoutingKey(), PAYMENT_SUCCESS_EVENT),
                paymentInfo
        );
        System.out.println("支付成功消息已发送: " + paymentInfo);
    }

    /** Broadcasts a system notification. */
    public void sendSystemNotification(String notification) {
        rabbitMQUtil.sendBroadcastMessage(notification);
        System.out.println("系统通知已广播: " + notification);
    }

    /**
     * Sends a delayed order-cancel message.
     *
     * <p>NOTE(review): this publishes to the same exchange and routing key as
     * {@link #sendOrderCreatedMessage(OrderDTO)}, so consumers of that queue cannot tell
     * the two message kinds apart by routing alone - confirm this is intended.
     *
     * @param order       order to cancel
     * @param delayMillis delay in milliseconds
     */
    public void sendDelayedOrderCancelMessage(OrderDTO order, int delayMillis) {
        RabbitMQCustomProperties.BindingConfig binding = customProperties.getBindings().get("order");
        rabbitMQUtil.sendDelayedMessage(
                binding.getExchange(),
                binding.getRoutingKey(),
                order,
                delayMillis
        );
        System.out.println("订单取消延迟消息已发送: " + order.getOrderId() + ", 延迟: " + delayMillis + "ms");
    }

    /**
     * Turns a topic binding pattern into a publishable routing key by replacing each
     * wildcard ({@code #} or {@code *}) with {@code eventName}. A pattern without
     * wildcards is returned unchanged; a null or empty pattern yields {@code eventName}.
     */
    private static String concreteRoutingKey(String bindingPattern, String eventName) {
        if (bindingPattern == null || bindingPattern.isEmpty()) {
            return eventName;
        }
        return bindingPattern.replace("#", eventName).replace("*", eventName);
    }
}
8. 消息消费者服务(使用配置属性)
package com.example.rabbitmqdemo.service;
import com.example.rabbitmqdemo.config.properties.RabbitMQCustomProperties;
import com.example.rabbitmqdemo.dto.OrderDTO;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
public class MessageConsumer {
private final RabbitMQCustomProperties customProperties;
@Autowired
public MessageConsumer(RabbitMQCustomProperties customProperties) {
this.customProperties = customProperties;
}
/**
* 处理订单创建消息
*/
@RabbitListener(queues = "#{@orderQueue.name}")
public void handleOrderMessage(OrderDTO order, Message message, Channel channel)
throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String messageId = message.getMessageProperties().getMessageId();
try {
System.out.println("收到订单消息: " + messageId);
System.out.println("订单ID: " + order.getOrderId() + ", 金额: " + order.getAmount());
// 模拟业务处理
processOrder(order);
// 手动ACK确认
channel.basicAck(deliveryTag, false);
System.out.println("订单处理完成: " + order.getOrderId());
} catch (BusinessException e) {
// 业务异常,不需要重试
System.err.println("订单处理业务异常: " + e.getMessage());
channel.basicReject(deliveryTag, false);
} catch (Exception e) {
// 其他异常,根据重试次数决定是否重新入队
System.err.println("订单处理异常: " + e.getMessage());
if (shouldRetry(message)) {
// 重新入队进行重试
channel.basicNack(deliveryTag, false, true);
System.out.println("订单消息重新入队: " + messageId);
} else {
// 超过最大重试次数,拒绝并不重新入队
channel.basicNack(deliveryTag, false, false);
System.out.println("订单消息进入死信队列: " + messageId);
}
}
}
// 判断是否应该重试
private boolean shouldRetry(Message message) {
Integer retryCount = message.getMessageProperties().getHeader("x-retry-count");
if (retryCount == null) {
retryCount = 0;
}
// 使用配置的最大重试次数
return retryCount < customProperties.getMaxRetryCount();
}
// 模拟订单处理逻辑
private void processOrder(OrderDTO order) throws BusinessException {
// 实际业务逻辑
if (order.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
throw new BusinessException("订单金额异常");
}
// 其他处理...
}
// 自定义业务异常
private static class BusinessException extends Exception {
public BusinessException(String message) {
super(message);
}
}
}
9. 消息控制器(完整调用示例)
package com.example.rabbitmqdemo.controller;
import com.example.rabbitmqdemo.config.properties.RabbitMQCustomProperties;
import com.example.rabbitmqdemo.dto.OrderDTO;
import com.example.rabbitmqdemo.service.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.math.BigDecimal;
@RestController
@RequestMapping("/api/messages")
public class MessageController {
private final MessageProducer messageProducer;
private final RabbitMQCustomProperties customProperties;
@Autowired
public MessageController(MessageProducer messageProducer,
RabbitMQCustomProperties customProperties) {
this.messageProducer = messageProducer;
this.customProperties = customProperties;
}
/**
* 发送订单创建消息
*/
@PostMapping("/order")
public String sendOrderMessage(@RequestBody OrderDTO order) {
messageProducer.sendOrderCreatedMessage(order);
return "订单消息已发送";
}
/**
* 发送支付成功消息
*/
@PostMapping("/payment/{paymentId}")
public String sendPaymentMessage(@PathVariable String paymentId) {
messageProducer.sendPaymentSuccessMessage("支付ID: " + paymentId);
return "支付消息已发送";
}
/**
* 发送系统通知
*/
@PostMapping("/notification")
public String sendNotification(@RequestParam String content) {
messageProducer.sendSystemNotification(content);
return "系统通知已发送";
}
/**
* 发送延迟订单取消消息
*/
@PostMapping("/delayed-order")
public String sendDelayedOrder(@RequestBody OrderDTO order,
@RequestParam int delaySeconds) {
// 使用配置中的重试间隔作为基础延迟单位
long baseDelay = customProperties.getRetry().getInitialInterval();
long delayMillis = delaySeconds * 1000 + baseDelay;
messageProducer.sendDelayedOrderCancelMessage(order, (int) delayMillis);
return "延迟订单消息已发送,延迟时间: " + delaySeconds + "秒";
}
}