Kafka入门-消费者

发布于:2025-06-06 ⋅ 阅读:(23) ⋅ 点赞:(0)

消费者

Kafka消费方式:采用pull(拉)的方式,消费者从broker中主动拉去数据。使用pull的好处就是消费者可以根据自身需求,进行拉取数据,但是坏处就是如果Kafka没有数据,那么消费者可能会陷入循环中,一直返回空数据。

消费者与消费者之间是独立的,一个消费者可以消费多个分区数据。但是消费组不同,每个分区的数据只能由消费者组中的一个消费者消费,避免重复消费导致数据重复。

消费者组:

  • 消费者组由多个consumer组成,形成一个消费者组的条件,是所有消费者的groupid相同。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
  • 消费者组之间互不影响,所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

消费者组初始化流程:

在这里插入图片描述

消费者组详细消费流程

在这里插入图片描述

Java创建消费者

注意:在消费者API代码中必须配置消费者组id。命令行启动消费者不需要填写是因为id被自动填写为随机的消费者组id。

通过API消费一个主题的数据
//配置
        Properties properties = new Properties();
        //连接集群
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.27.101:9092,192.168.27.102:9092");

        //反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());

        //消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        //1.创建一个消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        //2.定义主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        //3.消费数据
        while (true){
            //拉取的间隔时间
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
        }
消费者消费指定分区

需要指定分区只需要在定义主题时,使用定义主题以及分区方法

//2.定义主题以及分区
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("first",0));
        kafkaConsumer.assign(topicPartitions);
消费者组案例

创建三个消费者,进行消费不同分区

直接复制上面消费主题的代码,因为设置的groupid都是test,因此会自动成为一个消费者组。运行消费者组test内的三个消费者,然后运行生产者对每个分区进行发送消息,可以看到每个消费者都只消费了一个分区的消息。

注意:消费者组内的消费者在底层进行了编号,跟java类取名无关。

分区的分配以及再平衡

消费者组有多个消费者,而一个topic又有多个分区,那么应该由哪个消费者消费哪个分区呢?

Kafka有三种主流的分区分配策略,可以通过配置参数partiton.assignment.strategy修改分配策略,默认的策略是Range+CooperativeSticky。Kafka可以同时使用多个分区分配策略。

//设置分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");

再平衡:相当于原有分区的消费者突然发送意外,不能再进行消费,重新分配该分区给其他消费者;或者消费者组中新增了消费组,需要重新分配分区。

在这里插入图片描述

  • Range

    在这里插入图片描述

    Range的再平衡,会将原消费者负责的分区一次性全部交给剩下的某一个消费者

  • RoundRobin

    在这里插入图片描述

    当触发再分配时,会将原消费者负责的分区按照RoundRobin一样进行重新分发

  • Sticky

    Sticky也是针对所有topic的策略,黏性分区是一种均匀随机的分配策略,会在执行一次新的分配之前,考虑上一次的分配结果,尽量少的调整分配的变动,可以节省开销。首先会尽量均衡的分配分区给消费者,在同一组内的消费者出现问题,也会尽量保持原有分配的分区不发送变化。但是在发生再平衡时,所有的消费者需要先放弃当前持有的分区资源,等待重新分配。

  • CooperativeSticky

    CooperativeSticky是2.4版本新增的策略,在原有Sticky策略上,将原本大规模的再平衡操作,拆分成了多次小规模的再平衡,直到平衡完成。

Offset位移

offset的默认维护位置

Offset,消息位移,它表示分区中每条消息的位置信息,是一个单调递增且不变的值。换句话说,offset可以用来唯一的标识分区中每一条记录。消费者消费完一条消息记录之后,需要提交offset来告诉Kafka Broker自己消费到哪里了。

在这里插入图片描述

_consumer_offsets主题采用key和value的方式存储数据,key是group.id+topic+分区号,value就是当前offset的值,每隔一段时间,kafka内部会对这个topic进行compact,也就是每个key都保留最新数据。

默认情况下,是不允许查看消费系统主题数据的,如果需要查看该系统主题数据,要设置config/consumer.properties中添加配置exclude.internal.topics=false。默认是true,表示不能看系统相关信息。

自动提交offset

为了让用户更专注于自己的业务逻辑,Kafka提供了自动提交offset的功能,一段时间后自动提交offset。相关参数:

enable.auto.commit 是否开启自动提交offset功能,默认为true

auto.commmit.intervalms 自动提交offset分时间间隔,默认是5s。

//自动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        //提交时间间隔
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
手动提交offset

自动提交是基于时间提交,开发人员很难把握提交的时机,因此Kafka还提供了手动提交offset的API。

//设置手动提交(关闭自动)
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

手动提交分为同步提交和异步提交。两者的共同点是都会将本次提交的一批数据最高的偏移量提交,不同的是同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试,而异步提交则没有失败重试机制,所有有可能提交失败。

同步提交:必须等待offset提交完毕,再去消费下一批数据。

//手动提交(同步)
            kafkaConsumer.commitSync();

异步提交:发送完offset请求后,就开始消费下一批数据了。

//手动提交(异步)
            kafkaConsumer.commitAsync();
指定Offset消费
//指定位置进行消费,先获取分区信息
        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        //保证分区分配方案已经指定完毕,刚订阅主题时不能立即获取到分区信息
        while (assignment.size()==0){
            kafkaConsumer.poll(Duration.ofSeconds(1));
            assignment = kafkaConsumer.assignment();
        }
        for (TopicPartition topicPartition : assignment) {
            //                   分区           指定offset
            kafkaConsumer.seek(topicPartition,100);
        }
指定时间消费
//指定位置进行消费,先获取分区信息
        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        //保证分区分配方案已经指定完毕,刚订阅主题时不能立即获取到分区信息
        while (assignment.size()==0){
            kafkaConsumer.poll(Duration.ofSeconds(1));
            assignment = kafkaConsumer.assignment();
        }
        HashMap<TopicPartition, Long> topicPartitionLongHashMap = new HashMap<>();
        for (TopicPartition topicPartition : assignment) {
            //如果希望是一天前的当前时刻,那么就用当前时间减去一天间隔,单位为ms
            topicPartitionLongHashMap.put(topicPartition,System.currentTimeMillis()- 1 * 24 * 3600 * 1000);
        }
        //将时间转换为对应的offset
        Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(topicPartitionLongHashMap);
        for (TopicPartition topicPartition : assignment) {
            OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);

            kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());
        }
消费者事务

在消费者消费的过程中会遇到重复消费和漏消费的情况发生。

漏消费:先提交offset后进行消费,有可能造成数据的漏消费

重复消费:已经消费数据,但是offset没有提交

在这里插入图片描述

如果想精准的进行一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。可以将Kafka的offset保存到支持事务的工具中。(比如MySQL)

数据积压

默认日志存储时间为7天,如果当消费速度低于消息的发送速度,那么就很可能造成数据积压。

如果Kafka消费能力不足,那么可以增加Topic的分区数,并且同时提升消费者组的消费者数量,消费者数=分区数。

如果下游数据处理不及时,那么提高每批次拉取的数据量。批次拉取数据过少,使得处理的数据小于生产的数据,也会造成数据积压。


网站公告

今日签到

点亮在社区的每一天
去签到