今天我们来一步步分析ETCD中applySnapshot函数
一、函数完整代码
函数的完整代码如下:
func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
if raft.IsEmptySnap(apply.snapshot) {
return
}
applySnapshotInProgress.Inc()
lg := s.Logger()
lg.Info(
"applying snapshot",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
)
defer func() {
lg.Info(
"applied snapshot",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
)
applySnapshotInProgress.Dec()
}()
if apply.snapshot.Metadata.Index <= ep.appliedi {
lg.Panic(
"unexpected leader snapshot from outdated index",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
)
}
// wait for raftNode to persist snapshot onto the disk
<-apply.notifyc
newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks)
if err != nil {
lg.Panic("failed to open snapshot backend", zap.Error(err))
}
// We need to set the backend to consistIndex before recovering the lessor,
// because lessor.Recover will commit the boltDB transaction, accordingly it
// will get the old consistent_index persisted into the db in OnPreCommitUnsafe.
// Eventually the new consistent_index value coming from snapshot is overwritten
// by the old value.
s.consistIndex.SetBackend(newbe)
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
if s.lessor != nil {
lg.Info("restoring lease store")
s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write(traceutil.TODO()) })
lg.Info("restored lease store")
}
lg.Info("restoring mvcc store")
if err := s.kv.Restore(newbe); err != nil {
lg.Panic("failed to restore mvcc store", zap.Error(err))
}
newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook())
lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))
// Closing old backend might block until all the txns
// on the backend are finished.
// We do not want to wait on closing the old backend.
s.bemu.Lock()
oldbe := s.be
go func() {
lg.Info("closing old backend file")
defer func() {
lg.Info("closed old backend file")
}()
if err := oldbe.Close(); err != nil {
lg.Panic("failed to close old backend", zap.Error(err))
}
}()
s.be = newbe
s.bemu.Unlock()
lg.Info("restoring alarm store")
if err := s.restoreAlarms(); err != nil {
lg.Panic("failed to restore alarm store", zap.Error(err))
}
lg.Info("restored alarm store")
if s.authStore != nil {
lg.Info("restoring auth store")
s.authStore.Recover(newbe)
lg.Info("restored auth store")
}
lg.Info("restoring v2 store")
if err := s.v2store.Recovery(apply.snapshot.Data); err != nil {
lg.Panic("failed to restore v2 store", zap.Error(err))
}
if err := assertNoV2StoreContent(lg, s.v2store, s.Cfg.V2Deprecation); err != nil {
lg.Panic("illegal v2store content", zap.Error(err))
}
lg.Info("restored v2 store")
s.cluster.SetBackend(newbe)
lg.Info("restoring cluster configuration")
s.cluster.Recover(api.UpdateCapability)
lg.Info("restored cluster configuration")
lg.Info("removing old peers from network")
// recover raft transport
s.r.transport.RemoveAllPeers()
lg.Info("removed old peers from network")
lg.Info("adding peers from new cluster configuration")
for _, m := range s.cluster.Members() {
if m.ID == s.ID() {
continue
}
s.r.transport.AddPeer(m.ID, m.PeerURLs)
}
lg.Info("added peers from new cluster configuration")
ep.appliedt = apply.snapshot.Metadata.Term
ep.appliedi = apply.snapshot.Metadata.Index
ep.snapi = ep.appliedi
ep.confState = apply.snapshot.Metadata.ConfState
}
二、函数功能概览
上述函数的核心功能是 应用一个来自 Raft 协议的快照,并在应用过程中恢复系统的各个关键组件,以确保系统的状态与最新的快照一致。具体来说,函数完成了以下核心任务:
检查快照有效性:判断快照是否为空或过时,如果无效则提前退出。
记录快照应用的开始和结束:通过日志记录快照应用的开始和结束,同时更新相关的监控指标。
等待并加载快照数据:等待 Raft 节点将快照持久化到磁盘,并打开快照后端。
恢复一致性索引:将新的快照后端设置为一致性索引的后端,确保一致性。
恢复存储组件:
- 租约存储(lease store):恢复与租约相关的数据。
- MVCC 存储:恢复多版本并发控制(MVCC)存储。
- 报警存储:恢复报警数据。
- 认证存储:恢复认证相关数据(如果存在)。
- V2 存储:恢复 V2 存储,并进行合法性检查。
恢复集群配置:更新集群配置,并确保集群的一致性。
更新 Raft 网络成员:移除旧的集群成员并添加新的集群成员到网络中。
更新应用进度:更新快照的任期、索引等应用进度信息。
三、函数详细分析
好的,接下来我将逐步解析这段代码,并用中文进行解释。
1. 检查快照是否为空
if raft.IsEmptySnap(apply.snapshot) {
return
}
- 这段代码判断传入的快照是否为空。如果快照为空(即没有数据需要恢复),则直接返回,结束该函数的执行。
2. 增加快照应用中的度量
applySnapshotInProgress.Inc()
- 这行代码会将
applySnapshotInProgress
计数器增加 1,表示当前有一个快照正在被应用。这个计数器通常用于监控系统中,帮助跟踪正在进行的操作。
3. 日志记录快照应用的开始
lg := s.Logger()
lg.Info(
"applying snapshot",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
)
- 获取日志记录器
lg
,并记录一条信息级别的日志,说明快照正在被应用。 - 日志中包括当前的快照索引、已应用的索引以及来自领导者的快照的索引和任期(
term
)。
4. 使用 defer
确保快照应用完成后记录日志
defer func() {
lg.Info(
"applied snapshot",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
)
applySnapshotInProgress.Dec()
}()
- 使用
defer
语句确保在函数结束时,记录一条日志表示快照应用已经完成。 - 同时,减少
applySnapshotInProgress
计数器,表示快照应用过程结束。
5. 检查快照的索引是否过时
if apply.snapshot.Metadata.Index <= ep.appliedi {
lg.Panic(
"unexpected leader snapshot from outdated index",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
)
}
- 这里会检查传入的快照索引是否小于等于已应用的索引
ep.appliedi
。如果是,说明接收到的快照来自一个过时的领导者,这会导致系统崩溃(通过lg.Panic
打印错误日志并触发 panic)。
6. 等待 Raft 节点将快照持久化到磁盘
<-apply.notifyc
- 等待一个信号,确保 Raft 节点已将快照持久化到磁盘。
apply.notifyc
是一个通道,程序会在此处阻塞,直到 Raft 节点完成快照的持久化操作。
7. 打开新的快照后端
newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks)
if err != nil {
lg.Panic("failed to open snapshot backend", zap.Error(err))
}
- 这行代码通过
openSnapshotBackend
函数打开新的快照后端(即读取快照的存储),如果打开失败,则记录错误日志并触发 panic。
8. 设置新的后端为一致性索引
s.consistIndex.SetBackend(newbe)
- 将新的快照后端设置为一致性索引的后端。这是为了确保一致性索引能够正确地与新的快照数据同步。
9. 恢复租约存储(lease store)
if s.lessor != nil {
lg.Info("restoring lease store")
s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write(traceutil.TODO()) })
lg.Info("restored lease store")
}
- 如果系统中有
lessor
(负责管理租约的组件),则会从新快照后端恢复租约存储。 - 恢复过程中,会在事务回滚时写入 KV 存储。
10. 恢复 MVCC 存储
lg.Info("restoring mvcc store")
if err := s.kv.Restore(newbe); err != nil {
lg.Panic("failed to restore mvcc store", zap.Error(err))
}
lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))
- 接下来,恢复 MVCC(多版本并发控制)存储,如果恢复失败,则触发 panic。
- 恢复成功后,记录恢复的日志,包括一致性索引。
11. 关闭旧的后端
s.bemu.Lock()
oldbe := s.be
go func() {
lg.Info("closing old backend file")
defer func() {
lg.Info("closed old backend file")
}()
if err := oldbe.Close(); err != nil {
lg.Panic("failed to close old backend", zap.Error(err))
}
}()
s.be = newbe
s.bemu.Unlock()
- 使用锁 (
bemu.Lock
) 来确保线程安全地切换后端文件。 - 在一个新的 goroutine 中关闭旧的快照后端,防止在关闭过程中阻塞主线程。
- 更新
s.be
为新的快照后端,并解锁。
12. 恢复报警存储
lg.Info("restoring alarm store")
if err := s.restoreAlarms(); err != nil {
lg.Panic("failed to restore alarm store", zap.Error(err))
}
lg.Info("restored alarm store")
- 恢复报警存储,如果恢复失败,则触发 panic。
13. 恢复认证存储
if s.authStore != nil {
lg.Info("restoring auth store")
s.authStore.Recover(newbe)
lg.Info("restored auth store")
}
- 如果存在认证存储(
authStore
),则恢复认证存储。
14. 恢复 V2 存储
lg.Info("restoring v2 store")
if err := s.v2store.Recovery(apply.snapshot.Data); err != nil {
lg.Panic("failed to restore v2 store", zap.Error(err))
}
if err := assertNoV2StoreContent(lg, s.v2store, s.Cfg.V2Deprecation); err != nil {
lg.Panic("illegal v2store content", zap.Error(err))
}
lg.Info("restored v2 store")
- 恢复 V2 存储,并进行检查,确保没有非法的 V2 存储内容。
15. 恢复集群配置
s.cluster.SetBackend(newbe)
lg.Info("restoring cluster configuration")
s.cluster.Recover(api.UpdateCapability)
lg.Info("restored cluster configuration")
- 将集群配置恢复到新的快照后端,并恢复集群配置。
16. 移除旧的网络成员
lg.Info("removing old peers from network")
s.r.transport.RemoveAllPeers()
lg.Info("removed old peers from network")
- 从网络中移除旧的集群成员。
17. 添加新的集群成员
lg.Info("adding peers from new cluster configuration")
for _, m := range s.cluster.Members() {
if m.ID == s.ID() {
continue
}
s.r.transport.AddPeer(m.ID, m.PeerURLs)
}
lg.Info("added peers from new cluster configuration")
- 将新的集群成员添加到网络中。
18. 更新应用进度
ep.appliedt = apply.snapshot.Metadata.Term
ep.appliedi = apply.snapshot.Metadata.Index
ep.snapi = ep.appliedi
ep.confState = apply.snapshot.Metadata.ConfState
- 更新应用进度(包括任期、索引等信息),确保与新的快照数据一致。
总结:这段代码的核心任务是应用一个来自 Raft 协议的快照。它通过多个步骤确保快照数据被正确地恢复到系统的各个存储组件中,同时进行了一系列的检查和恢复操作,确保系统的一致性和健康。