EtcdRaft源码分析(线性一致读)

Pillar_Zhong · · 683 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

背景

我们知道Raft是Leader+Follower的模型,所有的更新由Leader处理,然后再同步给Follower。

想象一下,如果要所有的节点都参与进来支持读取的请求,会带来什么样的问题?

  • Leader跟Follower并不总是一致的,换句话说Follower会落后Leader的进度。如果没有特别的处理,那么不同的节点读取的结果很可能不一致。

  • 如果Leader被集群孤立,而且其他人已经推举出了新的Leader。而老的Leader还没有察觉到这个变化,他任然觉得还是Leader,但是他的数据已经不可信。如果他还在对外提供服务,那么读取的结果很可能不一致。

EtcdRaft的线性一致读是通过ReadIndex的机制来实现,大致的实现其实很简单,也就是在处理请求之前,会去集群中确认自己权力是否稳固,这样对外提供的服务才够权威。下面我们一起来剖析下Raft是怎么处理的。

接口

type Node interface {
    ...
   // ReadIndex request a read state. The read state will be set in the ready.
   // Read state has a read index. Once the application advances further than the read
   // index, any linearizable read requests issued before the read request can be
   // processed safely. The read state will have the same rctx attached.
   ReadIndex(ctx context.Context, rctx []byte) error
    ...
}

这个接口很怪,返回error,不是我们通常意义理解的查询接口。Golang的世界就是这么奇妙。注定这个接口没有那么简单。下面我们先搞清楚,这个接口是怎么用的。

EtcdServer

EtcdServer是再好不过的例子了。

func (s *EtcdServer) linearizableReadLoop() {
   var rs raft.ReadState

   for {
      ctxToSend := make([]byte, 8)
      id1 := s.reqIDGen.Next()
      binary.BigEndian.PutUint64(ctxToSend, id1)
        ...
      cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
      if err := s.r.ReadIndex(cctx, ctxToSend); err != nil {
         ...
      }
      cancel()

      var (
         timeout bool
         done    bool
      )
      for !timeout && !done {
         select {
         case rs = <-s.r.readStateC:
            done = bytes.Equal(rs.RequestCtx, ctxToSend)
           ...
      }
      ...

      if ai := s.getAppliedIndex(); ai < rs.Index {
         select {
         case <-s.applyWait.Wait(rs.Index):
         case <-s.stopping:
            return
         }
      }
      ...
   }
}

当然这是精简过后的代码,只保留了跟ReadIndex相关的逻辑。从这里我们也能看出一些端倪。

  • 首先,传入的参数ctxToSend,是一个单调自增的id,没有任何意义,只是用来客户端区分请求只用。
  • 其次,会从readStateC里面监听发出的ReadIndex请求Raft是否有了回应。
  • 第三,会看下本地已经写入状态机的日志有没有到ReadIndex请求回来的位置,如果没有继续等待,如果有这个方法立即结束。
  • 注意没有这个方法整个是个for循环,而且请求id还单调自增,那么执行下来的结果就是会一直保持对Raft状态的监控,一有风吹草动,这边就会提前收到通知。

ReadIndex

func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
   return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
}

可以看到,最终是通过MsgReadIndex的方式对内进行广播。

Follower

case pb.MsgReadIndex:
   if r.lead == None {
      r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
      return nil
   }
   m.To = r.lead
   r.send(m)

首先破除第一个问题,不管谁接到ReadIndex的请求,将转发给Leader。

Leader

case pb.MsgReadIndex:
   if r.quorum() > 1 {
      if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
         // Reject read only request when this leader has not committed any log entry at its term.
         return nil
      }

      // thinking: use an interally defined context instead of the user given context.
      // We can express this in terms of the term and index instead of a user-supplied value.
      // This would allow multiple reads to piggyback on the same message.
      switch r.readOnly.option {
      case ReadOnlySafe:
         r.readOnly.addRequest(r.raftLog.committed, m)
         r.bcastHeartbeatWithCtx(m.Entries[0].Data)
      case ReadOnlyLeaseBased:
         ri := r.raftLog.committed
         if m.From == None || m.From == r.id { // from local member
            r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
         } else {
            r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
         }
      }
   } else {
      r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
   }

   return nil
}
  • 如果当前Leader截至到现在还没有提交任何entry,那么直接返回。
  • 下面我们看下不同的ReadOnly选项都是怎么实现的。

ReadOnlySafe

func (ro *readOnly) addRequest(index uint64, m pb.Message) {
   ctx := string(m.Entries[0].Data)
   if _, ok := ro.pendingReadIndex[ctx]; ok {
      return
   }
   ro.pendingReadIndex[ctx] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}
   ro.readIndexQueue = append(ro.readIndexQueue, ctx)
}
  • 首先我们将请求的requestid放入pendingReadIndex中暂存,当然了,如果之前已经请求过了,那么返回,同时会初始化一个readIndexStatus,最后append到readIndexQueue
  • 注意这里的readIndexQueue是FIFO的,是严格有序的。
  • 其次,request里面保存的只是一个请求的id,用来客户端来区分请求用的。
  • 另外一个需要注意的是,这里保存的index是当前Leader的committedindex,也就是最终一致的地方。
    • 想象一下,客户端那边在不停的往Raft里面灌数据,那么其他客户端在读取Raft数据的时候,怎么知道哪些数据是形成一致的,可以安全拿出来用的。当然committedindex是重要的指标,代表,包括这个index及之前的数据都是安全的,有保障的。
    • 那么怎么拿到这个committedindex,这就是ReadIndex的目的。

bcastHeartbeatWithCtx

r.bcastHeartbeatWithCtx(m.Entries[0].Data)

func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
    // Attach the commit as min(to.matched, r.committed).
    // When the leader sends out heartbeat message,
    // the receiver(follower) might not be matched with the leader
    // or it might not have all the committed entries.
    // The leader MUST NOT forward the follower's commit to
    // an unmatched index.
    commit := min(r.getProgress(to).Match, r.raftLog.committed)
    m := pb.Message{
        To:      to,
        Type:    pb.MsgHeartbeat,
        Commit:  commit,
        Context: ctx,
    }

    r.send(m)
}

将请求的requestid作为心跳的context,发给其他成员。下面我们看下Follower或Candidate接到请求是怎么处理的。

Follower&Candidate

case pb.MsgHeartbeat:
        r.electionElapsed = 0
        r.lead = m.From
        r.handleHeartbeat(m)

case pb.MsgHeartbeat:
   r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
   r.handleHeartbeat(m)
   
func (r *raft) handleHeartbeat(m pb.Message) {
    r.raftLog.commitTo(m.Commit)
    r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}
  • 可以看到二位接到心跳后,并没有针对ReadOnly做特殊处理。做了他们自己平时该做的。
  • 回忆下心跳篇,Leader怎么维持自己的权威,不就是发心跳么?其他成员接受到心跳就认为Leader还存在,还不用重新选举。
  • 既然ReadOnly的实现是需要大多数人确认我还是不是真正的Leader,那么Leader在接受心跳响应的时候,看收到的有没有过半数不就可以了么?所以心跳是完美的实现这个的载体。
  • 下面我们看下Leader在处理心跳响应的时候,在做什么

Leader

case pb.MsgHeartbeatResp:
   pr.RecentActive = true
   pr.resume()

   // free one slot for the full inflights window to allow progress.
   if pr.State == ProgressStateReplicate && pr.ins.full() {
      pr.ins.freeFirstOne()
   }
   if pr.Match < r.raftLog.lastIndex() {
      r.sendAppend(m.From)
   }

   if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
      return nil
   }

   ackCount := r.readOnly.recvAck(m)
   if ackCount < r.quorum() {
      return nil
   }

   rss := r.readOnly.advance(m)
   for _, rs := range rss {
      req := rs.req
      if req.From == None || req.From == r.id { // from local member
         r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
      } else {
         r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
      }
   }
  • 跟心跳篇重复的部分,在这里我们不再赘述

  • 首先到了这里要不你开启了ReadOnlySafe,要不就是消息上下文中没有发现ReadIndex

recvAck

ackCount := r.readOnly.recvAck(m)
        if ackCount < r.quorum() {
            return nil
        }

func (ro *readOnly) recvAck(m pb.Message) int {
   rs, ok := ro.pendingReadIndex[string(m.Context)]
   if !ok {
      return 0
   }

   rs.acks[m.From] = struct{}{}
   // add one to include an ack from local node
   return len(rs.acks) + 1
}
  • 努力回忆下在Leader发心跳前做的准备工作,其中之一就是保存一个客户端的请求id到pendingReadIndex。这里再次提取这个request,将这个心跳响应累加到acks里面。
  • 那Leader怎么知道这是普通的心跳响应还是ReadIndex的心跳响应呢?关键就是Message的context是不是有请求id
  • 换句话说,这里就是收集因为这次ReadIndex请求发起的心跳,最终有多少人给了回应。
  • 如果超过一半的人答复了Leader,说明这个Leader是被人承认的,有公信力的。那么继续往下

advance

func (ro *readOnly) advance(m pb.Message) []*readIndexStatus {
   var (
      i     int
      found bool
   )

   ctx := string(m.Context)
   rss := []*readIndexStatus{}

   for _, okctx := range ro.readIndexQueue {
      i++
      rs, ok := ro.pendingReadIndex[okctx]
      if !ok {
         panic("cannot find corresponding read state from pending map")
      }
      rss = append(rss, rs)
      if okctx == ctx {
         found = true
         break
      }
   }

   if found {
      ro.readIndexQueue = ro.readIndexQueue[i:]
      for _, rs := range rss {
         delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data))
      }
      return rss
   }

   return nil
}
  • 之前在addRequest的时候往ReadOnly里面写了这次请求的信息,那么这里从里面找出来。
  • 找到后,将之前的还没来得及处理得请求,一起摘出来,往下处理
for _, rs := range rss {
   req := rs.req
   if req.From == None || req.From == r.id { // from local member
      r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
   } else {
      r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
   }
}
  • 遍历上面返回的请求列表
  • 如果是发给当前节点的请求,那么将这个ReadState累加在本地
  • 如果不是,给对方发MsgReadIndexResp

Follower

case pb.MsgReadIndexResp:
   if len(m.Entries) != 1 {
      r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
      return nil
   }
   r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
}

可以看到,Follower是通过Leader拿到最新的commit,尽管他自己都还没有跟上对方的进度。但是对于外部请求方来说,他并不区分当前是Follower或Leader,他只想知道当前Raft的最新状态。所以将最新得commit累加到本地得ReadState,等待发送出去。

总结

事已至此,最新的readstate都已经保存在本地了,那么这些状态怎么发送出去,让应用层处理,那就是Ready在做的事情了。


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:Pillar_Zhong

查看原文:EtcdRaft源码分析(线性一致读)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

683 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传