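kube-controller-manager Source Code Analysis (3): The Informer Mechanism

The following analysis is based on Kubernetes v1.12.0.

This article analyzes the Informer mechanism (that is, List-Watch), which the core Kubernetes components use frequently. The code lives mainly in client-go, which Kubernetes vendors as a third-party package.

The logic is located mainly in the /vendor/k8s.io/client-go/tools/cache package. Its directory layout is as follows: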

    cache
    ├── controller.go                # Contains: Config, Run, processLoop, NewInformer, NewIndexerInformer
    ├── delta_fifo.go                # Contains: NewDeltaFIFO, DeltaFIFO, AddIfNotPresent
    ├── expiration_cache.go
    ├── expiration_cache_fakes.go
    ├── fake_custom_store.go
    ├── fifo.go                      # Contains: Queue, FIFO, NewFIFO
    ├── heap.go
    ├── index.go                     # Contains: Indexer, MetaNamespaceIndexFunc
    ├── listers.go
    ├── listwatch.go                 # Contains: ListerWatcher, ListWatch, List, Watch
    ├── mutation_cache.go
    ├── mutation_detector.go
    ├── reflector.go                 # Contains: Reflector, NewReflector, Run, ListAndWatch
    ├── reflector_metrics.go
    ├── shared_informer.go           # Contains: NewSharedInformer, WaitForCacheSync, Run, HasSynced
    ├── store.go                     # Contains: Store, MetaNamespaceKeyFunc, SplitMetaNamespaceKey
    ├── testing
    │   └── fake_controller_source.go
    ├── thread_safe_store.go         # Contains: ThreadSafeStore, threadSafeMap
    └── undelta_store.go

0. Overview diagrams

Diagram 1

Informer mechanism - Figure 1

Diagram 2

Informer mechanism - Figure 2

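0.1. client-go components

  • Reflector: A reflector watches a specific type of Kubernetes API resource; this is implemented in the ListAndWatch method. The watched resource can be a built-in resource or a custom resource. When the reflector is notified through the watch API that a new resource instance exists, it fetches the newly created object with the corresponding list API and puts it into the Delta FIFO queue inside the watchHandler function.

  • Informer: An informer pops objects from the Delta FIFO queue; the function that does this is processLoop. The job of this base controller is to save the object for later retrieval and to invoke our controller, passing it the object.

  • Indexer: An indexer provides indexing over objects. A typical use case is building an index based on object labels. An Indexer can maintain indexes based on several index functions, and it uses a thread-safe data store to hold objects and their keys. Store defines a default function named MetaNamespaceKeyFunc, which generates an object's key as the <namespace>/<name> combination for that object.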
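As a rough illustration of the <namespace>/<name> key convention described above, here is a simplified, standalone sketch. The names objectMeta, keyFor, and splitKey are hypothetical and exist only for this example; the real client-go functions are MetaNamespaceKeyFunc and SplitMetaNamespaceKey, which operate on API objects rather than on this toy struct.

```go
package main

import (
	"fmt"
	"strings"
)

// objectMeta is a hypothetical stand-in for the metadata of an API object.
type objectMeta struct {
	Namespace string
	Name      string
}

// keyFor builds a "<namespace>/<name>" key. Objects without a namespace
// (assumed here to be cluster-scoped) are keyed by name alone.
func keyFor(m objectMeta) string {
	if m.Namespace == "" {
		return m.Name
	}
	return m.Namespace + "/" + m.Name
}

// splitKey reverses keyFor and rejects keys with more than one separator.
func splitKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return "", parts[0], nil
	case 2:
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}

func main() {
	key := keyFor(objectMeta{Namespace: "default", Name: "nginx"})
	ns, name, _ := splitKey(key)
	fmt.Println(key, ns, name)
}
```

Because the key is derived only from the namespace and the name, every version of the same object maps to the same key, which is what lets the store overwrite an old version with a new one.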

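0.2. Custom controller components

  • Informer reference: A reference to the Informer instance that knows how to work with the custom resource objects. The custom controller code needs to create the appropriate Informer.

  • Indexer reference: The custom controller's reference to the Indexer instance. The custom controller needs to create the appropriate Indexer.

client-go provides the NewIndexerInformer function, which creates both the Informer and the Indexer.

  • Resource Event Handlers: Callback functions that the Informer calls when it wants to deliver an object to the controller. The typical pattern for writing these functions is to obtain the key of the dispatched object and enqueue that key in a work queue for further processing.

  • Work queue: The task queue. The resource event handler functions are written to extract the key of the delivered object and add it to the work queue.

  • Process Item: The function that processes items from the work queue. These functions typically use the Indexer reference or a Listing wrapper to retrieve the object that corresponds to the key.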
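The handler, work queue, and process-item pattern described above can be sketched with the standard library alone. This is a simplified model, not client-go code: pod, indexer, processAll, and runDemo are hypothetical names, a buffered channel stands in for the work queue, and a mutex-protected map stands in for the Indexer.

```go
package main

import (
	"fmt"
	"sync"
)

// pod is a hypothetical object type used only for this sketch.
type pod struct {
	Namespace, Name, Phase string
}

// indexer is a toy thread-safe cache keyed by "<namespace>/<name>".
type indexer struct {
	mu    sync.RWMutex
	items map[string]pod
}

// put stores the object and returns its key.
func (i *indexer) put(p pod) string {
	key := p.Namespace + "/" + p.Name
	i.mu.Lock()
	defer i.mu.Unlock()
	i.items[key] = p
	return key
}

func (i *indexer) getByKey(key string) (pod, bool) {
	i.mu.RLock()
	defer i.mu.RUnlock()
	p, ok := i.items[key]
	return p, ok
}

// processAll plays the "Process Item" role: it drains the work queue and
// resolves every key back to an object through the indexer.
func processAll(queue <-chan string, idx *indexer) []string {
	var seen []string
	for key := range queue {
		if p, ok := idx.getByKey(key); ok {
			seen = append(seen, key+"="+p.Phase)
		}
	}
	return seen
}

// runDemo wires the pieces together.
func runDemo() []string {
	idx := &indexer{items: map[string]pod{}}
	queue := make(chan string, 8)
	onAdd := func(key string) { queue <- key }   // resource event handler: enqueue the key only
	deliver := func(p pod) { onAdd(idx.put(p)) } // informer side: cache first, then notify
	deliver(pod{"default", "web", "Running"})
	deliver(pod{"kube-system", "dns", "Pending"})
	close(queue)
	return processAll(queue, idx)
}

func main() {
	fmt.Println(runDemo())
}
```

Passing only keys through the queue keeps the queue small and means the worker always reads the latest cached version of the object rather than a stale copy captured at event time.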

1. sharedInformerFactory.Start

The Run function of the controller-manager calls InformerFactory.Start.

This code is located in /cmd/kube-controller-manager/app/controllermanager.go

    // Run runs the KubeControllerManagerOptions. This should never exit.
    func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
        ...
        controllerContext.InformerFactory.Start(controllerContext.Stop)
        close(controllerContext.InformersStarted)
        ...
    }

InformerFactory is a SharedInformerFactory interface, defined as follows:

This code is located in vendor/k8s.io/client-go/informers/internalinterfaces/factory_interfaces.go

    // SharedInformerFactory a small interface to allow for adding an informer without an import cycle
    type SharedInformerFactory interface {
        Start(stopCh <-chan struct{})
        InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
    }

The Start method launches every requested informer type that has not been started yet, running informer.Run in a separate goroutine for each type.

This code is located in vendor/k8s.io/client-go/informers/factory.go

    // Start initializes all requested informers.
    func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
        f.lock.Lock()
        defer f.lock.Unlock()
        for informerType, informer := range f.informers {
            if !f.startedInformers[informerType] {
                go informer.Run(stopCh)
                f.startedInformers[informerType] = true
            }
        }
    }
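The started-once bookkeeping in Start can be modeled as follows. This is a minimal sketch under stated assumptions: runner, factory, and demoStart are hypothetical names, and the WaitGroup and the launched counter are additions that exist only so the example can terminate and report what it did.

```go
package main

import (
	"fmt"
	"sync"
)

// runner is a hypothetical stand-in for an informer's Run method.
type runner func(stopCh <-chan struct{})

// factory models the bookkeeping of sharedInformerFactory.
type factory struct {
	mu       sync.Mutex
	runners  map[string]runner
	started  map[string]bool
	launched int            // number of goroutines started (demo only)
	wg       sync.WaitGroup // lets the demo wait for shutdown (demo only)
}

// start launches a goroutine for every registered runner that has not been
// started yet; calling it again only picks up new registrations.
func (f *factory) start(stopCh <-chan struct{}) {
	f.mu.Lock()
	defer f.mu.Unlock()
	for name, r := range f.runners {
		if f.started[name] {
			continue
		}
		f.wg.Add(1)
		go func(r runner) {
			defer f.wg.Done()
			r(stopCh)
		}(r)
		f.started[name] = true
		f.launched++
	}
}

// demoStart calls start three times and returns how many goroutines ran.
func demoStart() int {
	block := func(stopCh <-chan struct{}) { <-stopCh }
	f := &factory{
		runners: map[string]runner{"pods": block, "nodes": block},
		started: map[string]bool{},
	}
	stopCh := make(chan struct{})
	f.start(stopCh)
	f.start(stopCh) // no-op: both runners are already started
	f.runners["services"] = block
	f.start(stopCh) // starts only "services"
	close(stopCh)
	f.wg.Wait()
	return f.launched
}

func main() {
	fmt.Println(demoStart())
}
```

The point of the started map is that Start is safe to call repeatedly, for example after more informers have been requested through InformerFor.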

2. sharedIndexInformer.Run

This code is located in /vendor/k8s.io/client-go/tools/cache/shared_informer.go

    func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
        defer utilruntime.HandleCrash()

        fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)

        cfg := &Config{
            Queue:            fifo,
            ListerWatcher:    s.listerWatcher,
            ObjectType:       s.objectType,
            FullResyncPeriod: s.resyncCheckPeriod,
            RetryOnError:     false,
            ShouldResync:     s.processor.shouldResync,

            Process: s.HandleDeltas,
        }

        func() {
            s.startedLock.Lock()
            defer s.startedLock.Unlock()

            s.controller = New(cfg)
            s.controller.(*controller).clock = s.clock
            s.started = true
        }()

        // Separate stop channel because Processor should be stopped strictly after controller
        processorStopCh := make(chan struct{})
        var wg wait.Group
        defer wg.Wait()              // Wait for Processor to stop
        defer close(processorStopCh) // Tell Processor to stop
        wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
        wg.StartWithChannel(processorStopCh, s.processor.run)

        defer func() {
            s.startedLock.Lock()
            defer s.startedLock.Unlock()
            s.stopped = true // Don't want any new listeners
        }()
        s.controller.Run(stopCh)
    }

2.1. NewDeltaFIFO

DeltaFIFO is a first-in, first-out queue that stores object changes. The process function receives the objects returned by the queue's Pop method and handles them.

    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)

2.2. Config

Build the controller's Config struct and set its Process field to HandleDeltas, which is the process function used later.

    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,

        Process: s.HandleDeltas,
    }

2.3. controller

Call New(cfg) to build the controller of the sharedIndexInformer.

    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()

        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()

2.4. cacheMutationDetector.Run

Call s.cacheMutationDetector.Run, which periodically checks whether any cached object has been mutated.

    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)

defaultCacheMutationDetector.Run

    func (d *defaultCacheMutationDetector) Run(stopCh <-chan struct{}) {
        // we DON'T want protection from panics. If we're running this code, we want to die
        for {
            d.CompareObjects()

            select {
            case <-stopCh:
                return
            case <-time.After(d.period):
            }
        }
    }

CompareObjects

    func (d *defaultCacheMutationDetector) CompareObjects() {
        d.lock.Lock()
        defer d.lock.Unlock()

        altered := false
        for i, obj := range d.cachedObjs {
            if !reflect.DeepEqual(obj.cached, obj.copied) {
                fmt.Printf("CACHE %s[%d] ALTERED!\n%v\n", d.name, i, diff.ObjectDiff(obj.cached, obj.copied))
                altered = true
            }
        }

        if altered {
            msg := fmt.Sprintf("cache %s modified", d.name)
            if d.failureFunc != nil {
                d.failureFunc(msg)
                return
            }
            panic(msg)
        }
    }
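The idea behind CompareObjects, keeping a private copy of each cached object and comparing it with the shared one, can be sketched as follows. This is a simplified model: record, detector, add, and altered are hypothetical names, and a map[string]string stands in for a cached API object.

```go
package main

import (
	"fmt"
	"reflect"
)

// record pairs the object that is shared with consumers (cached) with a
// private snapshot taken when it entered the cache (copied).
type record struct {
	cached map[string]string
	copied map[string]string
}

// detector is a toy mutation detector.
type detector struct {
	records []record
}

// add registers an object and stores a copy of its current contents.
func (d *detector) add(obj map[string]string) {
	snapshot := make(map[string]string, len(obj))
	for k, v := range obj {
		snapshot[k] = v
	}
	d.records = append(d.records, record{cached: obj, copied: snapshot})
}

// altered returns the indexes of objects that no longer match their snapshot.
func (d *detector) altered() []int {
	var out []int
	for i, r := range d.records {
		if !reflect.DeepEqual(r.cached, r.copied) {
			out = append(out, i)
		}
	}
	return out
}

func main() {
	d := &detector{}
	labels := map[string]string{"app": "web"}
	d.add(labels)
	fmt.Println(len(d.altered())) // nothing has been changed yet

	labels["app"] = "changed" // a consumer mutates the shared object in place
	fmt.Println(d.altered())
}
```

Objects handed out by a shared informer are shared between all consumers, so an in-place mutation by one consumer silently corrupts the cache for the others; the detector turns that silent corruption into a loud failure.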

2.5. processor.run

Call s.processor.run, that is, sharedProcessor.run, which starts listener.run and listener.pop for every listener to process the queued notifications.

    wg.StartWithChannel(processorStopCh, s.processor.run)

sharedProcessor.run

    func (p *sharedProcessor) run(stopCh <-chan struct{}) {
        func() {
            p.listenersLock.RLock()
            defer p.listenersLock.RUnlock()
            for _, listener := range p.listeners {
                p.wg.Start(listener.run)
                p.wg.Start(listener.pop)
            }
        }()
        <-stopCh
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        for _, listener := range p.listeners {
            close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
        }
        p.wg.Wait() // Wait for all .pop() and .run() to stop
    }

This logic is analyzed later.

2.6. controller.Run

Call s.controller.Run, which builds the Reflector and keeps a local cache of the data that the API server stores in etcd.

    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()
    s.controller.Run(stopCh)

controller.Run

This code is located in /vendor/k8s.io/client-go/tools/cache/controller.go

    // Run begins processing items, and will continue until a value is sent down stopCh.
    // It's an error to call Run more than once.
    // Run blocks; call via go.
    func (c *controller) Run(stopCh <-chan struct{}) {
        defer utilruntime.HandleCrash()
        go func() {
            <-stopCh
            c.config.Queue.Close()
        }()
        r := NewReflector(
            c.config.ListerWatcher,
            c.config.ObjectType,
            c.config.Queue,
            c.config.FullResyncPeriod,
        )
        r.ShouldResync = c.config.ShouldResync
        r.clock = c.clock

        c.reflectorMutex.Lock()
        c.reflector = r
        c.reflectorMutex.Unlock()

        var wg wait.Group
        defer wg.Wait()

        wg.StartWithChannel(stopCh, r.Run)

        wait.Until(c.processLoop, time.Second, stopCh)
    }

Core code:

    // Build the Reflector
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    // Run the Reflector
    wg.StartWithChannel(stopCh, r.Run)
    // Run processLoop
    wait.Until(c.processLoop, time.Second, stopCh)

3. Reflector

3.1. Reflector

The main job of a Reflector is to watch the specified Kubernetes resource and reflect every change into the local store. A Reflector only places objects of the specified expectedType into the store, unless expectedType is nil. If resyncPeriod is non-zero, the Reflector triggers a periodic action every resyncPeriod (the upstream doc comment describes it as a list; in the v1.12 code analyzed in 3.4.2 it is a call to store.Resync()), so a Reflector can be used both to reprocess all objects periodically and to process changed objects incrementally.

Commonly used fields:

  • expectedType: the type of resource expected to be placed in the store.
  • store: the local cache for the watched resource.
  • listerWatcher: the interface used to list and watch.
  • period: the interval between one watch ending and the next one starting; NewNamedReflector sets it to 1 second.
  • resyncPeriod: the resync interval; when non-zero, a resync is triggered every resyncPeriod.
  • lastSyncResourceVersion: the most recently observed resource version, used mainly when watching.

    // Reflector watches a specified resource and causes all changes to be reflected in the given store.
    type Reflector struct {
        // name identifies this reflector. By default it will be a file:line if possible.
        name string
        // metrics tracks basic metric information about the reflector
        metrics *reflectorMetrics

        // The type of object we expect to place in the store.
        expectedType reflect.Type
        // The destination to sync up with the watch source
        store Store
        // listerWatcher is used to perform lists and watches.
        listerWatcher ListerWatcher
        // period controls timing between one watch ending and
        // the beginning of the next one.
        period       time.Duration
        resyncPeriod time.Duration
        ShouldResync func() bool
        // clock allows tests to manipulate time
        clock clock.Clock
        // lastSyncResourceVersion is the resource version token last
        // observed when doing a sync with the underlying store
        // it is thread safe, but not synchronized with the underlying store
        lastSyncResourceVersion string
        // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
        lastSyncResourceVersionMutex sync.RWMutex
    }

3.2. NewReflector

NewReflector constructs the Reflector struct.

This code is located in /vendor/k8s.io/client-go/tools/cache/reflector.go

    // NewReflector creates a new Reflector object which will keep the given store up to
    // date with the server's contents for the given resource. Reflector promises to
    // only put things in the store that have the type of expectedType, unless expectedType
    // is nil. If resyncPeriod is non-zero, then lists will be executed after every
    // resyncPeriod, so that you can use reflectors to periodically process everything as
    // well as incrementally processing the things that change.
    func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
        return NewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod)
    }

    // reflectorDisambiguator is used to disambiguate started reflectors.
    // initialized to an unstable value to ensure meaning isn't attributed to the suffix.
    var reflectorDisambiguator = int64(time.Now().UnixNano() % 12345)

    // NewNamedReflector same as NewReflector, but with a specified name for logging
    func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
        reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1)
        r := &Reflector{
            name: name,
            // we need this to be unique per process (some names are still the same)but obvious who it belongs to
            metrics:       newReflectorMetrics(makeValidPromethusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))),
            listerWatcher: lw,
            store:         store,
            expectedType:  reflect.TypeOf(expectedType),
            period:        time.Second,
            resyncPeriod:  resyncPeriod,
            clock:         &clock.RealClock{},
        }
        return r
    }

3.3. Reflector.Run

Reflector.Run mainly calls ListAndWatch, restarting it every r.period until stopCh is closed.

    // Run starts a watch and handles watch events. Will restart the watch if it is closed.
    // Run will exit when stopCh is closed.
    func (r *Reflector) Run(stopCh <-chan struct{}) {
        glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
        wait.Until(func() {
            if err := r.ListAndWatch(stopCh); err != nil {
                utilruntime.HandleError(err)
            }
        }, r.period, stopCh)
    }

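3.4. ListAndWatch

ListAndWatch first lists all objects and records the resource version returned with the list, then starts a watch from that resource version to receive subsequent changes. The list request sets the resource version to "0", so List() may be served from a cache and lag behind the contents of etcd; the Reflector catches up on the lagging part through Watch(), so that the local cache eventually matches the data in etcd.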
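The list-then-watch handoff can be modeled with a toy event log. This is a simplified sketch rather than the client-go implementation: event, source, apply, and listAndWatch are hypothetical names, resource versions are plain integers, and the between callback simulates changes that arrive after the list has been taken.

```go
package main

import "fmt"

// event is a hypothetical change record; rv is its resource version.
type event struct {
	rv      int
	key     string
	value   string
	deleted bool
}

// source is a toy stand-in for the API server: an append-only event log.
type source struct {
	log []event
}

// apply folds one event into a state map.
func apply(state map[string]string, e event) {
	if e.deleted {
		delete(state, e.key)
		return
	}
	state[e.key] = e.value
}

// list returns the current state plus the resource version it reflects.
func (s *source) list() (map[string]string, int) {
	state, rv := map[string]string{}, 0
	for _, e := range s.log {
		apply(state, e)
		rv = e.rv
	}
	return state, rv
}

// watch returns every event newer than the given resource version.
func (s *source) watch(fromRV int) []event {
	var out []event
	for _, e := range s.log {
		if e.rv > fromRV {
			out = append(out, e)
		}
	}
	return out
}

// listAndWatch lists once, then catches up by replaying newer events.
func listAndWatch(s *source, between func()) (map[string]string, int) {
	cache, rv := s.list()
	between() // changes that land after the list was taken
	for _, e := range s.watch(rv) {
		apply(cache, e)
		rv = e.rv
	}
	return cache, rv
}

func main() {
	s := &source{log: []event{{1, "a", "v1", false}, {2, "b", "v1", false}}}
	cache, rv := listAndWatch(s, func() {
		s.log = append(s.log, event{3, "a", "v2", false}, event{4, "b", "", true})
	})
	fmt.Println(cache, rv)
}
```

Because the watch starts from the resource version that the list returned, no change between the list and the watch is lost, and none is applied twice.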

3.4.1. List

    // ListAndWatch first lists all items and get the resource version at the moment of call,
    // and then use the resource version to watch.
    // It returns error if ListAndWatch didn't even try to initialize watch.
    func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
        glog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
        var resourceVersion string

        // Explicitly set "0" as resource version - it's fine for the List()
        // to be served from cache and potentially be delayed relative to
        // etcd contents. Reflector framework will catch up via Watch() eventually.
        options := metav1.ListOptions{ResourceVersion: "0"}
        r.metrics.numberOfLists.Inc()
        start := r.clock.Now()
        list, err := r.listerWatcher.List(options)
        if err != nil {
            return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
        }
        r.metrics.listDuration.Observe(time.Since(start).Seconds())
        listMetaInterface, err := meta.ListAccessor(list)
        if err != nil {
            return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
        }
        resourceVersion = listMetaInterface.GetResourceVersion()
        items, err := meta.ExtractList(list)
        if err != nil {
            return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
        }
        r.metrics.numberOfItemsInList.Observe(float64(len(items)))
        if err := r.syncWith(items, resourceVersion); err != nil {
            return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
        }
        r.setLastSyncResourceVersion(resourceVersion)
        ...
    }

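First, the resource version is set to "0" and listerWatcher.List(options) is called to list all objects.

    // Set the resource version to "0"
    options := metav1.ListOptions{ResourceVersion: "0"}
    // Call the list interface
    list, err := r.listerWatcher.List(options)

Next, get the resource version and extract the contents of the list into a slice of objects.

    // Get the resource version
    resourceVersion = listMetaInterface.GetResourceVersion()
    // Extract the contents of the list into a slice of objects
    items, err := meta.ExtractList(list)

Then store the listed objects and the resource version in the local store, completely replacing whatever the store held before.

    err := r.syncWith(items, resourceVersion)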

syncWith calls the store's Replace method to replace the data previously held in the store.

    // syncWith replaces the store's items with the given list.
    func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
        found := make([]interface{}, 0, len(items))
        for _, item := range items {
            found = append(found, item)
        }
        return r.store.Replace(found, resourceVersion)
    }

The Store.Replace method is defined as follows:

    type Store interface {
        ...
        // Replace will delete the contents of the store, using instead the
        // given list. Store takes ownership of the list, you should not reference
        // it after calling this function.
        Replace([]interface{}, string) error
        ...
    }

Finally, record the latest resource version.

    r.setLastSyncResourceVersion(resourceVersion)

setLastSyncResourceVersion:

    func (r *Reflector) setLastSyncResourceVersion(v string) {
        r.lastSyncResourceVersionMutex.Lock()
        defer r.lastSyncResourceVersionMutex.Unlock()
        r.lastSyncResourceVersion = v

        rv, err := strconv.Atoi(v)
        if err == nil {
            r.metrics.lastResourceVersion.Set(float64(rv))
        }
    }

3.4.2. store.Resync

    resyncerrc := make(chan error, 1)
    cancelCh := make(chan struct{})
    defer close(cancelCh)
    go func() {
        resyncCh, cleanup := r.resyncChan()
        defer func() {
            cleanup() // Call the last one written into cleanup
        }()
        for {
            select {
            case <-resyncCh:
            case <-stopCh:
                return
            case <-cancelCh:
                return
            }
            if r.ShouldResync == nil || r.ShouldResync() {
                glog.V(4).Infof("%s: forcing resync", r.name)
                if err := r.store.Resync(); err != nil {
                    resyncerrc <- err
                    return
                }
            }
            cleanup()
            resyncCh, cleanup = r.resyncChan()
        }
    }()

Core code:

    err := r.store.Resync()

The concrete store here is the DeltaFIFO, so this calls DeltaFIFO.Resync.

    // Resync will send a sync event for each item
    func (f *DeltaFIFO) Resync() error {
        f.lock.Lock()
        defer f.lock.Unlock()

        if f.knownObjects == nil {
            return nil
        }

        keys := f.knownObjects.ListKeys()
        for _, k := range keys {
            if err := f.syncKeyLocked(k); err != nil {
                return err
            }
        }
        return nil
    }
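The effect of Resync can be sketched with a toy queue. This is a simplified model and not the DeltaFIFO code: delta, queue, and resync are hypothetical names, and the rule that a key with pending deltas is skipped is an assumption about what syncKeyLocked does, since its body is not shown in this article.

```go
package main

import "fmt"

// delta is a hypothetical, trimmed-down change record.
type delta struct {
	kind string // for example "Added" or "Sync"
	key  string
}

// queue is a toy model: known stands in for knownObjects (the indexer),
// pending for the deltas that have not been popped yet.
type queue struct {
	known   map[string]string
	pending map[string][]delta
}

// resync queues one "Sync" delta per known key, skipping keys that already
// have pending deltas (assumed behavior, see the note above).
func (q *queue) resync() {
	for key := range q.known {
		if len(q.pending[key]) > 0 {
			continue
		}
		q.pending[key] = append(q.pending[key], delta{kind: "Sync", key: key})
	}
}

func main() {
	q := &queue{
		known:   map[string]string{"default/a": "obj-a", "default/b": "obj-b"},
		pending: map[string][]delta{"default/a": {{kind: "Added", key: "default/a"}}},
	}
	q.resync()
	fmt.Println(len(q.pending["default/a"]), q.pending["default/b"][0].kind)
}
```

A resync therefore does not contact the API server; it replays what the local cache already knows so that consumers get a chance to reprocess every object.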

3.4.3. Watch

    for {
        // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
        select {
        case <-stopCh:
            return nil
        default:
        }

        timemoutseconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
        options = metav1.ListOptions{
            ResourceVersion: resourceVersion,
            // We want to avoid situations of hanging watchers. Stop any wachers that do not
            // receive any events within the timeout window.
            TimeoutSeconds: &timemoutseconds,
        }

        r.metrics.numberOfWatches.Inc()
        w, err := r.listerWatcher.Watch(options)
        if err != nil {
            switch err {
            case io.EOF:
                // watch closed normally
            case io.ErrUnexpectedEOF:
                glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
            default:
                utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
            }
            // If this is "connection refused" error, it means that most likely apiserver is not responsive.
            // It doesn't make sense to re-list all objects because most likely we will be able to restart
            // watch where we ended.
            // If that's the case wait and resend watch request.
            if urlError, ok := err.(*url.Error); ok {
                if opError, ok := urlError.Err.(*net.OpError); ok {
                    if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
                        time.Sleep(time.Second)
                        continue
                    }
                }
            }
            return nil
        }

        if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
            if err != errorStopRequested {
                glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
            }
            return nil
        }
    }

Set the watch timeout. minWatchTimeout is 5 minutes, and the random factor (rand.Float64() + 1.0) spreads the actual timeout over the range from 5 minutes up to, but not including, 10 minutes.

    timemoutseconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
    options = metav1.ListOptions{
        ResourceVersion: resourceVersion,
        // We want to avoid situations of hanging watchers. Stop any wachers that do not
        // receive any events within the timeout window.
        TimeoutSeconds: &timemoutseconds,
    }
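The jittered timeout above is easy to check in isolation. The following standalone sketch reuses the same formula; watchTimeoutSeconds and sampleTimeouts are hypothetical helper names, and the 5-minute constant is restated locally rather than imported from client-go.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// minWatchTimeout restates the 5-minute lower bound locally for this sketch.
const minWatchTimeout = 5 * time.Minute

// watchTimeoutSeconds scales the minimum by a random factor in [1.0, 2.0).
func watchTimeoutSeconds(r *rand.Rand) int64 {
	return int64(minWatchTimeout.Seconds() * (r.Float64() + 1.0))
}

// sampleTimeouts draws n timeouts from a generator seeded with seed.
func sampleTimeouts(seed int64, n int) []int64 {
	r := rand.New(rand.NewSource(seed))
	out := make([]int64, n)
	for i := range out {
		out[i] = watchTimeoutSeconds(r)
	}
	return out
}

func main() {
	for _, s := range sampleTimeouts(time.Now().UnixNano(), 3) {
		fmt.Printf("%ds (%.1f min)\n", s, float64(s)/60)
	}
}
```

Randomizing the timeout keeps the watches of many reflectors from expiring, and being re-established, at the same moment.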

Call listerWatcher.Watch(options).

    w, err := r.listerWatcher.Watch(options)

Call watchHandler.

    err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh)

3.4.4. watchHandler

watchHandler consumes the events delivered by the watch, applies them to the store, and keeps the current resource version up to date.

    // watchHandler watches w and keeps *resourceVersion up to date.
    func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
        start := r.clock.Now()
        eventCount := 0

        // Stopping the watcher should be idempotent and if we return from this function there's no way
        // we're coming back in with the same watch interface.
        defer w.Stop()
        // update metrics
        defer func() {
            r.metrics.numberOfItemsInWatch.Observe(float64(eventCount))
            r.metrics.watchDuration.Observe(time.Since(start).Seconds())
        }()

    loop:
        for {
            select {
            case <-stopCh:
                return errorStopRequested
            case err := <-errc:
                return err
            case event, ok := <-w.ResultChan():
                if !ok {
                    break loop
                }
                if event.Type == watch.Error {
                    return apierrs.FromObject(event.Object)
                }
                if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                    continue
                }
                meta, err := meta.Accessor(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                    continue
                }
                newResourceVersion := meta.GetResourceVersion()
                switch event.Type {
                case watch.Added:
                    err := r.store.Add(event.Object)
                    if err != nil {
                        utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                    }
                case watch.Modified:
                    err := r.store.Update(event.Object)
                    if err != nil {
                        utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
                    }
                case watch.Deleted:
                    // TODO: Will any consumers need access to the "last known
                    // state", which is passed in event.Object? If so, may need
                    // to change this.
                    err := r.store.Delete(event.Object)
                    if err != nil {
                        utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
                    }
                default:
                    utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                }
                *resourceVersion = newResourceVersion
                r.setLastSyncResourceVersion(newResourceVersion)
                eventCount++
            }
        }

        watchDuration := r.clock.Now().Sub(start)
        if watchDuration < 1*time.Second && eventCount == 0 {
            r.metrics.numberOfShortWatches.Inc()
            return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
        }
        glog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
        return nil
    }

Read events from the result channel of the watch interface.

    for {
        select {
        ...
        case event, ok := <-w.ResultChan():
            ...
        }
    }

When an Added, Modified, or Deleted event arrives, apply the corresponding object change to the local store.

    switch event.Type {
    case watch.Added:
        err := r.store.Add(event.Object)
        if err != nil {
            utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
        }
    case watch.Modified:
        err := r.store.Update(event.Object)
        if err != nil {
            utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
        }
    case watch.Deleted:
        // TODO: Will any consumers need access to the "last known
        // state", which is passed in event.Object? If so, may need
        // to change this.
        err := r.store.Delete(event.Object)
        if err != nil {
            utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
        }
    default:
        utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
    }

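Update the current resource version to the latest one.

    newResourceVersion := meta.GetResourceVersion()
    *resourceVersion = newResourceVersion
    r.setLastSyncResourceVersion(newResourceVersion)

The analysis of the Reflector shows that it uses the local store in many places. The store handed to the Reflector is in fact the DeltaFIFO, so the following sections analyze DeltaFIFO and Store.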

4. DeltaFIFO

The DeltaFIFO is created by NewDeltaFIFO and assigned to config.Queue.

    func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
        fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)
        cfg := &Config{
            Queue: fifo,
            ...
        }
        ...
    }

4.1. NewDeltaFIFO

    // NewDeltaFIFO returns a Store which can be used process changes to items.
    //
    // keyFunc is used to figure out what key an object should have. (It's
    // exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
    //
    // 'compressor' may compress as many or as few items as it wants
    // (including returning an empty slice), but it should do what it
    // does quickly since it is called while the queue is locked.
    // 'compressor' may be nil if you don't want any delta compression.
    //
    // 'keyLister' is expected to return a list of keys that the consumer of
    // this queue "knows about". It is used to decide which items are missing
    // when Replace() is called; 'Deleted' deltas are produced for these items.
    // It may be nil if you don't need to detect all deletions.
    // TODO: consider merging keyLister with this object, tracking a list of
    //       "known" keys when Pop() is called. Have to think about how that
    //       affects error retrying.
    // TODO(lavalamp): I believe there is a possible race only when using an
    //                 external known object source that the above TODO would
    //                 fix.
    //
    // Also see the comment on DeltaFIFO.
    func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyListerGetter) *DeltaFIFO {
        f := &DeltaFIFO{
            items:           map[string]Deltas{},
            queue:           []string{},
            keyFunc:         keyFunc,
            deltaCompressor: compressor,
            knownObjects:    knownObjects,
        }
        f.cond.L = &f.lock
        return f
    }

controller.Run calls NewReflector.

    func (c *controller) Run(stopCh <-chan struct{}) {
        ...
        r := NewReflector(
            c.config.ListerWatcher,
            c.config.ObjectType,
            c.config.Queue,
            c.config.FullResyncPeriod,
        )
        ...
    }

The NewReflector constructor assigns c.config.Queue to the Reflector.store field.

    func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
        return NewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod)
    }

    // NewNamedReflector same as NewReflector, but with a specified name for logging
    func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
        reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1)
        r := &Reflector{
            name: name,
            // we need this to be unique per process (some names are still the same)but obvious who it belongs to
            metrics:       newReflectorMetrics(makeValidPromethusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))),
            listerWatcher: lw,
            store:         store,
            expectedType:  reflect.TypeOf(expectedType),
            period:        time.Second,
            resyncPeriod:  resyncPeriod,
            clock:         &clock.RealClock{},
        }
        return r
    }

4.2. DeltaFIFO

DeltaFIFO is a producer-consumer queue: the Reflector is the producer, and the consumer is whatever calls the Pop() method.

DeltaFIFO is intended for the following use case:

  • You want to process every object change (delta) at most once.
  • When you process an object, you want to see everything that has happened to it since you last processed it.
  • You want to process the deletion of objects.
  • You might want to periodically reprocess objects.

    // DeltaFIFO is like FIFO, but allows you to process deletes.
    //
    // DeltaFIFO is a producer-consumer queue, where a Reflector is
    // intended to be the producer, and the consumer is whatever calls
    // the Pop() method.
    //
    // DeltaFIFO solves this use case:
    //  * You want to process every object change (delta) at most once.
    //  * When you process an object, you want to see everything
    //    that's happened to it since you last processed it.
    //  * You want to process the deletion of objects.
    //  * You might want to periodically reprocess objects.
    //
    // DeltaFIFO's Pop(), Get(), and GetByKey() methods return
    // interface{} to satisfy the Store/Queue interfaces, but it
    // will always return an object of type Deltas.
    //
    // A note on threading: If you call Pop() in parallel from multiple
    // threads, you could end up with multiple threads processing slightly
    // different versions of the same object.
    //
    // A note on the KeyLister used by the DeltaFIFO: It's main purpose is
    // to list keys that are "known", for the purpose of figuring out which
    // items have been deleted when Replace() or Delete() are called. The deleted
    // object will be included in the DeleteFinalStateUnknown markers. These objects
    // could be stale.
    //
    // You may provide a function to compress deltas (e.g., represent a
    // series of Updates as a single Update).
    type DeltaFIFO struct {
        // lock/cond protects access to 'items' and 'queue'.
        lock sync.RWMutex
        cond sync.Cond

        // We depend on the property that items in the set are in
        // the queue and vice versa, and that all Deltas in this
        // map have at least one Delta.
        items map[string]Deltas
        queue []string

        // populated is true if the first batch of items inserted by Replace() has been populated
        // or Delete/Add/Update was called first.
        populated bool
        // initialPopulationCount is the number of items inserted by the first call of Replace()
        initialPopulationCount int

        // keyFunc is used to make the key used for queued item
        // insertion and retrieval, and should be deterministic.
        keyFunc KeyFunc

        // deltaCompressor tells us how to combine two or more
        // deltas. It may be nil.
        deltaCompressor DeltaCompressor

        // knownObjects list keys that are "known", for the
        // purpose of figuring out which items have been deleted
        // when Replace() or Delete() is called.
        knownObjects KeyListerGetter

        // Indication the queue is closed.
        // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
        // Currently, not used to gate any of CRED operations.
        closed     bool
        closedLock sync.Mutex
    }
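How the items map and the queue slice cooperate can be seen in a stripped-down model. This is a toy sketch rather than the real DeltaFIFO: delta, deltaQueue, push, and pop are hypothetical names, there is no locking, no compression, and no knownObjects handling, and pop returns immediately instead of blocking on an empty queue.

```go
package main

import "fmt"

// delta is a hypothetical change record for one object.
type delta struct {
	Type   string // "Added", "Updated" or "Deleted"
	Object string
}

// deltaQueue models DeltaFIFO's two core fields: items accumulates the
// deltas per key, queue remembers the order in which keys were first seen.
type deltaQueue struct {
	items map[string][]delta
	queue []string
}

func newDeltaQueue() *deltaQueue {
	return &deltaQueue{items: map[string][]delta{}}
}

// push appends a delta; a key enters the FIFO only if it has nothing pending.
func (q *deltaQueue) push(key string, d delta) {
	if _, exists := q.items[key]; !exists {
		q.queue = append(q.queue, key)
	}
	q.items[key] = append(q.items[key], d)
}

// pop returns the oldest key together with everything that has happened to
// it since it was last popped. ok is false when the queue is empty.
func (q *deltaQueue) pop() (key string, deltas []delta, ok bool) {
	if len(q.queue) == 0 {
		return "", nil, false
	}
	key = q.queue[0]
	q.queue = q.queue[1:]
	deltas = q.items[key]
	delete(q.items, key)
	return key, deltas, true
}

func main() {
	q := newDeltaQueue()
	q.push("default/a", delta{"Added", "a-v1"})
	q.push("default/b", delta{"Added", "b-v1"})
	q.push("default/a", delta{"Updated", "a-v2"})
	q.push("default/a", delta{"Deleted", "a-v2"})
	for {
		key, deltas, ok := q.pop()
		if !ok {
			break
		}
		fmt.Println(key, deltas)
	}
}
```

Keeping one queue entry per key while accumulating deltas in the map is what gives the consumer the full history of an object in a single Pop, including its deletion.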

4.3. Queue & Store

DeltaFIFO implements the Queue interface, while Reflector.store is declared as the Store interface. Queue embeds Store and adds Pop; the Process function handles the objects that Queue.Pop returns.

    // Queue is exactly like a Store, but has a Pop() method too.
    type Queue interface {
        Store

        // Pop blocks until it has something to process.
        // It returns the object that was process and the result of processing.
        // The PopProcessFunc may return an ErrRequeue{...} to indicate the item
        // should be requeued before releasing the lock on the queue.
        Pop(PopProcessFunc) (interface{}, error)

        // AddIfNotPresent adds a value previously
        // returned by Pop back into the queue as long
        // as nothing else (presumably more recent)
        // has since been added.
        AddIfNotPresent(interface{}) error

        // Return true if the first batch of items has been popped
        HasSynced() bool

        // Close queue
        Close()
    }

5. store

Store is a generic object storage interface. A Reflector watches the server and writes the changes it sees into a Store. Depending on the Store implementation, a Reflector can serve as a local cache or work like a queue of items waiting to be processed.

A Store implementation is responsible for providing a mechanism to key objects correctly and to retrieve them by key.

    // Store is a generic object storage interface. Reflector knows how to watch a server
    // and update a store. A generic store is provided, which allows Reflector to be used
    // as a local caching system, and an LRU store, which allows Reflector to work like a
    // queue of items yet to be processed.
    //
    // Store makes no assumptions about stored object identity; it is the responsibility
    // of a Store implementation to provide a mechanism to correctly key objects and to
    // define the contract for obtaining objects by some arbitrary key type.
    type Store interface {
        Add(obj interface{}) error
        Update(obj interface{}) error
        Delete(obj interface{}) error
        List() []interface{}
        ListKeys() []string
        Get(obj interface{}) (item interface{}, exists bool, err error)
        GetByKey(key string) (item interface{}, exists bool, err error)

        // Replace will delete the contents of the store, using instead the
        // given list. Store takes ownership of the list, you should not reference
        // it after calling this function.
        Replace([]interface{}, string) error
        Resync() error
    }

Replace deletes the current contents of the store and stores the given list instead, i.e. it replaces the data wholesale.

5.1. cache

cache implements the Store interface and delegates the actual storage work to a ThreadSafeStore.

cache has only two responsibilities:

  • Computing an object's key via keyFunc
  • Invoking the methods of the ThreadSafeStore interface

    // cache responsibilities are limited to:
    //  1. Computing keys for objects via keyFunc
    //  2. Invoking methods of a ThreadSafeStorage interface
    type cache struct {
        // cacheStorage bears the burden of thread safety for the cache
        cacheStorage ThreadSafeStore
        // keyFunc is used to make the key for objects stored in and retrieved from items, and
        // should be deterministic.
        keyFunc KeyFunc
    }

ListAndWatch mainly relies on the following methods. Each one computes the key with keyFunc and then delegates to cacheStorage.

cache.Replace

    // Replace will delete the contents of 'c', using instead the given list.
    // 'c' takes ownership of the list, you should not reference the list again
    // after calling this function.
    func (c *cache) Replace(list []interface{}, resourceVersion string) error {
        items := map[string]interface{}{}
        for _, item := range list {
            key, err := c.keyFunc(item)
            if err != nil {
                return KeyError{item, err}
            }
            items[key] = item
        }
        c.cacheStorage.Replace(items, resourceVersion)
        return nil
    }

cache.Add

    // Add inserts an item into the cache.
    func (c *cache) Add(obj interface{}) error {
        key, err := c.keyFunc(obj)
        if err != nil {
            return KeyError{obj, err}
        }
        c.cacheStorage.Add(key, obj)
        return nil
    }

cache.Update

    // Update sets an item in the cache to its updated state.
    func (c *cache) Update(obj interface{}) error {
        key, err := c.keyFunc(obj)
        if err != nil {
            return KeyError{obj, err}
        }
        c.cacheStorage.Update(key, obj)
        return nil
    }

cache.Delete

    // Delete removes an item from the cache.
    func (c *cache) Delete(obj interface{}) error {
        key, err := c.keyFunc(obj)
        if err != nil {
            return KeyError{obj, err}
        }
        c.cacheStorage.Delete(key)
        return nil
    }
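
The "compute a key, then delegate" pattern above can be tried out without client-go. The following is a minimal sketch using only the standard library; the `object`, `keyedCache` and `namespaceNameKey` names are hypothetical stand-ins, and a plain map takes the place of ThreadSafeStore, so the sketch is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// object is a hypothetical stand-in for a Kubernetes API object.
type object struct {
	Namespace, Name string
}

// keyFunc plays the role of cache.KeyFunc: derive a string key from an object.
type keyFunc func(obj interface{}) (string, error)

// namespaceNameKey builds "<namespace>/<name>", or just "<name>" when the
// namespace is empty, in the spirit of MetaNamespaceKeyFunc.
func namespaceNameKey(obj interface{}) (string, error) {
	o, ok := obj.(object)
	if !ok {
		return "", errors.New("not an object")
	}
	if o.Namespace == "" {
		return o.Name, nil
	}
	return o.Namespace + "/" + o.Name, nil
}

// keyedCache computes keys and delegates storage to a plain map.
type keyedCache struct {
	items map[string]interface{}
	key   keyFunc
}

func newKeyedCache(k keyFunc) *keyedCache {
	return &keyedCache{items: map[string]interface{}{}, key: k}
}

// Add stores obj under the key computed by the key function.
func (c *keyedCache) Add(obj interface{}) error {
	k, err := c.key(obj)
	if err != nil {
		return err
	}
	c.items[k] = obj
	return nil
}

// Replace drops everything currently stored and keeps only the given list.
func (c *keyedCache) Replace(list []interface{}) error {
	items := make(map[string]interface{}, len(list))
	for _, obj := range list {
		k, err := c.key(obj)
		if err != nil {
			return err
		}
		items[k] = obj
	}
	c.items = items
	return nil
}

// GetByKey looks an object up by its precomputed key.
func (c *keyedCache) GetByKey(k string) (interface{}, bool) {
	obj, ok := c.items[k]
	return obj, ok
}

func main() {
	c := newKeyedCache(namespaceNameKey)
	_ = c.Add(object{Namespace: "default", Name: "web"})
	_ = c.Replace([]interface{}{object{Namespace: "kube-system", Name: "dns"}})
	_, oldExists := c.GetByKey("default/web")
	_, newExists := c.GetByKey("kube-system/dns")
	fmt.Println(oldExists, newExists) // false true
}
```

After Replace, the object added earlier is gone and only the listed object remains, which is the wholesale-replacement behaviour described for Store.Replace.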

5.2. ThreadSafeStore

cache delegates its work to ThreadSafeStore.

    // ThreadSafeStore is an interface that allows concurrent access to a storage backend.
    // TL;DR caveats: you must not modify anything returned by Get or List as it will break
    // the indexing feature in addition to not being thread safe.
    //
    // The guarantees of thread safety provided by List/Get are only valid if the caller
    // treats returned items as read-only. For example, a pointer inserted in the store
    // through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get`
    // on the same key and modify the pointer in a non-thread-safe way. Also note that
    // modifying objects stored by the indexers (if any) will *not* automatically lead
    // to a re-index. So it's not a good idea to directly modify the objects returned by
    // Get/List, in general.
    type ThreadSafeStore interface {
        Add(key string, obj interface{})
        Update(key string, obj interface{})
        Delete(key string)
        Get(key string) (item interface{}, exists bool)
        List() []interface{}
        ListKeys() []string
        Replace(map[string]interface{}, string)
        Index(indexName string, obj interface{}) ([]interface{}, error)
        IndexKeys(indexName, indexKey string) ([]string, error)
        ListIndexFuncValues(name string) []string
        ByIndex(indexName, indexKey string) ([]interface{}, error)
        GetIndexers() Indexers
        // AddIndexers adds more indexers to this store. If you call this after you already have data
        // in the store, the results are undefined.
        AddIndexers(newIndexers Indexers) error
        Resync() error
    }

threadSafeMap

    // threadSafeMap implements ThreadSafeStore
    type threadSafeMap struct {
        lock  sync.RWMutex
        items map[string]interface{}
        // indexers maps a name to an IndexFunc
        indexers Indexers
        // indices maps a name to an Index
        indices Indices
    }
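
To make the roles of lock, items and the index structures more concrete, here is a simplified model with a single index, written against the standard library only. `indexedMap`, `KeysByIndex` and `byNamespace` are hypothetical names; the real threadSafeMap supports several named indexers and a wider method set.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// pod is a hypothetical stand-in for an API object.
type pod struct{ Namespace, Name string }

// indexFunc returns the index values an object is filed under.
type indexFunc func(obj interface{}) []string

// indexedMap is a simplified model of threadSafeMap with one index.
type indexedMap struct {
	lock    sync.RWMutex
	items   map[string]interface{}
	indexFn indexFunc
	index   map[string]map[string]struct{} // index value -> set of object keys
}

func newIndexedMap(fn indexFunc) *indexedMap {
	return &indexedMap{
		items:   map[string]interface{}{},
		indexFn: fn,
		index:   map[string]map[string]struct{}{},
	}
}

// Add stores obj under key and refreshes its index entries.
func (m *indexedMap) Add(key string, obj interface{}) {
	m.lock.Lock()
	defer m.lock.Unlock()
	if old, ok := m.items[key]; ok {
		m.unindex(key, old)
	}
	m.items[key] = obj
	for _, v := range m.indexFn(obj) {
		if m.index[v] == nil {
			m.index[v] = map[string]struct{}{}
		}
		m.index[v][key] = struct{}{}
	}
}

// Delete removes the object and its index entries.
func (m *indexedMap) Delete(key string) {
	m.lock.Lock()
	defer m.lock.Unlock()
	if old, ok := m.items[key]; ok {
		m.unindex(key, old)
		delete(m.items, key)
	}
}

// unindex must be called with the write lock held.
func (m *indexedMap) unindex(key string, obj interface{}) {
	for _, v := range m.indexFn(obj) {
		delete(m.index[v], key)
	}
}

// KeysByIndex returns the sorted keys filed under the given index value.
func (m *indexedMap) KeysByIndex(value string) []string {
	m.lock.RLock()
	defer m.lock.RUnlock()
	keys := make([]string, 0, len(m.index[value]))
	for k := range m.index[value] {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// byNamespace indexes pods by namespace.
func byNamespace(obj interface{}) []string {
	return []string{obj.(pod).Namespace}
}

func main() {
	m := newIndexedMap(byNamespace)
	m.Add("default/a", pod{"default", "a"})
	m.Add("default/b", pod{"default", "b"})
	m.Add("kube-system/c", pod{"kube-system", "c"})
	m.Delete("default/a")
	fmt.Println(m.KeysByIndex("default")) // [default/b]
}
```

Writers take the exclusive lock because they touch both the item map and the index; readers share the read lock. This is also why callers must treat returned objects as read-only: mutating an object in place would bypass the re-indexing done in Add.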

6. processLoop

    func (c *controller) Run(stopCh <-chan struct{}) {
        ...
        wait.Until(c.processLoop, time.Second, stopCh)
    }

controller.Run calls processLoop. The logic of processLoop is analyzed below.

    // processLoop drains the work queue.
    // TODO: Consider doing the processing in parallel. This will require a little thought
    // to make sure that we don't end up processing the same object multiple times
    // concurrently.
    //
    // TODO: Plumb through the stopCh here (and down to the queue) so that this can
    // actually exit when the controller is stopped. Or just give up on this stuff
    // ever being stoppable. Converting this whole package to use Context would
    // also be helpful.
    func (c *controller) processLoop() {
        for {
            obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
            if err != nil {
                if err == FIFOClosedError {
                    return
                }
                if c.config.RetryOnError {
                    // This is the safe way to re-enqueue.
                    c.config.Queue.AddIfNotPresent(obj)
                }
            }
        }
    }

processLoop drains the work queue. The actual handling is done by the configured process function (c.config.Process), which is passed to Pop. The loop exits only when the queue is closed; on any other error it re-enqueues the object if RetryOnError is set. The key line is:

    obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))

6.1. DeltaFIFO.Pop

Pop blocks until an item has been added to the queue. When several items are ready, they are handled in first-in, first-out order. If the process function returns ErrRequeue, the item is put back on the queue under the same lock; for any other failure the caller has to add it back itself with AddIfNotPresent.

Pop calls the supplied process function to handle the item.

    // Pop blocks until an item is added to the queue, and then returns it. If
    // multiple items are ready, they are returned in the order in which they were
    // added/updated. The item is removed from the queue (and the store) before it
    // is returned, so if you don't successfully process it, you need to add it back
    // with AddIfNotPresent().
    // process function is called under lock, so it is safe update data structures
    // in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
    // may return an instance of ErrRequeue with a nested error to indicate the current
    // item should be requeued (equivalent to calling AddIfNotPresent under the lock).
    //
    // Pop returns a 'Deltas', which has a complete list of all the things
    // that happened to the object (deltas) while it was sitting in the queue.
    func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
        f.lock.Lock()
        defer f.lock.Unlock()
        for {
            for len(f.queue) == 0 {
                // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
                // When Close() is called, the f.closed is set and the condition is broadcasted.
                // Which causes this loop to continue and return from the Pop().
                if f.IsClosed() {
                    return nil, FIFOClosedError
                }
                f.cond.Wait()
            }
            id := f.queue[0]
            f.queue = f.queue[1:]
            item, ok := f.items[id]
            if f.initialPopulationCount > 0 {
                f.initialPopulationCount--
            }
            if !ok {
                // Item may have been deleted subsequently.
                continue
            }
            delete(f.items, id)
            err := process(item)
            if e, ok := err.(ErrRequeue); ok {
                f.addIfNotPresent(id, item)
                err = e.Err
            }
            // Don't need to copyDeltas here, because we're transferring
            // ownership to the caller.
            return item, err
        }
    }

Key code:

    for {
        ...
        item, ok := f.items[id]
        ...
        err := process(item)
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item)
            err = e.Err
        }
        // Don't need to copyDeltas here, because we're transferring
        // ownership to the caller.
        return item, err
    }
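
The blocking, closing and requeue behaviour of Pop can be reproduced in a small standalone model. The sketch below assumes a single mutex guarding everything (the real DeltaFIFO uses a separate closedLock) and uses hypothetical names: `blockingQueue`, `errRequeue` and `errClosed` stand in for DeltaFIFO, ErrRequeue and FIFOClosedError.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClosed = errors.New("queue is closed")

// errRequeue asks Pop to put the item back; Err is surfaced to the caller.
type errRequeue struct{ Err error }

func (e errRequeue) Error() string {
	if e.Err == nil {
		return "requeue"
	}
	return e.Err.Error()
}

type blockingQueue struct {
	lock   sync.Mutex
	cond   *sync.Cond
	queue  []string               // keys in FIFO order
	items  map[string]interface{} // key -> pending item
	closed bool
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{items: map[string]interface{}{}}
	q.cond = sync.NewCond(&q.lock)
	return q
}

// Add enqueues item under key and wakes any blocked Pop.
func (q *blockingQueue) Add(key string, item interface{}) {
	q.lock.Lock()
	defer q.lock.Unlock()
	if _, exists := q.items[key]; !exists {
		q.queue = append(q.queue, key)
	}
	q.items[key] = item
	q.cond.Broadcast()
}

// Close marks the queue closed and wakes blocked callers so they can return.
func (q *blockingQueue) Close() {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.closed = true
	q.cond.Broadcast()
}

// Pop blocks until an item is available, then runs process under the lock.
func (q *blockingQueue) Pop(process func(interface{}) error) (interface{}, error) {
	q.lock.Lock()
	defer q.lock.Unlock()
	for len(q.queue) == 0 {
		if q.closed {
			return nil, errClosed
		}
		q.cond.Wait() // releases the lock while waiting
	}
	key := q.queue[0]
	q.queue = q.queue[1:]
	item := q.items[key]
	delete(q.items, key)
	err := process(item)
	if e, ok := err.(errRequeue); ok {
		// The lock is still held, so nothing newer can have been added.
		q.queue = append(q.queue, key)
		q.items[key] = item
		err = e.Err
	}
	return item, err
}

func main() {
	q := newBlockingQueue()
	go q.Add("default/web", "v1") // Pop below blocks until this arrives

	attempts := 0
	process := func(obj interface{}) error {
		attempts++
		if attempts == 1 {
			return errRequeue{} // ask for one retry
		}
		return nil
	}
	q.Pop(process)              // first attempt: item is requeued
	item, err := q.Pop(process) // second attempt succeeds
	fmt.Println(item, err, attempts)

	q.Close()
	_, err = q.Pop(process)
	fmt.Println(err == errClosed)
}
```

Running process while the lock is held is what makes the in-place requeue safe: no producer can slip a newer entry for the same key in between.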

6.2. HandleDeltas

    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,
        Process:          s.HandleDeltas,
    }

The process function passed to Pop is HandleDeltas, which sharedIndexInformer.Run assigns to Config.Process as shown above.

    func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
        s.blockDeltas.Lock()
        defer s.blockDeltas.Unlock()
        // from oldest to newest
        for _, d := range obj.(Deltas) {
            switch d.Type {
            case Sync, Added, Updated:
                isSync := d.Type == Sync
                s.cacheMutationDetector.AddObject(d.Object)
                if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                    if err := s.indexer.Update(d.Object); err != nil {
                        return err
                    }
                    s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
                } else {
                    if err := s.indexer.Add(d.Object); err != nil {
                        return err
                    }
                    s.processor.distribute(addNotification{newObj: d.Object}, isSync)
                }
            case Deleted:
                if err := s.indexer.Delete(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
            }
        }
        return nil
    }

Key code:

    switch d.Type {
    case Sync, Added, Updated:
        ...
        if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
            ...
            s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
        } else {
            ...
            s.processor.distribute(addNotification{newObj: d.Object}, isSync)
        }
    case Deleted:
        ...
        s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
    }

HandleDeltas first applies each delta to the indexer and then calls processor.distribute, which hands the notification to the processorListener channels. For Sync, Added and Updated deltas, whether an update or an add notification is sent depends on whether the object already exists in the indexer, not on the delta type itself.
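
The rule that the notification kind follows the indexer state rather than the delta type is easy to miss. The following standalone sketch replays a list of deltas against a plain map and records the notifications that would be distributed; `delta`, `applyDeltas` and the string-valued objects are simplifications introduced for illustration, not client-go types.

```go
package main

import "fmt"

type deltaType string

const (
	added   deltaType = "Added"
	updated deltaType = "Updated"
	deleted deltaType = "Deleted"
	synced  deltaType = "Sync"
)

// delta is a simplified change record: the key is carried explicitly
// instead of being computed from the object.
type delta struct {
	Type   deltaType
	Key    string
	Object string
}

// applyDeltas replays deltas oldest-first against store and returns a
// description of each notification that would be sent to listeners.
func applyDeltas(store map[string]string, deltas []delta) []string {
	var out []string
	for _, d := range deltas {
		switch d.Type {
		case synced, added, updated:
			// The store, not the delta type, decides between add and update.
			if old, exists := store[d.Key]; exists {
				store[d.Key] = d.Object
				out = append(out, fmt.Sprintf("update %s: %s -> %s", d.Key, old, d.Object))
			} else {
				store[d.Key] = d.Object
				out = append(out, fmt.Sprintf("add %s: %s", d.Key, d.Object))
			}
		case deleted:
			delete(store, d.Key)
			out = append(out, fmt.Sprintf("delete %s", d.Key))
		}
	}
	return out
}

func main() {
	store := map[string]string{}
	notifications := applyDeltas(store, []delta{
		{updated, "default/web", "v1"}, // unknown key: reported as an add
		{added, "default/web", "v2"},   // known key: reported as an update
		{deleted, "default/web", "v2"},
	})
	for _, n := range notifications {
		fmt.Println(n)
	}
}
```

In this run the Updated delta for an unknown key is reported as an add, and the later Added delta for a known key is reported as an update.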

6.3. sharedProcessor.distribute

    func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        if sync {
            for _, listener := range p.syncingListeners {
                listener.add(obj)
            }
        } else {
            for _, listener := range p.listeners {
                listener.add(obj)
            }
        }
    }

processorListener.add:

    func (p *processorListener) add(notification interface{}) {
        p.addCh <- notification
    }

Putting this together: processLoop calls HandleDeltas, which calls distribute and then processorListener.add, so every add, update or delete notification ends up in the addCh channel of each processorListener. From there pop forwards it to processorListener.run, which is analyzed next.

7. processor

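The processor records every registered callback instance (i.e. every ResourceEventHandler instance) and is responsible for invoking them. sharedIndexInformer.Run starts processor.run.

Flow:

  1. The listener's add function sends the notification to addCh.
  2. pop reads from addCh, buffers notifications in pendingNotifications while the consumer is busy, and writes them one at a time to the nextCh channel.
  3. run reads each notification from nextCh and, depending on its type (add, update, delete), calls the corresponding handler function. These handlers are registered in the various NewXxxController constructors.

    func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
        ...
        wg.StartWithChannel(processorStopCh, s.processor.run)
        ...
    }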

7.1. sharedProcessor.run

    func (p *sharedProcessor) run(stopCh <-chan struct{}) {
        func() {
            p.listenersLock.RLock()
            defer p.listenersLock.RUnlock()
            for _, listener := range p.listeners {
                p.wg.Start(listener.run)
                p.wg.Start(listener.pop)
            }
        }()
        <-stopCh
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        for _, listener := range p.listeners {
            close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
        }
        p.wg.Wait() // Wait for all .pop() and .run() to stop
    }

7.1.1. listener.pop

pop forwards notifications from addCh to the nextCh channel, keeping any backlog in pendingNotifications so that add never blocks on a slow handler.

    func (p *processorListener) pop() {
        defer utilruntime.HandleCrash()
        defer close(p.nextCh) // Tell .run() to stop
        var nextCh chan<- interface{}
        var notification interface{}
        for {
            select {
            case nextCh <- notification:
                // Notification dispatched
                var ok bool
                notification, ok = p.pendingNotifications.ReadOne()
                if !ok { // Nothing to pop
                    nextCh = nil // Disable this select case
                }
            case notificationToAdd, ok := <-p.addCh:
                if !ok {
                    return
                }
                if notification == nil { // No notification to pop (and pendingNotifications is empty)
                    // Optimize the case - skip adding to pendingNotifications
                    notification = notificationToAdd
                    nextCh = p.nextCh
                } else { // There is already a notification waiting to be dispatched
                    p.pendingNotifications.WriteOne(notificationToAdd)
                }
            }
        }
    }
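
The select loop in pop relies on a Go idiom: a send on a nil channel never becomes ready, so setting the local nextCh to nil disables that case whenever there is nothing to send. The sketch below isolates the idiom with int values and a slice as the backlog; `relay` is a hypothetical name, and the slice stands in for pendingNotifications.

```go
package main

import "fmt"

// relay forwards values from in to out without ever blocking the producer:
// values that cannot be delivered yet are kept in an in-memory backlog.
func relay(in <-chan int, out chan<- int) {
	defer close(out) // tell the consumer to stop
	var pending []int
	var sendCh chan<- int // nil while there is nothing to send
	var next int
	for {
		select {
		case sendCh <- next:
			// Delivered; line up the next value or disable this case.
			if len(pending) == 0 {
				sendCh = nil
			} else {
				next, pending = pending[0], pending[1:]
			}
		case v, ok := <-in:
			if !ok {
				return // producer closed: stop (any backlog is dropped)
			}
			if sendCh == nil {
				// Nothing waiting: make v the value to send next.
				next = v
				sendCh = out
			} else {
				pending = append(pending, v)
			}
		}
	}
}

func main() {
	in := make(chan int)
	out := make(chan int)
	go relay(in, out)

	// The consumer is not reading yet, but these sends still complete
	// because relay keeps accepting input and buffers it.
	for i := 1; i <= 5; i++ {
		in <- i
	}
	for i := 0; i < 5; i++ {
		fmt.Println(<-out) // 1 2 3 4 5, in order
	}
	close(in)
	_, open := <-out
	fmt.Println(open) // false
}
```

As in the original pop, closing the input ends the loop immediately, so a consumer that wants every value has to drain the output before the input is closed.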

7.1.2. listener.run

listener.run calls a different handler function depending on the notification type.

    func (p *processorListener) run() {
        defer utilruntime.HandleCrash()
        for next := range p.nextCh {
            switch notification := next.(type) {
            case updateNotification:
                p.handler.OnUpdate(notification.oldObj, notification.newObj)
            case addNotification:
                p.handler.OnAdd(notification.newObj)
            case deleteNotification:
                p.handler.OnDelete(notification.oldObj)
            default:
                utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
            }
        }
    }

The concrete handler is registered in NewDeploymentController (other controllers work the same way). handler has the following interface type:

    // ResourceEventHandler can handle notifications for events that happen to a
    // resource. The events are informational only, so you can't return an
    // error.
    //  * OnAdd is called when an object is added.
    //  * OnUpdate is called when an object is modified. Note that oldObj is the
    //      last known state of the object-- it is possible that several changes
    //      were combined together, so you can't use this to see every single
    //      change. OnUpdate is also called when a re-list happens, and it will
    //      get called even if nothing changed. This is useful for periodically
    //      evaluating or syncing something.
    //  * OnDelete will get the final state of the item if it is known, otherwise
    //      it will get an object of type DeletedFinalStateUnknown. This can
    //      happen if the watch is closed and misses the delete event and we don't
    //      notice the deletion until the subsequent re-list.
    type ResourceEventHandler interface {
        OnAdd(obj interface{})
        OnUpdate(oldObj, newObj interface{})
        OnDelete(obj interface{})
    }

7.2. ResourceEventHandler

The following uses DeploymentController as an example.

NewDeploymentController registers event handlers on three informers: dInformer, rsInformer and podInformer.

    // NewDeploymentController creates a new DeploymentController.
    func NewDeploymentController(dInformer extensionsinformers.DeploymentInformer, rsInformer extensionsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
        ...
        dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    dc.addDeployment,
            UpdateFunc: dc.updateDeployment,
            // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
            DeleteFunc: dc.deleteDeployment,
        })
        rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    dc.addReplicaSet,
            UpdateFunc: dc.updateReplicaSet,
            DeleteFunc: dc.deleteReplicaSet,
        })
        podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            DeleteFunc: dc.deletePod,
        })
        ...
    }

7.2.1. addDeployment

Taking addDeployment as an example: it passes the object to enqueueDeployment, which adds the object's key to the controller's work queue.

    func (dc *DeploymentController) addDeployment(obj interface{}) {
        d := obj.(*extensions.Deployment)
        glog.V(4).Infof("Adding deployment %s", d.Name)
        dc.enqueueDeployment(d)
    }

Definition of enqueueDeployment:

    type DeploymentController struct {
        ...
        enqueueDeployment func(deployment *extensions.Deployment)
        ...
    }

dc.enqueue is assigned to dc.enqueueDeployment:

    dc.enqueueDeployment = dc.enqueue

dc.enqueue calls dc.queue.Add(key):

    func (dc *DeploymentController) enqueue(deployment *extensions.Deployment) {
        key, err := controller.KeyFunc(deployment)
        if err != nil {
            utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
            return
        }
        dc.queue.Add(key)
    }

dc.queue holds the keys of the deployments that need to be synced, for use by syncDeployment.

    dc := &DeploymentController{
        ...
        queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
    }

NewNamedRateLimitingQueue

    func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface {
        return &rateLimitingType{
            DelayingInterface: NewNamedDelayingQueue(name),
            rateLimiter:       rateLimiter,
        }
    }

The analysis above shows that the processor records event handlers of different types, which are registered in the NewXxxController constructors. A handler typically just adds the key of the affected object to the controller's work queue; a sync function such as syncDeployment then reconciles the actual state toward the desired state.
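
The division of labour between event handlers and the sync function can be modelled in a few lines. The sketch below is not the workqueue package: `keyQueue` is a hypothetical, single-goroutine queue that only reproduces the deduplication of keys, and `observe` plays the combined role of the informer (updating the local cache) and the handler (enqueuing the key).

```go
package main

import "fmt"

// deployment is a hypothetical stand-in for the API object.
type deployment struct {
	Namespace, Name string
	Replicas        int
}

// keyQueue is a FIFO of keys in which a key waiting to be processed is
// never queued twice. It is not safe for concurrent use.
type keyQueue struct {
	order []string
	dirty map[string]bool
}

func newKeyQueue() *keyQueue {
	return &keyQueue{dirty: map[string]bool{}}
}

// Add queues key unless it is already waiting.
func (q *keyQueue) Add(key string) {
	if q.dirty[key] {
		return
	}
	q.dirty[key] = true
	q.order = append(q.order, key)
}

// Get removes and returns the oldest key, or false when the queue is empty.
func (q *keyQueue) Get() (string, bool) {
	if len(q.order) == 0 {
		return "", false
	}
	key := q.order[0]
	q.order = q.order[1:]
	delete(q.dirty, key)
	return key, true
}

type controller struct {
	store map[string]deployment // stands in for the informer's local cache
	queue *keyQueue
}

// observe simulates one event: the cache is updated first, then the
// handler enqueues only the key.
func (c *controller) observe(d deployment) {
	key := d.Namespace + "/" + d.Name
	c.store[key] = d
	c.queue.Add(key)
}

// drain is the sync side: it reads the latest state for each queued key.
func (c *controller) drain() []string {
	var synced []string
	for {
		key, ok := c.queue.Get()
		if !ok {
			return synced
		}
		d := c.store[key]
		synced = append(synced, fmt.Sprintf("%s replicas=%d", key, d.Replicas))
	}
}

func main() {
	c := &controller{store: map[string]deployment{}, queue: newKeyQueue()}
	c.observe(deployment{"default", "web", 1})
	c.observe(deployment{"default", "web", 3}) // same key: coalesced
	c.observe(deployment{"default", "db", 1})
	for _, line := range c.drain() {
		fmt.Println(line)
	}
}
```

Because only keys are queued, two quick updates to the same deployment lead to a single sync that sees the latest cached state, which is why the handlers themselves can stay trivial.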

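8. Summary

This article analyzed the informer mechanism in k8s, i.e. the List-Watch mechanism.

8.1. Reflector

The Reflector watches a specified k8s resource and syncs its changes into a local store. It only places objects of the expectedType into the store, unless expectedType is nil. If resyncPeriod is non-zero, the Reflector performs a list every resyncPeriod, so it can be used both to periodically process all objects and to incrementally process the objects that change.

8.2. ListAndWatch

ListAndWatch first lists all objects and records the resource version returned by the list, then starts a watch from that resource version to receive subsequent changes. The initial list is issued with resource version 0, so the result may lag behind the contents of etcd. The watch then delivers the missing changes, which brings the local cache in line with etcd.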
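
The interplay of list, resource version and watch can be illustrated with an in-memory model. Everything in the sketch below is hypothetical: `fakeServer` keeps an integer resource version and an event log, and `reflector` first takes a snapshot with list and later catches up with watch from the version it recorded. Real resource versions are opaque strings, and a real watch is a long-lived stream rather than a slice.

```go
package main

import "fmt"

// event is one logged change, tagged with the resource version it produced.
type event struct {
	Type  string // "ADDED", "MODIFIED" or "DELETED"
	Key   string
	Value string
	RV    int
}

// fakeServer models an API server with a monotonically increasing version.
type fakeServer struct {
	objects map[string]string
	rv      int
	log     []event
}

// apply records a change and bumps the resource version.
func (s *fakeServer) apply(typ, key, value string) {
	s.rv++
	if typ == "DELETED" {
		delete(s.objects, key)
	} else {
		s.objects[key] = value
	}
	s.log = append(s.log, event{typ, key, value, s.rv})
}

// list returns a snapshot and the resource version it corresponds to.
func (s *fakeServer) list() (map[string]string, int) {
	snap := make(map[string]string, len(s.objects))
	for k, v := range s.objects {
		snap[k] = v
	}
	return snap, s.rv
}

// watch returns every event newer than sinceRV.
func (s *fakeServer) watch(sinceRV int) []event {
	var out []event
	for _, e := range s.log {
		if e.RV > sinceRV {
			out = append(out, e)
		}
	}
	return out
}

// reflector mirrors the server into a local store.
type reflector struct {
	store  map[string]string
	lastRV int
}

// list replaces the local store with a fresh snapshot.
func (r *reflector) list(s *fakeServer) {
	r.store, r.lastRV = s.list()
}

// watch applies all changes that happened after the last seen version.
func (r *reflector) watch(s *fakeServer) {
	for _, e := range s.watch(r.lastRV) {
		if e.Type == "DELETED" {
			delete(r.store, e.Key)
		} else {
			r.store[e.Key] = e.Value
		}
		r.lastRV = e.RV
	}
}

func main() {
	s := &fakeServer{objects: map[string]string{}}
	s.apply("ADDED", "default/a", "v1")

	r := &reflector{}
	r.list(s) // snapshot at resource version 1

	// Changes made after the list are picked up by the watch.
	s.apply("MODIFIED", "default/a", "v2")
	s.apply("ADDED", "default/b", "v1")
	s.apply("DELETED", "default/a", "")
	r.watch(s)

	fmt.Println(r.store, r.lastRV) // map[default/b:v1] 4
}
```

Recording the version at list time is what lets the watch resume exactly where the snapshot ended, so no change is missed and none is applied twice.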

8.3. DeltaFIFO

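DeltaFIFO is a producer-consumer queue: the Reflector is the producer, and the consumer is whoever calls Pop().

DeltaFIFO is intended for the following situations:

  • You want to process every object change (delta) at most once
  • When you process an object, you want to see everything that has happened to it since you last processed it
  • You want to process the deletion of objects
  • You want to periodically reprocess objects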
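
The first two points follow from how deltas are stored: per key, in arrival order, and handed over in one piece. A minimal sketch of that bookkeeping, with locking, resync and deduplication left out, could look as follows; `deltaQueue`, `push` and `pop` are hypothetical names.

```go
package main

import "fmt"

// delta is a simplified change record.
type delta struct {
	Type   string
	Object string
}

// deltaQueue keeps, per key, every delta seen since the key was last popped.
type deltaQueue struct {
	order []string           // keys in the order they were first queued
	items map[string][]delta // key -> accumulated deltas
}

func newDeltaQueue() *deltaQueue {
	return &deltaQueue{items: map[string][]delta{}}
}

// push appends d to the deltas of key, queuing the key if necessary.
func (q *deltaQueue) push(key string, d delta) {
	if _, queued := q.items[key]; !queued {
		q.order = append(q.order, key)
	}
	q.items[key] = append(q.items[key], d)
}

// pop hands over all accumulated deltas of the oldest key and forgets them,
// so each delta is delivered at most once.
func (q *deltaQueue) pop() (string, []delta, bool) {
	if len(q.order) == 0 {
		return "", nil, false
	}
	key := q.order[0]
	q.order = q.order[1:]
	deltas := q.items[key]
	delete(q.items, key)
	return key, deltas, true
}

func main() {
	q := newDeltaQueue()
	q.push("default/a", delta{"Added", "v1"})
	q.push("default/b", delta{"Added", "v1"})
	q.push("default/a", delta{"Updated", "v2"})
	q.push("default/a", delta{"Deleted", "v2"})

	for {
		key, deltas, ok := q.pop()
		if !ok {
			break
		}
		fmt.Println(key, len(deltas), deltas[len(deltas)-1].Type)
	}
}
```

Here the consumer sees default/a once, together with its full history of three deltas ending in Deleted, and then default/b with its single Added delta.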

8.4. store

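Store is a generic object storage interface. A Reflector watches the server and writes the changes it sees into a Store. Depending on the Store implementation it is given, the Reflector serves as a local cache or as a queue of items still waiting to be processed.

Store makes no assumptions about object identity: each implementation is responsible for keying objects correctly and for defining how objects are retrieved by key.

8.5. processor

The processor records every registered callback instance (i.e. every ResourceEventHandler instance) and is responsible for invoking them. sharedIndexInformer.Run starts processor.run.

Flow:

  1. The listener's add function sends the notification to addCh.
  2. pop reads from addCh, buffers notifications in pendingNotifications while the consumer is busy, and writes them one at a time to the nextCh channel.
  3. run reads each notification from nextCh and, depending on its type (add, update, delete), calls the corresponding handler function. These handlers are registered in the various NewXxxController constructors.

The processor records event handlers of different types, which are registered in the NewXxxController constructors. A handler typically just adds the key of the affected object to the controller's work queue; a sync function such as syncDeployment then reconciles the actual state toward the desired state.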

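8.6. Main steps

  1. The Run function of controller-manager calls InformerFactory.Start. Start initializes the informers of each type and launches one informer.Run goroutine per type.
  2. informer.Run first creates a DeltaFIFO queue to hold object changes, then calls processor.run and controller.Run.
  3. controller.Run creates a Reflector, which watches the specified k8s resource and syncs its changes into the local store. If resyncPeriod is non-zero, the Reflector performs a list every resyncPeriod, so it can be used both to periodically process all objects and to incrementally process the objects that change.
  4. The Reflector then runs ListAndWatch. ListAndWatch first lists all objects and records the resource version returned by the list, then starts a watch from that resource version to receive subsequent changes. The initial list is issued with resource version 0, so the result may lag behind the contents of etcd. The watch then delivers the missing changes, which brings the local cache in line with etcd.
  5. controller.Run also calls processLoop. processLoop calls HandleDeltas, which calls distribute and then processorListener.add, so every add, update or delete notification ends up in the processorListener's channel for processorListener.run to consume.
  6. The processor records every registered callback instance (i.e. every ResourceEventHandler instance) and is responsible for invoking them. The handlers are registered in the NewXxxController constructors. A handler typically just adds the key of the affected object to the controller's work queue; a sync function such as syncDeployment then reconciles the actual state toward the desired state.

References: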