手撸golang etcd raft协议之8
缘起
最近阅读《云原生分布式存储基石：etcd深入解析》（杜军，2019.1）
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop
raft分布式一致性算法
分布式存储系统通常会通过维护多个副本来进行容错,
以提高系统的可用性。
这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?
Raft算法把这一问题分解成了四个子问题：
1. 领袖选举（leader election）；
2. 日志复制（log replication）；
3. 安全性（safety）；
4. 成员关系变化（membership changes）。
目标
- 根据raft协议,实现高可用分布式强一致的kv存储
子目标(Day 8)
- 简化rpc连接管理器tRaftClientService的实现
- 剥离IRaftLSM的内部支持接口到iRaftStateContext接口
- 完成Candidate状态的处理逻辑
设计
- IRaftLSM:raft有限状态机接口
- iRaftStateContext:提供状态模式下的上下文支持
- tCandidateState:Candidate(候选人)状态的实现。基于事件驱动的逻辑编排,基于读写分离的字段管理。
- tRaftClient:管理到指定raft节点的rpc连接
- tRaftClientService:管理当前节点到其他raft节点的rpc连接
IRaftLSM.go
raft有限状态机接口
package lsm
import (
"learning/gooop/etcd/raft/rpc"
)
// IRaftLSM raft有限状态自动机
type IRaftLSM interface {
rpc.IRaftRPC
iRaftStateContext
State() IRaftState
}
iRaftStateContext.go
提供状态模式下的上下文支持
package lsm
import (
"learning/gooop/etcd/raft/config"
"learning/gooop/etcd/raft/rpc/client"
"learning/gooop/etcd/raft/store"
)
// iRaftStateContext is the context handed to each raft state (state pattern).
// It gives a state access to the services it needs without exposing the whole
// IRaftLSM.
type iRaftStateContext interface {
	// Config returns the raft cluster configuration (local ID, node list).
	Config() config.IRaftConfig

	// Store returns the local log store.
	Store() store.ILogStore

	// RaftClientService returns the manager of RPC connections to other nodes.
	RaftClientService() client.IRaftClientService

	// HandleStateChanged is called by a state to hand control to its successor.
	HandleStateChanged(state IRaftState)
}
tCandidateState.go
Candidate(候选人)状态的实现。基于事件驱动的逻辑编排,基于读写分离的字段管理。
package lsm
import (
"learning/gooop/etcd/raft/roles"
"learning/gooop/etcd/raft/rpc"
"learning/gooop/etcd/raft/timeout"
"sync"
"time"
)
// tCandidateState is the Candidate role of a raft node.
//
// Apart from init(), fields are written only by the hookEventsForXxx handlers
// (the "write" side); the whenXxx handlers only read them and raise events.
// Each field comment lists the events whose handlers write that field.
type tCandidateState struct {
	tEventDrivenModel

	context iRaftStateContext

	mInitOnce  sync.Once
	mStartOnce sync.Once

	// set by init(); incremented on ceAskingForVote
	mTerm int64

	// vote bookkeeping, written on ceInit / ceAskingForVote / ceVoteToCandidate
	mVotedTerm        int64
	mVotedCandidateID string
	mVotedTimestamp   int64

	// set of node IDs (self included) that granted a vote in the current round;
	// written on ceInit / ceAskingForVote / ceReceiveTicket / ceDisposing.
	// Guarded by mTicketMutex, which is created on ceInit.
	mTicketCount map[string]bool
	mTicketMutex *sync.Mutex

	// written on ceInit (false) / ceDisposing (true)
	mDisposedFlag bool
}
// Events of the candidate state. For each event the comment names the code
// that raises it and the payload it carries in args.
const (
	// ceInit is raised by init(). args: empty.
	ceInit = "candidate.init"

	// ceStart is hooked by whenStartThenAskForVote and is meant to be raised by
	// Start(). args: empty.
	ceStart = "candidate.Start"

	// ceElectionTimeout is raised by whenAskingForVoteThenWatchElectionTimeout
	// when a round ends without a majority. args: empty.
	ceElectionTimeout = "candidate.ElectionTimeout"

	// ceLeaderAnnounced is raised by Heartbeat() / AppendLog() when a leader
	// makes itself known. args: empty.
	ceLeaderAnnounced = "candidate.LeaderAnnounced"

	// ceVoteToCandidate is raised by RequestVote() when the vote is granted.
	// args[0]: *rpc.RequestVoteCmd.
	ceVoteToCandidate = "candidate.VoteToCandidate"

	// ceDisposing is raised by whenLeaderAnnouncedThenSwitchToFollower and
	// whenWinningTheVoteThenSwitchToLeader before leaving this state.
	// args: empty.
	ceDisposing = "candidate.Disposing"

	// ceAskingForVote is raised by beginAskForVote() at the start of every
	// election round. args: empty.
	ceAskingForVote = "candidate.AskingForVote"

	// ceReceiveTicket is raised by handleRequestVoteOK() for every granted vote.
	// args[0]: string, the ID of the peer that voted for us.
	ceReceiveTicket = "candidate.ReceiveTicket"

	// ceWinningTheVote is raised by whenReceiveTicketThenCheckTicketCount once
	// a majority of tickets is held. args: empty.
	ceWinningTheVote = "candidate.ceWinningTheVote"
)
// newCandidateState builds a candidate state for the given starting term,
// registers its event handlers and fires ceInit.
func newCandidateState(ctx iRaftStateContext, term int64) IRaftState {
	state := &tCandidateState{}
	state.init(ctx, term)
	return state
}
// init attaches the state to its context, records the starting term, registers
// every event handler and raises ceInit. Guarded by mInitOnce, so only the
// first call has any effect.
func (me *tCandidateState) init(ctx iRaftStateContext, term int64) {
	me.mInitOnce.Do(func() {
		me.context, me.mTerm = ctx, term
		me.initEventHandlers()
		me.raise(ceInit)
	})
}
// initEventHandlers registers every event handler of the candidate state.
//
// Handlers fall into two groups:
//   - "write" hooks (hookEventsForXxx) update the corresponding mXxx field in
//     response to events;
//   - "read" hooks (whenXxxThenYyy) orchestrate behaviour: they read fields
//     and raise further events.
//
// Registration order matters. The write hooks are registered first so that,
// assuming tEventDrivenModel invokes handlers in registration order (TODO
// confirm), fields are already updated when the read hooks for the same event
// run. For example, ceAskingForVote increments mTerm before
// whenAskingForVoteThenWatchElectionTimeout snapshots it.
func (me *tCandidateState) initEventHandlers() {
	// write only logic: field maintenance, must stay ahead of the read hooks
	me.hookEventsForTerm()
	me.hookEventsForVotedTerm()
	me.hookEventsForVotedCandidateID()
	me.hookEventsForVotedTimestamp()
	me.hookEventsForTicketCount()
	me.hookEventsForDisposedFlag()

	// read only logic: event -> reaction wiring
	me.hook(ceStart,
		me.whenStartThenAskForVote)
	me.hook(ceAskingForVote,
		me.whenAskingForVoteThenWatchElectionTimeout)
	me.hook(ceReceiveTicket,
		me.whenReceiveTicketThenCheckTicketCount)
	me.hook(ceElectionTimeout,
		me.whenElectionTimeoutThenAskForVoteAgain)
	me.hook(ceWinningTheVote,
		me.whenWinningTheVoteThenSwitchToLeader)
	me.hook(ceLeaderAnnounced,
		me.whenLeaderAnnouncedThenSwitchToFollower)
}
// hookEventsForTerm maintains field: mTerm
// update: ceAskingForVote
//
// Every election round is opened by ceAskingForVote (raised by beginAskForVote
// on start and after each election timeout) and bumps the term by one. The
// starting value is set by init().
func (me *tCandidateState) hookEventsForTerm() {
	me.hook(ceAskingForVote, func(_ string, _ ...interface{}) {
		me.mTerm++
	})
}
// hookEventsForVotedTerm maintains field: mVotedTerm
// update: ceInit / ceAskingForVote / ceVoteToCandidate
//
// The candidate votes for itself on ceInit and again at the start of every
// election round (ceAskingForVote), so the voted term tracks mTerm. When
// RequestVote grants the vote to another candidate (ceVoteToCandidate, whose
// args[0] is *rpc.RequestVoteCmd), the voted term follows that request's term.
func (me *tCandidateState) hookEventsForVotedTerm() {
	voteForSelf := func(_ string, _ ...interface{}) {
		me.mVotedTerm = me.mTerm
	}
	me.hook(ceInit, voteForSelf)
	me.hook(ceAskingForVote, voteForSelf)
	me.hook(ceVoteToCandidate, func(_ string, args ...interface{}) {
		me.mVotedTerm = args[0].(*rpc.RequestVoteCmd).Term
	})
}
// hookEventsForVotedCandidateID maintains field: mVotedCandidateID
// update: ceInit / ceAskingForVote / ceVoteToCandidate
//
// On ceInit and at the start of every election round (ceAskingForVote) the
// vote goes to this node itself. When RequestVote grants the vote to another
// candidate (ceVoteToCandidate, whose args[0] is *rpc.RequestVoteCmd), that
// candidate's ID is recorded.
func (me *tCandidateState) hookEventsForVotedCandidateID() {
	voteForSelf := func(_ string, _ ...interface{}) {
		me.mVotedCandidateID = me.context.Config().ID()
	}
	me.hook(ceInit, voteForSelf)
	me.hook(ceAskingForVote, voteForSelf)
	me.hook(ceVoteToCandidate, func(_ string, args ...interface{}) {
		me.mVotedCandidateID = args[0].(*rpc.RequestVoteCmd).CandidateID
	})
}
// hookEventsForVotedTimestamp maintains field: mVotedTimestamp
// update: ceInit / ceAskingForVote / ceVoteToCandidate
//
// Records, in Unix nanoseconds, when the most recent vote was cast, whether
// for this node itself or for another candidate.
func (me *tCandidateState) hookEventsForVotedTimestamp() {
	stamp := func(_ string, _ ...interface{}) {
		me.mVotedTimestamp = time.Now().UnixNano()
	}
	me.hook(ceInit, stamp)
	me.hook(ceAskingForVote, stamp)
	me.hook(ceVoteToCandidate, stamp)
}
// hookEventsForTicketCount maintains field: mTicketCount (guarded by mTicketMutex)
// update: ceInit / ceAskingForVote / ceReceiveTicket / ceDisposing
//
// mTicketCount is the set of node IDs that granted this candidate a vote in
// the current round. Each round starts with only this node's own vote.
// ceReceiveTicket (args[0] is the voter's ID) adds a voter, and ceDisposing
// empties the set.
func (me *tCandidateState) hookEventsForTicketCount() {
	// onlySelf builds a fresh ticket set holding just this node's own vote.
	onlySelf := func() map[string]bool {
		return map[string]bool{me.context.Config().ID(): true}
	}
	// locked runs fn while holding mTicketMutex.
	locked := func(fn func()) {
		me.mTicketMutex.Lock()
		defer me.mTicketMutex.Unlock()
		fn()
	}

	me.hook(ceInit, func(_ string, _ ...interface{}) {
		// the mutex is created here, so this first write is done without it
		me.mTicketMutex = &sync.Mutex{}
		me.mTicketCount = onlySelf()
	})
	me.hook(ceAskingForVote, func(_ string, _ ...interface{}) {
		locked(func() { me.mTicketCount = onlySelf() })
	})
	me.hook(ceReceiveTicket, func(_ string, args ...interface{}) {
		voter := args[0].(string)
		locked(func() { me.mTicketCount[voter] = true })
	})
	me.hook(ceDisposing, func(_ string, _ ...interface{}) {
		locked(func() { me.mTicketCount = map[string]bool{} })
	})
}
// hookEventsForDisposedFlag maintains field: mDisposedFlag
// update: ceInit (false) / ceDisposing (true)
//
// Background goroutines (election timer, vote replies) check this flag so they
// stop acting once the candidate has handed over to another state.
func (me *tCandidateState) hookEventsForDisposedFlag() {
	setDisposed := func(disposed bool) func(string, ...interface{}) {
		return func(_ string, _ ...interface{}) {
			me.mDisposedFlag = disposed
		}
	}
	me.hook(ceInit, setDisposed(false))
	me.hook(ceDisposing, setDisposed(true))
}
// Heartbeat handles a leader heartbeat received while campaigning.
//
// Per Raft §5.2, a candidate that hears from a leader whose term is at least
// its own current term recognises that leader as legitimate and steps down to
// follower (ceLeaderAnnounced). Only heartbeats from a strictly older term are
// rejected. Accepting cmd.Term == mTerm matters: that is exactly the case
// where a competing candidate won the election of the same term.
func (me *tCandidateState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
	if cmd.Term < me.mTerm {
		// stale leader from an older term
		ret.Code = rpc.HBTermMismatch
		return nil
	}

	// a leader exists for our term (or a newer one): step down
	me.raise(ceLeaderAnnounced)
	ret.Code = rpc.HBOk
	return nil
}

// AppendLog handles a log-append request received while campaigning.
//
// The term rule is the same as for Heartbeat: a sender whose term is at least
// mTerm is accepted as leader and the candidate steps down (ceLeaderAnnounced).
// The entry itself is not applied here; rpc.ALInternalError is reported so
// the leader presumably retries once this node is a follower (TODO confirm).
func (me *tCandidateState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
	if cmd.Term < me.mTerm {
		// stale leader from an older term
		ret.Code = rpc.ALTermMismatch
		return nil
	}

	// a leader exists for our term (or a newer one): step down
	me.raise(ceLeaderAnnounced)
	ret.Code = rpc.ALInternalError
	return nil
}

// CommitLog is not served by a candidate: the request is ignored and
// rpc.CLInternalError is reported. It does not inspect cmd and does not raise
// ceLeaderAnnounced.
func (me *tCandidateState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
	ret.Code = rpc.CLInternalError
	return nil
}
// RequestVote handles a RequestVote RPC received while this node is itself a
// candidate. It always returns nil and reports the outcome in ret.Code.
//
// The outcome depends on the term this node last voted in (mVotedTerm):
//   - older term: rejected with RVTermMismatch;
//   - same term: RVOk if the vote in that term went to the same candidate (or
//     to nobody), otherwise RVVotedAnother;
//   - newer term: the vote is granted (ceVoteToCandidate, RVOk) when the
//     requester's log is at least as up-to-date as ours, otherwise
//     RVLogMismatch.
//
// "Up-to-date" follows Raft §5.4.1. The log whose last entry has the higher
// term wins; with equal last terms, the longer log wins. Comparing indexes
// alone could elect a node that lacks entries from a newer term.
//
// NOTE(review): the local side of the comparison is the last *committed*
// entry, matching what beginAskForVote advertises, whereas Raft compares
// against the last log entry. Raft also has a candidate that sees a newer term
// revert to follower, while this state only records the vote. Confirm both
// are intended.
func (me *tCandidateState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
	switch {
	case cmd.Term < me.mVotedTerm:
		// stale request
		ret.Code = rpc.RVTermMismatch

	case cmd.Term == me.mVotedTerm:
		if me.mVotedCandidateID == "" || me.mVotedCandidateID == cmd.CandidateID {
			// (re)confirm the vote already cast in this term
			ret.Code = rpc.RVOk
		} else {
			ret.Code = rpc.RVVotedAnother
		}

	default: // cmd.Term > me.mVotedTerm: a term we have not voted in yet
		store := me.context.Store()
		lastTerm, lastIndex := store.LastCommittedTerm(), store.LastCommittedIndex()
		upToDate := cmd.LastLogTerm > lastTerm ||
			(cmd.LastLogTerm == lastTerm && cmd.LastLogIndex >= lastIndex)
		if upToDate {
			me.raise(ceVoteToCandidate, cmd)
			ret.Code = rpc.RVOk
		} else {
			ret.Code = rpc.RVLogMismatch
		}
	}
	return nil
}
// Role reports that this state is roles.Candidate.
func (me *tCandidateState) Role() roles.RaftRole {
	return roles.Candidate
}

// Start kicks off the first election round by raising ceStart, which
// whenStartThenAskForVote turns into vote requests to every peer. Guarded by
// mStartOnce, so only the first call has any effect.
//
// It must raise the candidate's own ceStart event: initEventHandlers only
// hooks ce* events, so raising any other event (e.g. the follower's feStart)
// would leave the candidate idle and it would never ask for votes.
func (me *tCandidateState) Start() {
	me.mStartOnce.Do(func() {
		me.raise(ceStart)
	})
}
// whenLeaderAnnouncedThenSwitchToFollower steps down once a leader has made
// itself known. The candidate disposes itself first, so its background
// goroutines stop acting, and then hands control to a fresh follower state.
func (me *tCandidateState) whenLeaderAnnouncedThenSwitchToFollower(_ string, _ ...interface{}) {
	me.raise(ceDisposing)
	next := newFollowerState(me.context)
	me.context.HandleStateChanged(next)
}

// whenElectionTimeoutThenAskForVoteAgain opens a new election round after the
// previous one timed out without a majority.
func (me *tCandidateState) whenElectionTimeoutThenAskForVoteAgain(_ string, _ ...interface{}) {
	me.beginAskForVote()
}

// whenStartThenAskForVote opens the first election round.
func (me *tCandidateState) whenStartThenAskForVote(_ string, _ ...interface{}) {
	me.beginAskForVote()
}
// beginAskForVote opens a new election round and broadcasts RequestVote.
//
// Raising ceAskingForVote bumps mTerm, re-votes for self and resets the ticket
// set. The request is then built from the new term and the local committed log
// position, and sent to every other configured node in its own goroutine.
// Granted votes are passed to handleRequestVoteOK together with this round's
// term, so replies belonging to an older round can be dropped.
func (me *tCandidateState) beginAskForVote() {
	me.raise(ceAskingForVote)

	cfg := me.context.Config()
	selfID := cfg.ID()
	term := me.mTerm
	store := me.context.Store()
	cmd := &rpc.RequestVoteCmd{
		CandidateID:  selfID,
		Term:         term,
		LastLogIndex: store.LastCommittedIndex(),
		LastLogTerm:  store.LastCommittedTerm(),
	}

	for _, node := range cfg.Nodes() {
		if node.ID() == selfID {
			continue // our own vote is already in the ticket set
		}
		go func(peerID string) {
			ret := &rpc.RequestVoteRet{}
			err := me.context.RaftClientService().Using(peerID, func(client rpc.IRaftRPC) error {
				return client.RequestVote(cmd, ret)
			})
			if err != nil || ret.Code != rpc.RVOk {
				return // peer unreachable or vote refused
			}
			me.handleRequestVoteOK(peerID, term)
		}(node.ID())
	}
}
// whenAskingForVoteThenWatchElectionTimeout arms a randomized election timer
// for the round that has just started. When it fires, ceElectionTimeout is
// raised unless one of these holds:
//   - the candidate has been disposed;
//   - a newer round has started;
//   - a majority of tickets is already held.
func (me *tCandidateState) whenAskingForVoteThenWatchElectionTimeout(_ string, _ ...interface{}) {
	round := me.mTerm
	go func() {
		time.Sleep(timeout.RandElectionTimeout())

		// ignore the timer if we were disposed or a newer round began
		if me.mDisposedFlag || me.mTerm != round {
			return
		}
		tickets := me.getTicketCount()
		if majority := len(me.context.Config().Nodes())/2 + 1; tickets >= majority {
			return
		}
		me.raise(ceElectionTimeout)
	}()
}
// handleRequestVoteOK records a vote granted by peerID. The vote counts only
// if the candidate is still active and the vote belongs to the current round
// (term).
func (me *tCandidateState) handleRequestVoteOK(peerID string, term int64) {
	if !me.mDisposedFlag && me.mTerm == term {
		me.raise(ceReceiveTicket, peerID)
	}
}
// whenReceiveTicketThenCheckTicketCount raises ceWinningTheVote as soon as the
// tickets held reach a majority (len(nodes)/2 + 1) of the configured nodes.
func (me *tCandidateState) whenReceiveTicketThenCheckTicketCount(_ string, _ ...interface{}) {
	tickets := me.getTicketCount()
	if tickets < len(me.context.Config().Nodes())/2+1 {
		return
	}
	me.raise(ceWinningTheVote)
}
// getTicketCount returns how many distinct nodes, self included, have granted
// their vote in the current round. The set is read under mTicketMutex.
func (me *tCandidateState) getTicketCount() int {
	me.mTicketMutex.Lock()
	count := len(me.mTicketCount)
	me.mTicketMutex.Unlock()
	return count
}
// whenWinningTheVoteThenSwitchToLeader disposes the candidate and hands
// control to a new leader state for the term just won.
func (me *tCandidateState) whenWinningTheVoteThenSwitchToLeader(_ string, _ ...interface{}) {
	me.raise(ceDisposing)
	leader := newLeaderState(me.context, me.mTerm)
	me.context.HandleStateChanged(leader)
}
tRaftClient.go
管理到指定raft节点的rpc连接
package client
import (
"learning/gooop/etcd/raft/config"
rrpc "learning/gooop/etcd/raft/rpc"
"net/rpc"
)
// tRaftClient is the RPC client for a single raft peer. Calls are delegated to
// the current iClientState: the broken state when there is no connection,
// the connected state otherwise.
type tRaftClient struct {
	cfg   config.IRaftNodeConfig
	conn  *rpc.Client
	state iClientState
}

// newRaftClient wraps conn for the peer described by cfg and starts the
// matching initial state. conn may be nil, e.g. when dialing failed.
func newRaftClient(cfg config.IRaftNodeConfig, conn *rpc.Client) IRaftClient {
	c := &tRaftClient{}
	c.init(cfg, conn)
	return c
}

// init stores the peer config and connection, then selects and starts the
// initial state: broken when conn is nil, connected otherwise.
func (me *tRaftClient) init(cfg config.IRaftNodeConfig, conn *rpc.Client) {
	me.cfg, me.conn = cfg, conn

	var initial iClientState
	if conn != nil {
		initial = newConnectedState(me)
	} else {
		initial = newBrokenState(me)
	}
	me.state = initial
	me.state.Start()
}
// Config returns the configuration of the peer this client talks to.
func (me *tRaftClient) Config() config.IRaftNodeConfig { return me.cfg }

// GetConn returns the underlying net/rpc client. It may be nil, e.g. when the
// initial dial failed.
func (me *tRaftClient) GetConn() *rpc.Client { return me.conn }

// SetConn replaces the underlying net/rpc client.
func (me *tRaftClient) SetConn(conn *rpc.Client) { me.conn = conn }

// HandleStateChanged installs state as the current client state and starts it.
func (me *tRaftClient) HandleStateChanged(state iClientState) {
	me.state = state
	me.state.Start()
}
// Heartbeat forwards the call to the current client state.
func (me *tRaftClient) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error { return me.state.Heartbeat(cmd, ret) }

// AppendLog forwards the call to the current client state.
func (me *tRaftClient) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error { return me.state.AppendLog(cmd, ret) }

// CommitLog forwards the call to the current client state.
func (me *tRaftClient) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error { return me.state.CommitLog(cmd, ret) }

// RequestVote forwards the call to the current client state.
func (me *tRaftClient) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error { return me.state.RequestVote(cmd, ret) }

// Ping forwards the call to the current client state.
func (me *tRaftClient) Ping(cmd *PingCmd, ret *PingRet) error { return me.state.Ping(cmd, ret) }
tRaftClientService.go
管理当前节点到其他raft节点的rpc连接
package client
import (
"errors"
"learning/gooop/etcd/raft/config"
"learning/gooop/etcd/raft/rpc"
netrpc "net/rpc"
)
// tRaftClientService keeps one IRaftClient per configured raft node, keyed by
// node ID. Within this file the map is filled only by init and read by Using.
type tRaftClientService struct {
	cfg     config.IRaftConfig
	clients map[string]IRaftClient
}

// NewRaftClientService creates a client for every node listed in cfg and
// returns the service managing them. Nodes that cannot be dialed still get a
// client (created without a connection) rather than failing construction.
func NewRaftClientService(cfg config.IRaftConfig) IRaftClientService {
	svc := &tRaftClientService{}
	svc.init(cfg)
	return svc
}

// init builds a client for each node in cfg.Nodes(). Dialing is done
// synchronously, one node after another.
//
// NOTE(review): the node list is used as-is, so the local node is dialed too
// if cfg.Nodes() includes it. Confirm whether a self-connection is intended.
func (me *tRaftClientService) init(cfg config.IRaftConfig) {
	nodes := cfg.Nodes()
	me.cfg = cfg
	me.clients = make(map[string]IRaftClient, len(nodes))
	for _, nodeCfg := range nodes {
		me.clients[nodeCfg.ID()] = me.createRaftClient(nodeCfg)
	}
}
// createRaftClient dials nodeCfg.Endpoint() over TCP using net/rpc. A failed
// dial is not reported as an error: the client is created with a nil
// connection, which newRaftClient turns into its broken state.
func (me *tRaftClientService) createRaftClient(nodeCfg config.IRaftNodeConfig) IRaftClient {
	conn, err := netrpc.Dial("tcp", nodeCfg.Endpoint())
	if err != nil {
		conn = nil // no usable connection after a failed dial
	}
	return newRaftClient(nodeCfg, conn)
}
// Using runs action with the client for peerID and returns action's error. It
// returns gErrorUnknownRaftPeer, without calling action, when peerID has no
// client.
func (me *tRaftClientService) Using(peerID string, action func(client rpc.IRaftRPC) error) error {
	peer, found := me.clients[peerID]
	if !found {
		return gErrorUnknownRaftPeer
	}
	return action(peer)
}
// gErrorUnknownRaftPeer is returned by Using when no client exists for the
// requested peer ID.
var gErrorUnknownRaftPeer error = errors.New("unknown raft peer")
(未完待续)
有疑问加站长微信联系(非本文作者)