手撸golang etcd raft协议之9,10
缘起
最近阅读《云原生分布式存储基石:etcd深入解析》(杜军, 2019.1)
本系列笔记拟采用golang练习之
raft分布式一致性算法
分布式存储系统通常会通过维护多个副本来进行容错,
以提高系统的可用性。
这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?
Raft算法把问题分解成了四个子问题:
1. 领袖选举(leader election)
2. 日志复制(log replication)
3. 安全性(safety)
4. 成员关系变化(membership changes)
源码gitee地址:
https://gitee.com/ioly/learning.gooop
目标
- 根据raft协议,实现高可用分布式强一致的kv存储
子目标(Day 10)
- 添加put/get/del kv键值对的rpc接口
- 继续完善Leader状态的raft协议响应
设计
- rpc/IKVStoreRPC: kv操作的rpc接口
- store/IKVStore: kv操作的持久化接口
- store/ILogStore: 从IKVStore继承,以支持kv持久化
- lsm/IRaftState: 继承rpc.IKVStoreRPC接口,以支持kv操作
- lsm/tLeaderState: 初步实现Leader状态的raft协议处理,事件驱动的逻辑编排,读写分离的字段管理。
rpc/IKVStoreRPC.go
kv操作的rpc接口
package rpc
// IKVStoreRPC is the RPC surface for key/value operations.
type IKVStoreRPC interface {
	// ExecuteKVCmd performs cmd and writes the outcome into ret.
	ExecuteKVCmd(cmd *KVCmd, ret *KVRet) error
}
// KVCmd is a key/value request addressed to a raft node.
type KVCmd struct {
	OPCode  KVOPCode // which operation to perform
	Key     []byte   // key the operation targets
	Content []byte   // value to store; read by the put handler
}
// KVOPCode selects the key/value operation carried by a KVCmd.
type KVOPCode int

// Key/value operations, numbered from zero by iota.
const (
	KVGet KVOPCode = iota // read a key
	KVPut                 // write a key
	KVDel                 // remove a key
)
// KVRet is the reply to a KVCmd.
type KVRet struct {
	Code    KVRetCode // outcome of the operation
	Key     []byte
	Content []byte // value read by a get
}
// KVRetCode reports how a KVCmd was handled.
type KVRetCode int

// Result codes, numbered from zero by iota; KVOk is therefore the zero value.
const (
	KVOk            KVRetCode = iota // success
	KVKeyNotFound                    // the requested key does not exist
	KVInternalError                  // the node failed to serve the request
)
store/IKVStore.go
kv操作的持久化接口
package store
// IKVStore is the persistence contract for key/value data.
//
// Get returns the error first and the value second, the reverse of the
// usual Go ordering; implementations and callers must follow that order.
type IKVStore interface {
	// Get loads the content stored under key.
	Get(key []byte) (error, []byte)

	// Put stores content under key.
	Put(key []byte, content []byte) error

	// Del removes key.
	Del(key []byte) error
}
store/ILogStore.go
从IKVStore继承,以支持kv持久化
package store
import (
"learning/gooop/etcd/raft/model"
)
// ILogStore is the persistent raft log. It embeds IKVStore, so the same
// store also serves key/value data.
type ILogStore interface {
	IKVStore

	// Term and index of the most recently appended entry.
	LastAppendedTerm() int64
	LastAppendedIndex() int64

	// Term and index of the most recently committed entry.
	LastCommittedTerm() int64
	LastCommittedIndex() int64

	// Append adds entry to the log.
	Append(entry *model.LogEntry) error

	// Commit commits the entry at index.
	Commit(index int64) error

	// GetLog fetches the entry at index (error first, like IKVStore.Get).
	GetLog(index int64) (error, *model.LogEntry)
}
lsm/IRaftState.go
继承rpc.IKVStoreRPC接口,以支持kv操作
package lsm
import (
"learning/gooop/etcd/raft/roles"
"learning/gooop/etcd/raft/rpc"
)
// IRaftState is one role-specific behaviour of a raft node (for example the
// leader or follower state). Besides the raft RPCs it serves key/value
// commands.
type IRaftState interface {
	rpc.IRaftRPC
	rpc.IKVStoreRPC

	// Role tells which raft role this state implements.
	Role() roles.RaftRole

	// Start activates the state.
	Start()
}
lsm/tLeaderState.go
初步实现Leader状态的raft协议处理,事件驱动的逻辑编排,读写分离的字段管理。
package lsm
import (
"errors"
"learning/gooop/etcd/raft/config"
"learning/gooop/etcd/raft/model"
"learning/gooop/etcd/raft/roles"
"learning/gooop/etcd/raft/rpc"
"learning/gooop/etcd/raft/store"
"learning/gooop/etcd/raft/timeout"
"sync"
"time"
)
// tLeaderState is the raft state of a node acting as leader.
//
// Mutable fields are annotated with what writes them; the orchestration
// handlers registered in initEventHandlers are intended to only read them.
type tLeaderState struct {
	tEventDrivenModel

	context    iRaftStateContext
	mInitOnce  sync.Once // guards init
	mStartOnce sync.Once // guards Start

	// mTerm is the term this node leads; set in init.
	mTerm int64

	// mDisposedFlag stops the heartbeat loop; cleared on leInit, set on leDiposing.
	mDisposedFlag bool

	// Last vote cast by this node: recorded on leVoteToCandidate, cleared on
	// leBeforeRequestVote once the election timeout has passed.
	mVotedTerm        int64
	mVotedCandidateID string
	mVotedTimestamp   int64
}
// Events raised and handled inside tLeaderState.
const (
	// leInit is raised by init(); no args.
	leInit = "leader.init"

	// leStart is raised by Start(); no args.
	leStart = "leader.Start"

	// leDiposing is raised when the leader hands over to a follower state; no args.
	leDiposing = "leader.Disposing"

	// leNewLeaderAnnounced is raised by Heartbeat() / AppendLog() when a newer
	// term is seen; args: term int64.
	leNewLeaderAnnounced = "leader.NewLeaderAnnounced"

	// leBeforeRequestVote is raised at the top of RequestVote(); args: *rpc.RequestVoteCmd.
	leBeforeRequestVote = "leader.BeforeRequestVote"

	// leVoteToCandidate is raised by RequestVote() when a vote is granted;
	// args: *rpc.RequestVoteCmd.
	leVoteToCandidate = "leader.VoteToCandidate"

	// leHeartbeatRejected is raised by handleHeartbeat() when a peer rejects
	// our heartbeat; args: term int64.
	leHeartbeatRejected = "leader.HeartbeatRejected"
)
// newLeaderState builds a leader state bound to ctx for the given term.
// The state is initialised but not started; call Start to begin heartbeats.
func newLeaderState(ctx iRaftStateContext, term int64) IRaftState {
	state := &tLeaderState{}
	state.init(ctx, term)
	return state
}
// init binds the context, records the term and registers all event
// handlers, then raises leInit. Guarded by mInitOnce: later calls do nothing.
func (me *tLeaderState) init(ctx iRaftStateContext, term int64) {
	me.mInitOnce.Do(func() {
		me.context, me.mTerm = ctx, term
		me.initEventHandlers()
		// raised only after every handler is registered
		me.raise(leInit)
	})
}
// initEventHandlers registers every event handler of the leader state:
// first the handlers that write fields, then the orchestration handlers.
func (me *tLeaderState) initEventHandlers() {
	// field writers
	me.hookEventsForDisposedFlag()
	me.hookEventsForVotedTerm()

	// orchestration
	me.hook(leStart, me.whenStartThenBeginHeartbeatToOthers)
	me.hook(leNewLeaderAnnounced, me.whenNewLeaderAnnouncedThenSwitchToFollower)
	me.hook(leHeartbeatRejected, me.whenHeartbeatRejectedThenSwitchToFollower)
}
// hookEventsForDisposedFlag maintains mDisposedFlag: cleared on leInit and
// set on leDiposing. The heartbeat loop polls it to know when to stop.
func (me *tLeaderState) hookEventsForDisposedFlag() {
	setDisposed := func(disposed bool) func(string, ...interface{}) {
		return func(_ string, _ ...interface{}) {
			me.mDisposedFlag = disposed
		}
	}
	me.hook(leInit, setDisposed(false))
	me.hook(leDiposing, setDisposed(true))
}
// hookEventsForVotedTerm maintains the mVoted* fields.
//
// On leBeforeRequestVote a recorded vote older than timeout.ElectionTimeout
// is forgotten. On leVoteToCandidate the vote carried by args[0]
// (*rpc.RequestVoteCmd) is recorded with the current time.
func (me *tLeaderState) hookEventsForVotedTerm() {
	me.hook(leBeforeRequestVote, func(_ string, _ ...interface{}) {
		if me.mVotedTerm == 0 {
			// no vote recorded
			return
		}
		elapsed := time.Duration(time.Now().UnixNano() - me.mVotedTimestamp)
		if elapsed < timeout.ElectionTimeout {
			return
		}
		me.mVotedTerm, me.mVotedCandidateID, me.mVotedTimestamp = 0, "", 0
	})

	me.hook(leVoteToCandidate, func(_ string, args ...interface{}) {
		vote := args[0].(*rpc.RequestVoteCmd)
		me.mVotedTerm, me.mVotedCandidateID = vote.Term, vote.CandidateID
		me.mVotedTimestamp = time.Now().UnixNano()
	})
}
// Heartbeat handles a heartbeat from another node claiming leadership.
//
// A heartbeat whose term is not newer than ours is rejected with
// HBTermMismatch. ret.Term carries our current term because the sender's
// handleHeartbeat forwards ret.Term to its leHeartbeatRejected handler,
// which switches the sender to a follower in that term. A newer term means
// another node has been elected: leNewLeaderAnnounced is raised, which
// disposes this state and hands over to a follower state in cmd.Term.
func (me *tLeaderState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
	if cmd.Term <= me.mTerm {
		ret.Code = rpc.HBTermMismatch
		// Without this the rejected sender would step down into term 0.
		ret.Term = me.mTerm
		return nil
	}

	// a newer leader exists
	me.raise(leNewLeaderAnnounced, cmd.Term)
	ret.Code = rpc.HBOk
	return nil
}
// AppendLog handles an append request from another node claiming leadership.
//
// A request whose term is not newer than ours is rejected with
// ALTermMismatch. A newer term means another leader exists: this state raises
// leNewLeaderAnnounced (handing over to a follower state) but does not append
// the entry itself, so it answers ALInternalError.
// NOTE(review): presumably the new leader retries against the follower
// state — confirm.
func (me *tLeaderState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
	if cmd.Term > me.mTerm {
		me.raise(leNewLeaderAnnounced, cmd.Term)
		ret.Code = rpc.ALInternalError
		return nil
	}
	ret.Code = rpc.ALTermMismatch
	return nil
}
// CommitLog ignores commit requests: a leader issues commits rather than
// receiving them, so every request is answered with CLInternalError.
func (me *tLeaderState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
	ret.Code = rpc.CLInternalError
	return nil
}
// RequestVote handles a vote request from a candidate.
//
// leBeforeRequestVote is raised first so that a vote older than the election
// timeout is forgotten before it is consulted. Then:
//   - cmd.Term <= mTerm: RVTermMismatch. As leader of mTerm this node has
//     already voted (for itself) in that term, and older terms are stale.
//   - cmd.Term below the last recorded vote term: RVTermMismatch.
//   - cmd.Term equal to the recorded vote term: RVOk if we voted for this
//     candidate (or nobody), RVVotedAnother otherwise.
//   - newer term: grant the vote (RVOk, raising leVoteToCandidate) when the
//     candidate's LastLogIndex is at least our last committed index,
//     otherwise RVLogMismatch.
//
// TODO(review): per Raft, a leader that sees a newer term should also step
// down to follower; that is not done here yet.
func (me *tLeaderState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
	me.raise(leBeforeRequestVote, cmd)

	// mVotedTerm starts at 0 for a fresh leader and is reset after a timeout,
	// so it alone cannot protect the term we lead.
	if cmd.Term <= me.mTerm {
		ret.Code = rpc.RVTermMismatch
		return nil
	}

	switch {
	case cmd.Term < me.mVotedTerm:
		ret.Code = rpc.RVTermMismatch
	case cmd.Term == me.mVotedTerm:
		if me.mVotedCandidateID != "" && me.mVotedCandidateID != cmd.CandidateID {
			ret.Code = rpc.RVVotedAnother
		} else {
			ret.Code = rpc.RVOk
		}
	default: // cmd.Term > me.mVotedTerm
		if cmd.LastLogIndex >= me.context.Store().LastCommittedIndex() {
			me.raise(leVoteToCandidate, cmd)
			ret.Code = rpc.RVOk
		} else {
			ret.Code = rpc.RVLogMismatch
		}
	}
	return nil
}
// Role reports that this state implements the leader role.
func (me *tLeaderState) Role() roles.RaftRole { return roles.Leader }
// Start raises leStart, whose handler launches the heartbeat loop.
// Guarded by mStartOnce: only the first call has any effect.
func (me *tLeaderState) Start() {
	me.mStartOnce.Do(func() { me.raise(leStart) })
}
// whenStartThenBeginHeartbeatToOthers handles leStart by spawning a goroutine
// that heartbeats every other node, sleeping timeout.HeartbeatInterval between
// rounds, until mDisposedFlag is set. Errors from a round are ignored; the
// next round simply tries again.
// NOTE(review): mDisposedFlag is written by event handlers and read here with
// no visible synchronisation — likely a data race; consider an atomic flag.
func (me *tLeaderState) whenStartThenBeginHeartbeatToOthers(_ string, _ ...interface{}) {
	heartbeatLoop := func() {
		for {
			if me.mDisposedFlag {
				return
			}
			_ = me.boardcast(func(_ config.IRaftNodeConfig, client rpc.IRaftRPC) error {
				return me.handleHeartbeat(client)
			})
			time.Sleep(timeout.HeartbeatInterval)
		}
	}
	go heartbeatLoop()
}
// boardcast runs action once for every configured node except this one,
// borrowing an RPC client for each through RaftClientService().Using.
//
// Every peer is visited even when an earlier one fails, so a single
// unreachable node cannot starve the remaining peers of heartbeats or log
// replication. The first error encountered, if any, is returned after all
// peers have been tried.
func (me *tLeaderState) boardcast(action func(config.IRaftNodeConfig, rpc.IRaftRPC) error) error {
	cfg := me.context.Config()
	selfID := cfg.ID()

	var firstErr error
	for _, node := range cfg.Nodes() {
		if node.ID() == selfID {
			continue
		}
		peer := node // pinned for the closure (pre-Go 1.22 loop semantics)
		err := me.context.RaftClientService().Using(peer.ID(), func(client rpc.IRaftRPC) error {
			return action(peer, client)
		})
		if err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
// handleHeartbeat sends one heartbeat carrying our term and ID to client.
// A transport error is returned unchanged. If the peer answers
// HBTermMismatch, leHeartbeatRejected is raised with the term the peer
// reported, whose handler switches this node to follower.
func (me *tLeaderState) handleHeartbeat(client rpc.IRaftRPC) error {
	hb := &rpc.HeartbeatCmd{}
	hb.Term, hb.LeaderID = me.mTerm, me.context.Config().ID()

	reply := &rpc.HeartbeatRet{}
	if err := client.Heartbeat(hb, reply); err != nil {
		return err
	}
	if reply.Code == rpc.HBTermMismatch {
		me.raise(leHeartbeatRejected, reply.Term)
	}
	return nil
}
// whenNewLeaderAnnouncedThenSwitchToFollower handles leNewLeaderAnnounced;
// args[0] is the newer term (int64).
func (me *tLeaderState) whenNewLeaderAnnouncedThenSwitchToFollower(_ string, args ...interface{}) {
	me.disposeAndSwitchToFollower(args)
}

// whenHeartbeatRejectedThenSwitchToFollower handles leHeartbeatRejected;
// args[0] is the term (int64) reported by the rejecting peer.
func (me *tLeaderState) whenHeartbeatRejectedThenSwitchToFollower(_ string, args ...interface{}) {
	me.disposeAndSwitchToFollower(args)
}

// disposeAndSwitchToFollower raises leDiposing (which stops the heartbeat
// loop), then hands the context a new follower state in the term at args[0].
func (me *tLeaderState) disposeAndSwitchToFollower(args []interface{}) {
	me.raise(leDiposing)
	term := args[0].(int64)
	me.context.HandleStateChanged(newFollowerState(me.context, term))
}
// ExecuteKVCmd dispatches a client key/value command to the handler for
// cmd.OPCode. An unrecognised op code is answered with KVInternalError and
// gErrorUnknownKVOPCode. Previously it returned nil, which left ret.Code at
// its zero value KVOk and reported the bogus command as a success.
func (me *tLeaderState) ExecuteKVCmd(cmd *rpc.KVCmd, ret *rpc.KVRet) error {
	switch cmd.OPCode {
	case rpc.KVGet:
		return me.handleKVGet(cmd, ret)
	case rpc.KVPut:
		return me.handleKVPut(cmd, ret)
	case rpc.KVDel:
		return me.handleKVDel(cmd, ret)
	default:
		ret.Code = rpc.KVInternalError
		return gErrorUnknownKVOPCode
	}
}

// gErrorUnknownKVOPCode is returned by ExecuteKVCmd for an op code it does
// not recognise.
var gErrorUnknownKVOPCode = errors.New("unknown kv op code")
// handleKVGet reads cmd.Key from the local store. On success ret carries
// KVOk and the stored value. On a store error ret.Code is KVInternalError
// and the error is returned.
// NOTE(review): KVKeyNotFound is never produced here; how the store reports
// a missing key is not visible from this file.
func (me *tLeaderState) handleKVGet(cmd *rpc.KVCmd, ret *rpc.KVRet) error {
	err, value := me.context.Store().Get(cmd.Key)
	if err == nil {
		ret.Code, ret.Content = rpc.KVOk, value
		return nil
	}
	ret.Code = rpc.KVInternalError
	return err
}
// handleKVPut replicates "put cmd.Key = cmd.Content" through the raft log
// via broadcastKVCmd. Once that succeeds, it applies the put to the local
// store.
//
// ret.Code is KVOk only when both replication and the local apply succeed.
// Any failure sets KVInternalError and returns the error.
func (me *tLeaderState) handleKVPut(cmd *rpc.KVCmd, ret *rpc.KVRet) error {
	kvcmd := new(store.PutCmd)
	kvcmd.Key = cmd.Key
	kvcmd.Value = cmd.Content

	if err := me.broadcastKVCmd(kvcmd, ret); err != nil {
		ret.Code = rpc.KVInternalError
		return err
	}
	if err := me.context.Store().Put(cmd.Key, cmd.Content); err != nil {
		ret.Code = rpc.KVInternalError
		return err
	}
	ret.Code = rpc.KVOk
	return nil
}
// handleKVDel replicates "delete cmd.Key" through the raft log via
// broadcastKVCmd. Once that succeeds, it removes the key from the local store
// with Store().Del.
//
// ret.Code is KVOk only when both replication and the local delete succeed.
// Any failure sets KVInternalError and returns the error.
func (me *tLeaderState) handleKVDel(cmd *rpc.KVCmd, ret *rpc.KVRet) error {
	kvcmd := new(store.DelCmd)
	kvcmd.Key = cmd.Key

	if err := me.broadcastKVCmd(kvcmd, ret); err != nil {
		ret.Code = rpc.KVInternalError
		return err
	}
	// A delete must remove the key, not overwrite it with cmd.Content.
	if err := me.context.Store().Del(cmd.Key); err != nil {
		ret.Code = rpc.KVInternalError
		return err
	}
	ret.Code = rpc.KVOk
	return nil
}
// broadcastKVCmd runs one round of log replication for cmd:
//
//  1. append a new entry (Term = mTerm, Index = last committed index + 1)
//     to the local log;
//  2. ask every other node to append it (AppendLog);
//  3. if enough followers accepted to form a majority, commit the entry
//     locally and ask every other node to commit it (CommitLog).
//
// It returns nil once the entry is committed locally. On failure ret.Code is
// set to KVInternalError and the error is returned. gErrorCannotReachAgreement
// means too few followers accepted the entry.
//
// Entry indexes are derived from the committed index, so the leader must
// commit locally after reaching agreement. Otherwise every later command
// would be appended at the same index.
// NOTE(review): if store.Commit also applies the command to the kv data, the
// explicit apply in handleKVPut/handleKVDel is redundant but harmless for
// put; confirm Del on an already-deleted key does not error.
func (me *tLeaderState) broadcastKVCmd(cmd store.IKVCmd, ret *rpc.KVRet) error {
	entry, err := me.appendLocalLog(cmd)
	if err != nil {
		ret.Code = rpc.KVInternalError
		return err
	}

	accepted := me.replicateLog(entry)
	// Nodes() includes this leader (boardcast skips it), so floor(n/2)
	// follower acks plus our own copy form a majority of n.
	if accepted < len(me.context.Config().Nodes())/2 {
		ret.Code = rpc.KVInternalError
		return gErrorCannotReachAgreement
	}

	if err := me.context.Store().Commit(entry.Index); err != nil {
		ret.Code = rpc.KVInternalError
		return err
	}
	me.broadcastCommit(entry.Index)
	return nil
}

// appendLocalLog wraps cmd in an entry for the current term, placed right
// after the last committed entry, and appends it to the local store.
func (me *tLeaderState) appendLocalLog(cmd store.IKVCmd) (*model.LogEntry, error) {
	st := me.context.Store()
	entry := new(model.LogEntry)
	entry.Term = me.mTerm
	entry.Index = st.LastCommittedIndex() + 1
	entry.PrevTerm = st.LastCommittedTerm()
	entry.PrevIndex = st.LastCommittedIndex()
	entry.Command = cmd.Marshal()
	if err := st.Append(entry); err != nil {
		return nil, err
	}
	return entry, nil
}

// replicateLog sends entry to every other node via AppendLog and returns how
// many of them answered ALOk.
func (me *tLeaderState) replicateLog(entry *model.LogEntry) int {
	alcmd := new(rpc.AppendLogCmd)
	alcmd.Term = me.mTerm
	alcmd.LeaderID = me.context.Config().ID()
	alcmd.Entry = entry

	accepted := 0
	// Per-peer failures simply do not count toward the majority.
	_ = me.boardcast(func(_ config.IRaftNodeConfig, client rpc.IRaftRPC) error {
		alret := new(rpc.AppendLogRet)
		if err := client.AppendLog(alcmd, alret); err != nil {
			return err
		}
		// TODO: handle ALTermMismatch (step down) and ALIndexMismatch (back-fill).
		if alret.Code == rpc.ALOk {
			accepted++
		}
		return nil
	})
	return accepted
}

// broadcastCommit asks every other node to commit the entry at index.
// Best-effort: failures and non-OK replies are not retried yet.
func (me *tLeaderState) broadcastCommit(index int64) {
	clcmd := new(rpc.CommitLogCmd)
	clcmd.LeaderID = me.context.Config().ID()
	clcmd.Term = me.mTerm
	clcmd.Index = index
	_ = me.boardcast(func(_ config.IRaftNodeConfig, client rpc.IRaftRPC) error {
		clret := new(rpc.CommitLogRet)
		// TODO: handle CLInternalError / CLLogNotFound (e.g. re-send the entry).
		return client.CommitLog(clcmd, clret)
	})
}

// gErrorCannotReachAgreement is returned when too few followers accepted a
// new log entry.
var gErrorCannotReachAgreement = errors.New("cannot reach agreement")
(未完待续)
有疑问加站长微信联系(非本文作者)