Kafka实现延迟消息

发布于:2025-04-11 ⋅ 阅读:(39) ⋅ 点赞:(0)

Kafka 实现延迟消息

Kafka 本身不支持原生的延迟消息(不像 RocketMQ 内置了延迟队列),但可以通过多种方式来实现延迟消息。常见的方案如下:

1. 使用不同的 Topic 分区(最常见)

思路:

  • 创建多个延迟队列 Topic,比如 delay-5sdelay-10sdelay-30s,代表不同延迟时间的队列。
  • 生产者按延迟时间把消息发送到对应的 Topic。
  • 消费者监听并处理这些 Topic,延迟对应的秒数之后【线程睡眠 Thread.sleep(delay)】转发到真正的业务主题 real-topic

示例:

// 生产
String topic = "delay-10s"; // 发送到 10 秒延迟的队列
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message");
producer.send(record);

// 消费
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received delayed message: " + record.value());

                // 3. 模拟延迟(等待 10 秒)
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                // 4. 发送到真正的业务 Topic
                ProducerRecord<String, String> forwardRecord = new ProducerRecord<>(REAL_TOPIC, record.key(), record.value());
                producer.send(forwardRecord);
                System.out.println("Message forwarded to real-topic: " + record.value());
            }
        }
  • 适用于:固定延迟时间的场景,比如 5s、10s、30s
  • 优点:简单易用,不需要额外组件。
  • 缺点:如果延迟时间种类很多,Topic 可能会很多,管理复杂。

2. 采用定时任务 + 数据库存储

思路:

  • 生产者把消息存到 数据库(如 MySQL)或者 Redis,并记录目标执行时间
  • 定时任务(如 Quartz、XXL-JOB)轮询数据库,判断是否到时间,然后再投递到 Kafka 进行消费。

示例:

  1. 生产者:

    // 先存到数据库,设置目标消费时间
    saveToDatabase("message1", System.currentTimeMillis() + 10000); // 10秒后发送
    
  2. 定时任务:

    List<Message> messages = queryReadyMessages(); // 查询到期的消息
    for (Message msg : messages) {
        producer.send(new ProducerRecord<>("real-topic", msg.getContent()));
        deleteFromDatabase(msg.getId()); // 发送后删除
    }
    
  • 适用于:延迟时间不固定、消息量不大的情况。
  • 优点:可以精准控制延迟时间,灵活性强。
  • 缺点:依赖数据库或 Redis,效率受限。

3. 结合 Redis 的 ZSet(有序集合)

思路:

  • 生产者将消息存入 Redis 的 ZSetscore 设为 目标消费时间戳
  • 消费者轮询 Redis,取出时间到期的消息,然后发送到 Kafka 进行消费。

示例:

  1. 生产者:

    // message:消息体,delayMillis:延迟时间
    public void addDelayTask(String message, long delayMillis) {
        long executeTime = System.currentTimeMillis() + delayMillis;
        redisTemplate.opsForZSet().add("delay-queue", message, executeTime);
        System.out.println("任务添加:" + message + ",执行时间:" + executeTime);
    }
    
    
  2. 消费者(定时轮询取出到期消息):

    @Scheduled(fixedRate = 1000) // 每秒执行一次
    public void consumeDelayTask() {
        long now = System.currentTimeMillis();
        // 返回一个 Set<String> 集合,包含所有 score 在 [0, now] 之间的元素。
        Set<String> messages = redisTemplate.opsForZSet().rangeByScore("delay-queue", 0, now);
    
        for (String msg : messages) {
            producer.send(new ProducerRecord<>("real-topic", msg)); // 可根据场景选择发送到目标主题 或者 直接处理消息
            redisTemplate.opsForZSet().remove("delay-queue", msg); // 处理完删除
        }
    }
    
  • 适用于:高并发场景,且对小量延迟消息有需求。
  • 优点:基于内存操作,性能高。
  • 缺点:需要额外维护 Redis,消息量过大可能导致性能问题。

4. 使用 Kafka Streams + State Store

思路:

  • Kafka Streams 允许我们使用时间窗口,可以让消息在流处理中等一段时间,然后再投递到目标 Topic。

示例:

KStream<String, String> stream = builder.stream("input-topic");
stream
    .transform(() -> new DelayTransformer(10 * 1000)) // 延迟 10 秒
    .to("output-topic");
  • 适用于:流式计算场景,如订单超时处理
  • 优点:适合与 Kafka 生态集成,流式处理友好。
  • 缺点:学习成本较高,对 Kafka Streams 有一定要求。

5. 结合 Flink 处理

如果你的架构里用到了 Flink,可以使用 Flink Timer 进行定时延迟处理:

  • 接收 Kafka 消息Flink 设置定时器到时间后发送回 Kafka
ctx.timerService().registerProcessingTimeTimer(timestamp + 10000); // 10s后触发
  • 适用于:大数据、实时计算场景。
  • 优点:可以结合流处理做复杂逻辑。
  • 缺点:需要 Flink 支持,架构要求高。

结论:如何选型?

方案 适用场景 优点 缺点
多个 Topic 固定延迟(5s、10s、30s) 简单易用 Topic 过多不好管理
数据库 + 定时任务 低吞吐、灵活时间 可靠性高 依赖数据库,效率受限
Redis ZSet 高并发、小量延迟消息 低延迟、性能高 Redis 容量受限
Kafka Streams 实时流处理 适配 Kafka 生态 学习成本较高
Flink Timer 大数据流处理 适配 Flink 需要 Flink

最佳实践

  • 如果是常见的业务(延迟 5s、10s)多 Topic 方案(简单易用)。
  • 如果消息量大且时间灵活Redis ZSet(高效)。
  • 如果业务可靠性要求高数据库 + 定时任务(可落地)。
  • 如果是流式计算Kafka Streams / Flink(强大但复杂)。