Rabbitmq集成springboot,手动确认消息basicAck、basicNack、basicReject的使用

发布于:2025-06-22 ⋅ 阅读:(12) ⋅ 点赞:(0)

一、手动确认消息模式

模式 成功消费 消费失败 连接中断 性能
NONE 自动删除 消息丢失 消息丢失 最高
AUTO 自动删除 重试/死信 消息重回队列 中等
MANUAL 手动删除 自定义处理 消息重回队列 最低

手动确认: 消息到达消费者,不会自动确认,会等待消费者调用Basic.Ack命令,才会从内存/磁盘 移除这条消息。
自动确认: 消息只要到达消费者就会自动确认,不会考虑消费者是否正确消费了这些消息,直接从 内存/磁盘 中删除消息;

ack:成功处理消息,RabbitMO从队列中删除该消息
nack:消息处理失败,RabbitMO需要再次投递消息
reject:消息处理失败并拒绝该消息,RabbitMO从队列中删除该消息

手动确认消息basicAck、basicNack、basicReject的使用

1、basicAck

// 确认消息处理成功,basicAck 是 RabbitMQ 的手动确认机制核心方法
// 第一个参数 tag:消息的交付标签,唯一标识这条消息  第二个参数 false:表示不批量确认,仅确认当前这条消息
channel.basicAck(deliveryTag,false);

2、basicNack

//拒绝确认消息,即告诉 RabbitMQ 我们无法处理这条消息。这会导致该消息被重新放回队列中等待再次投递。
//1 第一个参数 表示消息的交付标签,是一个单调递增的标识符,用于唯一标识队列中的一条消息。通过这个标签,RabbitMQ 能够知道你正在处理哪一条消息。
//2 第二个参数 表示是否批量确认 true 表示否定确认当前 tag 以及之前所有未确认的消息;如果为 false,则只否定确认当前 tag 对应的消息。 此处为 false,代表只处理当前这一条消息
//3 第三个参数 如果设置为 true,消息会被重新放回队列,等待再次投递给其他消费者或同一个消费者。如果设置为 false,消息不会被重新入队,
//而是根据队列的配置可能会被丢弃或者路由到死信队列(如果配置了的话)。此处为 true,表示在发生异常时将消息重新入队以便重试。
channel.basicNack(tag, false, true);

3、basicReject


// 拒绝消息,
// 1、第一个参数是消息的交付标签,
// 2、第二个参数表示是否将消息重新入队;false 表示拒绝后不重新入队,消息会被丢弃(如果没有配置死信队列,则可能被直接删除)
channel.basicReject(tag, false);

二、准备基本环境

1、pom.xml引入的java包

<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
      <version>${springboot-version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
      <version>${springboot-version}</version>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.24</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <version>${springboot-version}</version>
      <scope>test</scope>
    </dependency>
      <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>2.0.57</version>
      </dependency>
  </dependencies>

2、yaml配置文件

# 8004是zookeeper服务器的支付服务提供者端口号
server:
  port: 8004
spring:
  application:
    name: cloud-mq
  rabbitmq:
    addresses: 192.168.96.133
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    #消费者配置
    listener:
      #todo 切记,设置了重拾机制,要抛出异常,不可try catch 后不抛异常,否则重试机制失效
      simple:
        #开启ack 手动确认消息是否被消费成功
        acknowledge-mode: manual
        retry:
          enabled: true
          # 消费失败后,继续消费,然后最多消费5次就不再消费。
          max-attempts: 5
          # 消费失败后 ,重试初始间隔时间 2秒
          initial-interval: 2000
          # 重试最大间隔时间5秒
          max-interval: 5000
          # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
          multiplier: 2

      direct:
        #开启ack 手动确认消息是否被消费成功
        acknowledge-mode: manual
        #todo 切记,设置了重拾机制,要抛出异常,不可try catch 后不抛异常,否则重试机制失效
        retry:
          enabled: true
          # 消费失败后,继续消费,然后最多消费3次就不再消费。
          max-attempts: 3
          # 消费失败后 ,重试初始间隔时间 3秒
          initial-interval: 3000
          # 重试最大间隔时间
          max-interval: 7000
          # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
          multiplier: 2
    # 生产者配置
    template:
      retry:
        # 开启消息发送失败重试机制
        enabled: true
    # 生产者 true-开启消息抵达队列的确认
    publisher-returns: false
    #simple 配置用于设置 RabbitMQ 消息生产者的消息确认类型为“简单确认”。这意味着当消息被发送到 RabbitMQ 之后,只有在消息成功投递到队列中后,RabbitMQ 才会向生产者发送一个确认(ack)通知。如果消息未能成功投递,则不会收到确认。
    #该配置通常与 publisher-returns: true 一起使用以启用消息返回机制,但在此配置中 publisher-returns 被设置为 false,表示不启用消息返回功能
    publisher-confirm-type: simple

3、主启动类


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author 10564
 */
@SpringBootApplication
public class ApplicationRabbitmq {
    public static void main(String[] args) {
        SpringApplication.run(ApplicationRabbitmq.class, args);
    }
}

三、手动确认消息(以普通消息为例)

1、定义消息队列Queue名称

package org.xwb.springcloud.constant;

/**
 * @author 10564
 */
public class MqConstant {
    /**
     * 手动确认消息
     */
    public static final String ACK_MQ_NAME = "ackQueue";
}

2、配置类Configuration


import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.xwb.springcloud.constant.MqConstant;

/**
 * 创建RabbitMQ的配置类
 * @author 10564
 */
@Configuration
public class RabbitmqAckConfig {

    /**
     * 简单消息队列
     */
    @Bean
    public Queue ackQueue() {

        //名字(name):队列的名字,用来区分不同的队列。
        //是否持久化(durable):如果设置为 true,表示即使服务器重启了,这个队列依然存在。
        //是否独占(exclusive):如果设置为 true,表示只有创建它的连接才能使用这个队列。
        //是否自动删除(autoDelete):如果设置为 true,表示当不再有消费者使用这个队列时,服务器会自动删除它。
        return new Queue(MqConstant.ACK_MQ_NAME,true,false,false);
    }
}

3、生产者Producer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;

import javax.annotation.Resource;

/**
 * @author 10564
 */
@Component
public class AckProducer {

    private static final Logger log = LoggerFactory.getLogger(AckProducer.class);

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void senderAckMessage(String message) {
        log.info("\nack生产者发送消息:{}\n", message);
        rabbitTemplate.convertAndSend(MqConstant.ACK_MQ_NAME, message);
    }
}

4、消费者Consumer

package org.xwb.springcloud.messagetype.ack;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;

import java.util.Date;

/**
 * @author 10564
 */
@Component
public class AckConsumer {
    private static final Logger log = LoggerFactory.getLogger(AckConsumer.class);

    @RabbitListener(queues = MqConstant.ACK_MQ_NAME)
    public void receiveAckQueueMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws  Exception {
        try {
            log.info("\nack消费者接收消息:{},tag:{} \n", message, tag);
            if("basicAck".equals( message)){
                //todo  确认消息处理成功,
                // 第一个参数 tag:消息的交付标签,唯一标识这条消息
                // 第二个参数 false:表示不批量确认,仅确认当前这条消息
                log.info("\n手动确认 处理成功 basicAck :{} \n", new Date());
                channel.basicAck(tag, false);
            }else if("basicNack".equals( message)){
                //todo 拒绝确认消息,即告诉 RabbitMQ 我们无法处理这条消息。这会导致该消息被重新放回队列中等待再次投递。
                //todo  1 第一个参数 表示消息的交付标签,是一个单调递增的标识符,用于唯一标识队列中的一条消息。通过这个标签,RabbitMQ 能够知道你正在处理哪一条消息。
                //todo  2 第二个参数 表示是否批量确认 true 表示否定确认当前 tag 以及之前所有未确认的消息;如果为 false,则只否定确认当前 tag 对应的消息。 此处为 false,代表只处理当前这一条消息
                //todo  3 第三个参数 如果设置为 true,消息会被重新放回队列,等待再次投递给其他消费者或同一个消费者。如果设置为 false,消息不会被重新入队,
                //todo 而是根据队列的配置可能会被丢弃或者路由到死信队列(如果配置了的话)。此处为 true,表示在发生异常时将消息重新入队以便重试。
                log.info("\n手动拒绝确认消息 basicNack :{} \n", new Date());
                channel.basicNack(tag, false, false);
            }else if("basicReject".equals( message)){
                //todo  拒绝消息,
                //todo  1、第一个参数是消息的交付标签,
                //todo  2、第二个参数表示是否将消息重新入队;false 表示拒绝后不重新入队,消息会被丢弃(如果没有配置死信队列,则可能被直接删除)
                log.info("\n手动确认 拒绝消息 basicReject :{} \n", new Date());
                channel.basicReject(tag, false);
            }else{
                throw new RuntimeException("模拟消费失败");
            }
        } catch (Exception e) {
            log.error("\n消费消息异常,抛出异常{} message:{}\n",tag, e.getMessage());
            //todo 抛出异常,触发 spring retry
            throw e;
        }
    }
}

5、测试Test


import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.xwb.springcloud.messagetype.ack.AckProducer;

import javax.annotation.Resource;

/**
 * @author 10564
 */
@RestController
@RequestMapping("/mq")
public class MqMessageController {
    @Resource
    private AckProducer ackProducer;
    @GetMapping("/ack")
    public void ack(String message) {
        ackProducer.senderAckMessage(message);
    }

6、测试结果


### ack
GET http://localhost:8004/mq/ack?message=basicAck

2025-06-21 23:33:21.758  INFO 19824 --- [nio-8004-exec-1] o.x.s.messagetype.ack.AckProducer        : 
ack生产者发送消息:basicAck
2025-06-21 23:33:21.771  INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer        : 
ack消费者接收消息:basicAck,tag:1 
2025-06-21 23:33:21.772  INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer        : 
手动确认 处理成功 basicAck :Sat Jun 21 23:33:21 CST 2025 


### ack
GET http://localhost:8004/mq/ack?message=basicNack
2025-06-21 23:33:52.687  INFO 19824 --- [nio-8004-exec-2] o.x.s.messagetype.ack.AckProducer        : 
ack生产者发送消息:basicNack

2025-06-21 23:33:52.690  INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer        : 
ack消费者接收消息:basicNack,tag:2 

2025-06-21 23:33:52.690  INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer        : 
手动拒绝确认消息 basicNack :Sat Jun 21 23:33:52 CST 2025 


### ack
GET http://localhost:8004/mq/ack?message=basicReject
2025-06-21 23:34:14.653  INFO 19824 --- [nio-8004-exec-3] o.x.s.messagetype.ack.AckProducer        : 
ack生产者发送消息:basicReject

2025-06-21 23:34:14.656  INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer        : 
ack消费者接收消息:basicReject,tag:3 

2025-06-21 23:34:14.656  INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer        : 
手动确认 拒绝消息 basicReject :Sat Jun 21 23:34:14 CST 2025 

网站公告

今日签到

点亮在社区的每一天
去签到