Step 1: Read the documentation
https://pdos.csail.mit.edu/6.824/labs/lab-shard.html
Read everything up to section 4B and make sure you understand every paragraph. It took me three full passes before it all made sense.
Step 2: Work out the architecture of this DB
Lab 4 uses a typical master/slave architecture (a configuration service and a set of replica groups), but the implementation is quite basic and leaves a lot out: 1) moving shards between groups is slow and concurrent client access is not allowed during the move; 2) the membership of each Raft group never changes.
Configuration service
- a cluster of shardmaster servers that use the Raft protocol to stay consistent;
- manages the sequence of configurations: each configuration describes the replica groups and which shards each group stores;
- responds to Join/Leave/Move/Query requests and changes the configuration accordingly;
Replica group
- a cluster of shardkv servers that use the Raft protocol to stay consistent;
- stores one portion of the actual data; the union of all groups' data is the whole database;
- responds to Get/PutAppend requests for the shards it owns and keeps them linearizable;
- periodically queries the shardmaster for the configuration, then performs migration and updates accordingly;
The shardmaster is mainly responsible for placing data into the different replica groups according to the partitioning rules the clients request.
The shardmaster consists of several machines that use the Raft protocol among themselves for consistency.
Each replica group likewise consists of several machines that use the Raft protocol for consistency.
Step 3: Understand the code structure
The client needs no explanation; it follows the same pattern as 3A.
Most of the server has to be written yourself.
The main thing to understand here is common.go.
A Config contains its version number, the mapping of which shard is managed by which replica group,
and which servers make up each replica group.
The master server then keeps a slice of Configs with increasing config numbers:
configs []Config // indexed by config num
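For reference, the Config type in the lab's common.go skeleton looks roughly like this:
const NShards = 10 // the lab fixes the number of shards

// A Config describes one configuration: its version number, which replica
// group owns each shard, and which servers make up each group.
type Config struct {
	Num    int              // config number
	Shards [NShards]int     // shard -> gid (gid 0 means "not assigned")
	Groups map[int][]string // gid -> server names
}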
Now look at the four APIs.
Join passes a set of GID -> servers mappings; it adds those groups to the set the master manages. With new groups arriving, every existing group can hand a few shards over to them.
Leave passes a set of GIDs whose servers are departing, so the shards they managed have to be spread over the groups that remain.
Move assigns a specific shard to a specific GID.
Query returns the shard assignment in the Config with the given config number.
The overall skeleton of the master side is much like Lab 3A.
The core difference from 3A is rebalancing: as groups are added and removed, how do you keep the shard distribution as even as possible while moving as few shards as possible?
Step 4: Implement the client code
The whole approach reuses the 3A client code, which also matches the hints:
Start with a stripped-down copy of your kvraft server.
You should implement duplicate client request detection for RPCs to the shard master. The shardmaster tests don't test this, but the shardkv tests will later use your shardmaster on an unreliable network; you may have trouble passing the shardkv tests if your shardmaster doesn't filter out duplicate RPCs.
Client
package shardmaster
//
// Shardmaster clerk.
//
import (
	"crypto/rand"
	"labrpc"
	"math/big"
	"time"
)
const RetryInterval = time.Duration(100 * time.Millisecond)
type Clerk struct {
	servers    []*labrpc.ClientEnd
	id         int64 // unique client id, used for duplicate detection
	seqNum     int   // monotonically increasing per-client request number
	lastLeader int   // index of the server that last replied successfully
}
func Nrand() int64 {
max := big.NewInt(int64(1) << 62)
bigx, _ := rand.Int(rand.Reader, max)
x := bigx.Int64()
return x
}
func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
ck := new(Clerk)
ck.servers = servers
ck.id = Nrand()
ck.seqNum = 0
ck.lastLeader = 0
return ck
}
func (ck *Clerk) Query(num int) Config {
args := QueryArgs{Num: num}
// try the last known leader first; on failure or WrongLeader, rotate to the next server
for {
var reply QueryReply
if ck.servers[ck.lastLeader].Call("ShardMaster.Query", &args, &reply) && !reply.WrongLeader {
return reply.Config
}
ck.lastLeader = (ck.lastLeader + 1) % len(ck.servers)
time.Sleep(RetryInterval)
}
}
func (ck *Clerk) Join(servers map[int][]string) {
args := JoinArgs{Servers: servers, Cid:ck.id, SeqNum:ck.seqNum}
ck.seqNum++
for {
var reply JoinReply
if ck.servers[ck.lastLeader].Call("ShardMaster.Join", &args, &reply) && !reply.WrongLeader {
return
}
ck.lastLeader = (ck.lastLeader + 1) % len(ck.servers)
time.Sleep(RetryInterval)
}
}
func (ck *Clerk) Leave(gids []int) {
args := LeaveArgs{GIDs: gids, Cid:ck.id, SeqNum:ck.seqNum}
ck.seqNum++
for {
var reply LeaveReply
if ck.servers[ck.lastLeader].Call("ShardMaster.Leave", &args, &reply) && !reply.WrongLeader {
return
}
ck.lastLeader = (ck.lastLeader + 1) % len(ck.servers)
time.Sleep(RetryInterval)
}
}
func (ck *Clerk) Move(shard int, gid int) {
args := MoveArgs{Shard: shard, GID: gid, Cid:ck.id, SeqNum:ck.seqNum}
ck.seqNum++
for {
var reply MoveReply
if ck.servers[ck.lastLeader].Call("ShardMaster.Move", &args, &reply) && !reply.WrongLeader {
return
}
ck.lastLeader = (ck.lastLeader + 1) % len(ck.servers)
time.Sleep(RetryInterval)
}
}
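Before moving on, here is a hypothetical sketch of how this clerk gets used (not part of the lab code: the function name and server names are made up, and ends would come from the test framework):
package shardmaster

import "labrpc"

// exampleUsage is a hypothetical sketch of how a test might drive the clerk.
func exampleUsage(ends []*labrpc.ClientEnd) {
	ck := MakeClerk(ends)
	ck.Join(map[int][]string{1: {"s1a", "s1b", "s1c"}}) // add group 1 with three (made-up) servers
	ck.Move(0, 1)                                       // pin shard 0 to group 1
	latest := ck.Query(-1)                              // fetch the newest configuration
	_ = latest
	ck.Leave([]int{1}) // remove group 1 again
}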
Common
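The common.go changes are not reproduced in the original post. Below is a minimal sketch of the request/reply types that the client code above and the server code below assume; the Cid and SeqNum fields are the additions for duplicate detection, while Config and NShards stay as shown in step 3 and the Err type comes from the lab skeleton:
package shardmaster

type Err string

type JoinArgs struct {
	Servers map[int][]string // new gid -> server names
	Cid     int64            // client id, for duplicate detection
	SeqNum  int              // per-client sequence number
}

type JoinReply struct {
	WrongLeader bool
	Err         Err
}

type LeaveArgs struct {
	GIDs   []int
	Cid    int64
	SeqNum int
}

type LeaveReply struct {
	WrongLeader bool
	Err         Err
}

type MoveArgs struct {
	Shard  int
	GID    int
	Cid    int64
	SeqNum int
}

type MoveReply struct {
	WrongLeader bool
	Err         Err
}

type QueryArgs struct {
	Num int // desired config number; -1 means "latest"
}

type QueryReply struct {
	WrongLeader bool
	Err         Err
	Config      Config
}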
Step 5: Build the server skeleton
Basically, copy over everything from the kvraft server that can be reused,
and leave stubs for the few functions that are specific to the shardmaster server.
Two places differ. In the kv server, applying an op is just a simple update of a string-to-string map.
In the shardmaster server, each operation has to do something different, so I wrap that logic in updateConfig;
the Join and Leave cases inside it need rebalance.
The other difference is Query: if the Query op commits successfully, the handler also has to look up the requested Config and return it.
Step 6: Implement the Query handler
From the lab description:
The Query RPC's argument is a configuration number. The shardmaster replies with the configuration that has that number. If the number is -1 or bigger than the biggest known configuration number, the shardmaster should reply with the latest configuration. The result of Query(-1) should reflect every Join, Leave, or Move RPC that the shardmaster finished handling before it received the Query(-1) RPC.
So when handling Query: if the requested number is -1 or at least len(configs), return the Config at len(configs)-1; otherwise return the Config at that index. Like Get in the kv server, the request goes through Raft to guarantee linearizability.
Step 7: Implement the updateConfig framework
Only Move, Join and Leave need to update the configuration.
The "update" works by copying the latest Config and then applying the change to that copy.
Move is the simplest: it only updates the Shards array of the Config.
Join then adds a set of new GID -> servers mappings to the new Config,
and Leave removes a set of GIDs from the new Config.
This idea turns into the updateConfig function in the server code at the end of this post.
Step 8: Implement rebalance
After thinking it over, rebalancing has to consider the shard assignment as a whole.
First work out how many shards each replica group should get: for example, 10 shards over 4 groups gives 3, 3, 2, 2.
Then look at the current assignment and move shards from the groups that have too many to the ones that have too few.
For example, if the current split is 4, 3, 3 and a new group joins, first move one shard out of the group with 4 (giving 3, 3, 3, 1), then move one out of any group with 3, ending up at 3, 3, 2, 2.
If the split was 4, 3, 3 and only 2 groups remain, you instead have to walk over Shards [NShards]int // shard -> gid, find every shard whose group is gone, and hand each one to the group that currently has the fewest shards.
The logic of these two cases cannot be unified, so they probably have to be written separately.
Following this pattern you need both the maximum and the minimum; in Java a TreeMap would be ideal, but Go has no built-in equivalent.
Because of that, the algorithm I wrote below runs in moveElements * gidNumber time;
with a TreeMap it could be moveElements * log(gidNumber).
For the sake of simple code (this lab is about correctness, so a little inefficiency is fine), I implemented the higher-complexity version.
The rough algorithm:
Count how many shards each GID owns in the previous Config. For Join, repeatedly take a shard from the GID that owns the most and give it to the newly joined group, until the new group reaches the average (rounded down).
For Leave, repeatedly find the GID that owns the fewest shards and hand it one of the leaving group's shards, until the leaving group owns nothing.
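To make the "as even as possible" target concrete, here is a small standalone sketch (not part of the solution; the helper name targetSizes is made up):
// targetSizes shows what "as even as possible" means: every group ends up
// with either nShards/groups or nShards/groups+1 shards.
func targetSizes(nShards, groups int) []int {
	sizes := make([]int, groups)
	for i := range sizes {
		sizes[i] = nShards / groups // everyone gets at least the floor of the average
	}
	for i := 0; i < nShards%groups; i++ {
		sizes[i]++ // spread the remainder one shard at a time
	}
	return sizes // e.g. targetSizes(10, 4) == []int{3, 3, 2, 2}
}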
Testing
BUG 1
BUG 2: Op is nil
After some digging it turns out that every custom struct has to be registered with labgob, otherwise it cannot be encoded and decoded for transport (hence the labgob.Register calls in StartServer below).
Ran the tests 500 times with a script: all PASS.
Server code
package shardmaster
import (
	"labgob"
	"labrpc"
	"log"
	"math"
	"raft"
	"sync"
	"time"
)
type ShardMaster struct {
	mu      sync.Mutex
	me      int
	rf      *raft.Raft
	applyCh chan raft.ApplyMsg
	// Your data here.
	configs []Config        // indexed by config num
	chMap   map[int]chan Op // log index -> channel to notify the waiting RPC handler
	cid2Seq map[int64]int   // client id -> largest applied SeqNum, for duplicate detection
	killCh  chan bool       // signalled when the server is killed
}
type Op struct {
	OpType string      // operation type (Join/Leave/Move/Query)
	Args   interface{} // JoinArgs, LeaveArgs, MoveArgs or QueryArgs
	Cid    int64       // client id
	SeqNum int         // client sequence number; -1 for Query, which never mutates state
}
func (sm *ShardMaster) Join(args *JoinArgs, reply *JoinReply) {
originOp := Op{"Join",*args,args.Cid,args.SeqNum}
reply.WrongLeader = sm.templateHandler(originOp)
}
func (sm *ShardMaster) Leave(args *LeaveArgs, reply *LeaveReply) {
originOp := Op{"Leave",*args,args.Cid,args.SeqNum}
reply.WrongLeader = sm.templateHandler(originOp)
}
func (sm *ShardMaster) Move(args *MoveArgs, reply *MoveReply) {
originOp := Op{"Move",*args,args.Cid,args.SeqNum}
reply.WrongLeader = sm.templateHandler(originOp)
}
func (sm *ShardMaster) Query(args *QueryArgs, reply *QueryReply) {
// A Query never mutates state: give it a random Cid and SeqNum -1 so the
// apply loop below never calls updateConfig for it.
originOp := Op{"Query", *args, Nrand(), -1}
reply.WrongLeader = sm.templateHandler(originOp)
if !reply.WrongLeader {
sm.mu.Lock()
defer sm.mu.Unlock()
if args.Num >= 0 && args.Num < len(sm.configs) {
reply.Config = sm.configs[args.Num]
} else {
reply.Config = sm.configs[len(sm.configs) - 1]
}
}
}
func (sm *ShardMaster) templateHandler(originOp Op) bool {
wrongLeader := true
index,_,isLeader := sm.rf.Start(originOp)
if !isLeader {return wrongLeader}
ch := sm.getCh(index,true)
op := sm.beNotified(ch,index)
if equalOp(op,originOp) {
wrongLeader = false
}
return wrongLeader
}
func (sm *ShardMaster) beNotified(ch chan Op, index int) Op {
select {
case notifyArg := <- ch :
close(ch)
sm.mu.Lock()
delete(sm.chMap,index)
sm.mu.Unlock()
return notifyArg
case <- time.After(time.Duration(600)*time.Millisecond):
return Op{}
}
}
func equalOp(a Op, b Op) bool{
return a.SeqNum == b.SeqNum && a.Cid == b.Cid && a.OpType == b.OpType
}
func (sm *ShardMaster) Kill() {
sm.rf.Kill()
sm.killCh <- true
}
// needed by shardkv tester
func (sm *ShardMaster) Raft() *raft.Raft {
return sm.rf
}
func (sm *ShardMaster) getCh(idx int, createIfNotExists bool) chan Op{
sm.mu.Lock()
defer sm.mu.Unlock()
if _, ok := sm.chMap[idx]; !ok {
if !createIfNotExists {return nil}
sm.chMap[idx] = make(chan Op,1)
}
return sm.chMap[idx]
}
// updateConfig copies the latest Config, applies a Join/Leave/Move to the copy
// (rebalancing shards where needed), and appends it as the next configuration.
func (sm *ShardMaster) updateConfig(op string, arg interface{}) {
cfg := sm.createNextConfig()
if op == "Move" {
moveArg := arg.(MoveArgs)
if _,exists := cfg.Groups[moveArg.GID]; exists {
cfg.Shards[moveArg.Shard] = moveArg.GID
} else {return}
}else if op == "Join" {
joinArg := arg.(JoinArgs)
for gid,servers := range joinArg.Servers {
newServers := make([]string, len(servers))
copy(newServers, servers)
cfg.Groups[gid] = newServers
sm.rebalance(&cfg,op,gid)
}
} else if op == "Leave"{
leaveArg := arg.(LeaveArgs)
for _,gid := range leaveArg.GIDs {
delete(cfg.Groups,gid)
sm.rebalance(&cfg,op,gid)
}
} else {
log.Fatal("invalid operation type: ", op)
}
sm.configs = append(sm.configs,cfg)
}
func (sm *ShardMaster) createNextConfig() Config {
lastCfg := sm.configs[len(sm.configs)-1]
nextCfg := Config{Num: lastCfg.Num + 1, Shards: lastCfg.Shards, Groups: make(map[int][]string)}
for gid, servers := range lastCfg.Groups {
nextCfg.Groups[gid] = append([]string{}, servers...)
}
return nextCfg
}
// rebalance redistributes shards after the given gid joins or leaves,
// keeping the per-group shard counts as even as possible.
func (sm *ShardMaster) rebalance(cfg *Config, request string, gid int) {
shardsCount := sm.groupByGid(cfg) // gid -> shards
switch request {
case "Join":
avg := NShards / len(cfg.Groups)
for i := 0; i < avg; i++ {
maxGid := sm.getMaxShardGid(shardsCount)
cfg.Shards[shardsCount[maxGid][0]] = gid
shardsCount[maxGid] = shardsCount[maxGid][1:]
}
case "Leave":
shardsArray,exists := shardsCount[gid]
if !exists {return}
delete(shardsCount,gid)
if len(cfg.Groups) == 0 { // remove all gid
cfg.Shards = [NShards]int{}
return
}
for _,v := range shardsArray {
minGid := sm.getMinShardGid(shardsCount)
cfg.Shards[v] = minGid
shardsCount[minGid] = append(shardsCount[minGid], v)
}
}
}
func (sm *ShardMaster) groupByGid(cfg *Config) map[int][]int {
shardsCount := map[int][]int{}
for k := range cfg.Groups {
shardsCount[k] = []int{}
}
for k, v := range cfg.Shards {
shardsCount[v] = append(shardsCount[v], k)
}
return shardsCount
}
func (sm *ShardMaster) getMaxShardGid(shardsCount map[int][]int) int {
max := -1
var gid int
for k, v := range shardsCount {
if max < len(v) {
max = len(v)
gid = k
}
}
return gid
}
func (sm *ShardMaster) getMinShardGid(shardsCount map[int][]int) int {
min := math.MaxInt32
var gid int
for k, v := range shardsCount {
if min > len(v) {
min = len(v)
gid = k
}
}
return gid
}
func send(notifyCh chan Op,op Op) {
notifyCh <- op
}
func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister) *ShardMaster {
sm := new(ShardMaster)
sm.me = me
sm.configs = make([]Config, 1)
sm.configs[0].Groups = map[int][]string{}
labgob.Register(Op{})
labgob.Register(JoinArgs{})
labgob.Register(LeaveArgs{})
labgob.Register(MoveArgs{})
labgob.Register(QueryArgs{})
sm.applyCh = make(chan raft.ApplyMsg)
sm.rf = raft.Make(servers, me, persister, sm.applyCh)
// Your code here.
sm.chMap = make(map[int]chan Op)
sm.cid2Seq = make(map[int64]int)
sm.killCh = make(chan bool,1)
go func() {
for {
select {
case <-sm.killCh:
return
case applyMsg := <-sm.applyCh:
if !applyMsg.CommandValid {continue}
op := applyMsg.Command.(Op)
sm.mu.Lock()
maxSeq, found := sm.cid2Seq[op.Cid]
// apply only mutating ops that haven't been seen from this client before;
// Query carries SeqNum -1 and never reaches updateConfig
if op.SeqNum >= 0 && (!found || op.SeqNum > maxSeq) {
sm.updateConfig(op.OpType,op.Args)
sm.cid2Seq[op.Cid] = op.SeqNum
}
sm.mu.Unlock()
if notifyCh := sm.getCh(applyMsg.CommandIndex,false); notifyCh != nil {
send(notifyCh,op)
}
}
}
}()
return sm
}