在前面的文章中已经介绍了 deployment controller 的设计与实现,deployment 控制的是 replicaset,而 replicaset 控制 pod 的创建与删除,deployment 通过控制 replicaset 实现了滚动更新、回滚等操作。而 replicaset 会直接控制 pod 的创建与删除,本文会继续从源码层面分析 replicaset 的设计与实现。

在分析源码前先考虑一下 replicaset 的使用场景,在平时的操作中其实我们并不会直接操作 replicaset,replicaset 也仅有几个简单的操作,创建、删除、更新等,但其地位是非常重要的,replicaset 的主要功能就是通过 add/del pod 来达到期望的状态。

ReplicaSetController 源码分析

kubernetes 版本: v1.16

启动流程

首先来看 replicaSetController 对象初始化以及启动的代码,在 startReplicaSetController 中有两个比较重要的变量:

  • BurstReplicas:用来控制在一个 syncLoop 过程中 rs 最多能创建的 pod 数量,设置上限值是为了避免单个 rs 影响整个系统,默认值为 500;
  • ConcurrentRSSyncs:指的是需要启动多少个 goroutine 处理 informer 队列中的对象,默认值为 5;

k8s.io/kubernetes/cmd/kube-controller-manager/app/apps.go:69

  1. func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
  2. if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
  3. return nil, false, nil
  4. }
  5. go replicaset.NewReplicaSetController(
  6. ctx.InformerFactory.Apps().V1().ReplicaSets(),
  7. ctx.InformerFactory.Core().V1().Pods(),
  8. ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
  9. replicaset.BurstReplicas,
  10. ).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
  11. return nil, true, nil
  12. }

下面是 replicaSetController 初始化的具体步骤,可以看到其会监听 pod 以及 rs 两个对象的事件。

k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:109

  1. func NewReplicaSetController(......) *ReplicaSetController {
  2. ......
  3. // 1、此处调用 NewBaseController
  4. return NewBaseController(rsInformer, podInformer, kubeClient, burstReplicas,
  5. apps.SchemeGroupVersion.WithKind("ReplicaSet"),
  6. "replicaset_controller",
  7. "replicaset",
  8. controller.RealPodControl{
  9. KubeClient: kubeClient,
  10. Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
  11. },
  12. )
  13. }
  14. func NewBaseController(......) *ReplicaSetController {
  15. ......
  16. // 2、ReplicaSetController 初始化
  17. rsc := &ReplicaSetController{
  18. GroupVersionKind: gvk,
  19. kubeClient: kubeClient,
  20. podControl: podControl,
  21. burstReplicas: burstReplicas,
  22. // 3、expectations 的初始化
  23. expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
  24. queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
  25. }
  26. // 4、rsInformer 中注册的 EventHandler
  27. rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  28. AddFunc: rsc.enqueueReplicaSet,
  29. UpdateFunc: rsc.updateRS,
  30. DeleteFunc: rsc.enqueueReplicaSet,
  31. })
  32. ......
  33. // 5、podInformer 中注册的 EventHandler
  34. podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  35. AddFunc: rsc.addPod,
  36. UpdateFunc: rsc.updatePod,
  37. DeleteFunc: rsc.deletePod,
  38. })
  39. ......
  40. return rsc
  41. }

replicaSetController 初始化完成后会调用 Run 方法启动 5 个 goroutine 处理 informer 队列中的事件并进行 sync 操作,kube-controller-manager 中每个 controller 的启动操作都是如下所示流程。

k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:177

  1. func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
  2. ......
  3. // 1、等待 informer 同步缓存
  4. if !cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
  5. return
  6. }
  7. // 2、启动 5 个 goroutine 执行 worker 方法
  8. for i := 0; i < workers; i++ {
  9. go wait.Until(rsc.worker, time.Second, stopCh)
  10. }
  11. <-stopCh
  12. }
  13. // 3、worker 方法中调用 rocessNextWorkItem
  14. func (rsc *ReplicaSetController) worker() {
  15. for rsc.processNextWorkItem() {
  16. }
  17. }
  18. func (rsc *ReplicaSetController) processNextWorkItem() bool {
  19. // 4、从队列中取出对象
  20. key, quit := rsc.queue.Get()
  21. if quit {
  22. return false
  23. }
  24. defer rsc.queue.Done(key)
  25. // 5、执行 sync 操作
  26. err := rsc.syncHandler(key.(string))
  27. ......
  28. return true
  29. }

EventHandler

初始化 replicaSetController 时,其中有一个 expectations 字段,这是 rs 中一个比较特殊的机制,为了说清楚 expectations,先来看一下 controller 中所注册的 eventHandler,replicaSetController 会 watch pod 和 replicaSet 两个对象,eventHandler 中注册了对这两种对象的 add、update、delete 三个操作。

addPod
  • 1、判断 pod 是否处于删除状态;
  • 2、获取该 pod 关联的 rs 以及 rsKey,入队 rs 并更新 rsKey 的 expectations;
  • 3、若 pod 对象没体现出关联的 rs 则为孤儿 pod,遍历 rsList 查找匹配的 rs,若该 rs.Namespace == pod.Namespace 并且 rs.Spec.Selector 匹配 pod.Labels,则说明该 pod 应该与此 rs 关联,将匹配的 rs 入队;

k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:255

  1. func (rsc *ReplicaSetController) addPod(obj interface{}) {
  2. pod := obj.(*v1.Pod)
  3. if pod.DeletionTimestamp != nil {
  4. rsc.deletePod(pod)
  5. return
  6. }
  7. // 1、获取 pod 所关联的 rs
  8. if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
  9. rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
  10. if rs == nil {
  11. return
  12. }
  13. rsKey, err := controller.KeyFunc(rs)
  14. if err != nil {
  15. return
  16. }
  17. // 2、更新 expectations,rsKey 的 add - 1
  18. rsc.expectations.CreationObserved(rsKey)
  19. rsc.enqueueReplicaSet(rs)
  20. return
  21. }
  22. rss := rsc.getPodReplicaSets(pod)
  23. if len(rss) == 0 {
  24. return
  25. }
  26. for _, rs := range rss {
  27. rsc.enqueueReplicaSet(rs)
  28. }
  29. }
updatePod
  • 1、如果 pod label 改变或者处于删除状态,则直接删除;
  • 2、如果 pod 的 OwnerReference 发生改变,此时 oldRS 需要创建 pod,将 oldRS 入队;
  • 3、获取 pod 关联的 rs,入队 rs,若 pod 当前处于 ready 并非 available 状态,则会再次将该 rs 加入到延迟队列中,因为 pod 从 ready 到 available 状态需要触发一次 status 的更新;
  • 4、否则为孤儿 pod,遍历 rsList 查找匹配的 rs,若找到则将 rs 入队;

k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:298

  1. func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
  2. curPod := cur.(*v1.Pod)
  3. oldPod := old.(*v1.Pod)
  4. if curPod.ResourceVersion == oldPod.ResourceVersion {
  5. return
  6. }
  7. // 1、如果 pod label 改变或者处于删除状态,则直接删除
  8. labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
  9. if curPod.DeletionTimestamp != nil {
  10. rsc.deletePod(curPod)
  11. if labelChanged {
  12. rsc.deletePod(oldPod)
  13. }
  14. return
  15. }
  16. // 2、如果 pod 的 OwnerReference 发生改变,将 oldRS 入队
  17. curControllerRef := metav1.GetControllerOf(curPod)
  18. oldControllerRef := metav1.GetControllerOf(oldPod)
  19. controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
  20. if controllerRefChanged && oldControllerRef != nil {
  21. if rs := rsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); rs != nil {
  22. rsc.enqueueReplicaSet(rs)
  23. }
  24. }
  25. // 3、获取 pod 关联的 rs,入队 rs
  26. if curControllerRef != nil {
  27. rs := rsc.resolveControllerRef(curPod.Namespace, curControllerRef)
  28. if rs == nil {
  29. return
  30. }
  31. rsc.enqueueReplicaSet(rs)
  32. if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 {
  33. rsc.enqueueReplicaSetAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second)
  34. }
  35. return
  36. }
  37. // 4、查找匹配的 rs
  38. if labelChanged || controllerRefChanged {
  39. rss := rsc.getPodReplicaSets(curPod)
  40. if len(rss) == 0 {
  41. return
  42. }
  43. for _, rs := range rss {
  44. rsc.enqueueReplicaSet(rs)
  45. }
  46. }
  47. }
deletePod
  • 1、确认该对象是否为 pod;
  • 2、判断是否为孤儿 pod;
  • 3、获取其对应的 rs 以及 rsKey;
  • 4、更新 expectations 中 rsKey 的 del 值;
  • 5、将 rs 入队;

k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:372

  1. func (rsc *ReplicaSetController) deletePod(obj interface{}) {
  2. pod, ok := obj.(*v1.Pod)
  3. if !ok {
  4. ......
  5. }
  6. controllerRef := metav1.GetControllerOf(pod)
  7. if controllerRef == nil {
  8. return
  9. }
  10. rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
  11. if rs == nil {
  12. return
  13. }
  14. rsKey, err := controller.KeyFunc(rs)
  15. if err != nil {
  16. return
  17. }
  18. // 更新 expectations,该 rsKey 的 del - 1
  19. rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
  20. rsc.enqueueReplicaSet(rs)
  21. }
AddRS 和 DeleteRS

以上两个操作仅仅是将对应的 rs 入队。

UpdateRS

其实 updateRS 也仅仅是将对应的 rs 进行入队,不过多了一个打印日志的操作,如下所示:

k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:232

  1. func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
  2. oldRS := old.(*apps.ReplicaSet)
  3. curRS := cur.(*apps.ReplicaSet)
  4. if *(oldRS.Spec.Replicas) != *(curRS.Spec.Replicas) {
  5. klog.V(4).Infof("%v %v updated. Desired pod count change: %d->%d", rsc.Kind, curRS.Name, *(oldRS.Spec.Replicas), *(curRS.Spec.Replicas))
  6. }
  7. rsc.enqueueReplicaSet(cur)
  8. }

至于 expectations 机制会在下文进行分析。

syncReplicaSet

syncReplicaSet 是 controller 的核心方法,它会驱动 controller 所控制的对象达到期望状态,主要逻辑如下所示:

  • 1、根据 ns/name 获取 rs 对象;
  • 2、调用 expectations.SatisfiedExpectations 判断是否需要执行真正的 sync 操作;
  • 3、获取所有 pod list;
  • 4、根据 pod label 进行过滤获取与该 rs 关联的 pod 列表,对于其中的孤儿 pod 若与该 rs label 匹配则进行关联,若已关联的 pod 与 rs label 不匹配则解除关联关系;
  • 5、调用 manageReplicas 进行同步 pod 操作,add/del pod;
  • 6、计算 rs 当前的 status 并进行更新;
  • 7、若 rs 设置了 MinReadySeconds 字段则将该 rs 加入到延迟队列中;

k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:562

  1. func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
  2. ......
  3. namespace, name, err := cache.SplitMetaNamespaceKey(key)
  4. if err != nil {
  5. return err
  6. }
  7. // 1、根据 ns/name 从 informer cache 中获取 rs 对象,
  8. // 若 rs 已经被删除则直接删除 expectations 中的对象
  9. rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
  10. if errors.IsNotFound(err) {
  11. rsc.expectations.DeleteExpectations(key)
  12. return nil
  13. }
  14. ......
  15. // 2、判断该 rs 是否需要执行 sync 操作
  16. rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
  17. selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
  18. if err != nil {
  19. ......
  20. }
  21. // 3、获取所有 pod list
  22. allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
  23. ......
  24. // 4、过滤掉异常 pod,处于删除状态或者 failed 状态的 pod 都为非 active 状态
  25. filteredPods := controller.FilterActivePods(allPods)
  26. // 5、检查所有 pod,根据 pod 并进行 adopt 与 release 操作,最后获取与该 rs 关联的 pod list
  27. filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
  28. ......
  29. // 6、若需要 sync 则执行 manageReplicas 创建/删除 pod
  30. var manageReplicasErr error
  31. if rsNeedsSync && rs.DeletionTimestamp == nil {
  32. manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
  33. }
  34. rs = rs.DeepCopy()
  35. // 7、计算 rs 当前的 status
  36. newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
  37. // 8、更新 rs status
  38. updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
  39. // 9、判断是否需要将 rs 加入到延迟队列中
  40. if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
  41. updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
  42. updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
  43. rsc.enqueueReplicaSetAfter(updatedRS, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
  44. }
  45. return manageReplicasErr
  46. }

syncReplicaSet 方法中有几个重要的操作分别为:rsc.expectations.SatisfiedExpectationsrsc.manageReplicascalculateStatus,下面一一进行分析。

SatisfiedExpectations

该方法主要判断 rs 是否需要执行真正的同步操作,若需要 add/del pod 或者 expectations 已过期则需要进行同步操作。

k8s.io/kubernetes/pkg/controller/controller_utils.go:181

  1. func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
  2. // 1、若该 key 存在时,判断是否满足条件或者是否超过同步周期
  3. if exp, exists, err := r.GetExpectations(controllerKey); exists {
  4. if exp.Fulfilled() {
  5. return true
  6. } else if exp.isExpired() {
  7. return true
  8. } else {
  9. return false
  10. }
  11. } else if err != nil {
  12. ......
  13. } else {
  14. // 2、该 rs 可能为新创建的,需要进行 sync
  15. ......
  16. }
  17. return true
  18. }
  19. // 3、若 add <= 0 且 del <= 0 说明本地观察到的状态已经为期望状态了
  20. func (e *ControlleeExpectations) Fulfilled() bool {
  21. return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0
  22. }
  23. // 4、判断 key 是否过期,ExpectationsTimeout 默认值为 5 * time.Minute
  24. func (exp *ControlleeExpectations) isExpired() bool {
  25. return clock.RealClock{}.Since(exp.timestamp) > ExpectationsTimeout
  26. }
manageReplicas

manageReplicas 是最核心的方法,它会计算 replicaSet 需要创建或者删除多少个 pod 并调用 apiserver 的接口进行操作,在此阶段仅仅是调用 apiserver 的接口进行创建,并不保证 pod 成功运行,如果在某一轮,未能成功创建的所有 Pod 对象,则不再创建剩余的 pod。一个周期内最多只能创建或删除 500 个 pod,若超过上限值未创建完成的 pod 数会在下一个 syncLoop 继续进行处理。

该方法主要逻辑如下所示:

  • 1、计算已存在 pod 数与期望数的差异;
  • 2、如果 diff < 0 说明 rs 实际的 pod 数未达到期望值需要继续创建 pod,首先会将需要创建的 pod 数在 expectations 中进行记录,然后调用 slowStartBatch 创建所需要的 pod,slowStartBatch 以指数级增长的方式批量创建 pod,创建 pod 过程中若出现 timeout err 则忽略,若为其他 err 则终止创建操作并更新 expectations;
  • 3、如果 diff > 0 说明可能是一次缩容操作需要删除多余的 pod,如果需要删除全部的 pod 则直接进行删除,否则会通过 getPodsToDelete 方法筛选出需要删除的 pod,具体的筛选策略在下文会将到,然后并发删除这些 pod,对于删除失败操作也会记录在 expectations 中;

slowStartBatch 中会调用 rsc.podControl.CreatePodsWithControllerRef 方法创建 pod,若创建 pod 失败会判断是否为创建超时错误,或者可能是超时后失败,但此时认为超时并不影响后续的批量创建动作,大家知道,创建 pod 操作提交到 apiserver 后会经过认证、鉴权、以及动态访问控制三个步骤,此过程有可能会超时,即使真的创建失败了,等到 expectations 过期后在下一个 syncLoop 时会重新创建。

k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:459

  1. func (rsc *ReplicaSetController) manageReplicas(......) error {
  2. // 1、计算已存在 pod 数与期望数的差异
  3. diff := len(filteredPods) - int(*(rs.Spec.Replicas))
  4. rsKey, err := controller.KeyFunc(rs)
  5. if err != nil {
  6. ......
  7. }
  8. 2、如果 <0,则需要创建 pod
  9. if diff < 0 {
  10. diff *= -1
  11. 3、判断需要创建的 pod 数是否超过单次 sync 上限值 500
  12. if diff > rsc.burstReplicas {
  13. diff = rsc.burstReplicas
  14. }
  15. 4、在 expectations 中进行记录,若该 key 已经存在会进行覆盖
  16. rsc.expectations.ExpectCreations(rsKey, diff)
  17. 5、调用 slowStartBatch 创建所需要的 pod
  18. successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
  19. err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
  20. // 6、若为 timeout err 则忽略
  21. if err != nil && errors.IsTimeout(err) {
  22. return nil
  23. }
  24. return err
  25. })
  26. // 7、计算未创建的 pod 数,并记录在 expectations 中
  27. // 若 pod 创建成功,informer watch 到事件后会在 addPod handler 中更新 expectations
  28. if skippedPods := diff - successfulCreations; skippedPods > 0 {
  29. for i := 0; i < skippedPods; i++ {
  30. rsc.expectations.CreationObserved(rsKey)
  31. }
  32. }
  33. return err
  34. } else if diff > 0 {
  35. // 8、若 diff >0 说明需要删除多创建的 pod
  36. if diff > rsc.burstReplicas {
  37. diff = rsc.burstReplicas
  38. }
  39. // 9、getPodsToDelete 会按照一定的策略找出需要删除的 pod 列表
  40. podsToDelete := getPodsToDelete(filteredPods, diff)
  41. // 10、在 expectations 中进行记录,若该 key 已经存在会进行覆盖
  42. rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))
  43. // 11、进行并发删除的操作
  44. errCh := make(chan error, diff)
  45. var wg sync.WaitGroup
  46. wg.Add(diff)
  47. for _, pod := range podsToDelete {
  48. go func(targetPod *v1.Pod) {
  49. defer wg.Done()
  50. if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
  51. podKey := controller.PodKey(targetPod)
  52. // 12、某次删除操作若失败会记录在 expectations 中
  53. rsc.expectations.DeletionObserved(rsKey, podKey)
  54. errCh <- err
  55. }
  56. }(pod)
  57. }
  58. wg.Wait()
  59. // 13、返回其中一条 err
  60. select {
  61. case err := <-errCh:
  62. if err != nil {
  63. return err
  64. }
  65. default:
  66. }
  67. }
  68. return nil
  69. }

slowStartBatch 会批量创建出已计算出的 diff pod 数,创建的 pod 数依次为 1、2、4、8……,呈指数级增长,其方法如下所示:

k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:658

  1. func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
  2. remaining := count
  3. successes := 0
  4. for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(2*batchSize, remaining) {
  5. errCh := make(chan error, batchSize)
  6. var wg sync.WaitGroup
  7. wg.Add(batchSize)
  8. for i := 0; i < batchSize; i++ {
  9. go func() {
  10. defer wg.Done()
  11. if err := fn(); err != nil {
  12. errCh <- err
  13. }
  14. }()
  15. }
  16. wg.Wait()
  17. curSuccesses := batchSize - len(errCh)
  18. successes += curSuccesses
  19. if len(errCh) > 0 {
  20. return successes, <-errCh
  21. }
  22. remaining -= batchSize
  23. }
  24. return successes, nil
  25. }

若 diff > 0 时再删除 pod 阶段会调用getPodsToDelete 对 pod 进行筛选操作,此阶段会选出最劣质的 pod,下面是用到的 6 种筛选方法:

  • 1、判断是否绑定了 node:Unassigned < assigned;
  • 2、判断 pod phase:PodPending < PodUnknown < PodRunning;
  • 3、判断 pod 状态:Not ready < ready;
  • 4、若 pod 都为 ready,则按运行时间排序,运行时间最短会被删除:empty time < less time < more time;
  • 5、根据 pod 重启次数排序:higher restart counts < lower restart counts;
  • 6、按 pod 创建时间进行排序:Empty creation time pods < newer pods < older pods;

上面的几个排序规则遵循互斥原则,从上到下进行匹配,符合条件则排序完成,代码如下所示:

k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:684

  1. func getPodsToDelete(filteredPods []*v1.Pod, diff int) []*v1.Pod {
  2. if diff < len(filteredPods) {
  3. sort.Sort(controller.ActivePods(filteredPods))
  4. }
  5. return filteredPods[:diff]
  6. }

k8s.io/kubernetes/pkg/controller/controller_utils.go:735

  1. type ActivePods []*v1.Pod
  2. func (s ActivePods) Len() int { return len(s) }
  3. func (s ActivePods) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
  4. func (s ActivePods) Less(i, j int) bool {
  5. // 1. Unassigned < assigned
  6. if s[i].Spec.NodeName != s[j].Spec.NodeName && (len(s[i].Spec.NodeName) == 0 || len(s[j].Spec.NodeName) == 0) {
  7. return len(s[i].Spec.NodeName) == 0
  8. }
  9. // 2. PodPending < PodUnknown < PodRunning
  10. m := map[v1.PodPhase]int{v1.PodPending: 0, v1.PodUnknown: 1, v1.PodRunning: 2}
  11. if m[s[i].Status.Phase] != m[s[j].Status.Phase] {
  12. return m[s[i].Status.Phase] < m[s[j].Status.Phase]
  13. }
  14. // 3. Not ready < ready
  15. if podutil.IsPodReady(s[i]) != podutil.IsPodReady(s[j]) {
  16. return !podutil.IsPodReady(s[i])
  17. }
  18. // 4. Been ready for empty time < less time < more time
  19. if podutil.IsPodReady(s[i]) && podutil.IsPodReady(s[j]) && !podReadyTime(s[i]).Equal(podReadyTime(s[j])) {
  20. return afterOrZero(podReadyTime(s[i]), podReadyTime(s[j]))
  21. }
  22. // 5. Pods with containers with higher restart counts < lower restart counts
  23. if maxContainerRestarts(s[i]) != maxContainerRestarts(s[j]) {
  24. return maxContainerRestarts(s[i]) > maxContainerRestarts(s[j])
  25. }
  26. // 6. Empty creation time pods < newer pods < older pods
  27. if !s[i].CreationTimestamp.Equal(&s[j].CreationTimestamp) {
  28. return afterOrZero(&s[i].CreationTimestamp, &s[j].CreationTimestamp)
  29. }
  30. return false
  31. }
calculateStatus

calculateStatus 会通过当前 pod 的状态计算出 rs 中 status 字段值,status 字段如下所示:

  1. status:
  2. availableReplicas: 10
  3. fullyLabeledReplicas: 10
  4. observedGeneration: 1
  5. readyReplicas: 10
  6. replicas: 10

k8s.io/kubernetes/pkg/controller/replicaset/replica_set_utils.go:85

  1. func calculateStatus(......) apps.ReplicaSetStatus {
  2. newStatus := rs.Status
  3. fullyLabeledReplicasCount := 0
  4. readyReplicasCount := 0
  5. availableReplicasCount := 0
  6. templateLabel := labels.Set(rs.Spec.Template.Labels).AsSelectorPreValidated()
  7. for _, pod := range filteredPods {
  8. if templateLabel.Matches(labels.Set(pod.Labels)) {
  9. fullyLabeledReplicasCount++
  10. }
  11. if podutil.IsPodReady(pod) {
  12. readyReplicasCount++
  13. if podutil.IsPodAvailable(pod, rs.Spec.MinReadySeconds, metav1.Now()) {
  14. availableReplicasCount++
  15. }
  16. }
  17. }
  18. failureCond := GetCondition(rs.Status, apps.ReplicaSetReplicaFailure)
  19. if manageReplicasErr != nil && failureCond == nil {
  20. var reason string
  21. if diff := len(filteredPods) - int(*(rs.Spec.Replicas)); diff < 0 {
  22. reason = "FailedCreate"
  23. } else if diff > 0 {
  24. reason = "FailedDelete"
  25. }
  26. cond := NewReplicaSetCondition(apps.ReplicaSetReplicaFailure, v1.ConditionTrue, reason, manageReplicasErr.Error())
  27. SetCondition(&newStatus, cond)
  28. } else if manageReplicasErr == nil && failureCond != nil {
  29. RemoveCondition(&newStatus, apps.ReplicaSetReplicaFailure)
  30. }
  31. newStatus.Replicas = int32(len(filteredPods))
  32. newStatus.FullyLabeledReplicas = int32(fullyLabeledReplicasCount)
  33. newStatus.ReadyReplicas = int32(readyReplicasCount)
  34. newStatus.AvailableReplicas = int32(availableReplicasCount)
  35. return newStatus
  36. }

expectations 机制

通过上面的分析可知,在 rs 每次入队后进行 sync 操作时,首先需要判断该 rs 是否满足 expectations 机制,那么这个 expectations 的目的是什么?其实,rs 除了有 informer 的缓存外,还有一个本地缓存就是 expectations,expectations 会记录 rs 所有对象需要 add/del 的 pod 数量,若两者都为 0 则说明该 rs 所期望创建的 pod 或者删除的 pod 数已经被满足,若不满足则说明某次在 syncLoop 中创建或者删除 pod 时有失败的操作,则需要等待 expectations 过期后再次同步该 rs。

通过上面对 eventHandler 的分析,再来总结一下触发 replicaSet 对象发生同步事件的条件:

  • 1、与 rs 相关的:AddRS、UpdateRS、DeleteRS;
  • 2、与 pod 相关的:AddPod、UpdatePod、DeletePod;
  • 3、informer 二级缓存的同步;

但是所有的更新事件是否都需要执行 sync 操作?对于除 rs.Spec.Replicas 之外的更新操作其实都没必要执行 sync 操作,因为 spec 其他字段和 status 的更新都不需要创建或者删除 pod。

在 sync 操作真正开始之前,依据 expectations 机制进行判断,确定是否要真正地启动一次 sync,因为在 eventHandler 阶段也会更新 expectations 值,从上面的 eventHandler 中可以看到在 addPod 中会调用 rsc.expectations.CreationObserved 更新 rsKey 的 expectations,将其 add 值 -1,在 deletePod 中调用 rsc.expectations.DeletionObserved 将其 del 值 -1。所以等到 sync 时,若 controllerKey(name 或者 ns/name)满足 expectations 机制则进行 sync 操作,而 updatePod 并不会修改 expectations,所以,expectations 的设计就是当需要创建或删除 pod 才会触发对应的 sync 操作,expectations 机制的目的就是减少不必要的 sync 操作。

什么条件下 expectations 机制会满足?

  • 1、当 expectations 中不存在 rsKey 时,也就说首次创建 rs 时;
  • 2、当 expectations 中 del 以及 add 值都为 0 时,即 rs 所需要创建或者删除的 pod 数都已满足;
  • 3、当 expectations 过期时,即超过 5 分钟未进行 sync 操作;

最后再看一下 expectations 中用到的几个方法:

  1. // 创建了一个 pod 说明 expectations 中对应的 key add 期望值需要减少一个 pod, add -1
  2. CreationObserved(controllerKey string)
  3. // 删除了一个 pod 说明 expectations 中对应的 key del 期望值需要减少一个 pod, del - 1
  4. DeletionObserved(controllerKey string)
  5. // 写入 key 需要 add 的 pod 数量
  6. ExpectCreations(controllerKey string, adds int) error
  7. // 写入 key 需要 del 的 pod 数量
  8. ExpectDeletions(controllerKey string, dels int) error
  9. // 删除该 key
  10. DeleteExpectations(controllerKey string)

当在 syncLoop 中发现满足条件时,会执行 manageReplicas 方法,在 manageReplicas 中无论是为 rs 创建还是删除 pod 都会调用 ExpectCreations 和 ExpectDeletions 为 rsKey 创建 expectations 对象。

总结

本文主要从源码层面分析了 replicaSetController 的设计与实现,但是不得不说其在设计方面考虑了很多因素,文中只提到了笔者理解了或者思考后稍有了解的一些机制,至于其他设计思想还得自行阅读代码体会。

下面以一个流程图总结下创建 rs 的主要流程。

  1. SatisfiedExpectations
  2. (expectations 中不存在
  3. rsKeyrsNeedsSync
  4. true)
  5. | 判断 add/del pod
  6. | |
  7. |
  8. | 创建 expectations 对象,
  9. | 并设置 add/del
  10. |
  11. create rs --> syncReplicaSet --> manageReplicas -->
  12. (为 rs 创建 pod) 调用 slowStartBatch 批量创建 pod/
  13. | 删除筛选出的多余 pod
  14. | |
  15. |
  16. | 更新 expectations 对象
  17. updateReplicaSetStatus
  18. (更新 rs status
  19. subResource)

参考:

https://keyla.vip/k8s/3-master/controller/replica-set/