专注于大数据及容器云核心技术解密,可提供全栈的大数据+云原生平台咨询方案,请持续关注本套博客。如有任何学术交流,可随时联系。更多内容请关注《数据云技术社区》公众号。
1 DeltaFIFO数据结构(仅追加变化数据)
- Delta其实就是kubernetes系统中对象的变化(增、删、改、同步),FIFO比较好理解,是一个先入先出的队列,那么DeltaFIFO就是一个按序的(先入先出)kubernetes对象变化的队列。
- DeltaFIFO内部一直追加变化数据,例如:(DeltaFIFO.Replace)同步一次,就向DeltaFIFO 是全量插入sync变量Delta。
- 删除合并操作发生在最近两次更新的操作中,仅取其中之一。dedupDeltas()就是这种货色。
- queueActionLocked就是DeltaFIFO的灵魂,任何的风吹草动(如:增、删、改、同步),都会放进items map[string]Deltas中。
items解释:
- items map[string] Deltas
- type Deltas []Delta // Delta数组
- f.items表示Map集合
- f.items[id] 表示某一个key(资源对象)对应的数组集合,即:资源操作变化数组
// 代码源自client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
lock sync.RWMutex // 读写锁,因为涉及到同时读写,读写锁性能要高
cond sync.Cond // 给Pop()接口使用,在没有对象的时候可以阻塞,内部锁复用读写锁
items map[string]Deltas // 这个应该是Store的本质了,按照kv的方式存储对象,但是存储的是对象的Deltas数组
queue []string // 这个是为先入先出实现的,存储的就是对象的键
populated bool // 通过Replace()接口将第一批对象放入队列,或者第一次调用增、删、改接口时标记为true
initialPopulationCount int // 通过Replace()接口将第一批对象放入队列的对象数量
keyFunc KeyFunc // 对象键计算函数,在Indexer那篇文章介绍过
knownObjects KeyListerGetter // 前面介绍就是为了这是用,该对象指向的就是Indexer,
closed bool // 是否已经关闭的标记
closedLock sync.Mutex // 专为关闭设计的所,为什么不复用读写锁?
}
复制代码
- 一般是先追加,生成新变化数组,然后更新DeltaFIFO.items集合,类似:id:[add:obj1,update:obj2,delete:obj3]
id, err := f.KeyOf(obj) //得到obj对应的Map的key
newDeltas := append(f.items[id], Delta{actionType, obj}) //追加,生成新数组
f.items[id] = newDeltas //更新DeltaFIFO.items集合
复制代码
- queueActionLocked最终会解决资源变化追加的问题,代码如下:
// 代码源自client-go/tools/cache/delta_fifo.go
// 从函数名称来看把“动作”放入队列中,这个动作就是DeltaType,而且已经加锁了
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
// 前面提到的计算对象键的函数
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
// 如果是同步,并且对象未来会被删除,那么就直接返回,没必要记录这个动作了
// 肯定有人会问为什么Add/Delete/Update这些动作可以,因为同步对于已经删除的对象是没有意义的
// 已经删除的对象后续跟添加、更新有可能,因为同名的对象又被添加了,删除也是有可能
// 删除有些复杂,后面会有说明
if actionType == Sync && f.willObjectBeDeletedLocked(id) {
return nil
}
// 同一个对象的多次操作,所以要追加到Deltas数组中
newDeltas := append(f.items[id], Delta{actionType, obj})
// 合并操作,去掉冗余的delta
newDeltas = dedupDeltas(newDeltas)
// 判断对象是否已经存在
_, exists := f.items[id]
// 合并后操作有可能变成没有Delta么?后面的代码分析来看应该不会,所以暂时不知道这个判断目的
if len(newDeltas) > 0 {
// 如果对象没有存在过,那就放入队列中,如果存在说明已经在queue中了,也就没必要再添加了
if !exists {
f.queue = append(f.queue, id)
}
// 更新Deltas数组,通知所有调用Pop()的人
f.items[id] = newDeltas
f.cond.Broadcast()
} else if exists {
// 直接把对象删除,这段代码我不知道什么条件会进来,因为dedupDeltas()肯定有返回结果的
// 后面会有dedupDeltas()详细说明
delete(f.items, id)
}
return nil
}
复制代码
- dedupDeltas 主要解决连续两个删除操作的变化的合并,id:[add:obj1,update:obj2,delete:obj3,delete:obj3],该函数会去掉最后一个,返回一个完整的f.items[id]: id:[add:obj1,update:obj2,delete:obj3]。最终对应的DeltaFIFO.items就是:Map( DeltaFIFO。key->id:[add:obj1,update:obj2,delete:obj3])
// 代码源自client-go/tools/cache/delta_fifo.go
func dedupDeltas(deltas Deltas) Deltas {
n := len(deltas)
if n < 2 {
return deltas
}
// 取出最后两个
a := &deltas[n-1]
b := &deltas[n-2]
// 判断如果是重复的,那就删除这两个delta把合并后的追加到Deltas数组尾部
if out := isDup(a, b); out != nil {
d := append(Deltas{}, deltas[:n-2]...)
return append(d, *out)
}
return deltas
}
// 判断两个Delta是否是重复的
func isDup(a, b *Delta) *Delta {
// 只有一个判断,只能判断是否为删除类操作,和我们上面的判断相同
// 这个函数的本意应该还可以判断多种类型的重复,当前来看只能有删除这一种能够合并
if out := isDeletionDup(a, b); out != nil {
return out
}
return nil
}
// 判断是否为删除类的重复
func isDeletionDup(a, b *Delta) *Delta {
// 二者都是删除那肯定有一个是重复的
if b.Type != Deleted || a.Type != Deleted {
return nil
}
// 理论上返回最后一个比较好,但是对象已经不再系统监控范围,前一个删除状态是好的
if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
return a
}
return b
}
复制代码
2 DeltaFIFO.replace()(全量插入sync类型变量,并做Delete检测)
- Reflector通过List(),拿到全量变化,然后调用Replace, 然后每一个item执行:if err := f.queueActionLocked(Sync, item)
- replace操作还是追加变化量,类似于kafka的Append日志。
- replace向DeltaFIFO 是全量插入sync变量,类似:append(f.items[id], Delta{Sync, obj}),并做Delete检测,以追加if err := f.queueActionLocked(Deleted, item)
// 代码源自client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
f.lock.Lock()
defer f.lock.Unlock()
keys := make(sets.String, len(list))
// 遍历所有的输入目标
for _, item := range list {
// 计算目标键
key, err := f.KeyOf(item)
if err != nil {
return KeyError{item, err}
}
// 记录处理过的目标键,采用set存储,是为了后续快速查找
keys.Insert(key)
// 因为输入是目标全量,所以每个目标相当于重新同步了一次
if err := f.queueActionLocked(Sync, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
// 如果没有存储的话,自己存储的就是所有的老对象,目的要看看那些老对象不在全量集合中,那么就是删除的对象了
if f.knownObjects == nil {
// 遍历所有的元素
for k, oldItem := range f.items {
// 这个目标在输入的对象中存在就可以忽略
if keys.Has(k) {
continue
}
// 输入对象中没有,说明对象已经被删除了。
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
}
// 终于看到哪里用到DeletedFinalStateUnknown了,队列中存储对象的Deltas数组中
// 可能已经存在Delete了,避免重复,采用DeletedFinalStateUnknown这种类型
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
// 如果populated还没有设置,说明是第一次并且还没有任何修改操作执行过
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list) // 记录第一次通过来的对象数量
}
return nil
}
// 下面处理的就是检测某些目标删除但是Delta没有在队列中
// 从存储中获取所有对象键
knownKeys := f.knownObjects.ListKeys()
queuedDeletions := 0
for _, k := range knownKeys {
// 对象还存在那就忽略
if keys.Has(k) {
continue
}
// 获取对象
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
// 累积删除的对象数量
queuedDeletions++
// 把对象删除的Delta放入队列
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
// 和上面的代码差不多,只是计算initialPopulationCount值的时候增加了删除对象的数量
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list) + queuedDeletions
}
return nil
}
复制代码
3 何为同步?
period time.Duration // 反射器在List和Watch的时候理论上是死循环,只有出现错误才会退出
这个变量用在出错后多长时间再执行List和Watch,默认值是1秒钟
resyncPeriod time.Duration // 重新同步的周期,很多人肯定认为这个同步周期指的是从apiserver的同步周期
其实这里面同步指的是shared_informer使用者需要定期同步全量对象
复制代码
- 周期同步,采用period,从apiserver的进行同步
- 使用者需要定期同步全量对象
- 这里发现是从indexr中,来进行数据同步的,knownObjects
4 knownObjects是谁(index)
- 发现Reflector就是一个通过NEW封装的Controller,完成资源变化监听,并放进DeltaFIFO和Index队列中。
- knownObjects是谁也一目了然,就是最终完成检索
5 HandleDeltas打通任督二脉
- sharedIndexInformer.run -> controler.run-> wait.Until(c.processLoop, time.Second, stopCh)->obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))->Process: s.HandleDeltas ->更新Index,并发送通知Controller 来回调处理
- sharedIndexInformer是什么?就是包含了两个重要步骤:1:Reflector来实现变化更新到DeltaFIFO,注意Reflector本身初始化是在Controller内部,2:Controller来实现更新Index,并发送通知Controller来处理,这个Controller也是(s.controller = New(cfg))。
- sharedIndexInformer是什么?其实就是包含上述两个步骤,而最终呈现的就是s.controller.Run(stopCh)。
- sharedIndexInformer是什么?其实就是对controller的配置,初始化了Config。该Config不仅用来初始化Reflector,也用来初始化controller。
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
//初始化Reflector,更新Index, 并发送通知给Controller来回调处理
s.controller.Run(stopCh)
}
复制代码
- 完成index的增删改操作,也即最终的变量同步。
6 sharedIndexInformer是什么?
- sharedIndexInformer就是一个包装,初始化Reflector,也用来初始化controller。最终呈现给大家的就是controller.run。
- sharedIndexInformer和controller是一一对应的。
7 总结
本文综合分析了Kubernetes 大量源码,试图从较高的视野来看问题,猛看表,一天时间就过去了。辛苦成文,各自珍惜,谢谢!
专注于大数据及容器云核心技术解密,可提供全栈的大数据+云原生平台咨询方案,请持续关注本套博客。如有任何学术交流,可随时联系。更多内容请关注《数据云技术社区》公众号。
有疑问加站长微信联系(非本文作者)