【ETCD】【源码阅读】深入解析 EtcdServer.applyConfChange 方法

发布于:2024-12-19 ⋅ 阅读:(15) ⋅ 点赞:(0)

applyConfChange 方法是处理 ConfChange 配置变更的核心逻辑。它被用来将通过 Raft 已经提交的配置变更应用到 EtcdServer。配置变更通常包括添加、删除、更新节点等操作。这个方法涉及到多个关键操作,包括配置验证、成员管理、以及 Raft 集群的更新等。

一、方法完整源码

二、方法详细解析

1. 初始化日志记录
lg := s.Logger()
  • 功能:初始化日志记录器,用于后续记录操作日志。
  • 作用:帮助开发者追踪配置变更的处理过程。
2. 配置变更验证
if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
    lg.Error("Validation on configuration change failed", zap.Bool("shouldApplyV3", bool(shouldApplyV3)), zap.Error(err))
    cc.NodeID = raft.None
    s.r.ApplyConfChange(cc)

    if s.consistIndex != nil && membership.ApplyBoth == shouldApplyV3 {
        applyingIndex, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
        s.consistIndex.SetConsistentIndex(applyingIndex, applyingTerm)
    }
    return false, err
}
  • 功能:对配置变更进行验证,确保配置合法。
  • 作用
    • 如果验证失败,记录错误日志,并将配置变更标记为无效(cc.NodeID = raft.None)。
    • 调用 s.r.ApplyConfChange(cc) 将配置变更应用到 Raft 存储中。
    • 如果需要应用 V3 存储,还会更新一致性索引。
    • 返回 false 和错误,表示配置变更未成功应用。
3. 更新配置状态
*confState = *s.r.ApplyConfChange(cc)
s.beHooks.SetConfState(confState)
  • 功能:将配置变更应用到 Raft 存储,并更新集群配置状态。
  • 作用
    • 调用 ApplyConfChange 方法将配置变更应用到 Raft 存储中,返回新的配置状态。
    • 更新 confState,并通过 s.beHooks.SetConfState 设置当前配置状态。
4. 处理不同类型的配置变更

根据配置变更的类型(ConfChangeAddNodeConfChangeRemoveNodeConfChangeUpdateNode),分别进行不同的处理。

4.1 添加节点 (ConfChangeAddNodeConfChangeAddLearnerNode)
case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
    confChangeContext := new(membership.ConfigChangeContext)
    if err := json.Unmarshal(cc.Context, confChangeContext); err != nil {
        lg.Panic("failed to unmarshal member", zap.Error(err))
    }
    if cc.NodeID != uint64(confChangeContext.Member.ID) {
        lg.Panic(
            "got different member ID",
            zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()),
            zap.String("member-id-from-message", confChangeContext.Member.ID.String()),
        )
    }
    if confChangeContext.IsPromote {
        s.cluster.PromoteMember(confChangeContext.Member.ID, shouldApplyV3)
    } else {
        s.cluster.AddMember(&confChangeContext.Member, shouldApplyV3)

        if confChangeContext.Member.ID != s.id {
            s.r.transport.AddPeer(confChangeContext.Member.ID, confChangeContext.PeerURLs)
        }
    }

    if confChangeContext.Member.ID == s.id {
        if cc.Type == raftpb.ConfChangeAddLearnerNode {
            isLearner.Set(1)
        } else {
            isLearner.Set(0)
        }
    }
  • 功能
    • 处理 ConfChangeAddNodeConfChangeAddLearnerNode 类型的配置变更。
    • 首先,反序列化配置变更的上下文 confChangeContext
    • 检查配置变更中的节点 ID 是否与消息中的节点 ID 匹配。
    • 如果是提升节点为领导者,调用 PromoteMember;否则,调用 AddMember 添加成员。
    • 如果新增的成员不是当前节点,则通过 s.r.transport.AddPeer 将其加入 Raft 集群。
    • 更新当前节点是否为学习者(Learner)节点。
4.2 删除节点 (ConfChangeRemoveNode)
case raftpb.ConfChangeRemoveNode:
    id := types.ID(cc.NodeID)
    s.cluster.RemoveMember(id, shouldApplyV3)
    if id == s.id {
        return true, nil
    }
    s.r.transport.RemovePeer(id)
  • 功能
    • 处理 ConfChangeRemoveNode 类型的配置变更,删除指定节点。
    • 如果删除的节点是当前节点(即自己),返回 true 表示停止。
    • 否则,通过 s.r.transport.RemovePeer 将节点从 Raft 集群中移除。
4.3 更新节点 (ConfChangeUpdateNode)
case raftpb.ConfChangeUpdateNode:
    m := new(membership.Member)
    if err := json.Unmarshal(cc.Context, m); err != nil {
        lg.Panic("failed to unmarshal member", zap.Error(err))
    }
    if cc.NodeID != uint64(m.ID) {
        lg.Panic(
            "got different member ID",
            zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()),
            zap.String("member-id-from-message", m.ID.String()),
        )
    }
    s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes, shouldApplyV3)
    if m.ID != s.id {
        s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
    }
  • 功能
    • 处理 ConfChangeUpdateNode 类型的配置变更,更新节点信息。
    • 反序列化配置变更中的节点信息,检查节点 ID 是否匹配。
    • 更新节点的 Raft 属性。
    • 如果更新的节点不是当前节点,则更新节点的 peer 信息。
5. 返回值
return false, nil
  • 功能:如果配置变更已成功应用,返回 falsenil,表示无需停止服务器。
  • 作用:告知调用者配置变更已成功应用。

三、核心代码:

  1. 配置变更验证和处理

    if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
        // handle error
        return false, err
    }
    
  2. 配置变更应用

    *confState = *s.r.ApplyConfChange(cc)
    s.beHooks.SetConfState(confState)
    
  3. 节点添加/删除/更新处理

    // Add node
    if confChangeContext.IsPromote { ... }
    // Remove node
    s.cluster.RemoveMember(id, shouldApplyV3)
    // Update node
    s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes, shouldApplyV3)
    

四、总结

applyConfChange 方法通过验证配置变更,应用到集群中,并根据配置变更的类型(添加、删除、更新节点)分别执行相应的操作。它确保了节点的正确添加、删除和更新,同时保证了 Raft 集群的一致性和稳定性。如果配置变更处理失败,还会通过日志记录详细的错误信息。