etcd-v3.5release-(1)-put

发布于:2024-12-06 ⋅ 阅读:(21) ⋅ 点赞:(0)

笔记:v3.5和v3.6还是有许多不同的,但大致流程还是一样的

!!!本文的a.b.c表示a文件里的b类的方法c,注意a不一定是包名,因为文件名不一定等于包名

go可以成千上万的goroute并发,所以每个请求都是同步请求即propose,然后select,直到超时或者propose成功

etcdServer.put->s.raftNode.node.stepwait->n.proc(这是chan)->node.run->(case <-proc)->node.rawNode.raft.Step->raft.setp.case_default->raft.step(前面是Step,这里是内部的step)->case_propose->raft.appendEntry+raft.bcast

etcdserver收到请求,请求需要先propose,然后etcdserver则把请求转发给node要求node完成propose任务,node是一个状态机,负责决策,根据请求的当前状态来决定下一步干什么,node只负责角色,具体操作由raft完成,也就是说raft提供了各种能力即负责执行,但是raft什么时候执行哪个操作则由node根据请求的当前状态来决定,换个角度理解,etcdserver看到的raft 节点是node,而raft协议层面看到的节点则是raft。

存疑:unstable、memstore、snapshot三者lastindex的关系

梳理一下batchtx、writebuf、readbuf、bbolt之间的关系

batchtx内部保存了一个bbolt对象,因为etcd是读操作可以从readbuf中读取不用每次都从磁盘查(etcd是所有数据的索引都会放在内存,但是数据不一定可能在磁盘),从而加快读取,put时会先把数据写入bbolt,但是此时是没有提交的,然后写入bbolt之后再写入writebuf,因为batchtx是批量提交,也就是攒够1w条后会进行提交(肯定有异步goroute会定时提交),提交的时候则会调用内部bbolt.tx.commit来提交bbolt事务,事务提交以后在会调用txn.End,在batchTxn的End函数里会把writebuf里面的内容写到readbuf,然后清空writebuf,也就是说etcd是先从readbuf取数据,取不到再去读磁盘,batchtx把数据写到bbolt之后但是没有提交之前,对外不可见,所以这些数据就不会写到readbuf中,当攒够一批事务或者定时提交时间到了,batchtx调用bbolt.commit来提交事务,提交事务后会把提交的数据丢到readbuf,因为此时会写readbuf,所以需要对readbuf加锁,因为batchtx是批量提交,每次提交的事务会很多,所以不应该提交一个事务就修改一次readbuf,因为这要求对readbuf大量加解锁,效率就低了,所以就用了一个writebuf,先把所有提交的数据写道writebuf,等事务结束的时候一次性写到readbuf中,然后再释放readbuf


a.b.c表示a文件里的b类的方法c,注意a不一定是包名,因为文件名不一定等于包名

put源码流程:

------->1: etcdserver put
etcdserver.EtcdServer.Put                        #客户端调用Put grpc会走到这里,
                                                 #客户端都是通过调grpc来和服务器通信,不同的请求对应不同的rpc函数
                                                 #因为put会修改集群数据,所以写入数据库之前必须先走一遍raft流程来同步日志
  etcdserver.EtcdServer.raftRequest(pb.InternalRaftRequest{Put: r})
                                                 #把普通请求封装到raft请求中
                                                 #raftRequest的意思表示etcdserver要使用raft方式完成此请求
                                                 #put操作要求propose成功并且被apply之后才能返回ok给client
                                                 #此处raftRequest只是第一步,put会直到数据异步apply成功后才会返回
                                                 #这个Put:r表示把当前请求放到raft请求的Put字段,
                                                 #这个put字段属于raft请求的数据字段(data)
                                                 #当然,还有Delete/Range/Txn/Compact字段,
                                                 #但是同一个请求有且只有一个字段不为空即一个请求只做一件事
                                                 #raft同步日志过程中,
                                                 #只关心日志的元信息比如日志索引、任期,不关心不使用这个数据字段
                                                 #这个data字段有两个作用:
                                                 #1:持久化wal日志时作为日志的一部分被写入磁盘,从而做好了数据备份,
                                                 #当崩溃时数据不会丢失,重启时重放一遍日志就行
                                                 #2:apply的时候被解析,
                                                 #data字段是一个完整的请求,包含了数据和请求类型等元信息,
                                                 #apply阶段从data字段解析出请求r后就根据Put字段不为空从而判断出这是一个put请求,
                                                 #从而可以正确分发请求(调用不同函数)
                                                 #所以不管是什么请求,这个raft同步日志的流程都是通用的,
                                                 #因为raft流程完全不涉及data字段

    etcdserver.EtcdServer.raftRequestOnce        #以raft方式完成此请求
      etcdserver.EtcdServer.processInternalRaftRequestOnce 
                                                 #干三件事:1:检测是否接受该请求。如果达到阈值就停止接受新请求
                                                 #2:准备好raft所需的日志数据(把客户端请求(put/delete/txn等)
                                                 #序列化以后封装到raft日志的data字段)
                                                 #3:进行propose然后等到其他goroute成功apply数据
        etcdserver.EtcdServer.getAppliedIndex    #获取已经应用的日志的索引(即appliedIndex),应用成功表示数据已经写入数据库
        etcdserver.EtcdServer.getCommittedIndex  #获取已经提交日志的索引(即commitedIndex),提交成功表示该数据日志已经同步到其他节点了
                                                 #把一条数据写入etcdf服务器需要两步:
                                                 #1:同步日志到其他节点,一条日志包含两部分(日志元信息,原始请求);
                                                 #2:写数据到本机bbolt数据库,etcd底层用的是bbolt数据库存储数据
                                                 #(原始请求中包含数据)
                                                 #同步日志:即commit操作。
                                                 #commit操作即是把日志同步到其他etcd节点,一旦过半节点同步成功则该日志就变成commited了
                                                 #写数据:即apply操作即成功把数据写入底层boltdb数据库
                                                 #!!!apply操作是各搞各的,leader与follower之间不会为此进行通信和同步,
                                                 #不用担心崩溃恢复的问题,原因如下:
                                                 #apply一条日志之前该日志必定是commited,即成功写入了本地磁盘并且同步到了过半节点上,
                                                 #所以崩溃后日志还是在的,数据还是在的
                                                 #崩溃重启时,只要从0开始重新apply一遍所有日志(相当于mysql redo),  
                                                 #就能使数据库恢复到一个确定的状态
                                                 #???待补充:因为etcd是readIndex,所以不用担心从follower节点读时落后的问题

        if appliedIndex+limits<commitedIndex     #如果apply一条数据花费的时间长,那么就可能导致appliedIndex远远滞后于commitedIndex
                                                 #一旦超过一个阈值,就拒绝本次put即此时不再接收新请求
                                                 #每条日志都有一个索引表示这是第几条日志,
                                                 #etcd中如果index=x的索引还没处理完就不会处理index=x+1的日志
                                                 #整个日志索引空间分两大块:持久化了的和unstable,
                                                 #unstable表示这部分日志还在内存,断电会丢失
                                                 #日志索引分3大段,applied(已经可以访问即写到了数据库),
                                                 #commited(已经提交了日志但还没有写到数据库),uncommited(还未提交日志)
                                                 #applied:[0,x]  commited:[0,z-1]  uncommited: [z,+∞),
                                                 #x必定小于等于z-1即必须先提交后应用
                                                 #还可以分为stable(持久化到了磁盘),unstable:还在内存,断电丢失
                                                 #stable:[0,k-1],unstable:[k,+∞),
                                                 #如果allowUseUnstable(可配置,即允许commite和apply未持久化的日志)
                                                 #那么x/z都可以大于等于k,反之都必须小于k
                                                 #有个InProgress表示正在进行持久化的数据即inprogress之前的已经持久化了,
                                                 #之后的分两部分:正在持久化和还未开始持久化
                                                 #还有个applying表示正在应用中的日志即applying:[x+1,applying],
                                                 #applying要小于等于commited
                                                 #!!applying和commiting时日志可以还没有持久化(可配置allowUseUnstable),
                                                 #因为持久化和commited/apply操作都是异步的
                                                 #不过一般来说commitedIndex要小于unstable的第一条日志的索引(默认配置是这样)
                                                 #关于日志还有另一种分法,即snapshot,memtable,unstable
                                                 #snapshot[0,a]:这部分日志已经持久化了,并创建了对应的snapshot文件,
                                                 #该文件记录了当前的appliedIndex值即a,重启后会从a开始重放
                                                 #memtable[a+1,b]:这部分日志已经持久化了,但不属于快照,会在内存创建一个对应的数组储存
                                                 #ustable[b+1,+∞):这部分还没持久化
            return                               #applyIndex+limits<commitedIndex,表示落后程度达到了阈值,所以直接拒绝新请求

        idutil.Generator.Next                    #为该请求生成一个唯一id,会把这个id和一个chan对应起来,
                                                 #当异步完成propose流程后会通过这个chan通知此goroute
        etcdserverpb.InternalRaftRequest.Marshal #序列化raft请求的数据部分,后续会被放到一个msg对象中,然后准备发往其他etcd节点
        wait.list.Register(id)                   #为前面生成的id生成一个chan,假设叫做ch,会这个ch加到的内部的map中即id->chan
                                                 #这样我们就把(req,id,chan)三者联系起来了
                                                 #当前goroute会阻塞在这个ch chan上以等待put结果即等待数据apply成功
                                                 #当日志同步完成后其他goroute会写这个chan来通知当前线程put完成
                                                 #收到结果后就返回给用户了

        raft.node.Propose                        #调用node接口来通知底层的node进行propose
                                                 #etcdserver看到的节点是node,对于etcdserver来说,node就是一个raft层面的节点
                                                 #所以etcdsever把propose这个任务交给node去完成,etcdserver只要等待结果就好
                                                 #node.propose方法就是写node.propc这个chan
                                                 #来通知node的另一个goroute来进行propose
                                                 #propc是propose chan的缩写,c表示chan
                                                 #node的这个goroute就是node.run这个函数,这就是一个状态机
                                                 #这个状态机有个switch ,处理不同的chan上的消息
                                                 #raft同步日志流程,就是一个2pc节点,即包括两个阶段:propose和commited
                                                 #propose就是leader通知各个follower来了一条新日志,你们给我保存一下
                                                 #commitd就是leader检测到一些日志可以提交了(即检测到该日志已有过半follower同步成功)
                                                 #那么leader就先本地提交然后发消息通知各个follower提交这些日志
                                                 #笔记:因为follower可能失败,所以才需要先收集是否有过半节点可以成功
                                       
          raft.node.stepWait(pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) 
                                                 #创建raft消息并把消息发送给node的状态机goroute
                                                 #消息类型为MsgProp即代表这一步是propose
                                                 #不管什么请求(put/delete/txn/compact等),原始请求都会作为raft请求的data字段
            raft.node.stepWithWaitOption         #这个函数只处理Propose消息,对于其他类型的消息则直接发到recvc chan,
                                                 #由对应线程来处理,然后直接返回
              if m.Type != pb.MsgProp:
                case n.recvc <- m                #非Propose消息直接转发给node的recvc
                return
              raft.node.propc <- msg(pb.MsgProp) #把包含了日志的Propse消息发往指定chan,即raft.node.run这个线程去异步propose
              case err := <-pm.result            #阻塞,直到propose完成并把结果填充到resultchan中


  
      ------->2: 请求发到node.propc chan          #------>另一个goroute,node.run,即raft状态机
           流程来到另一个goroute即node.run
          raft.node.run                          #node.run函数相当于raft状态机,里面会有一个死循环不断select,select中有很多chan
                                                 #每个chan对应一个状态,每个chan都有一个配套的动作
                                                 #上一步我们是发送给了node.propc这个chan,那么配套的动作就是propose即发起提议
                                                 #propose处理流程分2步:
                                                 #1:leader先在本机上把该日志持久化(实际是异步操作,只不过会默认持久化成功并更新相关变量)
                                                 #2:发送MsgApp消息通知follower节点保存该日志
                                                 #之所以本机上也做是因为leader本身也算作一个节点,最终投票时leader本身也要投一票,
                                                 #所以leader本身也要做follower所做的事
                                                 #向follower节点发送的MsgApp日志同步消息会放到发送缓冲区由其他线程发送,
                                                 #并乐观的更新leader看到的follower节点视图(即pr.next)
                                                 #即消息还没发给follower,就默认follower会成功。就算失败也没关系,
                                                 #因为检测的是pr.Match这个字段而不是pr.next, 
                                                 #如果指定时间没有过半节点的pr.match字段达到日志对应的索引,
                                                 #那么leader就知道这个失败了,就会进行相应处理
                                  

            for{
              ...略,后面会细讲...                 #略去其他代码,此处我们直接来看propc对应的propose动作
              select:
                case <-raft.node.propc           #当收到etcdserver发来的日志同步请求时执行这里
                                                 #做两件事:1:调用StepLeader发起propose;2:等待,直到收到propose的结果
                  raft.raft.Step                 #node调用raft.step来执行下一步
                                                 #node与raft关系:node是大脑,raft是执行器,
                                                 #也就是说raft提供了各种能力比如写wal持久化、网络通信等能力
                                                 #然后node就是根据当前请求的状态来判断该执行什么操作
                                                 #node只负责决策,不负责执行,实际执行时node就通过调用raft的各个函数来实现
                                                 #节点有三种角色(leader/candidate/follower),
                                                 #对应stepLeader/stepCandidate/stepFollower函数,根据角色调用对应的函数
                                                 #这里以leader为例,因为etcd只允许leader发起propose,
                                                 #所以当前节点必定是leader,如果是follow,那么会把propose请求转发给leader
                    raft.stepLeader:             #leader propose主要执行两个操作:
                                                 #1:把日志写入unstabel;
                                                 #2:发送MsgApp消息给follower,日志会放在MsgApp消息中
                      case MsgProp:              #这里处理propose消息,leader的日志追加操作对应的是etcdserver层发来的MsgProp消息,
                                                 #而非leader的日志追加操作则对应的是leader发来的MsgApp消息,
                                                 #再次强调:只有leader才可以propose
                        raft.raft.appendEntry    #本地添加日志,只是添加到内存,并不会立即持久化,是由另一个goroute异步进行持久化
                                                 #!!!不用担心还没持久化就崩溃导致数据丢失
                                                 #在关闭allowUseUnstable的情况下etcd只会commite已经持久化的日志,
                                                 #所有还在unstable中的日志必定是还没有持久化的,
                                                 #还没有持久化的日志必定不会commite和apply,
                                                 #所以必定不会返回成功的结果给客户端,所以客户端肯定也知道本次请求还没有成功
                                                 #所以客户端可以选择稍后重试直到etcd集群恢复正常,
                                                 #所以即使断电也没关系,unstable中的日志丢失就丢失了,完全不用担心
                                                 #!!!补充一下:发起propose、commit日志、apply日志是三个goroute
                                                 #!!!他们都是互相异步的,也就是说commit日志(即写入wal日志)/apply日志时
                                                 #!!!不是说a goroute通知commit/apply goroute去同步某条日志
                                                 #!!!而是说a goroute搞完以后更新一下proposedIndex和commitedIndex然后就返回了
                                                 #!!!这样wal goroute会异步不断的写日志到磁盘,直到达到最大的值
                                                 #!!!apply也是一样的,也就是说wal goroute会落后于proposed
                                                 #!!!apply也会落后于wal即commite
                                                 #!!!这里的落后是指:索引x处的日志已经proposed完成完了,
                                                 #!!!但是wal刷盘线程可能还没写到那个位置,也就是落后于proposed
                                                 #!!!索引y处的日志已经wal写到磁盘了,但是还没有apply,即apply落后于commite
                          log.raftLog.lastIndex  #获取最后一条日志的索引,
                                                 #然后本次新增的n条日志的索引就依次为lastIndex+1、lastIndex+2,...lastIndex+n
                                                 #可以把整个日志空间看成一个逻辑数组,
                                                 #第0个元素的日志索引就是0,第x条日志的索引就是x,每次追加日志都是追加到数组末尾
                                                 #然后unstable就是这个逻辑数组中位置x到+∞的这部分索引对应的日志
                                                 #即unstable就是整个数组的后半段
                            log_unstable.unstable.maybeLastIndex #尝试返回unstanble的最后一条日志的index
                              if l := len(u.entries); l != 0:    #如果u.entries不为空则返回u.offset+len(u)-
                                return u.offset + uint64(l) - 1  #u.offset表示u的其实offset,因为整个索引空间是0~+∞
                                                                 #u只保存了u.offset~+∞,即最后这一段
                              if u.snapshot != nil :             #如果len(u)==0,则返回snapshot的index
                              return u.snapshot.Metadata.Index   
                            storage.MemoryStorage.LastIndex      #如果snapshot也是空,则返回memstore的lastindex
                                                                 #memstorage中保存的都是持久化了的日志
                                                                 #所以说mem.lastIndex<=snapshot.lastIndex<u.lastIndex
                          raft.raft.increaseUncommittedSize      #计算未提交数据总大小,
                                                                 #如果算上本次准备提交的日志数据字节数
                                                                 #超过了系统允许的未提交数据的字节数
                                                                 #那么就拒绝本次提交,以免未提交数据太多,一旦断电,影响太大
                                                                 #比如由1G的数据还没提交,然后断电,丢失,
                                                                 #client又要花时间花资源重做这1G数据对应的请求,代价太大
                          log.raftlog.append                     #这里把日志添加到unstable数组的末尾,由其他线程异步持久化到磁盘
                            if after := ents[0].Index - 1; after < l.committed : 
                                                                 #如果新增日志的索引号小于已提交最后一条日志的索引号,
                                                                 #所以发生了不可修复错误,直接打印panic日志
                              log(panicLevel,xxx)
                            raft.unstable.truncateAndAppend      #这里把日志添加到unstable数组的末尾,并且会truncate
                          r.prs.Progress[r.id].MaybeUpdate(li)   #r代表的就是当前的节点即当前的leader
                                                                 #这里就是更新leader的match,即表示leader已经持久化到了这里
                                                                 #quorum过半匹配时就是判断的这个match字段
                                                                 #!!!因为这是leader,所以才立即更新,这是leader更新自己
                                                                 #而非leader则需要持久化以后才会更新   
                            if pr.Match < n:
                              pr.Match = n                       #更新pr.Match字段
                              updated = true
                              pr.ProbeAcked()
	                        pr.Next = max(pr.Next, n+1)          #更新pr.Next  
                          raft.raft.maybeCommit                  #注意:假设本次请求对应的日志索引是x,
                                                                 #此处commite并不是指提交到x
                                                                 #而是会实时监测一下此时应该提交到哪里,
                                                                 #也就是说maybeCommit仅仅是一个激活操作
                                                                 #具体提交到哪个位置,则由当前的状态决定
                                                                 #这里简单记录一下,后面会详细介绍
                            log.raftlog.maybeCommit
                              l.commitTo                         #提交leader的commitedIndex代表日志已经提交到这里了
              

                        raft.raft.bcastAppend           #bcastAppend的意思是broadcastAppend,
                                                        #即广播需要追加的数据即日志到所有peer节点,广播的是MsgApp消息
                                                        #bcastAppend发过去的消息要干两件事:
                                                        #第一件事是告知peer节点要追加哪些日志,
                                                        #第二件事是通知peer节点更新状态
                                                        #MsgApp消息包含两个部分,第一个部分是需要append的日志,
                                                        #第二个是peer节点的新状态(比如commitedIndex)
                                                        #leader拥有所有节点的视图(即progress对象),
                                                        #然后leader会更新这些progress对象
                                                        #更新完后通知peer节点把他们自己的状态更新到leader所看到的这个状态,
                                                        #比如leader的commited到达x了,peer节点也需要把commited索引更新到x。
                          tracker.ProgressTracker.Visit #遍历所有节点,etcd用id来代表集群,visit接受一个lambda函数,
                                                        #该函数中如果目的节点是自己,那么就会忽略该节点
                            raft.raft.sendAppend        #发送MsgApp发到指定节点

                              raft.raft.maybeSendAppend  #!!!在发送给follower节点的MsgApp消息内
                                                         #!!!还会带上leader视角下follower此刻的状态信息(任期、Match、Next)
                                                         #!!!即leader认为follwer此刻应该提交到这里了,
                                                         #!!!follower收到后就会按照leader的命令用这些信息更新本机的状态
                                                         #!!!假设当前收到的请求对应的日志为x,
                                                         #!!!这个maybesend并不是就发送这条日志x,
                                                         #!!!相反的是maybesend函数并不关心当前收到的日志
                                                         #!!!而是会检测状态,凡是满足要求的日志都会被发送,
                                                         #!!!所以发来一条请求,收到一条日志,仅仅是激活send操作
                                                         #!!!send操作发送哪些日志并不是由收到的日志决定,而是由当前状态决定
                                pr := tracker.ProgressTracker.Progress[to]  
                                                         #to表目的节点id,etcd用id代表结点,
                                                         #这里先根据目的节点id获取目的节点的相关信息,
                                                         #pr表示leader视角下follower节点应该的的状态
                                                         #pr即progress,即进度的意思,即leader视角下follower节点的日志复制进度
                                                         #pr.Match:表示follower节点leader节点之间日志的匹配索引
                                                         #即leader认为follower已经成功复制了leader上[0,pr.Match]内区间内的日志
                                                         #pr.Next: 表示leader期望赋值给follower的下一条日志的索引
                                                         #pr即struct tracker.Progress,
                                                         #前面说的发送哪些日志,就是由这个progres状态变量来决定

                                log.raftLog.entries(pr.Next)     
                                                        #获取leader上日志索引号大于等于pr.Next的所有日志,
                                                        #[pr.match+1,pr.next-1]这个索引区间
                                                        #表示这些日志则是已经发送给follower了但是follower还未返回确认
                                                        #pr表示该某个follower节点上日志复制进度
                                                        #如果leader上存在日志索引号大于pr.Next的日志,
                                                        #说明leader有些日志该follower没有,即follower落后了
                                                        #所以这里就是一次性把follower缺少的日志都发送给follower,
                                                        #而不是仅仅发送请求中的那条日志x
                                                        #所以这个mabesend叫做maybe,
                                                        #如果不存在就说明没有日志要发送,如果存在说明有日志要发送
                                                        #所以说日志x只是激活这个maybesend操作,
                                                        #而具体发送哪些日志由实际状态来决定
                                                        #当然,如果日志太多了,一次发送不了,那么就只会发送一部分,
                                                        #剩下的等到下一次maybesend被激活再来发送
                                                        #!!!小结:etcd中这种操作多的是,即a事件只是触发b操作,
                                                        #!!!b的具体操作则由当时的实际状态决定,而不是由a传递给b的参数决定
                                                        #!!!如wal日志刷盘操作、apply操作、maybeCommit、maybeSend等
                                m.Type = pb.MsgApp      #发送的是MsgApp消息
                                m.Index = pr.Next - 1   #即to上这条msgAPP消息对应的index
                                m.LogTerm = term        #当前任期
                                m.Entries = ents        #数据
                                m.Commit = r.raftLog.committed #leader的committedIndex,即leader告知follower自己的状态
                                switch pr.State:
                                  case tracker.StateReplicate:
				                    tracker.Progress.OptimisticUpdate #乐观更新to节点对应的pr.Next=最后一条日志的Index+1
                                                                      #OptimisticUpdate是乐观更新的意思
                                                                      #正常情况下to的pr.Next都等于leader lastIndex+1,
                                                                      #如果follower落后于leader,
                                                                      #即to节点的pr.next<leader的lastIndex+1
                                                                      #就是说这里leader先乐观更新follower状态
                                                                      #即使follower不是这个状态,
                                                                      #follower落后于leader也没关系
                                                                      #因为在follower报告给leader的heartbeat消息里,
                                                                      #follower会发送自己的实际状态
                                                                      #leader会据此修正pr.Next
                                      pr.Next+=len(entries)
                                    tracker.Inflights.Add           #表示传输中的消息窗口,用于限制消息的数量和带宽,
                                                                    #如果满了会检测到,从而导致本次发送取消。
                                raft.raft.send(pb.Message{          #把leader视角下follower的状态都填充到这个MsgApp消息里
                                    To:      to,                    #消息接收节点对应的id
                                    Type:    pb.MsgApp,             #消息类型
                                    Index:   lastIndex,             #此刻follower节点上最后一条日志的索引号应该为lastIndex
                                                                    #如果对不上,follower就会忽略这条MsgApp消息
                                    LogTerm: lastTerm,              #此刻follower节点上最后一条日志对应的leader的任期
                                                                    #如果对不上,follower就会忽略这条MsgApp消息
                                    Entries: ents,                  #本次leader发送给follower的日志
                                    Commit:  r.raftLog.committed,   #告诉follower,leader上的日志已经提交到这里了
                                })                                  #把leader视角下follower的状态都填充到这个MsgApp消息里
                                                                    #给peer节点发送一条msgApp消息,
                                                                    #节点收到一条msgapp消息
                                                                    #表示收到leader节点发过来的一条日志同步消息
                                  append(raft.msgs)        #msgApp消息会放到leader的msgs数组中
                                                           #会有另一个专门的发送线程不断轮询这个数组,如果发现msgs不为空就发送
                                                           #即node.run这个goroute
                                                           #会在每次循环开始都会检测msgAfterAppend和msgs,有数据则表示ready

                                                           #走完这里表明propose流程已经结束,
                                                           #propose只需要把对应日志加到本机的unstable数组
                                                           #以及把消息成功发出去就算成功,
                                                           #至于有没有过半节点成功则是下一步apply的事情
                                                           #注意,此时只是propose完成,put还没有完成
                                                           #apply一批数据后会通知正在wait的chan put请求已完成
                                                           #然后put才会结束等待,才会返回ok给应用
                                                           #此时经过一次propose,那么就会产生数据需要处理
                                                           #比如持久化wal日志、更新commited等
                                                           #所以下一步就是通知node内部的raftNode来执行持久化wal日志等操作
  


         ------->3:还是node.run        #仍然是node.run函数,和上面的是同一个函数,但是这是下一次循环了
                                       #这些代码原本是在select case <-raft.node.propc之前的
                                       #这里就是检测是否有数据redy了,如果有就把这些数据放到一个结构里,
                                       #然后填充指定chan来通知数据到了,不同的chan配套有不同的动作
                                       #此处就是通知底层raftNode有数据来了,你该开始干活了
                                       #raftNode负责持久化wal日志等操作

          raft.node.run
            for{
              ready=raft.RawNode.HasReady #此函数是node.run用来检测是否有数据需要处理,如果有就会激活本次循环,
                                          #如果无则本次循环会阻塞在某个chan上,数据有多种,来源也有多种
                if !r.softState().equal(rn.prevSoftSt): 
                                          #1:判断软状态是否和之前的状态相同,如果不相同则说明需要处理
                                          #软状态:{leader,role},即leader是谁以及当前节点的角色是什么
                                          #软状态不会持久化,所以才叫软状态
                  return true
                if hardSt := r.hardState(); !IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt, rn.prevHardSt):
                                          #2:判断硬状态是否和之前的状态相同,如果不相同则说明需要处理
                                          #硬状态:{vote,term,commit},当前候选人、当前任期、当前已提交的索引
                                          #硬状态会在写wal日志的时候一同写入磁盘
                  return true
                if r.raftLog.hasPendingSnapshot():
                                          #如果有挂起的快照操作,则说明需要处理,
                                          #快照的用途是当follower严重落后时leader直接发一个snapshot过来
                                          #然后follower收到snapshot后会把他放到usntable的snapshot变量里,
                                          #这个变量不为空表明这个snapshot还没有持久化到磁盘,写到磁盘后会置空这个变量
                  return true
                if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts():
                                          #3:msgs不为空表示有数据要发送,
                                          #unstableEntries不为空则表示有日志需要持久化
                                          #hasNextEnts为真表示applyIndex<commitIndex,表示有数据可以apply
                  return true
                if len(r.readStates) != 0:#4:ReadStats数组是否为空?
                                          #不为空则说明有ReadIndex请求已经有过半节点确认当前leader是有效地,
                                          #可以唤醒并执行该读请求  
                  return true
                return false              #如果上面的条件都没满足则表示本次循环无需处理

              if ready:                   #这个hasReady操作在node.run中是在每次循环开始的时候判断,
                                          #因为上一次循环中处理MsgProp消息时会把发送给其他节点的写入msgs  
                                          #所以本轮循环,hasReady会返回true,这些数据会在本次循环中处理,
                raft.RawNode.readyWithoutAccept #准备好本次要处理的数据,会把所有数据封装到一个raft.Ready结构中
                                                #因为etcd所有操作基本都是异步的,所以同一时刻三种数据都会存在:
                                                #unstable:需要持久化的数据、
                                                #commited:已经提交的数据,需要apply
                                                #msgs:发往其他节点的消息(如heartbeat/MsgApp/MsgAppResp等)
                                                #正因为如此,所以每次迭代时会同时进行三种操作
                                                #即持久化unstable、apply已经commited的数据、发送msgs到peers节点

                  raft.raftlog.nextUnstableEnts      #2:获取所有未持久化的日志,这个写操作就是wal(预写日志)
                    raft.unstable.nextEntries
                      return unstable[inprogress+1:] #inprogress之前的数据表示之前已经在持久化过程中或者已经持久化了
                                                     #只不过还没完成
                  raft.raftlog.nextCommittedEnts     #3:获取一批已经commited,需要apply的日志
                    raft.raftlog.maxAppliableIndex   #如果allowUseUnstable,那么可以允许提交未持久化的日志,反之不允许
                      hi := l.committed
                      if !allowUnstable {            #如果配置不允许使用未提交的数据,则只允许提交已持久化的日志,
                                                     #offset-1表示最后一条持久化的日志的index
                                                     #如果不允许使用unstable的日志,
                                                     #那么这样就可以确保用户一旦收到写入完成的响应,
                                                     #那么数据对应的wal日志必定已经写入了磁盘
                                                     #此后即使服务器崩溃,数据不会丢失
                                                     #如果开启,就说明还没有持久化的日志可以被apply,这样断电以后,有可能数据丢失
                        hi = min(hi, l.unstable.offset-1)
                      }
                    raft.raftlog.slice(applying+1,hi+1,maxSize) # [applied,applying]区间内的数据已经在应用中了
                                                                # 本次commite[applying+1,hi+1)之间的日志,
                                                                #至多处理到hi,并且至多处理maxSize字节

              select                        #这个select和上面的select case <-raft.node.propc是同一个select
                case readyc <- rd:          #数据已经丢给raftNode,即另一个goroute ratNode.start在等待这个radyc
                  raft.RawNode.acceptReady  #此case主要就是一个发送的目的,一旦发送完成就重置当前的rd结构
                    raft.RawNode.raft.prevSoftSt = rd.SoftState
                    raft.RawNode.raft.readStates = nil
                    raft.RawNode.raft.msgs = nil
                  advancec = n.advancec     #等raftNode处理完后就会发送消息到advance这个chan
                                            #捋一下:etcdserver内嵌一个raftNdoe,raftNode内嵌一个raftNodeConfig,
                                            #raftNodeConfig又内嵌一个node,node又内嵌一个RawNode
                                            #一个rawNode又内嵌一个raft节点以及当前节点的状态
                                            #node负责根据请求的当前状态作出决策,然后node调用这个raft对象来实际执行各种操作

         ------->4:raftNode.start          #在步骤3的select中我们通过case readyc<-rd 把准备好的数据
                                            #发送给了readyc,所以下一步就是处理readyc中的数据了

        raft.raftNode.start                #这里会启动一个goroute,然后启动一个死循环,通过select从chan接受消息
          go func(): 
            for{
              select
                case rd := <-r.Ready()     #上一步node.run的case readyc <- rd这个操作会激活这个case
                                           #raftNode的操作最终都是委托给内部的raft对象的
                                           #因为传过来的rd中同时包含了
                                           #unstable(准备持久化的数据)/commited(准备apply的数据/msg(准备发送的消息)
                                           #所以这个case也会同时干这三件事sync(unstable)、apply(commited)、send(msg)
                                           #!!!注意,raftNode的apply只是通知etcdserver去进行apply,
                                           #!!!raftNode本身不执行apply
                                           #这个case一共做五件事:
                                           #1:检查软状态是否有改变(比如集群是不是有新leader了,集群是否有leader),
                                           #如果有则调用相关函数
                                           #2:如果readStateC数组不为空,
                                           #则说明此刻appliedIndex大于等于这个readState所等待的commitedIndex 
                                           #需要通知etcdserver进行apply
                                           #3~5:sync(unstable)、apply(commited)、send(msg)
                  if rd.SoftState != nil:             #1:检查集群leader
                    etcdserver.EtcdServer.setLeader   #设置集群leader
                    updateLeadership=func(){          #表示这里调用的是一个lambda函数
                        if leader=节点自己:
                          v3compactor.Periodic.Resume #如果leader是节点自己,那么重新开启compact
                                                      #因为compact会改变集群数据状态,所以etcd只有leader才可以发起compact,
                                                      #进行compact之前要先走raft同步一条compact日志
                                                      #compact日志中包含本次要要压缩哪些版本的数据
                        if leader!=节点自己:
                          v3compactor.Periodic.Pause  #如果leader节点不是自己,那么就暂停compact
                        if newLeader:
        				  s.leaderChangedMu.Lock()
        				  lc := s.leaderChanged
        				  s.leaderChanged = make(chan struct{})
        				  close(lc)
        				  s.leaderChangedMu.Unlock()
                                                      #???待补充:close(lc)时其他goroute会感知到leader变了
                                                      #我们在并发进行线性读的时候因为要确认是否有过半节点有效,
                                                      #所以函数requestCurrentIndex会阻塞
                                                      #直到节点确认过半节点承认节点是当前集群的leader之后
                                                      #才会去读取当前的appliedInex
                                                      #如果requestCurrentIndex阻塞的时候收到集群leader改变的消息
                                                      #那么他就会立刻放弃所有此前还在等待的ReadIndex请求,然后返回错误
                    }()
                  if len(rd.ReadStates) != 0 {
                    r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]  
                                                          #2:如果readStattes不为空,说明此刻有ReadIndex准备好了,
                                                          #所以需要唤醒getCurrentIndex函数

                  etcdserver.updateCommittedIndex         #设置server上的commitedIndex。
                                                          #etcdserver的commitedIndex变量
                                                          #取决于底层的raft日志的commitedIndex
                                                          #可以这样理解:server有一层状态,
                                                          #然后server底层的raft日志也有一层状态
                                                          #raft日志的commited表示日志已经同步到这里了,
                                                          #对于server来说,这些日志都是可以commited的
                                                          #即etcdserver的commitedIndex小于等于raft的commitedIndex
                    idx:=commitedEntryLastIndex           #本次待apply的最后一条日志的index
                    etcdserver.EtcdServer.setCommittedIndex(idx) #设置server上的commitedIndex
                  waitWALSync := shouldWaitWALSync(rd)
				  if waitWALSync:                         #只要有新unstable的日志,那么就为true
                                                          #即在apply之前必定会持久化日志
					wal.Wal.Save                          #持久化wal日志
                  raftNode.applyc <- toApply{commitedEntry,notifyc chan} 
                                                          #3:通知etcdserver apply(commited)
                                                          #把准备apply的数据填充到apply chan中,
                                                          #etcdserver.run会监听这个applyc chan
                                                          #commitedEntry表示本次要apply的日志,
                                                          #notify用来同步etcdserver.run里的apply操作
                                                          #因为etcdserver.run里apply函数
                                                          #在成功写入bbolt数据库后会调用trrigerSnapshot做一个快照
                                                          #而本函数也会处理快照,所以为了安全,需要一个notify chan来同步
                  if isLeader:                            #4:把待发送的消息发送给follower节点(put流程中此处是MsgApp消息)
                    etcdserver.raftNode.processMessages   #根据实际情况重新设置消息的某些字段,主要就是丢弃自己发给自己的消息
                                                          #r.msgs中的消息就是thread 1 中填充的准备发往peer节点的pb.MsgApp消息
                                                          #和发给自己的raftpb.MsgAppResp消息
                                                          #这里将MsgAppResp的目的地是自己的消息的目的地设置为0表示丢弃

                    rafthttp.Transport.Send               #把r.msgs中的消息发送给peer节点
                      rafthttp.peer.Send                  #把msg写入一个chan,然后由监听这个chan的其他线程去异步发送请求给peer节点
                        writerch=rafthttp.peer.pick       #获取对应的输出chan,支持    streamAppV2、streamMsg、pipeline
                        writerch<-msgs                    #把msgs填充到chan中,这个chan会有对应的线程x在监听,
                                                          #一旦收到chan就会把消息发往指定节点
                                                          #也就是说发送线程从raftnode.ready chan(实际是node.readyc)接受数据
                                                          #然后把收到的响应填充到raftNode.recv chan(实际是node.recvc)
                  if !raft.IsEmptySnap(rd.Snapshot)
                    storage.storage.SaveSnap(rd.Snapshot)  #5.1:持久化快照点。如果rd的snap字段不为空说明leader发来了snap,
                                                           #需要对应的snap数据持久化到磁盘上,以供后续回放
                                                           #说明这个节点当前的角色为follower
                                                           #注意:这里说的snap就是本文后面apply完成之后
                                                           #triggersnap函数创建的snap日志文件和snap日志
                                                           #这个snap点相当于mysql里的checkpoint点,
                                                           #snapshot操作也是必须得leader发起
                      snap.Snapshotter.SaveSnap            #保存snap的数据部分到一个单独的文件中,
                                                           #数据部分指的是集群状态、term、index,
                                                           #并不是bbolt数据库中的数据,别搞混了
                      wal.Wal.SaveSnapshot                 #保存一条snap日志到wal文件中,
                                                           #snap日志和其他raft日志都是一样的只是日志的类型不同而已
                  if !waitWALSync:
                    storage.storage.Save #5.2:刷盘wal日志,即持久化unstable部分对应的所有日志
                                         #是所有usntable日志以及当前的硬状态(term/vote/commit)
                                         #unstable之前的日志肯定也已经写入磁盘,
                                         #即那些准备返回结果给用户的数据对应的日志肯定已经写入了磁盘
                                         #因为etcd会限制未提交的数据大小,所以一旦请求太多,
                                         #导致unstable的日志超过阈值,那么集群就会拒绝后续所有新增put请求
                                         #当完成这一步的时候可以确保目前leader上所有unstable的日志都已经持久化到了磁盘,
                                         #此后崩溃也不怕了
                                         #leader发送MsgApp消息给follower节点和leader刷盘wal日志是一个并发操作
                                         #刷盘的时候是按日志的index来刷盘的,即index=a的日志如果刷盘失败,
                                         #那么index>a的日志肯定不会在存储中
                                         #下面的为个人随笔,并非etcd的实现,:
                                         #可以简单理解为一条日志刷入磁盘的日志状态有四种取值:
                                         #prepare、commited、applied、abort
                                         #prepare状态:这是初始状态,已经给follower节点发出MsgApp消息了,
                                         #但是还没有收集到足够多的选票
                                         #abort状态: 指定时间内没有收到足够多的选票、被其他节点拒绝了,
                                         #那么就把prepare状态的日志标记为abort
                                         #commited状态:收集到了足够多的选票,则从prepare转变为commited
                                         #applied状态:commited状态的日志发布成功则变为applied状态
                                         #突然断电后重启后通过重放日志来恢复:
                                         #prepare:可以简单的选择直接丢弃或者重试,全在个人取舍
                                         #abort状态:直接丢弃
                                         #commited:再次apply就行
                                         #applied:直接使用
                                         #OLAP型数据库starrocks1.9版本的事务处理流程就是与上面的流程几乎一模一样
                                         #但是etcd实际不是这么处理的,etcd判断一条日志是不是有效,
                                         #是通过集群协商来获取的,而不是直接记录日志的状态
                                         #崩溃重启后,etcd节点会获取到集群leader,然后以leader为准,
                                         #对于index=a的日志,如果leader上有,且index、term都对的上,
                                         #那么这条日志对于该etcd节点来说就是有效地,
                                         #相当于上面所说的处于commited状态的日志,
                                         #如果这条日志leader上没有或者冲突了,
                                         #那么这条日志对于该节点来说就是prepare的,直接丢弃,  
                                         #etcd这里没有abort,因为所有日志都是以leader为准,比较霸道
                                         #如果leader发现一条日志一个follower节点返回reject,
                                         #那么该leader就会强制用自己的日志去覆盖该follower的日志
                                         #如果leader发现一条日志一直收不到过半节点返回响应,如果是节点宕机或者网络出错,
                                         #那么leader在heatbeat部分就会检测到该节点失效
                                         #并且会标记该节点并返回失败
                      wal.Wal.Save                    #是storage.storage.Save函数里调用wal.Wal.Save
                        wal.Wal.saveEntry             #保存日志,此处还没有强制刷盘
                        wal.Wal.saveState             #保存硬状态,包括:
                                                    #Term:表示当前的任期
                                                    #Vote:表示当前节点最后投票给的候选人的ID
                                                    #Commit:已经被大多数节点确认过的最高索引号。
                        sync=raft.MustSync            #判断是否需要强制刷盘,如果写入的entry不为空或者硬状态不等于旧状态则必须强制刷盘
                        if curOff < SegmentSizeBytes: #如果一个段没有写满。默认64MB,不一定是64MB,
                                                    #因为最后一条日志大小不一定刚好凑满64MB,可能超出一点点
                          if sync:
                            wal.Wal.sync              #强制刷盘
                        wal.Wal.Cut                   #超过了指定的文件大小,
                                                    #所以先把文件切割成两部分(不会把一条日志保存到两个文件),然后分别刷盘
                  if !raft.IsEmptySnap(rd.Snapshot)
                    wal.Wal.Sync                    #强制wal刷新   
                    raft.MemoryStorage.ApplySnapshot#应用快照,就是按照发来的snap来设置memtable的相关字段
                    storage.storage.Release         #释放掉旧的snap文件的锁,以便purgeFile线程可以异步删除过时的文件。
                                                    #之前加锁是为了防止文件被其他人其他线程意外删除

                  raft.MemoryStorage.Append         #把日志条目追加到memoryTable中,此时已经写好wal了,所以断电也不怕丢失
                                                    #wal日志会被最新的快照点分为两部分,快照点之前的日志都是保存在磁盘上,
                                                    #快照点之后的所有日志,etcd还会把他加载到内存中
                  notifyc <- struct{}{}             #这个notifyc是上面toApply chan内部的一个chan,
                                                    #etcdserver.run在处理applyc chan过程中会等待这个notifyc
                                                    #是snap相关
                  raft.node.Advance                 #发消息通知道node.run可以进行下一步了,处理完本次ready后通知node往下走,
                                                    #主要是处理stepsOnApp中leader自己发给自己的的MsgAppResp消息
                    raft.node.advancec <- struct{}{} #node.run会监听这个advancec chan



         ------->5:etcdserver apply                #raftNode在步骤4中数据刷盘以后,就表示日志必定已经持久化了
                                                    #日志同步完了,所以就可以执行put操作了
                                                    #所以把新检测到的可以apply的数据丢给etcdserver
                                                    #etcdserver apply就是把任务丢到另一个goroute的fifo队列让他去慢慢apply
                                                    #一旦apply日志x,就唤醒日志x对应的请求所在goroute返回ok给client
                                          
          etcdserver.EtcdServer.run
            for
              select:
                case ap := <-s.r.apply():
                  f := func(context.Context) { s.applyAll(&ep, &ap) }
                                                    #把applyAll封装到一个job里面,然后丢到异步fifo队列去执行,
                                                    #注意,fifo是先进先出的意思,fifo队列会等到事务x-1完成才会开始事务x
                                                    #这样就保证了一定是按日志顺序apply的
                                                    #一个事务对应一个appliedIndex,所以代码里也是用revision来作为事务id。
                                                    #另一个方面,之所以开一个异步队列是因为applyc chan的容量只有1,
                                                    #如果顺序执行如果一直本次apply非常慢,那么就会导致无法接受新的apply请求,
                                                    #从而导致发送方也阻塞,然后整个系统都阻塞了

                  schedule.Schedule.schedule        #把这个job丢到一个fifo队列里,然后一个work线程不断从fifo队列取元素,
                                                    #这里略,这里直接就转到applyAll函数
                    etcdserver.EtcdServer.applyAll  #干两件事:1:把数据写入bbolt; 
                                                    #2:写reqid对应的chan从而通知etcdserver本次客户端请求已经完成了,可以返回
                                                    #!!!一路都是串行的,即直到把数据写入bbolt数据库才会返回,
                                                    #!!!即上一个事物还没结束,下一个事务必定不会开始

                      etcdserver.EtcdServer.applySnapshot   #应用snapshot。就是把相关变量重置为snapshot文件中指定的相关信息
                        if apply.snapshot.Metadata.Index <= ep.appliedi:   #如果leader发来的snapshot中的applyindex
                                                                           #比当前本机的applyIndex还要小,那么panic
                          panice  
        	            <-apply.notifyc                     #等待步骤4中raftNode持久化snapshot到磁盘后才会apply snapshot
                        storage.OpenSnapshotBackend         #打开指定目录下的xx.snap.db文件,这个是真的备份文件,
                                                            #etcdctl --endpoints xx snapshot save命令
                                                            #会从--endpoints指定的server上拉取数据并保存到本地
                                                            #当使用etcdctl snapshot restore的时候
                                                            #就会去读取指定的文件然后从该文件恢复数据库
                                                            #更多详情见snapshot那一章
                        mvcc.watchableStore.Restore         #读取指定文件(xx.snap.db),然后恢复数据,略

                      etcdserver.EtcdServer.applyEntries    #根据日志,把结果写入数据库
                        if firsti > ep.appliedi+1           #必须按日志顺序apply,
                                                            #如果x-1这条日志还没有applied,那么x就不能apply
                                                            #ep.applied表示leader上applied的最后一条日志的索引
                                                            #firsti表示本次准备apply的第一条日志的索引
                          panic
                        curRev=etcdserver.Etcdserver.apply  #应用数据,也就是把数据成功写入etcd,
                                                            #当写入一条数据后会更新当前集群的revision字段,
                                                            #apply这些操作是每个节点自己进行的,
                                                            #无需同步,因为日志已经同步好了,
                                                            #按顺序apply就可以保证不会出差错。
                                                            #revision字段是该节点上所有操作都共用的一个字段
                                                            #即下一个事务的revison就等于curRev+1
                          cindex.consistentIndex.SetConsistentApplyingIndex   
                                                            #设置底层一致性存储的applyingIndex,这一块还不懂,
                                                            #之所以还搞一个这个,看代码注释说是为了安全

                          case raftpb.EntryNormal:          #put数据对应的entry是EntryNormal,
                                                            #put完之后还要更新applied索引
                            etcdserver.EtcdServer.applyEntryNormal: 
                                                            #干两件事:apply一条日志;
                                                            #通过ch<-x通知其他goroute本次put操作已结束
                              apply.authApplierV3.Apply     #apply之前判断一下是否允许apply
                                applierV3backend.Apply
                                                            #根据req类型来调用对应的方法
                                                            #!!!raft层的作用只是同步一下日志,
                                                            #会把具体的请求内容封装在日志的data字段
                                                            #因为raft层只同步日志,完全不关心data字段
                                    case r.Put!=nil:        #put请求则调用对应的put方法,执行链 auth->quota->backend
                                                            #raft请求里面有多个指针比如put/delete/txn等,
                                                            #这些指针有且只有一个不为空
                                      apply.authApplierV3.Put(txn=nil)           #判断是否允许put
                                        store.authStore.IsPutPermitted           #判断该用户是否有权限put到磁盘
                                        apply_auth.authApplierV3.checkLeasePuts  #判断lease是否正确
                                        store.authStore.IsRangePermitted         #判断是否有权限RangePut

                                        apply.quotaApplierV3.Put
                                          storage.BackendQuota.Available   #检查磁盘是否还有足够空间写入数据,
                                                                           #backend可以简单理解为磁盘
                                          apply.applierV3backend.Put
                                              if txn=nil:                  #如果是非事务put,则创建事务put
                                                                           #也就是说不管事务还是非事务,底层都统一使用的事务put
                                                leases.lessor.Lookup       #如果key提前设置了租约,
                                                                           #则租约必须存在,如果没有,则忽略
                                                mvcc.watchableStore.Write  #创建事务put对象
                                                  return &watchableStoreTxnWrite{s.store.Write(trace), s}
                                                    kvstore_txt.store.Write
                                                      s.mu.RLock()         #???待补充
                	                                  tx := s.b.BatchTx()
                	                                  tx.LockInsideApply() #锁住txnbuffer,因为txnbuffere只能顺序写,不允许并发写
                                                                           #一堆又一堆的接口,太恶心了
                                                                           #总之记住txnbuffer里面有一个batchtxn,txnbuffer是对外的
                                                                           #txnbuffer的一切操作之中都会转到batchtxn
                                                                           #笔记中有些地方可能把batchTxnbuffer错记成了batchtx
                                                                           #也有可能把batchtx错记成了batchtxnbuffere
                	                                  tw := &storeTxnWrite{
                	                                    storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},
                	                                    tx:           tx,
                	                                    beginRev:     s.currentRev,
                	                                    changes:      make([]mvccpb.KeyValue, 0, 4),
                	                                  }
        	                                          return newMetricsTxnWrite(tw)
                                              watchable_store.metricsTxnWrite.Put #mvcc 事务put,这是通用流程 
                                                                                  #!!!对应的包名是MVCC
                                                                                  #!!!就是说etcd使用的是mvcc
                                                kvstore_txn.storeTxnWrite.Put
                                                  kvstore_txn.storeTxnWrite.put
                                                    rev := tw.beginRev + 1     #本次put的主版本号就是beginRev+1
                                                	c := rev                   #c表示create_revision   
                                                    _, created, ver, err=index.treeIndex.Get  
                                                                               #查找key是否存在,如果存在则获取他的旧信息
                                                                               #即create_revision、version
                                                                               #不存在则跳过,即默认为NoLease
                                                    if err == nil:             #err=nil表示key已存在
                                                      c = created.main         #从中取出key的create_revision
                                                      oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
                                                                               #取出旧的lease信息
                                                    idxRev := revision{main: rev, sub: int64(len(tw.changes))}
                                                                               #主版本号+副版本号
                                                                               #每次put只对应一个主版本号,
                                                                               #但是一次put中每put一条一条数据,副版本号就会+1
                                                                               #因为副版本号=len(tw.changes)
                                                                               #他是put一条,就append一条到tw.changes这个数组
                                                                               #所以副版本号从0开始,每次递增
                                                                               #tw.changes包括其他信息即create_revision、
                                                                               #mode_revision、version、key、value、lease
                                                    ver = ver + 1              #当前key的版本号,即每个key都会有自己独立的版本号
                                                                               #ver默认为0,所以每个key的第一个版本号就是0+1=1
                                                    kv := mvccpb.KeyValue{     #要保存的值:
                                                      Key:            key,     #本条日志对应的key
                                                      Value:          value,   #value
                                                      CreateRevision: c,       #createRevision=旧的createRevision,即不变
                                                      ModRevision:    rev,     #ModRevision=本次put请求对应的主版本号
                                                      Version:        ver,     #version=key自己独立的版本号+1
                                                      Lease:          int64(leaseID),
                                                    }
                                                                           #总结一下版本号的概念
                                                                           #rev即revision
                                                                           #etcd有个版本概念的:revision(修订版本号)
                                                                           #revision是相对于整个集群的,
                                                                           #即每次修改操作都对应一个版本号,为上一次操作的版本号+1,
                                                                           #revision有两个字段,main和sub,main表示事务操作的主版本号
                                                                           #同一事务中发生变更的key共享同一main版本号,
                                                                           #sub表示同一事务中发生变更的次数,从0开始
                                                                           #这个变更指的是变更存储
                                                                           #etcd还有个version的概念,这是针对每个key的,互相独立的
                                                                           #举个例子: 
                                                                           #假设本次事务为put,且只有一次put,且一次put3条数据
                                                                           #([a,1],[b,1],[c,1]),且磁盘已经存储了a这个key即[a,-1]
                                                                           #且a这个key的version为kv
                                                                           #假设上一次操作后系统的revision版本号为rv
                                                                           #那么put a 1 时(main=rv+1,sub=0,kv=kv+1)
                                                                           #那么put b 2 时(main=rv+1,sub=1,kv=1)
                                                                           #那么put a 3 时(main=rv+1,sub=2,kv=1)
                                                                           #版本号虽然单调递增但是是64位,目前是不可能用完的
                                                                           #注意:别把日志索引(index)和数据版本(revision)混起来了

                                                    batchTxn.batchTxBuffered.UnsafeSeqPut   
                                                                           #前面准备好了数据就需要put了
                                                                           #需要put数据以及修改index
                                                                           #这里先put数据
                                                                           key=(revision),value=(key-value,someRevisionInfo)


                                                      batchTxn.batchTx.UnsafeSeqPut #执行这个的时候必须已经获取了锁
                                                        batchTxn.batchTx.unsafePut  
                                                                                    #把数据存入bbolt,如果是写事务,他可能要put多个key
                                                          bbolt.Tx.Bucket           #获取对应的桶
                                                          bbolt.Bucket.Put          #把数据存放到桶里
                                                          t.pending++               #bbolt是批量事务提交
                                                        batchTxn.txWriteBuffer.putSeq   
                                                                           #还没看懂,不过不是在这里写入,貌似和查询有关 20240430 19:20
                                                                           #20240505 15:31 嗨,各位,我看懂了,听我西索:
                                                                           #!!!!batchTxbuffered/writebuffer、readbuffer!!!!
                                                                           #backend.batchTx.unsafePut向数据库提交了一个写操作,
                                                                           #但是这个写操作并不是立即提交,而是先写到一个buff,会由其他线程提交
                                                                           #他还会把结果写到readbuffer,这样读的时候就可以直接从readbuff读
                                                                           #写readbuff肯定是要加锁的,所以为了避免put一个key就写一次readbuff
                                                                           #就弄了一个writebuff,这样就先放到writebuff里,最后一次性写入readbuff
                                                                           #他是在调用batchtx.Unlock释放锁的时候一次性把writebuff写入readbuff
                                                                           #写入以后,读事务就可以直接从readbuff读取了,就不用每次都区读数据库了

                                                    index.treeIndex.Put    #把(key,revision)写入内存里的treeIndex,
                                                                           #key=key,value=[]generation{revision},
                                                                           #保存了所有历史版本
                                                                           #这个treeindex是启动时重建的,etcd会把所有key都放到内存,
                                                                           #这限制了etcd支持的数据大小
                                                                           #generation是代的意思,一个key从创建到删除是一个generation
                                                                           #generation记录key从创建到删除中间所有变更的版本信息。
                                                                           #当key被删除后再次被操作,会创建新的generation来记录版本信息。
                                                    if oldLease != lease.NoLease: #处理lease: detach旧lease
                                                      tw.s.le.Detach 
                                                    if oldLease != lease.NoLease: #处理lease:attach新lease
                                                      tw.s.le.Attach 
                                                    watchable_store_txn.watchableStore.End  
                                                                            #当put完成后需要释放txnbuffer的锁
                                                                            #!!!此处没有commited,
                                                                            #!!!只是把操作丢到txnbuffer就默认成功,就直接返回了
                                                                            #会有另外一个线程定时commite
                                                   	  tw.s.mu.Lock()  
                                                      watchable_store_txn.watchableStore.notify #通知watcher,暂不清楚
                                                      watchable_store.metricsTxnWrite.End       #就是更新一些metrics
                                                        kvstore_txn.storeTxnWrite.End           #主要就是加锁和释放锁
                                                          if len(tw.changes) != 0 {
	                                                        // hold revMu lock to prevent new read txns from opening until writeback.
                                                            tw.s.revMu.Lock()       #changes不为null表示有更改
                                                                                    #加写锁保护currentRev
                                                                                    #主要是compaction相关
                                                            tw.s.currentRev++  
                                                          }
                                                          tw.tx.Unlock()            #释放batchTxn事务锁
                                                            batch_tx.batchTxBuffered.Unlock 
                                                                            #还没看懂,不过不是在这里写入,貌似和查询有关 20240430 19:20
                                                                            #20240505 15:31 嗨,各位,我看懂了,听我西索:
                                                                            #!!!!batchTxbuffered/writebuffer、readbuffer!!!!
                                                                            #backend.batchTx.unsafePut向数据库提交了一个写操作,
                                                                            #但是这个写操作并不是立即提交,而是先写到一个buff,会由其他线程提交
                                                                            #他还会把结果写到readbuffer,这样读的时候就可以直接从readbuff读
                                                                            #写readbuff肯定是要加锁的,所以为了避免put一个key就写一次readbuff
                                                                            #就弄了一个writebuff,这样就先放到writebuff里,最后一次性写入readbuff
                                                                            #他是在调用batchtx.Unlock释放锁的时候一次性把writebuff写入readbuff
                                                                            #写入以后,读事务就可以直接从readbuff读取了,就不用每次都区读数据库了
                                                              if t.pending != 0:
                                                                t.backend.readTx.Lock() #所有读请求都会对这个readTx加读锁,
                                                                                        #并且直到请求完成才会释放锁
                                                                                        #因为要修改readBuffer,
                                                                                        #所以会等到所有读请求完成并释放锁后才可以加写锁  
                                                                txWriteBuffer.writeback #把writebuf中的内容写到readbuff
                                                                t.backend.readTx.Unlock() #释放锁
                                                                if t.pending >= t.backend.batchLimit || #batchTxn就是批量事务
                                                                                                        #如果pending的事务数超过了阈值
                                                                                                        #那么就提交,默认是1w
                                                                  t.pendingDeleteOperations > 0         #或者挂起的delete操作数大于0
                                                                {
                                                                  t.commit(false)                #提交事务
                                                                    t.backend.readTx.Lock()      #要提交事务必须先阻止所有读
                                                                    batch_tx.batchTxBuffered.unsafeCommit #提交事务
                                                                      t.backend.hooks.OnPreCommitUnsafe(t) #执行hook
                                                                      if t.backend.readTx.tx != nil: #tx是bbolt.tx   
                                                                                                     #如果bbolt.readTx不为null
                                                                                                     #则需要等待这些读事务完成才能commit                       
                                                                        go func(tx *bolt.Tx, wg *sync.WaitGroup):
                                                                          wg.Wait()
                                                                          bbolt.tx.Rollback()
                                                                          t.backend.readTx.reset()
                                                                      batch_tx.batchTx.commit
                                                                		bbolt.tx.Commit()        #bbolt的事务提交sdk
                                                                        t.pending = 0
	                                                                  t.pendingDeleteOperations = 0
                                                                    t.backend.readTx.Unlock()    #提交完毕,释放锁
                                                                }
                                                                t.batchTx.Unlock()      #释放txnbuffer,由此可以观之,同一时刻,
                                                                                        #只会有一个事务修改batchTxnBuffered
                                                                                        #因为etcd是按日志顺序for循环逐条串行apply的,
                                                                                        #所以对于普通的put/delete,一般是串行的
                                                                                        #加锁我猜主要是为了和compact等竞争
                                                                                        #(snapshot和defrag不清楚)
                                                          if len(tw.changes) != 0 {
                                                            tw.s.revMu.Unlock()     #释放tw.s.revMu.Lock锁
                                                          }
                                                          tw.s.mu.RUnlock()

	                          id := raftReq.ID
                              wait.list.Trigger         #数据已经写入数据库了即apply成功,
                                                        #所以这里把applyInternalRaftRequest的结果写入reqid对应的chan,
                                                        #从而通知processInternalRaftRequestOnce已完成put操作
                                                        #processInternalRaftRequestOnce收到结果后就返回给用户了
                                idx=id%defalut_length
                                ch := w.e[idx].m[id]
                                ch <- x
                                close(ch)               #这里是关闭chan
                            etcdserver.EtcdServer.setAppliedIndex     #更新apply索引
                            etcdserver.EtcdServer.setTerm             #更新任期

                      wait_time.timeList.Trigger(appliedIndex)   #唤醒所有阻塞在appliedIndex之前的读请求。
                                                                 #etcd ReadIndex(线性读):
                                                                 #当读请求到来时会记录下此刻的commitedIndex值,
                                                                 #并保存到confirmIndex变量中,
                                                                 #只有当appliedIndex>=confirmIndex的时候读请求才会解除阻塞,
                                                                 #才会去数据库读数据,
                                                                 #这里已经apply到appliedIndex了,所以在此之前的读请求都可以被唤醒了

                      <- apply.notifyc                           #在上一步所有wal日志刷盘以后才会通知etcdserver进行快照

                      etcdserver.EtcdServer.triggerSnapshot  
                        if ep.appliedi-ep.snapi <= s.Cfg.SnapshotCount:
		                  return                                #如果当前最新的appliedIndex减去当前快照的最后一条日志的Index
                                                                #超过了阈值就触发快照,否则跳过,默认是1w条
                        etcdserver.EtcdServer.snapshot          #创建快照
                          clone := s.v2store.Clone()            #克隆当前store
                                                                #etcd是所有数据都存放在内存,所以直接内存中复制
                                                                #!!!v3.6即main分支会使用v3,不会执行clone  
                                                                #!!!而v3.5则还是使用v2存储,snapshot的时候会执行克隆
	                      s.KV().Commit()                       #强制提交以便kv持久化metadata
                          go func(){
                             clone.SaveNoCopy()
                               return json.Marshal              #返回json格式编码的store
                             raft.MemoryStorage.CreateSnapshot  #更新memtable里的snapmetadata
                             storage.storage.SaveSnap           #把集群状态相关信息(成员、term、index)写入一个新创建的xx.snap文件,
                                                                #同时在wal日志中写一条snapshot的日志
                                                                #之后由异步的purge线程在下一次启动的时候
                                                                #根据这个xx.snap文件中的配置信息去执行删除操作
                                                                #(比如把所有index<x的wal日志文件都删掉)
                                                                #同时如果节点重启,那么节点会把applideIndex设置为最新的快照中的index,
                                                                #然后从此处开始重新apply日志
                             storage.storage.Release            #清理member/snap目录下过时的以.snap.db结尾的文件
                               wal.Wal.ReleaseLockTo            #释放过时的wal文件的锁,否则这些文件会被etcd锁住,
                                                                #purgeFile线程无法打开
                               snap.Snapshotter.ReleaseSnapDBs  #删除掉member/snap目录下以.snap.db结尾的文件
                                                                #因为我们已经在applyEntries之前applySnapShot过了,所以可以删除
                             s.r.raftStorage.Compact(compacti)
 

         ------->6:node.run处理recvc                         #处理follower节点发过来的消息,
                                                             #put流程则对应MsgAppResp即leader发MsgApp给follwer,
                                                             #follower然后返回MsgAppResp
                                                             #由发送线程异步发送
                                                             #当发送线程收到响应的时候就会把消息填充到n.recvc chan
                                                             #步骤4中处理完后会写advance这个chan来通知node.run前进
          raft.node.run
            for{
              select
                case m := <-n.recvc:  
			      if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type): #这个if会过滤掉来源未知的响应消息
                    raft.raft.Step
                      raft.stepLeader
                        select
                          case pb.MsgAppResp:               #处理日志append成功的消息,主要操作是修改leader的commitedIndex
                            if tracker.Progress.MaybeUpdate #判断的同时更新消息来源节点对应的progress对象的Match和Next字段
                                                            #m.index表示该条MsgAppResp消息中日志的索引
                                                            #MsgAppResp表示成功追加了一条日志,而raft不允许日志空洞,
                                                            #因此收到一条日志索引字段为10的MsgAppResp消息就表明10之前的所有日志,
                                                            #该节点上都有,所以直接把令pr.Match=m.index,pr.Next=m.index+1
                              raft.raft.maybeCommit #判断是否可以进行一次提交操作,即是否有过半的节点已经成功追加日志
                                                    #节点对应的progress对象的Match字段用来表示该节点上日志已经复制到哪个位置了
                                                    #(关闭allowUnstable时match<unstable.offset)
                                                    #收到一个MsgAppResp消息的时候
                                                    #我们在MaybeUpdate里会更新消息来源节点的progress对象的Match字段
                                                    #就是说leader有一个progerss数组,
                                                    #然后每收到一个MsgAppResp消息就通过MaybeUpdate更新一个pregress对象的Match
                                                    #更新完之后就调用maybeCommite判断一次,
                                                    #判断是否有过半节点的progress对象的Match字段已经大于等于leader的commitedIndex
                                                    #就是先对所有match字段升序排序,
                                                    #然后取n - (n/2 + 1)这个位置的match,
                                                    #如果大于leader的commitedIndex说明已经有过半节点成功追加日志
                                                    #即[commitedIndex,Matchs[n-(n/2+1)]这个区间内的日志是新增的
                                                    #并且是可以commited的日志。etcd他是按顺序把日志写入wal文件,
                                                    #wal文件中日志的状态是不知道的,所以etcd是通过一些索引变量来判断该日志是什么状态的
                                                    #这些索引变量就是applied/commited/unstable,
                                                    #如果一条日志的索引即index>commited就说明该日志还没有commited
                                                    #因为wal日志已经持久化了,就算崩溃了也没事,
                                                    #因为wal日志中每条记录都包含了(term/index/type/data四个字段)
                                                    #集群启动的时候,applied/unstable这些变量的值都是可以自动生成
                                                    #commited会作为硬状态的一部分写入磁盘

                                tracker.ProgressTracker.Committed   #获取排序后n-(n/2+1)这个位置的progrees对象的match字段
                                raft.raftlog.maybeCommit            #比较一下中间那个match是否大于当前leader的commitedIndex,
                                                                    #如果大于就说明有日志可以提交
                                  raft.raftlog.commitTo             #把leader节点的commiteIndex更新到n-(n/2+1)这个位置对应的match值
                              raft.releasePendingReadIndexMessages  #释放之前挂起的一致性读请求
                              raft.raft.bcastAppend #这次会发一条不带任何日志的MsgApp消息给所有其他节点通知他们已经提交到这里了
                                                    #bcastAppend发过去的MsgApp消息两部分:待append的日志和peer节点新状态
                                                    #因为代码是通过MsgAppResp消息来激活检测,
                                                    #而通过maybeCommite中检测时是直接检测所有集群的日志提交情况
                                                    #并不是检测日志是否更新到了MsgAppResp中的日志所对应的index,
                                                    #即收到MsgAppResp只是告知程序需要检测一下集群提交状态
                                                    #如果有新日志可以提交,就通知所有节点更新commitedIndex
                                                    #比如三个节点的集群(leader,follwer1,follwer2),
                                                    #commitedindex分别为(x,x,x),然后经过一段时间运行
                                                    #三个节点的日志分别已经追加到了(x+3,x+1,x+2),
                                                    #此时收到了一个迟来的MsgAppResp,这个MsgAppResp中的消息对应的日志索引为x-1,
                                                    #然后这个消息就会激活检测,
                                                    #leader检测到此时x+2及x+2以前的日志都是可以提交的
                                                    #(n-(n/2+1)=1,排序后match[1]=x+2
                                                    #leader就会通知他们把commitedIndex更新到x+2,
                                                    #更新完以后commitedIndex分别是是(x+2,x+1,x+2)
                                                    #因为过半就行,所以leader和follower2的commitedIndex都可以更新到x+2,
                                                    #而follower1的日志最多只能到x+1,所以只更新到x+1
                                                    #这种情况就是follower1滞后了,没关系,后面赶上就行了,
                                                    #目前集群还是有过半节点正常的,可以正常运行
                 


         ------->7:node.run处理advancec            #步骤4中会写advancec,通知node前进
          raft.node.run
            rawNode.RawNode.Advance
              raft.raft.advance
              log.raftlog.appliedTo                #更新applyindex
              raftLog.stableTo                     #shrink unstable数组,也就是已经apply的数据可以丢弃从而减少unstable数组大小
                                                   #步骤5中会

网站公告

今日签到

点亮在社区的每一天
去签到