RabbitMQ 知识详解(Java版)
RabbitMQ 是一个开源的消息代理,实现了高级消息队列协议(AMQP)。它用于在分布式系统中实现应用解耦、异步通信和流量削峰。
核心概念
- 生产者(Producer):发送消息的应用
- 消费者(Consumer):接收消息的应用
- 队列(Queue):消息存储的缓冲区
- 交换机(Exchange):接收消息并路由到队列
- 绑定(Binding):连接交换机和队列的规则
- 路由键(Routing Key):消息的路由标识
交换机类型
类型 | 路由规则 | 典型用途 |
---|---|---|
Direct |
精确匹配Routing Key | 点对点通信 |
Topic |
模式匹配(支持通配符) | 多条件路由 |
Fanout |
广播到所有绑定队列 | 发布/订阅 |
Headers |
消息头键值对匹配 | 复杂路由 |
Java 示例(使用官方客户端)
依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.30</version>
</dependency>
示例1:基本发送/接收(点对点)
// 生产者
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 创建队列(持久化/非持久化)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello RabbitMQ!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
// 消费者
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
示例2:发布/订阅模式(Fanout交换机)
// 发布者
public class Publisher {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明fanout类型交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "Broadcast message!";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
// 订阅者
public class Subscriber {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 创建临时队列
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
示例3:主题路由(Topic交换机)
// 生产者(主题发布)
public class TopicProducer {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明topic类型交换机
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = "order.error";
String message = "Order processing error";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
}
}
// 消费者(主题订阅)
public class TopicConsumer {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
// 绑定多个路由键(使用通配符)
channel.queueBind(queueName, EXCHANGE_NAME, "*.error");
channel.queueBind(queueName, EXCHANGE_NAME, "order.*");
System.out.println(" [*] Waiting for messages...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
关键特性(Java实现)
1. 消息持久化
// 声明持久化队列
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
// 发送持久化消息
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
2. 公平分发(Prefetch)
// 每次只分发一条消息
int prefetchCount = 1;
channel.basicQos(prefetchCount);
3. 消息确认(ACK)
// 消费者关闭自动ACK
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {});
// 处理完成后手动ACK
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
4. 持久化消费者
// 重启后自动恢复的消费者
Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum");
channel.queueDeclare("persistent_queue", true, false, false, args);
使用场景
- 服务解耦:订单系统与库存系统分离
- 异步处理:耗时操作(如邮件发送)
- 流量削峰:突发请求缓冲(秒杀系统)
- 分布式事务:最终一致性实现
- 日志收集:多系统日志聚合
最佳实践
- 连接管理:使用连接池(如Spring AMQP的CachingConnectionFactory)
- 异常处理:实现Consumer和Connection的监听器
- 死信队列:处理失败消息
- 集群部署:保证高可用性
- 监控管理:使用RabbitMQ Management Plugin
提示:生产环境推荐使用Spring AMQP简化开发,它提供了RabbitTemplate和@RabbitListener等便捷工具。