手撸golang etcd raft协议之7

老罗话编程 · · 330 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

手撸golang etcd raft协议之7

缘起

最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop

raft分布式一致性算法

分布式存储系统通常会通过维护多个副本来进行容错,
以提高系统的可用性。
这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?

Raft算法把问题分解成了四个子问题:
1. 领袖选举(leader election)、
2. 日志复制(log replication)、
3. 安全性(safety)
4. 成员关系变化(membership changes)
这几个子问题。

目标

  • 根据raft协议,实现高可用分布式强一致的kv存储

子目标(Day 7)

  • 实现各raft节点之间的rpc通讯
    • 定义IRaftClientService服务,管理所有节点的tcp长连接
    • 定义IRaftClient接口,封装节点间的rpc调用
      • 基于状态模式,区分已连接状态和已断开状态
      • 基于事件驱动的逻辑编排
      • 基于读写分离的字段管理

设计

  • model/IEventDirvenModel: 事件驱动的逻辑编排基类
  • IRaftClientService:管理所有的节点间rpc连接
  • IRaftClient:管理当前节点与某个节点间的rpc连接
  • iClientState:基于状态模式的rpc连接状态接口
  • iStateContext:状态模式下的连接状态上下文接口
  • tRaftClient:IRaftClient接口的具体实现,并实现iStateContext接口。
  • tConnectedState: 管理已连接状态的rpc连接
    • 定时Ping以检测连接状态
    • 基于事件驱动的逻辑编排
    • 基于读写分离的字段管理
  • tBrokenState:管理已断开状态的rpc连接
    • 定时Dial以尝试重连接
    • 基于事件驱动的逻辑编排
    • 基于读写分离的字段管理

model/IEventDirvenModel.go

事件驱动的逻辑编排基类

package model

type TEventHandleFunc func(e string, args ...interface{})

type IEventDrivenModel interface {
    hook(e string, handleFunc TEventHandleFunc)
    raise(e string, args ...interface{})
}

type TEventDrivenModel struct {
    items map[string][]TEventHandleFunc
}

func (me *TEventDrivenModel) Hook(e string, handler TEventHandleFunc) {
    arr, ok := me.items[e]
    if ok {
        me.items[e] = append(arr, handler)
    } else {
        me.items[e] = []TEventHandleFunc{handler}
    }
}

func (me *TEventDrivenModel) Raise(e string, args ...interface{}) {
    if handlers, ok := me.items[e]; ok {
        for _, it := range handlers {
            it(e, args...)
        }
    }
}

IRaftClientService.go

管理所有的节点间rpc连接

package client

import (
    "learning/gooop/etcd/raft/config"
    "learning/gooop/etcd/raft/rpc"
    netrpc "net/rpc"
    "sync"
)

type tRaftClientService struct {
    cfg config.IRaftConfig
    rwmutex *sync.RWMutex
    clients map[string]IRaftClient
}

func NewRaftClientService(cfg config.IRaftConfig) IRaftClientService {
    it := new(tRaftClientService)
    it.init(cfg)
    return it
}


func (me *tRaftClientService) init(cfg config.IRaftConfig) {
    me.cfg = cfg
    me.rwmutex = new(sync.RWMutex)
    me.clients = make(map[string]IRaftClient)
}


func (me *tRaftClientService) Using(peerID string, action func(client rpc.IRaftRPC) error) error {
    // check client exists?
    me.rwmutex.RLock()
    it,ok := me.clients[peerID]
    if ok {
        return action(it)
    }

    var nodeCfg config.IRaftNodeConfig
    for _,it := range me.cfg.Nodes() {
        if it.ID() == peerID {
            nodeCfg = it
            break
        }
    }
    me.rwmutex.RUnlock()

    // dial to peer
    conn, err := netrpc.Dial("tcp", nodeCfg.Endpoint())
    if err != nil {
        return err
    }

    // to create new client
    me.rwmutex.Lock()
    defer me.rwmutex.Unlock()

    // recheck client
    _,ok = me.clients[peerID]
    if ok {
        defer conn.Close()
        return action(it)
    }

    // create new client
    return action(newRaftClient(nodeCfg, conn))
}

IRaftClient.go

管理当前节点与某个节点间的rpc连接

package client

import "learning/gooop/etcd/raft/rpc"

type IRaftClient interface {
    rpc.IRaftRPC
    iStateContext

    Ping(cmd *PingCmd, ret *PingRet) error
}


type PingCmd struct {
    SenderID string
    Timestamp int64
}

type PingRet struct {
    SenderID string
    Timestamp int64
}

iClientState.go

基于状态模式的rpc连接状态接口

package client

import "learning/gooop/etcd/raft/rpc"

type iClientState interface {
    rpc.IRaftRPC

    Start()
    Ping(cmd *PingCmd, ret *PingRet) error
}

iStateContext.go

状态模式下的连接状态上下文接口

package client

import (
    "learning/gooop/etcd/raft/config"
    "net/rpc"
)

type iStateContext interface {
    Config() config.IRaftNodeConfig
    GetConn() *rpc.Client
    SetConn(client *rpc.Client)
    HandleStateChanged(state iClientState)
}

tRaftClient.go

IRaftClient接口的具体实现,并实现iStateContext接口。

package client

import (
    "learning/gooop/etcd/raft/config"
    "net/rpc"
    rrpc "learning/gooop/etcd/raft/rpc"
)

type tRaftClient struct {
    cfg config.IRaftNodeConfig
    conn *rpc.Client
    state iClientState
}

func newRaftClient(cfg config.IRaftNodeConfig, conn *rpc.Client) IRaftClient {
    it := new(tRaftClient)
    it.init(cfg, conn)
    return it
}

func (me *tRaftClient) init(cfg config.IRaftNodeConfig, conn *rpc.Client) {
    me.cfg = cfg
    me.conn = conn
}

func (me *tRaftClient) Config() config.IRaftNodeConfig {
    return me.cfg
}

func (me *tRaftClient) GetConn() *rpc.Client {
    return me.conn
}

func (me *tRaftClient) SetConn(conn *rpc.Client) {
    me.conn = conn
}

func (me *tRaftClient) HandleStateChanged(state iClientState) {
    me.state = state
    state.Start()
}

func (me *tRaftClient) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error {
    return me.state.Heartbeat(cmd, ret)
}

func (me *tRaftClient) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error {
    return me.state.AppendLog(cmd, ret)
}

func (me *tRaftClient) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error {
    return me.state.CommitLog(cmd, ret)
}

func (me *tRaftClient) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error {
    return me.state.RequestVote(cmd, ret)
}

func (me *tRaftClient) Ping(cmd *PingCmd, ret *PingRet) error {
    return me.state.Ping(cmd, ret)
}

tConnectedState.go

管理已连接状态的rpc连接

  • 定时Ping以检测连接状态
  • 基于事件驱动的逻辑编排
  • 基于读写分离的字段管理
package client

import (
    "learning/gooop/etcd/raft/model"
    "learning/gooop/etcd/raft/rpc"
    "learning/gooop/etcd/raft/timeout"
    "sync"
    "time"
)

type tConnectedState struct {
    model.TEventDrivenModel
    context iStateContext

    mInitOnce sync.Once
    mStartOnce sync.Once

    // update: ceInit, ceDisposing
    mDisposedFlag bool
}

// trigger: init()
// args: empty
const ceInit = "connected.init"

// trigger: Start()
// args: empty
const ceStart = "connected.Start"

// trigger:
// args: empty
const ceDisposing = "connected.Disposing"

// trigger: whenStartThenBeginPing()
// args: empty
const cePingFailed = "connected.PingFailed"


func newConnectedState(ctx iStateContext) iClientState {
    it := new(tConnectedState)
    it.init(ctx)
    return it
}

func (me *tConnectedState) init(ctx iStateContext) {
    me.mInitOnce.Do(func() {
        me.context = ctx
        me.initEventHandlers()
        me.Raise(ceInit)
    })
}

func (me *tConnectedState) initEventHandlers() {
    // write only logic
    me.hookEventsForDisposedFlag()

    // read only logic
    me.Hook(ceStart,
        me.whenStartThenBeginPing)

    me.Hook(cePingFailed,
        me.whenPingFailedThenSwitchToBrokenState)

    me.Hook(ceDisposing,
        me.whenDisposingThenCloseConn)
}

func (me *tConnectedState) Start() {
    me.mStartOnce.Do(func() {
        me.Raise(ceStart)
    })
}

func (me *tConnectedState) hookEventsForDisposedFlag() {
    me.Hook(ceInit, func(e string, args ...interface{}) {
        me.mDisposedFlag = false
    })

    me.Hook(ceDisposing, func(e string, args ...interface{}) {
        me.mDisposedFlag = true
    })
}

func (me *tConnectedState) whenStartThenBeginPing(_ string, _ ...interface{}) {
    go func() {
        cmd := &PingCmd{
            SenderID: me.context.Config().ID(),
            Timestamp: time.Now().UnixNano(),
        }
        ret := &PingRet{}
        for range time.Tick(timeout.ClientPingInterval) {
            if me.mDisposedFlag {
                return
            }

            cmd.Timestamp = time.Now().UnixNano()
            err := me.Ping(cmd, ret)
            if err != nil {
                me.Raise(cePingFailed)
            }
        }
    }()
}

func (me *tConnectedState) whenPingFailedThenSwitchToBrokenState(_ string, _ ...interface{}) {
    me.Raise(ceDisposing)
    me.context.HandleStateChanged(newBrokenState(me.context))
}

func (me *tConnectedState) whenDisposingThenCloseConn(_ string, _ ...interface{}) {
    it := me.context.GetConn()
    if it != nil {
        it.Close()
    }

    me.context.SetConn(nil)
}


func (me *tConnectedState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
    return me.context.GetConn().Call("TRaftRPCServer.Heartbeat", cmd, ret)
}

func (me *tConnectedState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
    return me.context.GetConn().Call("TRaftRPCServer.AppendLog", cmd, ret)
}

func (me *tConnectedState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
    return me.context.GetConn().Call("TRaftRPCServer.CommitLog", cmd, ret)
}

func (me *tConnectedState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
    return me.context.GetConn().Call("TRaftRPCServer.RequestVote", cmd, ret)
}

func (me *tConnectedState) Ping(cmd *PingCmd, ret *PingRet) error {
    return me.context.GetConn().Call("TRaftRPCServer.Ping", cmd, ret)
}

tBrokenState.go

管理已断开状态的rpc连接

  • 定时Dial以尝试重连接
  • 基于事件驱动的逻辑编排
  • 基于读写分离的字段管理
package client

import (
    "errors"
    "learning/gooop/etcd/raft/model"
    rrpc "learning/gooop/etcd/raft/rpc"
    "learning/gooop/etcd/raft/timeout"
    "sync"
    "net/rpc"
    "time"
)

type tBrokenState struct {
    model.TEventDrivenModel
    context iStateContext

    mInitOnce sync.Once
    mStartOnce sync.Once

    mDisposedFlag bool
}

// trigger : init()
// args: empty
const beInit = "broken.init"

// trigger: Start()
// args: empty
const beStart = "broken.Start"

// trigger: whenStartThenBeginDial
// args: *rpc.Client
const beDialOK  = "broken.DialOK"

// trigger: whenDialOKThenSwitchToConnectedState
// args: empty
const beDisposing = "broken.Disposing"

func newBrokenState(ctx iStateContext) iClientState {
    it := new(tBrokenState)
    it.init(ctx)
    return it
}

func (me *tBrokenState) init(ctx iStateContext) {
    me.mInitOnce.Do(func() {
        me.context = ctx
        me.initEventHandlers()
        me.Raise(beInit)
    })
}

func (me *tBrokenState) initEventHandlers() {
    // write only logic
    me.hookEventsForDisposedFlag()

    // read only logic
    me.Hook(beStart,
        me.whenStartThenBeginDial)

    me.Hook(beDialOK,
        me.whenDialOKThenSetConn)

    me.Hook(beDialOK,
        me.whenDialOKThenSwitchToConnectedState)
}


func (me *tBrokenState) hookEventsForDisposedFlag() {
    me.Hook(beInit, func(e string, args ...interface{}) {
        me.mDisposedFlag = false
    })

    me.Hook(beDisposing, func(e string, args ...interface{}) {
        me.mDisposedFlag = true
    })
}

func (me *tBrokenState) Start() {
    me.mStartOnce.Do(func() {
        me.Raise(beStart)
    })
}


func (me *tBrokenState) whenStartThenBeginDial(_ string, _ ...interface{}) {
    go func() {
        for !me.mDisposedFlag {
            conn, err := rpc.Dial("tcp", me.context.Config().Endpoint())
            if err == nil {
                me.Raise(beDialOK, conn)
                break

            } else {
                time.Sleep(timeout.ClientRedialInterval)
            }
        }
    }()
}


func (me *tBrokenState) whenDialOKThenSetConn(_ string, args ...interface{}) {
    conn := args[0].(*rpc.Client)
    me.context.SetConn(conn)
}


func (me *tBrokenState) whenDialOKThenSwitchToConnectedState(_ string, _ ...interface{}) {
    me.Raise(beDisposing)
    me.context.HandleStateChanged(newConnectedState(me.context))
}

func (me *tBrokenState) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error {
    return gErrorConnectionBroken
}

func (me *tBrokenState) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error {
    return gErrorConnectionBroken
}

func (me *tBrokenState) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error {
    return gErrorConnectionBroken
}

func (me *tBrokenState) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error {
    return gErrorConnectionBroken
}

func (me *tBrokenState) Ping(cmd *PingCmd, ret *PingRet) error {
    return gErrorConnectionBroken
}

var gErrorConnectionBroken = errors.New("peer connection broken")

(未完待续)


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:老罗话编程

查看原文:手撸golang etcd raft协议之7

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

330 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传