RabbitMQ的高级特性介绍(一)

发布于:2025-03-22 ⋅ 阅读:(15) ⋅ 点赞:(0)

消息确认机制

⽣产者发送消息之后, 到达消费端之后, 可能会有以下情况:

a. 消息处理成功
b. 消息处理异常

RabbitMQ向消费者发送消息之后, 就会把这条消息删掉, 那么第二种情况, 就会造成消息丢失
那么如何确保消费端已经成功接收了, 并正确处理了呢?

为了保证消息从队列可靠地到达消费者, RabbitMQ提供了消息确认机制

消息确认机制分为两种:

自动确认:RabbitMQ 会自动把发送出去的消息置为确认, 然后从内存(或者磁盘)中删除, 而不管消费者是否真正地消费到了这些消息。⾃动确认模式适合对于消息可靠性要求不⾼的场景。

手动确认:RabbitMQ会等待消费者显式地调⽤Basic.Ack命令, 回复确认信号后才从内存(或者磁盘) 中移去消息. 这种模式适合对消息可靠性要求⽐较⾼的场景。

如果RabbitMQ⼀直没有收到消费者的确认信号, 并且消费此消息的消费者已经断开连接, 则RabbitMQ会安排该消息重新进⼊队列,等待投递给下⼀个消费者,当然也有可能还是原来的那个消费者。

手动确认的方法

消费者在收到消息之后, 可以选择确认, 也可以选择直接拒绝或者跳过, RabbitMQ也提供了不同的确认应答的⽅式,消费者客⼾端可以调⽤与其对应的channel的相关⽅法,共有三种。

肯定确认:Channel.basicAck(long deliveryTag, boolean multiple)

RabbitMQ 已知道该消息并且成功的处理消息。可以将其丢弃了。

参数说明:

1) deliveryTag: 消息的唯⼀标识,它是⼀个单调递增的64 位的⻓整型值。

2)multiple: 是否批量确认。 在某些情况下, 为了减少⽹络流量, 可以对⼀系列连续的 deliveryTag 进行批量确认。

否定确认: Channel.basicReject(long deliveryTag, boolean requeue)

消费者客⼾端可以调用channel.basicReject方法来告诉RabbitMQ拒绝这个消息。

参数说明:

1)deliveryTag: 参考channel.basicAck

2)requeue: 表⽰拒绝后, 这条消息如何处理. 如果requeue 参数设置为true, 则RabbitMQ会重新将这条消息存⼊队列,以便可以发送给下⼀个订阅的消费者. 如果requeue参数设置为false, 则RabbitMQ会把消息从队列中移除, 而不会把它发送给新的消费者。

否定确认: Channel.basicNack(long deliveryTag, boolean multiple,boolean requeue)

Basic.Reject命令⼀次只能拒绝⼀条消息,如果想要批量拒绝消息,则可以使⽤Basic.Nack这个命令. 消费者客⼾端可以调⽤ channel.basicNack⽅法来实现。

代码实现

我们基于SpringBoot来演示消息的确认机制。Spring-AMQP 对消息确认机制提供了三种策略。

1. AcknowledgeMode.NONE
这种模式下, 消息⼀旦投递给消费者, 不管消费者是否成功处理了消息, RabbitMQ 就会⾃动确认
消息, 从RabbitMQ队列中移除消息. 如果消费者处理消息失败, 消息可能会丢失.
2. AcknowledgeMode.AUTO(默认)

这种模式下, 消费者在消息处理成功时会⾃动确认消息, 但如果处理过程中抛出了异常, 则不会确
认消息。
3. AcknowledgeMode.MANUAL
手动确认模式下, 消费者必须在成功处理消息后显式调⽤ basicAck ⽅法来确认消息. 如果消
息未被确认, RabbitMQ 会认为消息尚未被成功处理, 并且会在消费者可⽤时重新投递该消息, 这
种模式提⾼了消息处理的可靠性, 因为即使消费者处理消息后失败, 消息也不会丢失, ⽽是可以被
重新处理。

在application.yml文件中,设置消息确认的机制:

spring:
  application:
    name: rabbit-extensions-demo
  rabbitmq:
    address: amqp://study:study@47.108.157.13:5672/extension
    listener:
      simple:
#        acknowledge-mode: none  
#        acknowledge-mode: auto  #默认
        acknowledge-mode: manual  

生产端逻辑:

@RestController
@RequestMapping("/producer")
public class ProductController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @RequestMapping("/ack")
    public String ack(){
        rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack",
        "consumer ack test...");
    return "发送成功!";
    }
}

消费端逻辑:

@Component
public class AckQueueListener {
//指定监听队列的名称
  @RabbitListener(queues = Constant.ACK_QUEUE)
  public void ListenerQueue(Message message, Channel channel) throws  Exception {
    System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
    String(message.getBody(),"UTF-8"),
    message.getMessageProperties().getDeliveryTag());
//模拟处理失败
    int num = 3/0;
    System.out.println("处理完成");
}

持久化

如何保证生产者发送消息到Broker,Broker在保存消息时,Broker发送到消费者之间,消息不会丢失。我们需要实现RabbitMQ的持久化

RabbitMQ的持久性分为三个部分:

交换机持久化

交换器的持久化是通过在声明交换机时是将durable参数置为true实现的。相当于将交换机的属性在服务器内部保存,当MQ的服务器发⽣意外或关闭之后,重启 RabbitMQ 时不需要重新去建⽴交换机。⼀个⻓期使⽤的交换器来说,建议将其置为持久化的。

在交换机的配置类中设置:

@Bean("directExchange")
        public DirectExchange  directExchange(){   //默认是true
           return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).durable(true).build();
        }

队列持久化

队列的持久化是通过在声明队列时将 durable 参数置为 true实现的。
如果队列不设置持久化, 那么在RabbitMQ服务重启之后,该队列就会被删掉, 此时数据也会丢失。

队列的持久化能保证该队列本⾝的元数据不会因异常情况⽽丢失, 但是并不能保证内部所存储的消息不会丢失。 要确保消息不会丢失, 需要将消息设置为持久化。

 @Bean("ackQueue")
 public Queue ackQueue()
      return QueueBuilder.durable(Constants.ACK_QUEUE).build();
     }

消息持久化

消息实现持久化, 需要把消息的投递模式( MessageProperties 中的 deliveryMode )设置为2,
也就是 MessageDeliveryMode.PERSISTENT

设置了队列和消息的持久化, 当 RabbitMQ 服务重启之后, 消息依旧存在.

如果只设置队列持久化, 重启之后消息会丢失.

如果只设置消息的持久化, 重启之后队列消失, 继⽽消息也丢失.

所以单单设置消息持久化⽽不设置队列的持久化显得毫⽆意义.

在RabbitMQ配置类中设置:

//持久化
    @Bean("presQueue")
    public Queue presQueue(){
             return QueueBuilder.nonDurable(Constants.PRES_QUEUE).build();
    }

    @Bean("presExchange")
    public DirectExchange presExchange(){
             return ExchangeBuilder.directExchange(Constants.PRES_EXCHANGE).durable(false).build();
    }

    @Bean("presBinding")
    public Binding presBinding(@Qualifier("presQueue") Queue queue,@Qualifier("presExchange") Exchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with("pres").noargs();
    }

以上,关于RabbitMQ,希望对你有所帮助。