文章目录
问题
使用BatchAcknowledgingMessageListener 批量消费Kafka消息,成功则手动提交offset,失败则重试。消费成功的情况下没有问题,但消费失败情况下,调用nack方法重试时则报异常。
示例
public class BatchCustomMessageListener implements BatchAcknowledgingMessageListener {
private MessageHandler messageHandler;
public BatchCustomMessageListener(MessageHandler messageHandler) {
this.messageHandler = messageHandler;
}
@Override
public void onMessage(List data, Acknowledgment acknowledgment) {
try {
messageHandler.handle(data); // 处理多条消息
acknowledgment.acknowledge(); // 成功处理后提交偏移量
} catch (Exception e) {
// 消息处理失败,30min后重试
// nack作用:将会在指定sleep时间后,重新消费消息。在sleep期间内,不会消费新消息。
acknowledgment.nack(30 * 60 * 1000); // 这里报了异常
}
}
}
上边的代码乍一看没啥问题,编译,启动也都没报错。但是在执行nack的时候进到了Acknowledgment接口默认nack(sleep) 方法里边,并抛出异常。
nack(sleep) is not supported by this Acknowledgment
异常
原因
Acknowledgment接口有两个nack方法:nack(long sleep) 和nack(int index, long sleep), 以及两个实现类ConsumerAcknowledgment和ConsumerBatchAcknowledgment。ConsumerAcknowledgment仅实现了nack(long sleep),而ConsumerBatchAcknowledgment仅实现了nack(int index, long sleep)。
nack方法
注意:调用nack方法后,将会在指定sleep时间后,重新消费消息。在sleep期间内,不会消费新消息。
Acknowledgment接口
实现类:ConsumerAcknowledgment
实现类:ConsumerBatchAcknowledgment
这样的设计也很好理解。。
当BatchAcknowledgingMessageListener批量消费消息时, 使用的是ConsumerBatchAcknowledgment,重试时需要告诉ConsumerBatchAcknowledgment要从这批量消息中的哪条开始重试消费,即要指定index值。我的例子中调用的是nack(long sleep),没有指定index,所以进到了默认方法里,抛了异常。
而使用AcknowledgingMessageListener消费单条消息时,使用的是ConsumerAcknowledgment,重试时它知道重试当前的消息,因为就这一条,所以只需要指定重试时间就可以了。
也就是说批量消费时,重试要调用nack(int index, long sleep),单条消费时,重试要调用nack(long sleep),二者不搭配,就会抛不支持该方法的异常。
解决方案
1 批量消费指定index
示例
public class BatchCustomMessageListener implements BatchAcknowledgingMessageListener {
private MessageHandler messageHandler;
public BatchCustomMessageListener(MessageHandler messageHandler) {
this.messageHandler = messageHandler;
}
@Override
public void onMessage(List data, Acknowledgment acknowledgment) {
int index = 0;
try {
for (; index < data.size(); index++) {
messageHandler.handle(data.get(index)); // 处理单条消息
}
// 成功处理后提交偏移量
acknowledgment.acknowledge();
} catch (Exception e) {
// 消息处理失败,30min后重试index及index之后的消息
acknowledgment.nack(index, 30 * 60 * 1000);
}
}
}
2 单条消费
改成单条消费消息,调用nack(long sleep)
示例
public class SingleCustomMessageListener implements AcknowledgingMessageListener {
private MessageHandler messageHandler;
public SingleCustomMessageListener(MessageHandler messageHandler) {
this.messageHandler = messageHandler;
}
@Override
public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) {
try {
messageHandler.handle(data); // 处理单条消息
acknowledgment.acknowledge();
} catch (Exception e) {
acknowledgment.nack(30 * 60 * 1000);
}
}