kafka学习笔记(三、消费者Consumer使用教程——消费性能多线程提升思考)

发布于:2025-06-01 ⋅ 阅读:(23) ⋅ 点赞:(0)

在这里插入图片描述


1.简介

KafkaConsumer是非线程安全的,它定义了一个acquire()方法来检测当前是否只有一个线程在操作,如不是则会抛出ConcurrentModifcationException异常。

acquire()可以看做是一个轻量级锁,它仅通过线程操作计数标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作。acquire()方法和release()方法成对出现,表示相应的加锁操作和解锁操作。

KafkaConsumer虽然是单线程的执行方式,但是在某些情况下如:生产者发送消息的速度远大于消费者消费的速度,这样长时间可能会造成消息的丢失,此时我们就需要消费者采用多线程消费的方式增加消费速度。

2.多线程实现的方式

2.1.线程封闭多线程

即为每个线程实例化一个KafkaConsumer,如图所示,一个线程对应一个KafkaConsumer实例,所有的消费线程都属于同一个消费者组。

这种方式的并发度受限分区的实际个数

在这里插入图片描述
实现代码示例:

public class kafkaConsumer1 {

    public void ConsuermMultithread1() {
        Properties props = initConsifg(); // 此处初始化消费者配置参数省略
        int consumerThreadNum = 5;
        for (int i = 0; i < consumerThreadNum; i++) {
            new KafkaConsumerThread(props, topic).start();
        }
    }
    
    // 消费线程
    public static class KafkaConsumerThread extends Thread {
        private KafkaConsumer<String, String> kafkaConsumer;

        public KafkaConsumerThread(Properties prop, String topic) {
          this.kafkaConsumer = new KafkaConsumer<>(prop);
          this.kafkaConsumer.subscribe(Arrays.asList(topic));
        }
        
        @Override
        public void run() {
          try {
              while (true) {
                  ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                  for (ConsumerRecord<String, String> record: records) {
                      // 处理消息
                  }
              }
          } catch (Exception e) {
              e.printStackTrace();
          } finally {
              kafkaConsumer.close();
          }
        }
    }
}

2.1.消息处理模块多线程

此方法是对上面方法的进一步优化,在消息处理模块增加多线程来处理消息,进一步提升消息消费的速度。
在这里插入图片描述

public class kafkaConsumer1 {

    public void ConsuermMultithread1() {
        Properties props = initConsifg(); // 此处初始化消费者配置参数省略
        int consumerThreadNum = 5;
        for (int i = 0; i < consumerThreadNum; i++) {
            new KafkaConsumerThread(props, topic).start();
        }
    }

    public static class KafkaConsumerThread extends Thread {
        private KafkaConsumer<String, String> kafkaConsumer;
        private ExecutorService executorService;
        private int threadNumber;

        public KafkaConsumerThread(Properties prop, String topic, int threadNumber) {
            this.kafkaConsumer = new KafkaConsumer<>(prop);
            this.kafkaConsumer.subscribe(Collections.singletonList(topic));
            this.threadNumber = threadNumber;

            executorService = new ThreadPoolExecutor(threadNumber, threadNumber,0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                    if (!records.isEmpty()) {
                        executorService.submit(new RecordsHandler(records));
                    }
                    
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                kafkaConsumer.close();
            }
        }
    }

    public static class RecordsHandler extends Thread {
        public final ConsumerRecords<String, String> records;

        public RecordsHandler(ConsumerRecords<String, String> records) {
            this.records = records;
        }

        @Override
        public void run() {
            /// 处理records  
        }
    }
}

此方法需要引入一个共享的offsets来参与提交。


网站公告

今日签到

点亮在社区的每一天
去签到