kafka自定义了一套网络协议,遵守这个协议的格式,就可以像kafka推送消息或者拉取消息 kafka生产者主要实现的功能:
同步/异步发送消息
批量发送消息
超时重发
生产者代码步骤
1.明确发送时同步还是异步;2.明确kafka服务端的主机名和端口号;3.明确发送的topic;4.循环分析:要是异步发送,生产者发送消息给kafka服务端,然后忙自己的事,收到服务端发来的ack确认消息后,会调用回调函数;同步机制:生产者发送消息给kafka服务端,阻塞等待,收到服务端发来的ack确认消息后,会调用回调函数
具体过程:消息发送过程,涉及两个线程协同工作
主线程:将数据封装成ProducerRecord对象,调用send()将消息存在RecordAccumulator中
Sender线程:从 RecordAccumulator 中不断拉取数据并发送到 Kafka,并进行响应处理 从RecordAccoumulator中批量取出消息(都是同一topic+分区)构成ProducerBatch,取出多个ProducerBatch封装成ClientRequest,将ClientRequest交给NetworkClient,NetworkClient将请求通过kafkachannel执行I/O线程批量发送给broker,收到响应后,调用ClientRequest的回调函数
kafkaProducer
实现了四个方法
//1
send()将消息存入RecordAccumulator
//2
flush(),等待RecordAccumulator中的消息发送完全
//3
partitionsFor(),从Metadata(存储kafka集群的元信息)中获取制定Topic的分区信息
//4
close()关闭Producer对象,等待RecordAccumulator清空,关闭Sender线程
重要字段解释(cv)
PRODUCER_CLIENT_ID_SEQUENCE:clientId的生成器
clientId:此生产者的唯一标识
partitioner:分区选择器
maxRequestSize:消息的最大长度,包含消息头
totalMemorySize:发送单个消息的缓冲区大小
accumulator:RecordAccumulator
sender:发送消息的Sender任务
ioThread:执行Sender任务发送消息的线程
compressionType:压缩算法
keySerializer:key的序列化器
valueSerializer:value的序列化器
Metadata metadata:整个Kafka集群的元数据
maxBlockTimeMs:等待更新Kafka集群元数据的最大时长
requestTimeoutMs:从消息发送到收到ACK响应的最长时长
interceptors:ProducerInterceptor集合,ProducerInterceptor可以在消息发送之前对其进行拦截或修改;也可以先于用户的Callback,对ACK响应进行预处理producerConfig:配置对象,使用反射初始化KafkaProducer配置的相对对象
为什么要在kafkaProducer中维护kafka集群的元信息
在创建ProducerRecord时,只需指定topic的名称,那么他是怎样去特定的分区的,那么就是元信息中存储着这个topic对应几个分区,经过特定算法可以知道目标分区,那么最终时需要知道目标分区的leader所在服务器的地址,端口,这些信息也存在元信息里面
元信息具体数据(可以理解为通过结构体存储信息)
//Node表示一个集群的节点broker,Node记录这个节点host,ip,port
type Node struct {
ID int
Host string
Port int
...
}
//TopicPartition表示某个topic的分区,里面有topic名称,这个分区在这个topic的编号id
type TopicPartition struct {
Topic string
Partition int
...
}
//PartitionInfo:分区详细信息,里面有topic string,partition int[和上面一样],还有leader Node[leader所在节点的id],replicas Node[]-所有副本所在的节点信息,inSyncReplicas-isr集合中所有节点信息
type PartitionInfo struct {
Topic string
Partition int
Leader Node
Replicas []Node
InSyncReplicas []Node
...
}
这些信息都包含在cluster集群中,这个cluster核心字段如下:
1.nodes:kafka集群中节点列表([]Node)
2.nodesById: BrokerId<->Node (map(int,Node))
3.partitionByTopicPartition:TopicPartition<->PartitionInfo(map(TopicPartition,PartitionInfo))
4.partitionsByTopic:Topic名字<->PartitionInfo(map(string,[]PartitionInfo))
5.avaliablePartitionsByTopic:Topic名字<->PartitionInfo(map(string,[]PartitionInfo))
partitionsByTopic和avaliablePartitionsByTopic的区别是上面那个可以没有leader【在某些中间状态,leader宕机状态】,下面那个必须有leader
6.partitionsByNode:Node<->Partitioninfo(map(int,[]PartitionInfo))
这些怎样用呢?1.查找所有broker的信息;2.通过broker的id找到他的Node信息;3.通过TopicPartition找到更加具体的PartitionInfo;4.找到topic下面的所有分区信息(可以是中间状态);5.找到topic下面的所有分区信息(必须有leader);6.找到broker这个下面的所有分区信息
Metadata封装了cluster,核心字段如下
topics:所有topic
version:cluseter集群版本号,每更新一次集群就会使他+1
metadaExpireMs:更新集群元信息间隔
refreshBackoffMs:两次更新集群元信息最小间隔
lastRefreshMa:上次更新元数据时间(失败和成功都包含)
lastSuccessfulRefreshMs:上次元数据更新成功时间
cluster:kafka集群元数据记录
needUpdate:是否强制更新元数据-还不太懂
listeners:监听Metadata更新的监听器集合-还不太懂
needMetadaForTopics:是否更新全部的topic的元数据
Metadata核心方法
//将needUpdate更新为true,目的:Sender线程运行时更新Metadata记录的元数据,然会version的值 requestUpdate() //通过version版本号来判断元数据是否更新成功,要是没有就阻塞 awaitUpdate()
Metadata中的字段由主线程读取,Sender线程负责更新,因此必须要保证线程安全性,因此一般都要给他配置sync
waitOnMetadata()函数分析
看自己需要的topic是否在Metadata里面,要是没在,就加进去
获取topic中分区的详细信息,要是失败,调用requestUpdate(),唤醒Sender线程,使得Sender线程更新Metadata记录的元数据
主线程调用awaitUpdate(),等待Sender线程更新元数据成功
回到第二步直到获取partitioninfo分区信息成功