手撸golang etcd raft协议之4
缘起
最近阅读《云原生分布式存储基石:etcd深入解析》(杜军, 2019.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop
raft分布式一致性算法
分布式存储系统通常会通过维护多个副本来进行容错,
以提高系统的可用性。
这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?
Raft算法把问题分解成了领袖选举(leader election)、
日志复制(log replication)、安全性(safety)
和成员关系变化(membership changes)这几个子问题。
Raft算法的基本操作只需2种RPC即可完成。
RequestVote RPC是在选举过程中由候选人(Candidate)发起的,用于向其他节点拉票;
AppendEntries RPC是领导人触发的,目的是向其他节点复制日志条目和发送心跳(heartbeat)。
目标
- 根据raft协议,实现高可用分布式强一致的kv存储
子目标(Day 4)
- 使用boltdb存储操作日志和kv键值数据
- unstable存储桶:已收到未提交的日志,重启后清空
- committed存储桶:已提交的日志
- data存储桶:kv键值数据
- meta存储桶:记录末次提交的index和term
设计
- model/LogEntry: 日志条目
- ICmd:操作指令接口
- ICmdFactory:操作指令工厂
- ILogStore:日志存储接口
- tCmdBase:指令基类
- PutCmd:put指令
- DelCmd:del指令
- tBoltDBStore:基于boltdb实现日志暂存,提交和应用
LogEntry.go
日志条目
package model
import "encoding/json"
// LogEntry is a single raft log record. It is serialised with encoding/json
// (see Marshal), so the exported field names double as the encoded keys and
// the declaration order below is the order they appear in the output.
type LogEntry struct {
	Tag                 int    // command type tag, resolved to a concrete command by the store's factory
	Term, Index         int64  // term and log index of this entry
	PrevTerm, PrevIndex int64  // presumably term/index of the preceding entry (raft consistency check) — not read in this file
	Command             []byte // encoded command payload, decoded according to Tag
}
// Marshal encodes the entry as JSON.
//
// Note the result order is (error, data), the reverse of the usual Go
// convention; data is nil whenever the error is non-nil.
func (me *LogEntry) Marshal() (error, []byte) {
	encoded, err := json.Marshal(me)
	if err != nil {
		return err, nil
	}
	return nil, encoded
}
// Unmarshal decodes JSON (as produced by Marshal) into the entry and returns
// any error reported by encoding/json.
func (me *LogEntry) Unmarshal(data []byte) error {
	err := json.Unmarshal(data, me)
	return err
}
ICmd.go
操作指令接口
package store
import "github.com/boltdb/bolt"
// ICmd is a state-machine command. Its encoded form travels in a log entry's
// Command bytes and is decoded back with Unmarshal before being applied.
type ICmd interface {
	// Marshal encodes the command to bytes.
	Marshal() []byte
	// Unmarshal decodes data (as produced by Marshal) into the command.
	// There is no error result, so implementations cannot report bad input.
	Unmarshal(data []byte)
	// Apply executes the command against the database inside tx.
	Apply(tx *bolt.Tx) error
}
ICmdFactory.go
操作指令工厂
package store
import "fmt"
// ICmdFactory translates between integer command tags (as carried in a log
// entry's Tag field) and concrete ICmd values.
type ICmdFactory interface {
	// OfTag returns a command instance for tag. The default factory returns
	// a new zero-valued command and panics on unknown tags.
	OfTag(tag int) ICmd
	// Tag returns the tag identifying cmd's concrete type.
	Tag(cmd ICmd) int
}
// tDefaultCmdFactory is the built-in ICmdFactory; it knows PutCmd and DelCmd.
type tDefaultCmdFactory int

// Command tags used by tDefaultCmdFactory.
const (
	gPutCmdTag = 1 // PutCmd
	gDelCmdTag = 2 // DelCmd
)
// OfTag returns a new zero-valued command for tag.
// It panics when tag is neither gPutCmdTag nor gDelCmdTag.
func (me *tDefaultCmdFactory) OfTag(tag int) ICmd {
	var cmd ICmd
	switch tag {
	case gPutCmdTag:
		cmd = new(PutCmd)
	case gDelCmdTag:
		cmd = new(DelCmd)
	default:
		panic(fmt.Sprintf("unknown tag: %d", tag))
	}
	return cmd
}
// Tag returns the tag for cmd's concrete type (*PutCmd or *DelCmd).
// It panics for any other type, including a nil cmd.
func (me *tDefaultCmdFactory) Tag(cmd ICmd) int {
	switch cmd.(type) {
	case *PutCmd:
		return gPutCmdTag
	case *DelCmd:
		return gDelCmdTag
	default:
		panic(fmt.Sprintf("unknown cmd: %v", cmd))
	}
}
// gCmdFactory is the package-wide factory the store uses to turn a log
// entry's Tag into a concrete ICmd.
var gCmdFactory *tDefaultCmdFactory = new(tDefaultCmdFactory)
ILogStore.go
日志存储接口
package store
import "learning/gooop/etcd/raft/model"
// ILogStore is the storage interface for the raft log: entries are appended
// as they are received and later committed by index.
type ILogStore interface {
	// Term returns the term currently tracked by the store.
	Term() int64
	// Index returns the log index currently tracked by the store.
	Index() int64
	// Append stages entry as received but not yet committed.
	Append(entry *model.LogEntry) error
	// Commit commits the previously appended entry at index.
	Commit(index int64) error
}
tCmdBase.go
指令基类
package store
import "encoding/json"
// tCmdBase is meant to be embedded by concrete commands to supply default
// Marshal/Unmarshal methods.
//
// Caveat: Go has no virtual dispatch through embedding. The promoted methods
// receive a *tCmdBase, so they encode/decode only tCmdBase's own fields
// (currently none) and never the embedding command's fields. A command whose
// fields must survive serialisation has to define its own Marshal/Unmarshal.
type tCmdBase struct{}

// Marshal returns the JSON encoding of the receiver, or nil if encoding fails.
func (me *tCmdBase) Marshal() []byte {
	if encoded, err := json.Marshal(me); err == nil {
		return encoded
	}
	return nil
}

// Unmarshal decodes JSON data into the receiver. Decoding errors are dropped
// because ICmd.Unmarshal has no error result to report them through.
func (me *tCmdBase) Unmarshal(data []byte) {
	_ = json.Unmarshal(data, me) // error intentionally ignored; see above
}
PutCmd.go
put指令
package store
import (
	"encoding/json"

	"github.com/boltdb/bolt"
)
// PutCmd stores Value under Key in the data bucket when applied.
type PutCmd struct {
	tCmdBase
	Key   string
	Value []byte
}

// Marshal returns the JSON encoding of the command, or nil if encoding fails.
//
// PutCmd must define this itself: the method promoted from the embedded
// tCmdBase receives only the *tCmdBase and would encode "{}", dropping Key
// and Value.
func (me *PutCmd) Marshal() []byte {
	encoded, err := json.Marshal(me)
	if err != nil {
		return nil
	}
	return encoded
}

// Unmarshal decodes JSON produced by Marshal into the command. It is defined
// here for the same reason as Marshal. Errors are discarded because
// ICmd.Unmarshal has no error result.
func (me *PutCmd) Unmarshal(data []byte) {
	_ = json.Unmarshal(data, me)
}

// Apply writes Value under Key in the data bucket of tx.
func (me *PutCmd) Apply(tx *bolt.Tx) error {
	b := tx.Bucket(gDataBucket)
	return b.Put([]byte(me.Key), me.Value)
}
DelCmd.go
del指令
package store
import (
	"encoding/json"

	"github.com/boltdb/bolt"
)
// DelCmd removes Key from the data bucket when applied.
type DelCmd struct {
	tCmdBase
	Key string
}

// Marshal returns the JSON encoding of the command, or nil if encoding fails.
//
// DelCmd must define this itself: the method promoted from the embedded
// tCmdBase receives only the *tCmdBase and would encode "{}", dropping Key.
func (me *DelCmd) Marshal() []byte {
	encoded, err := json.Marshal(me)
	if err != nil {
		return nil
	}
	return encoded
}

// Unmarshal decodes JSON produced by Marshal into the command. It is defined
// here for the same reason as Marshal. Errors are discarded because
// ICmd.Unmarshal has no error result.
func (me *DelCmd) Unmarshal(data []byte) {
	_ = json.Unmarshal(data, me)
}

// Apply deletes Key from the data bucket of tx.
func (me *DelCmd) Apply(tx *bolt.Tx) error {
	b := tx.Bucket(gDataBucket)
	return b.Delete([]byte(me.Key))
}
tBoltDBStore.go
基于boltdb实现日志暂存,提交和应用
package store
import (
"bytes"
"encoding/binary"
"errors"
"github.com/boltdb/bolt"
"learning/gooop/etcd/raft/model"
)
// tBoltDBStore implements ILogStore on top of a single boltdb file.
//
// Buckets used:
//   - unstable:  entries passed to Append but not yet committed, keyed by
//     index; recreated empty every time the store is opened.
//   - committed: entries that have been committed, keyed by index.
//   - data:      the key/value state that committed commands are applied to.
//   - meta:      the index and term of the last committed entry.
type tBoltDBStore struct {
	file  string   // path the database was opened from
	term  int64    // committed term right after opening; advanced by Append
	index int64    // committed index right after opening; advanced by Append
	db    *bolt.DB // the handle returned by bolt.Open (kept as-is, never copied)
}

// NewBoltStore opens (creating if needed) the boltdb file at path file and
// returns a store ready for Append/Commit.
//
// While opening it ensures the meta, data and committed buckets exist, loads
// the committed index/term from meta (seeding gDefaultIndex/gDefaultTerm on a
// fresh file), and recreates the unstable bucket empty so uncommitted entries
// from a previous run are dropped.
//
// Results are (error, store) — note the reversed order. The store is nil
// whenever the error is non-nil, and the database is closed again on failure.
func NewBoltStore(file string) (error, ILogStore) {
	db, err := bolt.Open(file, 0600, nil)
	if err != nil {
		return err, nil
	}
	store := &tBoltDBStore{file: file, db: db}
	err = db.Update(func(tx *bolt.Tx) error {
		meta, e := tx.CreateBucketIfNotExists(gMetaBucket)
		if e != nil {
			return e
		}
		// loadOrInit returns the int64 stored under key, first writing def
		// when the key is absent (fresh database).
		loadOrInit := func(key []byte, def int64) (int64, error) {
			if v := meta.Get(key); v != nil {
				return bytesToInt64(v), nil
			}
			return def, meta.Put(key, int64ToBytes(def))
		}
		if store.term, e = loadOrInit(gKeyCommittedTerm, gDefaultTerm); e != nil {
			return e
		}
		if store.index, e = loadOrInit(gKeyCommittedIndex, gDefaultIndex); e != nil {
			return e
		}
		if _, e = tx.CreateBucketIfNotExists(gDataBucket); e != nil {
			return e
		}
		// Drop entries left in unstable by a previous run. On a fresh file the
		// bucket does not exist yet and DeleteBucket reports ErrBucketNotFound;
		// that is expected and must not abort the open.
		if e = tx.DeleteBucket(gUnstableBucket); e != nil && !errors.Is(e, bolt.ErrBucketNotFound) {
			return e
		}
		if _, e = tx.CreateBucket(gUnstableBucket); e != nil {
			return e
		}
		_, e = tx.CreateBucketIfNotExists(gCommittedBucket)
		return e
	})
	if err != nil {
		_ = db.Close() // best effort; the initialisation error is the one to report
		return err, nil
	}
	return nil, store
}
// int64ToBytes encodes i as exactly 8 big-endian bytes. With big-endian
// encoding, byte-wise comparison of two results matches numeric order for
// non-negative values.
func int64ToBytes(i int64) []byte {
	// Start from an EMPTY buffer with capacity 8. bytes.NewBuffer(make([]byte, 8))
	// would already contain 8 zero bytes, and binary.Write appends after them,
	// producing a 16-byte value.
	buf := bytes.NewBuffer(make([]byte, 0, 8))
	// Writing a fixed-size int64 into a bytes.Buffer cannot fail.
	_ = binary.Write(buf, binary.BigEndian, i)
	return buf.Bytes()
}
// bytesToInt64 decodes a big-endian int64 from the LAST 8 bytes of data.
//
// A 16-byte layout (8 zero bytes followed by the big-endian value) has been
// written by int64ToBytes and may already exist in stored data; reading the
// tail decodes both that layout and the plain 8-byte one. Reading the head,
// as before, always yielded 0 for the 16-byte layout.
// Inputs shorter than 8 bytes decode to 0.
func bytesToInt64(data []byte) int64 {
	if len(data) > 8 {
		data = data[len(data)-8:]
	}
	var i int64
	if err := binary.Read(bytes.NewReader(data), binary.BigEndian, &i); err != nil {
		return 0 // too short to hold an int64
	}
	return i
}
// Term returns the in-memory term: the committed term loaded by NewBoltStore,
// or the term of the most recently appended entry since then.
func (me *tBoltDBStore) Term() int64 { return me.term }
// Index returns the in-memory log index: the committed index loaded by
// NewBoltStore, or the index of the most recently appended entry since then.
func (me *tBoltDBStore) Index() int64 { return me.index }
// Append writes entry into the unstable bucket, keyed by entry.Index, and then
// advances the store's in-memory index and term to that entry.
//
// The entry's command is decoded first purely as a guard: gCmdFactory.OfTag
// panics on an unknown Tag, so an entry that could never be applied is
// rejected (by panic) before anything is written.
func (me *tBoltDBStore) Append(entry *model.LogEntry) error {
	cmd := gCmdFactory.OfTag(entry.Tag)
	cmd.Unmarshal(entry.Command)

	e, entryData := entry.Marshal()
	if e != nil {
		return e
	}

	e = me.db.Update(func(tx *bolt.Tx) error {
		return tx.Bucket(gUnstableBucket).Put(int64ToBytes(entry.Index), entryData)
	})
	if e != nil {
		return e
	}

	// Advance in-memory state only once the transaction has committed.
	// Updating it inside the closure would leave index/term ahead of what is
	// on disk whenever the bolt commit itself fails.
	me.index = entry.Index
	me.term = entry.Term
	return nil
}
// Commit promotes the unstable entry at index, all inside one db.Update
// transaction:
//  1. apply the entry's command to the data bucket,
//  2. copy the raw entry into the committed bucket,
//  3. record index and the entry's term in the meta bucket,
//  4. remove the entry from the unstable bucket.
//
// It returns gErrorCommitLogNotFound when no unstable entry exists at index,
// and otherwise the first error any step reports.
func (me *tBoltDBStore) Commit(index int64) error {
	key := int64ToBytes(index)
	return me.db.Update(func(tx *bolt.Tx) error {
		unstable := tx.Bucket(gUnstableBucket)
		raw := unstable.Get(key)
		if raw == nil {
			return gErrorCommitLogNotFound
		}

		var entry model.LogEntry
		if err := entry.Unmarshal(raw); err != nil {
			return err
		}

		cmd := gCmdFactory.OfTag(entry.Tag)
		cmd.Unmarshal(entry.Command)
		if err := cmd.Apply(tx); err != nil {
			return err
		}

		if err := tx.Bucket(gCommittedBucket).Put(key, raw); err != nil {
			return err
		}

		meta := tx.Bucket(gMetaBucket)
		if err := meta.Put(gKeyCommittedIndex, int64ToBytes(index)); err != nil {
			return err
		}
		if err := meta.Put(gKeyCommittedTerm, int64ToBytes(entry.Term)); err != nil {
			return err
		}

		return unstable.Delete(key)
	})
}
// Bucket names inside the boltdb file.
var (
	gMetaBucket      = []byte("meta")      // last committed index/term
	gUnstableBucket  = []byte("unstable")  // appended, not-yet-committed entries; recreated on open
	gCommittedBucket = []byte("committed") // committed entries keyed by index
	gDataBucket      = []byte("data")      // key/value state written by PutCmd/DelCmd
)

// Keys stored inside gMetaBucket.
var (
	gKeyCommittedIndex = []byte("committed.index")
	gKeyCommittedTerm  = []byte("committed.term")
)

// Values seeded into gMetaBucket when a fresh database is opened.
var (
	gDefaultTerm  int64 = 0
	gDefaultIndex int64 = 0
)

// gErrorCommitLogNotFound is returned by Commit when the unstable bucket has
// no entry at the requested index.
var gErrorCommitLogNotFound = errors.New("committing log not found")
(未完待续)
有疑问加站长微信联系(非本文作者)