总结
MQ隔离了消息生产和消息消费,解耦了消息生产和消息消费
kafka知识点
- broker
- topic partition replication leader follower
- log segment message-interval
kafka版本特点:
- kk0.9- offset存储在zk
- kk0.9+ offset存储在kk(kk创建的topic中,topicName=_consumer_offsets)
- kk2.8.0- 必须要zookeeper
kafka配置
broker数量
配置 | 说明 | 默认值 |
---|---|---|
num.partitions |
分区数量 | 1 |
offsets.topic.replication.factor |
partition备份数量 | 1 |
log.dirs |
日志目录,xxx.log文件存储位置 | /tmp/kafka-logs |
log.segment.bytes |
xxx.log文件大小 | 1073741824bytes=1GB |
log.index.interval.bytes |
往xxx.log中新写入4KB+的数据xxx.index才会增加1个条目 减小这个值会增加索引条目 |
4096bytes=4KB |
replica.lag.time.max.ms |
ISR中leader和follower之间的通信时间 | 10000, 10s |
producer配置
配置 | 说明 | 默认值 |
---|---|---|
acks |
通过request.required.acks 参数来设置数据可靠性的级别,分别是0, 1,all |
1, leader消息落盘之后返回ack给producer |
retries |
请求失败producer会自动重试 | 0,如果>0可能造成生产消息重复 |
batch.size |
一批消息大小 | 16KB,超过16KB发送1次 |
linger.ms |
消息逗留时间 | 0,batch.size 和linger.ms 满足其中1个条件发送1次消息 |
buffer.memory |
producer可使用的缓存大小 | 32MB,不能设置太小导致生产到缓存的速度大于发送的速度造成OOM,buffer.memory 必>batch.size |
key.serializer , value.serializer |
消息键值的序列化器 | StringSerializer.class, 序列化器类必须实现org.apache.kafka.common.serialization.Serializer接口 |
transactionIdPrefix |
事务ID前缀,<TID,PID>, <PID,Partition,SeqNumber> |
默认为空,不用事务 |
compressionType |
压缩类型 | 无默认值 |
customer配置
配置 | 说明 | 默认值 |
---|---|---|
group-id |
消费者组ID | 1个partition中的1个消息只能被1消费者组中的1个消费者消费 |
enable-auto-commit |
自动提交offset | 无默认 |
auto-commit-interval |
自动提交offset时间间隔 | 无默认 |
auto-offset-reset |
latest(partition有offset从offset开始消费,partition无offset消费新产生的消息(latest消息)) earliest(partition有offset从offset开始消费,partition无offset从头开始消费(earliest消息)) none(partition有offset从offset开始消费,有1个partition无offset抛出异常) |
latest |
key-deserializer value-deserializer |
消息键值的序列化器 | StringSerializer.class, 序列化器类必须实现org.apache.kafka.common.serialization.Serializer接口 |
isolationLevel |
事务隔离级别,READ_UNCOMMITTED/READ_COMMITTED | READ_UNCOMMITTED |
bootstrapServers |
kafka地址 | 默认使用kafka.bootstrapServers, kafka.customer.bootstrapServers覆盖kafka.bootstrapServers |
clientId |
客户端ID,用于服务端日志 | 无默认值 |
heartbeatInterval |
心跳时间间隔 | 无默认值 |
fetchMinSize |
一次fetch请求从kafka获取的最小数据量 | 无默认值 |
fetchMaxWait |
当消息量<fetchMinSize,最多等待时间 | 无默认值;当消息量>=fetchMinSize,不等待直接返回 |
maxPollRecords |
调用一次poll()最多返回记录数 | 无默认值 |
customer分区器(java):分区分配策略
- 默认
RangeAssignor
:逐个topic,一段范围内的消息分配给某个消费者
MQ初识
MQ隔离了消息生产和消息消费,解耦了消息生产和消息消费
MQ作用:
- 解耦
- 异步
- 削峰,生产大于消费时,暂时存储消息
- 可扩展
MQ模式
- publish/subscribe模式(发布/订阅):1个消息可被n个消费者消费
- p2p模式:1个消息只被1个消费者消费
p/s模式分类:
- MQ主动推送,类似微信公众号
- customer主动拉取,类似kakfa
customer主动拉取优缺点:
- 优点:MQ主动推送,不同的消费者消费能力不同
- 缺点:customer需要轮询MQ中是否有新的消息
kafka初识
kafka:基于发布订阅模式的消息中间件,主要应用于大数据实时处理领域
侧重点:数据的存储和读取,针对实时性比较高的流式数据处理场景
特点:
- 高吞吐:2ms,万亿级别消息/天
- 高可用:集群内部节点动态扩展,集群之间动态扩展
- 持久化:可持久化数据
zookeeper初识
kafka与zookeeper
- 多个kk和1个zk可以构成1个集群
- zk用来维护kk集群的元数据
- kk0.9- offset存储在zk
- kk0.9+ offset存储在kk(kk创建的topic中,topicName=_consumer_offsets)
- offset(偏移量)记录了消费者消费的partition中消息位置(消费到第几个消息了)
kafka/整体结构
publish/subscribe模式
- kk只支持“发布/订阅”模式
producer(生产者):
- 消息的生产者
customer(消费者):
- 消息的消费者
- 消费者实时记录offset,以便出错恢复时,从上次的位置继续消费
- customer位于customer group中
customer group(消费者组):
- 把消费者分配到1个组里
- 1个消费者也能构成1个组
broker(中间人):
- 消息管理的中间人,1个kk服务,1个kk进程
- 多个broker组成kk集群
topic(话题):
- 1个话题可以有很多消息
- producer:topic=m:n,customer:topic=m:n
- topic中含有partition数目,replication数目
- topic:partition=1:n
partition(分区,消息队列):
- 存放消息
- partition数量可以>broker数量
- partition以轮询的方式分配到broker中
- n个partition分布在不同的broker中(负载均衡,高并发)
- topic:partition=1:n
- partition:replication=1:n
- 1个partition中的1个消息只能被1消费者组中的1个消费者消费
- partition:log文件=1:1
event(消息)
- event由key、value、timestamp组成
- 相同key的event会传输到相同的partition
- events被消费之后不会被删除,可以配置events保存时间(保存events不会影响kafka的性能)
replication(消息队列副本):
- 备份partition的消息
- replication数量不能>broker数量
- 只有1个replication的partition只有leader
- leader和follower同1个partition的n个replication角色
- leader-replication和follower-replication一定分布在不同的broker中
- 每一个replication都维护了1个offset(消息偏移量,消费到那个消息了)
leader(消息队列领导者):
- 多个replication应该要有领导者
- leader负责读写消息,producer的写入和customer的读取
follower(消息队列跟随者):
- 多个replication应该要有跟随者
- follower只负责备份leader消息
kafka/整体架构
producer customer customer-group topic-partition-replication
- customer位于customer group中
- 1个customer group可以只有1个customer
- topicA的partition=3,1个partition有replication=2个副本,
- partition一定位于不同broker中,partition数量不能>broker数量
- replication-leader和replication-follower一定位于不同broker中,replication数量可以>broker数量
- replication-leader负责消息读写,replication-follower负责备份replication-leader
- topic和partition是逻辑概念,replication是物理概念负责消息的读写备份
- kafka只支持publish/subscribe模式
kafka/配置
配置 | 说明 | 默认值 |
---|---|---|
log.index.interval.bytes |
往xxx.log中新写入4KB+的数据xxx.index才会增加1个条目 减小这个值会增加索引条目 |
4096bytes=4KB |
log.segment.bytes |
xxx.log文件大小 | 1073741824bytes=1GB |
num.partitions |
分区数量 | 1 |
offsets.topic.replication.factor |
partition备份数量 | 1 |
log.dirs |
日志目录,xxx.log文件存储位置 | /tmp/kafka-logs |
kafka/稀疏索引 确定消息位置
说明:
- 主要由xxx.index, xxx.log, xxx.timeindex文件
- 文件名相同则属于同1个segment
- 文件名以文件内容第1个offset命名
log.dirs
确定log文件存储目录,log.segment.bytes
确定segment的大小,log.index.interval.bytes
认为是
offset(偏移量)的单位- 当xxx.log文件>
log.segment.bytes
将生成新的segment(xxx.index, xxx.log, xxx.timeindex),文件名以文件内容第1个offset命名 - 确定消息位置:offset》xxx.index文件名》xxx.index内容(二分查找一段消息)》xxx.log内容(具体查找某个消息)
00000000000000000000.index
offset: 5077 position: 1067163526
offset: 5078 position: 1068068399
offset: 5079 position: 1068973272
00000000000000000000.timeindex
00000000000000000000.timeindex
timestamp: 1660291675000 offset: 5077
timestamp: 1660291675003 offset: 5078
timestamp: 1660291675008 offset: 5079
00000000000000000000.log
position: 1067163526
message
message
position: 1068068399
message
message
message
position: 1068973272
message
topic partition log segment .log .index .timeindex:
- topic:partition=1:n
- partition:log=1:1
- log:segment=1:n
- log和segment是逻辑概念,.log, .index, .timeindex是物理文件
- 1个segment有1个.log,1个.index文件和1个.timeindex文件
- xxx.log存储消息,xxx.index存储
offset|position
,xxx.timeindex存储timestamp|offset
- 同1个segment的xxx.log, xxx.index, xxx.timeindex的文件名相同,以文件内容第1个offset命名
xxx.index:
offset|下一个log.index.interval.bytes的起始物理位置
,一段消息的offset(偏移量)- 因为offset记录的是1段消息的偏移量,不是具体某个消息的偏移量,所以称作稀疏索引
- 因为offset是递增有序的,所以可以使用二分查找,定位一段消息
xxx.log: position|message
xxx.timeindex:添加offset的毫秒timestamp|offset
,什么时间加的offset
kafka/offset
offset是什么
- offset是某个consumerGroup消费某个topic中的某个partition中的消息产生的
- [consumerGroup|topic|partition] 唯一确定offset
offset存储位置
- kk0.9-版本保存在zk
- kk0.9+版本 kafka
kk0.9-版本offset保存在zk
kafka默认1s之后向zk提交数据
[controller brokers consumers]
- controller 的竞选策略是竞争,那个broker先在zk注册,谁就是controller
- brokers 所有的broker信息
- consumers 是consumerGroup
brokers/
[ids,topics,seqid]
- ids 中是所有broker的id
- topics 中是所有topic
- seqid 中是消息的序号 Exactly once <pid,partition,seqNum>
consumers/
[console-consumer-88500、console-consumer-70850]
- console-consumer-88502 是消费者组,没有指定consumerGroup的名字时console-consumer-随机5个数字
console-consumer-88500/
[ids,owners,offsets]
offsets/
[topicA]
- topicA是topic name
topicA/
[0,1]
- 0,1都是partition name
kk0.9+版本offset保存在kk(保存为topic:_consumer_offsets)
保存为系统内置的topic:_consumer_offsets
想消费系统内置topic,修改配置文件consuemr.properties,`exclude.internal.topics=false`
对于topic,_consumer_offsets,普通consumer是producer
创建系统consumer消费topic,_consumer_offsets,才能看到_consumer_offsets存储的offset的结构
普通customer消费topic和partition都不变,则生成的offset消息的key不变,提交到系统topic(_consumer_offsets)的partition不变
kafka/高效消息队列
高效消息队列:
- 多partition并行
- 顺序写磁盘
- 零拷贝
顺序写磁盘:数据原来是按簇离散存储到磁盘中的,kk是顺序写到磁盘中,减少了磁盘寻址时间(磁盘访问时间=寻道时间+寻找扇区时间+读字节时间)
传统IO:在内存的内核空间和用户空间来回拷贝
零拷贝:内核空间和用户空间都没有拷贝出现。实际上还是会拷贝一些元信息到socket cache
kafka/zookeeper
kk2.8.0- 必须要zookeeper
kafka进程broker和zookeeper结合构成集群
ZK作用:
- 选举某个broker为controller
- 抢占式选举,那个broker先向zk注册这个broker就被当做leader
controller(某个broker)作用
- 管理集群中broker的上下线
- 管理集群中topic的partition和replication分配
- 管理集群中replication的leader选举
ISR中leader选举:
- 先获取ISR,再更新leader和ISR
- ISR中leader挂了会通知到zk,isr_change_notification
kafka/消息同步策略ISR(In-Sync Replicas)
ISR(In-Sync Replicas):保持同步的副本,包括leader+n个follower,ISR中不包含所有follower
ISR作用:
- 新leader选举
- follower的选择
- ack的发送时机
follower的选择-kk老版本:
- leader和follower消息的差距数量,配置
replica.lag.max.messages
- leader和follower之间的通信时间,配置
replica.lag.time.max.ms
follower的选择-kk新版本:
- leader和follower之间的通信时间,配置
replica.lag.time.max.ms
kafka/消息同步策略ISR(In-Sync Replicas)/问题
问题:为什么kk新版本不要“消息差距标准了”
首先,两个条件都要将造成follower反复进出
- 如果
replica.lag.max.messages=10
- 生产者是批量发送12条消息》leader
- 此时ISR中所有follower消息条数与leader都差距12条,ISR中所有follower被踢出
- follower和leader之间的通信时间<
replica.lag.time.max.ms
规定的时间 - ISR又把这些踢出去的follower全部加入
- 上面两个条件放到造成反复选入和踢出follower,类似操作系统不好的页面置换算法造成页面抖动
- 另外ISR的元数据需要同步给zk
此外,leader和follower的通信时间标准变相保证了leader和follower消息的差距数量标准,leader和follower的通信时间延迟越短,消息同步速度自然越快,leader和follower的消息数量差距也就减少了。
kafka/消息同步策略/HW和LEO customer
LEO(Lag End Offset):ISR中每个replication(leader,follower)最后1个offset
HW(High Watermark):ISR中最小的LEO(最大offset中的最小值)
HW和LEO保证customer消费消息的一致性
- HW之前的消息,customer才能够消费
- ISR中选取的新leader中消息数量一定>=HW
- ISR中不论那个follower成为leader后,消费者都能正常消费
kafka/消息数量/Exactly once(有且只有1个消息)
At least Once:这个消息在kafka中至少有1个,1~n,对应acks=-1,消息不会丢失,消息可能会重复
At Most Once:这个消息在kafka中至多有1个,0~1,对应acks=0,消息可能丢失,消息不会重复
Exactly Once:这个消息在kafka中有且只有1个,1,消息幂等性+At Least Once=Exactly Once
kafka幂等性原理:唯一索引
kafka Exactly once过程:
1.将producer参数中enable.idempotence=true
:producer会被分配1个PID,producer发送同1个partition的消息会附带SeqNumber
2.broker缓存<PID,Partition,SeqNumber>
,那个producer往那个partition发送了第几个event
kafka/消息数量/Exactly once/问题
跨partition:
- kafka的幂等性无法保证跨分区跨会话的Exactly Once。因为,
<PID,Partition,SeqNumber>
,同1个消息发往不同的partition也无法去重,造成不满足Exactly once
跨会话
- kafka重启:kafka重启PID会发生变化
- producer重启:producer重启PID会发生变化
producer/kafka/事务
<TID,PID>
, <PID,Partition,SeqNumber>
已知Exactly once
- 可以保证不跨会话(kk重启或producer重启)&&不跨partition情况下的“消息有且只有1个”
- 实现方式
<PID,Partition,SeqNumber>
,PID由broker提供给producer
producer事务:跨会话&&跨partition的Exactly once,两跨之下依然能够保证生产消息的唯一性
- producer提供一个全局唯一的Transaction ID
- 1对1绑定TID和PID,<tid,pid>
- 当跨会话(kk重启或producer重启)之后可以通过TID获取PID
Transaction Coordinator组件:负责producer事务的实现
- producer使用TID可以从Transaction Coordinator获取事务状态(事务topic保存事务状态)
customer事务:保证1个消息只被消费1刺
- 事务能力弱于producer
- consumer可以通过offset访问kk中任意位置的的消息
- kk中segment(.log, .index)默认保存7天,segment过期删除时仍有消息没有消费,被删除的消息不能被消费
producer/可靠性机制ACK
producer没有收到ack的消息才重传
request.required.acks
可取值为0/1/-1
request.required.acks=0
:不会给producer响应ack
request.required.acks=1
(默认):leader消息落盘之后返回ack给producer
request.required.acks=-1
:ISR中follower(不是所有follower)从leader同步消息落盘后发送ack给leader
request.required.acks=-1
发送ack流程:
- producer批量发送消息到kk
- 批量消息到达leader并落盘
- ISR中follower同步数据并落盘完成
- 发送ack给leader
producer/可靠性机制ACK/问题/消息丢失
request.required.acks=0
:不会给producer响应ack
request.required.acks=1
:producer发送消息》leader接收消息并落盘后》leader所在broker宕机(数据丢失)》kafka重新选举leader》kafka消息丢失
request.required.acks=-1
:当ISR中只有1个leader没有follower时,退回到ack=1的情况
offsets.topic.replication.factor=1
副本数被设置为1,当ISR中只有1个leader没有follower,退回到ack=1的情况- 在ISR重新选入follower期间,ISR中只有1个leader提供服务,退回到ack=1的情况
- 保证ISR中至少有1个follower就可以避免消息丢失
producer/可靠性机制acks/问题/消息重复
request.required.acks=-1
:producer发送消息》leader接收消息并落盘后》ISR中follower从leader后同步消息并落盘后》leader宕机(发送ack失败)》producer没有收到ack》重新发送已经落盘的消息
producer/整体结构
producer:生产者使用send(ProducerRecord对象)方法发送构造的ProducerRecord对象
interceptors:拦截器
serializer:序列化器
partitioner:分区器
RecordAccumulator:记录累计器,是main线程和sender线程共享的对象
RecordBatch:批量记录的封装
sender:发送器,一次发送一个RecordBatch
producer/interceptor 自定义拦截器
ProducerInterceptor接口方法:
- configure(configs):操作配置信息
- onSend(ProducerRecord):操作ProducerRecord。在Serializer、Partitioner之前调用
- onAcknowledgement(RecordMetadata, Exception):ACK,broker发送确认给producer。producer回调之前调用
- close():producer.close()时才会调用interceptor.close()
- interceptor运行在producer所处线程中
producer/分区分配策略
将producer发送的数据封装成 ProducerRecord对象
- 指定partition,使用指定partition的值
- 没有指定partition但指定key,hash(key)%topic可用的partition数量=partition,key不变消息放置的partition不变
- 没有指定partition且没有指定key,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与%topic可用的partition数量=partition
- 三种封装成ProducerRecod对象的方法的优先级别,从上到下优先级降低
customer/消费方式
publish/subscribe模式分类:
- MQ主动推送,类似微信公众号
- customer主动拉取,类似kakfa
customer主动拉取优缺点:
- 优点:MQ主动推送,不同的消费者消费能力不同
- 缺点:customer需要轮询MQ中是否有新的消息
kafka customer:
- 消费时传入timeout(等待时间)参数,customer消费时没有数据,等待timeout时间再去查看。
customer/分区分配策略
注意:
- 组为单位:以customer-group为单位订阅topic,同1个customer-group的customer不会重复消息
- 同组互斥:1个topic下的1个partition中的1个event只能被customer-group中的1个customer消费
customer具有以下分区分配策略:
- Range(范围): kk默认分区分配策略,一段范围内的消息分配给某个消费者
- Round-Robin(轮询):消息只分配到customer-group
- Sticky(粘性):
Range(范围): 逐个topic
- 排序partition:排序同1个topic中的partition
- 排序customer:排序同1个customer-group的customer
- customer被分配的消息数量:range=partition数量(同1topic)/customer数量(同1customer-group),余数交给同1customer-group的第1个customer
- 缺点-首customer负载不公:topic数量很多,大量topic的partition数量/customer数量=有余数时,这些余数将被交给同1customer-group的第1个customer(range+余数+余数+…+余数 数量的消息)
Round-Robin(轮询):面向订阅的所有topic
- 轮询订阅的所有topic的所有partition,将订阅的所有topic的所有partition结合起来轮询
- 缺点,消息只分配到customer-group,不能指定由组内那个customer消费
Sticky(粘性):
- kk 0.11+之后才出现的新策略
customer/重置offset
重置offset:
- customer更换customer-group之后将重置offset
- segment过期删除(offset也被删除了)
auto.offset.reset=latest/earliest
, latest是默认方式, earliest等价于--from-begining
, earliest不一定从0开始- 批量消费提交offset,customer只获取offset一次,把这个offset放入内存中,后面的消费直接去修改内存中的offset,批量消费后提交offset
customer/提交offset
提交offset:
- 自动定时提交
- 手动(同步/异步)提交
手动(同步/异步)提交:
- 可以在消费者成功处理完数据后再提交,避免消费者侧消息丢失
- 手动同步提交:使用1个线程poll消息和提交offset,提交offset失败会自动重试
- 手动异步提交:异步手动提交,主线程仍然去拉取数据,另开1个线程去提交offset,提交offset失败不会重试
- 手动同步提交缺点:线程提交offset时会阻塞直到提交offset完成
- 手动异步提交缺点:提交offset失败不会重试
customer/提交offset/自动定时提交/问题
消息漏消费:
- customer处理失败,但offset提交成功
- 举例,1s钟提交1次offset最大值,当consumer处理最后1条消息时处理失败,此时刚好1s钟仍然会提交最大的offset,造成消息失败消费
消息重复消费:
- customer处理成功,但offset提交失败
- 举例,10s提交1次offset最大值,当consumer5s内处理完poll下来的一批数据并入数据库后,刚好第6sconsumer故障 offset没有提交,consumer重新启动获取offset值不是最新的
customer/存储offset
offset存储位置
- kk0.9-版本保存在zk
- kk0.9+版本 kafka
customer/存储offset/问题
消息重复消费:
- zk或kk的offset存储方式,当offset提交失败,造成customer重复消费(重新从旧offset开始消费)
- 重复消费:consumer先处理消息,再提交offset,可能造成重复消费
- 消息漏消费:先提交offset,再consumer处理消息,可能消费失败,进而造成消息漏消费
- 因此采用自定义存储offset,把customer处理消息和提交offset弄成1个原子性操作
customer/存储offset/自定义存储offset
customer分配的partition经过rebalance:如果由于customer数量发生变化,造成其他customer分配的partition发生变化,即经过rebalance,此时其他customr需要存储更多的offset
监听rebalance:因此,需要1个ConsumerRebalanceListener监听consumer是否经过的rebalance,没有rebalance只存储原来分配的partition的offset,经过rebalance需要定位到新分配partition从故障customer最近提交的offset继续消费
下面2个方法需要自己实现
- 获取某partition最新的offset(比如,从mysql获取)
- 提交该customer被分配的partition最新的offset(比如,提交到mysql)
- mysql表设计同于kk
consumerGroup|topic|partition|offset
问题
消息默认持久化时间7天,kk中segment(.log, .index)默认保存7天
消息消费顺序,生产者顺序就是消费的顺序