笔记:v3.5和v3.6还是有许多不同的,但大致流程还是一样的
!!!本文的a.b.c表示a文件里的b类的方法c,注意a不一定是包名,因为文件名不一定等于包名
go可以成千上万的goroute并发,所以每个请求都是同步请求即propose,然后select,直到超时或者propose成功
etcdServer.put->s.raftNode.node.stepwait->n.proc(这是chan)->node.run->(case <-proc)->node.rawNode.raft.Step->raft.setp.case_default->raft.step(前面是Step,这里是内部的step)->case_propose->raft.appendEntry+raft.bcast
etcdserver收到请求,请求需要先propose,然后etcdserver则把请求转发给node要求node完成propose任务,node是一个状态机,负责决策,根据请求的当前状态来决定下一步干什么,node只负责角色,具体操作由raft完成,也就是说raft提供了各种能力即负责执行,但是raft什么时候执行哪个操作则由node根据请求的当前状态来决定,换个角度理解,etcdserver看到的raft 节点是node,而raft协议层面看到的节点则是raft。
存疑:unstable、memstore、snapshot三者lastindex的关系
梳理一下batchtx、writebuf、readbuf、bbolt之间的关系
batchtx内部保存了一个bbolt对象,因为etcd是读操作可以从readbuf中读取不用每次都从磁盘查(etcd是所有数据的索引都会放在内存,但是数据不一定可能在磁盘),从而加快读取,put时会先把数据写入bbolt,但是此时是没有提交的,然后写入bbolt之后再写入writebuf,因为batchtx是批量提交,也就是攒够1w条后会进行提交(肯定有异步goroute会定时提交),提交的时候则会调用内部bbolt.tx.commit来提交bbolt事务,事务提交以后在会调用txn.End,在batchTxn的End函数里会把writebuf里面的内容写到readbuf,然后清空writebuf,也就是说etcd是先从readbuf取数据,取不到再去读磁盘,batchtx把数据写到bbolt之后但是没有提交之前,对外不可见,所以这些数据就不会写到readbuf中,当攒够一批事务或者定时提交时间到了,batchtx调用bbolt.commit来提交事务,提交事务后会把提交的数据丢到readbuf,因为此时会写readbuf,所以需要对readbuf加锁,因为batchtx是批量提交,每次提交的事务会很多,所以不应该提交一个事务就修改一次readbuf,因为这要求对readbuf大量加解锁,效率就低了,所以就用了一个writebuf,先把所有提交的数据写道writebuf,等事务结束的时候一次性写到readbuf中,然后再释放readbuf
a.b.c表示a文件里的b类的方法c,注意a不一定是包名,因为文件名不一定等于包名
put源码流程:
------->1: etcdserver put
etcdserver.EtcdServer.Put #客户端调用Put grpc会走到这里,
#客户端都是通过调grpc来和服务器通信,不同的请求对应不同的rpc函数
#因为put会修改集群数据,所以写入数据库之前必须先走一遍raft流程来同步日志
etcdserver.EtcdServer.raftRequest(pb.InternalRaftRequest{Put: r})
#把普通请求封装到raft请求中
#raftRequest的意思表示etcdserver要使用raft方式完成此请求
#put操作要求propose成功并且被apply之后才能返回ok给client
#此处raftRequest只是第一步,put会直到数据异步apply成功后才会返回
#这个Put:r表示把当前请求放到raft请求的Put字段,
#这个put字段属于raft请求的数据字段(data)
#当然,还有Delete/Range/Txn/Compact字段,
#但是同一个请求有且只有一个字段不为空即一个请求只做一件事
#raft同步日志过程中,
#只关心日志的元信息比如日志索引、任期,不关心不使用这个数据字段
#这个data字段有两个作用:
#1:持久化wal日志时作为日志的一部分被写入磁盘,从而做好了数据备份,
#当崩溃时数据不会丢失,重启时重放一遍日志就行
#2:apply的时候被解析,
#data字段是一个完整的请求,包含了数据和请求类型等元信息,
#apply阶段从data字段解析出请求r后就根据Put字段不为空从而判断出这是一个put请求,
#从而可以正确分发请求(调用不同函数)
#所以不管是什么请求,这个raft同步日志的流程都是通用的,
#因为raft流程完全不涉及data字段
etcdserver.EtcdServer.raftRequestOnce #以raft方式完成此请求
etcdserver.EtcdServer.processInternalRaftRequestOnce
#干三件事:1:检测是否接受该请求。如果达到阈值就停止接受新请求
#2:准备好raft所需的日志数据(把客户端请求(put/delete/txn等)
#序列化以后封装到raft日志的data字段)
#3:进行propose然后等到其他goroute成功apply数据
etcdserver.EtcdServer.getAppliedIndex #获取已经应用的日志的索引(即appliedIndex),应用成功表示数据已经写入数据库
etcdserver.EtcdServer.getCommittedIndex #获取已经提交日志的索引(即commitedIndex),提交成功表示该数据日志已经同步到其他节点了
#把一条数据写入etcdf服务器需要两步:
#1:同步日志到其他节点,一条日志包含两部分(日志元信息,原始请求);
#2:写数据到本机bbolt数据库,etcd底层用的是bbolt数据库存储数据
#(原始请求中包含数据)
#同步日志:即commit操作。
#commit操作即是把日志同步到其他etcd节点,一旦过半节点同步成功则该日志就变成commited了
#写数据:即apply操作即成功把数据写入底层boltdb数据库
#!!!apply操作是各搞各的,leader与follower之间不会为此进行通信和同步,
#不用担心崩溃恢复的问题,原因如下:
#apply一条日志之前该日志必定是commited,即成功写入了本地磁盘并且同步到了过半节点上,
#所以崩溃后日志还是在的,数据还是在的
#崩溃重启时,只要从0开始重新apply一遍所有日志(相当于mysql redo),
#就能使数据库恢复到一个确定的状态
#???待补充:因为etcd是readIndex,所以不用担心从follower节点读时落后的问题
if appliedIndex+limits<commitedIndex #如果apply一条数据花费的时间长,那么就可能导致appliedIndex远远滞后于commitedIndex
#一旦超过一个阈值,就拒绝本次put即此时不再接收新请求
#每条日志都有一个索引表示这是第几条日志,
#etcd中如果index=x的索引还没处理完就不会处理index=x+1的日志
#整个日志索引空间分两大块:持久化了的和unstable,
#unstable表示这部分日志还在内存,断电会丢失
#日志索引分3大段,applied(已经可以访问即写到了数据库),
#commited(已经提交了日志但还没有写到数据库),uncommited(还未提交日志)
#applied:[0,x] commited:[0,z-1] uncommited: [z,+∞),
#x必定小于等于z-1即必须先提交后应用
#还可以分为stable(持久化到了磁盘),unstable:还在内存,断电丢失
#stable:[0,k-1],unstable:[k,+∞),
#如果allowUseUnstable(可配置,即允许commite和apply未持久化的日志)
#那么x/z都可以大于等于k,反之都必须小于k
#有个InProgress表示正在进行持久化的数据即inprogress之前的已经持久化了,
#之后的分两部分:正在持久化和还未开始持久化
#还有个applying表示正在应用中的日志即applying:[x+1,applying],
#applying要小于等于commited
#!!applying和commiting时日志可以还没有持久化(可配置allowUseUnstable),
#因为持久化和commited/apply操作都是异步的
#不过一般来说commitedIndex要小于unstable的第一条日志的索引(默认配置是这样)
#关于日志还有另一种分法,即snapshot,memtable,unstable
#snapshot[0,a]:这部分日志已经持久化了,并创建了对应的snapshot文件,
#该文件记录了当前的appliedIndex值即a,重启后会从a开始重放
#memtable[a+1,b]:这部分日志已经持久化了,但不属于快照,会在内存创建一个对应的数组储存
#ustable[b+1,+∞):这部分还没持久化
return #applyIndex+limits<commitedIndex,表示落后程度达到了阈值,所以直接拒绝新请求
idutil.Generator.Next #为该请求生成一个唯一id,会把这个id和一个chan对应起来,
#当异步完成propose流程后会通过这个chan通知此goroute
etcdserverpb.InternalRaftRequest.Marshal #序列化raft请求的数据部分,后续会被放到一个msg对象中,然后准备发往其他etcd节点
wait.list.Register(id) #为前面生成的id生成一个chan,假设叫做ch,会这个ch加到的内部的map中即id->chan
#这样我们就把(req,id,chan)三者联系起来了
#当前goroute会阻塞在这个ch chan上以等待put结果即等待数据apply成功
#当日志同步完成后其他goroute会写这个chan来通知当前线程put完成
#收到结果后就返回给用户了
raft.node.Propose #调用node接口来通知底层的node进行propose
#etcdserver看到的节点是node,对于etcdserver来说,node就是一个raft层面的节点
#所以etcdsever把propose这个任务交给node去完成,etcdserver只要等待结果就好
#node.propose方法就是写node.propc这个chan
#来通知node的另一个goroute来进行propose
#propc是propose chan的缩写,c表示chan
#node的这个goroute就是node.run这个函数,这就是一个状态机
#这个状态机有个switch ,处理不同的chan上的消息
#raft同步日志流程,就是一个2pc节点,即包括两个阶段:propose和commited
#propose就是leader通知各个follower来了一条新日志,你们给我保存一下
#commitd就是leader检测到一些日志可以提交了(即检测到该日志已有过半follower同步成功)
#那么leader就先本地提交然后发消息通知各个follower提交这些日志
#笔记:因为follower可能失败,所以才需要先收集是否有过半节点可以成功
raft.node.stepWait(pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
#创建raft消息并把消息发送给node的状态机goroute
#消息类型为MsgProp即代表这一步是propose
#不管什么请求(put/delete/txn/compact等),原始请求都会作为raft请求的data字段
raft.node.stepWithWaitOption #这个函数只处理Propose消息,对于其他类型的消息则直接发到recvc chan,
#由对应线程来处理,然后直接返回
if m.Type != pb.MsgProp:
case n.recvc <- m #非Propose消息直接转发给node的recvc
return
raft.node.propc <- msg(pb.MsgProp) #把包含了日志的Propse消息发往指定chan,即raft.node.run这个线程去异步propose
case err := <-pm.result #阻塞,直到propose完成并把结果填充到resultchan中
------->2: 请求发到node.propc chan #------>另一个goroute,node.run,即raft状态机
流程来到另一个goroute即node.run
raft.node.run #node.run函数相当于raft状态机,里面会有一个死循环不断select,select中有很多chan
#每个chan对应一个状态,每个chan都有一个配套的动作
#上一步我们是发送给了node.propc这个chan,那么配套的动作就是propose即发起提议
#propose处理流程分2步:
#1:leader先在本机上把该日志持久化(实际是异步操作,只不过会默认持久化成功并更新相关变量)
#2:发送MsgApp消息通知follower节点保存该日志
#之所以本机上也做是因为leader本身也算作一个节点,最终投票时leader本身也要投一票,
#所以leader本身也要做follower所做的事
#向follower节点发送的MsgApp日志同步消息会放到发送缓冲区由其他线程发送,
#并乐观的更新leader看到的follower节点视图(即pr.next)
#即消息还没发给follower,就默认follower会成功。就算失败也没关系,
#因为检测的是pr.Match这个字段而不是pr.next,
#如果指定时间没有过半节点的pr.match字段达到日志对应的索引,
#那么leader就知道这个失败了,就会进行相应处理
for{
...略,后面会细讲... #略去其他代码,此处我们直接来看propc对应的propose动作
select:
case <-raft.node.propc #当收到etcdserver发来的日志同步请求时执行这里
#做两件事:1:调用StepLeader发起propose;2:等待,直到收到propose的结果
raft.raft.Step #node调用raft.step来执行下一步
#node与raft关系:node是大脑,raft是执行器,
#也就是说raft提供了各种能力比如写wal持久化、网络通信等能力
#然后node就是根据当前请求的状态来判断该执行什么操作
#node只负责决策,不负责执行,实际执行时node就通过调用raft的各个函数来实现
#节点有三种角色(leader/candidate/follower),
#对应stepLeader/stepCandidate/stepFollower函数,根据角色调用对应的函数
#这里以leader为例,因为etcd只允许leader发起propose,
#所以当前节点必定是leader,如果是follow,那么会把propose请求转发给leader
raft.stepLeader: #leader propose主要执行两个操作:
#1:把日志写入unstabel;
#2:发送MsgApp消息给follower,日志会放在MsgApp消息中
case MsgProp: #这里处理propose消息,leader的日志追加操作对应的是etcdserver层发来的MsgProp消息,
#而非leader的日志追加操作则对应的是leader发来的MsgApp消息,
#再次强调:只有leader才可以propose
raft.raft.appendEntry #本地添加日志,只是添加到内存,并不会立即持久化,是由另一个goroute异步进行持久化
#!!!不用担心还没持久化就崩溃导致数据丢失
#在关闭allowUseUnstable的情况下etcd只会commite已经持久化的日志,
#所有还在unstable中的日志必定是还没有持久化的,
#还没有持久化的日志必定不会commite和apply,
#所以必定不会返回成功的结果给客户端,所以客户端肯定也知道本次请求还没有成功
#所以客户端可以选择稍后重试直到etcd集群恢复正常,
#所以即使断电也没关系,unstable中的日志丢失就丢失了,完全不用担心
#!!!补充一下:发起propose、commit日志、apply日志是三个goroute
#!!!他们都是互相异步的,也就是说commit日志(即写入wal日志)/apply日志时
#!!!不是说a goroute通知commit/apply goroute去同步某条日志
#!!!而是说a goroute搞完以后更新一下proposedIndex和commitedIndex然后就返回了
#!!!这样wal goroute会异步不断的写日志到磁盘,直到达到最大的值
#!!!apply也是一样的,也就是说wal goroute会落后于proposed
#!!!apply也会落后于wal即commite
#!!!这里的落后是指:索引x处的日志已经proposed完成完了,
#!!!但是wal刷盘线程可能还没写到那个位置,也就是落后于proposed
#!!!索引y处的日志已经wal写到磁盘了,但是还没有apply,即apply落后于commite
log.raftLog.lastIndex #获取最后一条日志的索引,
#然后本次新增的n条日志的索引就依次为lastIndex+1、lastIndex+2,...lastIndex+n
#可以把整个日志空间看成一个逻辑数组,
#第0个元素的日志索引就是0,第x条日志的索引就是x,每次追加日志都是追加到数组末尾
#然后unstable就是这个逻辑数组中位置x到+∞的这部分索引对应的日志
#即unstable就是整个数组的后半段
log_unstable.unstable.maybeLastIndex #尝试返回unstanble的最后一条日志的index
if l := len(u.entries); l != 0: #如果u.entries不为空则返回u.offset+len(u)-
return u.offset + uint64(l) - 1 #u.offset表示u的其实offset,因为整个索引空间是0~+∞
#u只保存了u.offset~+∞,即最后这一段
if u.snapshot != nil : #如果len(u)==0,则返回snapshot的index
return u.snapshot.Metadata.Index
storage.MemoryStorage.LastIndex #如果snapshot也是空,则返回memstore的lastindex
#memstorage中保存的都是持久化了的日志
#所以说mem.lastIndex<=snapshot.lastIndex<u.lastIndex
raft.raft.increaseUncommittedSize #计算未提交数据总大小,
#如果算上本次准备提交的日志数据字节数
#超过了系统允许的未提交数据的字节数
#那么就拒绝本次提交,以免未提交数据太多,一旦断电,影响太大
#比如由1G的数据还没提交,然后断电,丢失,
#client又要花时间花资源重做这1G数据对应的请求,代价太大
log.raftlog.append #这里把日志添加到unstable数组的末尾,由其他线程异步持久化到磁盘
if after := ents[0].Index - 1; after < l.committed :
#如果新增日志的索引号小于已提交最后一条日志的索引号,
#所以发生了不可修复错误,直接打印panic日志
log(panicLevel,xxx)
raft.unstable.truncateAndAppend #这里把日志添加到unstable数组的末尾,并且会truncate
r.prs.Progress[r.id].MaybeUpdate(li) #r代表的就是当前的节点即当前的leader
#这里就是更新leader的match,即表示leader已经持久化到了这里
#quorum过半匹配时就是判断的这个match字段
#!!!因为这是leader,所以才立即更新,这是leader更新自己
#而非leader则需要持久化以后才会更新
if pr.Match < n:
pr.Match = n #更新pr.Match字段
updated = true
pr.ProbeAcked()
pr.Next = max(pr.Next, n+1) #更新pr.Next
raft.raft.maybeCommit #注意:假设本次请求对应的日志索引是x,
#此处commite并不是指提交到x
#而是会实时监测一下此时应该提交到哪里,
#也就是说maybeCommit仅仅是一个激活操作
#具体提交到哪个位置,则由当前的状态决定
#这里简单记录一下,后面会详细介绍
log.raftlog.maybeCommit
l.commitTo #提交leader的commitedIndex代表日志已经提交到这里了
raft.raft.bcastAppend #bcastAppend的意思是broadcastAppend,
#即广播需要追加的数据即日志到所有peer节点,广播的是MsgApp消息
#bcastAppend发过去的消息要干两件事:
#第一件事是告知peer节点要追加哪些日志,
#第二件事是通知peer节点更新状态
#MsgApp消息包含两个部分,第一个部分是需要append的日志,
#第二个是peer节点的新状态(比如commitedIndex)
#leader拥有所有节点的视图(即progress对象),
#然后leader会更新这些progress对象
#更新完后通知peer节点把他们自己的状态更新到leader所看到的这个状态,
#比如leader的commited到达x了,peer节点也需要把commited索引更新到x。
tracker.ProgressTracker.Visit #遍历所有节点,etcd用id来代表集群,visit接受一个lambda函数,
#该函数中如果目的节点是自己,那么就会忽略该节点
raft.raft.sendAppend #发送MsgApp发到指定节点
raft.raft.maybeSendAppend #!!!在发送给follower节点的MsgApp消息内
#!!!还会带上leader视角下follower此刻的状态信息(任期、Match、Next)
#!!!即leader认为follwer此刻应该提交到这里了,
#!!!follower收到后就会按照leader的命令用这些信息更新本机的状态
#!!!假设当前收到的请求对应的日志为x,
#!!!这个maybesend并不是就发送这条日志x,
#!!!相反的是maybesend函数并不关心当前收到的日志
#!!!而是会检测状态,凡是满足要求的日志都会被发送,
#!!!所以发来一条请求,收到一条日志,仅仅是激活send操作
#!!!send操作发送哪些日志并不是由收到的日志决定,而是由当前状态决定
pr := tracker.ProgressTracker.Progress[to]
#to表目的节点id,etcd用id代表结点,
#这里先根据目的节点id获取目的节点的相关信息,
#pr表示leader视角下follower节点应该的的状态
#pr即progress,即进度的意思,即leader视角下follower节点的日志复制进度
#pr.Match:表示follower节点leader节点之间日志的匹配索引
#即leader认为follower已经成功复制了leader上[0,pr.Match]内区间内的日志
#pr.Next: 表示leader期望赋值给follower的下一条日志的索引
#pr即struct tracker.Progress,
#前面说的发送哪些日志,就是由这个progres状态变量来决定
log.raftLog.entries(pr.Next)
#获取leader上日志索引号大于等于pr.Next的所有日志,
#[pr.match+1,pr.next-1]这个索引区间
#表示这些日志则是已经发送给follower了但是follower还未返回确认
#pr表示该某个follower节点上日志复制进度
#如果leader上存在日志索引号大于pr.Next的日志,
#说明leader有些日志该follower没有,即follower落后了
#所以这里就是一次性把follower缺少的日志都发送给follower,
#而不是仅仅发送请求中的那条日志x
#所以这个mabesend叫做maybe,
#如果不存在就说明没有日志要发送,如果存在说明有日志要发送
#所以说日志x只是激活这个maybesend操作,
#而具体发送哪些日志由实际状态来决定
#当然,如果日志太多了,一次发送不了,那么就只会发送一部分,
#剩下的等到下一次maybesend被激活再来发送
#!!!小结:etcd中这种操作多的是,即a事件只是触发b操作,
#!!!b的具体操作则由当时的实际状态决定,而不是由a传递给b的参数决定
#!!!如wal日志刷盘操作、apply操作、maybeCommit、maybeSend等
m.Type = pb.MsgApp #发送的是MsgApp消息
m.Index = pr.Next - 1 #即to上这条msgAPP消息对应的index
m.LogTerm = term #当前任期
m.Entries = ents #数据
m.Commit = r.raftLog.committed #leader的committedIndex,即leader告知follower自己的状态
switch pr.State:
case tracker.StateReplicate:
tracker.Progress.OptimisticUpdate #乐观更新to节点对应的pr.Next=最后一条日志的Index+1
#OptimisticUpdate是乐观更新的意思
#正常情况下to的pr.Next都等于leader lastIndex+1,
#如果follower落后于leader,
#即to节点的pr.next<leader的lastIndex+1
#就是说这里leader先乐观更新follower状态
#即使follower不是这个状态,
#follower落后于leader也没关系
#因为在follower报告给leader的heartbeat消息里,
#follower会发送自己的实际状态
#leader会据此修正pr.Next
pr.Next+=len(entries)
tracker.Inflights.Add #表示传输中的消息窗口,用于限制消息的数量和带宽,
#如果满了会检测到,从而导致本次发送取消。
raft.raft.send(pb.Message{ #把leader视角下follower的状态都填充到这个MsgApp消息里
To: to, #消息接收节点对应的id
Type: pb.MsgApp, #消息类型
Index: lastIndex, #此刻follower节点上最后一条日志的索引号应该为lastIndex
#如果对不上,follower就会忽略这条MsgApp消息
LogTerm: lastTerm, #此刻follower节点上最后一条日志对应的leader的任期
#如果对不上,follower就会忽略这条MsgApp消息
Entries: ents, #本次leader发送给follower的日志
Commit: r.raftLog.committed, #告诉follower,leader上的日志已经提交到这里了
}) #把leader视角下follower的状态都填充到这个MsgApp消息里
#给peer节点发送一条msgApp消息,
#节点收到一条msgapp消息
#表示收到leader节点发过来的一条日志同步消息
append(raft.msgs) #msgApp消息会放到leader的msgs数组中
#会有另一个专门的发送线程不断轮询这个数组,如果发现msgs不为空就发送
#即node.run这个goroute
#会在每次循环开始都会检测msgAfterAppend和msgs,有数据则表示ready
#走完这里表明propose流程已经结束,
#propose只需要把对应日志加到本机的unstable数组
#以及把消息成功发出去就算成功,
#至于有没有过半节点成功则是下一步apply的事情
#注意,此时只是propose完成,put还没有完成
#apply一批数据后会通知正在wait的chan put请求已完成
#然后put才会结束等待,才会返回ok给应用
#此时经过一次propose,那么就会产生数据需要处理
#比如持久化wal日志、更新commited等
#所以下一步就是通知node内部的raftNode来执行持久化wal日志等操作
------->3:还是node.run #仍然是node.run函数,和上面的是同一个函数,但是这是下一次循环了
#这些代码原本是在select case <-raft.node.propc之前的
#这里就是检测是否有数据redy了,如果有就把这些数据放到一个结构里,
#然后填充指定chan来通知数据到了,不同的chan配套有不同的动作
#此处就是通知底层raftNode有数据来了,你该开始干活了
#raftNode负责持久化wal日志等操作
raft.node.run
for{
ready=raft.RawNode.HasReady #此函数是node.run用来检测是否有数据需要处理,如果有就会激活本次循环,
#如果无则本次循环会阻塞在某个chan上,数据有多种,来源也有多种
if !r.softState().equal(rn.prevSoftSt):
#1:判断软状态是否和之前的状态相同,如果不相同则说明需要处理
#软状态:{leader,role},即leader是谁以及当前节点的角色是什么
#软状态不会持久化,所以才叫软状态
return true
if hardSt := r.hardState(); !IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt, rn.prevHardSt):
#2:判断硬状态是否和之前的状态相同,如果不相同则说明需要处理
#硬状态:{vote,term,commit},当前候选人、当前任期、当前已提交的索引
#硬状态会在写wal日志的时候一同写入磁盘
return true
if r.raftLog.hasPendingSnapshot():
#如果有挂起的快照操作,则说明需要处理,
#快照的用途是当follower严重落后时leader直接发一个snapshot过来
#然后follower收到snapshot后会把他放到usntable的snapshot变量里,
#这个变量不为空表明这个snapshot还没有持久化到磁盘,写到磁盘后会置空这个变量
return true
if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts():
#3:msgs不为空表示有数据要发送,
#unstableEntries不为空则表示有日志需要持久化
#hasNextEnts为真表示applyIndex<commitIndex,表示有数据可以apply
return true
if len(r.readStates) != 0:#4:ReadStats数组是否为空?
#不为空则说明有ReadIndex请求已经有过半节点确认当前leader是有效地,
#可以唤醒并执行该读请求
return true
return false #如果上面的条件都没满足则表示本次循环无需处理
if ready: #这个hasReady操作在node.run中是在每次循环开始的时候判断,
#因为上一次循环中处理MsgProp消息时会把发送给其他节点的写入msgs
#所以本轮循环,hasReady会返回true,这些数据会在本次循环中处理,
raft.RawNode.readyWithoutAccept #准备好本次要处理的数据,会把所有数据封装到一个raft.Ready结构中
#因为etcd所有操作基本都是异步的,所以同一时刻三种数据都会存在:
#unstable:需要持久化的数据、
#commited:已经提交的数据,需要apply
#msgs:发往其他节点的消息(如heartbeat/MsgApp/MsgAppResp等)
#正因为如此,所以每次迭代时会同时进行三种操作
#即持久化unstable、apply已经commited的数据、发送msgs到peers节点
raft.raftlog.nextUnstableEnts #2:获取所有未持久化的日志,这个写操作就是wal(预写日志)
raft.unstable.nextEntries
return unstable[inprogress+1:] #inprogress之前的数据表示之前已经在持久化过程中或者已经持久化了
#只不过还没完成
raft.raftlog.nextCommittedEnts #3:获取一批已经commited,需要apply的日志
raft.raftlog.maxAppliableIndex #如果allowUseUnstable,那么可以允许提交未持久化的日志,反之不允许
hi := l.committed
if !allowUnstable { #如果配置不允许使用未提交的数据,则只允许提交已持久化的日志,
#offset-1表示最后一条持久化的日志的index
#如果不允许使用unstable的日志,
#那么这样就可以确保用户一旦收到写入完成的响应,
#那么数据对应的wal日志必定已经写入了磁盘
#此后即使服务器崩溃,数据不会丢失
#如果开启,就说明还没有持久化的日志可以被apply,这样断电以后,有可能数据丢失
hi = min(hi, l.unstable.offset-1)
}
raft.raftlog.slice(applying+1,hi+1,maxSize) # [applied,applying]区间内的数据已经在应用中了
# 本次commite[applying+1,hi+1)之间的日志,
#至多处理到hi,并且至多处理maxSize字节
select #这个select和上面的select case <-raft.node.propc是同一个select
case readyc <- rd: #数据已经丢给raftNode,即另一个goroute ratNode.start在等待这个radyc
raft.RawNode.acceptReady #此case主要就是一个发送的目的,一旦发送完成就重置当前的rd结构
raft.RawNode.raft.prevSoftSt = rd.SoftState
raft.RawNode.raft.readStates = nil
raft.RawNode.raft.msgs = nil
advancec = n.advancec #等raftNode处理完后就会发送消息到advance这个chan
#捋一下:etcdserver内嵌一个raftNdoe,raftNode内嵌一个raftNodeConfig,
#raftNodeConfig又内嵌一个node,node又内嵌一个RawNode
#一个rawNode又内嵌一个raft节点以及当前节点的状态
#node负责根据请求的当前状态作出决策,然后node调用这个raft对象来实际执行各种操作
------->4:raftNode.start #在步骤3的select中我们通过case readyc<-rd 把准备好的数据
#发送给了readyc,所以下一步就是处理readyc中的数据了
raft.raftNode.start #这里会启动一个goroute,然后启动一个死循环,通过select从chan接受消息
go func():
for{
select
case rd := <-r.Ready() #上一步node.run的case readyc <- rd这个操作会激活这个case
#raftNode的操作最终都是委托给内部的raft对象的
#因为传过来的rd中同时包含了
#unstable(准备持久化的数据)/commited(准备apply的数据/msg(准备发送的消息)
#所以这个case也会同时干这三件事sync(unstable)、apply(commited)、send(msg)
#!!!注意,raftNode的apply只是通知etcdserver去进行apply,
#!!!raftNode本身不执行apply
#这个case一共做五件事:
#1:检查软状态是否有改变(比如集群是不是有新leader了,集群是否有leader),
#如果有则调用相关函数
#2:如果readStateC数组不为空,
#则说明此刻appliedIndex大于等于这个readState所等待的commitedIndex
#需要通知etcdserver进行apply
#3~5:sync(unstable)、apply(commited)、send(msg)
if rd.SoftState != nil: #1:检查集群leader
etcdserver.EtcdServer.setLeader #设置集群leader
updateLeadership=func(){ #表示这里调用的是一个lambda函数
if leader=节点自己:
v3compactor.Periodic.Resume #如果leader是节点自己,那么重新开启compact
#因为compact会改变集群数据状态,所以etcd只有leader才可以发起compact,
#进行compact之前要先走raft同步一条compact日志
#compact日志中包含本次要要压缩哪些版本的数据
if leader!=节点自己:
v3compactor.Periodic.Pause #如果leader节点不是自己,那么就暂停compact
if newLeader:
s.leaderChangedMu.Lock()
lc := s.leaderChanged
s.leaderChanged = make(chan struct{})
close(lc)
s.leaderChangedMu.Unlock()
#???待补充:close(lc)时其他goroute会感知到leader变了
#我们在并发进行线性读的时候因为要确认是否有过半节点有效,
#所以函数requestCurrentIndex会阻塞
#直到节点确认过半节点承认节点是当前集群的leader之后
#才会去读取当前的appliedInex
#如果requestCurrentIndex阻塞的时候收到集群leader改变的消息
#那么他就会立刻放弃所有此前还在等待的ReadIndex请求,然后返回错误
}()
if len(rd.ReadStates) != 0 {
r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]
#2:如果readStattes不为空,说明此刻有ReadIndex准备好了,
#所以需要唤醒getCurrentIndex函数
etcdserver.updateCommittedIndex #设置server上的commitedIndex。
#etcdserver的commitedIndex变量
#取决于底层的raft日志的commitedIndex
#可以这样理解:server有一层状态,
#然后server底层的raft日志也有一层状态
#raft日志的commited表示日志已经同步到这里了,
#对于server来说,这些日志都是可以commited的
#即etcdserver的commitedIndex小于等于raft的commitedIndex
idx:=commitedEntryLastIndex #本次待apply的最后一条日志的index
etcdserver.EtcdServer.setCommittedIndex(idx) #设置server上的commitedIndex
waitWALSync := shouldWaitWALSync(rd)
if waitWALSync: #只要有新unstable的日志,那么就为true
#即在apply之前必定会持久化日志
wal.Wal.Save #持久化wal日志
raftNode.applyc <- toApply{commitedEntry,notifyc chan}
#3:通知etcdserver apply(commited)
#把准备apply的数据填充到apply chan中,
#etcdserver.run会监听这个applyc chan
#commitedEntry表示本次要apply的日志,
#notify用来同步etcdserver.run里的apply操作
#因为etcdserver.run里apply函数
#在成功写入bbolt数据库后会调用trrigerSnapshot做一个快照
#而本函数也会处理快照,所以为了安全,需要一个notify chan来同步
if isLeader: #4:把待发送的消息发送给follower节点(put流程中此处是MsgApp消息)
etcdserver.raftNode.processMessages #根据实际情况重新设置消息的某些字段,主要就是丢弃自己发给自己的消息
#r.msgs中的消息就是thread 1 中填充的准备发往peer节点的pb.MsgApp消息
#和发给自己的raftpb.MsgAppResp消息
#这里将MsgAppResp的目的地是自己的消息的目的地设置为0表示丢弃
rafthttp.Transport.Send #把r.msgs中的消息发送给peer节点
rafthttp.peer.Send #把msg写入一个chan,然后由监听这个chan的其他线程去异步发送请求给peer节点
writerch=rafthttp.peer.pick #获取对应的输出chan,支持 streamAppV2、streamMsg、pipeline
writerch<-msgs #把msgs填充到chan中,这个chan会有对应的线程x在监听,
#一旦收到chan就会把消息发往指定节点
#也就是说发送线程从raftnode.ready chan(实际是node.readyc)接受数据
#然后把收到的响应填充到raftNode.recv chan(实际是node.recvc)
if !raft.IsEmptySnap(rd.Snapshot)
storage.storage.SaveSnap(rd.Snapshot) #5.1:持久化快照点。如果rd的snap字段不为空说明leader发来了snap,
#需要对应的snap数据持久化到磁盘上,以供后续回放
#说明这个节点当前的角色为follower
#注意:这里说的snap就是本文后面apply完成之后
#triggersnap函数创建的snap日志文件和snap日志
#这个snap点相当于mysql里的checkpoint点,
#snapshot操作也是必须得leader发起
snap.Snapshotter.SaveSnap #保存snap的数据部分到一个单独的文件中,
#数据部分指的是集群状态、term、index,
#并不是bbolt数据库中的数据,别搞混了
wal.Wal.SaveSnapshot #保存一条snap日志到wal文件中,
#snap日志和其他raft日志都是一样的只是日志的类型不同而已
if !waitWALSync:
storage.storage.Save #5.2:刷盘wal日志,即持久化unstable部分对应的所有日志
#是所有usntable日志以及当前的硬状态(term/vote/commit)
#unstable之前的日志肯定也已经写入磁盘,
#即那些准备返回结果给用户的数据对应的日志肯定已经写入了磁盘
#因为etcd会限制未提交的数据大小,所以一旦请求太多,
#导致unstable的日志超过阈值,那么集群就会拒绝后续所有新增put请求
#当完成这一步的时候可以确保目前leader上所有unstable的日志都已经持久化到了磁盘,
#此后崩溃也不怕了
#leader发送MsgApp消息给follower节点和leader刷盘wal日志是一个并发操作
#刷盘的时候是按日志的index来刷盘的,即index=a的日志如果刷盘失败,
#那么index>a的日志肯定不会在存储中
#下面的为个人随笔,并非etcd的实现,:
#可以简单理解为一条日志刷入磁盘的日志状态有四种取值:
#prepare、commited、applied、abort
#prepare状态:这是初始状态,已经给follower节点发出MsgApp消息了,
#但是还没有收集到足够多的选票
#abort状态: 指定时间内没有收到足够多的选票、被其他节点拒绝了,
#那么就把prepare状态的日志标记为abort
#commited状态:收集到了足够多的选票,则从prepare转变为commited
#applied状态:commited状态的日志发布成功则变为applied状态
#突然断电后重启后通过重放日志来恢复:
#prepare:可以简单的选择直接丢弃或者重试,全在个人取舍
#abort状态:直接丢弃
#commited:再次apply就行
#applied:直接使用
#OLAP型数据库starrocks1.9版本的事务处理流程就是与上面的流程几乎一模一样
#但是etcd实际不是这么处理的,etcd判断一条日志是不是有效,
#是通过集群协商来获取的,而不是直接记录日志的状态
#崩溃重启后,etcd节点会获取到集群leader,然后以leader为准,
#对于index=a的日志,如果leader上有,且index、term都对的上,
#那么这条日志对于该etcd节点来说就是有效地,
#相当于上面所说的处于commited状态的日志,
#如果这条日志leader上没有或者冲突了,
#那么这条日志对于该节点来说就是prepare的,直接丢弃,
#etcd这里没有abort,因为所有日志都是以leader为准,比较霸道
#如果leader发现一条日志一个follower节点返回reject,
#那么该leader就会强制用自己的日志去覆盖该follower的日志
#如果leader发现一条日志一直收不到过半节点返回响应,如果是节点宕机或者网络出错,
#那么leader在heatbeat部分就会检测到该节点失效
#并且会标记该节点并返回失败
wal.Wal.Save #是storage.storage.Save函数里调用wal.Wal.Save
wal.Wal.saveEntry #保存日志,此处还没有强制刷盘
wal.Wal.saveState #保存硬状态,包括:
#Term:表示当前的任期
#Vote:表示当前节点最后投票给的候选人的ID
#Commit:已经被大多数节点确认过的最高索引号。
sync=raft.MustSync #判断是否需要强制刷盘,如果写入的entry不为空或者硬状态不等于旧状态则必须强制刷盘
if curOff < SegmentSizeBytes: #如果一个段没有写满。默认64MB,不一定是64MB,
#因为最后一条日志大小不一定刚好凑满64MB,可能超出一点点
if sync:
wal.Wal.sync #强制刷盘
wal.Wal.Cut #超过了指定的文件大小,
#所以先把文件切割成两部分(不会把一条日志保存到两个文件),然后分别刷盘
if !raft.IsEmptySnap(rd.Snapshot)
wal.Wal.Sync #强制wal刷新
raft.MemoryStorage.ApplySnapshot#应用快照,就是按照发来的snap来设置memtable的相关字段
storage.storage.Release #释放掉旧的snap文件的锁,以便purgeFile线程可以异步删除过时的文件。
#之前加锁是为了防止文件被其他人其他线程意外删除
raft.MemoryStorage.Append #把日志条目追加到memoryTable中,此时已经写好wal了,所以断电也不怕丢失
#wal日志会被最新的快照点分为两部分,快照点之前的日志都是保存在磁盘上,
#快照点之后的所有日志,etcd还会把他加载到内存中
notifyc <- struct{}{} #这个notifyc是上面toApply chan内部的一个chan,
#etcdserver.run在处理applyc chan过程中会等待这个notifyc
#是snap相关
raft.node.Advance #发消息通知道node.run可以进行下一步了,处理完本次ready后通知node往下走,
#主要是处理stepsOnApp中leader自己发给自己的的MsgAppResp消息
raft.node.advancec <- struct{}{} #node.run会监听这个advancec chan
------->5:etcdserver apply #raftNode在步骤4中数据刷盘以后,就表示日志必定已经持久化了
#日志同步完了,所以就可以执行put操作了
#所以把新检测到的可以apply的数据丢给etcdserver
#etcdserver apply就是把任务丢到另一个goroute的fifo队列让他去慢慢apply
#一旦apply日志x,就唤醒日志x对应的请求所在goroute返回ok给client
etcdserver.EtcdServer.run
for
select:
case ap := <-s.r.apply():
f := func(context.Context) { s.applyAll(&ep, &ap) }
#把applyAll封装到一个job里面,然后丢到异步fifo队列去执行,
#注意,fifo是先进先出的意思,fifo队列会等到事务x-1完成才会开始事务x
#这样就保证了一定是按日志顺序apply的
#一个事务对应一个appliedIndex,所以代码里也是用revision来作为事务id。
#另一个方面,之所以开一个异步队列是因为applyc chan的容量只有1,
#如果顺序执行如果一直本次apply非常慢,那么就会导致无法接受新的apply请求,
#从而导致发送方也阻塞,然后整个系统都阻塞了
schedule.Schedule.schedule #把这个job丢到一个fifo队列里,然后一个work线程不断从fifo队列取元素,
#这里略,这里直接就转到applyAll函数
etcdserver.EtcdServer.applyAll #干两件事:1:把数据写入bbolt;
#2:写reqid对应的chan从而通知etcdserver本次客户端请求已经完成了,可以返回
#!!!一路都是串行的,即直到把数据写入bbolt数据库才会返回,
#!!!即上一个事物还没结束,下一个事务必定不会开始
etcdserver.EtcdServer.applySnapshot #应用snapshot。就是把相关变量重置为snapshot文件中指定的相关信息
if apply.snapshot.Metadata.Index <= ep.appliedi: #如果leader发来的snapshot中的applyindex
#比当前本机的applyIndex还要小,那么panic
panice
<-apply.notifyc #等待步骤4中raftNode持久化snapshot到磁盘后才会apply snapshot
storage.OpenSnapshotBackend #打开指定目录下的xx.snap.db文件,这个是真的备份文件,
#etcdctl --endpoints xx snapshot save命令
#会从--endpoints指定的server上拉取数据并保存到本地
#当使用etcdctl snapshot restore的时候
#就会去读取指定的文件然后从该文件恢复数据库
#更多详情见snapshot那一章
mvcc.watchableStore.Restore #读取指定文件(xx.snap.db),然后恢复数据,略
etcdserver.EtcdServer.applyEntries #根据日志,把结果写入数据库
if firsti > ep.appliedi+1 #必须按日志顺序apply,
#如果x-1这条日志还没有applied,那么x就不能apply
#ep.applied表示leader上applied的最后一条日志的索引
#firsti表示本次准备apply的第一条日志的索引
panic
curRev=etcdserver.Etcdserver.apply #应用数据,也就是把数据成功写入etcd,
#当写入一条数据后会更新当前集群的revision字段,
#apply这些操作是每个节点自己进行的,
#无需同步,因为日志已经同步好了,
#按顺序apply就可以保证不会出差错。
#revision字段是该节点上所有操作都共用的一个字段
#即下一个事务的revison就等于curRev+1
cindex.consistentIndex.SetConsistentApplyingIndex
#设置底层一致性存储的applyingIndex,这一块还不懂,
#之所以还搞一个这个,看代码注释说是为了安全
case raftpb.EntryNormal: #put数据对应的entry是EntryNormal,
#put完之后还要更新applied索引
etcdserver.EtcdServer.applyEntryNormal:
#干两件事:apply一条日志;
#通过ch<-x通知其他goroute本次put操作已结束
apply.authApplierV3.Apply #apply之前判断一下是否允许apply
applierV3backend.Apply
#根据req类型来调用对应的方法
#!!!raft层的作用只是同步一下日志,
#会把具体的请求内容封装在日志的data字段
#因为raft层只同步日志,完全不关心data字段
case r.Put!=nil: #put请求则调用对应的put方法,执行链 auth->quota->backend
#raft请求里面有多个指针比如put/delete/txn等,
#这些指针有且只有一个不为空
apply.authApplierV3.Put(txn=nil) #判断是否允许put
store.authStore.IsPutPermitted #判断该用户是否有权限put到磁盘
apply_auth.authApplierV3.checkLeasePuts #判断lease是否正确
store.authStore.IsRangePermitted #判断是否有权限RangePut
apply.quotaApplierV3.Put
storage.BackendQuota.Available #检查磁盘是否还有足够空间写入数据,
#backend可以简单理解为磁盘
apply.applierV3backend.Put
if txn=nil: #如果是非事务put,则创建事务put
#也就是说不管事务还是非事务,底层都统一使用的事务put
leases.lessor.Lookup #如果key提前设置了租约,
#则租约必须存在,如果没有,则忽略
mvcc.watchableStore.Write #创建事务put对象
return &watchableStoreTxnWrite{s.store.Write(trace), s}
kvstore_txt.store.Write
s.mu.RLock() #???待补充
tx := s.b.BatchTx()
tx.LockInsideApply() #锁住txnbuffer,因为txnbuffere只能顺序写,不允许并发写
#一堆又一堆的接口,太恶心了
#总之记住txnbuffer里面有一个batchtxn,txnbuffer是对外的
#txnbuffer的一切操作之中都会转到batchtxn
#笔记中有些地方可能把batchTxnbuffer错记成了batchtx
#也有可能把batchtx错记成了batchtxnbuffere
tw := &storeTxnWrite{
storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},
tx: tx,
beginRev: s.currentRev,
changes: make([]mvccpb.KeyValue, 0, 4),
}
return newMetricsTxnWrite(tw)
watchable_store.metricsTxnWrite.Put #mvcc 事务put,这是通用流程
#!!!对应的包名是MVCC
#!!!就是说etcd使用的是mvcc
kvstore_txn.storeTxnWrite.Put
kvstore_txn.storeTxnWrite.put
rev := tw.beginRev + 1 #本次put的主版本号就是beginRev+1
c := rev #c表示create_revision
_, created, ver, err=index.treeIndex.Get
#查找key是否存在,如果存在则获取他的旧信息
#即create_revision、version
#不存在则跳过,即默认为NoLease
if err == nil: #err=nil表示key已存在
c = created.main #从中取出key的create_revision
oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
#取出旧的lease信息
idxRev := revision{main: rev, sub: int64(len(tw.changes))}
#主版本号+副版本号
#每次put只对应一个主版本号,
#但是一次put中每put一条一条数据,副版本号就会+1
#因为副版本号=len(tw.changes)
#他是put一条,就append一条到tw.changes这个数组
#所以副版本号从0开始,每次递增
#tw.changes包括其他信息即create_revision、
#mode_revision、version、key、value、lease
ver = ver + 1 #当前key的版本号,即每个key都会有自己独立的版本号
#ver默认为0,所以每个key的第一个版本号就是0+1=1
kv := mvccpb.KeyValue{ #要保存的值:
Key: key, #本条日志对应的key
Value: value, #value
CreateRevision: c, #createRevision=旧的createRevision,即不变
ModRevision: rev, #ModRevision=本次put请求对应的主版本号
Version: ver, #version=key自己独立的版本号+1
Lease: int64(leaseID),
}
#总结一下版本号的概念
#rev即revision
#etcd有个版本概念的:revision(修订版本号)
#revision是相对于整个集群的,
#即每次修改操作都对应一个版本号,为上一次操作的版本号+1,
#revision有两个字段,main和sub,main表示事务操作的主版本号
#同一事务中发生变更的key共享同一main版本号,
#sub表示同一事务中发生变更的次数,从0开始
#这个变更指的是变更存储
#etcd还有个version的概念,这是针对每个key的,互相独立的
#举个例子:
#假设本次事务为put,且只有一次put,且一次put3条数据
#([a,1],[b,1],[c,1]),且磁盘已经存储了a这个key即[a,-1]
#且a这个key的version为kv
#假设上一次操作后系统的revision版本号为rv
#那么put a 1 时(main=rv+1,sub=0,kv=kv+1)
#那么put b 2 时(main=rv+1,sub=1,kv=1)
#那么put a 3 时(main=rv+1,sub=2,kv=1)
#版本号虽然单调递增但是是64位,目前是不可能用完的
#注意:别把日志索引(index)和数据版本(revision)混起来了
batchTxn.batchTxBuffered.UnsafeSeqPut
#前面准备好了数据就需要put了
#需要put数据以及修改index
#这里先put数据
key=(revision),value=(key-value,someRevisionInfo)
batchTxn.batchTx.UnsafeSeqPut #执行这个的时候必须已经获取了锁
batchTxn.batchTx.unsafePut
#把数据存入bbolt,如果是写事务,他可能要put多个key
bbolt.Tx.Bucket #获取对应的桶
bbolt.Bucket.Put #把数据存放到桶里
t.pending++ #bbolt是批量事务提交
batchTxn.txWriteBuffer.putSeq
#还没看懂,不过不是在这里写入,貌似和查询有关 20240430 19:20
#20240505 15:31 嗨,各位,我看懂了,听我西索:
#!!!!batchTxbuffered/writebuffer、readbuffer!!!!
#backend.batchTx.unsafePut向数据库提交了一个写操作,
#但是这个写操作并不是立即提交,而是先写到一个buff,会由其他线程提交
#他还会把结果写到readbuffer,这样读的时候就可以直接从readbuff读
#写readbuff肯定是要加锁的,所以为了避免put一个key就写一次readbuff
#就弄了一个writebuff,这样就先放到writebuff里,最后一次性写入readbuff
#他是在调用batchtx.Unlock释放锁的时候一次性把writebuff写入readbuff
#写入以后,读事务就可以直接从readbuff读取了,就不用每次都区读数据库了
index.treeIndex.Put #把(key,revision)写入内存里的treeIndex,
#key=key,value=[]generation{revision},
#保存了所有历史版本
#这个treeindex是启动时重建的,etcd会把所有key都放到内存,
#这限制了etcd支持的数据大小
#generation是代的意思,一个key从创建到删除是一个generation
#generation记录key从创建到删除中间所有变更的版本信息。
#当key被删除后再次被操作,会创建新的generation来记录版本信息。
if oldLease != lease.NoLease: #处理lease: detach旧lease
tw.s.le.Detach
if oldLease != lease.NoLease: #处理lease:attach新lease
tw.s.le.Attach
watchable_store_txn.watchableStore.End
#当put完成后需要释放txnbuffer的锁
#!!!此处没有commited,
#!!!只是把操作丢到txnbuffer就默认成功,就直接返回了
#会有另外一个线程定时commite
tw.s.mu.Lock()
watchable_store_txn.watchableStore.notify #通知watcher,暂不清楚
watchable_store.metricsTxnWrite.End #就是更新一些metrics
kvstore_txn.storeTxnWrite.End #主要就是加锁和释放锁
if len(tw.changes) != 0 {
// hold revMu lock to prevent new read txns from opening until writeback.
tw.s.revMu.Lock() #changes不为null表示有更改
#加写锁保护currentRev
#主要是compaction相关
tw.s.currentRev++
}
tw.tx.Unlock() #释放batchTxn事务锁
batch_tx.batchTxBuffered.Unlock
#还没看懂,不过不是在这里写入,貌似和查询有关 20240430 19:20
#20240505 15:31 嗨,各位,我看懂了,听我西索:
#!!!!batchTxbuffered/writebuffer、readbuffer!!!!
#backend.batchTx.unsafePut向数据库提交了一个写操作,
#但是这个写操作并不是立即提交,而是先写到一个buff,会由其他线程提交
#他还会把结果写到readbuffer,这样读的时候就可以直接从readbuff读
#写readbuff肯定是要加锁的,所以为了避免put一个key就写一次readbuff
#就弄了一个writebuff,这样就先放到writebuff里,最后一次性写入readbuff
#他是在调用batchtx.Unlock释放锁的时候一次性把writebuff写入readbuff
#写入以后,读事务就可以直接从readbuff读取了,就不用每次都区读数据库了
if t.pending != 0:
t.backend.readTx.Lock() #所有读请求都会对这个readTx加读锁,
#并且直到请求完成才会释放锁
#因为要修改readBuffer,
#所以会等到所有读请求完成并释放锁后才可以加写锁
txWriteBuffer.writeback #把writebuf中的内容写到readbuff
t.backend.readTx.Unlock() #释放锁
if t.pending >= t.backend.batchLimit || #batchTxn就是批量事务
#如果pending的事务数超过了阈值
#那么就提交,默认是1w
t.pendingDeleteOperations > 0 #或者挂起的delete操作数大于0
{
t.commit(false) #提交事务
t.backend.readTx.Lock() #要提交事务必须先阻止所有读
batch_tx.batchTxBuffered.unsafeCommit #提交事务
t.backend.hooks.OnPreCommitUnsafe(t) #执行hook
if t.backend.readTx.tx != nil: #tx是bbolt.tx
#如果bbolt.readTx不为null
#则需要等待这些读事务完成才能commit
go func(tx *bolt.Tx, wg *sync.WaitGroup):
wg.Wait()
bbolt.tx.Rollback()
t.backend.readTx.reset()
batch_tx.batchTx.commit
bbolt.tx.Commit() #bbolt的事务提交sdk
t.pending = 0
t.pendingDeleteOperations = 0
t.backend.readTx.Unlock() #提交完毕,释放锁
}
t.batchTx.Unlock() #释放txnbuffer,由此可以观之,同一时刻,
#只会有一个事务修改batchTxnBuffered
#因为etcd是按日志顺序for循环逐条串行apply的,
#所以对于普通的put/delete,一般是串行的
#加锁我猜主要是为了和compact等竞争
#(snapshot和defrag不清楚)
if len(tw.changes) != 0 {
tw.s.revMu.Unlock() #释放tw.s.revMu.Lock锁
}
tw.s.mu.RUnlock()
id := raftReq.ID
wait.list.Trigger #数据已经写入数据库了即apply成功,
#所以这里把applyInternalRaftRequest的结果写入reqid对应的chan,
#从而通知processInternalRaftRequestOnce已完成put操作
#processInternalRaftRequestOnce收到结果后就返回给用户了
idx=id%defalut_length
ch := w.e[idx].m[id]
ch <- x
close(ch) #这里是关闭chan
etcdserver.EtcdServer.setAppliedIndex #更新apply索引
etcdserver.EtcdServer.setTerm #更新任期
wait_time.timeList.Trigger(appliedIndex) #唤醒所有阻塞在appliedIndex之前的读请求。
#etcd ReadIndex(线性读):
#当读请求到来时会记录下此刻的commitedIndex值,
#并保存到confirmIndex变量中,
#只有当appliedIndex>=confirmIndex的时候读请求才会解除阻塞,
#才会去数据库读数据,
#这里已经apply到appliedIndex了,所以在此之前的读请求都可以被唤醒了
<- apply.notifyc #在上一步所有wal日志刷盘以后才会通知etcdserver进行快照
etcdserver.EtcdServer.triggerSnapshot
if ep.appliedi-ep.snapi <= s.Cfg.SnapshotCount:
return #如果当前最新的appliedIndex减去当前快照的最后一条日志的Index
#超过了阈值就触发快照,否则跳过,默认是1w条
etcdserver.EtcdServer.snapshot #创建快照
clone := s.v2store.Clone() #克隆当前store
#etcd是所有数据都存放在内存,所以直接内存中复制
#!!!v3.6即main分支会使用v3,不会执行clone
#!!!而v3.5则还是使用v2存储,snapshot的时候会执行克隆
s.KV().Commit() #强制提交以便kv持久化metadata
go func(){
clone.SaveNoCopy()
return json.Marshal #返回json格式编码的store
raft.MemoryStorage.CreateSnapshot #更新memtable里的snapmetadata
storage.storage.SaveSnap #把集群状态相关信息(成员、term、index)写入一个新创建的xx.snap文件,
#同时在wal日志中写一条snapshot的日志
#之后由异步的purge线程在下一次启动的时候
#根据这个xx.snap文件中的配置信息去执行删除操作
#(比如把所有index<x的wal日志文件都删掉)
#同时如果节点重启,那么节点会把applideIndex设置为最新的快照中的index,
#然后从此处开始重新apply日志
storage.storage.Release #清理member/snap目录下过时的以.snap.db结尾的文件
wal.Wal.ReleaseLockTo #释放过时的wal文件的锁,否则这些文件会被etcd锁住,
#purgeFile线程无法打开
snap.Snapshotter.ReleaseSnapDBs #删除掉member/snap目录下以.snap.db结尾的文件
#因为我们已经在applyEntries之前applySnapShot过了,所以可以删除
s.r.raftStorage.Compact(compacti)
------->6:node.run处理recvc #处理follower节点发过来的消息,
#put流程则对应MsgAppResp即leader发MsgApp给follwer,
#follower然后返回MsgAppResp
#由发送线程异步发送
#当发送线程收到响应的时候就会把消息填充到n.recvc chan
#步骤4中处理完后会写advance这个chan来通知node.run前进
raft.node.run
for{
select
case m := <-n.recvc:
if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type): #这个if会过滤掉来源未知的响应消息
raft.raft.Step
raft.stepLeader
select
case pb.MsgAppResp: #处理日志append成功的消息,主要操作是修改leader的commitedIndex
if tracker.Progress.MaybeUpdate #判断的同时更新消息来源节点对应的progress对象的Match和Next字段
#m.index表示该条MsgAppResp消息中日志的索引
#MsgAppResp表示成功追加了一条日志,而raft不允许日志空洞,
#因此收到一条日志索引字段为10的MsgAppResp消息就表明10之前的所有日志,
#该节点上都有,所以直接把令pr.Match=m.index,pr.Next=m.index+1
raft.raft.maybeCommit #判断是否可以进行一次提交操作,即是否有过半的节点已经成功追加日志
#节点对应的progress对象的Match字段用来表示该节点上日志已经复制到哪个位置了
#(关闭allowUnstable时match<unstable.offset)
#收到一个MsgAppResp消息的时候
#我们在MaybeUpdate里会更新消息来源节点的progress对象的Match字段
#就是说leader有一个progerss数组,
#然后每收到一个MsgAppResp消息就通过MaybeUpdate更新一个pregress对象的Match
#更新完之后就调用maybeCommite判断一次,
#判断是否有过半节点的progress对象的Match字段已经大于等于leader的commitedIndex
#就是先对所有match字段升序排序,
#然后取n - (n/2 + 1)这个位置的match,
#如果大于leader的commitedIndex说明已经有过半节点成功追加日志
#即[commitedIndex,Matchs[n-(n/2+1)]这个区间内的日志是新增的
#并且是可以commited的日志。etcd他是按顺序把日志写入wal文件,
#wal文件中日志的状态是不知道的,所以etcd是通过一些索引变量来判断该日志是什么状态的
#这些索引变量就是applied/commited/unstable,
#如果一条日志的索引即index>commited就说明该日志还没有commited
#因为wal日志已经持久化了,就算崩溃了也没事,
#因为wal日志中每条记录都包含了(term/index/type/data四个字段)
#集群启动的时候,applied/unstable这些变量的值都是可以自动生成
#commited会作为硬状态的一部分写入磁盘
tracker.ProgressTracker.Committed #获取排序后n-(n/2+1)这个位置的progrees对象的match字段
raft.raftlog.maybeCommit #比较一下中间那个match是否大于当前leader的commitedIndex,
#如果大于就说明有日志可以提交
raft.raftlog.commitTo #把leader节点的commiteIndex更新到n-(n/2+1)这个位置对应的match值
raft.releasePendingReadIndexMessages #释放之前挂起的一致性读请求
raft.raft.bcastAppend #这次会发一条不带任何日志的MsgApp消息给所有其他节点通知他们已经提交到这里了
#bcastAppend发过去的MsgApp消息两部分:待append的日志和peer节点新状态
#因为代码是通过MsgAppResp消息来激活检测,
#而通过maybeCommite中检测时是直接检测所有集群的日志提交情况
#并不是检测日志是否更新到了MsgAppResp中的日志所对应的index,
#即收到MsgAppResp只是告知程序需要检测一下集群提交状态
#如果有新日志可以提交,就通知所有节点更新commitedIndex
#比如三个节点的集群(leader,follwer1,follwer2),
#commitedindex分别为(x,x,x),然后经过一段时间运行
#三个节点的日志分别已经追加到了(x+3,x+1,x+2),
#此时收到了一个迟来的MsgAppResp,这个MsgAppResp中的消息对应的日志索引为x-1,
#然后这个消息就会激活检测,
#leader检测到此时x+2及x+2以前的日志都是可以提交的
#(n-(n/2+1)=1,排序后match[1]=x+2
#leader就会通知他们把commitedIndex更新到x+2,
#更新完以后commitedIndex分别是是(x+2,x+1,x+2)
#因为过半就行,所以leader和follower2的commitedIndex都可以更新到x+2,
#而follower1的日志最多只能到x+1,所以只更新到x+1
#这种情况就是follower1滞后了,没关系,后面赶上就行了,
#目前集群还是有过半节点正常的,可以正常运行
------->7:node.run处理advancec #步骤4中会写advancec,通知node前进
raft.node.run
rawNode.RawNode.Advance
raft.raft.advance
log.raftlog.appliedTo #更新applyindex
raftLog.stableTo #shrink unstable数组,也就是已经apply的数据可以丢弃从而减少unstable数组大小
#步骤5中会