一.简单模式
1.1.核心逻辑
生产者 → 队列 → 单个消费者(1:1 直连),消息被消费后自动从队列删除。
1.2.关键特性
- 无交换器(其实使用的是默认交换机不是显示指定),直接指定队列
- 消息默认自动确认(autoAck),易丢失消息
1.3.应用场景
单任务即时处理(如聊天消息、简单日志)
1.4.架构图
1.5.代码示例
在Rabbit中,生产者发送完消息后,就结束了,之后的操作就与生产者无关了,而消费者是被动接收的,一直处于监听状态。
- pom依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
- 生产者
package com.example.demo.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args)throws Exception {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置主机地址
connectionFactory.setHost("127.0.0.1");
//设置连接端口号:默认为 5672
connectionFactory.setPort(5672);
// 虚拟主机名称:默认为/
connectionFactory.setVirtualHost("/");
//设置连接用户名;默认为guest
connectionFactory.setUsername("guest");
//设置连接密码;默认为guest
connectionFactory.setPassword("guest");
//1 创建连接
Connection connection = connectionFactory.newConnection();
//创建频道
Channel channel=connection.createChannel();
/**
* 声明(创建)队列
* 如果没有一个名字叫simp1e-queue的队列,则会创建该队列,如果有则不会创建,所以该方法在确认有消息该消息队列的情况下可以省略
* queue 参数1:队列名称
* durable 参数2:是否定义持久化队列,当MQ重启之后还在
* exclusive 参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列目Connection关闭时删除这个队列
* autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除
* arquments 参数5:队列其它参数
*/
channel.queueDeclare("simple_queue", true, false, false, null);
//要发送的信息
String message="Hello RabbitMQ!";
/**
* 指定消息队列
* 参数1:交换机名称,如果没有指定则使用默认Default_Exchange
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:配置信息
* 参数4:消息内容
*/
channel.basicPublish( "","simple_queue", null, message.getBytes());
channel.close();
connection.close();
}
}
生产者main方法执行日志
如果你先启动生产者main方法,那么你可以在RabbitMQ的web页面可以看到在队列中有一条消息。如果消费者一直都是监听的,那大概率看不到,因为生产者发送消息的那一刻立马就被消费者接收了,在消息队列中就删除了。
- 消费者
package com.example.demo.rabbitmq.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//3.创建连接 Connection
Connection connection = factory.newConnection();
//4.创建channe1
Channel channel = connection.createChannel();
/**
*5.创建队列
* 如果没有一个名字叫simp1e-queue的队列,则会创建该队列,如果有则不会创建,所以该方法在确认有消息该消息队列的情况下可以省略
* 数1.queue:队列名称
* 参数2.durab1e:是否持久化。如果持久化,则当MQ重启之后还在
* 参数3.exclusive:是否独占。
* 参数4.autoDelete:是否自动删除。当没有Consumer时,自动删除掉
* 参数5.arguments:队列其它参数
*/
channel.queueDeclare("simple_queue", true, false, false, null);
// 接收消息
DefaultConsumer consumer=new DefaultConsumer(channel){
/**
* 接收到消息后,此方法将被调用
* @param consumerTag 标识
* @param envelope 获取一些信息,交换机,路由key...
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
}
};
// 参数1.queue:队列名称
// 参数2.autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息
// 参数3.callback:回调对象
// 消费者类似一个监听程序,主要是用来监听消息
channel.basicConsume("simple_queue",true,consumer);
}
}
接收后,消息队列就是空的了。
你也可以点击上面图片中消息队列的名称simple_queue,查看下面具体详情,如下面图片。
在查看消息时,注意在右上角选择页面的刷新频率。
二.工作模式
2.1.核心逻辑
生产者 → 队列 → 多个消费者并行消费(1:N)。
工作模式与简单模式唯一的不同在于它有多个消费者,当队列中有消息时,多个消费者竞争,每条消息仅被一个消费者处理。
2.2.关键特性
- 多个消费者竞争消费同一队列,默认轮询分发(Round-Robin)
- 可配置
basicQos(prefetchCount)
实现公平分发(能者多劳)
2.3.应用场景
资源密集型任务并行处理(如文件转码、批量邮件)
2.4.架构图
2.5.代码示例
main方法启动消费者1和消费者2,当然你可以多创建几个消费者,复制简单模式中的代码即可。
修改下生产者代码,创建一个新的消息队列,并且发送10条消息
消费者1接收到的信息
消费者2接收到的消息
三.发布订阅模式
交换器类型:fanout
(广播)
3.1.核心逻辑
生产者 → Fanout交换器 → 绑定队列 → 所有消费者
忽略路由键(Routing Key) ,消息复制到所有绑定队列。
3.3.关键特性
- 一条消息被多个消费者独立消费(广播)
- 需显式绑定队列到交换器
3.3.应用场景
事件广播(如用户注册后同时发邮件、短信)
3.4.架构图
如上图所示,发布订阅模式有以下特点;
- 指定类型的交换机;
- 多个消息队列,交换机会将一条消息发布到每一个消息队列中;
- 每个消息队列可以有一个或者多个消费者;
3.5.代码示例
生产者代码
package com.example.demo.rabbitmq.fanout;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
private static final String EXCHANGE_NAME="test_fanout_exchange";
public static void main(String[] args)throws Exception {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置主机地址
connectionFactory.setHost("127.0.0.1");
//设置连接端口号:默认为 5672
connectionFactory.setPort(5672);
// 虚拟主机名称:默认为/
connectionFactory.setVirtualHost("/");
//设置连接用户名;默认为guest
connectionFactory.setUsername("guest");
//设置连接密码;默认为guest
connectionFactory.setPassword("guest");
//1 创建连接
Connection connection = connectionFactory.newConnection();
//创建频道
Channel channel=connection.createChannel();
/**
* 创建交换机
* 参数1:交换机名称
* 参数2:交换机类型
* 参数3.durable:是否持久化
* 参数4.autoDelete:自动删除
* 参数5.internal:内部使用,一般false
* 参数6.arquments:其它参数
*/
//这两个方法是一样的
//channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true, false, false, null);
/**
* 声明(创建)队列
* 如果没有一个名字叫simp1e-queue的队列,则会创建该队列,如果有则不会创建,所以该方法在确认有消息该消息队列的情况下可以省略
* queue 参数1:队列名称
* durable 参数2:是否定义持久化队列,当MQ重启之后还在
* exclusive 参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列目Connection关闭时删除这个队列
* autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除
* arquments 参数5:队列其它参数
*/
channel.queueDeclare("fanout_queue_1", true, false, false, null);
channel.queueDeclare("fanout_queue_2", true, false, false, null);
channel.queueDeclare("fanout_queue_3", true, false, false, null);
/**
* 绑定队列到交换机
* 参数1:队列名称
* 参数2:交换机名称
* 参数3:路由key 交换机的类型为fanout,为空
*/
channel.queueBind("fanout_queue_1", EXCHANGE_NAME, "");
channel.queueBind("fanout_queue_2", EXCHANGE_NAME, "");
channel.queueBind("fanout_queue_3", EXCHANGE_NAME, "");
//要发送的信息
String message="Hello RabbitMQ!";
/**
* 指定消息队列
* 参数1:交换机名称,如果没有指定则使用默认Default_Exchange
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:配置信息
* 参数4:消息内容
*/
channel.basicPublish( EXCHANGE_NAME,"", null, message.getBytes());
channel.close();
connection.close();
}
}
上面代码有以下作用:
- 创建指定类型的交换机(如果有,不创建);
- 创建三个消息队列(如果有,不创建);
- 绑定交换机与消息队列;
- 发送消息;
运行后可在Rabbit MQ的管理页面查看到下面的内容
交换机
点击交换机名称,查看绑定关系,绑定了三个消息队列
点击任何一个消息队列,你都可以看到有一条消息
或者你可以根据下图查看消息队列中的消息
如果你要查看某个队列中的具体消息,点击队列名称,找到Get messages。
如果不修改图中的任何一个选项,是不会删除队列中的消息的。者三个消息队列中,都有一条消息【Hello RabbitMQ!】这就是广播的效果
消费端代码
消费端代码并没有变化,与简单模式和工作模式的没有不同,因为消费端监听的是消息队列,只需要修改消息队列名称后运行即可。
在上面图片中,我只创建了两个消费者,分别监听了队列1和队列2。通过下面可以看到,队列1和队列2的消息被接收了。如果你感兴趣,可以多加几个消费者,注意:一个队列可以有多个消费者。
四.路由模式
交换器类型:direct
(精确匹配)
4.1.核心逻辑
生产者 → Direct交换器 → 匹配路由键的队列 → 消费者
路由键需与绑定键(Binding Key)完全一致。
4.2.关键特性
- 实现消息分类投递(如按日志级别分发)
- 队列可绑定多个路由键
4.3.应用场景
精准路由(如ERROR日志存数据库,INFO日志打印)
4.4.架构图

根据上图,路由模式有以下特点
- 特定类型的交换机
direct;
- 指定具体的路由,交换机根据路由将消息发送到对应的队列中;
- 需要注意的是,交换机到队列的路由规则,可以多个。
4.5.代码示例
消费者
消费端的代码没什么不同,在这里,我创建了两个消费端,各监听一个消息队列。
生产者
package com.example.demo.rabbitmq.routing;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
private static final String EXCHANGE_NAME="direct_exchange";
public static void main(String[] args)throws Exception {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置主机地址
connectionFactory.setHost("127.0.0.1");
//设置连接端口号:默认为 5672
connectionFactory.setPort(5672);
// 虚拟主机名称:默认为/
connectionFactory.setVirtualHost("/");
//设置连接用户名;默认为guest
connectionFactory.setUsername("guest");
//设置连接密码;默认为guest
connectionFactory.setPassword("guest");
//1 创建连接
Connection connection = connectionFactory.newConnection();
//创建频道
Channel channel=connection.createChannel();
/**
* 创建交换机
* 参数1:交换机名称
* 参数2:交换机类型
* 参数3.durable:是否持久化
* 参数4.autoDelete:自动删除
* 参数5.internal:内部使用,一般false
* 参数6.arquments:其它参数
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, false, null);
/**
* 声明(创建)队列
* 如果没有一个名字叫simp1e-queue的队列,则会创建该队列,如果有则不会创建,所以该方法在确认有消息该消息队列的情况下可以省略
* queue 参数1:队列名称
* durable 参数2:是否定义持久化队列,当MQ重启之后还在
* exclusive 参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列目Connection关闭时删除这个队列
* autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除
* arquments 参数5:队列其它参数
*/
channel.queueDeclare("direct_queue_1", true, false, false, null);
channel.queueDeclare("direct_queue_2", true, false, false, null);
/**
* 绑定队列到交换机
* 参数1:队列名称
* 参数2:交换机名称
* 参数3:路由key
*/
channel.queueBind("direct_queue_1", EXCHANGE_NAME, "error");
channel.queueBind("direct_queue_2", EXCHANGE_NAME, "info");
//要发送的信息
String message="日志信息:张三调用了delete方法.错误了,目志级别error";
/**
* 指定消息队列
* 参数1:交换机名称,如果没有指定则使用默认Default_Exchange
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:配置信息
* 参数4:消息内容
*/
channel.basicPublish( EXCHANGE_NAME,"error", null, message.getBytes());
channel.close();
connection.close();
}
}
在上面代码中,重点在于:
- 创建了一个交换机,类型为DIRECT;
- 创建了两个队列;
- 绑定交换机与队列的关系,并指定路由;发送消息时需指定交换机名称和路由key;
运行上面代码后,消费者1接收到了error消息。
修改生产者代码中的路由key,再次执行
消费者2接收到了信息
五.主题模式
交换器类型:topic
(模糊匹配)
5.1.核心逻辑
生产者 → Topic交换器 → 通配符匹配的队列 → 消费者
路由键支持 *
(匹配一词)和 #
(匹配多词),如 user.*.order
。
5.2.关键特性
- 动态路由(如按用户兴趣订阅消息)
- 绑定键格式示例:
news.#
(接收所有新闻)
5.3.应用场景
动态消息分发(如电商系统按用户标签推送促销)
5.4.架构图
主题模式和路由模式的区别在于,主题模式的路由key可以模糊匹配 。
将交换机的类型设置为topic类型,在绑定队列时配置路由key,可以设置模糊匹配的规则,如下图
在生产者发送消息后,交换机根据路由开始匹配,将消息发送到所有匹配的队列中。
六.头模式
交换器类型:headers
(键值对匹配)
6.1.核心逻辑
生产者 → Headers交换器 → 匹配消息头的队列 → 消费者
通过 x-match
指定 all
(全匹配)或 any
(任一匹配)。
6.2.关键特性
- 不依赖路由键,用消息头(Headers)路由
- 性能较低,极少使用
6.3.应用场景
特殊路由需求(如按消息语言或版本过滤)
6.4.架构图
该模式与上文中的各个模式完全不同,在头模式中,生产者不指定或创建消息队列,不绑定交换机与消息队列。这部分功能在消费者中。
在生产者中,有以下几步:
- 声明或创建交换机;
- 构建消息属性,指定消息头;
6.5.代码案例
消费者
在该模式中,必须先启动消费者,因为生产者没有指定消息队列,如果先启动生产者,会导致数据丢失。
package com.example.demo.rabbitmq.head;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//3.创建连接 Connection
Connection connection = factory.newConnection();
//4.创建channe1
Channel channel = connection.createChannel();
//5.创建交换机
channel.exchangeDeclare("headers_exchange", BuiltinExchangeType.HEADERS, true, false, false, null);
/**
*5.创建队列
* 如果没有一个名字叫simp1e-queue的队列,则会创建该队列,如果有则不会创建,所以该方法在确认有消息该消息队列的情况下可以省略
* 数1.queue:队列名称
* 参数2.durab1e:是否持久化。如果持久化,则当MQ重启之后还在
* 参数3.exclusive:是否独占。
* 参数4.autoDelete:是否自动删除。当没有Consumer时,自动删除掉
* 参数5.arguments:队列其它参数
*/
channel.queueDeclare("headers_queue_1", true, false, false, null);
// 设置绑定参数(完全匹配)
Map<String, Object> bindingArgs = new HashMap<>();
bindingArgs.put("x-match", "all"); // 必须所有Header匹配
bindingArgs.put("format", "JSON");
bindingArgs.put("priority", "high");
/**
* 绑定队列到交换机
* 参数1:队列名称
* 参数2:交换机名称
* 参数3:路由key 为空
* 参数4:绑定参数
*/
channel.queueBind("headers_queue_1", "headers_exchange", "", bindingArgs);
// 接收消息
// 消费消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received (ALL match): " + message +
" Headers: " + delivery.getProperties().getHeaders());
};
/**
* 监听消息
* 参数1.queue:队列名称
* 参数2.autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息
* 参数3.callback:回调对象
* 参数4.cancelCallback:取消消费的回调
* 参数5.arguments:消费者其它参数
*/
channel.basicConsume("headers_queue_1",true,deliverCallback,consumerTag->{});
}
}
生产者
package com.example.demo.rabbitmq.head;
import com.rabbitmq.client.*;
import java.util.HashMap;
import java.util.Map;
public class Producer {
private static final String EXCHANGE_NAME="headers_exchange";
public static void main(String[] args)throws Exception {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置主机地址
connectionFactory.setHost("127.0.0.1");
//设置连接端口号:默认为 5672
connectionFactory.setPort(5672);
// 虚拟主机名称:默认为/
connectionFactory.setVirtualHost("/");
//设置连接用户名;默认为guest
connectionFactory.setUsername("guest");
//设置连接密码;默认为guest
connectionFactory.setPassword("guest");
//1 创建连接
Connection connection = connectionFactory.newConnection();
//创建频道
Channel channel=connection.createChannel();
/**
* 创建交换机
* 参数1:交换机名称
* 参数2:交换机类型
* 参数3.durable:是否持久化
* 参数4.autoDelete:自动删除
* 参数5.internal:内部使用,一般false
* 参数6.arquments:其它参数
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS, true, false, false, null);
//设置头消息
Map<String, Object> headers = new HashMap<>();
headers.put("format", "JSON");
headers.put("priority", "high");
// 构建消息属性
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.headers(headers) //设置头消息
.build();
String message = "Header Exchange Test Message";
/**
* 指定消息队列
* 参数1:交换机名称,如果没有指定则使用默认Default_Exchange
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:配置信息
* 参数4:消息内容
*/
channel.basicPublish( EXCHANGE_NAME,"", properties, message.getBytes());
channel.close();
connection.close();
}
}
消费者接收到的消息
当然Rabbit MQ还有其它模式,如,RPC模式:远程过程调用,本质上是同步调用,和我们使用OpenFeign调用远程接口一样,有机会再说。