kubelet源码分析(二)之 NewMainKubelet

以下代码分析基于 kubernetes v1.12.0 版本。

本文主要分析 https://github.com/kubernetes/kubernetes/tree/v1.12.0/pkg/kubelet 部分的代码。

本文主要分析kubelet中的NewMainKubelet部分。

1. NewMainKubelet

NewMainKubelet主要用来初始化和构造一个kubelet结构体,kubelet结构体定义参考:https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/kubelet/kubelet.go#L888

  1. // NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
  2. // No initialization of Kubelet and its modules should happen here.
  3. func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
  4. kubeDeps *Dependencies,
  5. crOptions *config.ContainerRuntimeOptions,
  6. containerRuntime string,
  7. runtimeCgroups string,
  8. hostnameOverride string,
  9. nodeIP string,
  10. providerID string,
  11. cloudProvider string,
  12. certDirectory string,
  13. rootDirectory string,
  14. registerNode bool,
  15. registerWithTaints []api.Taint,
  16. allowedUnsafeSysctls []string,
  17. remoteRuntimeEndpoint string,
  18. remoteImageEndpoint string,
  19. experimentalMounterPath string,
  20. experimentalKernelMemcgNotification bool,
  21. experimentalCheckNodeCapabilitiesBeforeMount bool,
  22. experimentalNodeAllocatableIgnoreEvictionThreshold bool,
  23. minimumGCAge metav1.Duration,
  24. maxPerPodContainerCount int32,
  25. maxContainerCount int32,
  26. masterServiceNamespace string,
  27. registerSchedulable bool,
  28. nonMasqueradeCIDR string,
  29. keepTerminatedPodVolumes bool,
  30. nodeLabels map[string]string,
  31. seccompProfileRoot string,
  32. bootstrapCheckpointPath string,
  33. nodeStatusMaxImages int32) (*Kubelet, error) {
  34. ...
  35. }

1.1. PodConfig

通过makePodSourceConfig生成Pod config。

  1. if kubeDeps.PodConfig == nil {
  2. var err error
  3. kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
  4. if err != nil {
  5. return nil, err
  6. }
  7. }

1.1.1. makePodSourceConfig

  1. // makePodSourceConfig creates a config.PodConfig from the given
  2. // KubeletConfiguration or returns an error.
  3. func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
  4. ...
  5. // source of all configuration
  6. cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
  7. // define file config source
  8. if kubeCfg.StaticPodPath != "" {
  9. glog.Infof("Adding pod path: %v", kubeCfg.StaticPodPath)
  10. config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
  11. }
  12. // define url config source
  13. if kubeCfg.StaticPodURL != "" {
  14. glog.Infof("Adding pod url %q with HTTP header %v", kubeCfg.StaticPodURL, manifestURLHeader)
  15. config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
  16. }
  17. // Restore from the checkpoint path
  18. // NOTE: This MUST happen before creating the apiserver source
  19. // below, or the checkpoint would override the source of truth.
  20. ...
  21. if kubeDeps.KubeClient != nil {
  22. glog.Infof("Watching apiserver")
  23. if updatechannel == nil {
  24. updatechannel = cfg.Channel(kubetypes.ApiserverSource)
  25. }
  26. config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
  27. }
  28. return cfg, nil
  29. }

1.1.2. NewPodConfig

  1. // NewPodConfig creates an object that can merge many configuration sources into a stream
  2. // of normalized updates to a pod configuration.
  3. func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
  4. updates := make(chan kubetypes.PodUpdate, 50)
  5. storage := newPodStorage(updates, mode, recorder)
  6. podConfig := &PodConfig{
  7. pods: storage,
  8. mux: config.NewMux(storage),
  9. updates: updates,
  10. sources: sets.String{},
  11. }
  12. return podConfig
  13. }

1.1.3. NewSourceApiserver

  1. // NewSourceApiserver creates a config source that watches and pulls from the apiserver.
  2. func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
  3. lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
  4. newSourceApiserverFromLW(lw, updates)
  5. }

1.2. Lister

serviceLister和nodeLister分别通过List-Watch机制监听service和node的列表变化。

1.2.1. serviceLister

  1. serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
  2. if kubeDeps.KubeClient != nil {
  3. serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "services", metav1.NamespaceAll, fields.Everything())
  4. r := cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0)
  5. go r.Run(wait.NeverStop)
  6. }
  7. serviceLister := corelisters.NewServiceLister(serviceIndexer)

1.2.2. nodeLister

  1. nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
  2. if kubeDeps.KubeClient != nil {
  3. fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
  4. nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
  5. r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)
  6. go r.Run(wait.NeverStop)
  7. }
  8. nodeInfo := &predicates.CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)}

1.3. 各种Manager

1.3.1. containerRefManager

  1. containerRefManager := kubecontainer.NewRefManager()

1.3.2. oomWatcher

  1. oomWatcher := NewOOMWatcher(kubeDeps.CAdvisorInterface, kubeDeps.Recorder)

1.3.3. dnsConfigurer

  1. clusterDNS := make([]net.IP, 0, len(kubeCfg.ClusterDNS))
  2. for _, ipEntry := range kubeCfg.ClusterDNS {
  3. ip := net.ParseIP(ipEntry)
  4. if ip == nil {
  5. glog.Warningf("Invalid clusterDNS ip '%q'", ipEntry)
  6. } else {
  7. clusterDNS = append(clusterDNS, ip)
  8. }
  9. }
  10. ...
  11. dns.NewConfigurer(kubeDeps.Recorder, nodeRef, parsedNodeIP, clusterDNS, kubeCfg.ClusterDomain, kubeCfg.ResolverConfig),

1.3.4. secretManager & configMapManager

  1. var secretManager secret.Manager
  2. var configMapManager configmap.Manager
  3. switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
  4. case kubeletconfiginternal.WatchChangeDetectionStrategy:
  5. secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient)
  6. configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient)
  7. case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
  8. secretManager = secret.NewCachingSecretManager(
  9. kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
  10. configMapManager = configmap.NewCachingConfigMapManager(
  11. kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
  12. case kubeletconfiginternal.GetChangeDetectionStrategy:
  13. secretManager = secret.NewSimpleSecretManager(kubeDeps.KubeClient)
  14. configMapManager = configmap.NewSimpleConfigMapManager(kubeDeps.KubeClient)
  15. default:
  16. return nil, fmt.Errorf("unknown configmap and secret manager mode: %v", kubeCfg.ConfigMapAndSecretChangeDetectionStrategy)
  17. }
  18. klet.secretManager = secretManager
  19. klet.configMapManager = configMapManager

1.3.5. livenessManager

  1. klet.livenessManager = proberesults.NewManager()

1.3.6. podManager

  1. // podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
  2. klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager, checkpointManager)

1.3.7. resourceAnalyzer

  1. klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration)

1.3.8. containerGC

  1. // setup containerGC
  2. containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady)
  3. if err != nil {
  4. return nil, err
  5. }
  6. klet.containerGC = containerGC
  7. klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))

1.3.9. imageManager

  1. // setup imageManager
  2. imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage)
  3. if err != nil {
  4. return nil, fmt.Errorf("failed to initialize image manager: %v", err)
  5. }
  6. klet.imageManager = imageManager

1.3.10. statusManager

  1. klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)

1.3.11. probeManager

  1. klet.probeManager = prober.NewManager(
  2. klet.statusManager,
  3. klet.livenessManager,
  4. klet.runner,
  5. containerRefManager,
  6. kubeDeps.Recorder)

1.3.12. tokenManager

  1. tokenManager := token.NewManager(kubeDeps.KubeClient)

1.3.13. volumePluginMgr

  1. klet.volumePluginMgr, err =
  2. NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber)
  3. if err != nil {
  4. return nil, err
  5. }
  6. if klet.enablePluginsWatcher {
  7. klet.pluginWatcher = pluginwatcher.NewWatcher(klet.getPluginsDir())
  8. }

1.3.14. volumeManager

  1. // setup volumeManager
  2. klet.volumeManager = volumemanager.NewVolumeManager(
  3. kubeCfg.EnableControllerAttachDetach,
  4. nodeName,
  5. klet.podManager,
  6. klet.statusManager,
  7. klet.kubeClient,
  8. klet.volumePluginMgr,
  9. klet.containerRuntime,
  10. kubeDeps.Mounter,
  11. klet.getPodsDir(),
  12. kubeDeps.Recorder,
  13. experimentalCheckNodeCapabilitiesBeforeMount,
  14. keepTerminatedPodVolumes)

1.3.15. evictionManager

  1. // setup eviction manager
  2. evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)
  3. klet.evictionManager = evictionManager
  4. klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)

1.4. containerRuntime

目前pod所使用的runtime只有docker和remote两种,rkt已经废弃。

  1. if containerRuntime == "rkt" {
  2. glog.Fatalln("rktnetes has been deprecated in favor of rktlet. Please see https://github.com/kubernetes-incubator/rktlet for more information.")
  3. }

runtime为docker的时候,会执行docker相关操作。

  1. switch containerRuntime {
  2. case kubetypes.DockerContainerRuntime:
  3. // Create and start the CRI shim running as a grpc server.
  4. ...
  5. // The unix socket for kubelet <-> dockershim communication.
  6. ...
  7. // Create dockerLegacyService when the logging driver is not supported.
  8. ...
  9. case kubetypes.RemoteContainerRuntime:
  10. // No-op.
  11. break
  12. default:
  13. return nil, fmt.Errorf("unsupported CRI runtime: %q", containerRuntime)
  14. }

1.4.1. NewDockerService

  1. // Create and start the CRI shim running as a grpc server.
  2. streamingConfig := getStreamingConfig(kubeCfg, kubeDeps, crOptions)
  3. ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig,
  4. &pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory, !crOptions.RedirectContainerStreaming)
  5. if err != nil {
  6. return nil, err
  7. }
  8. if crOptions.RedirectContainerStreaming {
  9. klet.criHandler = ds
  10. }

1.4.2. NewDockerServer

  1. // The unix socket for kubelet <-> dockershim communication.
  2. glog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q",
  3. remoteRuntimeEndpoint,
  4. remoteImageEndpoint)
  5. glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
  6. server := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
  7. if err := server.Start(); err != nil {
  8. return nil, err
  9. }

1.4.3. DockerServer.Start

  1. // Start starts the dockershim grpc server.
  2. func (s *DockerServer) Start() error {
  3. // Start the internal service.
  4. if err := s.service.Start(); err != nil {
  5. glog.Errorf("Unable to start docker service")
  6. return err
  7. }
  8. glog.V(2).Infof("Start dockershim grpc server")
  9. l, err := util.CreateListener(s.endpoint)
  10. if err != nil {
  11. return fmt.Errorf("failed to listen on %q: %v", s.endpoint, err)
  12. }
  13. // Create the grpc server and register runtime and image services.
  14. s.server = grpc.NewServer(
  15. grpc.MaxRecvMsgSize(maxMsgSize),
  16. grpc.MaxSendMsgSize(maxMsgSize),
  17. )
  18. runtimeapi.RegisterRuntimeServiceServer(s.server, s.service)
  19. runtimeapi.RegisterImageServiceServer(s.server, s.service)
  20. go func() {
  21. if err := s.server.Serve(l); err != nil {
  22. glog.Fatalf("Failed to serve connections: %v", err)
  23. }
  24. }()
  25. return nil
  26. }

1.5. podWorker

构造podWorkers和workQueue。

  1. klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
  2. klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)

1.5.1. PodWorkers接口

  1. // PodWorkers is an abstract interface for testability.
  2. type PodWorkers interface {
  3. UpdatePod(options *UpdatePodOptions)
  4. ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
  5. ForgetWorker(uid types.UID)
  6. }

podWorker主要用来对pod相应事件进行处理和同步,包含以下三个方法:UpdatePod、ForgetNonExistingPodWorkers、ForgetWorker。

2. 总结

  1. NewMainKubelet主要用来构造kubelet结构体,其中kubelet除了包含必要的配置和client(例如:kubeClient、csiClient等)外,最主要的包含各种manager来管理不同的任务。
  2. 核心的manager有以下几种:

    • oomWatcher:监控pod内存是否发生OOM。
    • podManager:管理pod的生命周期,包括对pod的增删改查操作等。
    • containerGC:对死亡容器进行垃圾回收。
    • imageManager:对容器镜像进行垃圾回收。
    • statusManager:与apiserver同步pod状态,同时也作状态缓存。
    • volumeManager:对pod的volume进行attached/detached/mounted/unmounted操作。
    • evictionManager:保证节点稳定,必要时对pod进行驱逐(例如资源不足的情况下)。
  3. NewMainKubelet还包含了serviceLister和nodeLister来监听service和node的列表变化。

  4. kubelet使用到的containerRuntime目前主要是docker,其中rkt已废弃。NewMainKubelet启动了dockershim grpc server来执行docker相关操作。
  5. 构建了podWorker来对pod相关的更新逻辑进行处理。

参考文章: