This article was originally going to continue with the kubelet's main modules, but readers asked whether we could start with the kubelet's startup flow instead. I wrote an article on the startup flow long ago based on v1.12, and comparing it against v1.16 the flow has not changed much; that article was rather brief, however, so this one re-analyzes the kubelet startup flow.

Kubelet Startup Flow

Kubernetes version: v1.16

The kubelet's startup is fairly complex, so first, here is the kubelet startup flow diagram, to make the various call paths easier to follow later in the article:

(Figure 1: kubelet startup flow)

NewKubeletCommand

Start from the kubelet's main function. The NewKubeletCommand method called there is mainly responsible for reading parameters from the configuration file, validating them, and setting defaults. Its main logic is:

  • 1. Parse command-line flags;
  • 2. Initialize the feature gates for the kubelet;
  • 3. Load the kubelet configuration file;
  • 4. Validate the parameters in the configuration file;
  • 5. Check whether dynamic kubelet configuration is enabled;
  • 6. Initialize kubeletDeps; kubeletDeps holds the configuration the kubelet needs to run. It exists for dependency injection: the components the kubelet depends on are passed in as parameters, which makes the kubelet's behavior controllable and testable;
  • 7. Call the Run method;
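The kubeletDeps idea in step 6 is plain constructor-style dependency injection. The sketch below is illustrative only: the `Dependencies` struct and `startComponent` function are hypothetical stand-ins for kubelet.Dependencies and its consumers, not the real kubelet types, but they show why passing the dependency bundle in from outside makes behavior swappable:

```go
package main

import "fmt"

// Dependencies is a hypothetical stand-in for kubelet.Dependencies: a bundle
// of injected collaborators that the caller can replace, e.g. with fakes.
type Dependencies struct {
	Recorder func(event string) // stand-in for the event client
}

// startComponent only uses what was injected, so swapping Recorder changes
// its observable behavior without modifying the component itself.
func startComponent(deps *Dependencies) {
	deps.Recorder("kubelet started")
}

func main() {
	var events []string
	deps := &Dependencies{Recorder: func(e string) { events = append(events, e) }}
	startComponent(deps)
	fmt.Println(events[0]) // prints: kubelet started
}
```

In the real code path, UnsecuredDependencies builds this bundle once in NewKubeletCommand and it is threaded through Run, run, and RunKubelet unchanged.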

k8s.io/kubernetes/cmd/kubelet/app/server.go:111

  func NewKubeletCommand() *cobra.Command {
      cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
      cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
      // 1. kubelet configuration comes in two parts:
      // KubeletFlag: settings that must not be changed while the kubelet is
      // running, or that cannot be shared between the nodes of a cluster.
      // KubeletConfiguration: settings that can be shared between the nodes
      // of a cluster and can be configured dynamically.
      kubeletFlags := options.NewKubeletFlags()
      kubeletConfig, err := options.NewKubeletConfiguration()
      if err != nil {
          klog.Fatal(err)
      }
      cmd := &cobra.Command{
          Use:                componentKubelet,
          DisableFlagParsing: true,
          ......
          Run: func(cmd *cobra.Command, args []string) {
              // 2. parse command-line flags
              if err := cleanFlagSet.Parse(args); err != nil {
                  cmd.Usage()
                  klog.Fatal(err)
              }
              ......
              verflag.PrintAndExitIfRequested()
              utilflag.PrintFlags(cleanFlagSet)
              // 3. initialize the feature gates configuration
              if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
                  klog.Fatal(err)
              }
              if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
                  klog.Fatal(err)
              }
              if kubeletFlags.ContainerRuntime == "remote" && cleanFlagSet.Changed("pod-infra-container-image") {
                  klog.Warning("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead")
              }
              // 4. load the kubelet config file
              if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
                  kubeletConfig, err = loadConfigFile(configFile)
                  ......
              }
              // 5. validate the parameters in the config file
              if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil {
                  klog.Fatal(err)
              }
              // 6. check whether dynamic kubelet configuration is enabled
              var kubeletConfigController *dynamickubeletconfig.Controller
              if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
                  var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
                  dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
                      func(kc *kubeletconfiginternal.KubeletConfiguration) error {
                          return kubeletConfigFlagPrecedence(kc, args)
                      })
                  if err != nil {
                      klog.Fatal(err)
                  }
                  if dynamicKubeletConfig != nil {
                      kubeletConfig = dynamicKubeletConfig
                      if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
                          klog.Fatal(err)
                      }
                  }
              }
              kubeletServer := &options.KubeletServer{
                  KubeletFlags:         *kubeletFlags,
                  KubeletConfiguration: *kubeletConfig,
              }
              // 7. initialize kubeletDeps
              kubeletDeps, err := UnsecuredDependencies(kubeletServer)
              if err != nil {
                  klog.Fatal(err)
              }
              kubeletDeps.KubeletConfigController = kubeletConfigController
              stopCh := genericapiserver.SetupSignalHandler()
              if kubeletServer.KubeletFlags.ExperimentalDockershim {
                  if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
                      klog.Fatal(err)
                  }
                  return
              }
              // 8. call Run
              if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {
                  klog.Fatal(err)
              }
          },
      }
      kubeletFlags.AddFlags(cleanFlagSet)
      options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig)
      options.AddGlobalFlags(cleanFlagSet)
      ......
      return cmd
  }

Run

This method only performs OS-specific initialization and then calls run, which executes the rest of the startup logic.

k8s.io/kubernetes/cmd/kubelet/app/server.go:408

  func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) error {
      if err := initForOS(s.KubeletFlags.WindowsService); err != nil {
          return fmt.Errorf("failed OS init: %v", err)
      }
      if err := run(s, kubeDeps, stopCh); err != nil {
          return fmt.Errorf("failed to run Kubelet: %v", err)
      }
      return nil
  }

run

The run method does the basic configuration and checks needed for the kubelet to start. Its main logic is:

  • 1. Set the default FeatureGates for the kubelet. All kubelet FeatureGates can be listed via the command-line flags; feature gates in the Alpha stage are disabled by default at component startup, while those in Beta and GA are enabled by default;
  • 2. Validate the kubelet's parameters;
  • 3. Try to acquire the kubelet's lock file. This requires specifying --exit-on-lock-contention and --lock-file at startup; the feature is Alpha and disabled by default;
  • 4. Register the current configuration with the http server's /configz endpoint;
  • 5. Check whether the kubelet is starting in standalone mode; in this mode it does not talk to the apiserver and it is mainly used for debugging the kubelet;
  • 6. Initialize kubeDeps; kubeDeps holds the kubelet's dependencies, mainly KubeClient, EventClient, HeartbeatClient, Auth, cadvisor, and ContainerManager;
  • 7. Check whether the kubelet is running as root;
  • 8. Set the oom score for the process, -999 by default; the valid range is [-1000, 1000], and the lower the score, the less likely the process is to be killed;
  • 9. Call the RunKubelet method;
  • 10. Check whether dynamic kubelet configuration is enabled;
  • 11. Start the healthz http server;
  • 12. If started by systemd, notify systemd that the kubelet is up;
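Step 8 above works by writing to the process's oom_score_adj file under /proc, clamped to the kernel's [-1000, 1000] range. The sketch below is a minimal illustration of that mechanism; `clampOOMScoreAdj` and `applyOOMScoreAdj` are illustrative names, not the kubelet's own OOMAdjuster API:

```go
package main

import (
	"fmt"
	"os"
)

// clampOOMScoreAdj bounds a requested adjustment to the kernel's valid
// oom_score_adj range of [-1000, 1000]; lower values make the process
// less likely to be picked by the OOM killer.
func clampOOMScoreAdj(v int) int {
	if v < -1000 {
		return -1000
	}
	if v > 1000 {
		return 1000
	}
	return v
}

// applyOOMScoreAdj writes the clamped value for the given pid; pid 0 means
// the current process. Writing requires appropriate privileges.
func applyOOMScoreAdj(pid, value int) error {
	path := "/proc/self/oom_score_adj"
	if pid != 0 {
		path = fmt.Sprintf("/proc/%d/oom_score_adj", pid)
	}
	return os.WriteFile(path, []byte(fmt.Sprintf("%d", clampOOMScoreAdj(value))), 0644)
}

func main() {
	// The kubelet's default of -999 is already in range; out-of-range
	// requests are clamped to the boundaries.
	fmt.Println(clampOOMScoreAdj(-999), clampOOMScoreAdj(-5000), clampOOMScoreAdj(2000))
}
```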

k8s.io/kubernetes/cmd/kubelet/app/server.go:472

  func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) {
      // 1. set the default FeatureGates for the kubelet
      err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
      if err != nil {
          return err
      }
      // 2. validate the kubelet's parameters
      if err := options.ValidateKubeletServer(s); err != nil {
          return err
      }
      // 3. try to acquire the kubelet's lock file
      if s.ExitOnLockContention && s.LockFilePath == "" {
          return errors.New("cannot exit on lock file contention: no lock file specified")
      }
      done := make(chan struct{})
      if s.LockFilePath != "" {
          klog.Infof("acquiring file lock on %q", s.LockFilePath)
          if err := flock.Acquire(s.LockFilePath); err != nil {
              return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
          }
          if s.ExitOnLockContention {
              klog.Infof("watching for inotify events for: %v", s.LockFilePath)
              if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
                  return err
              }
          }
      }
      // 4. register the current configuration with the http server's /configz endpoint
      err = initConfigz(&s.KubeletConfiguration)
      if err != nil {
          klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
      }
      // 5. determine whether this is standalone mode
      standaloneMode := true
      if len(s.KubeConfig) > 0 {
          standaloneMode = false
      }
      // 6. initialize kubeDeps
      if kubeDeps == nil {
          kubeDeps, err = UnsecuredDependencies(s)
          if err != nil {
              return err
          }
      }
      if kubeDeps.Cloud == nil {
          if !cloudprovider.IsExternal(s.CloudProvider) {
              cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
              if err != nil {
                  return err
              }
              ......
              kubeDeps.Cloud = cloud
          }
      }
      hostName, err := nodeutil.GetHostname(s.HostnameOverride)
      if err != nil {
          return err
      }
      nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
      if err != nil {
          return err
      }
      // 7. in standalone mode, set all clients to nil
      switch {
      case standaloneMode:
          kubeDeps.KubeClient = nil
          kubeDeps.EventClient = nil
          kubeDeps.HeartbeatClient = nil
      // 8. initialize the KubeClient, EventClient and HeartbeatClient for kubeDeps
      case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
          clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName)
          if err != nil {
              return err
          }
          if closeAllConns == nil {
              return errors.New("closeAllConns must be a valid function other than nil")
          }
          kubeDeps.OnHeartbeatFailure = closeAllConns
          kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
          if err != nil {
              return fmt.Errorf("failed to initialize kubelet client: %v", err)
          }
          eventClientConfig := *clientConfig
          eventClientConfig.QPS = float32(s.EventRecordQPS)
          eventClientConfig.Burst = int(s.EventBurst)
          kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
          if err != nil {
              return fmt.Errorf("failed to initialize kubelet event client: %v", err)
          }
          heartbeatClientConfig := *clientConfig
          heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
          if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
              leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
              if heartbeatClientConfig.Timeout > leaseTimeout {
                  heartbeatClientConfig.Timeout = leaseTimeout
              }
          }
          heartbeatClientConfig.QPS = float32(-1)
          kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
          if err != nil {
              return fmt.Errorf("failed to initialize kubelet heartbeat client: %v", err)
          }
      }
      // 9. initialize the auth module
      if kubeDeps.Auth == nil {
          auth, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
          if err != nil {
              return err
          }
          kubeDeps.Auth = auth
      }
      var cgroupRoots []string
      // 10. set up cgroupRoots
      cgroupRoots = append(cgroupRoots, cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupDriver))
      kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
      if err != nil {
      } else if kubeletCgroup != "" {
          cgroupRoots = append(cgroupRoots, kubeletCgroup)
      }
      runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)
      if err != nil {
      } else if runtimeCgroup != "" {
          cgroupRoots = append(cgroupRoots, runtimeCgroup)
      }
      if s.SystemCgroups != "" {
          cgroupRoots = append(cgroupRoots, s.SystemCgroups)
      }
      // 11. initialize cadvisor
      if kubeDeps.CAdvisorInterface == nil {
          imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
          kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
          if err != nil {
              return err
          }
      }
      makeEventRecorder(kubeDeps, nodeName)
      // 12. initialize the ContainerManager
      if kubeDeps.ContainerManager == nil {
          if s.CgroupsPerQOS && s.CgroupRoot == "" {
              s.CgroupRoot = "/"
          }
          kubeReserved, err := parseResourceList(s.KubeReserved)
          if err != nil {
              return err
          }
          systemReserved, err := parseResourceList(s.SystemReserved)
          if err != nil {
              return err
          }
          var hardEvictionThresholds []evictionapi.Threshold
          if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
              hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
              if err != nil {
                  return err
              }
          }
          experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
          if err != nil {
              return err
          }
          devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)
          kubeDeps.ContainerManager, err = cm.NewContainerManager(
              kubeDeps.Mounter,
              kubeDeps.CAdvisorInterface,
              cm.NodeConfig{
                  ......
              },
              s.FailSwapOn,
              devicePluginEnabled,
              kubeDeps.Recorder)
          if err != nil {
              return err
          }
      }
      // 13. check whether running with root privileges
      if err := checkPermissions(); err != nil {
          klog.Error(err)
      }
      utilruntime.ReallyCrash = s.ReallyCrashForTesting
      // 14. set the oom score for the kubelet process
      oomAdjuster := kubeDeps.OOMAdjuster
      if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
          klog.Warning(err)
      }
      // 15. call RunKubelet to perform the rest of the startup
      if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
          return err
      }
      if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&
          kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
          if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {
              return err
          }
      }
      // 16. start the healthz http server
      if s.HealthzPort > 0 {
          mux := http.NewServeMux()
          healthz.InstallHandler(mux)
          go wait.Until(func() {
              err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
              if err != nil {
                  klog.Errorf("Starting healthz server failed: %v", err)
              }
          }, 5*time.Second, wait.NeverStop)
      }
      if s.RunOnce {
          return nil
      }
      // 17. notify systemd that startup is complete
      go daemon.SdNotify(false, "READY=1")
      select {
      case <-done:
          break
      case <-stopCh:
          break
      }
      return nil
  }

RunKubelet

RunKubelet mainly calls createAndInitKubelet to initialize the kubelet component, then calls startKubelet to start the kubelet's modules.

k8s.io/kubernetes/cmd/kubelet/app/server.go:989

  func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
      hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
      if err != nil {
          return err
      }
      nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
      if err != nil {
          return err
      }
      makeEventRecorder(kubeDeps, nodeName)
      // 1. privileged mode is allowed by default
      capabilities.Initialize(capabilities.Capabilities{
          AllowPrivileged: true,
      })
      credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)
      if kubeDeps.OSInterface == nil {
          kubeDeps.OSInterface = kubecontainer.RealOS{}
      }
      // 2. call createAndInitKubelet
      k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
          ......
          kubeServer.NodeStatusMaxImages)
      if err != nil {
          return fmt.Errorf("failed to create kubelet: %v", err)
      }
      if kubeDeps.PodConfig == nil {
          return fmt.Errorf("failed to create kubelet, pod source config was nil")
      }
      podCfg := kubeDeps.PodConfig
      rlimit.RlimitNumFiles(uint64(kubeServer.MaxOpenFiles))
      if runOnce {
          if _, err := k.RunOnce(podCfg.Updates()); err != nil {
              return fmt.Errorf("runonce failed: %v", err)
          }
          klog.Info("Started kubelet as runonce")
      } else {
          // 3. call startKubelet
          startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
          klog.Info("Started kubelet")
      }
      return nil
  }

createAndInitKubelet

createAndInitKubelet calls three methods to complete the kubelet's initialization:

  • kubelet.NewMainKubelet: instantiate the kubelet object and initialize all the modules it depends on;
  • k.BirthCry: send an event to the apiserver announcing that the kubelet has started;
  • k.StartGarbageCollection: start the garbage collection loops that reclaim containers and images;

k8s.io/kubernetes/cmd/kubelet/app/server.go:1089

  func createAndInitKubelet(......) {
      k, err = kubelet.NewMainKubelet(
          ......
      )
      if err != nil {
          return nil, err
      }
      k.BirthCry()
      k.StartGarbageCollection()
      return k, nil
  }

kubelet.NewMainKubelet

NewMainKubelet is the method that initializes the kubelet. Its main logic is:

  • 1. Initialize PodConfig, i.e. watch the sources of pod metadata (file, http, apiserver) and merge the pod configuration from the different sources into a single structure;
  • 2. Initialize the containerGCPolicy, imageGCPolicy, and evictionConfig settings;
  • 3. Start the serviceInformer and nodeInformer;
  • 4. Initialize containerRefManager and oomWatcher;
  • 5. Initialize the kubelet object;
  • 6. Initialize secretManager and configMapManager;
  • 7. Initialize livenessManager, podManager, statusManager, and resourceAnalyzer;
  • 8. Call kuberuntime.NewKubeGenericRuntimeManager to initialize containerRuntime;
  • 9. Initialize pleg;
  • 10. Initialize containerGC, containerDeletor, imageManager, and containerLogManager;
  • 11. Initialize serverCertificateManager, probeManager, tokenManager, volumePluginMgr, pluginManager, and volumeManager;
  • 12. Initialize workQueue, podWorkers, and evictionManager;
  • 13. Finally, register the relevant modules' handlers;

NewMainKubelet initializes all the modules the kubelet depends on. What each module does was covered in the previous article, "A brief look at the kubelet architecture"; how each module is initialized and what it does will be analyzed in detail in later articles.

k8s.io/kubernetes/pkg/kubelet/kubelet.go:335

  func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ......) (*Kubelet, error) {
      if rootDirectory == "" {
          return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
      }
      if kubeCfg.SyncFrequency.Duration <= 0 {
          return nil, fmt.Errorf("invalid sync frequency %d", kubeCfg.SyncFrequency.Duration)
      }
      if kubeCfg.MakeIPTablesUtilChains {
          ......
      }
      hostname, err := nodeutil.GetHostname(hostnameOverride)
      if err != nil {
          return nil, err
      }
      nodeName := types.NodeName(hostname)
      if kubeDeps.Cloud != nil {
          ......
      }
      // 1. initialize PodConfig
      if kubeDeps.PodConfig == nil {
          var err error
          kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
          if err != nil {
              return nil, err
          }
      }
      // 2. initialize containerGCPolicy, imageGCPolicy and evictionConfig
      containerGCPolicy := kubecontainer.ContainerGCPolicy{
          MinAge:             minimumGCAge.Duration,
          MaxPerPodContainer: int(maxPerPodContainerCount),
          MaxContainers:      int(maxContainerCount),
      }
      daemonEndpoints := &v1.NodeDaemonEndpoints{
          KubeletEndpoint: v1.DaemonEndpoint{Port: kubeCfg.Port},
      }
      imageGCPolicy := images.ImageGCPolicy{
          MinAge:               kubeCfg.ImageMinimumGCAge.Duration,
          HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent),
          LowThresholdPercent:  int(kubeCfg.ImageGCLowThresholdPercent),
      }
      enforceNodeAllocatable := kubeCfg.EnforceNodeAllocatable
      if experimentalNodeAllocatableIgnoreEvictionThreshold {
          enforceNodeAllocatable = []string{}
      }
      thresholds, err := eviction.ParseThresholdConfig(enforceNodeAllocatable, kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
      if err != nil {
          return nil, err
      }
      evictionConfig := eviction.Config{
          PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
          MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
          Thresholds:               thresholds,
          KernelMemcgNotification:  experimentalKernelMemcgNotification,
          PodCgroupRoot:            kubeDeps.ContainerManager.GetPodCgroupRoot(),
      }
      // 3. start the serviceInformer and nodeInformer
      serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
      if kubeDeps.KubeClient != nil {
          serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "services", metav1.NamespaceAll, fields.Everything())
          r := cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0)
          go r.Run(wait.NeverStop)
      }
      serviceLister := corelisters.NewServiceLister(serviceIndexer)
      nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
      if kubeDeps.KubeClient != nil {
          fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
          nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
          r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)
          go r.Run(wait.NeverStop)
      }
      nodeInfo := &CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)}
      ......
      // 4. initialize containerRefManager and oomWatcher
      containerRefManager := kubecontainer.NewRefManager()
      oomWatcher := oomwatcher.NewWatcher(kubeDeps.Recorder)
      clusterDNS := make([]net.IP, 0, len(kubeCfg.ClusterDNS))
      for _, ipEntry := range kubeCfg.ClusterDNS {
          ip := net.ParseIP(ipEntry)
          if ip == nil {
              klog.Warningf("Invalid clusterDNS ip '%q'", ipEntry)
          } else {
              clusterDNS = append(clusterDNS, ip)
          }
      }
      httpClient := &http.Client{}
      parsedNodeIP := net.ParseIP(nodeIP)
      protocol := utilipt.ProtocolIpv4
      if parsedNodeIP != nil && parsedNodeIP.To4() == nil {
          protocol = utilipt.ProtocolIpv6
      }
      // 5. initialize the kubelet object
      klet := &Kubelet{......}
      if klet.cloud != nil {
          klet.cloudResourceSyncManager = cloudresource.NewSyncManager(klet.cloud, nodeName, klet.nodeStatusUpdateFrequency)
      }
      // 6. initialize secretManager and configMapManager
      var secretManager secret.Manager
      var configMapManager configmap.Manager
      switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
      case kubeletconfiginternal.WatchChangeDetectionStrategy:
          secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient)
          configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient)
      case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
          secretManager = secret.NewCachingSecretManager(
              kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
          configMapManager = configmap.NewCachingConfigMapManager(
              kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
      case kubeletconfiginternal.GetChangeDetectionStrategy:
          secretManager = secret.NewSimpleSecretManager(kubeDeps.KubeClient)
          configMapManager = configmap.NewSimpleConfigMapManager(kubeDeps.KubeClient)
      default:
          return nil, fmt.Errorf("unknown configmap and secret manager mode: %v", kubeCfg.ConfigMapAndSecretChangeDetectionStrategy)
      }
      klet.secretManager = secretManager
      klet.configMapManager = configMapManager
      if klet.experimentalHostUserNamespaceDefaulting {
          klog.Infof("Experimental host user namespace defaulting is enabled.")
      }
      machineInfo, err := klet.cadvisor.MachineInfo()
      if err != nil {
          return nil, err
      }
      klet.machineInfo = machineInfo
      imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
      // 7. initialize livenessManager, podManager, statusManager and resourceAnalyzer
      klet.livenessManager = proberesults.NewManager()
      klet.podCache = kubecontainer.NewCache()
      var checkpointManager checkpointmanager.CheckpointManager
      if bootstrapCheckpointPath != "" {
          checkpointManager, err = checkpointmanager.NewCheckpointManager(bootstrapCheckpointPath)
          if err != nil {
              return nil, fmt.Errorf("failed to initialize checkpoint manager: %+v", err)
          }
      }
      klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager, checkpointManager)
      klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)
      if remoteRuntimeEndpoint != "" {
          if remoteImageEndpoint == "" {
              remoteImageEndpoint = remoteRuntimeEndpoint
          }
      }
      pluginSettings := dockershim.NetworkPluginSettings{......}
      klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration)
      var legacyLogProvider kuberuntime.LegacyLogProvider
      // 8. call kuberuntime.NewKubeGenericRuntimeManager to initialize containerRuntime
      switch containerRuntime {
      case kubetypes.DockerContainerRuntime:
          streamingConfig := getStreamingConfig(kubeCfg, kubeDeps, crOptions)
          ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig,
              &pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory, !crOptions.RedirectContainerStreaming)
          if err != nil {
              return nil, err
          }
          if crOptions.RedirectContainerStreaming {
              klet.criHandler = ds
          }
          server := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
          if err := server.Start(); err != nil {
              return nil, err
          }
          supported, err := ds.IsCRISupportedLogDriver()
          if err != nil {
              return nil, err
          }
          if !supported {
              klet.dockerLegacyService = ds
              legacyLogProvider = ds
          }
      case kubetypes.RemoteContainerRuntime:
          break
      default:
          return nil, fmt.Errorf("unsupported CRI runtime: %q", containerRuntime)
      }
      runtimeService, imageService, err := getRuntimeAndImageServices(remoteRuntimeEndpoint, remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout)
      if err != nil {
          return nil, err
      }
      klet.runtimeService = runtimeService
      if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && kubeDeps.KubeClient != nil {
          klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
      }
      runtime, err := kuberuntime.NewKubeGenericRuntimeManager(......)
      if err != nil {
          return nil, err
      }
      klet.containerRuntime = runtime
      klet.streamingRuntime = runtime
      klet.runner = runtime
      runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
      if err != nil {
          return nil, err
      }
      klet.runtimeCache = runtimeCache
      if cadvisor.UsingLegacyCadvisorStats(containerRuntime, remoteRuntimeEndpoint) {
          klet.StatsProvider = stats.NewCadvisorStatsProvider(......)
      } else {
          klet.StatsProvider = stats.NewCRIStatsProvider(......)
      }
      // 9. initialize pleg
      klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
      klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
      klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
      if _, err := klet.updatePodCIDR(kubeCfg.PodCIDR); err != nil {
          klog.Errorf("Pod CIDR update failed %v", err)
      }
      // 10. initialize containerGC, containerDeletor, imageManager and containerLogManager
      containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady)
      if err != nil {
          return nil, err
      }
      klet.containerGC = containerGC
      klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))
      imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage)
      if err != nil {
          return nil, fmt.Errorf("failed to initialize image manager: %v", err)
      }
      klet.imageManager = imageManager
      if containerRuntime == kubetypes.RemoteContainerRuntime && utilfeature.DefaultFeatureGate.Enabled(features.CRIContainerLogRotation) {
          containerLogManager, err := logs.NewContainerLogManager(
              klet.runtimeService,
              kubeCfg.ContainerLogMaxSize,
              int(kubeCfg.ContainerLogMaxFiles),
          )
          if err != nil {
              return nil, fmt.Errorf("failed to initialize container log manager: %v", err)
          }
          klet.containerLogManager = containerLogManager
      } else {
          klet.containerLogManager = logs.NewStubContainerLogManager()
      }
      // 11. initialize serverCertificateManager, probeManager, tokenManager,
      // volumePluginMgr, pluginManager and volumeManager
      if kubeCfg.ServerTLSBootstrap && kubeDeps.TLSOptions != nil && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
          klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, klet.getLastObservedNodeAddresses, certDirectory)
          if err != nil {
              return nil, fmt.Errorf("failed to initialize certificate manager: %v", err)
          }
          kubeDeps.TLSOptions.Config.GetCertificate = func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
              cert := klet.serverCertificateManager.Current()
              if cert == nil {
                  return nil, fmt.Errorf("no serving certificate available for the kubelet")
              }
              return cert, nil
          }
      }
      klet.probeManager = prober.NewManager(......)
      tokenManager := token.NewManager(kubeDeps.KubeClient)
      klet.volumePluginMgr, err =
          NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber)
      if err != nil {
          return nil, err
      }
      klet.pluginManager = pluginmanager.NewPluginManager(
          klet.getPluginsRegistrationDir(), /* sockDir */
          klet.getPluginsDir(),             /* deprecatedSockDir */
          kubeDeps.Recorder,
      )
      if len(experimentalMounterPath) != 0 {
          experimentalCheckNodeCapabilitiesBeforeMount = false
          klet.dnsConfigurer.SetupDNSinContainerizedMounter(experimentalMounterPath)
      }
      klet.volumeManager = volumemanager.NewVolumeManager(......)
      // 12. initialize workQueue, podWorkers and evictionManager
      klet.reasonCache = NewReasonCache()
      klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
      klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
      klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
      klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)
      evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)
      klet.evictionManager = evictionManager
      klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
      if utilfeature.DefaultFeatureGate.Enabled(features.Sysctls) {
          runtimeSupport, err := sysctl.NewRuntimeAdmitHandler(klet.containerRuntime)
          if err != nil {
              return nil, err
          }
          safeAndUnsafeSysctls := append(sysctlwhitelist.SafeSysctlWhitelist(), allowedUnsafeSysctls...)
          sysctlsWhitelist, err := sysctl.NewWhitelist(safeAndUnsafeSysctls)
          if err != nil {
              return nil, err
          }
          klet.admitHandlers.AddPodAdmitHandler(runtimeSupport)
          klet.admitHandlers.AddPodAdmitHandler(sysctlsWhitelist)
      }
      // 13. register the relevant modules' handlers for pods
      activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, kubeDeps.Recorder, klet.clock)
      if err != nil {
          return nil, err
      }
      klet.AddPodSyncLoopHandler(activeDeadlineHandler)
      klet.AddPodSyncHandler(activeDeadlineHandler)
      if utilfeature.DefaultFeatureGate.Enabled(features.TopologyManager) {
          klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetTopologyPodAdmitHandler())
      }
      criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
      klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
      for _, opt := range kubeDeps.Options {
          opt(klet)
      }
      klet.appArmorValidator = apparmor.NewValidator(containerRuntime)
      klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(klet.appArmorValidator))
      klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewNoNewPrivsAdmitHandler(klet.containerRuntime))
      if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
          klet.nodeLeaseController = nodelease.NewController(klet.clock, klet.heartbeatClient, string(klet.nodeName), kubeCfg.NodeLeaseDurationSeconds, klet.onRepeatedHeartbeatFailure)
      }
      klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewProcMountAdmitHandler(klet.containerRuntime))
      klet.kubeletConfiguration = *kubeCfg
      klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
      return klet, nil
  }

startKubelet

startKubelet calls k.Run to start all of kubelet's modules and the main loop, and then starts the HTTP servers that kubelet needs. In v1.16, by default kubelet only starts the health-check port 10248 and the kubelet server port 10250.

k8s.io/kubernetes/cmd/kubelet/app/server.go:1070

func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
    // start the kubelet
    go wait.Until(func() {
        k.Run(podCfg.Updates())
    }, 0, wait.NeverStop)

    // start the kubelet server
    if enableServer {
        go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
    }
    if kubeCfg.ReadOnlyPort > 0 {
        go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)
    }
    if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
        go k.ListenAndServePodResources()
    }
}

At this point the kubelet object and its dependent modules have been initialized by the methods above. Apart from the GC module, which is started separately, the remaining modules and the main logic are all started by the Run method, whose main logic is explained below. The call relationships in the kubelet startup logic can be summarized as follows:

                                                                                  |--> NewMainKubelet
                                                                                  |
                                                      |--> createAndInitKubelet --|--> BirthCry
                                                      |                           |
                                    |--> RunKubelet --|                           |--> StartGarbageCollection
                                    |                 |
                                    |                 |--> startKubelet --> k.Run
                                    |
NewKubeletCommand --> Run --> run --|--> http.ListenAndServe
                                    |
                                    |--> daemon.SdNotify

Run

Run is the core method for starting kubelet: it launches kubelet's dependent modules and then enters the main loop. Its main logic is:

  • 1. register the logServer;
  • 2. start the cloud provider sync manager if one is configured;
  • 3. call kl.initializeModules to first start the modules that do not depend on the container runtime;
  • 4. start the volume manager;
  • 5. run kl.syncNodeStatus periodically to sync the node status;
  • 6. call kl.fastStatusUpdateOnce to update the container runtime start time and perform the first status sync;
  • 7. check whether the NodeLease mechanism is enabled;
  • 8. run kl.updateRuntimeUp periodically to update the runtime status;
  • 9. run kl.syncNetworkUtil periodically to sync iptables rules;
  • 10. run kl.podKiller periodically to clean up abnormal pods: when a pod is not handled correctly by a podWorker, a goroutine is started to kill it;
  • 11. start the statusManager;
  • 12. start the probeManager;
  • 13. start the runtimeClassManager;
  • 14. start the pleg;
  • 15. call kl.syncLoop to watch for pod changes;

Run mainly calls two methods, kl.initializeModules and kl.fastStatusUpdateOnce, to finish the pre-start initialization; once all modules are initialized, it starts the main loop.

k8s.io/kubernetes/pkg/kubelet/kubelet.go:1398

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    // 1. register the logServer
    if kl.logServer == nil {
        kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
    }
    if kl.kubeClient == nil {
        klog.Warning("No api server defined - no node status update will be sent.")
    }

    // 2. start the cloud provider sync manager if one is configured
    if kl.cloudResourceSyncManager != nil {
        go kl.cloudResourceSyncManager.Run(wait.NeverStop)
    }

    // 3. call kl.initializeModules to first start the modules that do not depend on the container runtime
    if err := kl.initializeModules(); err != nil {
        kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
        klog.Fatal(err)
    }

    // 4. start the volume manager
    go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

    if kl.kubeClient != nil {
        // 5. run kl.syncNodeStatus periodically to sync the node status
        go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)

        // 6. call kl.fastStatusUpdateOnce to update the container runtime start time and perform the first status sync
        go kl.fastStatusUpdateOnce()

        // 7. start the NodeLease controller if the NodeLease mechanism is enabled
        if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
            go kl.nodeLeaseController.Run(wait.NeverStop)
        }
    }

    // 8. run kl.updateRuntimeUp periodically to update the runtime status
    go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

    // 9. run kl.syncNetworkUtil periodically to sync iptables rules
    if kl.makeIPTablesUtilChains {
        go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
    }

    // 10. run kl.podKiller periodically to clean up abnormal pods
    go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)

    // 11. start the statusManager, probeManager and runtimeClassManager
    kl.statusManager.Start()
    kl.probeManager.Start()
    if kl.runtimeClassManager != nil {
        kl.runtimeClassManager.Start(wait.NeverStop)
    }

    // 12. start the pleg
    kl.pleg.Start()

    // 13. call kl.syncLoop to watch for pod changes
    kl.syncLoop(updates, kl)
}

initializeModules

initializeModules starts the modules that depend neither on the container runtime nor on any module that has not been initialized yet. Its main logic is:

  • 1. call kl.setupDataDirs to create the directories kubelet needs;
  • 2. create the ContainerLogsDir /var/log/containers;
  • 3. start the imageManager; image gc was already started in RunKubelet, so this mainly watches for image changes;
  • 4. start the certificateManager, which handles certificate rotation;
  • 5. start the oomWatcher, which watches for OOM events and records them;
  • 6. start the resourceAnalyzer;

k8s.io/kubernetes/pkg/kubelet/kubelet.go:1319

func (kl *Kubelet) initializeModules() error {
    metrics.Register(
        kl.runtimeCache,
        collectors.NewVolumeStatsCollector(kl),
        collectors.NewLogMetricsCollector(kl.StatsProvider.ListPodStats),
    )
    metrics.SetNodeName(kl.nodeName)
    servermetrics.Register()

    // 1. create the data directories
    if err := kl.setupDataDirs(); err != nil {
        return err
    }

    // 2. create the ContainerLogsDir
    if _, err := os.Stat(ContainerLogsDir); err != nil {
        if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
            klog.Errorf("Failed to create directory %q: %v", ContainerLogsDir, err)
        }
    }

    // 3. start the imageManager
    kl.imageManager.Start()

    // 4. start the certificate manager
    if kl.serverCertificateManager != nil {
        kl.serverCertificateManager.Start()
    }

    // 5. start the oomWatcher
    if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
        return fmt.Errorf("failed to start OOM watcher %v", err)
    }

    // 6. start the resource analyzer
    kl.resourceAnalyzer.Start()
    return nil
}

fastStatusUpdateOnce

fastStatusUpdateOnce keeps trying to update the pod CIDR; once the update succeeds it immediately runs updateRuntimeUp and syncNodeStatus to update the runtime and the node status. This method runs only once, at kubelet startup; by updating the pod CIDR early it triggers the runtime update and node status update as soon as possible, reducing the time it takes the node to reach the ready state.

k8s.io/kubernetes/pkg/kubelet/kubelet.go:2262

func (kl *Kubelet) fastStatusUpdateOnce() {
    for {
        time.Sleep(100 * time.Millisecond)
        node, err := kl.GetNode()
        if err != nil {
            klog.Errorf(err.Error())
            continue
        }
        if len(node.Spec.PodCIDRs) != 0 {
            podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
            if _, err := kl.updatePodCIDR(podCIDRs); err != nil {
                klog.Errorf("Pod CIDR update to %v failed %v", podCIDRs, err)
                continue
            }
            kl.updateRuntimeUp()
            kl.syncNodeStatus()
            return
        }
    }
}

updateRuntimeUp

updateRuntimeUp initializes the runtime-dependent modules when the container runtime first starts, and records the container runtime's start time in the kubelet's runtimeState. It first checks whether the network and the runtime are ready; if both are ready, it calls initializeRuntimeDependentModules to initialize the runtime-dependent modules, including cadvisor, containerManager, evictionManager, containerLogManager, pluginManager, etc.

k8s.io/kubernetes/pkg/kubelet/kubelet.go:2168

func (kl *Kubelet) updateRuntimeUp() {
    kl.updateRuntimeMux.Lock()
    defer kl.updateRuntimeMux.Unlock()

    // 1. fetch the containerRuntime status
    s, err := kl.containerRuntime.Status()
    if err != nil {
        klog.Errorf("Container runtime sanity check failed: %v", err)
        return
    }
    if s == nil {
        klog.Errorf("Container runtime status is nil")
        return
    }

    // 2. check whether network and runtime are ready
    networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
    if networkReady == nil || !networkReady.Status {
        kl.runtimeState.setNetworkState(fmt.Errorf("runtime network not ready: %v", networkReady))
    } else {
        kl.runtimeState.setNetworkState(nil)
    }
    runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
    if runtimeReady == nil || !runtimeReady.Status {
        kl.runtimeState.setRuntimeState(err)
        return
    }
    kl.runtimeState.setRuntimeState(nil)

    // 3. call kl.initializeRuntimeDependentModules to start the dependent modules
    kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
    kl.runtimeState.setRuntimeSync(kl.clock.Now())
}

initializeRuntimeDependentModules

The main logic of this method is:

  • 1. start cadvisor;
  • 2. fetch the CgroupStats;
  • 3. start the containerManager, evictionManager and containerLogManager;
  • 4. register the CSI Driver and Device Manager with the pluginManager, then start the pluginManager;

k8s.io/kubernetes/pkg/kubelet/kubelet.go:1361

func (kl *Kubelet) initializeRuntimeDependentModules() {
    // 1. start cadvisor
    if err := kl.cadvisor.Start(); err != nil {
        ......
    }

    // 2. fetch the CgroupStats
    kl.StatsProvider.GetCgroupStats("/", true)
    node, err := kl.getNodeAnyWay()
    if err != nil {
        klog.Fatalf("Kubelet failed to get node info: %v", err)
    }

    // 3. start the containerManager, evictionManager and containerLogManager
    if err := kl.containerManager.Start(node, kl.GetActivePods, kl.sourcesReady, kl.statusManager, kl.runtimeService); err != nil {
        klog.Fatalf("Failed to start ContainerManager %v", err)
    }
    kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.podResourcesAreReclaimed, evictionMonitoringPeriod)
    kl.containerLogManager.Start()

    // 4. register the CSI Driver and Device Manager handlers, then start the pluginManager
    kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
    kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
    go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
}

Recap

As seen in Run, kl.syncNodeStatus and kl.updateRuntimeUp are called there directly, but they are also called in kl.fastStatusUpdateOnce. In kl.fastStatusUpdateOnce they run only once, whereas in Run they run periodically. The point of calling them in kl.fastStatusUpdateOnce is to perform the runtime update and node status update as early as possible when kubelet first starts, reducing the time it takes the node to reach the ready state. And kl.initializeRuntimeDependentModules, the method called from kl.updateRuntimeUp to initialize the runtime-dependent modules, is invoked through sync.Once and therefore executes only once.

syncLoop

syncLoop is kubelet's main loop. It watches for pod changes from the different channels (file, http, apiserver) and merges them; whenever a change arrives, it invokes the corresponding handler functions to keep pods in their desired state.

syncLoop first creates a syncTicker and a housekeepingTicker: even when no pod configuration needs updating, kubelet still periodically synchronizes and cleans up pods. It then calls syncLoopIteration in an endless for loop; if an error occurs during an iteration, kubelet records it in runtimeState and backs off (starting at 100 ms, doubling up to a 5 s cap) before continuing the loop.

k8s.io/kubernetes/pkg/kubelet/kubelet.go:1821

func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    syncTicker := time.NewTicker(time.Second)
    defer syncTicker.Stop()
    housekeepingTicker := time.NewTicker(housekeepingPeriod)
    defer housekeepingTicker.Stop()
    plegCh := kl.pleg.Watch()
    const (
        base   = 100 * time.Millisecond
        max    = 5 * time.Second
        factor = 2
    )
    duration := base
    for {
        if err := kl.runtimeState.runtimeErrors(); err != nil {
            time.Sleep(duration)
            duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
            continue
        }
        duration = base
        kl.syncLoopMonitor.Store(kl.clock.Now())
        if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
            break
        }
        kl.syncLoopMonitor.Store(kl.clock.Now())
    }
}

syncLoopIteration

syncLoopIteration watches several channels: as soon as any channel has data, it hands the message to the handler, which dispatches work via dispatchWork. The channels it reads from are:

  • 1. configCh: fed by the PodConfig submodule of the kubeDeps object, which watches pod changes from three different sources (file, http, apiserver) simultaneously; whenever a source creates, updates, or deletes a pod, the updated pod and the specific operation appear on this channel;
  • 2. syncCh: a ticker that fires every second to sync the latest saved pod state;
  • 3. houseKeepingCh: the channel for housekeeping events, used for pod cleanup;
  • 4. plegCh: fed by the kubelet's pleg submodule, which periodically queries the container runtime for the state of all containers; when a state changes, an event is produced on this channel;
  • 5. liveness manager: when the health-check module finds a pod unhealthy, kubelet automatically takes the appropriate action according to the pod's restartPolicy;

k8s.io/kubernetes/pkg/kubelet/kubelet.go:1888

func (kl *Kubelet) syncLoopIteration(......) bool {
    select {
    case u, open := <-configCh:
        if !open {
            return false
        }
        switch u.Op {
        case kubetypes.ADD:
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE:
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.REMOVE:
            handler.HandlePodRemoves(u.Pods)
        case kubetypes.RECONCILE:
            handler.HandlePodReconcile(u.Pods)
        case kubetypes.DELETE:
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.RESTORE:
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.SET:
        }
        if u.Op != kubetypes.RESTORE {
            kl.sourcesReady.AddSource(u.Source)
        }
    case e := <-plegCh:
        if isSyncPodWorthy(e) {
            if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
                klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
                handler.HandlePodSyncs([]*v1.Pod{pod})
            } else {
                klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
            }
        }
        if e.Type == pleg.ContainerDied {
            if containerID, ok := e.Data.(string); ok {
                kl.cleanUpContainersInPod(e.ID, containerID)
            }
        }
    case <-syncCh:
        podsToSync := kl.getPodsToSync()
        if len(podsToSync) == 0 {
            break
        }
        handler.HandlePodSyncs(podsToSync)
    case update := <-kl.livenessManager.Updates():
        if update.Result == proberesults.Failure {
            pod, ok := kl.podManager.GetPodByUID(update.PodUID)
            if !ok {
                break
            }
            handler.HandlePodSyncs([]*v1.Pod{pod})
        }
    case <-housekeepingCh:
        if !kl.sourcesReady.AllReady() {
            klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
        } else {
            if err := handler.HandlePodCleanups(); err != nil {
                klog.Errorf("Failed cleaning pods: %v", err)
            }
        }
    }
    return true
}

Finally, a summary of the calls made in the Run method that starts kubelet and its dependent modules:

      |--> kl.cloudResourceSyncManager.Run
      |
      |                            |--> kl.setupDataDirs
      |                            |--> kl.imageManager.Start
Run --|--> kl.initializeModules ---|--> kl.serverCertificateManager.Start
      |                            |--> kl.oomWatcher.Start
      |                            |--> kl.resourceAnalyzer.Start
      |
      |--> kl.volumeManager.Run
      |                                                        |--> kl.containerRuntime.Status
      |--> kl.syncNodeStatus                                   |
      |                              |--> kl.updateRuntimeUp --|                                           |--> kl.cadvisor.Start
      |                              |                         |                                           |
      |--> kl.fastStatusUpdateOnce --|                         |--> kl.initializeRuntimeDependentModules --|--> kl.containerManager.Start
      |                              |                                                                     |
      |                              |--> kl.syncNodeStatus                                                |--> kl.evictionManager.Start
      |                                                                                                    |
      |--> kl.updateRuntimeUp                                                                              |--> kl.containerLogManager.Start
      |                                                                                                    |
      |--> kl.syncNetworkUtil                                                                              |--> kl.pluginManager.Run
      |
      |--> kl.podKiller
      |
      |--> kl.statusManager.Start
      |
      |--> kl.probeManager.Start
      |
      |--> kl.runtimeClassManager.Start
      |
      |--> kl.pleg.Start
      |
      |--> kl.syncLoop --> kl.syncLoopIteration

Summary

This article walked through kubelet's startup flow. As you can see, the startup sequence has a great many steps, and kubelet itself contains a great many modules. Subsequent articles on the kubelet source code will start from the modules launched in the Run method and work through them one by one.