案例场景:用户下单后需要多个微服务(如营销、会员)分别订阅并处理订单事件,且每个微服务可能有多个集群实例,需要保证同一个微服务的集群中,只有一个实例消费到消息。
不同于Kafka和rocketMQ有分组消费的功能,rabbitMQ需要通过topic Exchange实现。
1、消息设计
{
"event_type": "order_created", // 事件类型(如下单、支付成功)
"order_id": "123456",
"user_id": "user_789",
"items": [
{"product_id": "p1001", "quantity": 2},
{"product_id": "p1002", "quantity": 1}
],
"total_amount": 399.00,
"created_at": "2025-07-01T12:00:00Z"
}
2、rabbitMQ设计
使用 topic 类型的 Exchange:支持灵活的路由规则,不同微服务可通过 binding key 订阅特定事件。
Exchange 名称示例:order_events。
微服务 队列名 绑定键(binding key)
营销系统 marketing_queue order.created.marketing
会员系统 member_queue order.created.member
绑定键规则:<event_type>.<microservice_name>,便于后续扩展(如 order.cancel.marketing)。
3、消息发布
生产者(订单服务)发布消息时,指定路由键(routing key) 为 order.created
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
public class OrderProducer {
private static final String EXCHANGE_NAME = "order_events";
private static final String ROUTING_KEY = "order.created";
public static void main(String[] args) throws Exception {
// 1. 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 2. topic Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
// 3. 构造消息
OrderEvent event = new OrderEvent();
event.setEvent_type("order_created");
event.setOrder_id("123456");
event.setUser_id("user_789");
event.setItems(List.of(Map.of("product_id", "p1001", "quantity", 2)));
event.setTotal_amount(399.00);
event.setCreated_at("2025-07-02T22:00:00Z");
ObjectMapper mapper = new ObjectMapper();
String messageJson = mapper.writeValueAsString(event);
// 4. 发布消息
channel.basicPublish(
EXCHANGE_NAME,
ROUTING_KEY,
true, // 持久化消息
new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 持久化
.build(),
messageJson.getBytes()
);
System.out.println("Sent order event: " + messageJson);
}
}
}
4、消息订阅(营销服务)
不同服务只需要修改 QUEUE_NAME 和 BINDING_KEY 就可以了。
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;
public class MarketingConsumer {
private static final String QUEUE_NAME = "marketing_queue";
private static final String BINDING_KEY = "order.created.marketing";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 1. 声明队列并绑定到 Exchange
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, "order_events", BINDING_KEY);
// 2. 消费者回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String messageJson = new String(delivery.getBody(), "UTF-8");
ObjectMapper mapper = new ObjectMapper();
OrderEvent event = mapper.readValue(messageJson, OrderEvent.class);
// 3. 业务逻辑:发送营销短信
System.out.println("Marketing: Sending SMS for order " + event.getOrder_id());
// 4. 手动确认消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// 5. 设置手动确认
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
}