保证RocketMq和Kafka消息不丢失需考虑Producer发送、Broker存储、Consumer消费。需配置同步发送/刷盘、重试机制、幂等性生产,手动提交偏移量等策略。摘要由作者通过智能技术生成
RocketMq架构图
RocketMq消息不丢失
要想保证消息不丢失,需要从以下几个方面考虑:
Producer 发送消息
Broker 主从切换、保存消息
Consumer 消费消息
发送端考虑
同步发送
同步发送会返回 4 个状态码:
SEND_OK:消息发送成功。需要注意的是,消息发送到 broker 后,还有两个操作:消息刷盘和消息同步到 slave 节点,默认这两个操作都是异步的,只有把这两个操作都改为同步,SEND_OK 这个状态才能真正表示发送成功 。
FLUSH_DISK_TIMEOUT:消息发送成功但是消息刷盘超时。
FLUSH_SLAVE_TIMEOUT:消息发送成功但是消息同步到 slave 节点时超时。
SLAVE_NOT_AVAILABLE:消息发送成功但是 broker 的 slave 节点不可用。
根据返回的状态码,可以做消息重试,这里设置的重试次数是 3。(消息重试时,消费端需要做去重处理)
异步发送
异步发送,可以重写回调函数,回调函数捕获到 Exception 时表示发送失败,这时可以进行重试,这里设置的重试次数是 3。
Broker端考虑
刷盘策略
异步刷盘:默认。消息写入 CommitLog 时,并不会直接写入磁盘,而是先写入 PageCache 缓存后返回成功,然后用后台线程异步把消息刷入磁盘。异步刷盘提高了消息吞吐量,但是可能会有消息丢失的情况,比如断点导致机器停机,PageCache 中没来得及刷盘的消息就会丢失。
同步刷盘:消息写入内存后,立刻请求刷盘线程进行刷盘,如果消息未在约定的时间内(默认 5 s)刷盘成功,就返回 FLUSH_DISK_TIMEOUT,Producer 收到这个响应后,可以进行重试。同步刷盘策略保证了消息的可靠性,同时降低了吞吐量,增加了延迟。要开启同步刷盘,需要增加下面配置:flushDiskType=SYNC_FLUSH
Broker多副本和高可用
Broker 为了保证高可用,采用一主多从的方式部署。或多主多从。
消息发送到 master 节点后,slave 节点会从 master 拉取消息保持跟 master 的一致。这个过程默认是异步的,即 master 收到消息后,不等 slave 节点复制消息就直接给 Producer 返回成功。
这样会有一个问题,如果 slave 节点还没有完成消息复制,这时 master 宕机了,进行主备切换后就会有消息丢失。为了避免这个问题,可以采用 slave 节点同步复制消息,即等 slave 节点复制消息成功后再给 Producer 返回发送成功。只需要增加下面的配置:brokerRole=SYNC_MASTER
改为同步复制后,消息复制流程如下:
slave 初始化后,跟 master 建立连接并向 master 发送自己的 offset;
master 收到 slave 发送的 offset 后,将 offset 后面的消息批量发送给 slave;
slave 把收到的消息写入 commitLog 文件,并给 master 发送新的 offset;
master 收到新的 offset 后,如果 offset >= producer 发送消息后的 offset,给 Producer 返回 SEND_OK。
消费端考虑
消费成功
如果 Consumer 消费成功,返回 CONSUME_SUCCESS,提交 offset 并从 Broker 拉取下一批消息。
消费失败重试
Consumer 消费失败,这里有 3 种情况:
返回 RECONSUME_LATER
返回 null
抛出异常
Broker 收到这个响应后,会把这条消息放入重试队列,重新发送给 Consumer。
注意:
Broker 默认最多重试 16 次,如果重试 16 次都失败,就把这条消息放入死信队列,Consumer 可以订阅死信队列进行消费。
重试只有在集群模式(MessageModel.CLUSTERING)下生效,在广播模式(MessageModel.BROADCASTING)下是不生效的。
Consumer 端一定要做好幂等处理。
其实重试 3 次都失败就可以说明代码有问题,这时 Consumer 可以把消息存入本地并告警进行人工处理,给 Broker 返回CONSUME_SUCCESS 来结束重试。
极端情况
如果对消息丢失零容忍,我们必须要考虑极端情况,比如整个 RocketMQ 集群挂了,这时 Producer 端发送消息一定会失败,可以考虑在 Producer 端做降级,把要发送的消息保存到本地数据库或磁盘,等 RocketMQ 恢复以后再把本地消息推送出去。
Kafka
新版本的kafka, 已经不再依赖zookeeper.
对于kafka要保障消息的不丢失,同样需要考虑Producer、Broker、Consumer等情况。
生产者端
1. 配置正确的确认机制(acks 参数)
acks = 0:生产者发送消息后,不会等待 Broker 的确认,直接认为消息发送成功。这种方式性能最高,但可靠性最差,因为如果消息在发送过程中丢失,生产者无法得知。
acks = 1:生产者发送消息后,只要 Leader 分区成功写入消息,就会收到 Broker 的确认。这种方式在一定程度上保证了消息的可靠性,但如果 Leader 分区在确认消息后、将消息同步到 Follower 分区之前发生故障,消息仍然可能丢失。
acks = all 或 acks = -1:生产者发送消息后,需要等待 Leader 分区和所有 ISR(In - Sync Replicas,与 Leader 分区保持同步的副本集合)中的 Follower 分区都成功写入消息,才会收到 Broker 的确认。这种方式提供了最高的可靠性,但会降低性能,因为需要等待多个副本的写入操作完成。
2. 重试机制
生产者可以配置重试次数,当消息发送失败时,Kafka 会自动进行重试。例如,网络抖动、Broker 暂时不可用等情况导致的发送失败,通过重试机制可以提高消息发送的成功率。可以通过设置 retries
参数来指定重试次数,同时可以设置 retry.backoff.ms
参数来控制重试间隔。
3. 幂等性生产
Kafka 从 0.11.0.0 版本开始引入了幂等性生产者的概念。开启幂等性生产后,生产者会为每条消息生成一个唯一的 ID,Broker 会对消息进行去重处理,确保相同 ID 的消息只会被写入一次。可以通过设置 enable.idempotence
参数为 true
来开启幂等性生产。
Broker 端
1. 多副本机制
Kafka 采用多副本机制来提高消息的可靠性。每个分区都可以有多个副本,其中一个副本作为 Leader 分区,负责处理读写请求,其他副本作为 Follower 分区,从 Leader 分区同步消息。当 Leader 分区发生故障时,Kafka 会自动从 ISR 集合中选择一个 Follower 分区作为新的 Leader 分区,继续提供服务。可以通过设置 replication.factor
参数来指定分区的副本数,一般建议设置为 3 或以上。
2. 刷盘策略
Kafka 的消息是先写入内存中的缓冲区,然后定期刷盘。可以通过配置 flush.messages
和 flush.ms
参数来控制刷盘的频率。例如,设置 flush.messages = 1
表示每写入一条消息就刷盘一次,这样可以保证消息的持久化,但会降低性能;设置 flush.ms = 1000
表示每 1 秒刷盘一次,在性能和可靠性之间进行了平衡。
3. 合理配置 ISR
ISR 集合中的副本与 Leader 分区保持同步,如果某个 Follower 分区落后 Leader 分区太多,会被踢出 ISR 集合。可以通过设置 min.insync.replicas
参数来指定 ISR 集合中最少需要保持同步的副本数。当生产者设置 acks = all
时,如果 ISR 集合中的副本数小于 min.insync.replicas
,生产者会收到错误信息,从而避免消息丢失。
消费者端
1. 手动提交偏移量
Kafka 消费者在消费消息时,会记录消费的偏移量(offset),表示已经消费到的消息位置。默认情况下,消费者会自动提交偏移量,但这种方式可能会导致消息丢失。建议使用手动提交偏移量的方式,在消息处理完成后再提交偏移量,确保消息不会因为消费者在处理过程中崩溃而丢失。可以通过设置 enable.auto.commit
参数为 false
来关闭自动提交偏移量,然后使用 commitSync()
或 commitAsync()
方法手动提交偏移量。
2. 处理消费异常
消费者在消费消息时,可能会遇到各种异常情况,如网络异常、业务逻辑异常等。需要在代码中捕获这些异常,并进行相应的处理,确保消息不会因为异常而丢失。例如,可以将异常消息记录下来,进行重试或人工处理。
作者:许Web
链接:https://juejin.cn/post/7469051793236295680