1.消息不丢失
如何保证消息的可靠性传输,或者说,如何保证消息不丢失?这对于任何 MQ 都是核心问题。
一条消息从生产到消费,可以划分三个阶段:
- 生产阶段:Producer 创建消息,并通过网络发送给 Broker。
- 存储阶段:Broker 收到消息并存储,如果是集群,还要同步副本给其他 Broker。
- 消费阶段:Consumer 向 Broker 请求消息,Broker 通过网络传输给 Consumer。
这三个阶段都可能丢失数据,所以要保证消息丢失,就需要任意一环都保证可靠。
存储阶段
存储阶段指的是 Kafka Server,也就是 Broker 如何保证消息不丢失。
一句话概括,Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。
上面的话可以解读为:
- 已提交:只有当消息被写入分区的若干同步副本时,才被认为是已提交的。为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要 Leader 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是已提交。
- 持久化:Kafka 的数据存储在磁盘上,所以只要写入成功,天然就是持久化的。
- 只要还有一个副本是存活的,那么已提交的消息就不会丢失。
- 消费者只能读取已提交的消息。
副本机制
Kafka 的副本机制是 kafka 可靠性保证的核心。
Kafka 的主题被分为多个分区,分区是基本的数据块。每个分区可以有多个副本,有一个是 Leader(主副本),其他是 Follower(从副本)。所有数据都直接发送给 Leader,或者直接从 Leader 读取事件。Follower 只需要与 Leader 保持同步,并及时复制最新的数据。当 Leader 宕机时,从 Follower 中选举一个成为新的 Leader。
Broker 有 3 个配置参数会影响 Kafka 消息存储的可靠性。
#副本数
replication.factor
的作用是设置每个分区的副本数。replication.factor
是主题级别配置; default.replication.factor
是 broker 级别配置。
副本数越多,数据可靠性越高;但由于副本数增多,也会增加同步副本的开销,可能会降低集群的可用性。一般,建议设为 3,这也是 Kafka 的默认值。
#不完全的选主
unclean.leader.election.enable
用于控制是否支持不同步的副本参与选举 Leader。unclean.leader.election.enable
是 broker 级别(实际上是集群范围内)配置,默认值为 true。
- 如果设为 true,代表着允许不同步的副本成为主副本(即不完全的选举),那么将面临丢失消息的风险;
- 如果设为 false,就要等待原先的主副本重新上线,从而降低了可用性。
#最少同步副本
min.insync.replicas
控制的是消息至少要被写入到多少个副本才算是“已提交”。min.insync.replicas
是主题级别和 broker 级别配置。
尽管可以为一个主题配置 3 个副本,但还是可能会出现只有一个同步副本的情况。如果这个同步副本变为不可用,则必须在可用性和数据一致性之间做出选择。Kafka 中,消息只有被写入到所有的同步副本之后才被认为是已提交的。但如果只有一个同步副本,那么在这个副本不可用时,则数据就会丢失。
如果要确保已经提交的数据被已写入不止一个副本,就需要把最小同步副本的设置为大一点的值。
注意:要确保
replication.factor
>min.insync.replicas
。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成replication.factor = min.insync.replicas + 1
。
生产阶段
在生产消息阶段,消息队列一般通过请求确认机制,来保证消息的可靠传递,Kafka 也不例外。
Kafka 生产者 中提到了,Kafka 有三种发送方式:同步、异步、异步回调。
同步方式能保证消息不丢失,但性能太差;异步方式发送消息,通常会立即返回,但消息可能丢失。
解决生产者丢失消息的方案:
生产者使用异步回调方式 producer.send(msg, callback)
发送消息。callback(回调)能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。
- 如果是因为那些瞬时错误,那么仅仅让 Producer 重试就可以了;
- 如果是消息不合格造成的,那么可以调整消息格式后再次发送。
然后,需要基于以下几点来保证 Kafka 生产者的可靠性:
#ACK
生产者可选的确认模式有三种:acks=0
、acks=1
、acks=all
。
acks=0
、acks=1
都有丢失数据的风险。acks=all
意味着会等待所有同步副本都收到消息。再结合min.insync.replicas
,就可以决定在得到确认响应前,至少有多少副本能够收到消息。
这是最保险的做法,但也会降低吞吐量。
#重试
如果 broker 返回的错误可以通过重试来解决,生产者会自动处理这些错误。
- 可重试错误,如:
LEADER_NOT_AVAILABLE
,主副本不可用,可能过一段时间,集群就会选举出新的主副本,重试可以解决问题。 - 不可重试错误,如:
INVALID_CONFIG
,即使重试,也无法改变配置选项,重试没有意义。
需要注意的是:有时可能因为网络问题导致没有收到确认,但实际上消息已经写入成功。生产者会认为出现临时故障,重试发送消息,这样就会出现重复记录。所以,尽可能在业务上保证幂等性。
设置 retries
为一个较大的值。这里的 retries
同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
#错误处理
开发者需要自行处理的错误:
- 不可重试的 broker 错误,如消息大小错误、认证错误等;
- 消息发送前发生的错误,如序列化错误;
- 生产者达到重试次数上限或消息占用的内存达到上限时发生的错误。
消费阶段
前文已经提到,消费者只能读取已提交的消息。这就保证了消费者接收到消息时已经具备了数据一致性。
消费者唯一要做的是确保哪些消息是已经读取过的,哪些是没有读取过的(通过提交偏移量给 Broker 来确认)。如果消费者提交了偏移量却未能处理完消息,那么就有可能造成消息丢失,这也是消费者丢失消息的主要原因。
消费者的可靠性配置
group.id
- 如果希望消费者可以看到主题的所有消息,那么需要为它们设置唯一的group.id
。auto.offset.reset
- 有两个选项:earliest
- 消费者会从分区的开始位置读取数据latest
- 消费者会从分区末尾位置读取数据
enable.auto.commit
- 消费者自动提交偏移量。如果设为 true,处理流程更简单,但无法保证重复处理消息。auto.commit.interval.ms
- 自动提交的频率,默认为每 5 秒提交一次。
#显示提交偏移量
如果
enable.auto.commit
设为 true,即自动提交,就无需考虑提交偏移量的问题。
如果选择显示提交偏移量,需要考虑以下问题:
- 必须在处理完消息后再发送确认(提交偏移量),不要收到消息立即确认。
- 提交频率是性能和重复消息数之间的权衡
- 分区再均衡
- 消费可能需要重试机制
- 超时处理
- 消费者可能需要维护消费状态,如:处理完消息后,记录在数据库中。
- 幂等性设计
- 写数据库:根据主键判断记录是否存在
- 写 Redis:set 操作天然具有幂等性
- 复杂的逻辑处理,则可以在消息中加入全局 ID
重复消息
在 MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:
- At most once:至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。
- At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
- Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。
绝大部分消息队列提供的服务质量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 都是这样。也就是说,消息队列很难保证消息不重复。
一般解决重复消息的办法是,在消费端,保证消费消息的操作具备幂等性。
常用的实现幂等操作的方法:
利用数据库的唯一约束实现幂等
关系型数据库可以使用 INSERT IF NOT EXIST
语句防止重复;Redis 可以使用 SETNX
命令来防止重复;其他数据库只要支持类似语义,也是一个道理。
#为更新的数据设置前置条件
如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。
但是,如果我们要更新的数据不是数值,或者我们要做一个比较复杂的更新操作怎么办?用什么作为前置判断条件呢?更加通用的方法是,给数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。
#记录并检查操作
还有一种通用性最强,适用范围最广的实现幂等性方法:记录并检查操作,也称为“Token 机制或者 GUID(全局唯一 ID)机制”,实现的思路特别简单:在执行数据更新操作之前,先检查一下是否执行过这个更新操作。
具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。
需要注意的是,“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性,才能真正实现幂等,否则就会出现 Bug。这一组操作可以通过分布式事务或分布式锁来保证其原子性。
消息的有序性
某些场景下,可能会要求按序发送消息。
#方案一、单 Partition
Kafka 每一个 Partition 只能隶属于消费者群组中的一个 Consumer,换句话说,每个 Partition 只能被一个 Consumer 消费。所以,如果 Topic 是单 Partition,自然是有序的。
方案分析
优点:简单粗暴。开发者什么也不用做。
缺点:Kafka 基于 Partition 实现其高并发能力,如果使用单 Partition,会严重限制 Kafka 的吞吐量。
结论:作为分布式消息引擎,限制并发能力,显然等同于自废武功,所以,这个方案几乎是不可接受的。
#方案二、同一个 key 的消息发送给指定 Partition
(1)生产者端显示指定 key 发往一个指定的 Partition,就可以保证同一个 key 在这个 Partition 中是有序的。
(2)接下来,消费者端为每个 key 设定一个缓存队列,然后让一个独立线程负责消费指定 key 的队列,这就保证了消费消息也是有序的。
消息积压
先修复消费者,然后停掉当前所有消费者。
新建 Topic,扩大分区,以提高并发处理能力。
创建临时消费者程序,并部署在多节点上,扩大消费处理能力。
最后处理完积压消息后,恢复原先部署架构。
1. 检查消息积压原因
消息积压的常见原因包括:
1.1 消费者性能问题
- 消费者应用的处理速度太慢。
- 消费者线程数量不足,无法并行处理分区。
1.2 分区和消费者数量不匹配
- 分区数少于消费者数量,部分消费者处于空闲状态。
- 分区数多于消费者,但消费者线程未充分利用,导致部分分区消费效率低。
1.3 消费者挂起或失联
- 消费者组失去连接或未正确重新加入。
- 消费者代码中存在错误,导致消费失败或重启。
1.4 Broker 性能问题
- Broker 磁盘 I/O 或网络带宽出现瓶颈。
- Broker 崩溃或重启,分区 Leader 切换导致消息堆积。
1.5 生产者流量突增
- 生产者发送的消息量超出正常负载范围。
2. 检测消息积压
使用 Kafka 提供的工具和指标确定消息积压的严重程度和位置:
2.1 使用 Kafka 自带工具
kafka-consumer-groups.sh --bootstrap-server <broker_address> \ --describe --group <consumer_group>
查看特定消费者组的 Lag
(消息滞后量)。
2.2 监控 Lag 指标
通过 Kafka 的监控工具(如 Prometheus + Grafana 或 Confluent Control Center)查看 consumer_lag
和分区的 Lag 分布情况。
2.3 检查 Broker 资源
- 磁盘利用率
- 网络带宽
- CPU 和内存使用率
3. 消息积压的解决措施
3.1 增加消费者处理能力
方法 1:增加消费者实例或线程
- 操作:通过水平扩展增加消费者实例,确保消费者数量与分区数一致或接近。
- 注意事项:
- 分区是并发消费的最小单位。如果消费者数大于分区数,多余的消费者会闲置。
方法 2:优化消费者代码
- 减少每条消息的处理时间。
- 使用异步处理提高吞吐量。
- 调整批量消费参数(如
max.poll.records
):max.poll.records=500
方法 3:调高消费者的 Poll 超时时间
- 避免因消息处理时间过长导致消费者失效:
max.poll.interval.ms=300000 # 默认 5 分钟
3.2 增加 Kafka 集群性能
方法 1:扩展分区数
- 操作:为积压严重的主题增加分区。
- 注意事项:
- 增加分区后,现有消息不会重新分配,新增的分区仅对未来消息有效。
- 生产者需重新配置分区策略。
方法 2:扩展 Broker 节点
- 操作:增加集群中的 Broker 数量。
- 注意事项:
- 新增 Broker 后,需要重新平衡分区分配。
方法 3:优化 Broker 配置
- 增加磁盘 I/O 性能,使用 SSD。
- 调整 Broker 的吞吐量参数:
num.replica.fetchers=4 # 副本同步线程数 num.network.threads=8 # 网络线程数
3.3 临时绕过积压
方法 1:跳过积压消息
- 消费者直接从最新消息开始消费:
consumer.seekToEnd(partition);
- 适用于非关键业务数据,丢弃历史积压。
方法 2:分批处理积压消息
- 使用专门的消费者组处理积压消息,避免影响当前消费者。
3.4 控制生产者流量
- 限制生产者发送消息速率,减少对 Broker 的压力。
- 调整批量发送参数:
linger.ms=10 batch.size=16384
3.5 数据分流
- 将流量分发到不同的主题。
- 通过增加 Kafka 集群或分片减少单集群压力。
4. 避免未来消息积压
4.1 调整 Kafka 配置
- 设置合理的最大保留时间:
properties
Copy code
log.retention.hours=72
- 设置分区最大大小:
log.segment.bytes=1073741824 # 1 GB
4.2 实时监控
- 设置 Lag 报警。
- 定期监控磁盘使用率、网络带宽等 Broker 指标。
4.3 优化主题设计
- 创建更多分区以支持并行消费。
- 使用压缩(如 Snappy 或 GZIP)减少消息大小。
5. 示例处理流程
场景:消费滞后严重
假设某主题有 10 个分区,消费者组处理速度慢,滞后量为 1,000,000。
处理步骤:
- 检查消费者的
Lag
:kafka-consumer-groups.sh --describe --group my-group --bootstrap-server <broker>
- 增加消费者实例,确保有 10 个活跃消费者。
- 优化消费者代码,批量拉取和异步处理消息。
- 如果积压仍未解决,创建新的消费者组分批消费旧数据。
- 增加分区数到 20,未来提高并行消费能力。
验证系统可靠性
验证 Kafka 系统的可靠性是确保它在高负载、故障情况下依然能够正常运行并满足业务需求的关键环节。可靠性主要包括以下几个方面:数据不丢失、消息顺序性、容错能力,以及在扩展或流量突增时保持稳定。
1. 确定可靠性验证目标
在开始测试前,需要明确可靠性验证的具体目标。常见目标包括:
- 数据可靠性:消息是否能够完全存储、消费且不丢失。
- 高可用性:Broker 节点或消费者出现故障时,系统能否正常运行。
- 顺序性:消息在分区内是否保持生产和消费的顺序。
- 延迟和吞吐量:在高负载下的性能表现。
- 扩展性:增加分区、Broker 后,是否能够平稳运行。
2. 测试环境搭建
2.1 准备 Kafka 集群
- 部署多节点 Kafka 集群,推荐至少 3 个 Broker 节点,使用 ZooKeeper 或 KRaft 模式。
- 配置多个副本(Replication Factor),确保分区的冗余。
2.2 设置监控
- 使用 Kafka 自带的 JMX 监控或集成 Prometheus 和 Grafana。
- 监控指标:
- 消息滞后(Lag)
- 磁盘使用率
- 网络带宽
- ISR 副本数量
- Broker CPU 和内存使用
3. 数据可靠性验证
3.1 消息丢失测试
- 方法:
- 设置生产者的
acks=all
和min.insync.replicas=2
,确保消息被写入所有在线副本。 - 人为停止 Leader 副本,验证数据是否能够从 Follower 恢复。
- 设置生产者的
- 验证点:
- 消息在副本切换过程中是否完整保留。
- Follower 是否能正确提升为 Leader。
3.2 消息重复性测试
- 方法:
- 通过设置生产者的
retries
和enable.idempotence=true
参数,启用幂等性。 - 人为引入网络延迟或模拟生产者失败重试。
- 通过设置生产者的
- 验证点:
- 消息是否被多次写入(确保幂等性)。
3.3 顺序性测试
- 方法:
- 在一个分区内连续生产带有序号的消息。
- 验证消费者接收的消息是否按序号递增。
- 验证点:
- 分区内消息顺序是否正确。
- 消费者重启后,顺序是否保持。
4. 故障容错验证
4.1 Broker 故障测试
- 方法:
- 关闭一个或多个 Broker 节点。
- 检查 Kafka 是否能够在剩余 Broker 上继续服务。
- 验证点:
- 分区的 Leader 是否正常切换。
- 消费者组是否继续消费。
4.2 ISR 副本测试
- 方法:
- 人为降低 ISR 副本数量(断开部分 Follower 网络)。
- 检查生产者在
acks=all
情况下是否报错。
- 验证点:
- 是否能够正确限制生产(防止数据丢失)。
- 网络恢复后,是否能自动将 Follower 恢复到 ISR。
4.3 ZooKeeper 故障测试(如使用 KRaft 可忽略)
- 方法:
- 暂时关闭 ZooKeeper。
- 测试生产和消费是否受影响。
- 验证点:
- Broker 能否正常运行。
- 在 ZooKeeper 恢复后,集群元数据是否一致。
5. 性能测试
5.1 高负载测试
- 方法:
- 使用 Kafka 自带工具或
kafka-producer-perf-test.sh
模拟大批量消息生产。 - 测试消费者组的处理能力。
- 使用 Kafka 自带工具或
- 验证点:
- 系统吞吐量(messages/second)。
- 消费者的滞后情况。
5.2 延迟测试
- 方法:
- 在消息中添加时间戳,生产后立刻消费。
- 测量从生产到消费完成的延迟。
- 验证点:
- 延迟是否符合业务 SLA(如 < 100ms)。
- 在高负载下是否有显著延迟增长。
5.3 磁盘压力测试
- 方法:
- 增加主题的分区数或副本数。
- 增大单条消息的大小(如 1MB)。
- 验证点:
- 磁盘写入是否出现瓶颈。
- 磁盘清理策略是否生效(如日志段过期删除)。
6. 扩展性验证
6.1 分区扩展测试
- 方法:
- 增加主题的分区数,并重新分配分区。
- 测试现有消费者组是否能自动感知并处理新分区。
- 验证点:
- 消费者是否能平衡负载。
- 分区的 Leader 分配是否均匀。
6.2 Broker 扩展测试
- 方法:
- 增加新的 Broker 节点。
- 重新分配分区。
- 验证点:
- 新 Broker 是否能够正常接收分区。
- 扩展过程是否对生产和消费有影响。
7. 异常场景测试
7.1 网络分区
- 模拟网络分区,将部分 Broker 从集群隔离。
- 验证点:
- Leader 副本是否能够继续提供服务。
- 网络恢复后,副本是否能自动同步。
7.2 消息积压
- 暂停消费者,产生大量消息积压。
- 验证点:
- 消费者恢复后是否能够正常消费。
- Broker 是否因积压导致磁盘或内存问题。
8. 自动化可靠性测试
工具:
- 使用开源工具如 Chaos Monkey for Kafka 模拟故障。
- 使用性能测试工具(如
kafka-producer-perf-test.sh
和kafka-consumer-perf-test.sh
)进行性能和压力测试。
集成测试:
- 将可靠性测试集成到 CI/CD 流水线,定期验证系统的可靠性。
建议从 3 个层面验证系统的可靠性:
- 配置验证
- 应用验证
- 客户端和服务器断开连接
- 选举
- 依次重启 broker
- 依次重启生产者
- 依次重启消费者
- 监控可靠性
- 对于生产者来说,最重要的两个指标是消息的
error-rate
和retry-rate
。如果这两个指标上升,说明系统出了问题。 - 对于消费者来说,最重要的指标是
consumer-lag
,该指标表明了消费者的处理速度与最近提交到分区里的偏移量之间还有多少差距。
- 对于生产者来说,最重要的两个指标是消息的
最佳实践
生产者
- 不要使用
producer.send(msg)
,而要使用producer.send(msg, callback)
。记住,一定要使用带有回调通知的send
方法。 - 设置
acks = all
。acks
是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。 - 设置
retries
为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了retries > 0
的 Producer 能够自动重试消息发送,避免消息丢失。
服务器(Kafka Broker)
- 设置
unclean.leader.election.enable = false。
这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。 - 设置
replication.factor
>= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。 - 设置
min.insync.replicas
> 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。 - 确保
replication.factor
>min.insync.replicas
。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成replication.factor = min.insync.replicas + 1
。
消费者
- 确保消息消费完成再提交。Consumer 端有个参数
enable.auto.commit
,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。
1. Kafka 部署和配置
1.1 集群规划
- 分区数量:分区是并发处理的最小单位。规划分区时:
- 根据消费者并发数确定分区数(消费者 ≤ 分区数)。
- 考虑吞吐量需求,每个分区的最佳吞吐量通常在 10~100MB/s。
- 副本数量:
- 通常设置为 3(N+1 容错)。
- 确保
min.insync.replicas
≥ 2,避免数据丢失。
1.2 Broker 配置
- 磁盘 I/O:
- 使用 SSD 提高读写性能。
- 设置独立的日志存储磁盘,避免与操作系统或其他服务共享磁盘。
- 内存管理:
- Kafka 依赖文件系统缓存,适当限制 JVM 堆大小(如 6~8GB)。
num.io.threads
和num.network.threads
设置为 Broker 物理核心数的 2 倍。
1.3 网络优化
- 网络带宽:为 Kafka 集群分配高带宽网络,优先使用 10Gbps 网络。
- 压缩:
- 在生产者端启用压缩(如
snappy
或lz4
),减少网络带宽占用。 - 适用配置:
compression.type=snappy
- 在生产者端启用压缩(如
1.4 ZooKeeper/KRaft
- ZooKeeper:
- 使用奇数节点(3 或 5 个),保证高可用。
- 独立部署,不与 Kafka 共用硬件资源。
- KRaft 模式:
- Kafka 2.8 及以上版本支持 KRaft,减少对 ZooKeeper 的依赖。
2. 主题和消息设计
2.1 主题设计
- 分区规划:
- 避免过多小分区,导致元数据管理开销过大(建议单主题分区总数 < 10,000)。
- 避免过少分区,影响并发消费能力。
- 保留策略:
- 配置合适的日志保留策略:
log.retention.hours=168 # 7 天
log.segment.bytes=1GB # 每个日志段大小
- 使用基于时间 (
log.retention.ms
) 或空间 (log.retention.bytes
) 的策略,避免磁盘填满。
- 配置合适的日志保留策略:
2.2 消息大小
- 建议:
- 单条消息大小控制在 1KB ~ 1MB 之间。
- 如果消息过大,考虑启用生产者端压缩或分片发送。
- 调整 Broker 配置:
message.max.bytes=1000012 # 单条消息最大大小
2.3 消息键设计
- 为消息指定合适的 Key,以便控制消息在分区中的分布:
- 使用 Key 实现同类数据(如用户 ID)总是写入同一分区,确保分区内消息顺序。
3. 生产者优化
3.1 幂等性
- 启用幂等性:避免因网络抖动导致的重复消息。
enable.idempotence=true
3.2 批量发送
- 批量优化:合并小消息,减少网络请求开销。
batch.size=16384 # 单批次最大大小(16KB)
linger.ms=5 # 等待时间,允许消息积累
3.3 重试机制
- 配置生产者自动重试:
retries=3
retry.backoff.ms=100
3.4 acks 配置
- 根据可靠性需求设置
acks
:acks=1
:性能优先。acks=all
:数据可靠性优先。
4. 消费者优化
4.1 消费者并发
- 平衡消费者与分区数:消费者实例数应小于或等于分区数。
- 多线程消费:
- 一个消费者实例不能并行消费多个分区,需使用多线程模式或多个实例。
4.2 拉取配置
- 调整批量拉取和处理大小:
max.poll.records=500
fetch.min.bytes=1024
fetch.max.wait.ms=100
4.3 消费者组管理
- 消费者组负责分区分配,避免多个消费者组同时消费同一主题。
- 确保使用正确的偏移提交策略:
enable.auto.commit=false
4.4 偏移管理
- 手动提交偏移,确保处理完成后再提交:
consumer.commitSync();
5. 故障处理和容错
5.1 副本设置
- 设置合理的副本和 ISR:
replication.factor=3
min.insync.replicas=2
5.2 再均衡优化
- 使用
range
或sticky
分区分配策略,避免频繁再均衡。 - 配置消费者的最大空闲时间:
session.timeout.ms=10000
max.poll.interval.ms=300000
5.3 消息积压处理
- 增加消费者实例,提升消费速率。
- 如果积压严重,临时跳过旧消息:
consumer.seekToEnd(partition);
6. 性能监控和报警
6.1 监控工具
- 使用 Kafka 自带的 JMX 或 Prometheus 监控关键指标:
- 消费者延迟(Lag)
- 分区 ISR 数量
- Broker CPU、内存、磁盘使用率
- 网络带宽利用率
6.2 报警配置
- 配置 Lag 阈值报警,及时发现消费滞后。
- 设置磁盘使用率报警,避免日志清理过晚导致磁盘不足。
7. 扩展和升级
7.1 动态扩展
- 增加分区或 Broker 时,使用 Kafka 的
kafka-reassign-partitions.sh
工具重新分配分区。 - 避免一次性扩展过多,逐步扩展并监控性能。
7.2 升级策略
- Kafka 升级前备份数据,使用 Rolling Upgrade 逐步升级节点。
- 检查客户端与 Broker 的兼容性,确保 API 版本支持。
8. 数据保留和清理
8.1 数据过期策略
- 配置日志段和日志保留策略,避免占用大量磁盘空间:
log.retention.hours=168 # 数据保留 7 天
log.cleanup.policy=delete
8.2 压缩存储
- 配置
log.cleanup.policy=compact
实现日志压缩,保留最新值。