基本状态
const (
Follower = iota
Candidate
Leader
)
节点的字段
type Raft struct {
mu sync.Mutex // 保护共享状态的锁
peers []*labrpc.ClientEnd // 所有节点的RPC端点
persister *Persister // 持久化存储
me int // 本节点索引
// Raft状态
state int // Follower/Candidate/Leader
currentTerm int // 当前任期
votedFor int // 当前任期投给谁
log []Entry // 日志条目
// 选举相关
voteCount int // 当前获得的票数
timeStamp time.Time // 上次收到消息的时间
// Leader专用
nextIndex []int // 每个follower的下一个日志索引
matchIndex []int // 每个follower已复制的最高日志索引
// 提交和应用
commitIndex int // 已提交的最高日志索引
lastApplied int // 已应用的最高日志索引
}
状态转换:
- Follower → Candidate:选举超时
- Candidate → Leader:获得多数票
- Candidate → Follower:发现更高任期
- Leader → Follower:发现更高任期
几个核心方法
Elect 选举过程
func (rf *Raft) Elect() {
rf.mu.Lock()
// 自增term,成为候选人,给自己投票
rf.currentTerm += 1
rf.state = Candidate
rf.votedFor = rf.me
rf.voteCount = 1
rf.timeStamp = time.Now() // 重置超时计时器
args := &RequestVoteArgs{
Term: rf.currentTerm,
CandidateId: rf.me,
LastLogIndex: len(rf.log) - 1,
LastLogTerm: rf.log[len(rf.log)-1].Term,
}
rf.mu.Unlock()
// 向所有其他节点请求投票
for i := 0; i < len(rf.peers); i++ {
if i == rf.me { continue }
go rf.collectVote(i, args) // 并发收集投票
}
}
收集选票
collectVote
func (rf *Raft) collectVote(serverTo int, args *RequestVoteArgs) {
voteAnswer := rf.GetVoteAnswer(serverTo, args)
if !voteAnswer { return }
rf.muVote.Lock()
defer rf.muVote.Unlock()
// 检查是否已获得多数票
if rf.voteCount > len(rf.peers)/2 {
return // 已经是Leader或已被否决
}
rf.voteCount += 1
if rf.voteCount > len(rf.peers)/2 {
rf.mu.Lock()
if rf.state == Follower {
// 已被其他协程转为Follower
rf.mu.Unlock()
return
}
rf.state = Leader // 成为Leader
rf.mu.Unlock()
go rf.SendHeartBeats() // 开始发送心跳
}
}
GetVoteAnswer
func (rf *Raft) GetVoteAnswer(server int, args *RequestVoteArgs) bool {
sendArgs := *args
reply := RequestVoteReply{}
ok := rf.sendRequestVote(server, &sendArgs, &reply) // RPC调用
if !ok {
return false
}
rf.mu.Lock()
defer rf.mu.Unlock()
// 检查发送请求时的任期是否与当前任期一致。如果不一致,说明在此期间节点已更新到更高任期,此响应已过时。
if sendArgs.Term != rf.currentTerm {
return false
}
// 处理更高任期的响应
if reply.Term > rf.currentTerm {
rf.currentTerm = reply.Term // 更新到更高任期
rf.votedFor = -1 // 重置投票记录
rf.state = Follower // 退化为跟随者
}
return reply.VoteGranted
}
SendHeartBeats 心跳
func (rf *Raft) SendHeartBeats() {
for !rf.killed() {
rf.mu.Lock()
if rf.state != Leader {
rf.mu.Unlock()
return // 不再是Leader,停止发送
}
args := &AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
PrevLogIndex: 0, // 简化实现
PrevLogTerm: 0, // 简化实现
Entries: nil, // 空 entries 表示心跳
LeaderCommit: rf.commitIndex,
}
rf.mu.Unlock()
// 向所有follower发送心跳
for i := 0; i < len(rf.peers); i++ {
if i == rf.me { continue }
go rf.handleHeartBeat(i, args)
}
time.Sleep(time.Duration(HeartBeatTimeOut) * time.Millisecond)
}
}
RequestVote RPC 处理投票请求
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
// 拒绝旧任期的请求
if args.Term < rf.currentTerm {
reply.Term = rf.currentTerm
reply.VoteGranted = false
return
}
// 发现更高任期,转为Follower
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.votedFor = -1
rf.state = Follower
}
// 检查日志是否至少一样新
lastLogIndex := len(rf.log) - 1
lastLogTerm := rf.log[lastLogIndex].Term
logOk := (args.LastLogTerm > lastLogTerm) ||
(args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)
// 检查是否可以投票
voteOk := (rf.votedFor == -1 || rf.votedFor == args.CandidateId)
if voteOk && logOk {
rf.votedFor = args.CandidateId
rf.timeStamp = time.Now() // 重置超时计时器
reply.VoteGranted = true
} else {
reply.VoteGranted = false
}
reply.Term = rf.currentTerm
}
AppendEntries RPC 处理心跳
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
// 拒绝旧任期的请求
if args.Term < rf.currentTerm {
reply.Term = rf.currentTerm
reply.Success = false
return
}
// 发现更高任期,转为Follower
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.votedFor = -1
rf.state = Follower
}
// 重置超时计时器
rf.timeStamp = time.Now()
// 检查日志一致性(简化实现)
if args.Entries != nil && // 如果是日志条目而非心跳
(args.PrevLogIndex >= len(rf.log) ||
rf.log[args.PrevLogIndex].Term != args.PrevLogTerm) {
reply.Success = false
return
}
// 更新提交索引
if args.LeaderCommit > rf.commitIndex {
rf.commitIndex = min(args.LeaderCommit, len(rf.log)-1)
}
reply.Success = true
reply.Term = rf.currentTerm
}
tricker 超时检测
func (rf *Raft) ticker() {
rd := rand.New(rand.NewSource(int64(rf.me)))
for !rf.killed() {
// 随机选举超时时间(防止多个节点同时发起选举)
rdTimeOut := ElectTimeOutBase + rd.Intn(300)
rf.mu.Lock()
// 检查是否超时且不是Leader
if rf.state != Leader &&
time.Since(rf.timeStamp) >= time.Duration(rdTimeOut)*time.Millisecond {
go rf.Elect() // 发起选举
}
rf.mu.Unlock()
time.Sleep(ElectTimeOutCheckInterval)
}
}