在现代分布式系统和微服务架构中,消息队列(Message Queue)是不可或缺的组件。它能实现服务间的异步通信、应用解耦和流量削峰。RabbitMQ 作为最受欢迎的消息队列中间件之一,以其稳定性、可靠性和灵活的路由模式而著称。
本文将带领你使用 Spring Boot (spring-boot-starter-amqp
),通过清晰的代码实例和详尽的解释,深入理解并通过代码demo实践 RabbitMQ 的四种核心工作模式:
- Work Queue (工作队列模式)
- Direct Exchange (直连交换机模式)
- Fanout Exchange (扇出/广播交换机模式)
- Topic Exchange (主题交换机模式)
java原生的操作方式请看这边
核心概念速览
在深入代码之前,我们先快速了解几个 RabbitMQ 的核心概念:
- Producer (生产者):发送消息的一方。
- Consumer (消费者):接收并处理消息的一方。
- Queue (队列):存储消息的缓冲区,位于内存或磁盘。
- Exchange (交换机):接收来自生产者的消息,并根据特定规则(类型和路由键)将消息路由到一个或多个队列。
- Binding (绑定):建立 Exchange 和 Queue 之间的关联关系。
- Routing Key (路由键):生产者在发送消息给 Exchange 时指定的“地址”或“标签”,Exchange 根据它来决定消息的去向。
模式一:Work Queue (工作队列)
这是最简单的模式,用于将一个耗时任务分发给多个消费者并行处理,从而提高整体处理效率。
特点:一个生产者将消息发送到一个特定队列,多个消费者共同监听这同一个队列。消息会以轮询(Round-Robin)的方式被分发给消费者,即一条消息只会被一个消费者处理。
1. 生产者配置
在 Work 模式下,我们只需要定义一个队列即可。消息将直接发送到这个队列。
// RabbitMQConfig.java
@Configuration
public class RabbitMQConfig {
// 为了方便管理,我们将队列、交换机、路由键的名称统一定义在常量类中
public static final String SPRING_WORK_QUEUE = "spring.work.queue";
@Bean
public Queue workQueue() {
// 创建一个持久化的队列
return QueueBuilder.durable(SPRING_WORK_QUEUE).build();
}
}
2. 生产者接口
我们创建一个接口,循环发送 10 条消息到工作队列。
// ProducerController.java
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/work")
public String sendWorkMessages() {
for (int i = 1; i <= 10; i++) {
String message = "Hello, Spring Work Queue message " + i;
// 第一个参数是交换机名,这里为空字符串表示使用默认交换机
// 第二个参数是路由键,对于工作队列模式,通常就是队列名
rabbitTemplate.convertAndSend("", RabbitMQConfig.SPRING_WORK_QUEUE, message);
}
return "10 work messages sent successfully.";
}
}
3. 消费者代码
我们创建两个消费者,它们监听同一个 SPRING_WORK_QUEUE
队列,以模拟任务竞争。
// WorkListener.java
@Component
public class WorkListener {
@RabbitListener(queues = RabbitMQConfig.SPRING_WORK_QUEUE)
public void listen1(String message) throws InterruptedException {
System.out.println("[Work Consumer 1] received: " + message);
// 模拟耗时任务
Thread.sleep(100);
}
@RabbitListener(queues = RabbitMQConfig.SPRING_WORK_QUEUE)
public void listen2(String message) throws InterruptedException {
System.out.println("[Work Consumer 2] received: " + message);
Thread.sleep(150);
}
}
4. 测试与结果
访问 http://localhost:8080/producer/work
。你会看到控制台交替打印输出来自两个消费者的日志,这表明 10 条消息被两个消费者“瓜分”了。
[Work Consumer 1] received: Hello, Spring Work Queue message 1
[Work Consumer 2] received: Hello, Spring Work Queue message 2
[Work Consumer 1] received: Hello, Spring Work Queue message 3
[Work Consumer 2] received: Hello, Spring Work Queue message 4
[Work Consumer 1] received: Hello, Spring Work Queue message 5
... (交替输出)
注意:默认情况下,分发策略是公平轮询。可以配置
prefetch
等参数实现更复杂的负载均衡。
模式二:Direct Exchange (直连交换机)
Direct Exchange 会将消息路由到 Routing Key 与 Binding Key 完全匹配的队列。这是一种精确的、点对点的路由方式。
特点:一对一路由。你可以将多个队列用不同的 Binding Key
绑定到同一个 Direct Exchange 上,实现消息的精确投递。
1. 生产者配置
我们定义一个 Direct Exchange,两个队列,以及三个绑定关系:
queue1
绑定orange
queue2
绑定black
queue2
也绑定green
// RabbitMQConfig.java
@Configuration
public class RabbitMQConfig {
// ... 其他常量
public static final String SPRING_DIRECT_EXCHANGE = "spring.direct.exchange";
public static final String SPRING_DIRECT_QUEUE_1 = "spring.direct.queue1";
public static final String SPRING_DIRECT_QUEUE_2 = "spring.direct.queue2";
// 声明 Direct Exchange
@Bean
public DirectExchange directExchange() {
return new DirectExchange(SPRING_DIRECT_EXCHANGE);
}
// 声明 Queue 1
@Bean
public Queue directQueue1() {
return new Queue(SPRING_DIRECT_QUEUE_1);
}
// 声明 Queue 2
@Bean
public Queue directQueue2() {
return new Queue(SPRING_DIRECT_QUEUE_2);
}
// 绑定关系1: queue1 -> exchange, with routingKey "orange"
@Bean
public Binding directBinding1(Queue directQueue1, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue1).to(directExchange).with("orange");
}
// 绑定关系2: queue2 -> exchange, with routingKey "black"
@Bean
public Binding directBinding2(Queue directQueue2, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue2).to(directExchange).with("black");
}
// 绑定关系3: queue2 -> exchange, with routingKey "green"
@Bean
public Binding directBinding3(Queue directQueue2, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue2).to(directExchange).with("green");
}
}
2. 生产者接口
// ProducerController.java
// ...
@GetMapping("/direct")
public String sendDirectMessage(String routingKey) {
String message = "Hello, Spring Direct Exchange with routingKey: " + routingKey;
rabbitTemplate.convertAndSend(RabbitMQConfig.SPRING_DIRECT_EXCHANGE, routingKey, message);
return "Direct message sent with routingKey: " + routingKey;
}
3. 消费者代码
// DirectListener.java
@Component
public class DirectListener {
@RabbitListener(queues = RabbitMQConfig.SPRING_DIRECT_QUEUE_1)
public void listenQueue1(String message) {
System.out.println("[" + RabbitMQConfig.SPRING_DIRECT_QUEUE_1 + "] received: " + message);
}
@RabbitListener(queues = RabbitMQConfig.SPRING_DIRECT_QUEUE_2)
public void listenQueue2(String message) {
System.out.println("[" + RabbitMQConfig.SPRING_DIRECT_QUEUE_2 + "] received: " + message);
}
}
4. 测试与结果
发送 orange:
http://localhost:8080/producer/direct?routingKey=orange
- 输出:
[spring.direct.queue1] received: Hello, Spring Direct Exchange with routingKey: orange
- 分析:
orange
精确匹配directBinding1
,消息进入queue1
。
- 输出:
发送 black:
http://localhost:8080/producer/direct?routingKey=black
- 输出:
[spring.direct.queue2] received: Hello, Spring Direct Exchange with routingKey: black
- 分析:
black
精确匹配directBinding2
,消息进入queue2
。
- 输出:
发送 green:
http://localhost:8080/producer/direct?routingKey=green
- 输出:
[spring.direct.queue2] received: Hello, Spring Direct Exchange with routingKey: green
- 分析:
green
精确匹配directBinding3
,消息同样进入queue2
。
- 输出:
发送 blue:
http://localhost:8080/producer/direct?routingKey=blue
- 输出: (无任何输出)
- 分析:
blue
路由键在所有绑定关系中都找不到匹配项,消息被交换机丢弃。
模式三:Fanout Exchange (扇出/广播)
Fanout Exchange 是最简单的交换机类型。它会忽略 Routing Key
,将收到的所有消息广播给所有绑定到该交换机上的队列。
特点:一对多广播。适用于需要将同一消息通知给所有订阅者的场景,如系统通知、配置更新等。
1. 生产者配置
我们定义一个 Fanout Exchange 和两个队列,并将这两个队列都绑定到交换机上。
// RabbitMQConfig.java
@Configuration
public class RabbitMQConfig {
// ... 其他常量
public static final String SPRING_FANOUT_EXCHANGE = "spring.fanout.exchange";
public static final String SPRING_FANOUT_QUEUE_1 = "spring.fanout.queue1";
public static final String SPRING_FANOUT_QUEUE_2 = "spring.fanout.queue2";
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(SPRING_FANOUT_EXCHANGE);
}
@Bean
public Queue fanoutQueue1() {
return new Queue(SPRING_FANOUT_QUEUE_1);
}
@Bean
public Queue fanoutQueue2() {
return new Queue(SPRING_FANOUT_QUEUE_2);
}
// 绑定 Queue1 到 Fanout Exchange
@Bean
public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
// 绑定 Queue2 到 Fanout Exchange
@Bean
public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
注意:Fanout 类型的绑定不需要
.with(routingKey)
。
2. 生产者接口
// ProducerController.java
// ...
@GetMapping("/fanout")
public String sendFanoutMessage() {
String message = "Hello, this is a broadcast message!";
// Fanout Exchange 忽略路由键,所以第二个参数可以为空字符串
rabbitTemplate.convertAndSend(RabbitMQConfig.SPRING_FANOUT_EXCHANGE, "", message);
return "Broadcast message sent successfully.";
}
3. 消费者代码
// FanoutListener.java
@Component
public class FanoutListener {
@RabbitListener(queues = RabbitMQConfig.SPRING_FANOUT_QUEUE_1)
public void listenQueue1(String message) {
System.out.println("[" + RabbitMQConfig.SPRING_FANOUT_QUEUE_1 + "] received: " + message);
}
@RabbitListener(queues = RabbitMQConfig.SPRING_FANOUT_QUEUE_2)
public void listenQueue2(String message) {
System.out.println("[" + RabbitMQConfig.SPRING_FANOUT_QUEUE_2 + "] received: " + message);
}
}
4. 测试与结果
访问 http://localhost:8080/producer/fanout
。
输出:
[spring.fanout.queue1] received: Hello, this is a broadcast message! [spring.fanout.queue2] received: Hello, this is a broadcast message!
分析: 一条消息被成功广播到了所有绑定的队列,两个消费者都收到了同样的消息。
模式四:Topic Exchange (主题交换机)
Topic Exchange 是最灵活的交换机。它通过模式匹配来路由消息,Routing Key
是一个由点(.
)分隔的单词列表,而 Binding Key
可以使用通配符。
*
(星号): 匹配一个单词。#
(井号): 匹配零个或多个单词。
特点:灵活的、多对多的路由。非常适合用于实现基于内容的多维度订阅/发布系统。
1. 生产者配置
我们定义一个 Topic Exchange,两个队列,以及三个绑定关系,来演示通配符的用法:
queue1
绑定*.orange.*
(匹配中间是 orange 的三个单词的 key)queue2
绑定*.*.rabbit
(匹配结尾是 rabbit 的三个单词的 key)queue2
也绑定lazy.#
(匹配以lazy.
开头的所有 key)
// RabbitMQConfig.java
@Configuration
public class RabbitMQConfig {
// ... 其他常量
public static final String SPRING_TOPIC_EXCHANGE = "spring.topic.exchange";
public static final String SPRING_TOPIC_QUEUE_1 = "spring.topic.queue1";
public static final String SPRING_TOPIC_QUEUE_2 = "spring.topic.queue2";
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(SPRING_TOPIC_EXCHANGE);
}
@Bean
public Queue topicQueue1() {
return new Queue(SPRING_TOPIC_QUEUE_1);
}
@Bean
public Queue topicQueue2() {
return new Queue(SPRING_TOPIC_QUEUE_2);
}
// 绑定1: *.orange.*
@Bean
public Binding topicBinding1(Queue topicQueue1, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueue1).to(topicExchange).with("*.orange.*");
}
// 绑定2: *.*.rabbit
@Bean
public Binding topicBinding2(Queue topicQueue2, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueue2).to(topicExchange).with("*.*.rabbit");
}
// 绑定3: lazy.#
@Bean
public Binding topicBinding3(Queue topicQueue2, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueue2).to(topicExchange).with("lazy.#");
}
}
2. 生产者接口
// ProducerController.java
// ...
@GetMapping("/topic")
public String sendTopicMessage(String routingKey) {
String message = "Hello, Spring Topic Exchange with routingKey: " + routingKey;
rabbitTemplate.convertAndSend(RabbitMQConfig.SPRING_TOPIC_EXCHANGE, routingKey, message);
return "Topic message sent with routingKey: " + routingKey;
}
3. 消费者代码
// TopicListener.java
@Component
public class TopicListener {
@RabbitListener(queues = RabbitMQConfig.SPRING_TOPIC_QUEUE_1)
public void listenQueue1(String message) {
System.out.println("[" + RabbitMQConfig.SPRING_TOPIC_QUEUE_1 + "] received: " + message);
}
@RabbitListener(queues = RabbitMQConfig.SPRING_TOPIC_QUEUE_2)
public void listenQueue2(String message) {
System.out.println("[" + RabbitMQConfig.SPRING_TOPIC_QUEUE_2 + "] received: " + message);
}
}
4. 测试与结果
发送
quick.orange.rabbit
:http://localhost:8080/producer/topic?routingKey=quick.orange.rabbit
- 输出:
[spring.topic.queue1] received: Hello, Spring Topic Exchange with routingKey: quick.orange.rabbit
[spring.topic.queue2] received: Hello, Spring Topic Exchange with routingKey: quick.orange.rabbit
分析:
quick.orange.rabbit
同时匹配*.orange.*
(queue1) 和*.*.rabbit
(queue2),所以两个队列都收到了消息。发送
lazy.orange.elephant
:http://localhost:8080/producer/topic?routingKey=lazy.orange.elephant
- 输出:
[spring.topic.queue1] received: Hello, Spring Topic Exchange with routingKey: lazy.orange.elephant
[spring.topic.queue2] received: Hello, Spring Topic Exchange with routingKey: lazy.orange.elephant
分析:
lazy.orange.elephant
同时匹配*.orange.*
(queue1) 和lazy.#
(queue2)。发送
quick.brown.fox
:http://localhost:8080/producer/topic?routingKey=quick.brown.fox
- 输出: (无任何输出)
- 分析: 该 routing key 不匹配任何绑定规则,消息被丢弃。
发送
lazy.fox
:http://localhost:8080/producer/topic?routingKey=lazy.fox
- 输出:
[spring.topic.queue2] received: Hello, Spring Topic Exchange with routingKey: lazy.fox
- 分析: 该 routing key 仅匹配
lazy.#
(queue2)。
总结:如何选择合适的模式?
模式 | 交换机类型 | Routing Key | 核心特点 | 适用场景 |
---|---|---|---|---|
Work Queue | 默认 (空字符串) | 必须是队列名 | 任务分发,竞争消费 | 耗时任务处理、资源密集型操作,如视频转码、日志分析。 |
Direct | Direct |
精确匹配 | 点对点精确路由 | 需要将消息准确发送到特定处理者的场景,如按地区、按类型分发任务。 |
Fanout | Fanout |
忽略 | 广播 | 向所有订阅者发送相同消息,如系统通知、配置更新、实时聊天室。 |
Topic | Topic |
通配符模式匹配 | 灵活的、多维度的订阅/发布 | 基于内容的多条件订阅,如新闻系统(*.sports.basketball )、日志系统(error.critical.# )。 |