Redis 实现消息队列

发布于:2025-07-03 ⋅ 阅读:(18) ⋅ 点赞:(0)

1. 使用 List (列表) 数据结构 + BRPOPLPUSH (经典实现)

  • 原理:

    • Redis 的 List 可以看作是一个双向链表。你可以使用 LPUSH 在列表左侧插入元素(消息),使用 RPOP 从右侧取出元素。
    • 为了实现阻塞等待(即没有消息时等待,有消息时再取出),Redis 提供了 BRPOP 命令。
    • 更适合实现循环队列(FIFO)的是 BRPOPLPUSH 命令。这个命令会从源列表 source 的右侧弹出一个元素,并将其推入目标列表 destination 的左侧,如果源列表为空,则阻塞等待。
    • 通过巧妙地让 source 和 destination 指向同一个列表,就可以实现一个生产者-消费者模型,并且天然支持消息的“已处理”和“待重试”逻辑(通过将处理失败的消息从 destination 列表弹回 source 列表)。
  • 流程:

    • 生产者: 使用 LPUSH list_name "message" 将消息推送到列表。
    • 消费者: 使用 BRPOPLPUSH list_name list_name "timeout" (timeout 单位秒)。
      • 如果列表有消息,它会弹出并推回(因为是同一个列表),消费者拿到消息。
      • 如果列表为空,它会阻塞,直到有消息被 LPUSH 进来,或者超时。
    • 处理失败重试: 如果消费者处理消息失败,它可以再次使用 LPUSH list_name "message" 将消息放回队列头部,或者使用更复杂的重试逻辑(比如单独维护一个死信队列)。
  • 优点:

    • 实现简单,Redis 本身是单线程的,天然避免了并发问题。
    • BRPOPLPUSH 结合同一个列表可以实现可靠的消息处理和重试机制。
  • 缺点:

    • 不支持消费者组(Consumer Groups),即不能像 Kafka 那样让多个消费者并行消费同一个分区的不同消息。
    • 消息顺序性:虽然 List 本身是顺序的,但如果多个消费者同时 BRPOPLPUSH,可能会打乱顺序(但通常一个 List 配对一个消费者)。
    • 性能瓶颈:受限于单线程模型,在高并发下性能可能不如专业的 MQ(如 Kafka, RabbitMQ)。
    • 消息持久化依赖 Redis 的 RDB/AOF 配置,如果 Redis 宕机且数据未持久化,消息会丢失。

2. 使用 Streams 数据结构 (Redis 5.0+ 推荐方式)

  • 原理:

    • Streams 是 Redis 官方推出的专门用于消息队列的数据结构。它更像传统 MQ(如 Kafka)的模型。
    • 每条消息都有一个唯一的 ID(由时间戳和序列号组成)。
    • 支持 XADD (添加消息), XREAD / XREADGROUP (读取消息,后者支持消费者组), XACK (确认消息已处理), XDEL (删除消息), XINFO (查看流信息) 等命令。
    • 消费者组 (Consumer Groups): 这是 Streams 的核心特性。允许你将一个 Streams 分配给多个消费者组,每个组内部可以有不同的消费者。组内的每条消息只会分发给组内的一个消费者。支持 XPENDING 查看未确认的消息,XCLAIM 强制获取其他消费者未确认的消息等。
  • 流程:

    • 生产者: 使用 XADD stream_name * field1 value1 field2 value2 添加消息。
    • 消费者 (非组): 使用 XREAD BLOCK timeout WATCH stream_name MINID min_id 阻塞读取新消息。
    • 消费者组:
      • 创建组: XGROUP CREATE stream_name group_name $MKSTREAM ( $ 表示从最新消息开始)。
      • 读取消息: XREADGROUP GROUP group_name consumer_name COUNT count BLOCK timeout CONSUMERS consumer_name ID id stream_name。未指定 ID 时通常用 $ 或 >.
      • 确认消息: 处理完成后用 XACK stream_name group_name message_id 确认。只有被确认的消息才会被自动从该组的待处理列表中移除。
      • 处理失败: 不确认 (XACK),消息会一直留在该组的待处理列表(Pending Entries List),可以通过 XPENDING 查看,或者用 XCLAIM 转移给其他消费者重试。
  • 优点:

    • 功能最接近专业 MQ,支持消费者组,实现并行消费。
    • 消息可以持久化(依赖 Redis 持久化配置)。
    • 提供了更完善的消息确认、重试、死信等机制。
    • 可以保留历史消息(通过设置 MAXLEN 或 + 无限制)。
  • 缺点:

    • 是 Redis 5.0 以后才有的特性,旧版本不支持。
    • 相比 List,学习曲线稍陡,命令更复杂。
    • 性能瓶颈依然受限于 Redis 单线程模型。

3. 使用 Pub/Sub (发布/订阅)

  • 原理:

    • Redis 的 Pub/Sub 机制允许客户端订阅一个或多个频道(channel)。
    • 当有消息发布到某个频道时,所有订阅该频道的客户端都会收到消息。
    • 命令包括 PUBLISH channel messageSUBSCRIBE channel1 channel2PSUBSCRIBE pattern (订阅模式)。
  • 流程:

    • 生产者: 使用 PUBLISH channel_name "message" 发布消息到指定频道。
    • 消费者: 使用 SUBSCRIBE channel_name 订阅频道,然后会收到所有发布到该频道的消息。
  • 优点:

    • 实现非常简单。
    • 实时性高,消息一旦发布就能立刻被消费者收到。
    • 适用于广播模式,一个消息发给多个消费者。
  • 缺点:

    • 消息不持久化: 如果消费者在消息发布时没有连接上,或者 Redis 宕机,消息就会丢失。这是 Pub/Sub 最核心的缺点,不适合要求可靠性的场景。
    • 没有消息确认机制。
    • 没有消费者组概念。
    • 消息可能重复投递(如果消费者处理慢,新消费者连接上来会收到旧消息)。

总结与选择建议:

  • 如果只是简单的、对可靠性要求不高的即时通知(如日志广播、系统事件通知),Pub/Sub 是一个简单快速的选择。
  • 如果需要可靠的消息传递、简单的重试机制,并且对消费者组没有要求,List + BRPOPLPUSH 是一个不错的选择,尤其是在 Redis 5.0 以下版本。
  • 如果需要更接近专业 MQ 的功能,如消费者组、消息确认、更好的重试机制,并且使用 Redis 5.0+,强烈推荐使用 Streams。
  • 对于高吞吐量、强一致性的核心业务消息队列,专业的 MQ(如 Kafka, RabbitMQ, Pulsar)通常是更好的选择,它们在性能、可靠性和功能上通常更优。

网站公告

今日签到

点亮在社区的每一天
去签到