目录
一、延时插件实现
1、版本要求
RabbitMQ 3.5.7以上
2、为运行新容器时安装
# 1. 拉取带管理界面的镜像
docker pull rabbitmq:3.11-management
# 2. 启动容器并启用插件
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=password \
rabbitmq:3.11-management \
bash -c "rabbitmq-plugins enable rabbitmq_delayed_message_exchange && rabbitmq-server"
3、为已运行的容器安装
# 1. 进入正在运行的容器
docker exec -it rabbitmq /bin/bash
# 2. 在容器内执行插件安装
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 3. 退出容器
exit
# 4. 重启容器使插件生效
docker restart rabbitmq
4、验证安装
# 方法1:检查插件列表
docker exec rabbitmq rabbitmq-plugins list | grep delayed
# 方法2:登录管理界面
# 访问 http://localhost:15672 (使用设置的账号密码登录)
# 在 "Exchanges" 标签页创建交换机时,Type 下拉框会出现 "x-delayed-message" 选项
5、代码编写
1. 配置类
@Configuration
public class RabbitMqConfig {
public static final String DELAYED_EXCHANGE = "delayed.exchange";
public static final String DELAYED_QUEUE = "delayed.queue";
public static final String DELAYED_ROUTING_KEY = "delayed_routing_key";
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct"); // 交换机类型
return new CustomExchange(
DELAYED_EXCHANGE,
"x-delayed-message", // 固定类型
true,
false,
args
);
}
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE, true);
}
@Bean
public Binding delayedBinding(Queue delayedQueue,
CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue)
.to(delayedExchange)
.with(DELAYED_ROUTING_KEY)
.noargs();
}
}
2. 生产者
public void send(String exchange, String routing_key,
Object data, Integer delayMillis) {
// 消息后处理器:设置延时和持久化
MessagePostProcessor processor = message -> {
// 毫秒
message.getMessageProperties().setDelay(delayMillis);
// 持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
};
rabbitTemplate.convertAndSend(exchange, routingKey, data, processor);
}
3. 消费者
@Component
@RabbitListener(queues = RabbitMqConfig.DELAYED_QUEUE)
public class DelayedListener {
@RabbitHandler
public void listener(String data, Channel channel, Message message) {
log.warn("消息消费成功,消息内容:{}", data);
MessageProperties properties = message.getMessageProperties();
long deliveryTag = properties.getDeliveryTag()
channel.basicAck(deliveryTag, false);
}
}
二、死信队列实现
1、代码编写
1. 配置类
@Configuration
public class RabbitMqConfig {
public static final String DELAYED_EXCHANGE = "delayed.exchange";
public static final String DELAYED_QUEUE = "delayed.queue";
public static final String DELAYED_ROUTING_KEY = "delayed_routing_key";
public static final String NORMAL_EXCHANGE = "normal.exchange";
public static final String NORMAL_QUEUE = "normal.queue";
public static final String NORMAL_ROUTING_KEY = "normal_routing_key";
// 死信队列(延时队列)
@Bean
public Queue delayedQueue() {
return QueueBuilder.durable(DELAYED_QUEUE).build();
}
// 死信交换机
@Bean
public DirectExchange delayedExchange() {
return new DirectExchange(DELAYED_EXCHANGE);
}
// 绑定死信队列到死信交换机
@Bean
public Binding delayedBinding(Queue delayedQueue,
DirectExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue)
.to(delayedExchange)
.with(DELAYED_ROUTING_KEY);
}
// 普通队列
@Bean
public Queue normalQueue() {
return QueueBuilder.durable(NORMAL_QUEUE)
.deadLetterExchange(DELAYED_EXCHANGE)
.deadLetterRoutingKey(DELAYED_ROUTING_KEY)
.build();
}
// 普通交换机
@Bean
public DirectExchange normalExchange() {
return new DirectExchange(NORMAL_EXCHANGE);
}
// 绑定普通队列到普通交换机
@Bean
public Binding normalBinding(Queue normalQueue,
DirectExchange normalExchange) {
return BindingBuilder.bind(normalQueue)
.to(normalExchange)
.with(NORMAL_ROUTING_KEY);
}
}
2. 生产者
public void send(String exchange, String routing_key,
Object data, Integer delayMillis) {
String uuid = IdUtil.simpleUUID();
// 消息入库略,uuid为主键
MessageProperties properties = new MessageProperties();
// 设置TTL,单位毫秒
properties.setExpiration(String.valueOf(delayMillis));
// 消息持久化(2 表示持久化)
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message msg = rabbitTemplate.getMessageConverter().toMessage(data, properties);
rabbitTemplate.send(exchange, routingKey, msg, new CorrelationData(uuid));
}
3. 消费者
@Component
@RabbitListener(queues = RabbitMqConfig.DELAYED_QUEUE)
public class DelayedListener {
@RabbitHandler
public void listener(String data, Channel channel, Message message) {
log.warn("消息消费成功,消息内容:{}", data);
MessageProperties properties = message.getMessageProperties();
long deliveryTag = properties.getDeliveryTag()
channel.basicAck(deliveryTag, false);
}
}
三、踩坑记录
1、发送消息失败
原因:RabbitTemplate
配置了消息抵达确认,消息ID没有传值。
RabbitTemplate rabbitTemplate = new RabbitTemplate();
// 消息抵达确认通知
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
String msgId = data.getId();
if (ack) {
log.info("消息抵达队列成功:{}", data);
} else {
log.error("消息未能发送成功,消息ID:{}", data.getId(), cause);
}
});
生产者实际发送消息未传消息ID:
错误格式
rabbitTemplate.convertAndSend(exchange, routingKey, data);
正确格式
String uuid = IdUtil.simpleUUID();
rabbitTemplate.convertAndSend(exchange, routingKey, data, new CorrelationData(uuid));
2、消息过期后未能转发到死信队列
原因:正常消息未绑定死信队列,消息过期自动删除,而不会转发到死信队列中。
错误格式
@Bean
public Queue delayedNormalQueue() {
return QueueBuilder.durable(NORMAL_QUEUE).build();
}
正确格式
@Bean
public Queue delayedNormalQueue() {
return QueueBuilder.durable(NORMAL_QUEUE)
.deadLetterExchange(DELAYED_EXCHANGE) // 指定死信交换机
.deadLetterRoutingKey(DELAYED_ROUTING_KEY) // 指定死信路由键
.build();
}
3、消费者消费报错
原因:发送的消息由于自定义的 MessageProperties
,其中缺失了 contentType
参数,需要使用转化器进行转换,而不是直接发送消息。
错误格式
MessageProperties properties = new MessageProperties();
properties.setExpiration(String.valueOf(delayMillis));
Message msg = new Message(message.getBytes(), properties);
rabbitTemplate.convertAndSend(exchange, routingKey, msg, new CorrelationData(uuid));
正确格式
MessageProperties properties = new MessageProperties();
properties.setExpiration(String.valueOf(delayMillis));
Message msg = rabbitTemplate.getMessageConverter().toMessage(message, properties);
rabbitTemplate.send(exchange, routingKey, msg, new CorrelationData(uuid));