ETCD探索-Watch
梗概
watch是mvcc包中的一个功能,之所以拿出来说,是因为它确实有很重的逻辑。watch是监听一个或一组key,key的任何变化都会发出消息。某种意义上讲,这就是发布订阅模式。
对比
既然Watch机制就是发布订阅模式,我们通过对比Kafka,来更深入了解Watch。
首先说明结论:
ETCD没有消费者组的概念,所以不能代替Kafka
对比其他方面呢:
ETCD | Kafka | |
---|---|---|
消费方式 | 监听一个Key | 订阅一个Topic |
生产方式 | Put(Key, Value) | Produce(Topic, Message) |
历史消息是否保留 | 保留 | 保留 |
能否从指定位置消费 | 可以从指定Revision消费 | 可以从指定offset消费 |
能否保证消息不重放 | 不能 | 消费者会主动上报offset,kafka会保存每个消费者的offset,消费者重启会从当前进度消费 |
对比Kafka不是试图用ETCD代替Kafka,是想通过对比了解Watch的特性和局限性
猜想
在讨论别人是怎么实现的时候,自己总要先猜想下。想的过程中就会发现难点在哪。
我的想法:
type watcher struct {
key string // 要监听的key
ch chan struct{} // 通过ch将消息发出来
}
func loop() {
for _, w := range []watchers {
ch <- message
}
}
解释下,我的想法中,每一个监听者都是一个watcher,监听者会自己消费自己的ch,实现消费功能。在服务端需要维护一个loop,将消息不断的发送到每一个监听者的ch中。
我感觉大多数人的最直观想法应该就是这样。
这样做我实现了
- 订阅发布功能
但我没有做到
- 同时监听一个范围的key(比如:我可以监听key=foo,但不能监听key=foo~fox。这是ETCD一个重要的功能)
- 消费者消费速率不同(比如:按我的设想,有一个消费者出现阻塞,会导致loop阻塞)
有了这些想法之后,我们来看看ETCD中Watch是怎么实现的。
实现
在MVCC文章中提到,KV接口的具体实现是store结构体。Watch的实现是在store上封装了一层,叫做:watchableStore
,重写了store的Write方法。
通过MVCC中介绍,store的任何写操作,都需要Write方法返回的TxnWrite。所以这里重写Write方法意味这任何写操作都会经过watchableStore。
func (tw *watchableStoreTxnWrite) End() {
changes := tw.Changes()
evs := make([]mvccpb.Event, len(changes))
for i, change := range changes {
evs[i].Kv = &changes[i]
}
tw.s.notify(rev, evs)
tw.TxnWrite.End()
}
type watchableStoreTxnWrite struct {
TxnWrite
s *watchableStore
}
func (s *watchableStore) Write(trace *traceutil.Trace) TxnWrite {
return &watchableStoreTxnWrite{s.store.Write(trace), s}
}
以上代码只列出了核心的逻辑,不难看出,watchableStoreTxnWrite在事务提交时,先将本次变更changes
打包成Event,然后调用notify来将变更通知出去。最后真正提交事务TxnWrite.End()
现在待推送的消息(Event)已经通过notify方法进入到了Watch机制中,我们看看这个消息是如何流转的。
首先需要介绍几个对象:
- Event
事件。变更的消息是以Event的形式发送出去的,Event包括KeyValue,同时包括操作类型(Put、Delete等)
- watcher
watcher监听一个或一组key,如果有变更,watcher将变更内容通过chan发送出去。
- watcherGroup
顾名思义,一组watcher。watcherGroup管理多个watcher,能够根据key快速找到监听该key的一个或多个watcher。
- watchableStore
继承自store,在store基础上实现了watch功能。watchableStore管理着两个watcherGroup:synced、unsynced,和一个用于缓存的victims。victims是缓存当前未发出去的Event。
- watchStream
watchStream是对watchableStore的封装。因为watchableStore继承自store,所以他实现了很多方法,但这些方法并不都是用于Watch功能。所以watchStream对watchableStore再次封装,暴露出与Watch有关的方法。
在知道这5个对象之后,我们是如何使用Watch呢?
func testWatch() {
s := newWatchableStore()
w := s.NewWatchStream()
w.Watch(start_key: foo, end_key: nil)
w.Watch(start_key: bar, end_key: nil)
for {
consume := <- w.Chan()
}
}
解释下,我们先创建了watchableStore,这是ETCD启动后就创建了的。当我们要使用Watch功能时,我们创建了一个watchStream
(s.NewWatchStream)。创建出来的w可以监听多个key:foo、bar。之后我们就可以消费w.Chan()返回的chan。foo或bar的任何变化,都会通过这个chan发送给消费端consume。
于是我们便得到下面这幅图:
可以看到watchStream实现了在一大堆kv的变化中,过滤出监听的key,将key的变化输出。
紧接着,我们将这幅图补充完整:
这幅图是什么意思呢?
watchableStore收到了所有key的变更后,将这些key交给synced(watchGroup),synced能够快速
地从所有key中找到监听的key。将这些key发送给对应的watcher,这些watcher再通过chan将变更信息发送出去。
synced是怎么快速找到符合条件的key呢?
ETCD中使用了map和adt(红黑树)来实现。
不单独使用map是因为watch可以监听一个范围的key。如果只监听一个key
watch(start_key: foo, end_key: nil)
我们可以这样存储
map[key]*watcher
这样可以根据key快速找到对应的watcher,ETCD也是这样做的。
但对于一组key呢?
watch(start_key: foo, end_key: fop)
这里我监听了从foo->fop之间的所有key,理论上这些key的数目是无限的,所以无法再使用map。
比如:key=fooac也属于监听范围。
ETCD用adt来存储这种key。
adt的实现这里不做介绍,只用知道adt能够根据key=fooac快速地找到所属范围foo->fop。
adt的原理推荐这篇文章:https://www.jianshu.com/p/e13...
adt的go实现:go.etcd.io/etcd/pkg/ad
在找到watcher后,调用watcher的send()方法,将变更的Event发送出去。
这就是上述图的意思,也就是正常的Watch流程。
各种场景
上图所述是正常流程,但是会有很多不正常的情况发生。
上图可以看到,消息都是通过一个Chan发送出去,但如果消费者消费速度慢,Chan就容易堆积。Chan的空间不可能无限大,那就必然会有满的时候,满了后该怎么办呢?
接下来就要讨论上图unsynced、victims的作用了。
Chan什么时候会满呢?
代码中Chan的长度是1024。不过这也是一个随机值,只是没有现在更好的选择。
一旦满了,会发生以下操作:
func (s *watchableStore) notify() {
var victim watcherBatch
...
w.minRev = rev + 1 // w是当前watcher
if victim == nil {
victim = make(watcherBatch)
}
w.victim = true // w被标记为受损的
victim[w] = eb // eb是当前的变更消息EventBatch
s.synced.delete(w)
...
s.addVictim(victim) // 将victim添加到s的victims中
}
(victim:受害者、牺牲品、受损的)
watcher会记录当前的Revision,并将自身标记为受损的
。此次的变更操作会被保存到watchableStore的victims中。同时该watcher会被从synced踢出。
假设此时有一个写操作:foo=f1。而正好Chan此时刚满,则监听foo的watcher将从synced中踢出,同时foo=f1被保存到victims中
接下来对foo的任何变更,该watcher都不会记录。那这些消息就都丢掉了吗?当然不是,watcher变成受损状态时记录下了当时的Revision,这个很重要。
这时要说到两个工作协程了:
// 我们在创建watchableStore时,会同时启动两个工作协程
go s.syncWatchersLoop()
go s.syncVictimsLoop()
顾名思义,第一个协程用于将unsynced的watcher同步为synced。
第二个协程用于循环清除watchableStore中的victims。
在上面的场景中,我们知道,队列满时,当时变更的Event被放入了victims中。这个协程就会试图清除这个Event。怎么清除呢?协程会不断尝试让watcher发送这个Event,一旦队列不满,watcher将这个Event发出后。该watcher就被划入了unsycned中,同时不再是受损状态。
此时syncWatchersLoop协程就开始起作用。由于在受损状态下,这个watcher已经错过了很多消息。为了追回进度,协程会根据watcher保存的Revision,找出受损之后所有的消息,将关于foo的消息全部给watcher,当watcher将这些消息都发送出去后。watcher就脱离了unsynced,成为了synced。
至此就解决了Chan满导致的问题。同时也阐明了Watch的设计实现。
有疑问加站长微信联系(非本文作者)