RabbitMQ高级特性2
一.TTL
RabbitMQ可以对队列和消息设置TTL(过期时间),当消息存活的时间超过了TTL,此时消息及时没有被消费也会自动删除,队列也是一样的。
需要注意的是,在队列和消息都设置了过期时间后,会根据最短的过期时间来进行删除消息或者队列,如果队列删除了,消息自然也就没有了。
如果此时发送了两条消息,第一条消息过期时间为10秒,第二条消息过期时间为5秒,此时先等待的是第一条消息,当第一条消息过期了,第二条消息才会过期。
因为此时第二条消息已经在队列中存在的时间超过了TTL设置的时间,当第一条消息过期时,第二条消息就会称为队列头部,检查时,则满足了TTL的过期时间,则直接出队,二者并不会互相影响。
个人理解:他们等待时间是共享的,意思就是当第一条消息等待了10秒,未被消费者消费,并且与此同时第二条消息也没有被消费,此时第一条消息过期被删除,第二条也会直接被过期删除,而不会在等待5秒的时间。
1.设置消息的TTL
Configuration
//过期时间
@Bean("ttlQueue")
public Queue ttlQueue() {
return QueueBuilder.durable(Constants.TTL_QUEUE).build();
}
@Bean("ttlExchange")
public DirectExchange ttlExchange() {
return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();
}
@Bean("ttlBinding")
public Binding ttlBinding(@Qualifier("ttlQueue") Queue queue , @Qualifier("ttlExchange") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with("ttl");
}
Producer
@RequestMapping("/ttl")
public String ttl() {
//设置过期时间
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("10000");//毫秒
return message;
}
};
rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl","ttl test...",messagePostProcessor);
return "ttl is ok!";
}
2.设置队列的过期时间
Configuration
//设置过期队列
@Bean("ttlQueue2")
public Queue ttlQueue2() {
return QueueBuilder.durable(Constants.TTL_QUEUE2).ttl(20000).build();
}
@Bean("ttlExchange2")
public DirectExchange ttlExchange2() {
return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();
}
@Bean("ttlBinding2")
public Binding ttlBinding2(@Qualifier("ttlQueue2") Queue queue , @Qualifier("ttlExchange2") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with("ttl");
}
Producer
@RequestMapping("/ttl2")
public String ttl2() {
rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl","ttl2 test...");
return "ttl2 is ok!";
}
二.死信队列
1.死信
死信的概念:
死信(dead message)简单理解就是因为种种原因,无法被消费的信息,就是死信。有死信,自然就有死信队列。
当消息在⼀个队列中变成死信之后,它能被重新被发送到另⼀个交换器中,这个交换器就是DLX( Dead Letter Exchange ),绑定DLX的队列,就称为死信队列(Dead Letter Queue,简称DLQ)。
死信的原因
- 消息被拒绝( Basic.Reject/Basic.Nack ),并且设置 requeue 参数为false。
原因:消息在队列中存活的时间超过了设定的TTL - 消息过期。
原因:消费者在处理消息时,可能因为消息内容错误,处理逻辑异常等原因拒绝处理该消息。如果拒绝时指定不重新入队(requeue=false),消息也会成为死信. - 队列达到最大长度。
原因:当队列达到最大长度,无法再容纳新的消息时,新来的消息会被处理为死信
2.代码实现
DLConfiguration
package com.example.rabbitmqdemo.config;
import com.example.rabbitmqdemo.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DLconfig {
//正常的交换机和队列
@Bean("normalQueue")
public Queue normalQueue() {
return QueueBuilder
.durable(Constants.NORMAL_QUEUE)
.deadLetterExchange(Constants.DL_EXCHANGE)
.deadLetterRoutingKey("dlx")
.ttl(10000)
.build();
}
@Bean("normalExchange")
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();
}
@Bean("/normalBinding")
public Binding normalBinding(@Qualifier("normalQueue") Queue queuem, @Qualifier("normalExchange") Exchange exchange) {
return BindingBuilder.bind(queuem).to(exchange).with("normal").noargs();
}
//死信交换机和队列
@Bean("dlQueue")
public Queue dlQueue() {
return QueueBuilder.durable(Constants.DL_QUEUE).build();
}
@Bean("dlExchange")
public DirectExchange dlExchange() {
return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();
}
@Bean("/dlBinding")
public Binding dlBinding(@Qualifier("dlQueue") Queue queuem, @Qualifier("dlExchange") DirectExchange exchange) {
return BindingBuilder.bind(queuem).to(exchange).with("dlx");
}
}
Producer
@RequestMapping("/dl")
public String dl() {
rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","normal test...");
return "dl test!";
}
3.消息被拒绝的死信
DLListener
package com.example.rabbitmqdemo.listener;
import com.example.rabbitmqdemo.constant.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
@Component
public class DLListener {
@RabbitListener(queues = Constants.NORMAL_QUEUE)
public void handleMessage(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.printf("接收到消息: %s ,deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),deliveryTag);
System.out.println("业务逻辑处理");
int num = 3 / 0;
System.out.println("业务逻辑处理完成");
//肯定确认
channel.basicAck(deliveryTag,false);
}catch (Exception e) {
//否定确认
channel.basicNack(deliveryTag,false,false);
//requeue为false,则为死信
}
}
@RabbitListener(queues = Constants.DL_QUEUE)
public void handleMessage2(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("接收到消息: %s ,deliveryTag: %d \n", new String(message.getBody(), "UTF-8"), deliveryTag);
}
}
超出队列长度时的死信
Configuration
package com.example.rabbitmqdemo.config;
import com.example.rabbitmqdemo.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DLconfig {
//正常的交换机和队列
@Bean("normalQueue")
public Queue normalQueue() {
return QueueBuilder
.durable(Constants.NORMAL_QUEUE)
.deadLetterExchange(Constants.DL_EXCHANGE)
.deadLetterRoutingKey("dlx")
.maxLength(10)
.ttl(10000)
.build();
}
@Bean("normalExchange")
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();
}
@Bean("/normalBinding")
public Binding normalBinding(@Qualifier("normalQueue") Queue queuem, @Qualifier("normalExchange") Exchange exchange) {
return BindingBuilder.bind(queuem).to(exchange).with("normal").noargs();
}
//死信交换机和队列
@Bean("dlQueue")
public Queue dlQueue() {
return QueueBuilder.durable(Constants.DL_QUEUE).build();
}
@Bean("dlExchange")
public DirectExchange dlExchange() {
return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();
}
@Bean("/dlBinding")
public Binding dlBinding(@Qualifier("dlQueue") Queue queuem, @Qualifier("dlExchange") DirectExchange exchange) {
return BindingBuilder.bind(queuem).to(exchange).with("dlx");
}
}
** Producer**
@RequestMapping("/dl")
public String dl() {
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","normal test... " + i);
}
return "dl test!";
}
死信队列的应用场景
对于RabbitMQ来说,死信队列是⼀个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正确消费而被置入死信队列中的情况,应用程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。
比如:用户支付订单之后,支付系统会给订单系统返回当前订单的支付状态为了保证支付信息不丢失,需要使用到死信队列机制。当消息消费异常时,将消息投入到死信队列中,由订单系统的其他消费者来监听这个队列,并对数据进行处理(比如发送工单等,进行人工确认)
场景的应用场景还有:
消息重试:将死信消息重新发送到原队列或另⼀个队列进行重试处理
消息丢弃:直接丢弃这些无法处理的消息,以避免它们占用系统资源
日志收集:将死信消息作为日志收集起来,用于后续分析和问题定位
三.延迟队列
1.概念
延迟队列(Delayed Queue),即消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
RabbitMQ本身没有直接支持延迟队列的的功能,但是可以通过前面所介绍的TTL+死信队列的方式组合模拟出延迟队列的功能。
2.应用场景
- 智能家居:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到智能设备。
- 日常管理:预定会议后,需要在会议开始前十五分钟提醒参会人参加会议
- 用户注册成功后,7天后发送短信,提高用户活跃度等
3.代码实现
在实现死信队列的代码中将监听的消费者的队列设置为DL的队列即可,不在监听normal的队列,再将normal的队列设置TTL后,就是所谓的延迟队列实现。
但是如果是消息有TTL的话,如果处于队头的消息是30秒过期,在队头之后的元素是10秒过期,则当30秒的消息过期之后,10秒的消息也直接过期了,此时两条消息就会一起出来,此时就不是我们所预想的结果了
延迟队列插件安装和配置
此时解决这个办法可以使用一个延迟队列的插件,点击此处就可以跳转插件连接的下载地址
下载这个ez后缀的文件即可
在云服务器上进入这个路径后在进行安装插件:
再将下载好后的文件拖入到云服务器中进行安装:
在通过命令(rabbitmq-plugins list)查看插件是否安装成功:
还需要注意下载插件的版本要和云服务器上安装的版本相近,差距太远是无法启动这个插件的。
(rabbitmq-plugins enable rabbitmq_delayed_message_exchange)
通过这个命令启动插件
成功后会有红色框的提示:
成功之后最好重启RabbitMQ的服务器(service rabbitmq-server restart)
成功后在RabbitMQ的管理页面中会有红色框的选择:
停止插件的命令
rabbitmq-plugins disable rabbitmq_delayed_message_exchange:
上述安装插件的路径还可以选择其他路径,这里就不再赘述了,安装的方式都是大同小异的。
这样就能够解决了消息TTL不同而导致消息出队问题。
代码
Producer
@RequestMapping("/delay2")
public String delay2() {
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelayLong(30000L);
return message;
}
};
rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","delay test is running...",messagePostProcessor);
MessagePostProcessor messagePostProcessor2 = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelayLong(10000L);
return message;
}
};
rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","delay test is running...",messagePostProcessor2);
System.out.printf("%tc 消息发送成功 \n",new Date());
return "delay2 is ok!";
}
Configuration
@Configuration
public class DelayConfiguration {
@Bean("delayQueue")
public Queue delayQueue() {
return QueueBuilder.durable(Constants.DELAY_QUEUE).build();
}
@Bean("delayExchange")
public Exchange delayExchange() {
return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();
}
@Bean("delayBinding")
public Binding delayBinding(@Qualifier("delayQueue") Queue queue, @Qualifier("delayExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(delayExchange()).with("delay").noargs();
}
}
Listener
@Component
public class DelayListener {
@RabbitListener(queues = Constants.DELAY_QUEUE)
public void delayHandMessage(Message message, Channel channel) throws UnsupportedEncodingException {
System.out.printf("%tc delay接受 \n",new Date());
System.out.printf("接收到消息 %s\n",new String(message.getBody(),"UTF-8"));
}
}
4.总结
RabbitMQ本⾝并没直接实现延迟队列,通常有两种方法:
- TTL+死信队列组合的方式
- 使用官方提供的延迟插件实现延迟功能
⼆者对比:
基于死信实现的延迟队列
a. 优点:
1)灵活不需要额外的插件支持
b. 缺点:
1)存在消息顺序问题
2)需要额外的逻辑来处理死信队列的消息,增加了系统的复杂性基于插件实现的延迟队列
a. 优点:
1)通过插件可以直接创建延迟队列,简化延迟消息的实现.
2)避免了DLX的时序问题
b. 缺点:
1)需要依赖特定的插件,有运维⼯作
2)只适用特定版本
四.事务
RabbitMQ是基于AMQP协议实现的,该协议实现了事务机制。因此RabbitMQ也⽀持事务机制,Spring AMQP也提供了对事务相关的操作。RabbitMQ事务允许开发者确保消息的发送和接收是原子性的,要么全部成功,要么全部失败。
1.未采用事务
如果不采用事务,第一条消息发送成功,第二条消息发送失败:
Producer
@RequestMapping("/trans")
public String trans() {
System.out.println("事务测试");
//使用内置交换机
rabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE,"trans test1....");
int sum = 3 / 0;
rabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE,"trans test2....");
return "trans is ok!";
}
Configuration
@Configuration
public class TransConfig {
@Bean("transQueue")
public Queue transQueue() {
return QueueBuilder.durable(Constants.TRANS_QUEUE).build();
}
}
2.采用事务
因为事务发送失败,应该一条都不发送,所以需要按照下面的方式进行写代码:
为了不让所有使用RabbitTemplate的其他方法也启动事务,所以单独写一个方法交给Spring进行管理:
RabbitTemplate
package com.example.rabbitmqdemo.config;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitTemplateConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
@Bean
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//设置回调函数
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("执行了confirm方法");
if (ack) {
System.out.printf("接收到消息, 消息ID: %s \n",correlationData == null ? null : correlationData.getId());
}else {
System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n",correlationData == null ? null : correlationData.getId(),cause);
//相应的业务处理
}
}
});
//return模式
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息退回: " + returnedMessage);
}
});
return rabbitTemplate;
}
@Bean("transRabbitTemplate")
public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true);//开启事务
return rabbitTemplate;
}
@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
}
Producer
@Resource(name = "transRabbitTemplate")
private RabbitTemplate transRabbitTemplate;
@Transactional
@RequestMapping("/trans2")
public String trans2() {
System.out.println("事务测试");
//使用内置交换机
transRabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE,"trans test1....");
int sum = 3 / 0;
transRabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE,"trans test2....");
return "trans is ok!";
}
Configuration
@Configuration
public class TransConfig {
@Bean("transQueue")
public Queue transQueue() {
return QueueBuilder.durable(Constants.TRANS_QUEUE).build();
}
}
五.消息分发
1.概念和解决方法
概念
RabbitMQ队列拥有多个消费者时,队列会把收到的消息分派给不同的消费者。每条消息只会发送给订阅列表里的⼀个消费者。这种方式非常适合扩展。如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。
默认情况下,RabbitMQ是以轮询的方法进行分发的,而不管消费者是否已经消费并已经确认了消息。这种方式是不太合理的。试想⼀下,如果某些消费者消费速度慢,而某些消费者消费速度快,就可能会导致某些消费者消息积压,某些消费者空闲,进而应用整体的吞吐量下降。
解决方法
可以使用前面章节讲到的channel.basicQos(int prefetchCount)方法,来限制当前信道上的消费者所能保持的最大未确认消息的数量。
比如:消费端调⽤了channelbasicQos(5),RabbitMQ会为该消费者计数,发送⼀条消息计数+1,消费⼀条消息计数-1,当达到了设定的上限,RabbitMQ就不会再向它发送消息了,直到消费者确认了某条消息.类似TCP/IP中的"滑动窗".
2.应用场景
- 限流
- ⾮公平分发
3.限流实现
未将确认消息启动,启动后就是限流模式
配置yml
#ack 确认方式:开启ack
listener:
simple:
acknowledge-mode: manual #⼿动确认
prefetch: 5
Producer
@RequestMapping("/qos")
public String qos() {
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE,"qos","qos test...." + i);
}
return "trans is ok!";
}
Configuration
@Configuration
public class QosConfiguration {
@Bean("qosQueue")
public Queue qosQueue() {
return QueueBuilder.durable(Constants.QOS_QUEUE).build();
}
@Bean("qosExchange")
public Exchange qosExchange() {
return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();
}
@Bean("qosBinding")
public Binding qosBinding(@Qualifier("qosQueue") Queue queue, @Qualifier("qosExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();
}
}
Listener
@Component
public class QosListener {
@RabbitListener(queues = Constants.QOS_QUEUE)
public void handleMessage(Message message, Channel channel) throws IOException {
try {
System.out.printf("接收到消息: %s , deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());
//确认消息(肯定)
//如果没有启动确认消息则会有下图的情况 //channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e) {
//否定确认
//最后一个参数为true,则发生异常重新入队,false,为不再入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}
}
}
4.负载均衡
代码实现:
其余都和限流模式相同
Listener
@Component
public class QosListener {
@RabbitListener(queues = Constants.QOS_QUEUE)
public void handleMessage(Message message, Channel channel) throws IOException {
try {
System.out.printf("111接收到消息: %s , deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());
//模拟业务处理的时间
Thread.sleep(2000);
//确认消息(肯定)
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e) {
//否定确认
//最后一个参数为true,则发生异常重新入队,false,为不再入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}
}
@RabbitListener(queues = Constants.QOS_QUEUE)
public void handleMessage2(Message message, Channel channel) throws IOException {
try {
System.out.printf("222接收到消息: %s , deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());
//模拟业务处理的时间
Thread.sleep(1000);
//确认消息(肯定)
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e) {
//否定确认
//最后一个参数为true,则发生异常重新入队,false,为不再入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}