手撸golang etcd raft协议之7
缘起
最近阅读《云原生分布式存储基石:etcd深入解析》(杜军, 2019.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop
raft分布式一致性算法
分布式存储系统通常会通过维护多个副本来进行容错,
以提高系统的可用性。
这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?
Raft算法把这一问题分解成了四个子问题:
1. 领袖选举(leader election)
2. 日志复制(log replication)
3. 安全性(safety)
4. 成员关系变化(membership changes)
目标
- 根据raft协议,实现高可用分布式强一致的kv存储
子目标(Day 7)
- 实现各raft节点之间的rpc通讯
- 定义IRaftClientService服务,管理所有节点的tcp长连接
- 定义IRaftClient接口,封装节点间的rpc调用
- 基于状态模式,区分已连接状态和已断开状态
- 基于事件驱动的逻辑编排
- 基于读写分离的字段管理
设计
- model/IEventDirvenModel: 事件驱动的逻辑编排基类
- IRaftClientService:管理所有的节点间rpc连接
- IRaftClient:管理当前节点与某个节点间的rpc连接
- iClientState:基于状态模式的rpc连接状态接口
- iStateContext:状态模式下的连接状态上下文接口
- tRaftClient:IRaftClient接口的具体实现,并实现iStateContext接口。
- tConnectedState: 管理已连接状态的rpc连接
- 定时Ping以检测连接状态
- 基于事件驱动的逻辑编排
- 基于读写分离的字段管理
- tBrokenState:管理已断开状态的rpc连接
- 定时Dial以尝试重连接
- 基于事件驱动的逻辑编排
- 基于读写分离的字段管理
model/IEventDirvenModel.go
事件驱动的逻辑编排基类
package model
// TEventHandleFunc is the callback signature for event handlers: e is the
// name of the raised event and args are the values passed to Raise.
type TEventHandleFunc func(e string, args ...interface{})

// IEventDrivenModel is implemented by types that register handlers for named
// events (Hook) and dispatch events to them (Raise).
//
// The methods are exported so that TEventDrivenModel, and every type that
// embeds it, actually satisfies this interface. With unexported hook/raise,
// nothing in this package implemented it.
type IEventDrivenModel interface {
	Hook(e string, handleFunc TEventHandleFunc)
	Raise(e string, args ...interface{})
}

// TEventDrivenModel is an embeddable base that keeps, per event name, an
// ordered list of handlers. Its zero value is ready to use.
//
// It does no locking. Hook must not run concurrently with Hook or Raise, so
// register handlers before the model is shared between goroutines.
type TEventDrivenModel struct {
	// items maps an event name to its handlers, in registration order.
	items map[string][]TEventHandleFunc
}

// Hook appends handler to the list invoked when event e is raised.
// Handlers for the same event run in registration order.
func (me *TEventDrivenModel) Hook(e string, handler TEventHandleFunc) {
	// The zero value (e.g. when embedded by value in another struct) has a
	// nil map; allocate it lazily instead of panicking on the first write.
	if me.items == nil {
		me.items = make(map[string][]TEventHandleFunc)
	}
	me.items[e] = append(me.items[e], handler)
}

// Raise synchronously invokes every handler hooked to e, in registration
// order, passing e and args through. Raising an event without handlers is a
// no-op.
func (me *TEventDrivenModel) Raise(e string, args ...interface{}) {
	for _, handler := range me.items[e] {
		handler(e, args...)
	}
}

// Compile-time check that the base type satisfies its interface.
var _ IEventDrivenModel = (*TEventDrivenModel)(nil)
IRaftClientService.go
管理所有的节点间rpc连接
package client
import (
	"fmt"
	netrpc "net/rpc"
	"sync"

	"learning/gooop/etcd/raft/config"
	"learning/gooop/etcd/raft/rpc"
)
// tRaftClientService is the IRaftClientService returned by
// NewRaftClientService: it hands out per-peer RPC clients via Using.
type tRaftClientService struct {
	// cfg is the cluster configuration used to look up peer endpoints by ID.
	cfg config.IRaftConfig
	// rwmutex guards clients.
	rwmutex *sync.RWMutex
	// clients maps a peer node ID to its IRaftClient.
	clients map[string]IRaftClient
}
// NewRaftClientService creates a client service for the cluster described by
// cfg. No connections are opened until Using is called.
func NewRaftClientService(cfg config.IRaftConfig) IRaftClientService {
	svc := &tRaftClientService{}
	svc.init(cfg)
	return svc
}
// init binds the service to cfg and prepares an empty, lock-protected client
// table.
func (me *tRaftClientService) init(cfg config.IRaftConfig) {
	*me = tRaftClientService{
		cfg:     cfg,
		rwmutex: &sync.RWMutex{},
		clients: map[string]IRaftClient{},
	}
}
// Using runs action against the RPC client for peerID, creating and caching
// that client on first use.
//
// The first call for a peer dials the endpoint configured for it; later calls
// reuse the cached client. No lock is held while action runs, so a slow RPC to
// one peer does not block callers targeting other peers.
//
// It returns an error if peerID is not in the cluster configuration or the
// initial dial fails; otherwise it returns whatever action returns.
func (me *tRaftClientService) Using(peerID string, action func(client rpc.IRaftRPC) error) error {
	// Fast path: the client already exists. Release the read lock before
	// calling action (the original returned while still holding it).
	me.rwmutex.RLock()
	client, ok := me.clients[peerID]
	me.rwmutex.RUnlock()
	if ok {
		return action(client)
	}

	nodeCfg := me.findNodeConfig(peerID)
	if nodeCfg == nil {
		return fmt.Errorf("raft client: unknown peer %q", peerID)
	}

	// Dial without holding the lock; it may block for a long time.
	conn, err := netrpc.Dial("tcp", nodeCfg.Endpoint())
	if err != nil {
		return fmt.Errorf("dialing peer %q at %s: %w", peerID, nodeCfg.Endpoint(), err)
	}

	me.rwmutex.Lock()
	if existing, found := me.clients[peerID]; found {
		// Another goroutine registered a client while we were dialing: use it
		// and discard our now-redundant connection.
		me.rwmutex.Unlock()
		_ = conn.Close() // best effort; the connection was never used
		return action(existing)
	}
	client = newRaftClient(nodeCfg, conn)
	// Cache the client so later calls reuse its connection instead of
	// dialing (and leaking) a new one each time.
	me.clients[peerID] = client
	me.rwmutex.Unlock()

	return action(client)
}

// findNodeConfig returns the configuration of the node whose ID is peerID,
// or nil if no such node is configured.
func (me *tRaftClientService) findNodeConfig(peerID string) config.IRaftNodeConfig {
	for _, node := range me.cfg.Nodes() {
		if node.ID() == peerID {
			return node
		}
	}
	return nil
}
IRaftClient.go
管理当前节点与某个节点间的rpc连接
package client
import "learning/gooop/etcd/raft/rpc"
// IRaftClient is the RPC client for a single peer node. Besides the raft RPCs
// (rpc.IRaftRPC) it exposes Ping, and it serves as the iStateContext of its
// connection states.
type IRaftClient interface {
	rpc.IRaftRPC
	iStateContext
	// Ping sends a ping RPC to the peer; the connected state uses a failed
	// ping to detect a broken connection.
	Ping(cmd *PingCmd, ret *PingRet) error
}
// PingCmd is the request payload of a Ping RPC.
type PingCmd struct {
	// SenderID is meant to identify the pinging node.
	// NOTE(review): the connected state fills it from Config().ID(), which is
	// the peer's config — confirm which ID is intended.
	SenderID string
	// Timestamp is set to time.Now().UnixNano() before each ping.
	Timestamp int64
}

// PingRet is the reply payload of a Ping RPC.
// NOTE(review): presumably filled by the responding peer; the server side is
// not shown here.
type PingRet struct {
	SenderID  string
	Timestamp int64
}
iClientState.go
基于状态模式的rpc连接状态接口
package client
import "learning/gooop/etcd/raft/rpc"
// iClientState is one state of a peer connection (State pattern). The client
// delegates every raft RPC and Ping to its current state.
type iClientState interface {
	rpc.IRaftRPC
	// Start is called once the state has been installed as the current one
	// (see tRaftClient.HandleStateChanged); it launches the state's background
	// work.
	Start()
	Ping(cmd *PingCmd, ret *PingRet) error
}
iStateContext.go
状态模式下的连接状态上下文接口
package client
import (
"learning/gooop/etcd/raft/config"
"net/rpc"
)
// iStateContext is what a connection state needs from its owner: the peer's
// node config, access to the shared RPC connection, and a way to install the
// next state.
type iStateContext interface {
	// Config returns the configuration of the peer node.
	Config() config.IRaftNodeConfig
	// GetConn returns the current connection; it is nil after being cleared.
	GetConn() *rpc.Client
	// SetConn replaces the current connection (nil clears it).
	SetConn(client *rpc.Client)
	// HandleStateChanged makes state the current state and starts it.
	HandleStateChanged(state iClientState)
}
tRaftClient.go
IRaftClient接口的具体实现,并实现iStateContext接口。
package client
import (
"learning/gooop/etcd/raft/config"
"net/rpc"
rrpc "learning/gooop/etcd/raft/rpc"
)
// tRaftClient implements IRaftClient for one peer and is also the
// iStateContext of its connection states: every RPC is delegated to the
// current state.
type tRaftClient struct {
	// cfg is the peer node's configuration.
	cfg config.IRaftNodeConfig
	// conn is the RPC connection to the peer; set to nil when a connected
	// state is disposed.
	conn *rpc.Client
	// state is the current connection state.
	state iClientState
}
// newRaftClient creates the client for the peer described by cfg, using the
// already-dialed connection conn.
func newRaftClient(cfg config.IRaftNodeConfig, conn *rpc.Client) IRaftClient {
	client := &tRaftClient{}
	client.init(cfg, conn)
	return client
}
// init stores the peer configuration and connection, then installs and
// starts the initial connection state: connected when conn is non-nil,
// broken (which keeps redialing) otherwise.
//
// Previously me.state was left nil, so every RPC method on tRaftClient
// dereferenced a nil interface and panicked.
func (me *tRaftClient) init(cfg config.IRaftNodeConfig, conn *rpc.Client) {
	me.cfg = cfg
	me.conn = conn

	var initial iClientState
	if conn != nil {
		initial = newConnectedState(me)
	} else {
		initial = newBrokenState(me)
	}
	me.HandleStateChanged(initial)
}
// Config returns the configuration of the peer node this client talks to.
func (me *tRaftClient) Config() config.IRaftNodeConfig { return me.cfg }

// GetConn returns the current RPC connection to the peer; it may be nil.
func (me *tRaftClient) GetConn() *rpc.Client { return me.conn }

// SetConn replaces the current RPC connection (nil clears it).
func (me *tRaftClient) SetConn(conn *rpc.Client) { me.conn = conn }
// HandleStateChanged installs state as the current connection state and then
// starts it.
func (me *tRaftClient) HandleStateChanged(state iClientState) {
	// Assign before Start: Start launches goroutines that may themselves
	// switch to yet another state.
	me.state = state
	state.Start()
}
// The methods below implement IRaftClient by delegating to whichever
// connection state is current when the call is made.

// Heartbeat forwards the heartbeat RPC to the current state.
func (me *tRaftClient) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error {
	current := me.state
	return current.Heartbeat(cmd, ret)
}

// AppendLog forwards the append-log RPC to the current state.
func (me *tRaftClient) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error {
	current := me.state
	return current.AppendLog(cmd, ret)
}

// CommitLog forwards the commit-log RPC to the current state.
func (me *tRaftClient) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error {
	current := me.state
	return current.CommitLog(cmd, ret)
}

// RequestVote forwards the request-vote RPC to the current state.
func (me *tRaftClient) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error {
	current := me.state
	return current.RequestVote(cmd, ret)
}

// Ping forwards the ping RPC to the current state.
func (me *tRaftClient) Ping(cmd *PingCmd, ret *PingRet) error {
	current := me.state
	return current.Ping(cmd, ret)
}
tConnectedState.go
管理已连接状态的rpc连接
- 定时Ping以检测连接状态
- 基于事件驱动的逻辑编排
- 基于读写分离的字段管理
package client
import (
"learning/gooop/etcd/raft/model"
"learning/gooop/etcd/raft/rpc"
"learning/gooop/etcd/raft/timeout"
"sync"
"time"
)
// tConnectedState is the iClientState used while an RPC connection to the
// peer exists. It forwards RPCs over that connection and, once started, pings
// the peer periodically; a failed ping disposes it and switches the context
// to a broken state.
type tConnectedState struct {
	model.TEventDrivenModel
	// context is the owning client: source of the connection and the place
	// to install the next state.
	context iStateContext
	// mInitOnce makes init run only once.
	mInitOnce sync.Once
	// mStartOnce makes Start raise ceStart only once.
	mStartOnce sync.Once
	// update: ceInit, ceDisposing
	mDisposedFlag bool
}
// Events raised by tConnectedState. None of them carry arguments.
const (
	// ceInit is raised once, by init().
	ceInit = "connected.init"
	// ceStart is raised once, by Start().
	ceStart = "connected.Start"
	// ceDisposing is raised by whenPingFailedThenSwitchToBrokenState just
	// before the state hands over to a broken state.
	ceDisposing = "connected.Disposing"
	// cePingFailed is raised by the ping loop started in
	// whenStartThenBeginPing when a Ping RPC returns an error.
	cePingFailed = "connected.PingFailed"
)
// newConnectedState creates an initialised (but not yet started) connected
// state bound to ctx.
func newConnectedState(ctx iStateContext) iClientState {
	state := &tConnectedState{}
	state.init(ctx)
	return state
}
// init binds the state to ctx, hooks all event handlers and raises ceInit
// (which clears mDisposedFlag). Guarded by mInitOnce, so later calls are
// no-ops.
func (me *tConnectedState) init(ctx iStateContext) {
	me.mInitOnce.Do(func() {
		// context must be set before any handler can run.
		me.context = ctx
		me.initEventHandlers()
		me.Raise(ceInit)
	})
}
// initEventHandlers registers every handler of this state. Handlers that
// write fields are hooked first, so for a shared event (ceDisposing) they run
// before the read-only handlers.
func (me *tConnectedState) initEventHandlers() {
	me.hookEventsForDisposedFlag()

	readOnly := []struct {
		event   string
		handler model.TEventHandleFunc
	}{
		{ceStart, me.whenStartThenBeginPing},
		{cePingFailed, me.whenPingFailedThenSwitchToBrokenState},
		{ceDisposing, me.whenDisposingThenCloseConn},
	}
	for _, binding := range readOnly {
		me.Hook(binding.event, binding.handler)
	}
}
// Start raises ceStart, which launches the ping loop. Only the first call has
// any effect.
func (me *tConnectedState) Start() {
	raiseStart := func() { me.Raise(ceStart) }
	me.mStartOnce.Do(raiseStart)
}
// hookEventsForDisposedFlag hooks the only writers of mDisposedFlag: ceInit
// clears it and ceDisposing sets it.
func (me *tConnectedState) hookEventsForDisposedFlag() {
	setDisposed := func(value bool) model.TEventHandleFunc {
		return func(string, ...interface{}) { me.mDisposedFlag = value }
	}
	me.Hook(ceInit, setDisposed(false))
	me.Hook(ceDisposing, setDisposed(true))
}
// whenStartThenBeginPing handles ceStart by launching a goroutine that pings
// the peer every timeout.ClientPingInterval.
//
// The goroutine exits when the state has been disposed, or immediately after
// the first failed ping. A failed ping raises cePingFailed, whose handler
// disposes this state and switches the context to a broken state.
func (me *tConnectedState) whenStartThenBeginPing(_ string, _ ...interface{}) {
	go func() {
		// Use a stoppable ticker: time.Tick's ticker can never be released,
		// so it outlived this goroutine once the state was disposed.
		ticker := time.NewTicker(timeout.ClientPingInterval)
		defer ticker.Stop()

		// NOTE(review): Config() is the peer's node config (see
		// newRaftClient), so this is the peer's ID, not the sender's.
		// Confirm whether the local node ID is intended here.
		cmd := &PingCmd{SenderID: me.context.Config().ID()}
		ret := &PingRet{}
		for range ticker.C {
			if me.mDisposedFlag {
				return
			}
			cmd.Timestamp = time.Now().UnixNano()
			if err := me.Ping(cmd, ret); err != nil {
				me.Raise(cePingFailed)
				// This state has already handed over to another one; stop
				// now instead of waiting for the next tick to notice.
				return
			}
		}
	}()
}
// whenPingFailedThenSwitchToBrokenState handles cePingFailed. It disposes this
// state, which through the ceDisposing handlers sets mDisposedFlag and closes
// and clears the connection. It then installs a new broken state on the
// context.
func (me *tConnectedState) whenPingFailedThenSwitchToBrokenState(_ string, _ ...interface{}) {
	me.Raise(ceDisposing)
	me.context.HandleStateChanged(newBrokenState(me.context))
}
// whenDisposingThenCloseConn handles ceDisposing: it closes the current
// connection (if any) and clears it from the context.
func (me *tConnectedState) whenDisposingThenCloseConn(_ string, _ ...interface{}) {
	if conn := me.context.GetConn(); conn != nil {
		// Best effort: the state is being torn down regardless of the result.
		_ = conn.Close()
	}
	me.context.SetConn(nil)
}
// call issues the RPC named method on the context's current connection.
//
// When this state is disposed, whenDisposingThenCloseConn clears the
// connection to nil. A caller that still holds this state would then call
// Call on a nil *rpc.Client and panic. Instead, call returns
// gErrorConnectionBroken, the same error the broken state reports.
func (me *tConnectedState) call(method string, args interface{}, reply interface{}) error {
	conn := me.context.GetConn()
	if conn == nil {
		return gErrorConnectionBroken
	}
	return conn.Call(method, args, reply)
}

// Heartbeat sends the heartbeat RPC to the peer.
func (me *tConnectedState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
	return me.call("TRaftRPCServer.Heartbeat", cmd, ret)
}

// AppendLog sends the append-log RPC to the peer.
func (me *tConnectedState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
	return me.call("TRaftRPCServer.AppendLog", cmd, ret)
}

// CommitLog sends the commit-log RPC to the peer.
func (me *tConnectedState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
	return me.call("TRaftRPCServer.CommitLog", cmd, ret)
}

// RequestVote sends the request-vote RPC to the peer.
func (me *tConnectedState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
	return me.call("TRaftRPCServer.RequestVote", cmd, ret)
}

// Ping sends the ping RPC to the peer.
func (me *tConnectedState) Ping(cmd *PingCmd, ret *PingRet) error {
	return me.call("TRaftRPCServer.Ping", cmd, ret)
}
tBrokenState.go
管理已断开状态的rpc连接
- 定时Dial以尝试重连接
- 基于事件驱动的逻辑编排
- 基于读写分离的字段管理
package client
import (
"errors"
"learning/gooop/etcd/raft/model"
rrpc "learning/gooop/etcd/raft/rpc"
"learning/gooop/etcd/raft/timeout"
"sync"
"net/rpc"
"time"
)
// tBrokenState is the iClientState used while no connection to the peer is
// available. Every RPC fails with gErrorConnectionBroken. Once started, it
// redials the peer every timeout.ClientRedialInterval and switches the
// context to a connected state when a dial succeeds.
type tBrokenState struct {
	model.TEventDrivenModel
	// context is the owning client: where the new connection and the next
	// state are installed.
	context iStateContext
	// mInitOnce makes init run only once.
	mInitOnce sync.Once
	// mStartOnce makes Start raise beStart only once.
	mStartOnce sync.Once
	// update: beInit, beDisposing
	mDisposedFlag bool
}
// Events raised by tBrokenState.
const (
	// beInit is raised once, by init(). No arguments.
	beInit = "broken.init"
	// beStart is raised once, by Start(). No arguments.
	beStart = "broken.Start"
	// beDialOK is raised by the dial loop started in whenStartThenBeginDial.
	// Its single argument is the new *rpc.Client.
	beDialOK = "broken.DialOK"
	// beDisposing is raised by whenDialOKThenSwitchToConnectedState before
	// handing over to a connected state. No arguments.
	beDisposing = "broken.Disposing"
)
// newBrokenState creates an initialised (but not yet started) broken state
// bound to ctx.
func newBrokenState(ctx iStateContext) iClientState {
	state := &tBrokenState{}
	state.init(ctx)
	return state
}
// init binds the state to ctx, hooks all event handlers and raises beInit
// (which clears mDisposedFlag). Guarded by mInitOnce, so later calls are
// no-ops.
func (me *tBrokenState) init(ctx iStateContext) {
	me.mInitOnce.Do(func() {
		// context must be set before any handler can run.
		me.context = ctx
		me.initEventHandlers()
		me.Raise(beInit)
	})
}
// initEventHandlers registers every handler of this state. Field writers are
// hooked first. For beDialOK, the connection is stored before the switch to
// the connected state, which relies on it.
func (me *tBrokenState) initEventHandlers() {
	me.hookEventsForDisposedFlag()

	readOnly := []struct {
		event   string
		handler model.TEventHandleFunc
	}{
		{beStart, me.whenStartThenBeginDial},
		{beDialOK, me.whenDialOKThenSetConn},
		{beDialOK, me.whenDialOKThenSwitchToConnectedState},
	}
	for _, binding := range readOnly {
		me.Hook(binding.event, binding.handler)
	}
}
// hookEventsForDisposedFlag hooks the only writers of mDisposedFlag: beInit
// clears it and beDisposing sets it.
func (me *tBrokenState) hookEventsForDisposedFlag() {
	setDisposed := func(value bool) model.TEventHandleFunc {
		return func(string, ...interface{}) { me.mDisposedFlag = value }
	}
	me.Hook(beInit, setDisposed(false))
	me.Hook(beDisposing, setDisposed(true))
}
// Start raises beStart, which launches the redial loop. Only the first call
// has any effect.
func (me *tBrokenState) Start() {
	raiseStart := func() { me.Raise(beStart) }
	me.mStartOnce.Do(raiseStart)
}
// whenStartThenBeginDial handles beStart by launching a goroutine that dials
// the peer's endpoint until it succeeds or the state is disposed. It waits
// timeout.ClientRedialInterval between failed attempts. On success it raises
// beDialOK with the new *rpc.Client and exits.
func (me *tBrokenState) whenStartThenBeginDial(_ string, _ ...interface{}) {
	go func() {
		for !me.mDisposedFlag {
			conn, err := rpc.Dial("tcp", me.context.Config().Endpoint())
			if err != nil {
				time.Sleep(timeout.ClientRedialInterval)
				continue
			}
			me.Raise(beDialOK, conn)
			return
		}
	}()
}
// whenDialOKThenSetConn handles beDialOK by storing the freshly dialed
// connection (args[0], a *rpc.Client) on the context.
func (me *tBrokenState) whenDialOKThenSetConn(_ string, args ...interface{}) {
	me.context.SetConn(args[0].(*rpc.Client))
}
// whenDialOKThenSwitchToConnectedState handles beDialOK (after the connection
// has been stored). It disposes this state, which sets mDisposedFlag so the
// dial loop stops, and installs a new connected state on the context.
func (me *tBrokenState) whenDialOKThenSwitchToConnectedState(_ string, _ ...interface{}) {
	me.Raise(beDisposing)
	me.context.HandleStateChanged(newConnectedState(me.context))
}
// gErrorConnectionBroken is returned by every RPC attempted while the peer
// connection is in the broken state.
var gErrorConnectionBroken = errors.New("peer connection broken")

// The RPCs below fail fast: there is no connection to send them over.

// Heartbeat always fails with gErrorConnectionBroken.
func (me *tBrokenState) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error { return gErrorConnectionBroken }

// AppendLog always fails with gErrorConnectionBroken.
func (me *tBrokenState) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error { return gErrorConnectionBroken }

// CommitLog always fails with gErrorConnectionBroken.
func (me *tBrokenState) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error { return gErrorConnectionBroken }

// RequestVote always fails with gErrorConnectionBroken.
func (me *tBrokenState) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error { return gErrorConnectionBroken }

// Ping always fails with gErrorConnectionBroken.
func (me *tBrokenState) Ping(cmd *PingCmd, ret *PingRet) error { return gErrorConnectionBroken }
(未完待续)
有疑问加站长微信联系(非本文作者)