目录
5、Kafka消息数据积压,Kafka消费能力不足怎么处理?
1.Kafka架构
生产者、Broker、消费者、Zookeeper;
注意:Zookeeper中保存Broker id和消费者offsets等信息,但是没有生产者信息。
1.1发送流程
1.2 Broker总体工作流程
1.3消费者初始化流程
=======================================================================
================================================================
2、 Kafka的ISR副本同步队列
ISR:副本同步队列 =》 主要解决,leader挂了,谁当新老大的问题
老版本:延迟时间、延迟条数
新版本:延迟时间 relica.lag.time.max.ms ,默认30s-
ISR,表示和Leader保持同步的Follower集合。如果Follower长时间未向Leader同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader发生故障之后,就会从ISR中选举新的Leader。
OSR,表示Follower与Leader副本同步时,延迟过多的副本。
====================================================================
3、消费者分区分配策略
Ranger : 均分,除不尽给id小的
RoundRobin: hash % , 轮询
粘性: 尽量复用之前的分配的关系,主要是作用于 rebalance的时候
rebalance:
消费者组里面 新增 、挂了 消费者(消费者数量发生变化)
topic的分区数发生变化
====================================================================
4、Kafka数据重复
4.1 数据重复产生的原因:
因重试发送 造成的重复
4.2 解决的办法
- kafak自己解决或者在下一级主件的处理。
(1)自身:
- 幂等性、事务、ack=-1(数据太多时处理不过来,产生积压等一系列问题,在企业里面用的比较少,更追求效率)
- 1)幂等性
- 生产者开启幂等性: 单分区单会话有效
enable.idempotence ==> true
<pid,partition,SeqNumber>
会话: 重启Kafka,pid就变了,一个新的会话
SeqNumber: 生产者 打上的一个标记, 自增序列: 1、2、3、4、5、6...... -
- 2)事务
- 事务API: 初始化事务、启动事务、提交事务、停止事务、提交offset事务
// 1初始化事务 void initTransactions(); // 2开启事务 void beginTransaction() throws ProducerFencedException; // 3在事务内提交已经消费的偏移量(主要用于消费者) void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException; // 4提交事务 void commitTransaction() throws ProducerFencedException; // 5放弃事务(类似于回滚事务的操作) void abortTransaction() throws ProducerFencedException;
单个Producer,使用事务保证消息的仅一次发送
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class CustomProducerTransactions { public static void main(String[] args) throws InterruptedException { // 1. 创建kafka生产者的配置对象 Properties properties = new Properties(); // 2. 给kafka配置对象添加配置信息 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); // key,value序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 设置事务id(必须),事务id任意起名 properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0"); // 3. 创建kafka生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); // 初始化事务 kafkaProducer.initTransactions(); // 开启事务 kafkaProducer.beginTransaction(); try { // 4. 调用send方法,发送消息 for (int i = 0; i < 5; i++) { // 发送消息 kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i)); } // int i = 1 / 0; // 提交事务 kafkaProducer.commitTransaction(); } catch (Exception e) { // 终止事务 kafkaProducer.abortTransaction(); } finally { // 5. 关闭资源 kafkaProducer.close(); } } }
(2)找兄弟:
- kafka数据重复,可以在下一级处理:SparkStreaming、Redis、Flink或者hive的dwd层。()
- 去重的手段是:分组、按照id开窗取第一条记录。
5、Kafka消息数据积压,Kafka消费能力不足怎么处理?
(1)消费能力不足
可以考虑增加Topic主题的分区数,同时增加下一级消费者的CPU核数,消费者数=分区数(两者缺一不可)
(2)单个消费者能力不足
提高每批次拉取的数量
fetch.max.bytes: 一次拉取的最大字节数,默认50M。
max.poll.records: 一次拉取的最大条数,默认500条,改成一次拉取1000,2000条。
(3)消费者宕机了,一个小时后才重新正常启动消费者
积压了1小时,之前是正常的
=》积压是暂时的,过一会就赶上进度了,这种情况不处理
6、Kafka挂了怎么办
1)处理:尝试重启
2)评估影响: 丢数、重复、乱序
3)定位问题:看日志
4)解决问题
短时间 会存储在flume channel里面
长时间 日志服务器保存30数据
7、Kafka数据丢了
1)生产者角度
ack
0:相当于异步发送,消息发送完毕即offset增加,继续生产。
1:leader收到leader replica 对一个消息的接受ack才增加offset,然后继续生产。
-1(all):生产者发送过来的数据,Leader+和isr队列里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的。
在生产环境:
0肯定不选
1 一般传输的是普通日志,对可靠性要求不是特别高。
-1 传输的数据和钱相关,或者金融企业,对可靠性要求高。
设置重试:
如果是1,设个3~5次
如果是-1,Int最大值(无限重试)
ack设为-1,一定不丢数吗? ==》还要考虑 broker端的配置
2)broker角度
副本数 >= 2
min.insync.replicas >= 2 ISR里的最小同步副本数
3)消费者角度(跟Kafka无关)
对于 端到端一致性 来说:
消费者消费数据,是 拷贝 还是 剪切?
Spark Streaming:手动维护offset(自己实现)
Flink: offset维护在 source算子的状态里,开启ck就行了
Flume: 事务
8、Kafka高效读写?
1)Kafka本身是分布式集群,可以设置分区,并发度高。
2)采用顺序读写:600m/s 随机读写100m/s
Kafka的producer生产数据,要写入到log文件中, 写的过程是一直
追加到文件末端,为顺序写。
3)采用了零拷贝技术
9、Kafka优化
1)资源优化
硬盘: 2个副本、保存3天
内存: 默认内存1个G,每个节点10~15G (扣一点,6G)
export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"
CPU: 三个线程参数
2)生产者吞吐:
buffer.memory 默认32m ==》 64m
batch.size 默认16k ==》 32k
lingerms 默认0ms ==》 5-100ms
compression.type 默认none ==》 lz4
3)消费者吞吐:
fetch.max.bytes: 一次拉取的最大字节数,默认50M
max.poll.records: 一次拉取的最大条数,默认500条
4)一致性问题:
生产者:
幂等性
ack -1
事务
broerk端:
副本>=2
min.insync.replicas >=2
消费者:
手动维护offset + (事务 或 幂等)
=====================================
10、Kafka杂七杂八
(1)Kafka的机器数量
Kafka机器数量 = 2 *(峰值生产速度 * 副本数 / 100)+ 1
(2)Kafka的副本数设定
- 一般我们设置成2个或3个,很多企业设置为2个。
- 副本的优势:提高可靠性;
- 副本劣势:增加了网络IO传输
- 副本的优势:提高可靠性;
(3)Kafka日志保存时间
- 默认保存7天;生产环境建议3天
(4)Kafka的硬盘大小
每天的数据量100g * 2个副本 * 3天 / 70%
一天约100G数据量
100G * 2副本 * 保存3天 / 0.7 = 约800多G ===》 给1T
实时数据也在 Kafka ===》 5T(5)Kakfa分区数
1)创建一个只有1个分区的topic
2)测试这个topic的producer吞吐量和consumer吞吐量。
3)假设他们的值分别是Tp和Tc,单位可以是MB/s。
4)然后假设总的目标吞吐量是Tt,那么分区数=Tt / min(Tp,Tc)
例如:producer吞吐量 = 20m/s;consumer吞吐量 = 50m/s,期望吞吐量100m/s;
分区数 = 100 / 20 = 5分区
如何根据数据量确定Kafka分区个数、Kafka的分区是不是越多越好、Kafak生产者分发策略,消费者负载均衡 09_啊策策的博客-CSDN博客
分区数一般设置为:3-10个
(6)多少个Topic
通常情况:多少个日志类型就多少个Topic(一类数据对应一个Topic)。也有对日志类型进行合并的。
(7)Kafka速率100万日活,每人每天产生100条日志,每条日志平均1k
一天数据条数= 100万 * 100条 = 1亿条
一天数据量 = 1亿条 * 1k = 约100G
平均条数 = 1亿条 / (24 * 60 * 60) = 约 1150条/s (说大概值,每秒1000条左右)
平均速率 = 1150 * 1k = 约 1M/s
高峰时段: 中午小高峰、晚上下班 7-12点
高峰条数 = 平均 * 20倍 = 1150 * 20 = 约23000条/s (说大概值,每秒2万条左右)
高峰速率 = 约20M/s
低谷时段: 凌晨半夜
低谷条数 = 约50条/s (说大概值,每秒几十条)
低谷速率 = 几十K/s(8)过期数据:
删除: 默认,真正把数据删了
压缩: 保留最新key