Acknowledgment.nack方法重试消费kafka消息异常

发布于:2025-03-12 ⋅ 阅读:(16) ⋅ 点赞:(0)

问题

使用BatchAcknowledgingMessageListener 批量消费Kafka消息,成功则手动提交offset,失败则重试。消费成功的情况下没有问题,但消费失败情况下,调用nack方法重试时则报异常。

示例


  public class BatchCustomMessageListener implements BatchAcknowledgingMessageListener {

    private MessageHandler messageHandler;

    public BatchCustomMessageListener(MessageHandler messageHandler) {
        this.messageHandler = messageHandler;
    }

    @Override
    public void onMessage(List data, Acknowledgment acknowledgment) {
        try {
            messageHandler.handle(data); // 处理多条消息
            acknowledgment.acknowledge(); // 成功处理后提交偏移量
        } catch (Exception e) {
            // 消息处理失败,30min后重试
            // nack作用:将会在指定sleep时间后,重新消费消息。在sleep期间内,不会消费新消息。
            acknowledgment.nack(30 * 60 * 1000); // 这里报了异常
        }
    }
}

上边的代码乍一看没啥问题,编译,启动也都没报错。但是在执行nack的时候进到了Acknowledgment接口默认nack(sleep) 方法里边,并抛出异常。

nack(sleep) is not supported by this Acknowledgment

异常

在这里插入图片描述
![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/8bbf4f37af134374bf1f4e34c2687dbe.png

原因

Acknowledgment接口有两个nack方法:nack(long sleep)nack(int index, long sleep), 以及两个实现类ConsumerAcknowledgmentConsumerBatchAcknowledgment。ConsumerAcknowledgment仅实现了nack(long sleep),而ConsumerBatchAcknowledgment仅实现了nack(int index, long sleep)。

nack方法

注意:调用nack方法后,将会在指定sleep时间后,重新消费消息。在sleep期间内,不会消费新消息。

Acknowledgment接口

在这里插入图片描述

实现类:ConsumerAcknowledgment

在这里插入图片描述

实现类:ConsumerBatchAcknowledgment

在这里插入图片描述

这样的设计也很好理解。。
当BatchAcknowledgingMessageListener批量消费消息时, 使用的是ConsumerBatchAcknowledgment,重试时需要告诉ConsumerBatchAcknowledgment要从这批量消息中的哪条开始重试消费,即要指定index值。我的例子中调用的是nack(long sleep),没有指定index,所以进到了默认方法里,抛了异常。

而使用AcknowledgingMessageListener消费单条消息时,使用的是ConsumerAcknowledgment,重试时它知道重试当前的消息,因为就这一条,所以只需要指定重试时间就可以了。

也就是说批量消费时,重试要调用nack(int index, long sleep),单条消费时,重试要调用nack(long sleep),二者不搭配,就会抛不支持该方法的异常。

解决方案

1 批量消费指定index

示例

public class BatchCustomMessageListener implements BatchAcknowledgingMessageListener {

    private MessageHandler messageHandler;

    public BatchCustomMessageListener(MessageHandler messageHandler) {
        this.messageHandler = messageHandler;
    }

    @Override
    public void onMessage(List data, Acknowledgment acknowledgment) {
        int index = 0;
        try {
            for (; index < data.size(); index++) {
                messageHandler.handle(data.get(index)); // 处理单条消息
            }
            // 成功处理后提交偏移量
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // 消息处理失败,30min后重试index及index之后的消息
            acknowledgment.nack(index, 30 * 60 * 1000);
        }
    }

}

2 单条消费

改成单条消费消息,调用nack(long sleep)

示例

public class SingleCustomMessageListener implements AcknowledgingMessageListener {

    private MessageHandler messageHandler;

    public SingleCustomMessageListener(MessageHandler messageHandler) {
        this.messageHandler = messageHandler;
    }

    @Override
    public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) {
        try {
            messageHandler.handle(data); // 处理单条消息
            acknowledgment.acknowledge();
        } catch (Exception e) {
            acknowledgment.nack(30 * 60 * 1000);
        }
    }