kafka知识点

发布于:2023-01-19 ⋅ 阅读:(543) ⋅ 点赞:(0)

总结

MQ隔离了消息生产和消息消费,解耦了消息生产和消息消费

kafka知识点

  • broker
  • topic partition replication leader follower
  • log segment message-interval

kafka版本特点:

  • kk0.9- offset存储在zk
  • kk0.9+ offset存储在kk(kk创建的topic中,topicName=_consumer_offsets)
  • kk2.8.0- 必须要zookeeper

kafka配置
broker数量

配置 说明 默认值
num.partitions 分区数量 1
offsets.topic.replication.factor partition备份数量 1
log.dirs 日志目录,xxx.log文件存储位置 /tmp/kafka-logs
log.segment.bytes xxx.log文件大小 1073741824bytes=1GB
log.index.interval.bytes 往xxx.log中新写入4KB+的数据xxx.index才会增加1个条目
减小这个值会增加索引条目
4096bytes=4KB
replica.lag.time.max.ms ISR中leader和follower之间的通信时间 10000, 10s

producer配置

配置 说明 默认值
acks 通过request.required.acks参数来设置数据可靠性的级别,分别是0, 1,all 1, leader消息落盘之后返回ack给producer
retries 请求失败producer会自动重试 0,如果>0可能造成生产消息重复
batch.size 一批消息大小 16KB,超过16KB发送1次
linger.ms 消息逗留时间 0,batch.sizelinger.ms满足其中1个条件发送1次消息
buffer.memory producer可使用的缓存大小 32MB,不能设置太小导致生产到缓存的速度大于发送的速度造成OOM,buffer.memory必>batch.size
key.serializer, value.serializer 消息键值的序列化器 StringSerializer.class, 序列化器类必须实现org.apache.kafka.common.serialization.Serializer接口
transactionIdPrefix 事务ID前缀,<TID,PID>, <PID,Partition,SeqNumber> 默认为空,不用事务
compressionType 压缩类型 无默认值

customer配置

配置 说明 默认值
group-id 消费者组ID 1个partition中的1个消息只能被1消费者组中的1个消费者消费
enable-auto-commit 自动提交offset 无默认
auto-commit-interval 自动提交offset时间间隔 无默认
auto-offset-reset latest(partition有offset从offset开始消费,partition无offset消费新产生的消息(latest消息))
earliest(partition有offset从offset开始消费,partition无offset从头开始消费(earliest消息))
none(partition有offset从offset开始消费,有1个partition无offset抛出异常)
latest
key-deserializer value-deserializer 消息键值的序列化器 StringSerializer.class, 序列化器类必须实现org.apache.kafka.common.serialization.Serializer接口
isolationLevel 事务隔离级别,READ_UNCOMMITTED/READ_COMMITTED READ_UNCOMMITTED
bootstrapServers kafka地址 默认使用kafka.bootstrapServers, kafka.customer.bootstrapServers覆盖kafka.bootstrapServers
clientId 客户端ID,用于服务端日志 无默认值
heartbeatInterval 心跳时间间隔 无默认值
fetchMinSize 一次fetch请求从kafka获取的最小数据量 无默认值
fetchMaxWait 当消息量<fetchMinSize,最多等待时间 无默认值;当消息量>=fetchMinSize,不等待直接返回
maxPollRecords 调用一次poll()最多返回记录数 无默认值

customer分区器(java):分区分配策略

  • 默认RangeAssignor:逐个topic,一段范围内的消息分配给某个消费者
    在这里插入图片描述

MQ初识

MQ隔离了消息生产和消息消费,解耦了消息生产和消息消费
MQ作用:

  • 解耦
  • 异步
  • 削峰,生产大于消费时,暂时存储消息
  • 可扩展

MQ模式

  • publish/subscribe模式(发布/订阅):1个消息可被n个消费者消费
  • p2p模式:1个消息只被1个消费者消费

p/s模式分类:

  • MQ主动推送,类似微信公众号
  • customer主动拉取,类似kakfa

customer主动拉取优缺点:

  • 优点:MQ主动推送,不同的消费者消费能力不同
  • 缺点:customer需要轮询MQ中是否有新的消息

kafka初识

官网

kafka:基于发布订阅模式的消息中间件,主要应用于大数据实时处理领域
侧重点:数据的存储和读取,针对实时性比较高的流式数据处理场景

特点:

  • 高吞吐:2ms,万亿级别消息/天
  • 高可用:集群内部节点动态扩展,集群之间动态扩展
  • 持久化:可持久化数据

zookeeper初识

kafka与zookeeper

  • 多个kk和1个zk可以构成1个集群
  • zk用来维护kk集群的元数据
  • kk0.9- offset存储在zk
  • kk0.9+ offset存储在kk(kk创建的topic中,topicName=_consumer_offsets)
  • offset(偏移量)记录了消费者消费的partition中消息位置(消费到第几个消息了)

kafka/整体结构

在这里插入图片描述
publish/subscribe模式

  • kk只支持“发布/订阅”模式

producer(生产者):

  • 消息的生产者

customer(消费者):

  • 消息的消费者
  • 消费者实时记录offset,以便出错恢复时,从上次的位置继续消费
  • customer位于customer group中

customer group(消费者组):

  • 把消费者分配到1个组里
  • 1个消费者也能构成1个组

broker(中间人):

  • 消息管理的中间人,1个kk服务,1个kk进程
  • 多个broker组成kk集群

topic(话题):

  • 1个话题可以有很多消息
  • producer:topic=m:n,customer:topic=m:n
  • topic中含有partition数目,replication数目
  • topic:partition=1:n

partition(分区,消息队列):

  • 存放消息
  • partition数量可以>broker数量
  • partition以轮询的方式分配到broker中
  • n个partition分布在不同的broker中(负载均衡,高并发)
  • topic:partition=1:n
  • partition:replication=1:n
  • 1个partition中的1个消息只能被1消费者组中的1个消费者消费
  • partition:log文件=1:1

event(消息)

  • event由key、value、timestamp组成
  • 相同key的event会传输到相同的partition
  • events被消费之后不会被删除,可以配置events保存时间(保存events不会影响kafka的性能)

replication(消息队列副本):

  • 备份partition的消息
  • replication数量不能>broker数量
  • 只有1个replication的partition只有leader
  • leader和follower同1个partition的n个replication角色
  • leader-replication和follower-replication一定分布在不同的broker中
  • 每一个replication都维护了1个offset(消息偏移量,消费到那个消息了)

leader(消息队列领导者):

  • 多个replication应该要有领导者
  • leader负责读写消息,producer的写入和customer的读取

follower(消息队列跟随者):

  • 多个replication应该要有跟随者
  • follower只负责备份leader消息

kafka/整体架构

在这里插入图片描述
producer customer customer-group topic-partition-replication

  • customer位于customer group中
  • 1个customer group可以只有1个customer
  • topicA的partition=3,1个partition有replication=2个副本,
  • partition一定位于不同broker中,partition数量不能>broker数量
  • replication-leader和replication-follower一定位于不同broker中,replication数量可以>broker数量
  • replication-leader负责消息读写,replication-follower负责备份replication-leader
  • topic和partition是逻辑概念,replication是物理概念负责消息的读写备份
  • kafka只支持publish/subscribe模式

kafka/配置

配置 说明 默认值
log.index.interval.bytes 往xxx.log中新写入4KB+的数据xxx.index才会增加1个条目
减小这个值会增加索引条目
4096bytes=4KB
log.segment.bytes xxx.log文件大小 1073741824bytes=1GB
num.partitions 分区数量 1
offsets.topic.replication.factor partition备份数量 1
log.dirs 日志目录,xxx.log文件存储位置 /tmp/kafka-logs

kafka/稀疏索引 确定消息位置

在这里插入图片描述
说明:

  • 主要由xxx.index, xxx.log, xxx.timeindex文件
  • 文件名相同则属于同1个segment
  • 文件名以文件内容第1个offset命名
  • log.dirs确定log文件存储目录,log.segment.bytes确定segment的大小,log.index.interval.bytes认为是
    offset(偏移量)的单位
  • 当xxx.log文件>log.segment.bytes 将生成新的segment(xxx.index, xxx.log, xxx.timeindex),文件名以文件内容第1个offset命名
  • 确定消息位置:offset》xxx.index文件名》xxx.index内容(二分查找一段消息)》xxx.log内容(具体查找某个消息)

00000000000000000000.index

offset: 5077 position: 1067163526
offset: 5078 position: 1068068399
offset: 5079 position: 1068973272

00000000000000000000.timeindex

00000000000000000000.timeindex
timestamp: 1660291675000 offset: 5077
timestamp: 1660291675003 offset: 5078
timestamp: 1660291675008 offset: 5079

00000000000000000000.log

position: 1067163526 
	message
	message
position: 1068068399 
	message
	message
	message
position: 1068973272 
	message

在这里插入图片描述
topic partition log segment .log .index .timeindex:

  • topic:partition=1:n
  • partition:log=1:1
  • log:segment=1:n
  • log和segment是逻辑概念,.log, .index, .timeindex是物理文件
  • 1个segment有1个.log,1个.index文件和1个.timeindex文件
  • xxx.log存储消息,xxx.index存储offset|position,xxx.timeindex存储timestamp|offset
  • 同1个segment的xxx.log, xxx.index, xxx.timeindex的文件名相同,以文件内容第1个offset命名

xxx.index:

  • offset|下一个log.index.interval.bytes的起始物理位置,一段消息的offset(偏移量)
  • 因为offset记录的是1段消息的偏移量,不是具体某个消息的偏移量,所以称作稀疏索引
  • 因为offset是递增有序的,所以可以使用二分查找,定位一段消息

xxx.log: position|message

xxx.timeindex:添加offset的毫秒timestamp|offset,什么时间加的offset

kafka/offset

offset是什么

  • offset是某个consumerGroup消费某个topic中的某个partition中的消息产生的
  • [consumerGroup|topic|partition] 唯一确定offset

offset存储位置

  • kk0.9-版本保存在zk
  • kk0.9+版本 kafka

kk0.9-版本offset保存在zk

kafka默认1s之后向zk提交数据
[controller brokers consumers]
- controller 的竞选策略是竞争,那个broker先在zk注册,谁就是controller
- brokers 所有的broker信息
- consumers 是consumerGroup

brokers/
[ids,topics,seqid]
- ids 中是所有broker的id
- topics 中是所有topic
- seqid 中是消息的序号 Exactly once <pid,partition,seqNum>

consumers/
[console-consumer-88500、console-consumer-70850]
- console-consumer-88502 是消费者组,没有指定consumerGroup的名字时console-consumer-随机5个数字

console-consumer-88500/
[ids,owners,offsets]

offsets/
[topicA]	
- topicA是topic name

topicA/
[0,1]
- 0,1都是partition name

kk0.9+版本offset保存在kk(保存为topic:_consumer_offsets)

保存为系统内置的topic:_consumer_offsets
想消费系统内置topic,修改配置文件consuemr.properties,`exclude.internal.topics=false`
对于topic,_consumer_offsets,普通consumer是producer
创建系统consumer消费topic,_consumer_offsets,才能看到_consumer_offsets存储的offset的结构
普通customer消费topic和partition都不变,则生成的offset消息的key不变,提交到系统topic(_consumer_offsets)的partition不变

kafka/高效消息队列

高效消息队列:

  • 多partition并行
  • 顺序写磁盘
  • 零拷贝

顺序写磁盘:数据原来是按簇离散存储到磁盘中的,kk是顺序写到磁盘中,减少了磁盘寻址时间(磁盘访问时间=寻道时间+寻找扇区时间+读字节时间)

传统IO:在内存的内核空间和用户空间来回拷贝
在这里插入图片描述

零拷贝:内核空间和用户空间都没有拷贝出现。实际上还是会拷贝一些元信息到socket cache
在这里插入图片描述

kafka/zookeeper

kk2.8.0- 必须要zookeeper
kafka进程broker和zookeeper结合构成集群

ZK作用:

  • 选举某个broker为controller
  • 抢占式选举,那个broker先向zk注册这个broker就被当做leader

controller(某个broker)作用

  • 管理集群中broker的上下线
  • 管理集群中topic的partition和replication分配
  • 管理集群中replication的leader选举

ISR中leader选举:

  • 先获取ISR,再更新leader和ISR
  • ISR中leader挂了会通知到zk,isr_change_notification

kafka/消息同步策略ISR(In-Sync Replicas)

ISR(In-Sync Replicas):保持同步的副本,包括leader+n个follower,ISR中不包含所有follower
ISR作用:

  • 新leader选举
  • follower的选择
  • ack的发送时机

follower的选择-kk老版本:

  • leader和follower消息的差距数量,配置replica.lag.max.messages
  • leader和follower之间的通信时间,配置replica.lag.time.max.ms

follower的选择-kk新版本:

  • leader和follower之间的通信时间,配置replica.lag.time.max.ms

kafka/消息同步策略ISR(In-Sync Replicas)/问题

问题:为什么kk新版本不要“消息差距标准了”
首先,两个条件都要将造成follower反复进出

  • 如果replica.lag.max.messages=10
  • 生产者是批量发送12条消息》leader
  • 此时ISR中所有follower消息条数与leader都差距12条,ISR中所有follower被踢出
  • follower和leader之间的通信时间<replica.lag.time.max.ms规定的时间
  • ISR又把这些踢出去的follower全部加入
  • 上面两个条件放到造成反复选入和踢出follower,类似操作系统不好的页面置换算法造成页面抖动
  • 另外ISR的元数据需要同步给zk

此外,leader和follower的通信时间标准变相保证了leader和follower消息的差距数量标准,leader和follower的通信时间延迟越短,消息同步速度自然越快,leader和follower的消息数量差距也就减少了。

kafka/消息同步策略/HW和LEO customer

LEO(Lag End Offset):ISR中每个replication(leader,follower)最后1个offset
HW(High Watermark):ISR中最小的LEO(最大offset中的最小值)

HW和LEO保证customer消费消息的一致性

  • HW之前的消息,customer才能够消费
  • ISR中选取的新leader中消息数量一定>=HW
  • ISR中不论那个follower成为leader后,消费者都能正常消费

kafka/消息数量/Exactly once(有且只有1个消息)

At least Once:这个消息在kafka中至少有1个,1~n,对应acks=-1,消息不会丢失,消息可能会重复
At Most Once:这个消息在kafka中至多有1个,0~1,对应acks=0,消息可能丢失,消息不会重复
Exactly Once:这个消息在kafka中有且只有1个,1,消息幂等性+At Least Once=Exactly Once

kafka幂等性原理:唯一索引

kafka Exactly once过程:
1.将producer参数中enable.idempotence=true:producer会被分配1个PID,producer发送同1个partition的消息会附带SeqNumber
2.broker缓存<PID,Partition,SeqNumber>,那个producer往那个partition发送了第几个event

kafka/消息数量/Exactly once/问题

跨partition:

  • kafka的幂等性无法保证跨分区跨会话的Exactly Once。因为,<PID,Partition,SeqNumber>,同1个消息发往不同的partition也无法去重,造成不满足Exactly once

跨会话

  • kafka重启:kafka重启PID会发生变化
  • producer重启:producer重启PID会发生变化

producer/kafka/事务

<TID,PID>, <PID,Partition,SeqNumber>

已知Exactly once

  • 可以保证不跨会话(kk重启或producer重启)&&不跨partition情况下的“消息有且只有1个”
  • 实现方式<PID,Partition,SeqNumber>,PID由broker提供给producer

producer事务:跨会话&&跨partition的Exactly once,两跨之下依然能够保证生产消息的唯一性

  • producer提供一个全局唯一的Transaction ID
  • 1对1绑定TID和PID,<tid,pid>
  • 当跨会话(kk重启或producer重启)之后可以通过TID获取PID

Transaction Coordinator组件:负责producer事务的实现

  • producer使用TID可以从Transaction Coordinator获取事务状态(事务topic保存事务状态)

customer事务:保证1个消息只被消费1刺

  • 事务能力弱于producer
  • consumer可以通过offset访问kk中任意位置的的消息
  • kk中segment(.log, .index)默认保存7天,segment过期删除时仍有消息没有消费,被删除的消息不能被消费

producer/可靠性机制ACK

producer没有收到ack的消息才重传
request.required.acks可取值为0/1/-1
request.required.acks=0:不会给producer响应ack
request.required.acks=1(默认):leader消息落盘之后返回ack给producer
request.required.acks=-1:ISR中follower(不是所有follower)从leader同步消息落盘后发送ack给leader

request.required.acks=-1发送ack流程:

  1. producer批量发送消息到kk
  2. 批量消息到达leader并落盘
  3. ISR中follower同步数据并落盘完成
  4. 发送ack给leader

producer/可靠性机制ACK/问题/消息丢失

request.required.acks=0:不会给producer响应ack
request.required.acks=1:producer发送消息》leader接收消息并落盘后》leader所在broker宕机(数据丢失)》kafka重新选举leader》kafka消息丢失
request.required.acks=-1:当ISR中只有1个leader没有follower时,退回到ack=1的情况

  • offsets.topic.replication.factor=1副本数被设置为1,当ISR中只有1个leader没有follower,退回到ack=1的情况
  • 在ISR重新选入follower期间,ISR中只有1个leader提供服务,退回到ack=1的情况
  • 保证ISR中至少有1个follower就可以避免消息丢失

producer/可靠性机制acks/问题/消息重复

request.required.acks=-1:producer发送消息》leader接收消息并落盘后》ISR中follower从leader后同步消息并落盘后》leader宕机(发送ack失败)》producer没有收到ack》重新发送已经落盘的消息

producer/整体结构

在这里插入图片描述
producer:生产者使用send(ProducerRecord对象)方法发送构造的ProducerRecord对象
interceptors:拦截器
serializer:序列化器
partitioner:分区器

RecordAccumulator:记录累计器,是main线程和sender线程共享的对象
RecordBatch:批量记录的封装

sender:发送器,一次发送一个RecordBatch

producer/interceptor 自定义拦截器

ProducerInterceptor接口方法:

  • configure(configs):操作配置信息
  • onSend(ProducerRecord):操作ProducerRecord。在Serializer、Partitioner之前调用
  • onAcknowledgement(RecordMetadata, Exception):ACK,broker发送确认给producer。producer回调之前调用
  • close():producer.close()时才会调用interceptor.close()
  • interceptor运行在producer所处线程中

producer/分区分配策略

在这里插入图片描述
将producer发送的数据封装成 ProducerRecord对象

  • 指定partition,使用指定partition的值
  • 没有指定partition但指定key,hash(key)%topic可用的partition数量=partition,key不变消息放置的partition不变
  • 没有指定partition且没有指定key,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与%topic可用的partition数量=partition
  • 三种封装成ProducerRecod对象的方法的优先级别,从上到下优先级降低

customer/消费方式

publish/subscribe模式分类:

  • MQ主动推送,类似微信公众号
  • customer主动拉取,类似kakfa

customer主动拉取优缺点:

  • 优点:MQ主动推送,不同的消费者消费能力不同
  • 缺点:customer需要轮询MQ中是否有新的消息

kafka customer:

  • 消费时传入timeout(等待时间)参数,customer消费时没有数据,等待timeout时间再去查看。

customer/分区分配策略

注意:

  • 组为单位:以customer-group为单位订阅topic,同1个customer-group的customer不会重复消息
  • 同组互斥:1个topic下的1个partition中的1个event只能被customer-group中的1个customer消费

customer具有以下分区分配策略:

  • Range(范围): kk默认分区分配策略,一段范围内的消息分配给某个消费者
  • Round-Robin(轮询):消息只分配到customer-group
  • Sticky(粘性):

Range(范围): 逐个topic

  • 排序partition:排序同1个topic中的partition
  • 排序customer:排序同1个customer-group的customer
  • customer被分配的消息数量:range=partition数量(同1topic)/customer数量(同1customer-group),余数交给同1customer-group的第1个customer
  • 缺点-首customer负载不公:topic数量很多,大量topic的partition数量/customer数量=有余数时,这些余数将被交给同1customer-group的第1个customer(range+余数+余数+…+余数 数量的消息)

Round-Robin(轮询):面向订阅的所有topic

  • 轮询订阅的所有topic的所有partition,将订阅的所有topic的所有partition结合起来轮询
  • 缺点,消息只分配到customer-group,不能指定由组内那个customer消费

Sticky(粘性):

  • kk 0.11+之后才出现的新策略

customer/重置offset

重置offset:

  • customer更换customer-group之后将重置offset
  • segment过期删除(offset也被删除了)
  • auto.offset.reset=latest/earliest, latest是默认方式, earliest等价于--from-begining, earliest不一定从0开始
  • 批量消费提交offset,customer只获取offset一次,把这个offset放入内存中,后面的消费直接去修改内存中的offset,批量消费后提交offset

customer/提交offset

提交offset:

  • 自动定时提交
  • 手动(同步/异步)提交

手动(同步/异步)提交:

  • 可以在消费者成功处理完数据后再提交,避免消费者侧消息丢失
  • 手动同步提交:使用1个线程poll消息和提交offset,提交offset失败会自动重试
  • 手动异步提交:异步手动提交,主线程仍然去拉取数据,另开1个线程去提交offset,提交offset失败不会重试
  • 手动同步提交缺点:线程提交offset时会阻塞直到提交offset完成
  • 手动异步提交缺点:提交offset失败不会重试

customer/提交offset/自动定时提交/问题

消息漏消费:

  • customer处理失败,但offset提交成功
  • 举例,1s钟提交1次offset最大值,当consumer处理最后1条消息时处理失败,此时刚好1s钟仍然会提交最大的offset,造成消息失败消费

消息重复消费:

  • customer处理成功,但offset提交失败
  • 举例,10s提交1次offset最大值,当consumer5s内处理完poll下来的一批数据并入数据库后,刚好第6sconsumer故障 offset没有提交,consumer重新启动获取offset值不是最新的

customer/存储offset

offset存储位置

  • kk0.9-版本保存在zk
  • kk0.9+版本 kafka

customer/存储offset/问题

消息重复消费:

  • zk或kk的offset存储方式,当offset提交失败,造成customer重复消费(重新从旧offset开始消费)
  • 重复消费:consumer先处理消息,再提交offset,可能造成重复消费
  • 消息漏消费:先提交offset,再consumer处理消息,可能消费失败,进而造成消息漏消费
  • 因此采用自定义存储offset,把customer处理消息和提交offset弄成1个原子性操作

customer/存储offset/自定义存储offset

customer分配的partition经过rebalance:如果由于customer数量发生变化,造成其他customer分配的partition发生变化,即经过rebalance,此时其他customr需要存储更多的offset
监听rebalance:因此,需要1个ConsumerRebalanceListener监听consumer是否经过的rebalance,没有rebalance只存储原来分配的partition的offset,经过rebalance需要定位到新分配partition从故障customer最近提交的offset继续消费

下面2个方法需要自己实现

  • 获取某partition最新的offset(比如,从mysql获取)
  • 提交该customer被分配的partition最新的offset(比如,提交到mysql)
  • mysql表设计同于kkconsumerGroup|topic|partition|offset

问题

消息默认持久化时间7天,kk中segment(.log, .index)默认保存7天
消息消费顺序,生产者顺序就是消费的顺序


网站公告

今日签到

点亮在社区的每一天
去签到

热门文章