mit6.824 2024spring Lab3A Raft

发布于:2025-09-01 ⋅ 阅读:(20) ⋅ 点赞:(0)

基本状态

const (
	Follower = iota
	Candidate
	Leader
)

节点的字段

type Raft struct {
    mu        sync.Mutex          // 保护共享状态的锁
    peers     []*labrpc.ClientEnd // 所有节点的RPC端点
    persister *Persister          // 持久化存储
    me        int                 // 本节点索引
    
    // Raft状态
    state       int        // Follower/Candidate/Leader
    currentTerm int        // 当前任期
    votedFor    int        // 当前任期投给谁
    log         []Entry    // 日志条目
    
    // 选举相关
    voteCount int        // 当前获得的票数
    timeStamp time.Time  // 上次收到消息的时间
    
    // Leader专用
    nextIndex  []int // 每个follower的下一个日志索引
    matchIndex []int // 每个follower已复制的最高日志索引
    
    // 提交和应用
    commitIndex int // 已提交的最高日志索引
    lastApplied int // 已应用的最高日志索引
}

状态转换:

  • Follower → Candidate:选举超时
  • Candidate → Leader:获得多数票
  • Candidate → Follower:发现更高任期
  • Leader → Follower:发现更高任期

几个核心方法

Elect 选举过程

func (rf *Raft) Elect() {
    rf.mu.Lock()
    // 自增term,成为候选人,给自己投票
    rf.currentTerm += 1
    rf.state = Candidate
    rf.votedFor = rf.me
    rf.voteCount = 1
    rf.timeStamp = time.Now() // 重置超时计时器
    
    args := &RequestVoteArgs{
        Term:         rf.currentTerm,
        CandidateId:  rf.me,
        LastLogIndex: len(rf.log) - 1,
        LastLogTerm:  rf.log[len(rf.log)-1].Term,
    }
    rf.mu.Unlock()
    
    // 向所有其他节点请求投票
    for i := 0; i < len(rf.peers); i++ {
        if i == rf.me { continue }
        go rf.collectVote(i, args) // 并发收集投票
    }
}

收集选票

collectVote

func (rf *Raft) collectVote(serverTo int, args *RequestVoteArgs) {
    voteAnswer := rf.GetVoteAnswer(serverTo, args)
    if !voteAnswer { return }
    
    rf.muVote.Lock()
    defer rf.muVote.Unlock()
    
    // 检查是否已获得多数票
    if rf.voteCount > len(rf.peers)/2 {
        return // 已经是Leader或已被否决
    }
    
    rf.voteCount += 1
    if rf.voteCount > len(rf.peers)/2 {
        rf.mu.Lock()
        if rf.state == Follower {
            // 已被其他协程转为Follower
            rf.mu.Unlock()
            return
        }
        rf.state = Leader // 成为Leader
        rf.mu.Unlock()
        go rf.SendHeartBeats() // 开始发送心跳
    }
}

GetVoteAnswer

func (rf *Raft) GetVoteAnswer(server int, args *RequestVoteArgs) bool {
	sendArgs := *args
	reply := RequestVoteReply{}
	ok := rf.sendRequestVote(server, &sendArgs, &reply) // RPC调用

	if !ok {
		return false
	}

	rf.mu.Lock()
	defer rf.mu.Unlock()

	// 检查发送请求时的任期是否与当前任期一致。如果不一致,说明在此期间节点已更新到更高任期,此响应已过时。
	if sendArgs.Term != rf.currentTerm {
		return false
	}

	// 处理更高任期的响应
	if reply.Term > rf.currentTerm {
		rf.currentTerm = reply.Term // 更新到更高任期
		rf.votedFor = -1            // 重置投票记录
		rf.state = Follower         // 退化为跟随者
	}

	return reply.VoteGranted
}

SendHeartBeats 心跳

func (rf *Raft) SendHeartBeats() {
    for !rf.killed() {
        rf.mu.Lock()
        if rf.state != Leader {
            rf.mu.Unlock()
            return // 不再是Leader,停止发送
        }
        
        args := &AppendEntriesArgs{
            Term:         rf.currentTerm,
            LeaderId:     rf.me,
            PrevLogIndex: 0,    // 简化实现
            PrevLogTerm:  0,    // 简化实现
            Entries:      nil,  // 空 entries 表示心跳
            LeaderCommit: rf.commitIndex,
        }
        rf.mu.Unlock()
        
        // 向所有follower发送心跳
        for i := 0; i < len(rf.peers); i++ {
            if i == rf.me { continue }
            go rf.handleHeartBeat(i, args)
        }
        
        time.Sleep(time.Duration(HeartBeatTimeOut) * time.Millisecond)
    }
}

RequestVote RPC 处理投票请求

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    
    // 拒绝旧任期的请求
    if args.Term < rf.currentTerm {
        reply.Term = rf.currentTerm
        reply.VoteGranted = false
        return
    }
    
    // 发现更高任期,转为Follower
    if args.Term > rf.currentTerm {
        rf.currentTerm = args.Term
        rf.votedFor = -1
        rf.state = Follower
    }
    
    // 检查日志是否至少一样新
    lastLogIndex := len(rf.log) - 1
    lastLogTerm := rf.log[lastLogIndex].Term
    logOk := (args.LastLogTerm > lastLogTerm) ||
             (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)
    
    // 检查是否可以投票
    voteOk := (rf.votedFor == -1 || rf.votedFor == args.CandidateId)
    
    if voteOk && logOk {
        rf.votedFor = args.CandidateId
        rf.timeStamp = time.Now() // 重置超时计时器
        reply.VoteGranted = true
    } else {
        reply.VoteGranted = false
    }
    
    reply.Term = rf.currentTerm
}

AppendEntries RPC 处理心跳

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    
    // 拒绝旧任期的请求
    if args.Term < rf.currentTerm {
        reply.Term = rf.currentTerm
        reply.Success = false
        return
    }
    
    // 发现更高任期,转为Follower
    if args.Term > rf.currentTerm {
        rf.currentTerm = args.Term
        rf.votedFor = -1
        rf.state = Follower
    }
    
    // 重置超时计时器
    rf.timeStamp = time.Now()
    
    // 检查日志一致性(简化实现)
    if args.Entries != nil && // 如果是日志条目而非心跳
        (args.PrevLogIndex >= len(rf.log) || 
         rf.log[args.PrevLogIndex].Term != args.PrevLogTerm) {
        reply.Success = false
        return
    }
    
    // 更新提交索引
    if args.LeaderCommit > rf.commitIndex {
        rf.commitIndex = min(args.LeaderCommit, len(rf.log)-1)
    }
    
    reply.Success = true
    reply.Term = rf.currentTerm
}

tricker 超时检测

func (rf *Raft) ticker() {
    rd := rand.New(rand.NewSource(int64(rf.me)))
    
    for !rf.killed() {
        // 随机选举超时时间(防止多个节点同时发起选举)
        rdTimeOut := ElectTimeOutBase + rd.Intn(300)
        
        rf.mu.Lock()
        // 检查是否超时且不是Leader
        if rf.state != Leader && 
           time.Since(rf.timeStamp) >= time.Duration(rdTimeOut)*time.Millisecond {
            go rf.Elect() // 发起选举
        }
        rf.mu.Unlock()
        
        time.Sleep(ElectTimeOutCheckInterval)
    }
}