Kafka拦截器

发布于:2025-03-25 ⋅ 阅读:(26) ⋅ 点赞:(0)


1.定义

拦截器主要用于实现clients端的定制化需求,包括消息在生产者发送到 Kafka 或者在消费者接收消息之前进行一些定制化的操作。用于在消息发送和接收的关键步骤中进行拦截和处理。可以修改消息,日志记录,统计等。由生产者拦截器和消费者拦截器组成。生产者的拦截器是在发送前和确认后调用,而消费者的则是在接收后和提交前。

2.生产者拦截器

对于 producer 而言,interceptor使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor 按序作用于同一条消息从而形成一个拦截链(imterceptorchain)。intercetpor 的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法如下。

  • onSend(ProducerRecord):该方法封装进KafkaProducer.send 方法中,即它运行在用户主线程中。producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。
  • onAcknowledgement(RecordMetadata,Exception):该方法会在消息成功提交被应答之前或消息发送失败时调用,并且通常都是在 producer 回调 callback 逻辑触发之前。值得注意的是,这个方法和 onSend 不是在同一个线程中被调用的,因此如果你在这两个方法中调用了某个共享可变对象,一定要保证线程安全哦。onAcknowledgement 运行在producer的I/O线程中,因此不要在该方法中放入很“重”的逻辑,否则会拖慢producer的消息发送效率。当exception为null时可以获取到消息的确认信息,包括分区、偏移量等。当exception不为null时可以执行一些异常处理逻辑。
  • close:关闭interceptor,主要用于执行一些资源清理工作。

interceptor可能运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外,若指定了多个interceptor,则producer 将按照指定顺序调用它们,同时把每个interceptor 中捕获的异常记录到错误日志中而不是向上传递。

2.1 示例

下面实现一个简单的双 interceptor 组成的拦截链。第一个 interceptor 会在消息发送前将时间戳信息加到消息 value 的最前部;第二个 interceptor 会在消息发送后更新成功发送消息数或失败发送消息数。
首先创建TimeStampPrependerInterceptor,代码如下:

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class TimeStampPrependerInterceptor implements ProducerInterceptor<String,String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return new ProducerRecord<>(record.topic(),record.partition(), record.timestamp(), record.key(),System.currentTimeMillis()+","+record.value().toString());
    }
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}

下面定义第二个interceptor,代码如下:

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class CounterInterceptor implements ProducerInterceptor<String,String> {
    private int errorCounter =0;
    private int successCounter =0;
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if(exception==null){
            successCounter++;
        }else{
            errorCounter++;
        }
    }
    @Override
    public void close() {
        System.out.println("Successful sent:"+successCounter);
        System.out.println("Failed sent:"+errorCounter);
    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}

生产者代码如下:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ProducerTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");//必须指定
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");//必须指定
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//必须指定
        props.put("acks", "-1");
        props.put("retries", 3);
        props.put("batch.size", 323840);
        props.put("linger.ms", 10);
        props.put("buffer.memory", 33554432);
        props.put("max.block.ms", 3000);
        props.put("partitioner.class","com.exm.collectcodenew.kafka.producer.customPartitioner.AuditPartitioner");
        //构建拦截链
        List<String> interceptors =new ArrayList<>();
        interceptors.add("com.exm.collectcodenew.kafka.producer.customInterceptor.TimeStampPrependerInterceptor");
        interceptors.add("com.exm.collectcodenew.kafka.producer.customInterceptor.CounterInterceptor");
        props.put("interceptor.classes",interceptors);
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord record = new ProducerRecord("topic-test1","test_"+i);
            producer.send(record).get();
        }
        producer.close();
    }
}

3.消费者拦截器

消费者拦截器(Consumer Interceptor)是一种允许你在消息到达消费者客户端之前或之后对其进行拦截并执行特定操作的功能。不过,如果你的目的是在消费消息的确认阶段做一些操作,可以考虑使用消费者回调或者在消费者代码中显式处理确认逻辑。
要使用消费者拦截器,你需要实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口。其定义的方法如下。

  • onConsume(ConsumerRecords<K, V> records): 在消费者拉取poll到消息之后,但还没被返回给用户之前被调用。可以根据需要修改消息集。
  • onCommit(Map<TopicPartition, OffsetAndMetadata> offsets): 在消费者成功提交偏移量之后调用。可以用来记录偏移量提交的详细信息。如果提交过程中出现异常,可能不会触发这个方法。
  • close(): 拦截器关闭时调用,可以用来释放资源。

3.1 示例

定义interceptor,代码如下:

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
public class LoggingOffsetInterceptor implements ConsumerInterceptor<String,String> {
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        System.out.println("Before consuming records: " + records);
        // 在这里可以进行一些预处理操作,例如修改消息内容等
        // 例如:修改每条消息的内容
        records.forEach(record -> System.out.println("Original message: " + new String(record.value())));
        // 你可以在这里返回一个新的ConsumerRecords对象来改变原始的记录集
        return records;
    }
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        System.out.println("Committing offsets: " + offsets);
        // 在这里可以执行一些在偏移量提交之后的操作,例如记录偏移量信息等
    }
    @Override
    public void close() {
        // 清理资源等操作
    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}

消费者代码如下:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");//必须指定
        props.put("group.id","test-group");//必须指定
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//必须指定
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//必须指定
        props.put("enable.auto.commit","true");
        props.put("auto.commit.interval.ms","1000");
        props.put("auto.offset.reset","earliest");//从最早的消息开始读取
        // 添加拦截器配置
        props.put("interceptor.classes", "com.exm.collectcodenew.kafka.producer.customInterceptor.LoggingOffsetInterceptor");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //创建consumer实例
        consumer.subscribe(Arrays.asList("topic-test1"));
        while(true){
            ConsumerRecords<String,String> records=consumer.poll(1000);
            for (ConsumerRecord<String, String> record: records){
                System.out.printf("offset=%d,key=%s,value=%s%n",record.offset(),record.key(),record.value());
            }
        }
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到