RabbitMQ高级特性1
- 一.消息确认
-
- 1.消息确认机制
- 2.手动确认代码
-
- 肯定确认
- 否定确认1
- 否定确认2
- Spring中的代码
- 二.持久性
-
- 1.交换机持久化
- 2.队列的持久化
- 3.消息的持久化
- 非持久化代码实现
- 三方面都持久化,数据也会丢失
- 三.发送方确认
-
- 1.Confirm确认模式
- 2.return返回模式
- 四.总结
-
- RabbitMQ保证消息可靠传输
一.消息确认
1.消息确认机制
自动确认:当autoAck等于true时, RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。自动确认模式适合对于消息可靠性要求不高的场景。
手动确认:当autoAck等于false时,RabbitMQ会等待消费者显式地调用Basic.Ack命令,回复确认信号后才从内存(或者磁盘)中移去消息。这种模式适合对消息可靠性要求比较高的场景.
2.手动确认代码
肯定确认
Channel.basicAck(long deliveryTag, boolean multiple)
RabbitMQ已知道该消息并且成功的处理消息,可以将其丢弃了。
参数说明
1)deliveryTag:消息的唯⼀标识,它是⼀个单调递增的64位的长整型值。 deliveryTag 是每个通道(Channel)独立维护的,所以在每个通道上都是唯⼀的。当消费者确认(ack)⼀条消息时,必须使用对应的通道上进行确认。
2)multiple:是否批量确认。在某些情况下,为了减少网络流量,可以对⼀系列连续的 deliveryTag 进行批量确认。值为true则会⼀次性把ack所有小于或等于指定deliveryTag的消息。值为false,则只确认当前指定deliveryTag的消息。
否定确认1
Channel.basicReject(long deliveryTag, boolean requeue)
RabbitMQ在2.0.0版本开始引⼊了 Basic.Reject 这个命令,消费者客户端可以调用
channel.basicReject方法来告诉RabbitMQ拒绝这个消息。
参数说明
1)deliveryTag:参考channel.basicAck。
2)requeue:表示拒绝后,这条消息如何处理。如果requeue参数设置为true,则RabbitMQ会重新将这条消息存入队列,以便可以发送给下⼀个订阅的消费者。如果requeue参数设置为false,则RabbitMQ会把消息从队列中移除,而不会把它发送给新的消费者。
否定确认2
Channel.basicNack(long deliveryTag, boolean multiple,boolean requeue)
Basic.Reject命令⼀次只能拒绝⼀条消息,如果想要批量拒绝消息,则可以使用Basic.Nack这个命令。消费者客户端可以调用channel.basicNack方法来实现。
参数说明
前面的参数参考上述参数说明。
multiple的参数设置为true则接受deliveryTag编号之前所有未被当前消费者确认的消息,也就是批量处理未被确认的消息。
Spring中的代码
- AcknowledgeMode.NONE
这种模式下,消息⼀旦投递给消费者,不管消费者是否成功处理了消息,RabbitMQ就会自动确认消息,从RabbitMQ队列中移除消息,如果消费者处理消息失败,消息可能会丢失。
ym配置
spring:
application:
name: rabbitmqdemo
rabbitmq:
addresses: amqp://账号:密码@IP:端口号/虚拟机
listener:
simple:
acknowledge-mode: none
- AcknowledgeMode.AUTO(默认)
这种模式下,消费者在消息处理成功时会自动确认消息,但如果处理过程中抛出了异常,则不会确认消息,但是会一直尝试重发消息。
将yml配置中的 acknowledge-mode改成auto。
上述两种模式代码相同
Configuration
package com.example.rabbitmqdemo.config;
import com.example.rabbitmqdemo.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfiguration {
//消息确认
//队列
@Bean("ackQueue")
public Queue ackQueue() {
return QueueBuilder.durable(Constants.ACK_QUEUE).build();
}
//虚拟机
@Bean("directExchange")
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build();
}
//队列和虚拟机绑定
@Bean("ackBinding")
public Binding ackBinding(@Qualifier("ackQueue") Queue queue, @Qualifier("directExchange") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with("ack");
}
}
** Constants**
package com.example.rabbitmqdemo.constant;
public class Constants {
public static final String ACK_QUEUE = "ack.queue";
public static final String ACK_EXCHANGE = "ack.exchange";
}
Controller
package com.example.rabbitmqdemo.controller;
import com.example.rabbitmqdemo.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/ack")
public String ack() {
rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack","ack is ok");
return "ack is ok!";
}
}
Listener
@Component
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {
//消费者逻辑
System.out.printf("接收到消息: %s , deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());
//不做具体实现的消费者业务逻辑
}
}
- AcknowledgeMode.MANUAL
手动确认模式下,消费者必须在成功处理消息后显式调用 basicAck 方法来确认消息。如果消息未被确认,RabbitMQ会认为消息尚未被成功处理,并且会在消费者可用时重新投递该消息,这种模式提高了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不会丢失,而是可以被重新处理。
将yml配置中的 acknowledge-mode改成manual。
Listener
package com.example.rabbitmqdemo.listener;
import com.example.rabbitmqdemo.constant.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
/**
* Created with IntelliJ IDEA.
* Description:
* User: hp
* Date: 2025-04-03
* Time: 9:26
*/
@Component
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void handleMessage(Message message, Channel channel) throws IOException {
//消费者逻辑
System.out.printf("接收到消息: %s , deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());
//不做具体实现的消费者业务逻辑
try {
//int sum = 3 / 0;
//确认消息(肯定)
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e) {
//否定确认
//最后一个参数为true,则发生异常重新入队,false,为不再入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
}
}
二.持久性
1.交换机持久化
交换器的持久化是通过在声明交换机时是将durable参数置为true实现的。
相当于将交换机的属性在服务器内部保存,当MQ的服务器发生意外或关闭之后,重启 RabbitMQ 时不需要重新去建⽴交换机,交换机会自动建立,相当于⼀直存在。
如果交换器不设置持久化,那么在 RabbitMQ 服务重启之后,相关的交换机元数据会丢失,对⼀个长期使用的交换器来说,建议将其置为持久化的。
设置交换机的持久化
2.队列的持久化
队列的持久化是通过在声明队列时将 durable 参数置为true实现的。
如果队列不设置持久化,那么在RabbitMQ服务重启之后,该队列就会被删掉,此时数据也会丢失。(队列没有了,消息也无处可存了)
队列的持久化能保证该队列本⾝的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将消息设置为持久化。
咱们前面用的创建队列的方式都是持久化的。
队列持久化
队列非持久化
3.消息的持久化
消息实现持久化,需要把消息的投递模式( MessageProperties 中的 deliveryMode )设置为2,也就是MessageDeliveryMode.PERSISTENT
设置了队列和消息的持久化,当 RabbitMQ 服务重启之后,消息依旧存在。如果只设置队列持久化,重启之后消息会丢失。
如果只设置消息的持久化,重启之后队列消失,继而消息也丢失。所以单单设置消息持久化而不设置队列的持久化显得毫无意义
非持久化代码实现
交换机、队列和绑定
//非持久化队列
@Bean("presQueue")
public Queue presQueue() {
return QueueBuilder.nonDurable(Constants.PRES_QUEUE).build();
}
//非持久化交换机
@Bean("presExchagne")
public DirectExchange presExchange() {
return ExchangeBuilder.directExchange(Constants.PRES_EXCHANGE).durable(false).build();
}
@Bean("presBinding")
public Binding presBinding(@Qualifier("presQueue") Queue queue,@Qualifier("presExchagne") Exchange exchange) {
//如果参数传递的是Exchange类型而不是DirectExchang类型就需要使用noargs作为收尾
return BindingBuilder.bind(queue).to(exchange).with("pres").noargs();
}
Producer
@RequestMapping("/pres")
public String pres() {
Message message = new Message("Presistent test...".getBytes(),new MessageProperties());
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE,"pres",message);
return "pres is ok!";
}
RabbitMQ服务器的虚拟机和队列
三方面都持久化,数据也会丢失
从消费者来说,如果在订阅消费队列时将autoAck参数设置为true,那么当消费者接收到相关消息之后,还没来得及处理就宕机了,这样也算数据居丢失。这种情况很好解决,将autoAck参数设置为false,并进行手动确认。
在持久化的消息正确存入RabbitMQ之后,还需要有⼀段时间(虽然很短,但是不可忽视)才能存⼊磁盘中。RabbitMQ并不会为每条消息都进行同步存盘(调用内核的fsync方法)的处理,可能仅仅保存到操作系统缓存之中而不是物理磁盘之中。如果在这段时间内RabbitMQ服务节点发生了宕机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失。
三.发送方确认
1.Confirm确认模式
Producer在发送消息的时候,对发送端设置⼀个ConfirmCallback的监听,无论消息是否到达Exchange,这个监听都会被执行,如果Exchange成功收到,ACK( Acknowledge character ,确认字符)为true,如果没收到消息,ACK就为false。
yml配置
spring:
application:
name: rabbitmqdemo
rabbitmq:
addresses: amqp://账号:Miami@IP:端口号/虚拟机
listener:
simple:
#acknowledge-mode: none
#acknowledge-mode: auto
acknowledge-mode: manual
publisher-confirm-type: correlated #消息发送确认
Configuration
package com.example.rabbitmqdemo.config;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitTemplateConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
@Bean
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//设置回调函数
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("执行了confirm方法");
if (ack) {
System.out.printf("接收到消息, 消息ID: %s \n",correlationData == null ? null : correlationData.getId());
}else {
System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n",correlationData == null ? null : correlationData.getId(),cause);
//相应的业务处理
}
}
});
return rabbitTemplate;
}
}
Producer
@RequestMapping("/confirm")
public String confirm() {
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplateConfig.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","confirm test...",correlationData);
return "confirm is ok!";
}
2.return返回模式
Configuration
@Bean
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//设置回调函数
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("执行了confirm方法");
if (ack) {
System.out.printf("接收到消息, 消息ID: %s \n",correlationData == null ? null : correlationData.getId());
}else {
System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n",correlationData == null ? null : correlationData.getId(),cause);
//相应的业务处理
}
}
});
//return模式
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息退回: " + returnedMessage);
}
});
return rabbitTemplate;
}
四.总结
RabbitMQ保证消息可靠传输
Producer -> Broker:发送方确认
- Producer -> Exchange :Confirm模式(网络问题)
- Exchange -> Queue : return模式(代码或者配置层错误,导致消息路由失败)
- 队列移除:死信等
Broker:持久化(RabbitMQ服务器宕机导致消息丢失)
- 交换机持久化
- 队列持久化
- 消息持久化
Broker -> Consumer 消息确认方式(消费者未来得及消费信息,就宕机了)
- 自动确认
- 手动确认