文章目录
消息队列:Redis Stream到RabbitMQ的转换
消息队列是一个使用队列来进行通信的组件,可以达到模块间的解耦、异步和削峰的功能。使用消息队列的时候我们最重要的就是要保证这个消息队列不会丢失消息,也就是可靠性问题。
Redis Stream
Redis是一个NoSQL数据库,因为它是将数据保存在内存中的,速度很快,所以常用来做缓存。在新版本的Redis中,推出了一个Stream数据结构用来做消息队列。提供了消费者组来消费一个队列,也有pending-list和ack确认机制来保证消息不丢失,也实现了持久化来预防redis宕机导致的消息丢失。
Redis Stream的特点:
- 优点:轻量级、低延迟、与Redis生态集成良好
- 缺点:需要应用层轮询消费,没有原生的事件驱动机制
在我们的项目中,使用轮询阻塞读取方式消费消息:
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有消息,继续下一次循环
continue;
}
// 解析数据
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 3.创建订单
createVoucherOrder(voucherOrder);
// 4.确认消息 XACK
stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
} catch (Exception e) {
log.error("处理订单异常", e);
handlePendingList();
}
}
}
private void handlePendingList() {
while (true) {
try {
// 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create("stream.orders", ReadOffset.from("0"))
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有异常消息,结束循环
break;
}
// 解析数据
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 3.创建订单
createVoucherOrder(voucherOrder);
// 4.确认消息 XACK
stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
} catch (Exception e) {
log.error("处理订单异常", e);
}
}
}
}
这种轮询方式的问题是通过while循环不断使用XREADGROUP指令来读队列中的消息,会消耗一定的CPU资源,没有使用事件驱动的监听+回调机制。
RabbitMQ
RabbitMQ是一个专业的消息队列中间件,支持多种消息模式,具有更完善的可靠性保证和监控能力。我们使用RabbitMQ来替换Redis Stream作为项目中要使用到的消息队列。
项目为Spring Boot单体架构项目。
第一步 安装Erlang
RabbitMQ是使用Erlang语言编写的消息队列组件,它本身的安装包里没有Erlang环境,所以我们要先安装Erlang。
https://erlang.org/download/otp_versions_tree.html
进入网页找到最新的版本,下载后双击安装即可。
第二步 安装RabbitMQ
https://www.rabbitmq.com/install-windows.html
进入下载地址,找到对应版本的文件点击下载,然后进行安装。
安装完成后,开启RabbitMQ服务(RabbitMQ Service - start)。然后进入命令行窗口(RabbitMQ Command Prompt),输入 rabbitmqctl.bat status
可确认 RabbitMQ 的启动状态。
也可以在命令行中输入 rabbitmq-plugins enable rabbitmq_management
来开启Web管理界面,之后在浏览器中输入 http://localhost:15672/,就可以进入管理端页面。
第三步 引入RabbitMQ依赖
引入Spring为AMQP协议的消息中间件支持的框架。SpringBoot会自动配置连接到RabbitMQ,提供消息监听容器,提供RabbitTemplate工具类。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
第四步 配置RabbitMQ
在 application.yml
中添加:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
# 开启手动ack模式
listener:
simple:
acknowledge-mode: manual
# 设置消费者数量
concurrency: 1
max-concurrency: 3
# 关闭Spring层面的自动重试,我们手动控制
retry:
enabled: false
# 设置预取数量,避免消息积压
prefetch: 1
第五步 定义RabbitMQ配置类
让Spring在启动的时候配置好要使用的RabbitMQ,包括普通队列、死信队列等。
@Configuration
@Slf4j
public class RabbitMQConfig {
// 普通队列配置
public static final String ORDER_QUEUE = "order.queue";
public static final String ORDER_EXCHANGE = "order.exchange";
public static final String ORDER_ROUTING_KEY = "order.routing";
// 死信队列配置
public static final String ORDER_DLX_QUEUE = "order.dlx.queue";
public static final String ORDER_DLX_EXCHANGE = "order.dlx.exchange";
public static final String ORDER_DLX_ROUTING_KEY = "order.dlx.routing";
// 普通队列(绑定死信交换机)
@Bean
public Queue orderQueue() {
return QueueBuilder.durable(ORDER_QUEUE)
.withArgument("x-dead-letter-exchange", ORDER_DLX_EXCHANGE)
.withArgument("x-dead-letter-routing-key", ORDER_DLX_ROUTING_KEY)
.withArgument("x-message-ttl", 1800000) // 30分钟TTL
.build();
}
// 普通交换机
@Bean
public DirectExchange orderExchange() {
return new DirectExchange(ORDER_EXCHANGE);
}
// 普通队列绑定
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);
}
// 死信队列
@Bean
public Queue orderDlxQueue() {
return QueueBuilder.durable(ORDER_DLX_QUEUE).build();
}
// 死信交换机
@Bean
public DirectExchange orderDlxExchange() {
return new DirectExchange(ORDER_DLX_EXCHANGE);
}
// 死信队列绑定
@Bean
public Binding orderDlxBinding() {
return BindingBuilder.bind(orderDlxQueue()).to(orderDlxExchange()).with(ORDER_DLX_ROUTING_KEY);
}
}
第六步 发送消息(原来的 Redis Stream 写入逻辑替换)
在 seckillVoucher()
中将 Redis Stream 写入替换成 RabbitMQ 的消息发送:
@Resource
private RabbitTemplate rabbitTemplate;
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
int r = result.intValue();
if (r != 0) {
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 构造订单对象
VoucherOrder order = new VoucherOrder();
order.setId(orderId);
order.setUserId(userId);
order.setVoucherId(voucherId);
try {
// 发送消息到 RabbitMQ
rabbitTemplate.convertAndSend(
RabbitMQConfig.ORDER_EXCHANGE,
RabbitMQConfig.ORDER_ROUTING_KEY,
order, // 直接发送对象,让Spring自动序列化
message -> {
// 设置消息属性
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setExpiration("1800000"); // 30分钟过期
return message;
}
);
log.info("订单消息发送成功,orderId: {}", orderId);
} catch (Exception e) {
log.error("订单消息发送失败,orderId: {}", orderId, e);
return Result.fail("系统繁忙,请稍后重试");
}
return Result.ok(orderId);
}
第七步 消费消息(代替 Redis Stream 轮询线程)
创建消息消费者类,支持手动ack和重试机制:
@Component
@Slf4j
public class VoucherOrderMQListener {
@Resource
private IVoucherOrderService voucherOrderService;
@Resource
private RedisTemplate<String, Object> redisTemplate;
private static final String RETRY_COUNT_KEY = "order:retry:count:";
private static final int MAX_RETRY_COUNT = 3;
@RabbitListener(queues = RabbitMQConfig.ORDER_QUEUE)
public void handleVoucherOrder(
VoucherOrder order,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
String retryCountKey = RETRY_COUNT_KEY + order.getId();
try {
// 业务处理
voucherOrderService.createVoucherOrder(order);
// 处理成功,删除重试计数
redisTemplate.delete(retryCountKey);
// 手动确认消息
channel.basicAck(deliveryTag, false);
log.info("订单处理成功,orderId: {}", order.getId());
} catch (Exception e) {
log.error("处理订单消息失败,orderId: {}", order.getId(), e);
try {
// 获取当前重试次数
Integer retryCount = (Integer) redisTemplate.opsForValue().get(retryCountKey);
if (retryCount == null) {
retryCount = 0;
}
retryCount++;
if (retryCount <= MAX_RETRY_COUNT) {
// 还没达到最大重试次数,记录重试次数并重新入队
redisTemplate.opsForValue().set(retryCountKey, retryCount, Duration.ofHours(1));
log.warn("订单处理失败,第{}次重试,orderId: {}", retryCount, order.getId());
channel.basicReject(deliveryTag, true); // 重新入队
} else {
// 达到最大重试次数,拒绝消息进入死信队列
redisTemplate.delete(retryCountKey);
log.error("订单处理失败,达到最大重试次数,进入死信队列,orderId: {}", order.getId());
channel.basicReject(deliveryTag, false); // 不重新入队,进入死信队列
}
} catch (IOException ioException) {
log.error("消息确认失败", ioException);
}
}
}
@RabbitListener(queues = RabbitMQConfig.ORDER_DLX_QUEUE)
public void handleDeadLetterOrder(
VoucherOrder order,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
log.error("收到死信消息,orderId: {}", order.getId());
try {
// 记录失败的订单信息
recordFailedOrder(order);
// 确认死信消息
channel.basicAck(deliveryTag, false);
log.info("死信消息处理完成,orderId: {}", order.getId());
} catch (Exception e) {
log.error("死信消息处理异常,orderId: {}", order.getId(), e);
try {
channel.basicAck(deliveryTag, false);
} catch (IOException ioException) {
log.error("死信消息确认失败", ioException);
}
}
}
private void recordFailedOrder(VoucherOrder order) {
log.error("订单处理最终失败,需要人工介入,订单信息: {}", order);
// 这里可以写入失败订单表、发送告警等
}
}
第八步 消息序列化配置
为了更好的性能和兼容性,配置消息序列化方式:
@Configuration
public class RabbitMQSerializationConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// 设置JSON序列化
template.setMessageConverter(new Jackson2JsonMessageConverter());
// 设置发送确认
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.debug("消息发送成功");
} else {
log.error("消息发送失败: {}", cause);
}
});
// 设置返回确认
template.setReturnsCallback(returned -> {
log.error("消息被退回: {}", returned.getMessage());
});
return template;
}
}
迁移后的优势
- 事件驱动:使用@RabbitListener注解将方法注册到Spring AMQP提供的消息监听容器中,避免了Redis Stream中的while(true)轮询消费
- 自动重试:RabbitMQ支持配置化的自动重试机制,失败的消息会自动重新投递
- 死信处理:多次重试失败的消息会进入死信队列,可以进行人工干预或特殊处理
- 更好的监控:RabbitMQ提供了丰富的管理界面和监控指标
- 消息持久化:更完善的消息持久化机制,保证服务重启后消息不丢失
注意事项
- 幂等性:消费者需要保证幂等性,避免重复消费造成的问题
有时会因为网络波动导致消费者传给队列的ack消息丢失或者一直没到达,队列就认为这个消息没被消费导致再次传给消费者,这就会出现消息被重复消费的情况。可以使用全局唯一ID+消息幂等性存储。在生产者生产消息的时候为每个消息都生成一个全局唯一ID,然后在消费端每次消费这个消息前都判断一下这个消息是否被消费了,如果被消费了就不会再消费,如果没有消费则会继续消费。
- 消息顺序:如果需要保证消息顺序,需要使用单一队列和单一消费者
- 资源监控:注意监控队列长度、消费速率等指标,避免消息积压
- 错误处理:合理设计重试次数和死信处理逻辑,避免无限重试