RabbitMQ
简介:RabbitMQ 是一种开源的消息队列中间件,你可以把它想象成一个高效的“邮局”。它专门负责在不同应用程序之间传递消息,让系统各部分能松耦合地协作
优势:
异步处理:比如用户注册后,主程序将发送验证邮件的任务扔进队列就立刻返回,邮件服务后续慢慢处理,避免用户等待。
削峰填谷:突然的流量高峰(如秒杀活动)会被队列缓冲,避免服务器被压垮。
智能路由:通过交换机(Exchange)的四种路由策略(直连/主题/广播/头匹配),实现精准投递,比如将VIP用户的订单定向到专属客服队列。
故障恢复:支持消息持久化和确认机制,即使服务器宕机,消息也不会丢失。
同步VS异步(以实际开发为例子进行说明):
同步业务功能的耦合度高,异步耦合度低,可以达到解耦的效果
同步业务流程响应的时间长,异步响应的时间短
同步模式会导致并发压力向后进行传递,异步可以削峰限流
同步模式下系统结构弹性不足,异步模式下系统弹性强,可扩展性强
注意:在实际开发中并不是说异步模式就完全优与同步模式,在一定的场景下使用异步模式是优化系统的架构,但是在一些其它的业务场景下需要同步来保证流程的完整性。所以说异步还是同步要跟据具体业务进行选择。
底层实现:
AMQP(Advanced Message Queuing Protocol):AMQP 是 跨语言的通用消息协议,适合异构系统间的复杂通信。
JMS(Java Message Service):JMS是 Java 专属的 API 标准,适合统一 Java 生态的消息处理。
主流的MQ产品对比
对比项 | RabbitMQ | ActiveMQ | RocketMQ | Kafka |
---|---|---|---|---|
开发语言 | Erlang | Java | Java | Scala/Java |
维护方 | Rabbit(公司) | Apache(社区) | 阿里(公司) | Apache(社区) |
核心机制 | 基于 AMQP 协议的生产者-消费者模型 | 基于 JMS 的消息传递模型 | 分布式消息队列(Topic + Tag 分类) | 分布式流处理平台(发布-订阅模型) |
协议支持 | AMQP、STOMP、MQTT、HTTP 插件 | AMQP、STOMP、OpenWire、REST、MQTT | 自定义协议(支持 TCP/HTTP) | 自定义协议(社区封装 HTTP 支持) |
客户端语言 | 官方:Erlang、Java、Ruby;社区:多语言 | Java、C/C++、.NET、Python、PHP | 官方:Java;社区:C++(不成熟) | 官方:Java;社区:Python、Go、Rust 等 |
可用性 | 镜像队列、仲裁队列(Quorum Queue) | 主从复制 | 主从复制 | 分区(Partition) + 副本(Replica) |
单机吞吐量 | 约 10 万/秒 | 约 5 万/秒 | 10 万+/秒(阿里双十一验证) | 百万级/秒 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息确认 | 完整 ACK/NACK 机制 | 支持 JMS ACK 模式 | 基于数据库持久化的消息表 | 基于副本同步和 ISR 机制 |
功能特性 | ✅ 低延迟、高并发、管理界面丰富 | ✅ 老牌稳定、支持 JMS 规范 | ✅ 高吞吐、阿里生态集成、事务消息 | ✅ 高吞吐、流处理、大数据场景专用 |
适用场景 | 复杂路由、实时业务(如支付订单) | 传统企业级系统(Java 生态) | 电商高并发场景(如秒杀、订单) | 日志采集、实时分析、流式计算 |
原生RabbitMQAPI调用:
//=========================================发送消息的代码示例=================================
public class Producer {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机地址
connectionFactory.setHost("192.168.200.100");
// 设置连接端口号:默认为 5672
connectionFactory.setPort(5672);
// 虚拟主机名称:默认为 /
connectionFactory.setVirtualHost("/");
// 设置连接用户名;默认为guest
connectionFactory.setUsername("guest");
// 设置连接密码;默认为guest
connectionFactory.setPassword("123456");
// 创建连接
Connection connection = connectionFactory.newConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明(创建)队列
// queue 参数1:队列名称
// durable 参数2:是否定义持久化队列,当 MQ 重启之后还在
// exclusive 参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列
// autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除
// arguments 参数5:队列其它参数
channel.queueDeclare("simple_queue", true, false, false, null);
// 要发送的信息
String message = "你好;小兔子!";
// 参数1:交换机名称,如果没有指定则使用默认Default Exchange
// 参数2:路由key,简单模式可以传递队列名称
// 参数3:配置信息
// 参数4:消息内容
channel.basicPublish("", "simple_queue", null, message.getBytes());
System.out.println("已发送消息:" + message);
// 关闭资源
channel.close();
connection.close();
}
}
//=========================================接收消息的代码示例=================================
public class Consumer {
public static void main(String[] args) throws Exception {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost("192.168.200.100");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("123456");
// 3. 创建连接 Connection
Connection connection = factory.newConnection();
// 4. 创建Channel
Channel channel = connection.createChannel();
// 5. 创建队列
// 如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建
// 参数1. queue:队列名称
// 参数2. durable:是否持久化。如果持久化,则当MQ重启之后还在
// 参数3. exclusive:是否独占。
// 参数4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
// 参数5. arguments:其它参数。
channel.queueDeclare("simple_queue",true,false,false,null);
// 接收消息
DefaultConsumer consumer = new DefaultConsumer(channel){
// 回调方法,当收到消息后,会自动执行该方法
// 参数1. consumerTag:标识
// 参数2. envelope:获取一些信息,交换机,路由key...
// 参数3. properties:配置信息
// 参数4. body:数据
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
}
};
// 参数1. queue:队列名称
// 参数2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息
// 参数3. callback:回调对象
// 消费者类似一个监听程序,主要是用来监听消息
channel.basicConsume("simple_queue",true,consumer);
}
}
封装RabbitMQ工具类:
public class ConnectionUtil {
//跟据自己服务的具体需求进行相关ip+端口的配置(动态变化)
public static final String HOST_ADDRESS = "192.168.200.100";
public static Connection getConnection() throws Exception {
// 定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务地址
factory.setHost(HOST_ADDRESS);
// 端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("123456");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
public static void main(String[] args) throws Exception {
Connection con = ConnectionUtil.getConnection();
// amqp://guest@192.168.200.100:5672/
System.out.println(con);
con.close();
}
}
RabbitMQ体系结构:
生产者(Producer):发送消息到 RabbitMQ 的应用程序。
消费者(Consumer):从队列中接收并处理消息的应用程序。
交换机(Exchange):接收生产者消息,根据类型和路由规则将消息分发到队列。
四大类型:
类型 | 路由规则 | 典型场景 |
---|---|---|
Direct | 精确匹配 Routing Key (如 order.pay ) |
一对一精准投递(如支付成功通知) |
Topic | 通配符匹配(如 order.* 或 *.pay ) |
多服务订阅同一类消息(如日志分类) |
Fanout | 广播到所有绑定队列(无视 Routing Key ) |
群发通知(如系统公告) |
Headers | 通过消息头(Headers)键值对匹配 | 复杂条件路由(需灵活匹配时) |
队列(Queue):定义交换机与队列之间的映射关系,指定路由规则
信道(Channel):复用 TCP 连接的轻量级虚拟链路,减少资源消耗。
虚拟主机(Virtual Host):逻辑隔离的“消息域”,不同 vhost 间资源(交换机、队列)互不干扰。
总结:RabbitMQ 通过 生产者-交换机-队列-消费者 模型实现异步通信,核心在于灵活的路由规则(交换机类型)和可靠性保障(持久化、确认机制)。
基础篇
工作模式
简单模式:最简单的消息队列模型,包含一个生产者、一个队列和一个消费者。生产者直接将消息发送到队列,消费者从队列中接收消息。
工作队列模式(Work Queues):使用默认的交换机,一个队列对应多个消费者,消息按轮询(Round-Robin)或公平分发(Fair Dispatch)分配给消费者,避免单个消费者过载。
发布/订阅模式(Publish/Subscribe):使用 扇形交换机(Fanout Exchange),生产者将消息发送到交换机,交换机将消息广播到所有绑定的队列,每个消费者独立接收一份消息副本。
路由模式(Routing):使用 直接交换机(Direct Exchange),生产者指定消息的 路由键(Routing Key),交换机根据路由键将消息精确匹配到绑定的队列。
主题模式(Topics):使用 主题交换机(Topic Exchange),路由键支持通配符匹配(
*
匹配一个词,#
匹配多个词)。例如路由键stock.usd.nyse
可被*.nyse
或stock.#
订阅。远程过程调用(RPC):通过消息队列实现远程调用。客户端发送请求消息时附带回调队列和唯一ID,服务端处理完成后将响应发送到回调队列,客户端通过ID匹配响应。
发布者确认(Publisher Confirms):生产者发送消息后,RabbitMQ会异步返回确认(ACK)或未确认(NACK),确保消息成功到达交换机或队列。
工作队列模式(Work Queues)
并行处理能力
多消费者竞争消费:一个队列可绑定多个消费者,消息被并发处理,消息只会被其中的一个消费者拿到。
横向扩展:通过增加消费者数量,轻松应对高并发或大流量场景。
负载均衡机制
轮询分发(Round-Robin):默认策略,均摊消息到所有消费者,简单但可能因消费者性能差异导致负载不均。
公平分发(Fair Dispatch):通过
prefetch_count
限制消费者同时处理的消息数,确保“能者多劳”,避免慢消费者堆积任务。
消息可靠性保障
ACK确认机制:消费者处理完成后需手动发送确认(ACK),若处理失败或消费者宕机,消息自动重新入队,确保任务不丢失。
持久化支持:队列和消息均可设置为持久化(
durable=true
),防止RabbitMQ服务重启后数据丢失。
//================================生产端代码循环发送10次消息================================
public class Producer {
public static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
for (int i = 1; i <= 10; i++) {
String body = i+"hello rabbitmq~~~";
channel.basicPublish("",QUEUE_NAME,null,body.getBytes());
}
channel.close();
connection.close();
}
}
//================================消费端代码竞争消息================================
public class Consumer1/2 {
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer1 body:"+new String(body));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
发布/订阅模式(Publish/Subscribe)
消息广播机制
扇形交换机(Fanout Exchange)驱动:生产者将消息发送到交换机,交换机会将消息无条件广播到所有与其绑定的队列,每个队列的消费者都能收到一份消息副本。
一对多分发:一条消息可被多个消费者同时接收,适用于需要广泛触达的场景(如系统通知、日志收集)。
//====================================生产者代码====================================
public class Producer {
public static void main(String[] args) throws Exception {
// 1、获取连接
Connection connection = ConnectionUtil.getConnection();
// 2、创建频道
Channel channel = connection.createChannel();
// 参数1. exchange:交换机名称
// 参数2. type:交换机类型
// DIRECT("direct"):定向
// FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。
// TOPIC("topic"):通配符的方式
// HEADERS("headers"):参数匹配
// 参数3. durable:是否持久化
// 参数4. autoDelete:自动删除
// 参数5. internal:内部使用。一般false
// 参数6. arguments:其它参数
String exchangeName = "test_fanout";
// 3、创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
// 4、创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 5、绑定队列和交换机
// 参数1. queue:队列名称
// 参数2. exchange:交换机名称
// 参数3. routingKey:路由键,绑定规则
// 如果交换机的类型为fanout,routingKey设置为""
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
String body = "日志信息:张三调用了findAll方法...日志级别:info...";
// 6、发送消息
channel.basicPublish(exchangeName,"",null,body.getBytes());
// 7、释放资源
channel.close();
connection.close();
}
}
//====================================消费者1代码===================================
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
channel.queueDeclare(queue1Name,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
System.out.println("队列 1 消费者 1 将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue1Name,true,consumer);
}
}
//====================================消费者2代码===================================
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue2Name,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
System.out.println("队列 2 消费者 2 将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue2Name,true,consumer);
}
}
路由模式(Routing)
基于路由键的精确分发
直接交换机(Direct Exchange)驱动:生产者发送消息时需指定路由键(Routing Key),交换机会将消息精确匹配到绑定相同路由键的队列。
条件性路由:仅当队列绑定的路由键与消息的路由键完全一致时,消息才会被投递,实现按条件分发。
灵活的消息过滤
多队列绑定不同路由键:可为同一交换机绑定多个队列,每个队列声明不同的路由键(例如
error
、info
、warning
),实现消息分类处理。生产者可控性:生产者通过指定路由键决定消息的目标队列,无需消费者干预。
//================================生产者代码========================================
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_direct";
// 创建交换机
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);
// 创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
// 声明(创建)队列
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 队列绑定交换机
// 队列1绑定error
channel.queueBind(queue1Name,exchangeName,"error");
// 队列2绑定info error warning
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");
String message = "日志信息:张三调用了delete方法.错误了,日志级别warning";
// 发送消息
channel.basicPublish(exchangeName,"warning",null,message.getBytes());
System.out.println(message);
// 释放资源
channel.close();
connection.close();
}
}
//===============================消费者1代码========================================
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
channel.queueDeclare(queue1Name,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
System.out.println("Consumer1 将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue1Name,true,consumer);
}
}
//===============================消费者2代码========================================
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue2Name,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
System.out.println("Consumer2 将日志信息存储到数据库.....");
}
};
channel.basicConsume(queue2Name,true,consumer);
}
}
主题模式(Topics)
基于通配符的灵活路由
主题交换机(Topic Exchange)驱动:生产者发送消息时指定带层级的路由键(Routing Key,如
order.europe.paid
),消费者通过绑定键(Binding Key)使用通配符(*
匹配一个词,#
匹配零或多个词)订阅消息。示例:绑定键
*.europe.*
可匹配order.europe.paid
或shipment.europe.delayed
。绑定键
stock.#
可匹配stock.usd.nyse
或stock.eur.london.close
。
多维度匹配:支持复杂的分层路由逻辑,适用于需要按多条件分类的场景。
高度动态的消息过滤
灵活订阅:消费者可动态定义绑定键的通配规则,按需订阅特定模式的消息,无需修改生产者逻辑。
精准与模糊匹配结合:既能精确匹配固定路由键,也能通过通配符覆盖一类消息。
//================================生产者代码========================================
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 绑定队列和交换机
// 参数1. queue:队列名称
// 参数2. exchange:交换机名称
// 参数3. routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为""
// routing key 常用格式:系统的名称.日志的级别。
// 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
channel.queueBind(queue1Name,exchangeName,"#.error");
channel.queueBind(queue1Name,exchangeName,"order.*");
channel.queueBind(queue2Name,exchangeName,"*.*");
// 分别发送消息到队列:order.info、goods.info、goods.error
String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]";
channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]";
channel.basicPublish(exchangeName,"goods.info",null,body.getBytes());
body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]";
channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());
channel.close();
connection.close();
}
}
//================================消费者1代码=======================================
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String QUEUE_NAME = "test_topic_queue1";
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
//================================消费者2代码=======================================
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String QUEUE_NAME = "test_topic_queue2";
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}