1. 简介
RabbitMQ 的消息发送流程:
- producer 将消息发送给 broker,consumer 从 broker 中获取消息并消费
那么在这里就涉及到了两种消息发送,即 producer 与 broker 之间和 consumer 与 broker 之间。
“消息确认” 讨论的是 consumer 与 broker 之间的消息发送。
2. 为什么会有这个特性
当 broker 给 consumer 发送消息时,可能会出现下面两种情况:
- 消息未成功到达 consumer;
- 消息成功到达 consumer,但是 consumer 没有成功消费这条消息,如:在处理消息时发生异常等情况。
这时,就需要有一种解决方案,保证 broker 与 consumer 之间消息传输的可靠性,于是就有了消息确认这一特性。
3. 使用 RabbitMQ Java 时如何进行消息确认(不是重点)
public class ConsumerDemo1 {
public static void main(String[] args) throws IOException, TimeoutException {
//建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USERNAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);
//声明队列
//如果队列不存在,就创建
channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
//消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
};
channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);
}
}
这是一段路由模式的代码,在这段代码中,有下面一条语句:
channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);
在这个方法中,有三个参数:
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
- queue:consumer 通过哪个队列获取 broker 发送的消息;
- autoAck:是否自动确认;
- callback:consumer 消费消息的逻辑。
其中,autoAck 就是消息确认的体现:
- autoAck 为 true:RabbitMQ 会将发送给 consumer 的消息视为已被成功接收和消费(consumer 可能并没有成功接收到或成功消费,但是 RabbitMQ 不管了),就会被将这条消息删除;
- autoAck 为 false:当 RabbitMQ 发送消息后,并不会马上就将消息删除,而是会等 consumer 调用 Basic.Ack,收到 ack 后,才会将消息删除。
将 autoAck 设置为 false 后,若 broker 长时间没有收到 consumer 发送的 ack 且 consumer 已经断开连接,就会将这条消息重新入队列,继续发送给 consumer 进行消费,此时,队列中的消息就分为了两种:
- 还未被发送的消息;
- 已经发送了的消息,但是没有收到 ack 而重新入队列等待被消费。
4. 在 spring 中使用 RabbitMQ 时如何进行消息确认
4.1 basicAck
在 spring 下的 Channel 类中提供了下面几种方法:
void basicAck(long deliveryTag, boolean multiple) throws IOException;
在这个方法中,有三个参数:
- deliveryTag:是 broker 给 consumer 发送消息的唯一标识,在一个 channel 中 deliveryTag 是唯一的;
- mulitple: 是否批量确认
使用这个方法后,就会告知 broker 这条消息已经成功被消费,可以将其删除。
4.2 basicNack
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
在这个方法中,多了一个参数:
- requeue:是否重新入队列。
使用这个方法,就相当于给 broker 发送 nack,即这条消息没有被正确消费。
若 requeue 为 true,就会将这条消息重新入队列,继续给 consumer 消费;
若 requeue 为 false,broker 就会这条消息删除。
4.3 basicReject
void basicReject(long deliveryTag, boolean requeue) throws IOException;
这个方法与 basicNack 大致相同,此处省略。
4.4 配置
在 spring 中,提供了三种配置用于消息确认:
- none:当消息发送给 consumer,不管 consumer 是否成功消费了消息,broker 都会当作这条消息被成功消费了,然后删除这条消息;
- auto:在 consumer 处理消息时没有抛出异常时,就会确认消息,反之就不会确认,并且将消息重新放入队列中,进行下一次的消费;
- manual:手动确认,我们需要在代码中指定这条消息是消费成功还是消费失败,分别使用 basicAck 和 basicNack。
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none
5. 代码测试
@RequestMapping("/producer")
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/ack")
public String ack() {
String messageInfo = "consumer ack mode test...";
rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, Constants.ACK_ROUTINGKEY, messageInfo);
return "消息发送成功";
}
}
这段代码代表的是一个 producer,下面接收到的消息都是通过这段代码发送的。
5.1 none
① 无异常时的消费者代码:
@RabbitListener(queues = Constants.ACK_QUEUE)
public void listener(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties()
.getDeliveryTag();
log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);
}
代码运行结果如下:
我们可以通过访问 RabbitMQ 客户端来观察这条消息是否成功被消费:
可以看到,Messages 这一列中,Ready 和 Unacked 都为 0,表示消息被成功消费。
② 有异常时的消费者代码:
@RabbitListener(queues = Constants.ACK_QUEUE)
public void listener(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties()
.getDeliveryTag();
int num = 1 / 0;
log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);
}
代码运行结果如下:
由于我们使用了除零操作,于是抛出了异常,我们可以通过访问 RabbitMQ 来观察这条消息是否被删除:
和上面一样,在 broker 中这条消息已经被删除,这与 none 配置性质一致。
5.2 auto
① 无异常时的消费者代码:
@Component
@Slf4j
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void listener(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties()
.getDeliveryTag();
log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);
}
}
代码运行结果如下:
在 RabbitMQ 客户端中显示,这条消息已经被成功消费:
② 有异常时的消费者代码:
@Component
@Slf4j
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void listener(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties()
.getDeliveryTag();
int num = 1 / 0;
log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);
}
}
代码运行结果如下:
在运行结果中,一直会有报错产生,并且都是两个两个为一组,并且在报错信息中可以看到,producer 发送的消息一直在被消费,这是因为存在异常,就会导致这条消息一直在队列中,通过观察 RabbitMQ 客户端可以看出,这条消息依然保存在队列中:
5.3 manual
① 无异常的消费者代码如下:
@Component
@Slf4j
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void listener(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties()
.getDeliveryTag();
try {
log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, true);
}
}
}
在这段代码中,我们使用了 basicAck 和 basicNack 来进行消息确认,当消息处理成功后,就会执行 basicAck,告诉 broker 这条消息已经被成功消费,可以将其删除;当消息执行发生异常后,就会执行 basicNack,并且根据 requeue 参数决定如何处理这条消息。
代码运行结果如下:
RabbitMQ 客户端显示这条消息被成功消费:
② 有异常的消费者代码如下:
当 requeue 为 true:
@Component
@Slf4j
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void listener(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties()
.getDeliveryTag();
try {
log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);
int n = 1 / 0;
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, true);
}
}
}
在此处的 basicNack,将 requeue 设置为了 true,当消息处理失败后,就会将消息重新入队列,重新被消费:
我们可以看到,这条消息一直在被消费,并且 delivertTag 在递增。
并且从 RabbitMQ 客户端中可以看到,这条消息依然存在,等待被成功消费:
当 requeue 为 false:
当处理消息发生异常后,就会将消息从队列中删除。
代码运行结果如下:
虽然异常依然存在,但是消息却没有重复发送,并且 RabbitMQ 中也将这条消息删除: