文章目录
1、消息确认
消息确认机制作用于队列和消费者之间,当消费者接收到消息,会执行回调函数handleDelivery
,发送一个回调信息给到队列,告诉队列它已经正确接收到了消息,以此保证消息的可靠传递。
原生API
RabbitMQ支持自动确认和手动确认(autoAck)。
- 自动确认:
autoAck=true,发送完该消息,该消息就会从队列中移除,不论消费者是否真正消费。适合可靠性要求底的场景。 - 手动确认:
autoAck=false,消息发送后,需要消费者发送确认信号,才能从队列中移除该消息,否则一直存在于队列中。
String basicConsume(String queue, boolean autoAck, Consumer callback) throws
IOException;
callback用于指定回调方式:
DefaultConsumer consumer = new DefaultConsumer(channel) {
//回调函数
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//如果autoAck=false,需要指定消息确认方式
System.out.println("接收到消息: " + new String(body));
}
};
//消费消息
channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);
RabbitMQ原生API中消息确认方式有三种:
肯定确认:
Channel.basicAck(long deliveryTag, boolean multiple)
通过信道告知队列消费者已经处理了该消息,队列会主动把该消息从磁盘中删除
参数说明:
1)deliveryTag是每个信道独自维护的自增唯一ID用于区分不同消息。
2)multiple用于标识是否批量确认接收到的消息。否定确认:
Channel.basicReject(long deliveryTag, boolean requeue)
否定确认
Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
💡TIPS
- requeue的含义是如果否定确认是否让队列重新发送消息。
- basicReject和basicReject都是用于否定确认的,唯一的区别在于basicReject一次性只能确认一条消息,basicNack可以批量否定(multiple)
Spring.AMQP
Spring 框架中的消息确认机制(通常通过 Spring AMQP 提供)是对 RabbitMQ 原生 API 的封装。Spring AMQP 提供了一种更高层次的抽象,简化了与 RabbitMQ 的交互,底层使用的还是RabbitMQ原生API。
Spring提供了三种确认模式:
spring:
application:
name:
extensions
rabbitmq:
addresses: amqp://admin:password@ip:port/virtualhost_name
listener:
simple:
# acknowledge-mode: none # 不做处理
acknowledge-mode: auto # 自动模式 spring默认配置
# acknowledge-mode: manual # 手动模式
none
- 队列发送完消息,不论消费者是否正确处理,RabbitMQ自动ACK消息,从队列移除该消息,因此如果消费者没有正确处理消息,消息可能丢失。
auto
- 消息正确处理,自动确认消息,移除队列;如果处理失败,消息不会被移除,RabbitMQ会一直重发该消息直到处理成功(每次重试DeliveryTag会自增);如果一直处理失败,消息就会一直处于UnAck状态,导致消息积压。
manual
- 消息发送后,不会自动确认,需要消费端使用
channel.basicAck(deliveryTag, multiple)
或channel.basicNack(deliveryTag, multiple,requeue)
,又或者channel.basicReject(deliveryTag, requeue)
进行确认。如果不进行确认,消息会处于UnAck状态,requeue
为true
表示重发消息,反之不会。
- 消息发送后,不会自动确认,需要消费端使用
💡
basicNack和basicReject的唯一区别是前者可以批量否定确认,后者只能单个消息否定确认。
代码演示(manual模式为例):
spring:
application:
name:
extensions
rabbitmq:
addresses: amqp://账号:密码@113.44.150.39:5672/extension
listener:
simple:
acknowledge-mode: manual # 消息接收确认模式
//绑定关系
@Configuration
public class RabbitMQConfig {
@Bean("ackQueue")
public Queue ackQueue(){
return QueueBuilder.durable(Constants.ACK_QUEUE).build();
}
@Bean("ackExchange")
public DirectExchange ackExchange(){
return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build();
}
@Bean("ackBind")
public Binding ackBind(@Qualifier("ackExchange") DirectExchange directExchange, @Qualifier("ackQueue") Queue ackQueue) {
return BindingBuilder.bind(ackQueue).to(directExchange).with("ack");
}
}
@RequestMapping("ack")
public String ack() {
//默认情况下,发送持久化消息
rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, "ack", "消息发送了,用的Direct类型交换机,绑定的ack key");
return "消息发送成功";
}
@Component
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void handerMessage(Message message, Channel channel) throws Exception {
try {
System.out.println("消费者接收到消息:"
+ new String(message.getBody(), "UTF-8") +
message.getMessageProperties().getDeliveryTag());
// //业务逻辑处理,此处一定会抛出异常,进行reject
int num = 3 / 0;
System.out.println("业务处理完成");
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, false);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
此时我们发送消息给RabbitMQ服务器,就会不断重试(requeue=true):
2、发送方确认(Spring)
发送方确认作用与生产者和RabbitMQ服务器之间,也是为了保证数据传递的可靠性。发送方确认分为两个模式:Confirm确认和Retrurn回退。
Confirm确认模式
- 配置:
spring:
rabbitmq:
addresses: amqp://账号:密码@110.41.51.65:15673/bite
listener:
simple:
acknowledge-mode: manual #消息接收确认
publisher-confirm-type: correlated #消息发送确认
- 定义消息确认的回调逻辑,并发送消息
配置yml之后,Spring会自动创建connectionFactory
@Bean("confirmRabbitTemplate")
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory
connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack,
String cause) {
System.out.printf("");
if (ack) {
System.out.printf("消息接收成功, id:%s \n",
correlationData.getId());//这个id用来区分不同的消息
} else {
System.out.printf("消息接收失败, id:%s, cause: %s",
correlationData.getId(), cause);
}
}
});
return rabbitTemplate;
}
@Resource(name = "confirmRabbitTemplate")
private RabbitTemplate confirmRabbitTemplate;
@RequestMapping("/confirm")
public String confirm() throws InterruptedException {
CorrelationData correlationData1 = new CorrelationData("1");
confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME,
"confirm", "confirm test...", correlationData1);
return "确认成功";
}
说明:
@FunctionalInterface
public interface ConfirmCallback {
/**
* Confirmation callback.
* @param correlationData 发送消息时的附加消息,识别特定消息
* @param ack 消息被exchange确认,为true,否则false
* @param cause 消息确认失败时,存储的出错原因
*/
void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);
}
💡RabbitTemplate.ConfirmCallback 和 ConfirmListener 区别
- 前者作用域生产者和exchange之间,后者作用与队列和消费者之间
- RabbitTemplate.ConfirmCallback是Spring.AMQP实现,只有一个需要重写的方法
confirm()
用于确认回调。- ConfirmListener来自于RabbitMQ原生API,内含h
andleAck
和handleNack
, ⽤于处理消息确认和否定确认。
Retrurn回退:
confirm确认保证的是生产者和exchange之间的可靠性,Return回退保证的是exchange能够正确路由到指定队列,如果消息没有被路由到任何队列,把消息回退给生产者。
- Spring的yml配置和confirm确认一致。
- 定义回退逻辑并发送消息
@Bean
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory factory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
//消息被退回时,回调下面的方法
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("消息退回:" + returned);
}
});
return rabbitTemplate;
}
@RequestMapping("/returns")
public String returns() {
CorrelationData correlationData = new CorrelationData("5");
//回退模式必须加上这个设置
confirmRabbitTemplate.setMandatory(true);
confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE
, "confirm***"
, "return test..."
, correlationData);
return "回退模式,消息发送了";
}
💡TIPS
setMandatory
设置为true告诉RabbitMQ,如果exchange没有把该消息成功给到任何队列,执行回退,因此setMandatory必须加。- ReturnedMessage参数中的信息
public class ReturnedMessage {
//返回的消息对象,包含了消息体和消息属性
private final Message message;
//由Broker提供的回复码, 表⽰消息⽆法路由的原因. 通常是⼀个数字代码,每个数字代表不同的含义 .
private final int replyCode;
//⼀个⽂本字符串, 提供了⽆法路由消息的额外信息或错误描述.
private final String replyText;
//消息被发送到的交换机名称
private final String exchange;
//消息的路由键,即发送消息时指定的键
private final String routingKey;
}
3、持久化
如果RabbitMQ服务器宕机,某些重要数据遗失可能造成严重后果,RabbbitMQ提供持久化功能,保证消息不会丢失。
RabbitMQ的持久化分为三个部分:
- 交换器的持久化
- 队列的持久化
- 消息的持久化
交换机持久化
当MQ的服务器发⽣意外或关闭之后,重启 RabbitMQ 时不需要重新去建⽴交换机, 交换机会⾃动建⽴,相当于⼀直存在。如果是一个需要长期使用的交换机,建议进行持久化。
@Bean("ackExchange")
public DirectExchange ackExchange(){
return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build();
}
上面这种方式创建交换机,默认是持久化的,如果想要设置非持久化,这样设置:
@Bean("ackExchange")
public DirectExchange ackExchange(){
return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).durable(false).build();
}
队列持久化
如果队列不设置持久化,RabbitMQ重启,队列也会跟着消息,队列中的消息也会丢失。
队列设置持久化方式和Exchange有所不同:
@Bean("ackQueue")
public Queue ackQueue(){
return QueueBuilder.durable(Constants.ACK_QUEUE).build();
}
设置非持久化:
@Bean("ackQueue")
public Queue ackQueue(){
return QueueBuilder.nonDurable(Constants.ACK_EXCHANGE).build();
}
消息持久化
如果队列是持久化的,消息不是持久化的,RabbitMQ重启,消息也会丢失。换言之,队列和消息共同持久化才能保证消息在重启之后不会丢失。
设置消息持久化需要把消息的投递模式( MessageProperties 中的 deliveryMode )
设置为2,也就是 MessageDeliveryMode.PERSISTENT
public enum MessageDeliveryMode {
NON_PERSISTENT,//⾮持久化
PERSISTENT;//持久化
}
RabbitMQ原生API设置持久化:
💡TIP
PERSISTENT_TEXT_PLAIN
中设置了deliveryMode =2,进而实现的持久化:
Spring.AMQP设置持久化:
4、重试机制
在消息传递过程中, 可能会遇到各种问题, 如⽹络故障, 服务不可⽤, 资源不⾜等, 这些问题可能导致消息
处理失败. 为了解决这些问题, RabbitMQ 提供了重试机制, 允许消息在处理失败后重新发送.
但如果是程序逻辑引起的错误, 那么多次重试也是没有⽤的。启用下方配置重试机制将会生效;
配置:
spring:
application:
name: rabbit-extensions-demo
rabbitmq:
addresses: amqp://账号:密码@47.108.157.13:5672/extension
listener:
simple:
acknowledge-mode: auto
retry:
enabled: true # 开启消费者失败重试
initial-interval: 5000ms # 初始失败等待时长为5秒
max-attempts: 5 # 最大重试次数
这里我们故意设置一个异常,看看运行结果:
@Component
public class RetryListener {
@RabbitListener(queues=Constants.RETRY_QUEUE)
public void retryQueue(Message message) throws UnsupportedEncodingException {
System.out.println("重试队列消费者auto模式 接收到消息 deliveryTag是:"+message.getMessageProperties().getDeliveryTag());
int a=10/0;
System.out.println("业务处理完成");
}
}
运行结果:
💡 1、重试机制只在manual模式下生效!
- none模式消息一旦发送rabbitmq就会把消息从队列移除。
- 而manual模式下不进行手动确认,只会导致消息积压
- manual模式进行手动确认且requeue=false,那么消息只会被发送一次,因为设置了不重新入队
- manual模式进行手动确认且requeue=true,会一直进行重试,因为每次发送确认消息给rabbitmq,都会告诉他重新入队,发送消息过来。
💡2、为什么deliveryTag一直是1,而不像第二点中auto模式自动重发消息deliveryTag一直递增?
- 重试机制,retry=enable,每次重试deliveryTag相同,是因为这个重试是在消费者内部执行的
- 而auto模式下,出现异常,不断重试则是RabbitMQ服务器重发消息给到消费者,所以deliveryTag自然是递增的。
5、TTL(过期时间)
当消息到达存活时间之后, 还没有被消费, 就会被⾃动清除。目前设置TTL方式有两种:
- 设置队列的TTL, 队列中所有消息都有相同的过期时间.
- 对消息本⾝进⾏单独设置, 每条消息的TTL可以不同.
如果两种⽅法⼀起使⽤, 则消息的TTL以两者之间较⼩的那个数值为准.
消息TTL
@RequestMapping("/ttl")
public String ttl() {
System.out.println("ttl....");
rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl","ttl test...");
//也可以这样设置过期时间
rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test...", message -> {
message.getMessageProperties().setExpiration("10000");
return message;
});
return "ttl测试";
}
队列TTL
public Queue ttlQueue() {
return QueueBuilder.durable(Constants.TTL_QUEUE).ttl(20000).build();
两者区别:
- 设置队列TTL属性的⽅法, ⼀旦消息过期, 就会从队列中删除
- 设置消息TTL的⽅法, 即使消息过期, 也不会⻢上从队列中删除, ⽽是在即将投递到消费者之前进⾏判定
💡为什么这两种⽅法处理的⽅式不⼀样?
因为设置队列过期时间, 队列中已过期的消息肯定在队列头部,RabbitMQ只要定期从队头开始扫描是否有过期的消息即可.⽽设置消息TTL的⽅式, 每条消息的过期时间不同, 如果要删除所有过期消息需要扫描整个队列, 所以不如等到此消息即将被消费时再判定是否过期, 如果过期再进⾏删除即可.
6、死信队列
死信队列(DLQ)和死信交换机(DLX)与普通队列、普通交换机本质上没有任何区别。
只不过DLX绑定的是一个个队列,接收的消息都是普通队列丢弃的消息。
DLX和DLQ协同分工,把普通队列中的死信发送给指定消费者进行特殊处理。
示例代码:
//普通队列
@Bean("normalQueue")
public Queue normalQueue() {
/*
* 重点代码:*/
return QueueBuilder.durable(Constants.NORMAL_QUEUE)
.deadLetterExchange(Constants.DL_EXCHANGE)//正常队列需要绑定指定死信交换机(根据名字)
.deadLetterRoutingKey("key2")//根据dl routingKey指定路由到哪一个死信队列
.build();
}
//普通交换机
@Bean("normalExchange")
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();
}
//交换机队列进行绑定
@Bean("bindingNormal")
public Binding bindingNormal(@Qualifier("normalQueue") Queue normalQueue,@Qualifier("normalExchange") DirectExchange normalExchange) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with("key1");
}
//死信队列
@Bean("DLQueue")
public Queue DLQueue() {
return QueueBuilder.durable(Constants.DL_QUEUE).build();
}
//死信交换机
@Bean("DLExchange")
public DirectExchange DLExchange() {
return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();
}
//死信队列和死信交换机的绑定
@Bean("DLBinding")
public Binding DLBinding(@Qualifier("DLQueue") Queue DLQueue,@Qualifier("DLExchange") DirectExchange DLExchange){
return BindingBuilder.bind(DLQueue).to(DLExchange).with("key2");
}
💡TIPS
- 由于normalQueue设置了
deadLetterExchange
和deadLetterRoutingKey
,所以normalQueue中的死信会被路由到指定的DLQ(死信队列),给到指定消费者执行,(当然代码中并没有编写消费者代码)- 消息变成死信,主要有这几种情况:
- Basic.Reject/Basic.Nack,拒绝了消息,并且requeue=false
- 消息TTL过期
- 队列已经到达了最大长度
7、延迟队列
延迟队列(Delayed Queue),即消息被发送以后, 并不想让消费者⽴刻拿到消息, ⽽是等待特定时间后,消费者才能拿到这个消息进⾏消费。
比如只能家具的定时任务、会议时间提前提醒、会员注册一定时间后的用户满意度调查等。
坏消息是RabbitMQ不直接支持延迟队列。好消息是RabbitMQ可以通过其他方式实现延迟队列:
- TTL+DLQ
- 延迟队列插件
TTL+DLQ
我们可以设置一个TTL队列,队列中消息一旦过期,产生死信,把该死信路由到指定的死信队列,然后消费者直接去监听该死信队列即可实现一个延迟队列的功能。
我们在第6节死信队列代码基础上进行编写:
- 交换机和队列相关配置
@Configuration
public class DLConfig {
//普通队列
@Bean("normalQueue")
public Queue normalQueue() {
/*
* 重点代码:*/
return QueueBuilder.durable(Constants.NORMAL_QUEUE)
.deadLetterExchange(Constants.DL_EXCHANGE)//正常队列需要绑定指定死信交换机(根据名字)
.deadLetterRoutingKey("key2")//根据dl routingKey指定路由到哪一个死信队列
.build();
}
//普通交换机
@Bean("normalExchange")
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();
}
//交换机队列进行绑定
@Bean("bindingNormal")
public Binding bindingNormal(@Qualifier("normalQueue") Queue normalQueue,@Qualifier("normalExchange") DirectExchange normalExchange) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with("key1");
}
//死信队列
@Bean("DLQueue")
public Queue DLQueue() {
return QueueBuilder.durable(Constants.DL_QUEUE).build();
}
//死信交换机
@Bean("DLExchange")
public DirectExchange DLExchange() {
return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();
}
//死信队列和死信交换机的绑定
@Bean("DLBinding")
public Binding DLBinding(@Qualifier("DLQueue") Queue DLQueue,@Qualifier("DLExchange") DirectExchange DLExchange){
return BindingBuilder.bind(DLQueue).to(DLExchange).with("key2");
}
}
- 监听死信队列
@Component
public class DelayListener {
@RabbitListener(queues =Constants.DL_QUEUE)
public void handleMessage(Message message){
long end=System.currentTimeMillis();//获取接收到消息时的时间
String msg=new String(message.getBody());
String[] msgs=msg.split(":");
long start=Long.parseLong(msgs[1]);
System.out.println("写收到延迟消息,时间:"+(end-start)/1000);
}
}
- 发送延迟消息
@RequestMapping("/delay")
public String delay() {
rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "key1",("当前消息需要延迟10秒发送:" + System.currentTimeMillis()).getBytes(),message -> {
message.getMessageProperties().setExpiration("10000"); //单位: 毫秒, 过期时间为30s
return message;
});
return "延迟队列消息已经执行";
}
运行结果:在这里插入图片描述
消息首先在normal.queue上等待10s,然后发送到dl.queue消费(我设置的auto模式,所以消息已经ack,没有显示任何消息):
💡TTL+DLQ有一个缺陷
如果如果在队列中:消息m1延迟时间大于消息m2的延迟时间,并且m1优先于m2出队列,这样种情况消息m2只能等待m1过期才能被发送,导致m2消息在发送的时候必定过期。
因此rabbitmq提供了一个插件,来解决这个问题
延迟队列插件
1. 安装教程
注意:
如果启动完插件之后没有生效,需要使用这个命令重启rabbitmq服务:
service rabbitmq-server restart
重启完之后,就会增加一个交换机类型:
这个交换机可以认为是一个“高级的延迟队列”,它可以临时存储延迟消息,所有存储在该交换机的消息,都会在延迟时间结束后发送给绑定了该交换机的队列,解决了TTL+DLQ中的缺陷。
2. 代码实现
@Configuration
public class DLConfig {
//声明队列方式和之前没有差别
@Bean("delayQueue")
public Queue delayQueue() {
return QueueBuilder.durable(Constants.DELAY_QUEUE).build();
}
//与之前不同的是,这里需要用。delays声明使用延迟队列插件
@Bean("delayExchange")
public Exchange delayExchange() {
return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();
}
//和其他绑定没有区别
@Bean("delayBinding")
public Binding delayBinding(@Qualifier("delayQueue") Queue delayQueue,@Qualifier("delayExchange") Exchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay").noargs();
}
}
@Component
public class DelayListener {
@RabbitListener(queues =Constants.DELAY_QUEUE)
public void handleMessage(Message message){
long end=System.currentTimeMillis();//获取接收到消息时的时间
String msg=new String(message.getBody());
String[] msgs=msg.split(":");
long start=Long.parseLong(msgs[1]);
System.out.println("写收到延迟消息,时间:"+(end-start)/1000);
}
}
@RequestMapping("/delayPlug")
public String delayPlug() {
rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay"
,("当前消息需要延迟10秒发送:" + System.currentTimeMillis()).getBytes(),messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setDelayLong(10000L); //单位: 毫秒 延迟时间
return messagePostProcessor;
});
rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay"
,("当前消息需要延迟5秒发送:" + System.currentTimeMillis()).getBytes(),messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setDelayLong(5000L); //单位: 毫秒 延迟时间
return messagePostProcessor;
});
return "延迟队列消息已经执行";
}
运行结果:
可以看到即使先发送10s延迟的数据,在发送5s延迟的数据,消息仍然按照正常顺序接收,解决的消息乱序问题。
8、事务
概念理解
RabbitMQ支持消息的发送和接收是原子性的,要么所有消息全部成功,要么全部失败。
例如这个场景,发送两个消息,由于过程中出现异常,正常情况下,队列会接收到第一条消息,第二条消息不会接收到:
@RequestMapping("/trans")
public String trans() {
System.out.println("trans test...");
rabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, "trans test 1...");
int num = 5/0;
rabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, "trans test 2...");
return "消息发送成功";
}
控制台只显示接收到一条消息:
如果我们希望,出现异常时,回滚所有发送的消息(出现错误一条消息也不发送),那么就可以启用事务。
代码实现
配置代码,为了简便使用默认交换机:
@Configuration
public class TransactionMQConfig {
@Bean("transationQueue")
public Queue transationQueue() {
return QueueBuilder.durable(Constants.TRANS_QUEUE).build();
}
//1、建立事务管理器
@Bean
public RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
//2、开启事务通道
@Bean("transRabbitTemplate")
public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
}
发送消息:
@Transactional//3、声明开启事务
@RequestMapping("/trans")
public String trans() {
System.out.println("trans test...");
transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, "trans test 1...");
int num = 5/0;
transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, "trans test 2...");
return "消息发送成功";
}
发送消息后运行结果:
程序抛出异常:
但是控制台,队列没有接收到任何消息:
9、消息分发
RabbitMQ中,消费者获取消息既可以使用推模式(队列把消息发送给消费者),也可以使用拉模式(消费者主要向队列获取)。虽然两者都支持,但是主要还是使用推模式。使用推模式,会出现一个问题,就是默认情况下RabbitMQ采用轮询方式,把消息发送给指定消费者,这种情况下如果消费者本身负载较重,继续接收消息,可能会出现严重后果。为此,RabbitMQ提供了消息分发这样一个机制。每个消费者都可以设置最大接收消息数prefetchCount,如果消费者目前获取消息数等于prefetchCount,RabbittMQ就不会推送消息给这个消费者,这是RabbitMQ中消息分发的核心概念。通过这样一个机制可以实现限流、负载均衡等功能。
代码实现
RabbitMQ 的SDK通过这个方法可以设置预取数量:
Spring.AMQP可以直接通过yml配置:
spring:
application:
name:
applicationName
rabbitmq:
addresses: amqp://账号:密码@113.44.150.39:5672/extension
listener:
simple:
acknowledge-mode: manual # 必须开启手动模式,prefetch才会生效
prefetch: 5
💡TIPS
- prefectch设置为0表示没有获取上限。
- 在
拉模式
中,设置channel.basicQos(prefetchCount)
是无效的,因为拉模式,一次只能从队列拉去一个消息。