RabbitMQ ④-持久化 || 死信队列 || 延迟队列 || 事务

发布于:2025-05-17 ⋅ 阅读:(22) ⋅ 点赞:(0)

在这里插入图片描述

消息确认机制

简单介绍

RabbitMQ Broker 发送消息给消费者后,消费者处理该消息时可能会发生异常,导致消费失败。

如果 Broker 在发送消息后就直接删了,就会导致消息的丢失。

为了保证消息可靠到达消费者并且成功处理了该消息,RabbitMQ 提供了消息确认机制。

消费者在订阅队列时,可以指定 autoAck 参数,该参数指定是否自动确认消息。

  • autoAck=true:消费者接收到消息后,自动确认消息,RabbitMQ Broker 立即删除该消息。
  • autoAck=false:消费者接收到消息后,不自动确认消息,需要消费者调用 channel.basicAck() 方法确认消息。如果消费者处理消息时发生异常,则可以调用 channel.basicNack() 方法,表示不确认该消息的接收。

Spring AMQP 提供了三种模式的消息确认

  • AcknowledgeMode.NONE:消息一经发送,就不管它了,不管消费者是否处理成功,都直接确认消息。
  • AcknowledgeMode.AUTO(默认):自动确认,消息接收后,消费者处理成功时自动确认该消息,如果处理时发送异常,则不会确认消息。
  • AcknowledgeMode.MANUAL:手动确认,消息接收后,消费者处理成功时,需要调用 channel.basicAck() 方法确认消息,如果处理时发送异常,则需要调用 channel.basicNack() 方法,表示不确认该消息的接收。

代码示例

spring:
  application:
    name: rabbit-extensions-demo
  rabbitmq:
    addresses: amqp://admin:admin@47.94.9.33:5672/extension
    listener:
      simple:
        acknowledge-mode: manual
package com.ljh.extensions.rabbit.config;

import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    @Bean("ackQueue")
    public Queue ackQueue() {
        return QueueBuilder.durable(Constants.ACK_QUEUE)
                .build();
    }
    @Bean("ackExchange")
    public DirectExchange ackExchange() {
        return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE)
                .durable(true)
                .build();
    }
//    @Bean("binding")
//    public Binding binding(Exchange exchange, Queue queue) {
//        return BindingBuilder.bind(queue)
//                .to(exchange)
//                .with("ack")
//                .noargs();
//    }
    @Bean("binding1")
    public Binding binding1(@Qualifier("ackExchange") DirectExchange exchange, @Qualifier("ackQueue") Queue queue) {
        return BindingBuilder.bind(queue)
                .to(exchange)
                .with("ack");
    }
}
package com.ljh.extensions.rabbit.controller;

import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;

@RequestMapping("/producer")
@RestController
public class ProducerController {
    @Resource(name = "rabbitTemplate")
    RabbitTemplate rabbitTemplate;

    @RequestMapping("/ack")
    public String ack() {
        rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, "ack", "消费者消息确认喵~");
        return "发送成功";
    }
}
package com.ljh.extensions.rabbit.listener;

import com.ljh.extensions.rabbit.constants.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class AckListener {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void process(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.printf("接收到消息:%s,deliveryTag:%d\n",
                new String(message.getBody(), "UTF-8"),
                deliveryTag);
        try {
            System.out.println("模拟处理业务逻辑");
           int a = 3 / 0;
            System.out.println("模拟处理业务完成");

            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            channel.basicNack(deliveryTag, false, true);
        }
    }
}

持久性机制

简单介绍

前面讲了消费端处理消息时,消息如何不丢失,但是如何保证 RabbitMQ 服务停掉以后,生产者发送的消息不丢失呢。默认情况下, RabbitMQ 退出或者由于某种原因崩溃时,会忽视队列和消息。

为了保证消息持久化,RabbitMQ 提供了持久化机制,分别是:交换机持久化、队列持久化和消息持久化。

  • 交换机持久化:使用 ExchangeBuilder.durable(true) 方法创建的交换机,RabbitMQ 会将交换机信息持久化到磁盘,重启 RabbitMQ 后可以自动恢复。
  • 队列持久化:使用 QueueBuilder.durable(true) 方法创建的队列,RabbitMQ 会将队列信息持久化到磁盘,重启 RabbitMQ 后可以自动恢复。
  • 消息持久化:消息持久化可以保证消息不丢失,即使 RabbitMQ 重启或者崩溃,消息也不会丢失。

将所有的消息都设置为持久化,会严重影响 RabbitMQ 的性能,这是因为写入磁盘的速度相比于写入内存的速度还是很慢的,对于可靠性不是那么高的消息,可以不采用持久化处理以提高整体的吞吐量。

在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做一个权衡。

尽管设置了持久化,也不能保证就一定可以持久化。这是因为在将这些持久化信息写入磁盘时也是需要时间的,如果 RabbitMQ 在这段时间内崩溃,那么这些信息也会丢失。

代码示例

package com.ljh.extensions.rabbit.config;

import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    @Bean("persistQueue")
    public Queue persistQueue() {
        return QueueBuilder.nonDurable(Constants.PERSIST_QUEUE)
                .build();
    }
    @Bean("persistExchange")
    public DirectExchange persistExchange() {
        return ExchangeBuilder.directExchange(Constants.PERSIST_EXCHANGE)
                .durable(false)
                .build();
    }
    @Bean("binding2")
    public Binding binding2(@Qualifier("persistExchange") Exchange exchange, @Qualifier("persistQueue") Queue queue) {
        return BindingBuilder.bind(queue)
                .to(exchange)
                .with("persist")
                .noargs();
    }
}
package com.ljh.extensions.rabbit.controller;

import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;

@RequestMapping("/producer")
@RestController
public class ProducerController {
    @Resource(name = "rabbitTemplate")
    RabbitTemplate rabbitTemplate;

    @RequestMapping("/persist")
    public String persist() {
        Message message = new Message("消费者消息确认喵~".getBytes(StandardCharsets.UTF_8), new MessageProperties());
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
        rabbitTemplate.convertAndSend(Constants.PERSIST_EXCHANGE, "persist", message);
        return "发送成功";
    }
}

发送方确认机制

简单介绍

在发送方将消息发送至 RabbitMQ Broker 时,也有可能出现消息丢失的情况。

为了保证消息可靠到达 Broker,RabbitMQ 提供了发送方确认机制。

发送方确认机制是指,在消息发送到 Broker 后,发送方会等待 Broker 回应,如果发送方收到消息,则发送方认为消息发送成功,如果发送方未收到消息,则发送方认为消息发送失败,可以重新发送。

RabbitMQ 提供了两种方式保证发送方发送的消息的可靠传输

  • confirm 确认模式:发送方在发送消息后,对发送方设置一个 ConfirmCallback 的监听,无论消息是否抵达 Exchange,这个监听都会被执行,如果消息抵达了 Exchange,则 ACKtrue,如果消息没有抵达 Exchange,则 ACKfalse
  • returns 退回模式:尽管确认消息发送至 Exchange 后,也依然不能完全保证消息的可靠传输。在 ExchangeQueue 会有一个 Routing Key(Binding Key) 的绑定关系,如果消息没有匹配到任何一个 Queue,则通过 returns 模式则会退回到发送方。

代码示例

confirm 确认模式

spring:
  application:
    name: rabbit-extensions-demo
  rabbitmq:
    addresses: amqp://admin:admin@47.94.9.33:5672/extension
    listener:
      simple:
        acknowledge-mode: auto
    # 消息发送确认机制
    publisher-confirm-type: correlated
package com.ljh.extensions.rabbit.config;

import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(Constants.CONFIRM_QUEUE)
                .build();
    }
    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE)
                .durable(true)
                .build();
    }
    @Bean("binding3")
    public Binding binding3(@Qualifier("confirmExchange") DirectExchange directExchange, @Qualifier("confirmQueue") Queue queue) {
        return BindingBuilder.bind(queue)
                .to(directExchange)
                .with("confirm");
    }
}
package com.ljh.extensions.rabbit.config;

import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author: Themberfue
 * @date: 2025/4/30 21:08
 * @description:
 */
@Configuration
public class RabbitTemplateConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    @Bean
    public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // ? 设置确认消息机制
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("执行了confirm方法");
                if (ack) {
                    System.out.printf("接收到消息,消息ID:%s\n",
                            correlationData == null ? null : correlationData.getId());
                } else {
                    System.out.printf("未接收到消息,消息ID:%s;原因:%s\n",
                            correlationData == null ? null : correlationData.getId(), cause);
                }
            }
        });
        return rabbitTemplate;
    }
}
package com.ljh.extensions.rabbit.controller;

import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;

@RequestMapping("/producer")
@RestController
public class ProducerController {
    @Resource(name = "confirmRabbitTemplate")
    RabbitTemplate confirmRabbitTemplate;

    @RequestMapping("/confirm")
    public String confirm() {
        // ! 直接使用 setConfirmCallback 会影响其他接口的调用
        // ! 且只能设置一个确认回调,多次发起请求会报错
//        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
//            @Override
//            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//                System.out.println("执行了confirm方法");
//                if (ack) {
//                    System.out.printf("接收到消息,消息ID:%s\n",
//                            correlationData == null ? null : correlationData.getId());
//                } else {
//                    System.out.printf("未接收到消息,消息ID:%s\n;原因:%s",
//                            correlationData == null ? null : correlationData.getId(), cause);
//                }
//            }
//        });
        CorrelationData correlationData = new CorrelationData("1");
        confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE + "1", "confirm", "confirm test...", correlationData);
        return "消息发送成功";
    }
}

returns 退回模式

spring:
  application:
    name: rabbit-extensions-demo
  rabbitmq:
    addresses: amqp://admin:admin@47.94.9.33:5672/extension
    listener:
      simple:
       acknowledge-mode: auto
    # 消息发送退回机制
    publisher-returns: true
package com.ljh.extensions.rabbit.config;

import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author: Themberfue
 * @date: 2025/4/30 21:08
 * @description:
 */
@Configuration
public class RabbitTemplateConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    @Bean
    public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // ? 设置消息退回机制
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println("消息被退回:" + returned);
            }
        });
        return rabbitTemplate;
    }
}

总结:如何确保消息的可靠性传输

  • 发送方 => 服务端:通过发送方确认机制confirm 确认模式returns 退回模式,确保消息可靠到达。
  • 服务端:通过持久化机制,保证消息不丢失。
  • 服务端 => 接收方:通过消息确认机制,确保消息被消费者正确消费。

重试机制

简单介绍

消息在处理失败后,重新发送,重新处理,这便是消息重试机制。

RabbitMQ 提供了消息重试机制,可以设置消息最大重试次数,超过最大重试次数还未成功消费,则消息会被丢弃。

代码示例

spring:
  application:
    name: rabbit-extensions-demo
  rabbitmq:
    addresses: amqp://admin:admin@47.94.9.33:5672/extension
    listener:
      simple:
#         消息接收确认机制
        # acknowledge-mode: manual # 手动确认时,重发机制无效
       acknowledge-mode: auto
        retry:
          enabled: true # 开启重试机制
          initial-interval: 5000ms # 重发时间间隔
          max-attempts: 5 # 最大重试次数
package com.ljh.extensions.rabbit.config;

import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    @Bean("retryQueue")
    public Queue retryQueue() {
        return QueueBuilder.durable(Constants.RETRY_QUEUE)
                .build();
    }
    @Bean("retryExchange")
    public DirectExchange retryExchange() {
        return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE)
                .durable(true)
                .build();
    }
    @Bean("binding4")
    public Binding binding4(@Qualifier("retryExchange") DirectExchange exchange, @Qualifier("retryQueue") Queue queue) {
        return BindingBuilder.bind(queue)
                .to(exchange)
                .with("retry");
    }
}
package com.ljh.extensions.rabbit.controller;

import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;

@RequestMapping("/producer")
@RestController
public class ProducerController {
    @Resource(name = "rabbitTemplate")
    RabbitTemplate rabbitTemplate;

    @RequestMapping("/retry")
    public String retry() {
        rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE, "retry", "retry test...");
        return "消息发送成功";
    }
}
package com.ljh.extensions.rabbit.listener;

import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

@Component
public class RetryListener {
    @RabbitListener(queues = Constants.RETRY_QUEUE)
    public void process(Message message) throws UnsupportedEncodingException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.printf("[%s]收到消息:%s,deliveryTag:%d\n",
                Constants.RETRY_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);
        int num = 3 / 0;
        System.out.println("业务处理完成");
    }
}

TTL 机制

简单介绍

TTL(Time To Live)机制,可以设置消息的存活时间,超过存活时间还未消费,则消息会被丢弃。

RabbitMQ 提供了 TTL 机制,可以设置队列和消息的 TTL 值,超过 TTL 值还未消费,则消息会被丢弃。

两者区别:

  • 设置队列 TTL 值,一旦消息过期,就会从队列中删除。设置队列过期时间,队列中已过期的消息肯定在队列头部,RabbitMQ 只要定期扫描对头的消息是否过期即可。
  • 设置消息 TTL 值,即使消息过期,也不会马上删除,只有在发送至消费者时才会检测其是否已经过期,如果过期才会删除。设置消息过期时间,每个消息的过期时间都可能不尽相同,所以需要扫描整个队列的消息才可确定是否过期,为了确保性能,所以采取类似于 懒汉模式 的方式。

将队列 TTL 设置为 30s,第一个消息的 TTL 设置为 30s,第二个消息的 TTL 设置为 10s。

理论上说,在 10s 后,第二个消息应该被丢弃。但由于设置了队列 TTL 值的机制,只会扫描队头的消息是否过期,所以在第一个消息过期之前,第二个消息不会被删除。

代码示例

package com.ljh.extensions.rabbit.config;

import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    @Bean("ttlQueue")
    public Queue ttlQueue() {
        return QueueBuilder.durable(Constants.TTL_QUEUE)
                .ttl(20_000) // ? 设置队列的 TTL 值
                .build();
    }
    @Bean("ttlExchange")
    public DirectExchange ttlExchange() {
        return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE)
                .durable(true)
                .build();
    }
    @Bean("binding5")
    public Binding binding5(@Qualifier("ttlExchange") DirectExchange exchange, @Qualifier("ttlQueue") Queue queue) {
        return BindingBuilder.bind(queue)
                .to(exchange)
                .with("ttl");
    }
}
package com.ljh.extensions.rabbit.controller;

import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;

@RequestMapping("/producer")
@RestController
public class ProducerController {
    @Resource(name = "rabbitTemplate")
    RabbitTemplate rabbitTemplate;

    @RequestMapping("/ttl")
    public String ttl() {
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // ? 设置消息的 TTL 值
                message.getMessageProperties().setExpiration("10000");
                return message;
            }
        };

        rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test...",
                messagePostProcessor);

        return "消息发送成功";
    }
}

死信队列

简单介绍

死信(Dead Letter),就是因为某种原因,导致消费者消费失败的消息,称之为死信。

死信队列,当消息在一个队列中变成死信时,它就被重新被发送到另一个交换机,该交换机就是死信交换机(Dead Letter Exchange)。

该死信交换机绑定死信队列,当消息被重新发送到死信交换机时,它就被重新投递到死信队列。

消息变成死信会有如下几种原因:

  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue 参数设置为 false。
  • 消息过期。
  • 队列达到最大长度。

代码示例

package com.ljh.extensions.rabbit.config;

import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DlConfig {
    @Bean("normalQueue")
    public Queue normalQueue() {
        return QueueBuilder.durable(Constants.NORMAL_QUEUE)
                .deadLetterExchange(Constants.DL_EXCHANGE) // ? 配置该队列的死信交换机
                .deadLetterRoutingKey("dl") // ? 死信交换机绑定死信队列的 Routing Key
                .ttl(10_000)
                .maxLength(10L) // ? 设置队列最大长度
                .build();
    }
    @Bean("normalExchange")
    public DirectExchange normalExchange() {
        return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE)
                .durable(true)
                .build();
    }
    @Bean("normalBinding")
    public Binding normalBinding(@Qualifier("normalExchange") DirectExchange exchange, @Qualifier("normalQueue") Queue queue) {
        return BindingBuilder.bind(queue)
                .to(exchange)
                .with("normal");
    }
    @Bean("dlQueue")
    public Queue dlQueue() {
        return QueueBuilder.durable(Constants.DL_QUEUE)
                .build();
    }
    @Bean("dlExchange")
    public DirectExchange dlExchange() {
        return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE)
                .durable(true)
                .build();
    }
    @Bean("dlBinding")
    public Binding dlBinding(@Qualifier("dlExchange") DirectExchange exchange, @Qualifier("dlQueue") Queue queue) {
        return BindingBuilder.bind(queue)
                .to(exchange)
                .with("dl");
    }
}
package com.ljh.extensions.rabbit.listener;

import com.ljh.extensions.rabbit.constants.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class DlListener {
   @RabbitListener(queues = Constants.NORMAL_QUEUE)
   public void processNormal(Message message, Channel channel) throws Exception {
       long deliveryTag = message.getMessageProperties().getDeliveryTag();
       System.out.printf("[%s]收到消息:%s,deliveryTag:%d\n",
               Constants.NORMAL_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);
       try {
           int num = 3 / 0;
           channel.basicAck(deliveryTag, false);
       } catch (Exception e) {
           channel.basicNack(deliveryTag, false, false);
       }
   }

    @RabbitListener(queues = Constants.DL_QUEUE)
    public void processDl(Message message) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.printf("%s:[%s]收到消息:%s,deliveryTag:%d\n",
                new Date(), Constants.DL_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);
    }
}
package com.ljh.extensions.rabbit.controller;

import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;
import java.util.Date;

@RequestMapping("/producer")
@RestController
public class ProducerController {
    @Resource(name = "rabbitTemplate")
    RabbitTemplate rabbitTemplate;

    @RequestMapping("/dl")
    public String dl() {
        rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "normal test...");
        return "消息发送成功:" + new Date();
    }
}

延迟队列

简单介绍

延迟队列(Delay Queue),即消息被发送以后,并不想让消费者立刻消费该消息,而是等待一段时间后再消费。

延迟队列的使用场景有很多,比如:

  • 智能家居:智能设备产生的事件,如开关、温度变化等,可以先存放在延迟队列中,等待一段时间后再消费。
  • 日常管理:预定会议,需要在会议开始前 15 分钟通知参会人员。
  • 订单处理:订单创建后,需要 30 分钟后才会发货。

RabbitMQ 本身没有提供延迟队列的功能,但是基于消息过期后会变成死信的特性,可以通过设置 TTL 和死信队列来实现延迟队列的功能。

代码示例

@RequestMapping("/delay")
public String delay() {
    //发送带ttl的消息
    rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 10s..."+ new Date(), messagePostProcessor -> {
        messagePostProcessor.getMessageProperties().setExpiration("10000");
        //10s过期
        return messagePostProcessor;
    });
    rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 20s..."+ new Date(), messagePostProcessor -> {
        messagePostProcessor.getMessageProperties().setExpiration("20000");
        //20s过期
        return messagePostProcessor;
    });
    return "发送成功!";
}

由于 RabbitMQ 检查消息是否过期的机制,如果 20s 的消息先到队列,那么 10s 的消息只会在 20s 后才会被检查到过期。

延迟队列插件

RabbitMQ 官方提供了延迟队列插件,可以实现延迟队列的功能。

延迟队列插件

安装队列插件

延迟队列插件下载地址

下载插件后,需要将插件放到 RabbitMQ 的插件目录(/usr/lib/rabbitmq/plugins)下,然后重启 RabbitMQ 服务。

代码示例

package com.ljh.extensions.rabbit.config;

import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DelayConfig {
    @Bean("delayQueue")
    public Queue delayQueue() {
        return QueueBuilder.durable(Constants.DELAY_QUEUE)
                .build();
    }
    @Bean("delayExchange")
    public DirectExchange delayExchange() {
        return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE)
                .durable(true)
                .delayed() // ? 设置队列为延迟队列
                .build();
    }
    @Bean("delayBinding")
    public Binding delayBinding(@Qualifier("delayExchange") DirectExchange exchange, @Qualifier("delayQueue") Queue queue) {
        return BindingBuilder.bind(queue)
                .to(exchange)
                .with("delay");
    }
}
package com.ljh.extensions.rabbit.listener;

import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class DelayListener {
    @RabbitListener(queues = Constants.DELAY_QUEUE)
    public void processDelay(Message message) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.printf("%s:[%s]收到消息:%s,deliveryTag:%d\n",
                new Date(), Constants.DELAY_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);
    }
}
package com.ljh.extensions.rabbit.controller;

import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;
import java.util.Date;

@RequestMapping("/producer")
@RestController
public class ProducerController {
    @Resource(name = "rabbitTemplate")
    RabbitTemplate rabbitTemplate;

    @RequestMapping("/delay")
    public String delay() {
        rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test...", message -> {
            message.getMessageProperties()
                    .setDelayLong(20_000L); // ? 设置消息的延迟发送时间
            return message;
        });

        rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test...", message -> {
            message.getMessageProperties()
                    .setDelayLong(10_000L); // ? 设置消息的延迟发送时间
            return message;
        });
        return "消息发送成功:" + new Date();
    }
}

事务机制

简单介绍

RabbitMQ 是基于 AMQP 协议实现的,该协议实现了事务机制,因此RabbitMQ也支持事务机制事务。

Spring AMQP 也提供了对事务相关的操作。RabbitMQ 事务允许开发者确保消息的发送和接收是原子性的,要么全部成功,要么全部失败。

代码示例

配置事务管理器:

@Bean("transRabbitTemplate")
public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setChannelTransacted(true);
    return rabbitTemplate;
}

@Bean
public RabbitTransactionManager rabbitTransactionManager (ConnectionFactory connectionFactory) {
    return new RabbitTransactionManager(connectionFactory);
}
@Bean("transQueue")
public Queue transQueue() {
    return QueueBuilder.durable(Constants.TRANS_QUEUE)
            .build();
}
@Transactional
@RequestMapping("/trans")
public String trans() {
    String msg = "trans test...";
    System.out.println("发送第一条消息:" + msg + 1);
    transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, msg);

    int a = 3 / 0;

    System.out.println("发送第二条消息:" + msg + 2);
    transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, msg);

    return "消息发送成功:";
}

消息分发

简单介绍

一个队列可以给多个消费者消费,默认情况下,RabbitMQ 是以轮询的方式将消息分发给这些消费者,不管该消费是否已经消费并且确认。

这种情况是不太合理的,如果每个消费者消费的能力都不同,有的消费者消费快,有的慢,这会极大降低整体系统的吞吐量和处理速度。

我们可以使用 channel.basicQos(int prefetchCount) 来限制当前信
道上的消费者所能保持的最大未确认消息的数量。

当该消费者达到最大的 prefetchCount 限制时,RabbitMQ 会停止向该消费者分发消息,直到该消费者的未确认消息数量小于 prefetchCount 时。

代码示例

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual # 手动确认
        prefetch: 5 # 队列最大接收五条消息
@Bean("qosQueue")
public Queue qosQueue() {
    return QueueBuilder.durable(Constants.QOS_QUEUE)
            .build();
}
@Bean("qosExchange")
public DirectExchange qosExchange() {
    return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE)
            .durable(true)
            .build();
}
@Bean("qosBinding")
public Binding qosBinding(@Qualifier("qosExchange") DirectExchange exchange, @Qualifier("qosQueue") Queue queue) {
    return BindingBuilder.bind(queue)
            .to(exchange)
            .with("qos");
}
@RequestMapping("/qos")
public String qos() {
    for (int i = 0; i < 20; i++) {
        rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, "qos", "qos test...");
    }
    return "消息发送成功:" + new Date();
}
@RabbitListener(queues = Constants.QOS_QUEUE)
public void process(Message message, Channel channel) throws Exception {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    System.out.printf("接收到消息:%s,deliveryTag:%d\n",
            new String(message.getBody(), "UTF-8"),
            deliveryTag);
    try {
        System.out.println("模拟处理业务逻辑");
        System.out.println("模拟处理业务完成");

//            channel.basicAck(deliveryTag, false);
    } catch (Exception e) {
        channel.basicNack(deliveryTag, false, true);
    }
}

应用场景

  • 限流:可以根据消费者的处理能力,设置 prefetchCount 限制每个消费者所能接收的消息数量,从而达到限流的目的。
  • 负载均衡:通过将 prefetchCount 设置为 1,通过设 prefetch = 1 的方式,告诉 RabbitMQ 一次只给一个消费者一条消息,也就是说,在处理并确认前一条消息之前,不要向该消费者发送新消息。相反,它会将它分派给下一个不忙的消费者。

网站公告

今日签到

点亮在社区的每一天
去签到