MIT6.824-lab4A-The Shard controller(基于Raft的Shard KV数据库-分片控制器)

发布于:2023-01-11 ⋅ 阅读:(479) ⋅ 点赞:(0)

所有资料:👉 https://github.com/1345414527/MIT6.824-2022

4A(The Shard controller,分片控制器)

前言

在本实验中,我们将构建一个带分片的KV存储系统,即一组副本组上的键。每一个分片都是KV对的子集,例如,所有以“a”开头的键可能是一个分片,所有以“b”开头的键可能是另一个分片…。

分片的原因是性能。每个replica group只处理几个分片的 put 和 get,并且这些组并行操作;因此,系统总吞吐量(每单位时间的投入和获取)与组数成比例增加。

我们要实现的分片KV存储将有两个主要组件:

  • replica groups(复制组)。每个replica group负责分片的一个子集。副本由少数使用 Raft 复制组的分片的服务器组成;
  • shard controller(分片控制器)。分片控制器决定哪个副本组应该为每个分片服务,此信息称为配置。配置随时间而变化。客户端通过请求分片控制器找到某一个key的replica group,并且replica group请求控制器以找出要服务的分片。整个系统有一个单一的controller,使用 Raft 作为容错服务实现。

分片存储系统必须能够在replica group之间移动分片,因为某些组可能比其他组负载更多,因此需要移动分片以平衡负载;而且replica group可能会加入和离开系统,可能会添加新的副本组以增加容量,或者可能会使现有的副本组脱机以进行修复或报废。

本实验的主要挑战是处理重新配置——移动分片所属。在单个副本组中,所有组成员必须就何时发生与客户端 Put/Append/Get 请求相关的重新配置达成一致。例如,Put 可能与重新配置大约同时到达,导致副本组停止对该Put包含的key的分片负责。组中的所有副本必须就 Put 发生在重新配置之前还是之后达成一致。如果之前,Put 应该生效,分片的新所有者将看到它的效果;如果之后,Put 将不会生效,客户端必须在新所有者处重新尝试。推荐的方法是让每个副本组使用 Raft 不仅记录 Puts、Appends 和 Gets 的顺序,还记录重新配置的顺序。您需要确保在任何时候最多有一个副本组为每个分片提供请求。

重新配置还需要副本组之间的交互。例如,在配置 10 中,组 G1 可能负责分片 S1。在配置 11 中,组 G2 可能负责分片 S1。在从 10 到 11 的重新配置过程中,G1 和 G2 必须使用 RPC 将分片 S1(键/值对)的内容从 G1 移动到 G2。

不论是KV数据库,还是replica groups以及分片的实现,都可以看看redis的源码,redis的处理十分巧妙和优美。又或者是参考BigTable, Spanner, FAWN, Apache HBase, Rosebud, Spinnaker等技术的架构。

lab4相比于lab3,也要实现exactly once语义,但是其更加接近于工业界的KV数据库的实现,因此lab4 是一个相对贴近生产场景的 lab。

整体的架构可以参考:

任务

4A主要就是实现shardctrler,其实它就是一个高可用的集群配置管理服务。它主要记录了当前整个系统的配置信息Config,即每组中各个节点的 servername 以及当前每个 shard 被分配到了哪个组。

对于前者,shardctrler 可以通过用户手动或者内置策略自动的方式来增删 raft 组,从而更有效地利用集群的资源。对于后者,客户端的每一次请求都可以通过询问 shardctrler 来路由到对应正确的数据节点,其实有点类似于 HDFS Master 的角色,当然客户端也可以缓存配置来减少 shardctrler 的压力。

在工业界,shardctrler 的角色就类似于 TiDB 的 PD 或者 Kafka 的 ZK,只不过工业界的集群配置管理服务往往更复杂些,一般还要兼顾负载均衡,事务授时等功能。

具体来说任务就是:

  • 完善ShardCtrler数据结构、初始化代码,适当修改Command.go文件;
  • 针对客户端的Query、Join、Leave、Move四种rpc分别设置处理函数;
  • 编写applyCh的处理函数,处理Query、Join、Leave、Move四种命令,后三种命令处理中要进行一次配置调整;
  • 自己指定策略,根据当前的调整命令对配置进行调整,即shard的配置调整。(最难)

任务须知

  • 在lab4中,client并不会实现get、put、append这些命令的调用,而是四种函数:Query、Join、Leave、Move,分别对应四种RPC,主要是方便管理员控制 shardctrler:

    • Query RPC。查询配置,参数是一个配置号, shardctrler 回复具有该编号的配置。如果该数字为 -1 或大于已知的最大配置数字,则 shardctrler 应回复最新配置。 Query(-1) 的结果应该反映 shardctrler 在收到 Query(-1) RPC 之前完成处理的每个 Join、Leave 或 Move RPC
    • Join RPC 。添加新的replica group,它的参数是一组从唯一的非零副本组标识符 (GID) 到服务器名称列表的映射。 shardctrler 应该通过创建一个包含新副本组的新配置来做出反应。新配置应在所有组中尽可能均匀地分配分片,并应移动尽可能少的分片以实现该目标。如果 GID 不是当前配置的一部分,则 shardctrler 应该允许重新使用它(即,应该允许 GID 加入,然后离开,然后再次加入);
    • Leave RPC。删除指定replica group, 参数是以前加入的组的 GID 列表。 shardctrler 应该创建一个不包括这些组的新配置,并将这些组的分片分配给剩余的组。新配置应在组之间尽可能均匀地划分分片,并应移动尽可能少的分片以实现该目标;
    • Move RPC。移动分片,的参数是一个分片号和一个 GID。 shardctrler 应该创建一个新配置,其中将分片分配给组。 Move 的目的是让我们能够测试您的软件。移动后的加入或离开可能会取消移动,因为加入和离开会重新平衡。
  • shardctrler 管理一系列编号的configuration。每个configuration都描述了一组副本组和分片到副本组的分配。每当此分配需要更改时,分片控制器都会使用新分配创建一个新配置。键/值客户端和服务器在想知道当前(或过去)配置时联系 shardctrler;

  • 第一个configuration应该编号为零。它不应包含任何组,并且所有分片都应分配给 GID 零(无效的 GID)。下一个configuration(为响应加入 RPC 而创建)应该编号为 1。分片通常比组多得多(即每个组将服务多个分片),以便可以以相当精细的粒度转移负载;

  • Query RPC和lab3的Get RPC有一点不一样,Query RPC的参数如果不是-1,则查询的是某一个不会再改变的数据,因此不管是哪一个节点都能够返回;但是如果是-1则表明查询的最新的,因此必须要在leader中当做一个命令来进行处理,因为只有按照顺序来执行才能准确的获取当前的配置信息,但可以不考虑幂等性;

  • 分配调整策略,如果没有好想法的话,可以参考:①尽量不改变当前的分配结果进行调整。每一个group都有一个分配shard的平均值,这也是每个hroup要分配到的最小值,多出来的shard可以轮询所有group进行分配,但是如果一个group已经分配的shard数 <= 平均值+多出来shard,就可以考虑多出来shard全部分配给它;②如果有group分配到的shard数小于平均值,则将空的shard分配给它达到平均值。但是注意因为group的遍历先后顺序问题,可能当前没有足够的shard进行分配,因此当前全部遍历完,需要再尝试一次;③最终分配完,可能所有的group都达到了平均值,但是多出来的shard可能没有进行分配完,因此可以进行轮询分配。

    例子:

    当前:4个group,23个shard,1-group分配了5个,2-group分配了8个,3-group分配了5个,4-group分配了5个

    此时加入了0-group,因此就是5个group,23个shard,平均值为4,多出来shard为3。

    第一次大循环:

    0-group没有分配到,因为0是第一个进行shard调整,又当前没有空shard,因此就没有分配到;

    1-group分配到5个,此时多出来shard为2;

    2-group分配到6个,此时多来shard为0;

    3-group分配到4个;4-group分配到4个。

    第二次大循环:

    0-gropu分配到4个。

代码

shardctrler的代码整体逻辑和kvraft相同,client部分几乎不用我们实现了,server部分一个RPC请求的处理也是rpc接收->start->applyCh处理->具体处理函数(除了Query有点不同)。

client

type Clerk struct {
    servers []*labrpc.ClientEnd
    // Your data here.
    clientId int64
}

以join为例,代码就是:

func (ck *Clerk) Join(servers map[int][]string) {
    args := &JoinArgs{}
    // Your code here.
    args.Servers = servers
    args.ClientId = ck.clientId
    args.CommandId = nrand()

    for {
        // try each known server.
        for _, srv := range ck.servers {
            var reply JoinReply
            ok := srv.Call("ShardCtrler.Join", args, &reply)
            if ok && reply.WrongLeader == false {
                return
            }
        }
        time.Sleep(100 * time.Millisecond)
    }
}

其他的都是自带的代码,就不说了。写完kvraft这部分应该没问题。

server

数据结构
type ShardCtrler struct {
    mu      sync.Mutex
    me      int
    rf      *raft.Raft
    applyCh chan raft.ApplyMsg

    // Your data here.
    stopCh          chan struct{}
    commandNotifyCh map[int64]chan CommandResult
    lastApplies     map[int64]int64 //k-v:ClientId-CommandId

    configs []Config // indexed by config num

    //用于互斥锁
    lockStartTime time.Time
    lockEndTime   time.Time
    lockMsg       string
}

type CommandResult struct {
    Err    Err
    Config Config
}

type Op struct {
    // Your definitions here.
    // Field names must start with capital letters,
    // otherwise RPC will break.
    ReqId     int64 //用来标识commandNotify
    CommandId int64
    ClientId  int64
    Args      interface{}
    Method    string
}
初始化代码
func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister) *ShardCtrler {
    labgob.Register(Op{})

    sc := new(ShardCtrler)
    sc.me = me

    sc.configs = make([]Config, 1)
    sc.configs[0].Groups = map[int][]string{}

    sc.applyCh = make(chan raft.ApplyMsg)
    sc.rf = raft.Make(servers, me, persister, sc.applyCh)

    // Your code here.
    sc.stopCh = make(chan struct{})
    sc.commandNotifyCh = make(map[int64]chan CommandResult)
    sc.lastApplies = make(map[int64]int64)

    go sc.handleApplyCh()

    return sc
}
RPC接收处理代码
func (sc *ShardCtrler) Join(args *JoinArgs, reply *JoinReply) {
    // Your code here.
    res := sc.waitCommand(args.ClientId, args.CommandId, "Join", *args)
    if res.Err == ErrWrongLeader {
        reply.WrongLeader = true
    }
    reply.Err = res.Err
}

func (sc *ShardCtrler) Leave(args *LeaveArgs, reply *LeaveReply) {
    res := sc.waitCommand(args.ClientId, args.CommandId, "Leave", *args)
    if res.Err == ErrWrongLeader {
        reply.WrongLeader = true
    }
    reply.Err = res.Err
}

func (sc *ShardCtrler) Move(args *MoveArgs, reply *MoveReply) {
    res := sc.waitCommand(args.ClientId, args.CommandId, "Move", *args)
    if res.Err == ErrWrongLeader {
        reply.WrongLeader = true
    }
    reply.Err = res.Err
}

func (sc *ShardCtrler) Query(args *QueryArgs, reply *QueryReply) {
    // Your code here.
    DPrintf("server %v query:args %+v", sc.me, args)

    //如果是查询已经存在的配置可以直接返回,因为存在的配置是不会改变的;
    //如果是-1,则必须在handleApplyCh中进行处理,按照命令顺序执行,不然不准确。
    sc.lock("query")
    if args.Num >= 0 && args.Num < len(sc.configs) {
        reply.Err = OK
        reply.WrongLeader = false
        reply.Config = sc.getConfigByIndex(args.Num)
        sc.unlock("query")
        return
    }
    sc.unlock("query")
    res := sc.waitCommand(args.ClientId, args.CommandId, "Query", *args)
    if res.Err == ErrWrongLeader {
        reply.WrongLeader = true
    }
    reply.Err = res.Err
    reply.Config = res.Config
}

以上分别是四种RPC的处理代码,都是调用waitCommand函数进行处理,query的处理除外,在调用waitCommand函数之前,可以进行一次判断,如果满足条件就直接返回,这里可以直接返回的原因是:以往的配置是不可变的,只要获取的不是最新的配置,就可以直接获取,而最近的配置信息因为命令的执行先后顺序不同,产生的配置也会不同,因此,必须按照顺序来执行命令,才能获取当前query命令执行时准确的最新配置。

query调用的getConfigByIndex函数是根据configNum获取对应的config,这里要注意的一点就是:获取的config一定要Copy以下,就是进行一次深拷贝,简单处理就是创建一个新的相同对象。

func (sc *ShardCtrler) getConfigByIndex(idx int) Config {
	if idx < 0 || idx >= len(sc.configs) {
		//因为会在config的基础上进行修改形成新的config,又涉及到map需要深拷贝
		return sc.configs[len(sc.configs)-1].Copy()
	}
	return sc.configs[idx].Copy()
}

其实可以不进行深拷贝的,仅仅是简化了后面的处理代码,比如因为join、move代码我们要创建一个新的config,而新的config其实和上一个config很多内容相同,因此可以直接在这个config上进行修改。如果不是深拷贝,旧的config就改变了。简单看下后面的代码:

func (sc *ShardCtrler) handleJoinCommand(args JoinArgs) {
    conf := sc.getConfigByIndex(-1)
    conf.Num += 1

    //加入组
    for k, v := range args.Servers {
        conf.Groups[k] = v
    }

    sc.adjustConfig(&conf)
    sc.configs = append(sc.configs, conf)
}

这么处理仅仅是因为方便,不这么写也没问题。

再来看下rpc的核心处理代码waitCommand:

func (sc *ShardCtrler) waitCommand(clientId int64, commandId int64, method string, args interface{}) (res CommandResult) {
    DPrintf("server %v wait cmd start,clientId:%v,commandId: %v,method: %s,args: %+v", sc.me, clientId, commandId, method, args)
    op := Op{
        ReqId:     nrand(),
        ClientId:  clientId,
        CommandId: commandId,
        Method:    method,
        Args:      args,
    }
    index, term, isLeader := sc.rf.Start(op)
    if !isLeader {
        res.Err = ErrWrongLeader
        DPrintf("server %v wait cmd NOT LEADER.", sc.me)
        return
    }
    sc.lock("waitCommand")
    ch := make(chan CommandResult, 1)
    sc.commandNotifyCh[op.ReqId] = ch
    sc.unlock("waitCommand")
    DPrintf("server %v wait cmd notify,index: %v,term: %v,op: %+v", sc.me, index, term, op)

    t := time.NewTimer(WaitCmdTimeOut)
    defer t.Stop()

    select {
        case <-t.C:
        res.Err = ErrTimeout
        case res = <-ch:
    case <-sc.stopCh:
        res.Err = ErrServer
    }

    sc.removeCh(op.ReqId)
    DPrintf("server %v wait cmd end,Op: %+v.", sc.me, op)
    return

}

主要的处理步骤:

  • 根据命令信息封装一个Op命令;
  • 调用Start提交该命令;
  • 创建一个用于处理该命令的唤醒ch;
  • 阻塞等待ch的返回,不管是哪个ch返回,都要删除前一步创建的ch,防止内存泄漏。
命令应用代码

基于raft的协议,每当一个日志达到多数派,那么对应的命令就如进入applyCh,而applyCh的处理就需要我们自己来进行定义:

//处理applych
func (sc *ShardCtrler) handleApplyCh() {
	for {
		select {
		case <-sc.stopCh:
			DPrintf("get from stopCh,server-%v stop!", sc.me)
			return
		case cmd := <-sc.applyCh:
			//处理快照命令,读取快照的内容
			if cmd.SnapshotValid {
				continue
			}
			//处理普通命令
			if !cmd.CommandValid {
				continue
			}
			cmdIdx := cmd.CommandIndex
			DPrintf("server %v start apply command %v:%+v", sc.me, cmdIdx, cmd.Command)
			op := cmd.Command.(Op)
			sc.lock("handleApplyCh")

			if op.Method == "Query" {
				//处理读
				conf := sc.getConfigByIndex(op.Args.(QueryArgs).Num)
				sc.notifyWaitCommand(op.ReqId, OK, conf)
			} else {
				//处理其他命令
				//判断命令是否重复
				isRepeated := false
				if v, ok := sc.lastApplies[op.ClientId]; ok {
					if v == op.CommandId {
						isRepeated = true
					}
				}
				if !isRepeated {
					switch op.Method {
					case "Join":
						sc.handleJoinCommand(op.Args.(JoinArgs))
					case "Leave":
						sc.handleLeaveCommand(op.Args.(LeaveArgs))
					case "Move":
						sc.handleMoveCommand(op.Args.(MoveArgs))
					default:
						panic("unknown method")
					}
				}
				sc.lastApplies[op.ClientId] = op.CommandId
				sc.notifyWaitCommand(op.ReqId, OK, Config{})
			}

			DPrintf("apply op: cmdId:%d, op: %+v", cmdIdx, op)
			sc.unlock("handleApplyCh")
		}
	}
}

处理逻辑是在一个for循环中,从applyCh中会获取两种命令:快照命令和普通命令,当然lab4A不需要完成快照命令的处理。具体处理如下:

  • 获取applyCh中的数据后,先进行一个转换,转成我们的Op结构;
  • 如果是Query操作,简单的根据configNum获取config,并唤醒等待的协程;
  • 如果是Move、Join、Move操作,需要先判断是否满足exactly once语义,即命令是否和上一个命令重复;然后分别进行处理;处理完成后唤醒等待的协程。

其中,Move、Join、Move操作的处理函数如下:

func (sc *ShardCtrler) handleJoinCommand(args JoinArgs) {
    conf := sc.getConfigByIndex(-1)
    conf.Num += 1

    //加入组
    for k, v := range args.Servers {
        conf.Groups[k] = v
    }

    sc.adjustConfig(&conf)
    sc.configs = append(sc.configs, conf)
}

func (sc *ShardCtrler) handleLeaveCommand(args LeaveArgs) {
    conf := sc.getConfigByIndex(-1)
    conf.Num += 1

    //删掉server,并重置分配的shard
    for _, gid := range args.GIDs {
        delete(conf.Groups, gid)
        for i, v := range conf.Shards {
            if v == gid {
                conf.Shards[i] = 0
            }
        }
    }

    sc.adjustConfig(&conf)
    sc.configs = append(sc.configs, conf)
}

func (sc *ShardCtrler) handleMoveCommand(args MoveArgs) {
    conf := sc.getConfigByIndex(-1)
    conf.Num += 1
    conf.Shards[args.Shard] = args.GID
    sc.configs = append(sc.configs, conf)
}

  • Join:将参数的server加入config的group,然后调用adjustConfig进行shard的分配调整;
  • Leave:将参数的server移出config的group,并删除对应的shard标识,然后调用adjustConfig进行shard的分配调整;
  • handleMoveCommand:只要修改参数中指定的shard的归属形成一个cnonfig。

notifyWaitCommand唤醒命令应用完成的等待协程:

func (sc *ShardCtrler) notifyWaitCommand(reqId int64, err Err, conf Config) {
    if ch, ok := sc.commandNotifyCh[reqId]; ok {
        ch <- CommandResult{
            Err:    err,
            Config: conf,
        }
    }
}
配置调整代码
//我们的策略是尽量不改变当前的配置
func (sc *ShardCtrler) adjustConfig(conf *Config) {
    //针对三种情况分别进行调整
    if len(conf.Groups) == 0 {
        conf.Shards = [NShards]int{}
    } else if len(conf.Groups) == 1 {
        for gid, _ := range conf.Groups {
            for i, _ := range conf.Shards {
                conf.Shards[i] = gid
            }
        }
    } else if len(conf.Groups) <= NShards {
        //group数小于shard数,因此某些group可能会分配多一个或多个shard
        avgShardsCount := NShards / len(conf.Groups)
        otherShardsCount := NShards - avgShardsCount*len(conf.Groups)
        isTryAgain := true

        for isTryAgain {
            isTryAgain = false
            DPrintf("adjust config,%+v", conf)
            //获取所有的gid
            var gids []int
            for gid, _ := range conf.Groups {
                gids = append(gids, gid)
            }
            sort.Ints(gids)
            //遍历每一个server
            for _, gid := range gids {
                count := 0
                for _, val := range conf.Shards {
                    if val == gid {
                        count++
                    }
                }

                //判断是否要改变配置
                if count == avgShardsCount {
                    //不需要改变配置
                    continue
                } else if count > avgShardsCount && otherShardsCount == 0 {
                    //多出来的设置为0
                    temp := 0
                    for k, v := range conf.Shards {
                        if gid == v {
                            if temp < avgShardsCount {
                                temp += 1
                            } else {
                                conf.Shards[k] = 0
                            }
                        }
                    }
                } else if count > avgShardsCount && otherShardsCount > 0 {
                    //此时看看多出的shard能否全部分配给该server
                    //如果没有全部分配完,下一次循环再看
                    //如果全部分配完还不够,则需要将多出的部分设置为0
                    temp := 0
                    for k, v := range conf.Shards {
                        if gid == v {
                            if temp < avgShardsCount {
                                temp += 1
                            } else if temp == avgShardsCount && otherShardsCount != 0 {
                                otherShardsCount -= 1
                            } else {
                                conf.Shards[k] = 0
                            }
                        }
                    }

                } else {
                    //count < arg
                    for k, v := range conf.Shards {
                        if v == 0 && count < avgShardsCount {
                            conf.Shards[k] = gid
                            count += 1
                        }
                        if count == avgShardsCount {
                            break
                        }
                    }
                    //因为调整的顺序问题,可能前面调整的server没有足够的shard进行分配,需要在进行一次调整
                    if count < avgShardsCount {
                        DPrintf("adjust config try again.")
                        isTryAgain = true
                        continue
                    }
                }
            }

            //调整完成后,可能会有所有group都打到平均的shard数,但是多出来的shard没有进行分配
            //此时可以采用轮询的方法
            cur := 0
            for k, v := range conf.Shards {
                //需要进行分配的
                if v == 0 {
                    conf.Shards[k] = gids[cur]
                    cur += 1
                    cur %= len(conf.Groups)
                }
            }

        }
    } else {
        //group数大于shard数,每一个group最多一个shard,会有group没有shard

        gidsFlag := make(map[int]int)
        emptyShards := make([]int, 0, NShards)
        for k, gid := range conf.Shards {
            if gid == 0 {
                emptyShards = append(emptyShards, k)
                continue
            }
            if _, ok := gidsFlag[gid]; ok {
                conf.Shards[k] = 0
                emptyShards = append(emptyShards, k)
            } else {
                gidsFlag[gid] = 1
            }
        }
        if len(emptyShards) > 0 {
            var gids []int
            for k, _ := range conf.Groups {
                gids = append(gids, k)
            }
            sort.Ints(gids)
            temp := 0
            for _, gid := range gids {
                if _, ok := gidsFlag[gid]; !ok {
                    conf.Shards[emptyShards[temp]] = gid
                    temp += 1
                }
                if temp >= len(emptyShards) {
                    break
                }
            }

        }
    }
}

首先进行一个分支判断:

  • len(conf.Groups) == 0:表示当前没有groups,不需要调整;

  • len(conf.Groups) == 1:所有shard全部分配给它;

  • len(conf.Groups) <= NShards:这一步最重要,也是大部分情况下所处的状态,具体处理如下:

    • 计算平均每一个group分配到的shard数,以及多余的shard数;

    • 进行一个for循环,一般情况下执行1、2次。首先获取所有的group id,并进行升序排序;然后遍历每一个group id:

      1)如果group分配的shard = 平均shard数,则当前group不用进行处理;

      2)如果group分配的shard > 平均shard数 且多于的shard数为0,则将多出来的shard标记为0;

      3)如果group分配的shard > 平均shard数 且多于的shard数大于0,则当前group每多分配一个,多余的shard数-1,如果多余的shard数减为0了,那么就和分支3相同了,即剩下的分配给当前group的shard就要标记为0;

      4)如果group分配的shard < 平均shard数,则将标记为0的shard分配给当前group,只要分配的shard数达到平均值,就继续处理下一个group,如果当前的shard不够分配,则需要进行下一次循环(因为group遍历的先后问题)。

    • 最后一步,就是针对上一步的调整,因为可能会有这样的情况:所有group都打到平均的shard数,但是多出来的shard没有进行分配,此时可以采用轮询的方法进行分配。

  • len(conf.Groups) > NShards:每一个group最多一个shard,会有group没有shard。gidsFlag用来标识每一个group是否已经被分配shard;emptyShards用来保存某些group分配的多余shard,用于接下来进行分配。

common代码

这一部分是common.go中的代码,简单介绍一下:

  • init()函数主要是向labgob注册接口的可能类型。在Op的数据结构中我们有一个结构是:Args interface{},在raft调用Call将AppendEntries RPC将日志发送给其它节点时,就会使用labgob进行编解码,如果没有注册就会报错;(并不是都需要注册,为了方便,我就全列出来了)
  • Config就是存储配置信息,要为config创建一个copy函数,用于进行深拷贝,具体的原因可以看RPC接受处理代码部分;
  • Query、Join、Move、Leave分别有自己的args和reply。
type Err string

// The number of shards.
const NShards = 10

//状态码
const (
    OK             = "OK"
    ErrWrongLeader = "wrongLeader"
    ErrTimeout     = "timeout"
    ErrServer      = "ErrServer"
)

//必须注册才能进行解码和编码
func init() {
    labgob.Register(Config{})
    labgob.Register(QueryArgs{})
    labgob.Register(QueryReply{})
    labgob.Register(JoinArgs{})
    labgob.Register(JoinReply{})
    labgob.Register(LeaveArgs{})
    labgob.Register(MoveArgs{})
    labgob.Register(LeaveReply{})
    labgob.Register(MoveReply{})
}

// A configuration -- an assignment of shards to groups.
// Please don't change this.
//保存配置信息
type Config struct {
    Num    int              // config number,当前配置的编号
    Shards [NShards]int     // shard -> gid,每一个分片到replica group id的映射
    Groups map[int][]string // gid -> servers[],每一个replica group包含哪些server
}

type ClientCommandId struct {
    ClientId  int64
    CommandId int64
}

type JoinArgs struct {
    Servers map[int][]string // new GID -> servers mappings
    ClientCommandId
}

type JoinReply struct {
    WrongLeader bool
    Err         Err
}

type LeaveArgs struct {
    GIDs []int
    ClientCommandId
}

type LeaveReply struct {
    WrongLeader bool
    Err         Err
}

type MoveArgs struct {
    Shard int
    GID   int
    ClientCommandId
}

type MoveReply struct {
    WrongLeader bool
    Err         Err
}

type QueryArgs struct {
    Num int // desired config number
    ClientCommandId
}

type QueryReply struct {
    WrongLeader bool
    Err         Err
    Config      Config
}

func (c *Config) Copy() Config {
    config := Config{
        Num:    c.Num,
        Shards: c.Shards,
        Groups: make(map[int][]string),
    }
    for gid, s := range c.Groups {
        config.Groups[gid] = append([]string{}, s...)
    }
    return config
}

测试结果

本文含有隐藏内容,请 开通VIP 后查看