RabbitMQ 学习整理2 - 消峰限流

发布于:2025-03-26 ⋅ 阅读:(12) ⋅ 点赞:(0)

一点废话,关于消峰限流网上的各种资料不少,按图索骥后发现质量良莠不齐,时常出现运行异常,消峰失败等问题,反复踩坑。本人结合网上案例,在上文环境基础上,记录一种可以完成消峰限流的方式。

基本描述

  1. 本文所述,消峰限流,用于描述【消费者】在从RabbitMQ中获取消息时的限流操作
  2. 简单来说就是,生产者瞬时推送了M条消息到RabbitMQ,
  3. 消费者只是一次最多只抓取N条(N小于M)进行处理
  4. 重复上述步骤,知道M条消息都被处理完成,从而减少消费者接收到消息的瞬时压力,防止消费者运行崩溃

问题说明

  1. 消峰的核心,就是指定消费者一次最多可抓取的消息条数
  2. RabbmitMQ中通过对消费者指定preFetchCount,修改消费者一次最多可抓取的消息条数,默认情况下,Prefetch_count为250
    在这里插入图片描述

实现方式

消费者修改

YML配置文件

server:
  port: 8882
spring:
  application:
    name: RabbitMQ-Direct-Qos-Client
  rabbitmq:
    addresses: 127.0.0.1
    port: 5672
    username: guest
    password: guest
#   以下配置在注入了自定义解析工厂时,都不生效!!!!
#    listener: 
#      type: direct
#      direct:
#        #设置监听为手动答应模式
#        acknowledge-mode: manual
#        prefetch: 5 # 在这里配置预取值
#      simple:
#        acknowledge-mode: manual
#        prefetch: 5

消费工厂

@Configuration
public class RabbitMQCustomerConfiguration {

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //开启手工确认模式
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        //配置消峰量
        factory.setPrefetchCount(10);
        return factory;
    }
}

消费逻辑

@Component
@Slf4j
public class DirectReceiver {

    @RabbitListener(queues = {"direct-qos-queue"})
    public void process(Message message, Channel channel) throws IOException {
        log.info("Received message: {}", new String(message.getBody()));
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            log.error(e.getMessage());
        }
        //注意一定要执行手工确认
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);

    }
}

生产者实现

@Service
@Slf4j
public class DirectProviderService {

    @Resource
    private RabbitTemplate rabbitTemplate;
    
    //这里模拟高频发送100个请求
    public String sendMsgBatch() {
        String result = "OK";
        for (int i = 0; i < 100 ; i++) {
            try {
                DirectDto directDto = new DirectDto();
                directDto.setMsg(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"));
                directDto.setMsgId(UUID.randomUUID().toString());
                directDto.setMsgType(""+i);
                rabbitTemplate.convertAndSend(DirectConfiguration.DIRECT_EXCHANGE, DirectConfiguration.DIRECT_ROUTING_KEY, directDto);
            } catch (Exception e) {
                log.error(e.getMessage(), e);
                result = "FAIL";
            }
        }
       return result;
    }
}

其他配置与controller接口代码省略

运行结果

未推送请求时管理界面

在这里插入图片描述

执行请求时管理界面

在这里插入图片描述
在这里插入图片描述

  1. 显然可以看到,每次只有10个请求处于Unacked状态(等待确认)
  2. 其他未被处理的消息,仍然属于Ready状态(等待从队列出队,被消费者解析)
  3. 特别注意:此时消费者永远只有一个
  4. 所以,因此可以认为消峰成功

消费者控制台输出

2025-03-25 15:51:47.111  INFO 12460 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : Received message: {"msgId":"82d66ee9-9585-4c78-9dea-624688a8a11d","msg":"2025-03-25 15:51:47","msgType":"0"}
2025-03-25 15:51:48.118  INFO 12460 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : Received message: {"msgId":"db7f1535-a540-44d6-9ceb-c3845b545682","msg":"2025-03-25 15:51:47","msgType":"1"}
-- ...中间值省略
2025-03-25 15:53:26.283  INFO 12460 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : Received message: {"msgId":"c3ac6b0c-2bee-41f3-a4eb-605969fc7655","msg":"2025-03-25 15:51:47","msgType":"98"}
2025-03-25 15:53:27.297  INFO 12460 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : Received message: {"msgId":"425f7b7e-5cf7-427c-b147-bf604247d61b","msg":"2025-03-25 15:51:47","msgType":"99"}

存疑示例

  1. 对于配置了自定义解析器的消费者,经测试yml配置不生效,实际使用过程中不建议使用此方法
  2. 网络上还有一种采用channel.basicQos+DefaultConsumer方式实现,但这种方式在与Spring Cloud集成时,感觉有点问题,这里只做一个记录
    1. 存疑方式原文 Springboot与RabbitMQ上手学习之Qos限流(四) - 简书
    2. 非Spring Cloud集成方式原文 RabbitMQ高级应用(三)消费端限流策略(basicQos)-CSDN博客

消费工厂

@Configuration
public class RabbitMQCustomerConfiguration {

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //开启手工确认模式
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        //配置消峰量,不配置此项
        //factory.setPrefetchCount(10);
        return factory;
    }
}

消费逻辑

    @RabbitListener(queues = "direct-qos-queue")
    public void receiveA(Message message, Channel channel) throws IOException {
        //容量为2
        channel.basicQos(0,2,false);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                   log.error(e.getMessage());
                }
                log.info("[x] Received '{}", new String(body));
                channel.basicAck(envelope.getDeliveryTag(), true);
            }
        };
        //设置 Channel 消费者绑定队列
        channel.basicConsume("direct-qos-queue",false, consumer);
    }

运行结果

未推送请求时管理界面

在这里插入图片描述

推送时管理界面

在这里插入图片描述
在这里插入图片描述

  1. 显然可以看到,每次只有100个请求处于Unacked状态(等待确认),而设置的2消费者没有效果
  2. 执行过程中,没有消息处于Ready状态,即是说消息已经都出队了,只是消费者没有确认
  3. 特别注意:此时消费者永远只有一个
  4. 所以,因此可以认为消峰没有成功

消费者控制台输出

  1. 一种情况是正常输出
  2. 有时也会出现消费不响应的问题

网站公告

今日签到

点亮在社区的每一天
去签到