主线程将数据放入到本地累加器中record accumulator中进行存储,sender线程会异步的拉取数据到kafka集群中,这个数据拉取并且复制到kafka集群中以后,kafka需要返回给sender线程一个确认应答ack,这个确认应答用于在sender线程中进行判定sender线程是否复制拉取数据成功,如果我们在producer中设定了retries开关,那么失败以后sender线程还会多次重新复制尝试拉取数据
其中失败尝试和producer端没有任何关系,producer端只是将数据放入到本地累加器中而已,失败尝试是由sender线程重新尝试的
ack的级别:
ack = 0 ;sender线程认为拉取过去的数据kafka一定会收到
ack = 1 ; sender线程拉取过去的数据leader节点接收到,并且存储到自己的本地,然后在返回ack
ack = -1 ; sender线程拉取数据,leader节点收到存储到本地,所有follower节点全部都接收到并且存储到本地这个时候leader返回ack
综上所述ack = -1的级别是数据稳定性最高的,因为能够保证数据全部都同步完毕再返回给sender线程。
后面我们可以得知:
ack=0或1的时候,都可能会导致数据的丢失问题。
而ack=-1时,确保所有数据同步完毕才返回,但有可能导致数据的重复发送的问题。
带有确认应答的代码:
其中回调函数中的metadata对象可以知道发送数据到哪里了,exception用于区分是不是本条数据发送成功
但是这个回调函数不能做出任何的反馈操作,只能起到通知的作用
代码:
public class producerWithCallBack {
public static void main(String[] args) {
Properties pro = new Properties();
pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");
pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
pro.put(ProducerConfig.ACKS_CONFIG, "all");
//设定ack,在代码中ack的级别存在三种 0 1 all
pro.put(ProducerConfig.RETRIES_CONFIG,3 );
//设定重试次数
pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);
pro.put(ProducerConfig.LINGER_MS_CONFIG, 0);
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);
ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", "what can i say ? man");
for(int i=0;i<5;i++){
producer.send(record, new Callback() {
//发送方法中增加回调代码
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
//metadata中包含所有的发送数据的元数据信息
//哪个topic的那个分区的第几个数据
String topic = metadata.topic();
int partition = metadata.partition();
long offset = metadata.offset();
if(exception == null ){
System.out.println("success"+" "+topic+" "+partition+" "+offset);
}else{
System.out.println("fail"+" "+topic+" "+partition+" "+offset);
}
}
});
}
producer.close();
}
}
打印的元数据信息。