Kafka入门-消息丢失问题

发布于:2024-07-03 ⋅ 阅读:(20) ⋅ 点赞:(0)

Kafka到底在什么情况下才能保证消息不丢失

一句话概括,Kafka只对“已提交”的消息(committedmessage)做有限度的持久化保证

        已提交的消息:当Kafka的若干个Broker成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在Kafka看来 就正式变为“已提交”消息了。

        有限度的持久化保证:Kafka不丢消息是有前提条件的。假如消息保存在N个Kafka Broker上,那么这个前提条件就是这N个Broker中至少有1个存活。只要这个条件成立,Kafka就能保证你的这条消息永远不会丢失。

“消息丢失”案例

案例1:生产者程序丢失数据

Kafka Producer是异步发送消息的,也就是说调用的是producer.send(msg)这个API,那么它通常会立即返回,但此时不能认为消息发送已成功完成。解决此问题的方法非常简单:Producer永远要使用带有回调通知的发送API,也就是说不要使用producer.send(msg),而要使用 producer.send(msg,callback)

案例2:消费者程序丢失数据

Consumer端丢失数据主要体现在Consumer端要消费的消息不见了。Consumer程序有个“位移”的概念,表示的是这个Consumer当前消费到的Topic分区的位置。保证消费端丢失数据,维持先消费消息(阅读),再更新位移(书签)的顺序即可。这种处理方式可能带来的问题是消息的重复处理。

案例3:Consumer自动提交位移

Consumer程序从Kafka获取到消息后开启了多个线程异步处理消息,而Consumer程序自动地向前更新位移。假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移 已经被更新了,因此这条消息对于Consumer而言实际上是丢失了。

解决方案:如果是多线程异步处理消费消息,Consumer程序不要开启自动提交位移,而是要应用程序手动提交位移

总结:

1. 不要使用producer.send(msg),而要使用producer.send(msg,callback)。记住,一定要使用带有回调通知的send方法。

2. 设置acks = all。acks是Producer的一个参数,代表了你对“已提交”消息的定义。如果设置成all,则表明所有副本Broker都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。

3. 设置retries为一个较大的值。这里的retries同样是Producer的参数,对应前面提到的Producer自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了retries > 0的Producer能够自动重试消 息发送,避免消息丢失。

4. 设置unclean.leader.election.enable = false。这是Broker端的参数,它控制的是哪些Broker有资格竞选分区的Leader。如果一个Broker落后原先的Leader太多,那么它一旦成为新的Leader,必然会造成消息的 丢失。故一般都要将该参数设置成false,即不允许这种情况的发生。

5. 设置replication.factor >= 3。这也是Broker端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。

6. 设置min.insync.replicas > 1。这依然是Broker端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于1可以提升消息持久性。在实际环境中千万不要使用默认值1。

7. 确保replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完 成。推荐设置成replication.factor = min.insync.replicas + 1。

8. 确保消息消费完成再提交。Consumer端有个参数enable.auto.commit,最好把它设置成false,并采用手动提交位移的方式。就像前面说的,这对于单Consumer多线程处理的场景而言是至关重要的。