【大数据学习 | kafka】producer端的回调和ack

发布于:2024-11-02 ⋅ 阅读:(66) ⋅ 点赞:(0)

主线程将数据放入到本地累加器中record accumulator中进行存储sender线程异步拉取数据到kafka集群中,这个数据拉取并且复制到kafka集群中以后,kafka需要返回给sender线程一个确认应答ack,这个确认应答用于在sender线程中进行判定sender线程是否复制拉取数据成功,如果我们在producer中设定了retries开关,那么失败以后sender线程还会多次重新复制尝试拉取数据

其中失败尝试和producer端没有任何关系,producer端只是将数据放入到本地累加器中而已,失败尝试是由sender线程重新尝试的

ack的级别:

ack = 0 ;sender线程认为拉取过去的数据kafka一定会收到

ack = 1 ; sender线程拉取过去的数据leader节点接收到,并且存储到自己的本地,然后在返回ack

ack = -1 ; sender线程拉取数据,leader节点收到存储到本地,所有follower节点全部都接收到并且存储到本地这个时候leader返回ack

综上所述ack = -1的级别是数据稳定性最高的,因为能够保证数据全部都同步完毕再返回给sender线程。

后面我们可以得知:

ack=0或1的时候,都可能会导致数据的丢失问题。

而ack=-1时,确保所有数据同步完毕才返回,但有可能导致数据的重复发送的问题。

带有确认应答的代码:

其中回调函数中的metadata对象可以知道发送数据到哪里了,exception用于区分是不是本条数据发送成功

但是这个回调函数不能做出任何的反馈操作,只能起到通知的作用

代码:

public class producerWithCallBack {
    public static void main(String[] args) {
        Properties pro = new Properties();
        pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");
        pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pro.put(ProducerConfig.ACKS_CONFIG, "all");
        //设定ack,在代码中ack的级别存在三种 0 1 all
        pro.put(ProducerConfig.RETRIES_CONFIG,3 );
        //设定重试次数
        pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);
        pro.put(ProducerConfig.LINGER_MS_CONFIG, 0);

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", "what can i say ? man");
        for(int i=0;i<5;i++){
            producer.send(record, new Callback() {
                //发送方法中增加回调代码
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    //metadata中包含所有的发送数据的元数据信息
                    //哪个topic的那个分区的第几个数据
                    String topic = metadata.topic();
                    int partition = metadata.partition();
                    long offset = metadata.offset();
                    if(exception == null ){
                        System.out.println("success"+" "+topic+" "+partition+" "+offset);
                    }else{
                        System.out.println("fail"+" "+topic+" "+partition+" "+offset);
                    }
                }
            });
        }
        producer.close();
    }
}

打印的元数据信息。