《k8s 源码分析》- Custom Controller 之 Informer

CloudGeek · · 2387 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

源码分析系列文章已经开源到github,地址如下:

  • github:
    https://github.com/farmer-hutao/k8s-source-code-analysis
  • gitbook:
    https://farmer-hutao.github.io/k8s-source-code-analysis

------------------------------------------

本文大纲

1. 概述

本节标题写的是 Informer,不过我们的内容不局限于狭义的 Informer 部分,只是 Informer 最有代表性,其他的 Reflector 等也不好独立开来讲。

Informer 在很多组件的源码中可以看到,尤其是 kube-controller-manager (写这篇文章时我已经基本写完 kube-scheduler 的源码分析,着手写 kube-controller-manager 了,鉴于 controlelr 和 client-go 关联比较大,跳过来先讲讲典型的控制器工作流程中涉及到的 client-go 部分).

Informer 是 client-go 中一个比较核心的工具,通过 Informer(实际我们用到的都不是单纯的 informer,而是组合了各种工具的 sharedInformerFactory) 我们可以轻松 List/Get 某个资源对象,可以监听资源对象的各种事件(比如创建和删除)然后触发回调函数,让我们能够在各种事件发生的时候能够作出相应的逻辑处理。举个例字,当 pod 数量变化的时候 deployment 是不是需要判断自己名下的 pod 数量是否还和预期的一样?如果少了是不是要考虑创建?

2. 架构概览

自定义控制器的工作流程基本如下图所示,我们今天要分析图中上半部分的逻辑。

1555996411720

我们开发自定义控制器的时候用到的“机制”主要定义在 client-go 的 tool/cache下:

1556075198766

我们根据图中的9个步骤来跟源码

3. reflector - List & Watch API Server

Reflector 会监视特定的资源,将变化写入给定的存储中,也就是 Delta FIFO queue.

3.1. Reflector 对象

Reflector 的中文含义是反射器,我们先看一下类型定义:

tools/cache/reflector.go:47

type Reflector struct {
   name string
   metrics *reflectorMetrics
   expectedType reflect.Type

   store Store
   listerWatcher ListerWatcher

   period       time.Duration
   resyncPeriod time.Duration
   ShouldResync func() bool
   clock clock.Clock
   lastSyncResourceVersion string
   lastSyncResourceVersionMutex sync.RWMutex
}
Copy

reflector.go中主要就 Reflector 这个 struct 和相关的一些函数:

1556075898739

3.2. ListAndWatch

ListAndWatch 首先 list 所有 items,获取当前的资源版本信息,然后使用这个版本信息来 watch(也就是从这个版本开始的所有资源变化会被关注)。我们看一下这里的 ListAndWatch 方法主要逻辑:

tools/cache/reflector.go:168

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
   // list 资源
   list, err := r.listerWatcher.List(options)
   // 提取 items
   items, err := meta.ExtractList(list)
    // 更新存储(Delta FIFO)中的 items
   if err := r.syncWith(items, resourceVersion); err != nil {
      return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
   }
   r.setLastSyncResourceVersion(resourceVersion)

   // ……

   for {
      select {
      case <-stopCh:
         return nil
      default:
      }

      timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
      options = metav1.ListOptions{
         ResourceVersion: resourceVersion,
         TimeoutSeconds: &timeoutSeconds,
      }

      r.metrics.numberOfWatches.Inc()
       // 开始 watch
      w, err := r.listerWatcher.Watch(options)
       // ……
       // w 交给 watchHandler 处理
      if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
         if err != errorStopRequested {
            klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
         }
         return nil
      }
   }
}
Copy

4. watchHandler - add obj to delta fifo

前面讲到 ListAndWatch 函数的最后一步逻辑是 watchHandler,在 ListAndWatch 中先是更新了 Delta FIFO 中的 item,然后 watch 资源对象,最后交给 watchHandler 处理,所以 watchHandler 基本可以猜到是将有变化的资源添加到 Delta FIFO 中了。

tools/cache/reflector.go:287

func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
   // ……
loop:
    // 这里进入一个无限循环
   for {
      select {
      case <-stopCh:
         return errorStopRequested
      case err := <-errc:
         return err
          // watch 返回值中的一个 channel
      case event, ok := <-w.ResultChan():
         // ……
         newResourceVersion := meta.GetResourceVersion()
          // 根据事件类型处理,有 Added Modified Deleted 3种
          // 3 种事件分别对应 store 中的增改删操作
         switch event.Type {
         case watch.Added:
            err := r.store.Add(event.Object)

         case watch.Modified:
            err := r.store.Update(event.Object)

         case watch.Deleted:
            err := r.store.Delete(event.Object)

         default:
            utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
         }
         *resourceVersion = newResourceVersion
         r.setLastSyncResourceVersion(newResourceVersion)
         eventCount++
      }
   }

   // ……

   return nil
}
Copy

5. Informer (controller) - pop obj from delta fifo

5.1. Controller

一个 Informer 需要实现 Controller 接口:

tools/cache/controller.go:82

type Controller interface {
   Run(stopCh <-chan struct{})
   HasSynced() bool
   LastSyncResourceVersion() string
}
Copy

一个基础的 Controller 实现如下:

tools/cache/controller.go:75

type controller struct {
   config         Config
   reflector      *Reflector
   reflectorMutex sync.RWMutex
   clock          clock.Clock
}
Copy

controller 类型结构如下:

1556088003902

可以看到主要对外暴露的逻辑是 Run() 方法,我们看一下 Run() 中的逻辑:

tools/cache/controller.go:100

func (c *controller) Run(stopCh <-chan struct{}) {
   defer utilruntime.HandleCrash()
   go func() {
      <-stopCh
      c.config.Queue.Close()
   }()
    // 内部 Reflector 创建
   r := NewReflector(
      c.config.ListerWatcher,
      c.config.ObjectType,
      c.config.Queue,
      c.config.FullResyncPeriod,
   )
   r.ShouldResync = c.config.ShouldResync
   r.clock = c.clock

   c.reflectorMutex.Lock()
   c.reflector = r
   c.reflectorMutex.Unlock()

   var wg wait.Group
   defer wg.Wait()

   wg.StartWithChannel(stopCh, r.Run)
    // 循环调用 processLoop
   wait.Until(c.processLoop, time.Second, stopCh)
}
Copy

5.2. processLoop

tools/cache/controller.go:148

func (c *controller) processLoop() {
   for {
       // 主要逻辑
      obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
       // 异常处理
   }
}
Copy

这里的 Queue 就是 Delta FIFO,Pop 是个阻塞方法,内部实现时会逐个 pop 队列中的数据,交给 PopProcessFunc 处理。我们先不看 Pop 的实现,关注一下 PopProcessFunc 是如何处理 Pop 中从队列拿出来的 item 的。

PopProcessFunc 是一个类型:

type PopProcessFunc func(interface{}) error

所以这里只是一个类型转换,我们关注c.config.Process就行:

tools/cache/controller.go:367

Process: func(obj interface{}) error {
    for _, d := range obj.(Deltas) {
        switch d.Type {
            // 更新、添加、同步、删除等操作
        case Sync, Added, Updated:
            if old, exists, err := clientState.Get(d.Object); err == nil && exists {
                if err := clientState.Update(d.Object); err != nil {
                    return err
                }
                h.OnUpdate(old, d.Object)
            } else {
                if err := clientState.Add(d.Object); err != nil {
                    return err
                }
                h.OnAdd(d.Object)
            }
        case Deleted:
            if err := clientState.Delete(d.Object); err != nil {
                return err
            }
            h.OnDelete(d.Object)
        }
    }
    return nil
},
Copy

这里涉及到2个点:

  • clientState
  • ResourceEventHandler (h)

我们一一来看

6. Add obj to Indexer (Thread safe store)

前面说到 clientState,这个变量的初始化是clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)

NewIndexer 代码如下:

tools/cache/store.go:239

func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
   return &cache{
      cacheStorage: NewThreadSafeStore(indexers, Indices{}),
      keyFunc:      keyFunc,
   }
}
Copy

tools/cache/index.go:27

type Indexer interface {
   Store
   Index(indexName string, obj interface{}) ([]interface{}, error)
   IndexKeys(indexName, indexKey string) ([]string, error)
   ListIndexFuncValues(indexName string) []string
   ByIndex(indexName, indexKey string) ([]interface{}, error)
   GetIndexers() Indexers
   AddIndexers(newIndexers Indexers) error
}
Copy

顺带看一下 NewThreadSafeStore()

tools/cache/thread_safe_store.go:298

func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
   return &threadSafeMap{
      items:    map[string]interface{}{},
      indexers: indexers,
      indices:  indices,
   }
}
Copy

然后关注一下 Process 中的err := clientState.Add(d.Object)的 Add() 方法:

tools/cache/store.go:123

func (c *cache) Add(obj interface{}) error {
    // 计算key;一般是namespace/name
   key, err := c.keyFunc(obj)
   if err != nil {
      return KeyError{obj, err}
   }
    // Add
   c.cacheStorage.Add(key, obj)
   return nil
}
Copy

cacheStorage 是一个 ThreadSafeStore 实例,这个 Add() 代码如下:

tools/cache/thread_safe_store.go:68

func (c *threadSafeMap) Add(key string, obj interface{}) {
   c.lock.Lock()
   defer c.lock.Unlock()
    // 拿出 old obj
   oldObject := c.items[key]
    // 写入 new obj
   c.items[key] = obj
    // 更新索引,有一堆逻辑
   c.updateIndices(oldObject, obj, key)
}
Copy

第四步和第五步的内容先分析到这里,后面关注 threadSafeMap 实现的时候再继续深入。

7. sharedIndexInformer

第六步是 Dispatch Event Handler functions(Send Object to Custom Controller)

我们先看一个接口 SharedInformer:

tools/cache/shared_informer.go:43

type SharedInformer interface {
   AddEventHandler(handler ResourceEventHandler)
   AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
   GetStore() Store
   GetController() Controller
   Run(stopCh <-chan struct{})
   HasSynced() bool
   LastSyncResourceVersion() string
}
Copy

SharedInformer 有一个共享的 data cache,能够分发 changes 通知到缓存,到通过 AddEventHandler 注册了的 listerners. 当你接收到一个通知,缓存的内容能够保证至少和通知中的一样新。

再看一下 SharedIndexInformer 接口:

tools/cache/shared_informer.go:66

type SharedIndexInformer interface {
   SharedInformer
   // AddIndexers add indexers to the informer before it starts.
   AddIndexers(indexers Indexers) error
   GetIndexer() Indexer
}
Copy

相比 SharedInformer 增加了一个 Indexer. 然后看具体的实现 sharedIndexInformer 吧:

tools/cache/shared_informer.go:127

type sharedIndexInformer struct {
   indexer    Indexer
   controller Controller
   processor             *sharedProcessor
   cacheMutationDetector CacheMutationDetector
   listerWatcher ListerWatcher

   objectType    runtime.Object
   resyncCheckPeriod time.Duration
   defaultEventHandlerResyncPeriod time.Duration
   clock clock.Clock
   started, stopped bool
   startedLock      sync.Mutex
   blockDeltas sync.Mutex
}
Copy

这个类型内包了很多我们前面看到过的对象,indexer、controller、listeratcher 都不陌生,我们看这里的 processor 是做什么的:

7.1. sharedProcessor

类型定义如下:

tools/cache/shared_informer.go:375

type sharedProcessor struct {
   listenersStarted bool
   listenersLock    sync.RWMutex
   listeners        []*processorListener
   syncingListeners []*processorListener
   clock            clock.Clock
   wg               wait.Group
}
Copy

这里的重点明显是 listeners 属性了,我们继续看 listeners 的类型中的 processorListener:

7.1.1. processorListener

tools/cache/shared_informer.go:466

type processorListener struct {
   nextCh chan interface{}
   addCh  chan interface{}

   handler ResourceEventHandler
   // 一个 ring buffer,保存未分发的通知
   pendingNotifications buffer.RingGrowing
   // ……
}
Copy

processorListener 主要有2个方法:

  • run()
  • pop()

7.1.2. processorListener.run()

先看一下这个 run 做了什么:

tools/cache/shared_informer.go:540

func (p *processorListener) run() {
   stopCh := make(chan struct{})
    wait.Until(func() { // 一分钟执行一次这个 func()
        // 一分钟内的又有几次重试
      err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
          // 等待信号 nextCh
         for next := range p.nextCh {
             // notification 是 next 的实际类型
            switch notification := next.(type) {
                // update
            case updateNotification:
               p.handler.OnUpdate(notification.oldObj, notification.newObj)
                // add
            case addNotification:
               p.handler.OnAdd(notification.newObj)
                // delete
            case deleteNotification:
               p.handler.OnDelete(notification.oldObj)
            default:
               utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
            }
         }
         return true, nil
      })

      if err == nil {
         close(stopCh)
      }
   }, 1*time.Minute, stopCh)
}
Copy

这个 run 过程不复杂,等待信号然后调用 handler 的增删改方法做对应的处理逻辑。case 里的 Notification 再看一眼:

tools/cache/shared_informer.go:176

type updateNotification struct {
   oldObj interface{}
   newObj interface{}
}

type addNotification struct {
   newObj interface{}
}

type deleteNotification struct {
   oldObj interface{}
}
Copy

另外注意到for next := range p.nextCh是下面的 case 执行的前提,也就是说触发点是 p.nextCh,我们接着看 pop 过程(这里的逻辑不简单,可能得多花点精力)

7.1.3. processorListener.pop()

tools/cache/shared_informer.go:510

func (p *processorListener) pop() {
   defer utilruntime.HandleCrash()
   defer close(p.nextCh) // Tell .run() to stop
    // 这个 chan 是没有初始化的
   var nextCh chan<- interface{}
    // 可以接收任意类型,其实是对应前面提到的 addNotification 等
   var notification interface{}
    // for 循环套 select 是比较常规的写法
   for {
      select {
          //第一遍执行到这里的时候由于 nexth 没有初始化,所以这里会阻塞(和notification有没有值没有关系,notification哪怕是nil也可以写入 chan interface{} 类型的 channel)
      case nextCh <- notification:
         var ok bool
          // 第二次循环,下面一个case运行过之后才有这里的逻辑
         notification, ok = p.pendingNotifications.ReadOne()
         if !ok { 
             // 将 channel 指向 nil 相当于初始化的逆操作,会使得这个 case 条件阻塞
            nextCh = nil 
         }
          // 这里是 for 首次执行逻辑的入口
      case notificationToAdd, ok := <-p.addCh:
         if !ok {
            return
         }
          // 如果是 nil,也就是第一个通知过来的时候,这时不需要用到缓存(和下面else相对)
         if notification == nil { 
             // 赋值给 notification,这样上面一个 case 在接下来的一轮循化中就可以读到了
            notification = notificationToAdd
             // 相当于复制引用,nextCh 就指向了 p.nextCh,使得上面 case 写 channel 的时候本质上操作了 p.nextCh,从而 run 能够读到 p.nextCh 中的信号
            nextCh = p.nextCh
         } else { 
             // 处理到这里的时候,其实第一个 case 已经有了首个 notification,这里的逻辑是一下子来了太多 notification 就往 pendingNotifications 缓存,在第一个 case 中 有对应的 ReadOne()操作
            p.pendingNotifications.WriteOne(notificationToAdd)
         }
      }
   }
}
Copy

这里的 pop 逻辑的入口是<-p.addCh,我们继续向上找一下这个 addCh 的来源:

7.1.4. processorListener.add()

tools/cache/shared_informer.go:506

func (p *processorListener) add(notification interface{}) {
   p.addCh <- notification
}
Copy

这个 add() 方法又在哪里被调用呢?

7.1.5. sharedProcessor.distribute()

tools/cache/shared_informer.go:400

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
   p.listenersLock.RLock()
   defer p.listenersLock.RUnlock()

   if sync {
      for _, listener := range p.syncingListeners {
         listener.add(obj)
      }
   } else {
      for _, listener := range p.listeners {
         listener.add(obj)
      }
   }
}
Copy

这个方法逻辑比较简洁,分发对象。我们继续看哪里进入的 distribute:

7.2. sharedIndexInformer.HandleDeltas()

tools/cache/shared_informer.go:344

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
   s.blockDeltas.Lock()
   defer s.blockDeltas.Unlock()

   // from oldest to newest
   for _, d := range obj.(Deltas) {
      switch d.Type { // 根据 DeltaType 选择 case
      case Sync, Added, Updated:
         isSync := d.Type == Sync
         s.cacheMutationDetector.AddObject(d.Object)
         if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
             // indexer 更新的是本地 store
            if err := s.indexer.Update(d.Object); err != nil {
               return err
            }
             // 前面分析的 distribute;update
            s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
         } else {
            if err := s.indexer.Add(d.Object); err != nil {
               return err
            }
             // 前面分析的 distribute;add
            s.processor.distribute(addNotification{newObj: d.Object}, isSync)
         }
      case Deleted:
         if err := s.indexer.Delete(d.Object); err != nil {
            return err
         }
          // 前面分析的 distribute;delete
         s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
      }
   }
   return nil
}
Copy

继续往前看代码逻辑。

7.3. sharedIndexInformer.Run()

tools/cache/shared_informer.go:189

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
   defer utilruntime.HandleCrash()
    // new DeltaFIFO
   fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

   cfg := &Config{
       // DeltaFIFO
      Queue:            fifo,
      ListerWatcher:    s.listerWatcher,
      ObjectType:       s.objectType,
      FullResyncPeriod: s.resyncCheckPeriod,
      RetryOnError:     false,
      ShouldResync:     s.processor.shouldResync,
       // 前面分析的 HandleDeltas()
      Process: s.HandleDeltas,
   }

   func() {
      s.startedLock.Lock()
      defer s.startedLock.Unlock()
        // 创建 Informer
      s.controller = New(cfg)
      s.controller.(*controller).clock = s.clock
      s.started = true
   }()

   processorStopCh := make(chan struct{})
   var wg wait.Group
   defer wg.Wait()              // Wait for Processor to stop
   defer close(processorStopCh) // Tell Processor to stop
   wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    // 关注一下 s.processor.run
   wg.StartWithChannel(processorStopCh, s.processor.run)

   defer func() {
      s.startedLock.Lock()
      defer s.startedLock.Unlock()
      s.stopped = true
   }()
    // Run informer
   s.controller.Run(stopCh)
}
Copy

看到这里已经挺和谐了,在 sharedIndexInformer 的 Run() 方法中先是创建一个 DeltaFIFO,然后和 lw 一起初始化 cfg,利用 cfg 创建 controller,最后 Run 这个 controller,也就是最基础的 informer.

在这段代码里我们还注意到有一步是s.processor.run,我们看一下这个 run 的逻辑。

7.3.1. sharedProcessor.run()

tools/cache/shared_informer.go:415

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
   func() {
      p.listenersLock.RLock()
      defer p.listenersLock.RUnlock()
      for _, listener := range p.listeners {
          // 前面详细讲过 listener.run
         p.wg.Start(listener.run)
          // 前面详细讲过 listener.pop
         p.wg.Start(listener.pop)
      }
      p.listenersStarted = true
   }()
   <-stopCh
  // ……
}
Copy

撇开细节,可以看到这里调用了内部所有 listener 的 run() 和 pop() 方法,和前面的分析呼应上了。

到这里,我们基本讲完了自定义 controller 的时候 client-go 里相关的逻辑,也就是图中的上半部分:

1556161315850


有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

2387 次点击  
加入收藏 微博
被以下专栏收入,发现更多相似内容
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传