applyConfChange 方法是处理 ConfChange
配置变更的核心逻辑。它被用来将通过 Raft 已经提交的配置变更应用到 EtcdServer
。配置变更通常包括添加、删除、更新节点等操作。这个方法涉及到多个关键操作,包括配置验证、成员管理、以及 Raft 集群的更新等。
目录
一、方法完整源码
二、方法详细解析
1. 初始化日志记录
lg := s.Logger()
- 功能:初始化日志记录器,用于后续记录操作日志。
- 作用:帮助开发者追踪配置变更的处理过程。
2. 配置变更验证
if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
lg.Error("Validation on configuration change failed", zap.Bool("shouldApplyV3", bool(shouldApplyV3)), zap.Error(err))
cc.NodeID = raft.None
s.r.ApplyConfChange(cc)
if s.consistIndex != nil && membership.ApplyBoth == shouldApplyV3 {
applyingIndex, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
s.consistIndex.SetConsistentIndex(applyingIndex, applyingTerm)
}
return false, err
}
- 功能:对配置变更进行验证,确保配置合法。
- 作用:
- 如果验证失败,记录错误日志,并将配置变更标记为无效(
cc.NodeID = raft.None
)。 - 调用
s.r.ApplyConfChange(cc)
将配置变更应用到 Raft 存储中。 - 如果需要应用 V3 存储,还会更新一致性索引。
- 返回
false
和错误,表示配置变更未成功应用。
- 如果验证失败,记录错误日志,并将配置变更标记为无效(
3. 更新配置状态
*confState = *s.r.ApplyConfChange(cc)
s.beHooks.SetConfState(confState)
- 功能:将配置变更应用到 Raft 存储,并更新集群配置状态。
- 作用:
- 调用
ApplyConfChange
方法将配置变更应用到 Raft 存储中,返回新的配置状态。 - 更新
confState
,并通过s.beHooks.SetConfState
设置当前配置状态。
- 调用
4. 处理不同类型的配置变更
根据配置变更的类型(ConfChangeAddNode
、ConfChangeRemoveNode
、ConfChangeUpdateNode
),分别进行不同的处理。
4.1 添加节点 (ConfChangeAddNode
和 ConfChangeAddLearnerNode
)
case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
confChangeContext := new(membership.ConfigChangeContext)
if err := json.Unmarshal(cc.Context, confChangeContext); err != nil {
lg.Panic("failed to unmarshal member", zap.Error(err))
}
if cc.NodeID != uint64(confChangeContext.Member.ID) {
lg.Panic(
"got different member ID",
zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()),
zap.String("member-id-from-message", confChangeContext.Member.ID.String()),
)
}
if confChangeContext.IsPromote {
s.cluster.PromoteMember(confChangeContext.Member.ID, shouldApplyV3)
} else {
s.cluster.AddMember(&confChangeContext.Member, shouldApplyV3)
if confChangeContext.Member.ID != s.id {
s.r.transport.AddPeer(confChangeContext.Member.ID, confChangeContext.PeerURLs)
}
}
if confChangeContext.Member.ID == s.id {
if cc.Type == raftpb.ConfChangeAddLearnerNode {
isLearner.Set(1)
} else {
isLearner.Set(0)
}
}
- 功能:
- 处理
ConfChangeAddNode
和ConfChangeAddLearnerNode
类型的配置变更。 - 首先,反序列化配置变更的上下文
confChangeContext
。 - 检查配置变更中的节点 ID 是否与消息中的节点 ID 匹配。
- 如果是提升节点为领导者,调用
PromoteMember
;否则,调用AddMember
添加成员。 - 如果新增的成员不是当前节点,则通过
s.r.transport.AddPeer
将其加入 Raft 集群。 - 更新当前节点是否为学习者(
Learner
)节点。
- 处理
4.2 删除节点 (ConfChangeRemoveNode
)
case raftpb.ConfChangeRemoveNode:
id := types.ID(cc.NodeID)
s.cluster.RemoveMember(id, shouldApplyV3)
if id == s.id {
return true, nil
}
s.r.transport.RemovePeer(id)
- 功能:
- 处理
ConfChangeRemoveNode
类型的配置变更,删除指定节点。 - 如果删除的节点是当前节点(即自己),返回
true
表示停止。 - 否则,通过
s.r.transport.RemovePeer
将节点从 Raft 集群中移除。
- 处理
4.3 更新节点 (ConfChangeUpdateNode
)
case raftpb.ConfChangeUpdateNode:
m := new(membership.Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
lg.Panic("failed to unmarshal member", zap.Error(err))
}
if cc.NodeID != uint64(m.ID) {
lg.Panic(
"got different member ID",
zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()),
zap.String("member-id-from-message", m.ID.String()),
)
}
s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes, shouldApplyV3)
if m.ID != s.id {
s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
}
- 功能:
- 处理
ConfChangeUpdateNode
类型的配置变更,更新节点信息。 - 反序列化配置变更中的节点信息,检查节点 ID 是否匹配。
- 更新节点的 Raft 属性。
- 如果更新的节点不是当前节点,则更新节点的 peer 信息。
- 处理
5. 返回值
return false, nil
- 功能:如果配置变更已成功应用,返回
false
和nil
,表示无需停止服务器。 - 作用:告知调用者配置变更已成功应用。
三、核心代码:
配置变更验证和处理
if err := s.cluster.ValidateConfigurationChange(cc); err != nil { // handle error return false, err }
配置变更应用
*confState = *s.r.ApplyConfChange(cc) s.beHooks.SetConfState(confState)
节点添加/删除/更新处理
// Add node if confChangeContext.IsPromote { ... } // Remove node s.cluster.RemoveMember(id, shouldApplyV3) // Update node s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes, shouldApplyV3)
四、总结
applyConfChange
方法通过验证配置变更,应用到集群中,并根据配置变更的类型(添加、删除、更新节点)分别执行相应的操作。它确保了节点的正确添加、删除和更新,同时保证了 Raft 集群的一致性和稳定性。如果配置变更处理失败,还会通过日志记录详细的错误信息。