Kafka面试常问知识点

发布于:2022-11-14 ⋅ 阅读:(1360) ⋅ 点赞:(1)

目录

1.Kafka架构

1.1发送流程

 1.2 Broker总体工作流程

1.3消费者初始化流程

2、 Kafka的ISR副本同步队列

3、消费者分区分配策略

4、Kafka数据重复

4.1 数据重复产生的原因:

4.2 解决的办法

(1)自身:

(2)找兄弟:

5、Kafka消息数据积压,Kafka消费能力不足怎么处理?

6、Kafka挂了怎么办

7、Kafka数据丢了

8、Kafka高效读写?

9、Kafka优化

10、Kafka杂七杂八

(1)Kafka的机器数量

(2)Kafka的副本数设定

(3)Kafka日志保存时间

(4)Kafka的硬盘大小

(5)Kakfa分区数

(6)多少个Topic

    (7)Kafka速率

(8)过期数据:


1.Kafka架构

生产者、Broker、消费者、Zookeeper;

注意:Zookeeper中保存Broker id消费者offsets等信息,但是没有生产者信息

1.1发送流程

 1.2 Broker总体工作流程

1.3消费者初始化流程

=======================================================================


================================================================

  • 2、 Kafka的ISR副本同步队列

    •           ISR:副本同步队列 =》 主要解决,leader挂了,谁当新老大的问题
              老版本:延迟时间、延迟条数
              新版本:延迟时间  relica.lag.time.max.ms ,默认30s

    • ISR,表示和Leader保持同步Follower集合。如果Follower长时间未向Leader同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader发生故障之后,就会从ISR中选举新的Leader。

      OSR表示Follower与Leader副本同步时,延迟过多的副本。

    • ====================================================================

    • 3、消费者分区分配策略

    •     Ranger : 均分,除不尽给id小的
          RoundRobin: hash %  , 轮询
          粘性: 尽量复用之前的分配的关系,主要是作用于 rebalance的时候
          
          rebalance: 
              消费者组里面 新增 、挂了 消费者(消费者数量发生变化)
              topic的分区数发生变化

  • ==================================================================== 

  • 4、Kafka数据重复

  • 4.1 数据重复产生的原因:

  •    因重试发送 造成的重复

  • 4.2 解决的办法

  • kafak自己解决或者在下一级主件的处理。
  • (1)自身:

  • 幂等性、事务、ack=-1(数据太多时处理不过来,产生积压等一系列问题,在企业里面用的比较少,更追求效率)
  •          1)幂等性
  •     生产者开启幂等性: 单分区单会话有效
        enable.idempotence ==> true
        <pid,partition,SeqNumber>
            会话:  重启Kafka,pid就变了,一个新的会话
            SeqNumber: 生产者 打上的一个标记,  自增序列: 1、2、3、4、5、6......
  •      
    •       2)事务
  •    事务API: 初始化事务、启动事务、提交事务、停止事务、提交offset事务
  • // 1初始化事务
    void initTransactions();
    
    // 2开启事务
    void beginTransaction() throws ProducerFencedException;
    
    // 3在事务内提交已经消费的偏移量(主要用于消费者)
    void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                  String consumerGroupId) throws ProducerFencedException;
    
    // 4提交事务
    void commitTransaction() throws ProducerFencedException;
    
    // 5放弃事务(类似于回滚事务的操作)
    void abortTransaction() throws ProducerFencedException;

    单个Producer,使用事务保证消息的仅一次发送

  • import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    public class CustomProducerTransactions {
    
        public static void main(String[] args) throws InterruptedException {
    
            // 1. 创建kafka生产者的配置对象
            Properties properties = new Properties();
    
            // 2. 给kafka配置对象添加配置信息
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
    
            // key,value序列化
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            // 设置事务id(必须),事务id任意起名
            properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");
    
            // 3. 创建kafka生产者对象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    
            // 初始化事务
            kafkaProducer.initTransactions();
            // 开启事务
            kafkaProducer.beginTransaction();
            try {
                // 4. 调用send方法,发送消息
                for (int i = 0; i < 5; i++) {
                    // 发送消息
                    kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));
                }
    
    //            int i = 1 / 0;
    
                // 提交事务
                kafkaProducer.commitTransaction();
    
            } catch (Exception e) {
                // 终止事务
                kafkaProducer.abortTransaction();
            } finally {
                // 5. 关闭资源
                kafkaProducer.close();
            }
        }
    }

  • (2)找兄弟:

  • kafka数据重复,可以在下一级处理:SparkStreaming、Redis、Flink或者hive的dwd层。()
  • 去重的手段是:分组、按照id开窗取第一条记录。

5、Kafka消息数据积压,Kafka消费能力不足怎么处理?

(1)消费能力不足

        可以考虑增加Topic主题的分区数,同时增加下一级消费者的CPU核数,消费者数=分区数(两者缺一不可)

(2)单个消费者能力不足

        提高每批次拉取的数量
            fetch.max.bytes: 一次拉取的最大字节数,默认50M。
            max.poll.records: 一次拉取的最大条数,默认500条,改成一次拉取1000,2000条。

(3)消费者宕机了,一个小时后才重新正常启动消费者
    积压了1小时,之前是正常的
        =》积压是暂时的,过一会就赶上进度了,这种情况不处理

6、Kafka挂了怎么办


1)处理:尝试重启
2)评估影响: 丢数、重复、乱序
3)定位问题:看日志
4)解决问题

短时间  会存储在flume channel里面

长时间  日志服务器保存30数据

7、Kafka数据丢了

1)生产者角度

ack

0:相当于异步发送,消息发送完毕即offset增加,继续生产。

1:leader收到leader replica 对一个消息的接受ack才增加offset,然后继续生产。

-1(all):生产者发送过来的数据,Leader+和isr队列里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的。

在生产环境:

        0肯定不选

        1 一般传输的是普通日志,对可靠性要求不是特别高。

        -1 传输的数据和钱相关,或者金融企业,对可靠性要求高。

    设置重试:
        如果是1,设个3~5次
        如果是-1,Int最大值(无限重试)
            
    ack设为-1,一定不丢数吗? ==》还要考虑 broker端的配置


2)broker角度 
    副本数 >= 2 
    min.insync.replicas >= 2     ISR里的最小同步副本数


3)消费者角度(跟Kafka无关)
    对于 端到端一致性 来说:
        消费者消费数据,是 拷贝 还是 剪切?
        
    Spark Streaming:手动维护offset(自己实现)
    Flink: offset维护在 source算子的状态里,开启ck就行了
    Flume: 事务

8、Kafka高效读写?

1)Kafka本身是分布式集群,可以设置分区,并发度高。

2)采用顺序读写:600m/s          随机读写100m/s

         Kafka的producer生产数据,要写入到log文件中, 写的过程是一直

        追加到文件末端,为顺序写。

3)采用了零拷贝技术

9、Kafka优化

1)资源优化
    硬盘: 2个副本、保存3天
    内存:  默认内存1个G,每个节点10~15G  (扣一点,6G)

export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"

    CPU: 三个线程参数
2)生产者吞吐:
    buffer.memory   默认32m  ==》 64m 
    batch.size      默认16k  ==》 32k
    lingerms        默认0ms  ==》 5-100ms
    compression.type  默认none ==》 lz4
3)消费者吞吐:
    fetch.max.bytes: 一次拉取的最大字节数,默认50M
    max.poll.records: 一次拉取的最大条数,默认500条
4)一致性问题:
    生产者:
        幂等性
        ack  -1 
        事务
    broerk端:
        副本>=2 
        min.insync.replicas >=2 
    消费者:
        手动维护offset + (事务 或 幂等)
        

=====================================

10、Kafka杂七杂八

(1)Kafka的机器数量

        Kafka机器数量 = 2 *(峰值生产速度 * 副本数 / 100)+ 1

(2)Kafka的副本数设定

  •         一般我们设置成2个或3个,很多企业设置为2个
    •         副本的优势:提高可靠性;
      •         副本劣势:增加了网络IO传输

(3)Kafka日志保存时间

  •                 默认保存7天;生产环境建议3天
  • (4)Kafka的硬盘大小

    •          每天的数据量100g * 2个副本 * 3天 / 70%

      •              一天约100G数据量
            
                    100G * 2副本 * 保存3天 / 0.7 = 约800多G ===》 给1T
            
                    实时数据也在 Kafka ===》 5T 

  • (6)多少个Topic

    •         通常情况:多少个日志类型就多少个Topic(一类数据对应一个Topic)也有日志类型进行合并的。

  •     
    (7)Kafka速率

    •             100万日活,每人每天产生100条日志,每条日志平均1k
          
                  一天数据条数= 100万 * 100条 = 1亿条
                  一天数据量 =  1亿条 * 1k = 约100G
          
                  平均条数 = 1亿条 / (24 * 60 * 60) = 约 1150条/s (说大概值,每秒1000条左右)
                  平均速率 = 1150 * 1k = 约 1M/s 
          
                  高峰时段: 中午小高峰、晚上下班 7-12点
                  高峰条数 = 平均 * 20倍 = 1150 * 20 = 约23000条/s (说大概值,每秒2万条左右)
                  高峰速率 = 约20M/s
          
                  低谷时段: 凌晨半夜
                  低谷条数 = 约50条/s (说大概值,每秒几十条)
                  低谷速率 = 几十K/s

       

      • (8)过期数据:

        •             删除: 默认,真正把数据删了
                      压缩: 保留最新key
              


      •     
         

本文含有隐藏内容,请 开通VIP 后查看