📋目录
🎯 RabbitMQ基础概念
什么是RabbitMQ?
RabbitMQ是一个开源的消息队列中间件,就像一个邮局一样:
- 生产者(Producer):发送邮件的人
- 队列(Queue):邮箱,存放邮件的地方
- 消费者(Consumer):收邮件的人
- 交换机(Exchange):邮局的分拣中心,决定邮件发到哪个邮箱
核心概念详解
1. 队列(Queue)
队列就像一个容器,消息在这里排队等待被处理
[消息1] [消息2] [消息3] → 消费者取走处理
2. 交换机(Exchange)
交换机有4种类型:
- Direct(直连):根据路由键精确匹配
- Fanout(扇出):广播到所有绑定的队列
- Topic(主题):根据路由键模式匹配
- Headers:根据消息头匹配(较少使用)
3. 绑定(Binding)
绑定是交换机和队列之间的连接规则
🛠️ 环境搭建
1. 安装RabbitMQ
Windows安装
# 使用Chocolatey安装
choco install rabbitmq
# 启动RabbitMQ服务
rabbitmq-server
Docker安装(推荐)
# 拉取并运行RabbitMQ容器
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:3-management
2. 访问管理界面
- 地址:http://localhost:15672
- 用户名:admin
- 密码:123456
🚀 SpringBoot集成RabbitMQ
1. 添加依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
2. 配置文件
# application.yml
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123456
virtual-host: /
# 发送者开启确认模式
publisher-confirms: true
# 发送者开启return确认机制
publisher-returns: true
# 设置消费者手动确认
listener:
simple:
acknowledge-mode: manual
# 限制每次只处理一个消息
prefetch: 1
⚙️ 基础配置
1. RabbitMQ配置类
@Configuration
@EnableRabbit
public class RabbitConfig {
/**
* 创建RabbitTemplate,用于发送消息
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// 设置消息转换器(可选)
template.setMessageConverter(new Jackson2JsonMessageConverter());
// 设置发送确认回调
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败:" + cause);
}
});
return template;
}
}
📨 简单队列模式
这是最基本的模式:一个生产者发送消息到队列,一个消费者接收消息。
1. 队列配置
@Configuration
public class SimpleQueueConfig {
public static final String SIMPLE_QUEUE = "simple.queue";
/**
* 声明简单队列
*/
@Bean
public Queue simpleQueue() {
return QueueBuilder.durable(SIMPLE_QUEUE).build();
}
}
2. 生产者
@RestController
@RequestMapping("/simple")
public class SimpleProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping("/send")
public String sendMessage(@RequestParam String message) {
// 发送消息到队列
rabbitTemplate.convertAndSend(SimpleQueueConfig.SIMPLE_QUEUE, message);
return "消息发送成功:" + message;
}
}
3. 消费者
@Component
public class SimpleConsumer {
/**
* 监听简单队列
*/
@RabbitListener(queues = SimpleQueueConfig.SIMPLE_QUEUE)
public void receiveMessage(String message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
System.out.println("接收到消息:" + message);
// 模拟业务处理
Thread.sleep(1000);
// 手动确认消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
try {
// 处理失败,拒绝消息并重新入队
channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
👥 工作队列模式
多个消费者共同处理一个队列中的消息,实现任务分发。
1. 配置类
@Configuration
public class WorkQueueConfig {
public static final String WORK_QUEUE = "work.queue";
@Bean
public Queue workQueue() {
return QueueBuilder.durable(WORK_QUEUE).build();
}
}
2. 生产者
@RestController
@RequestMapping("/work")
public class WorkProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping("/send")
public String sendWork(@RequestParam String task) {
for (int i = 1; i <= 10; i++) {
String message = task + " - 任务" + i;
rabbitTemplate.convertAndSend(WorkQueueConfig.WORK_QUEUE, message);
}
return "批量任务发送完成";
}
}
3. 消费者
@Component
public class WorkConsumer {
/**
* 工作者1
*/
@RabbitListener(queues = WorkQueueConfig.WORK_QUEUE)
public void worker1(String message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
System.out.println("工作者1处理:" + message);
Thread.sleep(2000); // 模拟较慢的处理
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
handleError(channel, deliveryTag);
}
}