kube-controller-manager Source Code Analysis (Part 2): DeploymentController

The following code analysis is based on Kubernetes v1.12.0.

This article takes the deployment controller as an example to analyze how this kind of controller runs. The code discussed here lives mainly in pkg/controller/deployment; the pkg/controller directory contains the concrete implementations of the various controllers.

The directory structure of the controller manager code under pkg is as follows:

```
controller                       # concrete implementations of the various controllers
├── apis
├── bootstrap
├── certificates
├── client_builder.go
├── cloud
├── clusterroleaggregation
├── controller_ref_manager.go
├── controller_utils.go          # WaitForCacheSync
├── cronjob
├── daemon
├── deployment                   # deployment controller
│   ├── deployment_controller.go # NewDeploymentController, Run, syncDeployment
│   ├── progress.go              # syncRolloutStatus
│   ├── recreate.go              # rolloutRecreate
│   ├── rollback.go              # rollback
│   ├── rolling.go               # rolloutRolling
│   └── sync.go
├── disruption                   # disruption controller
├── endpoint
├── garbagecollector
├── history
├── job
├── lookup_cache.go
├── namespace                    # namespace controller
├── nodeipam
├── nodelifecycle
├── podautoscaler
├── podgc
├── replicaset                   # replicaset controller
├── replication                  # replication controller
├── resourcequota
├── route
├── service                      # service controller
├── serviceaccount
├── statefulset                  # statefulset controller
└── volume                       # PersistentVolumeController, AttachDetachController, PVCProtectionController
```

1. startDeploymentController

```go
func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
        return nil, false, nil
    }
    dc, err := deployment.NewDeploymentController(
        ctx.InformerFactory.Apps().V1().Deployments(),
        ctx.InformerFactory.Apps().V1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("deployment-controller"),
    )
    if err != nil {
        return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
    }
    go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
    return nil, true, nil
}
```

startDeploymentController mainly calls NewDeploymentController and its corresponding Run function. That logic lives under kubernetes/pkg/controller/deployment.

2. NewDeploymentController

NewDeploymentController mainly constructs the DeploymentController struct.

It mainly handles the following logic:

  • Build and start the event broadcaster eventBroadcaster.
  • Initialize and assign rsControl, the clientset, and the workqueue.
  • Register ResourceEventHandlerFuncs on dInformer, rsInformer, and podInformer; these consist mainly of the AddFunc, UpdateFunc, and DeleteFunc methods.
  • Wire up the Lister and HasSynced functions of the deployment, rs, and pod informers.
  • Assign syncHandler, whose concrete implementation is syncDeployment.

2.1. eventBroadcaster

The event broadcaster is used to record deployment-related events.

```go
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
// TODO: remove the wrapper when every clients have moved to use the clientset.
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(client.CoreV1().RESTClient()).Events("")})
```
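
The recorder created from this broadcaster is what actually emits events. For example, syncDeployment (quoted in section 4 below) uses it to warn about an empty selector:

```go
dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
```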

2.2. rsControl

Construct the DeploymentController, including the clientset, workqueue, and rsControl, where rsControl is the controller that implements the ReplicaSet-specific logic.

```go
dc := &DeploymentController{
    client:        client,
    eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
    queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
}
dc.rsControl = controller.RealRSControl{
    KubeClient: client,
    Recorder:   dc.eventRecorder,
}
```

2.3. Informer().AddEventHandler

Register ResourceEventHandlerFuncs on dInformer, rsInformer, and podInformer; these consist mainly of the AddFunc, UpdateFunc, and DeleteFunc methods.

```go
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    dc.addDeployment,
    UpdateFunc: dc.updateDeployment,
    // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
    DeleteFunc: dc.deleteDeployment,
})
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    dc.addReplicaSet,
    UpdateFunc: dc.updateReplicaSet,
    DeleteFunc: dc.deleteReplicaSet,
})
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    DeleteFunc: dc.deletePod,
})
```
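
Each of these handlers ultimately funnels an object into the workqueue. For reference, the addDeployment handler in the same file simply casts the object and enqueues it (abridged from v1.12):

```go
func (dc *DeploymentController) addDeployment(obj interface{}) {
    d := obj.(*apps.Deployment)
    glog.V(4).Infof("Adding deployment %s", d.Name)
    dc.enqueueDeployment(d)
}
```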

2.4. Informer.Lister()

Call the Lister() method of dInformer, rsInformer, and podInformer.

```go
dc.dLister = dInformer.Lister()
dc.rsLister = rsInformer.Lister()
dc.podLister = podInformer.Lister()
```

2.5. Informer().HasSynced

Call Informer().HasSynced to check whether the informer caches have finished syncing:

```go
dc.dListerSynced = dInformer.Informer().HasSynced
dc.rsListerSynced = rsInformer.Informer().HasSynced
dc.podListerSynced = podInformer.Informer().HasSynced
```

2.6. syncHandler

syncHandler is set to syncDeployment; syncHandler is responsible for the actual deployment sync.

```go
dc.syncHandler = dc.syncDeployment
dc.enqueueDeployment = dc.enqueue
```

The complete code is as follows:

```go
// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(glog.Infof)
    // TODO: remove the wrapper when every clients have moved to use the clientset.
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(client.CoreV1().RESTClient()).Events("")})

    if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
        if err := metrics.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
            return nil, err
        }
    }
    dc := &DeploymentController{
        client:        client,
        eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
        queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
    }
    dc.rsControl = controller.RealRSControl{
        KubeClient: client,
        Recorder:   dc.eventRecorder,
    }

    dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addDeployment,
        UpdateFunc: dc.updateDeployment,
        // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
        DeleteFunc: dc.deleteDeployment,
    })
    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addReplicaSet,
        UpdateFunc: dc.updateReplicaSet,
        DeleteFunc: dc.deleteReplicaSet,
    })
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        DeleteFunc: dc.deletePod,
    })

    dc.syncHandler = dc.syncDeployment
    dc.enqueueDeployment = dc.enqueue

    dc.dLister = dInformer.Lister()
    dc.rsLister = rsInformer.Lister()
    dc.podLister = podInformer.Lister()
    dc.dListerSynced = dInformer.Informer().HasSynced
    dc.rsListerSynced = rsInformer.Informer().HasSynced
    dc.podListerSynced = podInformer.Informer().HasSynced
    return dc, nil
}
```

3. DeploymentController.Run

Run performs the watch and sync operations.

```go
// Run begins watching and syncing.
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer dc.queue.ShutDown()

    glog.Infof("Starting deployment controller")
    defer glog.Infof("Shutting down deployment controller")

    if !controller.WaitForCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.Until(dc.worker, time.Second, stopCh)
    }

    <-stopCh
}
```
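
wait.Until re-invokes dc.worker once per second until stopCh is closed; since worker itself loops on the queue, the one-second period only matters if worker returns. A minimal self-contained sketch of wait.Until's behavior (the printed message is illustrative):

```go
package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stopCh := make(chan struct{})
    go func() {
        time.Sleep(3 * time.Second)
        close(stopCh) // stop the loop after ~3 ticks
    }()
    // Runs the function, then waits one second between invocations,
    // until stopCh is closed.
    wait.Until(func() { fmt.Println("worker invoked") }, time.Second, stopCh)
}
```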

3.1. WaitForCacheSync

WaitForCacheSync is used so that, within the List-Watch mechanism, the local informer caches are consistent with the data in etcd before the workers start.

```go
// WaitForCacheSync is a wrapper around cache.WaitForCacheSync that generates log messages
// indicating that the controller identified by controllerName is waiting for syncs, followed by
// either a successful or failed sync.
func WaitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool {
    glog.Infof("Waiting for caches to sync for %s controller", controllerName)

    if !cache.WaitForCacheSync(stopCh, cacheSyncs...) {
        utilruntime.HandleError(fmt.Errorf("Unable to sync caches for %s controller", controllerName))
        return false
    }

    glog.Infof("Caches are synced for %s controller", controllerName)
    return true
}
```

3.2. dc.worker

worker calls processNextWorkItem, which in turn calls syncHandler; syncHandler was assigned in NewDeploymentController, and its concrete implementation is syncDeployment.

```go
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (dc *DeploymentController) worker() {
    for dc.processNextWorkItem() {
    }
}

func (dc *DeploymentController) processNextWorkItem() bool {
    key, quit := dc.queue.Get()
    if quit {
        return false
    }
    defer dc.queue.Done(key)

    err := dc.syncHandler(key.(string))
    dc.handleErr(err, key)

    return true
}
```
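
processNextWorkItem delegates error handling to handleErr, which requeues failed keys with rate limiting and drops them after too many retries (abridged from the same file; maxRetries is a package-level constant):

```go
func (dc *DeploymentController) handleErr(err error, key interface{}) {
    if err == nil {
        dc.queue.Forget(key)
        return
    }

    if dc.queue.NumRequeues(key) < maxRetries {
        glog.V(2).Infof("Error syncing deployment %v: %v", key, err)
        dc.queue.AddRateLimited(key)
        return
    }

    utilruntime.HandleError(err)
    glog.V(2).Infof("Dropping deployment %q out of the queue: %v", key, err)
    dc.queue.Forget(key)
}
```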

The syncHandler assignment in NewDeploymentController:

```go
func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
    ...
    dc.syncHandler = dc.syncDeployment
    ...
}
```

4. syncDeployment

syncDeployment syncs the deployment identified by the given key.

The main flow is as follows:

  1. Use SplitMetaNamespaceKey to get the namespace and the name of the deployment object from the key.
  2. Fetch the deployment object through the Lister interface.
  3. getReplicaSetsForDeployment fetches the ReplicaSet objects managed by the deployment.
  4. getPodMapForDeployment fetches the pods managed by the deployment, grouped by ReplicaSet.
  5. checkPausedConditions checks whether the deployment is paused and adds the appropriate condition.
  6. isScalingEvent checks whether the deployment update comes from a scaling event; if so, perform the scale operation.
  7. Depending on the DeploymentStrategyType, run rolloutRecreate or rolloutRolling.

The complete code is as follows:

```go
// syncDeployment will sync the deployment with the given key.
// This function is not meant to be invoked concurrently with the same key.
func (dc *DeploymentController) syncDeployment(key string) error {
    startTime := time.Now()
    glog.V(4).Infof("Started syncing deployment %q (%v)", key, startTime)
    defer func() {
        glog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Since(startTime))
    }()

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    deployment, err := dc.dLister.Deployments(namespace).Get(name)
    if errors.IsNotFound(err) {
        glog.V(2).Infof("Deployment %v has been deleted", key)
        return nil
    }
    if err != nil {
        return err
    }

    // Deep-copy otherwise we are mutating our cache.
    // TODO: Deep-copy only when needed.
    d := deployment.DeepCopy()

    everything := metav1.LabelSelector{}
    if reflect.DeepEqual(d.Spec.Selector, &everything) {
        dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
        if d.Status.ObservedGeneration < d.Generation {
            d.Status.ObservedGeneration = d.Generation
            dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(d)
        }
        return nil
    }

    // List ReplicaSets owned by this Deployment, while reconciling ControllerRef
    // through adoption/orphaning.
    rsList, err := dc.getReplicaSetsForDeployment(d)
    if err != nil {
        return err
    }
    // List all Pods owned by this Deployment, grouped by their ReplicaSet.
    // Current uses of the podMap are:
    //
    // * check if a Pod is labeled correctly with the pod-template-hash label.
    // * check that no old Pods are running in the middle of Recreate Deployments.
    podMap, err := dc.getPodMapForDeployment(d, rsList)
    if err != nil {
        return err
    }

    if d.DeletionTimestamp != nil {
        return dc.syncStatusOnly(d, rsList)
    }

    // Update deployment conditions with an Unknown condition when pausing/resuming
    // a deployment. In this way, we can be sure that we won't timeout when a user
    // resumes a Deployment with a set progressDeadlineSeconds.
    if err = dc.checkPausedConditions(d); err != nil {
        return err
    }

    if d.Spec.Paused {
        return dc.sync(d, rsList)
    }

    // rollback is not re-entrant in case the underlying replica sets are updated with a new
    // revision so we should ensure that we won't proceed to update replica sets until we
    // make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
    if getRollbackTo(d) != nil {
        return dc.rollback(d, rsList)
    }

    scalingEvent, err := dc.isScalingEvent(d, rsList)
    if err != nil {
        return err
    }
    if scalingEvent {
        return dc.sync(d, rsList)
    }

    switch d.Spec.Strategy.Type {
    case apps.RecreateDeploymentStrategyType:
        return dc.rolloutRecreate(d, rsList, podMap)
    case apps.RollingUpdateDeploymentStrategyType:
        return dc.rolloutRolling(d, rsList)
    }
    return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
```

4.1. Get deployment

```go
// get namespace and deployment name
namespace, name, err := cache.SplitMetaNamespaceKey(key)

// get deployment by name
deployment, err := dc.dLister.Deployments(namespace).Get(name)
```
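
The workqueue key has the form `<namespace>/<name>`, which is exactly what SplitMetaNamespaceKey takes apart. A minimal runnable example (the key value here is illustrative):

```go
package main

import (
    "fmt"

    "k8s.io/client-go/tools/cache"
)

func main() {
    // Workqueue keys look like "<namespace>/<name>", e.g. "default/nginx".
    namespace, name, err := cache.SplitMetaNamespaceKey("default/nginx")
    fmt.Println(namespace, name, err) // Output: default nginx <nil>
}
```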

4.2. getReplicaSetsForDeployment

```go
// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
// through adoption/orphaning.
rsList, err := dc.getReplicaSetsForDeployment(d)
```

The code of getReplicaSetsForDeployment:

```go
// getReplicaSetsForDeployment uses ControllerRefManager to reconcile
// ControllerRef by adopting and orphaning.
// It returns the list of ReplicaSets that this Deployment should manage.
func (dc *DeploymentController) getReplicaSetsForDeployment(d *apps.Deployment) ([]*apps.ReplicaSet, error) {
    // List all ReplicaSets to find those we own but that no longer match our
    // selector. They will be orphaned by ClaimReplicaSets().
    rsList, err := dc.rsLister.ReplicaSets(d.Namespace).List(labels.Everything())
    if err != nil {
        return nil, err
    }
    deploymentSelector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
    if err != nil {
        return nil, fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err)
    }
    // If any adoptions are attempted, we should first recheck for deletion with
    // an uncached quorum read sometime after listing ReplicaSets (see #42639).
    canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
        fresh, err := dc.client.AppsV1().Deployments(d.Namespace).Get(d.Name, metav1.GetOptions{})
        if err != nil {
            return nil, err
        }
        if fresh.UID != d.UID {
            return nil, fmt.Errorf("original Deployment %v/%v is gone: got uid %v, wanted %v", d.Namespace, d.Name, fresh.UID, d.UID)
        }
        return fresh, nil
    })
    cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, d, deploymentSelector, controllerKind, canAdoptFunc)
    return cm.ClaimReplicaSets(rsList)
}
```
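
Note the metav1.LabelSelectorAsSelector conversion: the deployment's spec.selector is turned into a labels.Selector that the ControllerRefManager matches ReplicaSets against. A small runnable example of that conversion:

```go
package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    // Convert an API LabelSelector into a labels.Selector usable for matching.
    sel, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
        MatchLabels: map[string]string{"app": "nginx"},
    })
    if err != nil {
        panic(err)
    }
    fmt.Println(sel.String()) // Output: app=nginx
}
```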

4.3. getPodMapForDeployment

```go
// List all Pods owned by this Deployment, grouped by their ReplicaSet.
// Current uses of the podMap are:
//
// * check if a Pod is labeled correctly with the pod-template-hash label.
// * check that no old Pods are running in the middle of Recreate Deployments.
podMap, err := dc.getPodMapForDeployment(d, rsList)
```

The code of getPodMapForDeployment:

```go
// getPodMapForDeployment returns the Pods managed by a Deployment.
//
// It returns a map from ReplicaSet UID to a list of Pods controlled by that RS,
// according to the Pod's ControllerRef.
func (dc *DeploymentController) getPodMapForDeployment(d *apps.Deployment, rsList []*apps.ReplicaSet) (map[types.UID]*v1.PodList, error) {
    // Get all Pods that potentially belong to this Deployment.
    selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
    if err != nil {
        return nil, err
    }
    pods, err := dc.podLister.Pods(d.Namespace).List(selector)
    if err != nil {
        return nil, err
    }
    // Group Pods by their controller (if it's in rsList).
    podMap := make(map[types.UID]*v1.PodList, len(rsList))
    for _, rs := range rsList {
        podMap[rs.UID] = &v1.PodList{}
    }
    for _, pod := range pods {
        // Do not ignore inactive Pods because Recreate Deployments need to verify that no
        // Pods from older versions are running before spinning up new Pods.
        controllerRef := metav1.GetControllerOf(pod)
        if controllerRef == nil {
            continue
        }
        // Only append if we care about this UID.
        if podList, ok := podMap[controllerRef.UID]; ok {
            podList.Items = append(podList.Items, *pod)
        }
    }
    return podMap, nil
}
```

4.4. checkPausedConditions

```go
// Update deployment conditions with an Unknown condition when pausing/resuming
// a deployment. In this way, we can be sure that we won't timeout when a user
// resumes a Deployment with a set progressDeadlineSeconds.
if err = dc.checkPausedConditions(d); err != nil {
    return err
}

if d.Spec.Paused {
    return dc.sync(d, rsList)
}
```

The code of checkPausedConditions:

```go
// checkPausedConditions checks if the given deployment is paused or not and adds an appropriate condition.
// These conditions are needed so that we won't accidentally report lack of progress for resumed deployments
// that were paused for longer than progressDeadlineSeconds.
func (dc *DeploymentController) checkPausedConditions(d *apps.Deployment) error {
    if !deploymentutil.HasProgressDeadline(d) {
        return nil
    }
    cond := deploymentutil.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
    if cond != nil && cond.Reason == deploymentutil.TimedOutReason {
        // If we have reported lack of progress, do not overwrite it with a paused condition.
        return nil
    }
    pausedCondExists := cond != nil && cond.Reason == deploymentutil.PausedDeployReason

    needsUpdate := false
    if d.Spec.Paused && !pausedCondExists {
        condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionUnknown, deploymentutil.PausedDeployReason, "Deployment is paused")
        deploymentutil.SetDeploymentCondition(&d.Status, *condition)
        needsUpdate = true
    } else if !d.Spec.Paused && pausedCondExists {
        condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionUnknown, deploymentutil.ResumedDeployReason, "Deployment is resumed")
        deploymentutil.SetDeploymentCondition(&d.Status, *condition)
        needsUpdate = true
    }

    if !needsUpdate {
        return nil
    }

    var err error
    d, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(d)
    return err
}
```

4.5. isScalingEvent

```go
scalingEvent, err := dc.isScalingEvent(d, rsList)
if err != nil {
    return err
}
if scalingEvent {
    return dc.sync(d, rsList)
}
```

The code of isScalingEvent:

```go
// isScalingEvent checks whether the provided deployment has been updated with a scaling event
// by looking at the desired-replicas annotation in the active replica sets of the deployment.
//
// rsList should come from getReplicaSetsForDeployment(d).
// podMap should come from getPodMapForDeployment(d, rsList).
func (dc *DeploymentController) isScalingEvent(d *apps.Deployment, rsList []*apps.ReplicaSet) (bool, error) {
    newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
    if err != nil {
        return false, err
    }
    allRSs := append(oldRSs, newRS)
    for _, rs := range controller.FilterActiveReplicaSets(allRSs) {
        desired, ok := deploymentutil.GetDesiredReplicasAnnotation(rs)
        if !ok {
            continue
        }
        if desired != *(d.Spec.Replicas) {
            return true, nil
        }
    }
    return false, nil
}
```
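
GetDesiredReplicasAnnotation reads a bookkeeping annotation that the deployment controller writes onto each ReplicaSet. A minimal sketch of that lookup, assuming the upstream annotation key deployment.kubernetes.io/desired-replicas; getDesiredReplicas here is a hypothetical simplification of the util helper:

```go
package main

import (
    "fmt"
    "strconv"

    apps "k8s.io/api/apps/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// getDesiredReplicas is a simplified stand-in for
// deploymentutil.GetDesiredReplicasAnnotation.
func getDesiredReplicas(rs *apps.ReplicaSet) (int32, bool) {
    v, ok := rs.Annotations["deployment.kubernetes.io/desired-replicas"]
    if !ok {
        return 0, false
    }
    n, err := strconv.ParseInt(v, 10, 32)
    if err != nil {
        return 0, false
    }
    return int32(n), true
}

func main() {
    rs := &apps.ReplicaSet{ObjectMeta: metav1.ObjectMeta{
        Annotations: map[string]string{"deployment.kubernetes.io/desired-replicas": "3"},
    }}
    fmt.Println(getDesiredReplicas(rs)) // Output: 3 true
}
```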

4.6. rolloutRecreate

```go
switch d.Spec.Strategy.Type {
case apps.RecreateDeploymentStrategyType:
    return dc.rolloutRecreate(d, rsList, podMap)
```

The code of rolloutRecreate:

```go
// rolloutRecreate implements the logic for recreating a replica set.
func (dc *DeploymentController) rolloutRecreate(d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID]*v1.PodList) error {
    // Don't create a new RS if not already existed, so that we avoid scaling up before scaling down.
    newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
    if err != nil {
        return err
    }
    allRSs := append(oldRSs, newRS)
    activeOldRSs := controller.FilterActiveReplicaSets(oldRSs)

    // scale down old replica sets.
    scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, d)
    if err != nil {
        return err
    }
    if scaledDown {
        // Update DeploymentStatus.
        return dc.syncRolloutStatus(allRSs, newRS, d)
    }

    // Do not process a deployment when it has old pods running.
    if oldPodsRunning(newRS, oldRSs, podMap) {
        return dc.syncRolloutStatus(allRSs, newRS, d)
    }

    // If we need to create a new RS, create it now.
    if newRS == nil {
        newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
        if err != nil {
            return err
        }
        allRSs = append(oldRSs, newRS)
    }

    // scale up new replica set.
    if _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil {
        return err
    }

    if util.DeploymentComplete(d, &d.Status) {
        if err := dc.cleanupDeployment(oldRSs, d); err != nil {
            return err
        }
    }

    // Sync deployment status.
    return dc.syncRolloutStatus(allRSs, newRS, d)
}
```
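
rolloutRecreate is the code path that actually consumes podMap: oldPodsRunning must confirm that no pod from an old ReplicaSet is still active before the new ReplicaSet is created. A simplified, hypothetical sketch of that check (the upstream helper also counts replicas via the ReplicaSet statuses):

```go
package deploymentsketch

import (
    apps "k8s.io/api/apps/v1"
    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/types"
)

// anyOldPodsRunning is a simplified stand-in for the oldPodsRunning helper
// used by rolloutRecreate above.
func anyOldPodsRunning(newRS *apps.ReplicaSet, podMap map[types.UID]*v1.PodList) bool {
    for rsUID, podList := range podMap {
        // Pods belonging to the new ReplicaSet don't count as "old".
        if newRS != nil && newRS.UID == rsUID {
            continue
        }
        for _, pod := range podList.Items {
            // Pods in a terminal phase are not running.
            if pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded {
                continue
            }
            return true
        }
    }
    return false
}
```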

4.7. rolloutRolling

```go
switch d.Spec.Strategy.Type {
case apps.RecreateDeploymentStrategyType:
    return dc.rolloutRecreate(d, rsList, podMap)
case apps.RollingUpdateDeploymentStrategyType:
    return dc.rolloutRolling(d, rsList)
}
```

The code of rolloutRolling:

```go
// rolloutRolling implements the logic for rolling a new replica set.
func (dc *DeploymentController) rolloutRolling(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
    newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
    if err != nil {
        return err
    }
    allRSs := append(oldRSs, newRS)

    // Scale up, if we can.
    scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, d)
    if err != nil {
        return err
    }
    if scaledUp {
        // Update DeploymentStatus
        return dc.syncRolloutStatus(allRSs, newRS, d)
    }

    // Scale down, if we can.
    scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
    if err != nil {
        return err
    }
    if scaledDown {
        // Update DeploymentStatus
        return dc.syncRolloutStatus(allRSs, newRS, d)
    }

    if deploymentutil.DeploymentComplete(d, &d.Status) {
        if err := dc.cleanupDeployment(oldRSs, d); err != nil {
            return err
        }
    }

    // Sync deployment status
    return dc.syncRolloutStatus(allRSs, newRS, d)
}
```
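
How far reconcileNewReplicaSet may scale up is bounded by maxSurge. A minimal sketch of that bound, assuming maxSurge has already been resolved to an absolute pod count (the real computation lives in deploymentutil.NewRSNewReplicas; names here are illustrative):

```go
package deploymentsketch

// newRSReplicas sketches the rolling-update scale-up bound: the total number
// of pods across all ReplicaSets may not exceed desired+maxSurge, and the new
// ReplicaSet never exceeds the desired replica count.
func newRSReplicas(desired, maxSurge, currentTotal, newRSCurrent int32) int32 {
    maxTotal := desired + maxSurge
    if currentTotal >= maxTotal {
        return newRSCurrent // cannot surge any further
    }
    scaleUp := maxTotal - currentTotal
    if newRSCurrent+scaleUp > desired {
        return desired
    }
    return newRSCurrent + scaleUp
}
```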

5. Summary

startDeploymentController consists mainly of two parts: NewDeploymentController and DeploymentController.Run.

NewDeploymentController mainly constructs the DeploymentController struct.

It mainly handles the following logic:

  1. Build and start the event broadcaster eventBroadcaster.
  2. Initialize and assign rsControl, the clientset, and the workqueue.
  3. Register ResourceEventHandlerFuncs on dInformer, rsInformer, and podInformer; these consist mainly of the AddFunc, UpdateFunc, and DeleteFunc methods.
  4. Wire up the Lister and HasSynced functions of the deployment, rs, and pod informers.
  5. Assign syncHandler, whose concrete implementation is syncDeployment.

DeploymentController.Run mainly consists of WaitForCacheSync and syncDeployment.

syncDeployment syncs the deployment identified by the given key.

The main flow is as follows:

  1. Use SplitMetaNamespaceKey to get the namespace and the name of the deployment object from the key.
  2. Fetch the deployment object through the Lister interface.
  3. getReplicaSetsForDeployment fetches the ReplicaSet objects managed by the deployment.
  4. getPodMapForDeployment fetches the pods managed by the deployment, grouped by ReplicaSet.
  5. checkPausedConditions checks whether the deployment is paused and adds the appropriate condition.
  6. isScalingEvent checks whether the deployment update comes from a scaling event; if so, perform the scale operation.
  7. Depending on the DeploymentStrategyType, run rolloutRecreate or rolloutRolling.
