Etcd Raft源码分析之二:选举流程

zqhxuyuan · · 3284 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

### 1.6 node tick与raft的tickElection >这一节在没有特殊说明时,都是在raft/raft.go这个文件中(可以查看方法前的r *raft来知道当前是在raft.go中)。 node的tick()方法调用raft/raft.go中raft结构体的tick()。在1.2节中, raft.becomeFollower()中设置了raft结构体的step函数和tick函数=tickElection。 在上一小节的最后,node.run()从n.tickc通道中获取到消息,调用raft.tick()方法,实际上调用了raft.tickElection()。 ```go // tickElection is run by followers and candidates after r.electionTimeout. func (r *raft) tickElection() { r.electionElapsed++ if r.promotable() && r.pastElectionTimeout() { r.electionElapsed = 0 r.Step(pb.Message{From: r.id, Type: pb.MsgHup}) } } ``` 当Follower或者Candidate超过选举时间后,会发送类型为MsgHup的一条消息给自己,接着调用r.campaign()方法。 >如果消息类型不是MsgHup、MsgVote、MsgPreVote,则调用r.step(r, m)函数。 比如前面becomeFollower时设置了r.step=stepFollower,那么这里就会真正调用stepFollower()方法了。 在stepFollower()方法中,可以看到,它处理的消息类型并没有上面的MsgHup、MsgVote、MsgPreVote。 在最开始启动raft.Node时调用了becomeFollower,初始时,raft的Term为0,后来又被更新为1。 对于类型为MsgHup的pb.Message而言,它的Term初始时为0,所以会执行下面的m.Term==0分支,并接着执行pb.MsgHup分支。 >注意:这里的第一个switch语句块的条件是先判断m.Term==0。除此之外,如果m.Term==r.Term,也会执行第二个switch语句块。 ```go func (r *raft) Step(m pb.Message) error { // Handle the message term, which may result in our stepping down to a follower. switch { case m.Term == 0: // local message case m.Term > r.Term: ... case m.Term < r.Term: ... return nil } switch m.Type { case pb.MsgHup: if r.state != StateLeader { ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit) if r.preVote { r.campaign(campaignPreElection) } else { r.campaign(campaignElection) } } else { r.logger.Debugf("%x ignoring MsgHup because already leader", r.id) } case pb.MsgVote, pb.MsgPreVote: ... default: // 必须先调用r.step=xxx设置函数,然后才能调用下面的语句,真正执行函数。 r.step(r, m) } return nil } ``` 复习下raft调用becomeFollower,raft的状态会更新为StateFollower(下面也列出了成为其他两种角色的代码): - 成为Follower,启动选举,定时器为tickElection,在electionTimeout超时后,成为Candidate - 成为Candidate,增加Term,投票给自己,定时器为tickElection - 成为Leader,定时器为tickHeartbeat,定时发送心跳给Follower ```go // raft/raft.go func (r *raft) becomeFollower(term uint64, lead uint64) { r.step = stepFollower r.reset(term) r.tick = r.tickElection r.lead = lead r.state = StateFollower r.logger.Infof("%x became follower at term %d", r.id, r.Term) } func (r *raft) becomeCandidate() { r.step = stepCandidate // 成为候选人时,Term加1 r.reset(r.Term + 1) r.tick = r.tickElection // 投票给自己 r.Vote = r.id r.state = StateCandidate r.logger.Infof("%x became candidate at term %d", r.id, r.Term) } func (r *raft) becomeLeader() { r.step = stepLeader r.reset(r.Term) r.tick = r.tickHeartbeat r.lead = r.id r.state = StateLeader r.pendingConfIndex = r.raftLog.lastIndex() r.appendEntry(pb.Entry{Data: nil}) r.logger.Infof("%x became leader at term %d", r.id, r.Term) } ``` ### 1.7 竞选Leader Follower在electionTimeout超时后,会竞选成为Candidate。这里我们暂时不考虑两阶段: - 调用becomeCandidate,设置投票消息类型为MsgVote - 如果获取到大多数选票,则调用becomeLeader() - 向每个节点发送voteMsg 这里Follower第一次执行campaign时,步骤2获取到的选票肯定不满足大多数,所以会向其他节点发送MsgVote消息。 ```go func (r *raft) campaign(t CampaignType) { var term uint64 var voteMsg pb.MessageType if t == campaignPreElection { r.becomePreCandidate() voteMsg = pb.MsgPreVote // PreVote RPCs are sent for the next term before we've incremented r.Term. term = r.Term + 1 } else { r.becomeCandidate() voteMsg = pb.MsgVote term = r.Term } if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) { // We won the election after voting for ourselves (which must mean that this is a single-node cluster). Advance to the next state. if t == campaignPreElection { r.campaign(campaignElection) } else { r.becomeLeader() } return } for id := range r.prs { // 如果是自己,不需要发送 if id == r.id continue var ctx []byte if t == campaignTransfer ctx = []byte(t) r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx}) } } ``` 候选人发送VoteMsg给其他节点(Follower)后,当收到大多数Follower节点的投票结果后,候选人会成为Leader。 下面我们分析Follower节点收到候选人发送的MsgVote请求是如何处理的,这涉及到RPC调用,在Etcd中是rafthttp。 ### 1.8 Raft HTTP 每个EtcdServer都有一个HTTP服务端,用来接收其他节点发送的消息,以及返回响应结果给发送者: ```go //etcdserver/api/rafthttp/http.go func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { var m raftpb.Message if err := h.r.Process(context.TODO(), m); err != nil { switch v := err.(type) { case writerToResponse: v.WriteTo(w) } return } } ``` `EtcdServer`实现了etcdserver/api/rafthttp/transport.go的`Raft`接口 ``` //etcdserver/api/rafthttp/transport.go type Raft interface { Process(ctx context.Context, m raftpb.Message) error IsIDRemoved(id uint64) bool ReportUnreachable(id uint64) ReportSnapshot(id uint64, status raft.SnapshotStatus) } ``` >Q:s.r返回的是etcdserver/raft.go的raftNode,它是一个结构体。而Step方法定义在raft/Node.go接口中。 那么问题是:raftNode结构体的Step方法,是怎么调用到Node接口的Step方法? >A:先看下node.go下的Node接口和node结构体。node结构体实现了Node接口的所有方法,所以可以把node看做是是Node接口的实现类。 虽然raftNode结构体中没有定义raft.Node接口,但是它的raftNodeConfig属性定义了!这种语法叫做struct内嵌/嵌套(embedded)interface。 EtcdServer的Process方法调用raft/node.go中Node接口(其实现类是这个文件下的node结构体)的Step方法: ```go // etcdserver/server.go func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { return s.r.Step(ctx, m) } // raft/node.go func (n *node) Step(ctx context.Context, m pb.Message) error { // ignore unexpected local messages receiving over network if IsLocalMsg(m.Type) { return nil } return n.step(ctx, m) } func (n *node) step(ctx context.Context, m pb.Message) error { return n.stepWithWaitOption(ctx, m, false) } // Step advances the state machine using msgs. The ctx.Err() will be returned, if any. func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error { if m.Type != pb.MsgProp { select { case n.recvc <- m: return nil case <-ctx.Done(): return ctx.Err() case <-n.done: return ErrStopped } } ... return nil } ``` 候选人发送请求给Follower节点,当Follower节点收到请求时,会将消息发送到node的recvc通道中。 注意:Etcd分布式集群中的所有节点都会启动raftNode、raft.Node,也都会运行node.run()方法。 ```go // raft/node.go func (n *node) run(r *raft) { ... for { select { ... case <-n.tickc: r.tick() case m := <-n.recvc: // filter out response message from unknown From. if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) { r.Step(m) } } } } ``` 回顾前面Candidate从n.tickc中获取到定时器(ElectionTimeout)的超时消息,通过tickElection() 调用到raft.go的`Step()`方法,然后参与竞选(campaign)并发送VoteMsg给Follower节点。 这里Follower节点收到VoteMsg请求,从n.recvc中获取到消息,也会调用`r.Step(m)`方法。 两个节点的角色不一样,但是都会调用相同的raft.Step()方法,当然两者的处理逻辑不一样。 ### 1.9 Follower收到MsgVote,投票 Follower收到消息m的Term=1,它自己raft.Term=0,步骤如下: - m.Term > r.Term,调用becomeFollower() - 返回MsgVoteResp给候选人。 ```go // raft/raft.go func (r *raft) Step(m pb.Message) error { // Handle the message term, which may result in our stepping down to a follower. switch { case m.Term == 0: // local message case m.Term > r.Term: switch { default: if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap { r.becomeFollower(m.Term, m.From) } else { // Follower收到Candidate的MsgVote请求,自己成为Follower(更改状态) // 同样,这里只是设置Raft的step函数和tick函数,还没有真正执行成为Follower的逻辑 r.becomeFollower(m.Term, None) } } case m.Term < r.Term: ... return nil } switch m.Type { case pb.MsgHup: ... case pb.MsgVote, pb.MsgPreVote: canVote := r.Vote == m.From || // We can vote if this is a repeat of a vote we've already cast... (r.Vote == None && r.lead == None) || // ...we haven't voted and we don't think there's a leader yet in this term... (m.Type == pb.MsgPreVote && m.Term > r.Term) // ...or this is a PreVote for a future term... if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) { // ...and we believe the candidate is up to date. r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)}) // 返回的消息类型为VoteResp if m.Type == pb.MsgVote { r.electionElapsed = 0 // Only record real votes. 重置选举计数器 r.Vote = m.From // 投票给发送这条消息的节点,即候选人 } } else { r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true}) // 拒绝投票 } default: err := r.step(r, m) if err != nil { return err } } return nil } ``` 当Follower返回MsgVote的响应结果MsgVoteResp给Candidate,Candidate的处理流程与Follower收到消息的类似,也会调用r.Step()方法。 ### 1.10 Candidate成为Leader 由于Follower返回消息的Term等于Candidate发送消息的Term,所以直接走第二个switch条件的default分支: ```go // raft/raft.go func (r *raft) Step(m pb.Message) error { // Handle the message term, which may result in our stepping down to a follower. switch { case m.Term == 0: // local message case m.Term > r.Term: ... case m.Term < r.Term: ... return nil } switch m.Type { case pb.MsgHup: ... case pb.MsgVote, pb.MsgPreVote: ... default: err := r.step(r, m) // 调用Raft的stepFunc,即1.7节通过becomeCandidate设置的stepCandidate if err != nil { return err } } return nil } ``` 前面我们看到在Step(m)方法中都没有走到default分支,这里开始调用r.step(r,m)函数,对应的是stepCandidate: ```go // stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is // whether they respond to MsgVoteResp or MsgPreVoteResp. func stepCandidate(r *raft, m pb.Message) error { // Only handle vote responses corresponding to our candidacy (while in // StateCandidate, we may get stale MsgPreVoteResp messages in this term from our pre-candidate state). var myVoteRespType pb.MessageType if r.state == StatePreCandidate { myVoteRespType = pb.MsgPreVoteResp } else { myVoteRespType = pb.MsgVoteResp } switch m.Type { case pb.MsgProp: r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) return ErrProposalDropped case pb.MsgApp: r.becomeFollower(m.Term, m.From) // always m.Term == r.Term r.handleAppendEntries(m) case pb.MsgHeartbeat: r.becomeFollower(m.Term, m.From) // always m.Term == r.Term r.handleHeartbeat(m) case pb.MsgSnap: r.becomeFollower(m.Term, m.From) // always m.Term == r.Term r.handleSnapshot(m) case myVoteRespType: // 候选人收到Follower的VoteMsg消息,判断投票个数 gr := r.poll(m.From, m.Type, !m.Reject) r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr) switch r.quorum() { case gr: if r.state == StatePreCandidate { r.campaign(campaignElection) } else { r.becomeLeader() // 满足投票个数,成为Leader r.bcastAppend() // 向其他Follower节点发送MsgApp请求 } case len(r.votes) - gr: // pb.MsgPreVoteResp contains future term of pre-candidate m.Term > r.Term; reuse r.Term r.becomeFollower(r.Term, None) } case pb.MsgTimeoutNow: r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From) } return nil } ``` 总结下每个节点在收到HTTP请求时,都会调用raft.Step(m)方法,如果消息的类型不是MsgHup、MsgVote、MsgPreVote,则调用r.step(r,m)方法。 Raft的stepFunc在发送/收到HTTP请求之前一定会被设置,设置的地方在调用becomeFollower、becomeCandidate、becomeLeader这三个方法中。 A:以Follower1转为Candidate为例: 1. 调用becomeFollower,设置r.step=stepFollower 2. ticker的electionTimeout超时,调用becomeCandidate,设置r.step=stepCandidate 3. 调用campaign,发送VoteMsg给其他所有节点 B:以Follower2为例: 1. 调用becomeFollower,设置r.step=stepFollower 2. 收到候选节点的VoteMsg请求,投票给候选节点,返回VoteMsgResp C:以Candidate转为Leader为例: 1. 收到Follower发送的VoteMsgResp,调用r.step函数,即A:2中的stepCandidate 2. 判断投票个数,如果得到大多数的选票,调用becomeLeader,设置r.step=stepLeader 3. 向其他Follower节点发送MsgApp请求 D:以Follower2为例: 1. 收到Leader的MsgApp请求,调用r.step函数,即B:1的stepFollower 2. 处理AppendEntries请求,返回MsgAppResp请求给Leader 下面是Candidate发送MsgVote给Follower,Follower返回MsgVoteResp给Candidate的流程: ![](https://img-blog.csdn.net/20180909105546425?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3pxaHh1eXVhbg==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70)

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

3284 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传