【RabbitMQ】RabbitMQ中死信交换机是什么?延迟队列呢?有哪些应用场景?

发布于:2025-03-21 ⋅ 阅读:(29) ⋅ 点赞:(0)

1.死信交换机(Dead Letter Exchange DLX)

1.1什么是死信交换机 ?

  • 死信: 在RabbitMQ中,无法被消费者正常处理的消息称为死信(Dead Letter)。
  • 死信交换机: 用于接收死信的交换机。当消息成为死信时,RabbitMQ会将其重新路由到死信交换机,再由死信交换机根据绑定规则路由到死信队列。

1.2消息成为死信的条件

  • 1.消息被拒绝:消费者调用basic.reject或basic.nack 并设置了requeue=false。
  • 2.消息过期:消息在队列中的存活时间(TTL)到期
  • 3.队列达到最大长度:队列已满,无法再接收新消息。

1.3死信交换机的配置

  • 在声明队列时,通过参数指定死信交换机和路由键
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange"); // 死信交换机
args.put("x-dead-letter-routing-key", "dlx.routingKey"); // 死信路由键
channel.queueDeclare("normal.queue", true, false, false, args);

1.4死信交换机的应用场景

  • 异常消息处理:将处理失败的消息转移到死信交换机,便于后续分析或人工处理。
  • 延迟队列:通过死信交换机+TTL实现延迟队列功能。

2.延迟队列

2.1什么是延迟队列?

  • 延迟队列:消息在发送到队列后,不会立即被消费,而是延迟一段时间后被消费者处理。
  • 实现方式:RabbitMQ本身不支持直接的延迟队列功能,但可以通过死信交换机+TTL实现。

2.2延迟队列的实现

1.设置消息TTL:

  • 发送消息时设置消息的TTL(Time-To-Live)。
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
    .expiration("10000") // TTL 为 10 秒
    .build();
channel.basicPublish("normal.exchange", "normal.routingKey", properties, body);

2.配置死信交换机:

  • 将消息发送到一个普通队列,并设置死信交换机和路由键。
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange");
args.put("x-dead-letter-routing-key", "dlx.routingKey");
channel.queueDeclare("normal.queue", true, false, false, args);

3.创建死信队列:

  • 创建一个死信队列,绑定到死信交换机。
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "dlx.routingKey");

4.消费者处理延迟消息:

  • 消费者从死信队列中获取延迟消息并进行处理。
channel.basicConsume("dlx.queue", true, (consumerTag, delivery) -> {
    // 处理延迟消息
}, consumerTag -> {});

2.3延迟队列的应用场景

  • 超时订单取消:用户下单后,如果30min内未支付,则自动取消订单。
  • 定时任务:在指定时间后执行任务,如发送提醒邮件。
  • 重试机制:消息处理失败后,延迟一段时间再重试。

3.示例:

3.1实现订单超时取消

1.发送订单消息:

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
    .expiration("1800000") // TTL 为 30 分钟
    .build();
channel.basicPublish("order.exchange", "order.routingKey", properties, body);

2.配置死信交换机:

Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange");
args.put("x-dead-letter-routing-key", "dlx.routingKey");
channel.queueDeclare("order.queue", true, false, false, args);

3.创建死信队列:

channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "dlx.routingKey");

4.消费者处理超时订单:

channel.basicConsume("dlx.queue", true, (consumerTag, delivery) -> {
    // 处理超时订单
    String orderId = new String(delivery.getBody(), StandardCharsets.UTF_8);
    cancelOrder(orderId); // 取消订单
}, consumerTag -> {});

4.总结

  • 死信交换机:用于处理无法被正常消费的消息,常用于异常消息的处理和延迟队列
  • 延迟队列:通过死信交换机+TTL实现,适用于订单超时取消、定时任务、重试机制等场景。
  • 应用场景
    • 死信交换机异常消息处理、延迟队列
    • 延迟队列订单超时取消、定时任务、重试机制