Kafka 生产者优化与数据处理经验

发布于:2024-11-23 ⋅ 阅读:(13) ⋅ 点赞:(0)

Kafka:分布式消息系统的核心原理与安装部署-CSDN博客

自定义 Kafka 脚本 kf-use.sh 的解析与功能与应用示例-CSDN博客

Kafka 生产者全面解析:从基础原理到高级实践-CSDN博客

Kafka 生产者优化与数据处理经验-CSDN博客

Kafka 工作流程解析:从 Broker 工作原理、节点的服役、退役、副本的生成到数据存储与读写优化-CSDN博客

Kafka 消费者全面解析:原理、消费者 API 与Offset 位移-CSDN博客

Kafka 分区分配及再平衡策略深度解析与消费者事务和数据积压的简单介绍-CSDN博客

Kafka 数据倾斜:原因、影响与解决方案-CSDN博客

Kafka 核心要点解析_kafka mirrok-CSDN博客

Kafka 核心问题深度解析:全面理解分布式消息队列的关键要点_kafka队列日志-CSDN博客

目录

一、提高生产者吞吐量

(一)相关参数设置

(二)代码示例与测试

二、数据可靠性

(一)ACK 机制分析

(二)代码示例与可靠性总结

三、数据去重

(一)数据传递语义

(二)幂等性原理与使用

(三)生产者事务

四、数据有序

五、数据乱序

六、总结


        在大数据处理领域,Kafka 作为一款高性能的分布式消息队列系统,被广泛应用于数据的传输、存储与处理。对于生产者而言,如何高效地将数据发送到 Kafka 集群,同时保证数据的可靠性、去重、有序性等,是至关重要的问题。本文将深入探讨 Kafka 生产者在提高吞吐量、保证数据可靠性、去重、有序性等方面的生产经验,并结合代码示例进行详细分析。

一、提高生产者吞吐量

(一)相关参数设置

  • batch.size:批次大小,默认 16k。适当增大批次大小可以减少网络请求次数,提高吞吐量。例如,将其设置为 18000,可以在一次网络请求中发送更多的数据。

properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 18000);

  • linger.ms:等待时间,修改为 5 - 100ms。该参数控制消息在缓冲区的等待时间,适当增加等待时间可以让更多的消息进入同一批次。比如设置为 1ms,会使消息更快地被发送,减少等待时间带来的延迟,但可能会导致批次较小。

properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

  • compression.type:压缩 snappy。启用压缩可以减少网络传输的数据量,提高传输效率。Snappy 是一种高效的压缩算法,能够在不显著增加 CPU 开销的情况下,大幅降低数据大小。

properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

  • RecordAccumulator:缓冲区大小,修改为 64m。增大缓冲区大小可以容纳更多的消息,避免因缓冲区满而频繁触发发送操作,提高整体性能。

properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

(二)代码示例与测试

以下是一个简单的 Kafka 生产者代码示例,展示了如何设置上述参数:

package com.bigdata.producter;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 *   测试自定义分区器的使用
 */
public class CustomProducer07 {

    public static void main(String[] args) {

        // Properties 它是map的一种
        Properties properties = new Properties();
        // 设置连接kafka集群的ip和端口
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        /**
         *  此处是提高效率的代码
         */
        // batch.size:批次大小,默认 16K
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 18000);
        // linger.ms:等待时间,默认 0
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");



        // 创建了一个消息生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        // 调用这个里面的send方法
        // ctrl + p
        /*ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("first","告诉你个秘密");
        kafkaProducer.send(producerRecord);*/
        for (int i = 0; i < 5; i++) {
            // 发送消息的时候,指定key值,但是没有分区号,会根据 hash(key) % 3 = [0,1,2]
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("first","c","告诉你个找bigdata的好办法:"+i);
            // 回调-- 调用之前先商量好,回扣多少。
            kafkaProducer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    // 获取很多信息,exception == null 说明成功,不为null,说明失败
                    if(exception == null){
                        System.out.println("消息发送成功");
                        System.out.println(metadata.partition());// 三个分区,我什么每次都是2分区,粘性分区
                        System.out.println(metadata.offset());// 13 14 15 16 17
                        System.out.println(metadata.topic());
                    }else{
                        System.out.println("消息发送失败,失败原因:"+exception.getMessage());
                    }

                }
            });
        }

        kafkaProducer.close();
    }
}

测试时,可以在 hadoop102 上开启 Kafka 消费者:

bin/kafka-console-consumer.sh --bootstrap-server hadoop11:9092 --topic first

然后在 IDEA 中执行上述代码,观察 hadoop102 控制台中是否接收到消息。

二、数据可靠性

(一)ACK 机制分析

  • acks = 0:生产者发送过来的数据,不需要等数据落盘应答。这种方式效率最高,但可靠性最差。例如,发送了 Hello 和 World 两个信息,若 Leader 直接挂掉,数据就会丢失,因为生产者不会等待任何应答,数据一发送就认为成功,无法保证数据真正被 Kafka 集群接收和持久化。
  • acks = 1:生产者发送过来的数据,Leader 收到数据后应答。此时,如果 Leader 保存成功并应答,但在 Follower 未同步数据时 Leader 挂掉,且该 Follower 成为新的 Leader,那么之前的数据就会丢失,可靠性中等,效率中等。
  • acks = -1(all):生产者发送过来的数据,Leader 和 ISR 队列里面的所有节点收齐数据后应答。这提供了最高的可靠性,但效率相对较低。不过,如果分区副本设置为 1 个(只有一个 leader),或者 ISR 里应答的最小副本数量(min.insync.replicas 默认为 1)设置为 1,和 ack = 1 的效果是一样的,仍然有丢数的风险。数据完全可靠条件是 ACK 级别设置为 -1 + 分区副本大于等于 2 + ISR 里应答的最小副本数量大于等于 2。

Leader收到数据,所有Follower都开始同步数据, 但有一个Follower,因为某种故障,迟迟不能与Leader进行同步,那这个问题怎么解决呢?

解决方案:

Leader维护了一个动态的in-sync replica set(ISR),意为和 Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。

如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。例如2超时,(leader:0, isr:0,1)。 这样就不用等长期联系不上或者已经故障的节点。

ISR: 可用的,存活的,Leader+Follower

数据可靠性分析:

如果分区副本设置为1个(只有一个leader),或者ISR里应答的最小副本数量 ( min.insync.replicas 默认为1)设置为1,和ack=1的效果是一样的,仍然有丢数的风险(leader:0,isr:0)。

数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

副本数是2,但是ISR中不一定有两个,因为会挂掉。

可靠性总结:

acks=0,生产者发送过来数据就不管了,可靠性差,效率高;

acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;

acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;

在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;

acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。

开发环境:你的本地

测试环境:你们公司搭建的测试平台,跟生产环境基本相似

正式环境(生产环境):公司正在使用的(对外提供服务的)

数据重复分析:

acks: -1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答

记录:acks = -1 有可能出现,数据重复问题

数据发送给了Leader,Follower 也同步成功了,此时准备应答为-1的时候,Leader挂了,Follower顶上,由于发送者不知道数据已经发送成功,会给新的Leader再发消息,此时数据重复。

(二)代码示例与可靠性总结

以下是设置 ACK 机制的代码示例:

package com.bigdata.kafka.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class CustomProducerACKs {

    public static void main(String[] args) {
        // 这个里面放置配置相关信息
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop11:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // 设置应答机制是哪个级别的,默认是all 等同于 -1
        properties.put(ProducerConfig.ACKS_CONFIG,"all");
        // 重试次数 retries,默认是 int 最大值,2147483647
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);


        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i=0;i<5;i++) {
            // 回调函数  本次发送并没有指定分区和Key值,仅仅发送的是value
            // 本地发送到底发给哪个分区呢?   1)随机 2)黏住它  使用的是粘性分区
            kafkaProducer.send(new ProducerRecord<>("first", "nihao: " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    if(exception == null ){
                        // 通过回调函数,可以获取本次发送的内容和分区
                        int partition = metadata.partition();
                        String topic = metadata.topic();
                        System.out.println("本次发送的主题是:"+topic+",发给了哪个分区:"+ partition);
                    }else{
                        exception.printStackTrace();
                    }

                }
            });


        }

        // 此处如果不close() 发送不了数据
        kafkaProducer.close();
    }
}

在生产环境中,acks = 0 很少使用;acks = 1,一般用于传输普通日志,允许丢个别数据;acks = -1,一般用于传输和钱相关的数据等对可靠性要求比较高的场景。

三、数据去重

(一)数据传递语义

  • 至少一次(At Least Once):等于 ACK 级别设置为 -1 + 分区副本大于等于 2 + ISR 里应答的最小副本数量大于等于 2,可以保障数据可靠,但不能保证数据不重复。
  • 最多一次(At Most Once):等于 ACK 级别设置为 0,能保证数据不重复,但不能保证数据不丢失。
  • 精确一次(Exactly Once):对于一些非常重要的信息,如和钱相关的数据,要求数据既不能重复也不丢失。Kafka 0.11 版本以后,引入了幂等性和事务来保障数据精确一次。

(二)幂等性原理与使用

  • 幂等性原理:幂等性就是指 Producer 不论向 Broker 发送多少次重复数据,Broker 端都只会持久化一条,保证了不重复。其判断标准是具有 <PID, Partition, SeqNumber> 相同主键的消息提交时,Broker 只会持久化一条。其中 PID 是 Kafka 每次重启都会分配一个新的;Partition 表示分区号;Sequence Number 是单调自增的。所以幂等性只能保证在单分区单会话(重启会话就是下一次了)内不重复。如果 kafka 集群挂了,重启后以前的数据可能会再次发送,导致数据重复。
  • 如何使用幂等性:开启参数 enable.idempotence 默认为 true,false 关闭。

(三)生产者事务

Kafka 事务原理

        每一个 broker 都有一个事务协调器,有特定算法确定本次事务对应的事务协调器。

Kafka 的事务 API

        包括 initTransactions(初始化事务)、beginTransaction(开启事务)、sendOffsetsToTransaction(在事务内提交已经消费的偏移量,主要用于消费者)、commitTransaction(提交事务)、abortTransaction(放弃事务,类似于回滚事务的操作)。

// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
 String consumerGroupId) throws 
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

单个 Producer 使用事务保证消息的仅一次发送代码示例

package com.bigdata.producter;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 *   使用事务+幂等性,保证数据唯一
 */
public class CustomProducer09 {

    public static void main(String[] args) {

        // Properties 它是map的一种
        Properties properties = new Properties();
        // 设置连接kafka集群的ip和端口
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // 设置应答机制是哪个级别的,默认是all 等同于 -1
        properties.put(ProducerConfig.ACKS_CONFIG,"all");
        // 重试次数 retries,默认是 int 最大值,2147483647
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // 默认幂等性是开启的,所以不用设置
        // 必须设置事务的ID
        // 设置事务 id(必须),事务 id 任意起名
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");



        // 创建了一个消息生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 初始化事务
        kafkaProducer.initTransactions();

        // 调用这个里面的send方法
        // ctrl + p
        /*ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("first","告诉你个秘密");
        kafkaProducer.send(producerRecord);*/
        // 开启事务
        kafkaProducer.beginTransaction();
        try {
            for (int i = 0; i < 5; i++) {
                // 发送消息的时候,指定key值,但是没有分区号,会根据 hash(key) % 3 = [0,1,2]
                ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("first", "c", "告诉你个找bigdata的好办法:" + i);
                // 回调-- 调用之前先商量好,回扣多少。
                kafkaProducer.send(producerRecord, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {

                        // 获取很多信息,exception == null 说明成功,不为null,说明失败
                        if (exception == null) {
                            System.out.println("消息发送成功");
                            System.out.println(metadata.partition());// 三个分区,我什么每次都是2分区,粘性分区
                            System.out.println(metadata.offset());// 13 14 15 16 17
                            System.out.println(metadata.topic());
                        } else {
                            System.out.println("消息发送失败,失败原因:" + exception.getMessage());
                        }

                    }
                });
                if(i==3){
                    int a = 10 /0 ;
                }
            }
            // 提交事务
            kafkaProducer.commitTransaction();
        }catch (Exception e){
            // 如果出错了,回滚事务
            kafkaProducer.abortTransaction();
        }finally {
            kafkaProducer.close();
        }


    }
}

四、数据有序

        生产者发送的数据,单分区内可以做到有序,多分区则无法保证,除非把多个分区的数据拉到消费者端进行排序,但这样做效率很低,还不如直接设置一个分区。

五、数据乱序

  • kafka在1.x版本之前保证数据单分区有序,条件如下:

    max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)。

  • kafka在1.x及以后版本保证数据单分区有序,条件如下:

    开启幂等性

    max.in.flight.requests.per.connection需要设置小于等于5

    未开启幂等性

    max.in.flight.requests.per.connection需要设置为1

    原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据, 故无论如何,都可以保证最近5个request的数据都是有序的。

 

六、总结

        在 Kafka 生产者的实际应用中,需要根据不同的业务场景和需求,合理设置各种参数,以平衡数据的吞吐量、可靠性、去重、有序性等方面的要求。例如,对于普通日志数据,可以适当牺牲一些可靠性,采用 acks = 1 的设置,提高吞吐量;而对于金融交易等对数据准确性要求极高的场景,则需要开启幂等性和事务,确保数据的精确一次处理。同时,要深入理解各个参数的含义和相互关系,以及不同版本 Kafka 的特性差异,才能更好地优化生产者的性能,构建高效稳定的大数据处理管道。