RocketMq\Kafka如何保障消息不丢失?

发布于:2025-02-22 ⋅ 阅读:(8) ⋅ 点赞:(0)

程序那点事

保证RocketMq和Kafka消息不丢失需考虑Producer发送、Broker存储、Consumer消费。需配置同步发送/刷盘、重试机制、幂等性生产,手动提交偏移量等策略。摘要由作者通过智能技术生成

RocketMq架构图

RocketMq消息不丢失

要想保证消息不丢失,需要从以下几个方面考虑:

  • Producer 发送消息

  • Broker 主从切换、保存消息

  • Consumer 消费消息

发送端考虑

同步发送

同步发送会返回 4 个状态码:

  • SEND_OK:消息发送成功。需要注意的是,消息发送到 broker 后,还有两个操作:消息刷盘和消息同步到 slave 节点,默认这两个操作都是异步的,只有把这两个操作都改为同步,SEND_OK 这个状态才能真正表示发送成功 

  • FLUSH_DISK_TIMEOUT:消息发送成功但是消息刷盘超时。

  • FLUSH_SLAVE_TIMEOUT:消息发送成功但是消息同步到 slave 节点时超时。

  • SLAVE_NOT_AVAILABLE:消息发送成功但是 broker 的 slave 节点不可用。

根据返回的状态码,可以做消息重试,这里设置的重试次数是 3。(消息重试时,消费端需要做去重处理)

异步发送

异步发送,可以重写回调函数,回调函数捕获到 Exception 时表示发送失败,这时可以进行重试,这里设置的重试次数是 3。

Broker端考虑

刷盘策略

  • 异步刷盘:默认。消息写入 CommitLog 时,并不会直接写入磁盘,而是先写入 PageCache 缓存后返回成功,然后用后台线程异步把消息刷入磁盘。异步刷盘提高了消息吞吐量,但是可能会有消息丢失的情况,比如断点导致机器停机,PageCache 中没来得及刷盘的消息就会丢失。

  • 同步刷盘:消息写入内存后,立刻请求刷盘线程进行刷盘,如果消息未在约定的时间内(默认 5 s)刷盘成功,就返回 FLUSH_DISK_TIMEOUT,Producer 收到这个响应后,可以进行重试。同步刷盘策略保证了消息的可靠性,同时降低了吞吐量,增加了延迟。要开启同步刷盘,需要增加下面配置:flushDiskType=SYNC_FLUSH

Broker多副本和高可用

Broker 为了保证高可用,采用一主多从的方式部署。或多主多从。

消息发送到 master 节点后,slave 节点会从 master 拉取消息保持跟 master 的一致。这个过程默认是异步的,即 master 收到消息后,不等 slave 节点复制消息就直接给 Producer 返回成功。

这样会有一个问题,如果 slave 节点还没有完成消息复制,这时 master 宕机了,进行主备切换后就会有消息丢失。为了避免这个问题,可以采用 slave 节点同步复制消息,即等 slave 节点复制消息成功后再给 Producer 返回发送成功。只需要增加下面的配置:brokerRole=SYNC_MASTER

改为同步复制后,消息复制流程如下:

  1. slave 初始化后,跟 master 建立连接并向 master 发送自己的 offset;

  2. master 收到 slave 发送的 offset 后,将 offset 后面的消息批量发送给 slave;

  3. slave 把收到的消息写入 commitLog 文件,并给 master 发送新的 offset;

  4. master 收到新的 offset 后,如果 offset >= producer 发送消息后的 offset,给 Producer 返回 SEND_OK。

消费端考虑

消费成功

如果 Consumer 消费成功,返回 CONSUME_SUCCESS,提交 offset 并从 Broker 拉取下一批消息。

消费失败重试

Consumer 消费失败,这里有 3 种情况:

  • 返回 RECONSUME_LATER

  • 返回 null

  • 抛出异常

Broker 收到这个响应后,会把这条消息放入重试队列,重新发送给 Consumer。
注意:

  • Broker 默认最多重试 16 次,如果重试 16 次都失败,就把这条消息放入死信队列,Consumer 可以订阅死信队列进行消费。

  • 重试只有在集群模式(MessageModel.CLUSTERING)下生效,在广播模式(MessageModel.BROADCASTING)下是不生效的。

  • Consumer 端一定要做好幂等处理。
    其实重试 3 次都失败就可以说明代码有问题,这时 Consumer 可以把消息存入本地并告警进行人工处理,给 Broker 返回CONSUME_SUCCESS 来结束重试。

极端情况

如果对消息丢失零容忍,我们必须要考虑极端情况,比如整个 RocketMQ 集群挂了,这时 Producer 端发送消息一定会失败,可以考虑在 Producer 端做降级,把要发送的消息保存到本地数据库或磁盘,等 RocketMQ 恢复以后再把本地消息推送出去。

Kafka

新版本的kafka, 已经不再依赖zookeeper.

对于kafka要保障消息的不丢失,同样需要考虑Producer、Broker、Consumer等情况。

生产者端

1. 配置正确的确认机制(acks 参数)

  • acks = 0:生产者发送消息后,不会等待 Broker 的确认,直接认为消息发送成功。这种方式性能最高,但可靠性最差,因为如果消息在发送过程中丢失,生产者无法得知。

  • acks = 1:生产者发送消息后,只要 Leader 分区成功写入消息,就会收到 Broker 的确认。这种方式在一定程度上保证了消息的可靠性,但如果 Leader 分区在确认消息后、将消息同步到 Follower 分区之前发生故障,消息仍然可能丢失。

  • acks = all 或 acks = -1:生产者发送消息后,需要等待 Leader 分区和所有 ISR(In - Sync Replicas,与 Leader 分区保持同步的副本集合)中的 Follower 分区都成功写入消息,才会收到 Broker 的确认。这种方式提供了最高的可靠性,但会降低性能,因为需要等待多个副本的写入操作完成。

2. 重试机制

生产者可以配置重试次数,当消息发送失败时,Kafka 会自动进行重试。例如,网络抖动、Broker 暂时不可用等情况导致的发送失败,通过重试机制可以提高消息发送的成功率。可以通过设置 retries 参数来指定重试次数,同时可以设置 retry.backoff.ms 参数来控制重试间隔。

3. 幂等性生产

Kafka 从 0.11.0.0 版本开始引入了幂等性生产者的概念。开启幂等性生产后,生产者会为每条消息生成一个唯一的 ID,Broker 会对消息进行去重处理,确保相同 ID 的消息只会被写入一次。可以通过设置 enable.idempotence 参数为 true 来开启幂等性生产。

Broker 端

1. 多副本机制

Kafka 采用多副本机制来提高消息的可靠性。每个分区都可以有多个副本,其中一个副本作为 Leader 分区,负责处理读写请求,其他副本作为 Follower 分区,从 Leader 分区同步消息。当 Leader 分区发生故障时,Kafka 会自动从 ISR 集合中选择一个 Follower 分区作为新的 Leader 分区,继续提供服务。可以通过设置 replication.factor 参数来指定分区的副本数,一般建议设置为 3 或以上。

2. 刷盘策略

Kafka 的消息是先写入内存中的缓冲区,然后定期刷盘。可以通过配置 flush.messages 和 flush.ms 参数来控制刷盘的频率。例如,设置 flush.messages = 1 表示每写入一条消息就刷盘一次,这样可以保证消息的持久化,但会降低性能;设置 flush.ms = 1000 表示每 1 秒刷盘一次,在性能和可靠性之间进行了平衡。

3. 合理配置 ISR

ISR 集合中的副本与 Leader 分区保持同步,如果某个 Follower 分区落后 Leader 分区太多,会被踢出 ISR 集合。可以通过设置 min.insync.replicas 参数来指定 ISR 集合中最少需要保持同步的副本数。当生产者设置 acks = all 时,如果 ISR 集合中的副本数小于 min.insync.replicas,生产者会收到错误信息,从而避免消息丢失。

消费者端

1. 手动提交偏移量

Kafka 消费者在消费消息时,会记录消费的偏移量(offset),表示已经消费到的消息位置。默认情况下,消费者会自动提交偏移量,但这种方式可能会导致消息丢失。建议使用手动提交偏移量的方式,在消息处理完成后再提交偏移量,确保消息不会因为消费者在处理过程中崩溃而丢失。可以通过设置 enable.auto.commit 参数为 false 来关闭自动提交偏移量,然后使用 commitSync() 或 commitAsync() 方法手动提交偏移量。

2. 处理消费异常

消费者在消费消息时,可能会遇到各种异常情况,如网络异常、业务逻辑异常等。需要在代码中捕获这些异常,并进行相应的处理,确保消息不会因为异常而丢失。例如,可以将异常消息记录下来,进行重试或人工处理。

作者:许Web

链接:https://juejin.cn/post/7469051793236295680


网站公告

今日签到

点亮在社区的每一天
去签到