调度器初始化

概述

今天我们要做一些琐碎的知识点分析,比如调度器启动的时候默认配置是怎么来的?默认生效了哪些调度算法?自定义的算法是如何注入的?诸如这些问题,我们顺带会看一下调度器相关的一些数据结构的含义。看完前面这些节的分析后再看完本篇文章你可能会有一种醍醐灌顶的感觉哦~

从 —config 开始

如果我们编译出来一个 kube-scheduler 二进制文件,运行./kube-scheduler -h后会看到很多的帮助信息,这些信息是分组的,比如第一组 Misc,差不多是“大杂烩”的意思,不好分类的几个 flag,其实也是最重要的几个 flag,如下:

1554085596345

很好理解,第一个红框框圈出来的--config用于指定配置文件,老版本的各种参数基本都不建议使用了,所以这个 config flag 指定的 config 文件中基本包含了所有可配置项,我们看一下代码中获取这个 flag 的相关代码:

!FILENAME cmd/kube-scheduler/app/options/options.go:143

  1. func (o *Options) Flags() (nfs apiserverflag.NamedFlagSets) {
  2. fs := nfs.FlagSet("misc")
  3. // 关注 --config
  4. fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file. Flags override values in this file.")
  5. fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the configuration values to this file and exit.")
  6. fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
  7. o.SecureServing.AddFlags(nfs.FlagSet("secure serving"))
  8. o.CombinedInsecureServing.AddFlags(nfs.FlagSet("insecure serving"))
  9. o.Authentication.AddFlags(nfs.FlagSet("authentication"))
  10. o.Authorization.AddFlags(nfs.FlagSet("authorization"))
  11. o.Deprecated.AddFlags(nfs.FlagSet("deprecated"), &o.ComponentConfig)
  12. leaderelectionconfig.BindFlags(&o.ComponentConfig.LeaderElection.LeaderElectionConfiguration, nfs.FlagSet("leader election"))
  13. utilfeature.DefaultFeatureGate.AddFlag(nfs.FlagSet("feature gate"))
  14. return nfs
  15. }

上述代码中有几个点可以关注到:

  1. FlagSet 的含义,命令行输出的分组和这里的分组是对应的;
  2. 除了认证授权、选举等“非关键”配置外,其他配置基本 Deprecated 了,也就意味着建议使用 config file;

上面代码中可以看到o.ConfigFile接收了config配置,我们看看Option类型是什么样子的~

options.Option 对象

Options对象包含运行一个 Scheduler 所需要的所有参数

!FILENAME cmd/kube-scheduler/app/options/options.go:55

  1. type Options struct {
  2. // 和命令行帮助信息的分组是一致的
  3. ComponentConfig kubeschedulerconfig.KubeSchedulerConfiguration
  4. SecureServing *apiserveroptions.SecureServingOptionsWithLoopback
  5. CombinedInsecureServing *CombinedInsecureServingOptions
  6. Authentication *apiserveroptions.DelegatingAuthenticationOptions
  7. Authorization *apiserveroptions.DelegatingAuthorizationOptions
  8. Deprecated *DeprecatedOptions
  9. // config 文件的路径
  10. ConfigFile string
  11. // 如果指定了,会输出 config 的默认配置到这个文件
  12. WriteConfigTo string
  13. Master string
  14. }

前面的 flag 相关代码中写到配置文件的内容给了o.ConfigFile,也就是Options.ConfigFile,那这个属性怎么使用呢?

我们来看下面这个 ApplyTo() 函数,这个函数要做的事情是把 options 配置 apply 给 scheduler app configuration(这个对象后面会讲到):

!FILENAME cmd/kube-scheduler/app/options/options.go:162

  1. // 把 Options apply 给 Config
  2. func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
  3. // --config 没有使用的情况
  4. if len(o.ConfigFile) == 0 {
  5. c.ComponentConfig = o.ComponentConfig
  6. // 使用 Deprecated 的配置
  7. if err := o.Deprecated.ApplyTo(&c.ComponentConfig); err != nil {
  8. return err
  9. }
  10. if err := o.CombinedInsecureServing.ApplyTo(c, &c.ComponentConfig); err != nil {
  11. return err
  12. }
  13. } else {
  14. // 加载 config 文件中的内容
  15. cfg, err := loadConfigFromFile(o.ConfigFile)
  16. if err != nil {
  17. return err
  18. }
  19. // 上面加载到的配置赋值给 Config中的 ComponentConfig
  20. c.ComponentConfig = *cfg
  21. if err := o.CombinedInsecureServing.ApplyToFromLoadedConfig(c, &c.ComponentConfig); err != nil {
  22. return err
  23. }
  24. }
  25. // ……
  26. return nil
  27. }

这个函数中可以看到用 —config 和不用 —config 两种情况下 options 是如何应用到schedulerappconfig.Config中的。那么这里提到的 Config 对象又是什么呢?

config.Config对象

Config 对象包含运行一个 Scheduler 所需要的所有 context

!FILENAME cmd/kube-scheduler/app/config/config.go:32

  1. type Config struct {
  2. // 调度器配置对象
  3. ComponentConfig kubeschedulerconfig.KubeSchedulerConfiguration
  4. LoopbackClientConfig *restclient.Config
  5. InsecureServing *apiserver.DeprecatedInsecureServingInfo
  6. InsecureMetricsServing *apiserver.DeprecatedInsecureServingInfo
  7. Authentication apiserver.AuthenticationInfo
  8. Authorization apiserver.AuthorizationInfo
  9. SecureServing *apiserver.SecureServingInfo
  10. Client clientset.Interface
  11. InformerFactory informers.SharedInformerFactory
  12. PodInformer coreinformers.PodInformer
  13. EventClient v1core.EventsGetter
  14. Recorder record.EventRecorder
  15. Broadcaster record.EventBroadcaster
  16. LeaderElection *leaderelection.LeaderElectionConfig
  17. }

所以前面的c.ComponentConfig = o.ComponentConfig这行代码也就是把 Options 中的 ComponentConfig 赋值给了 Config 中的 ComponentConfig;是哪里的逻辑让 OptionsConfig 对象产生了关联呢?(也就是说前面提到的 ApplyTo() 方法是再哪里被调用的?)

继续跟下去可以找到Config()函数,从这个函数的返回值*schedulerappconfig.Config可以看到它的目的,是需要得到一个 schedulerappconfig.Config,代码不长:

!FILENAME cmd/kube-scheduler/app/options/options.go:221

  1. func (o *Options) Config() (*schedulerappconfig.Config, error) {
  2. // ……
  3. c := &schedulerappconfig.Config{}
  4. // 前面我们看到的 ApplyTo() 函数
  5. if err := o.ApplyTo(c); err != nil {
  6. return nil, err
  7. }
  8. // Prepare kube clients.
  9. // ……
  10. // Prepare event clients.
  11. eventBroadcaster := record.NewBroadcaster()
  12. recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, corev1.EventSource{Component: c.ComponentConfig.SchedulerName})
  13. // Set up leader election if enabled.
  14. // ……
  15. c.Client = client
  16. c.InformerFactory = informers.NewSharedInformerFactory(client, 0)
  17. c.PodInformer = factory.NewPodInformer(client, 0)
  18. c.EventClient = eventClient
  19. c.Recorder = recorder
  20. c.Broadcaster = eventBroadcaster
  21. c.LeaderElection = leaderElectionConfig
  22. return c, nil
  23. }

那调用这个Config()函数的地方又在哪里呢?继续跟就到 runCommand 里面了~

runCommand

runCommand 这个函数我们不陌生:

!FILENAME cmd/kube-scheduler/app/server.go:117

  1. func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {
  2. // ……
  3. // 这个地方完成了前面说到的配置文件和命令行参数等读取和应用工作
  4. c, err := opts.Config()
  5. if err != nil {
  6. fmt.Fprintf(os.Stderr, "%v\n", err)
  7. os.Exit(1)
  8. }
  9. stopCh := make(chan struct{})
  10. // Get the completed config
  11. cc := c.Complete()
  12. // To help debugging, immediately log version
  13. klog.Infof("Version: %+v", version.Get())
  14. // 这里有一堆逻辑
  15. algorithmprovider.ApplyFeatureGates()
  16. // Configz registration.
  17. // ……
  18. return Run(cc, stopCh)
  19. }

runCommand 在最开始的时候我们有见到过,已经到 cobra 入口的 Run 中了:

!FILENAME cmd/kube-scheduler/app/server.go:85

  1. Run: func(cmd *cobra.Command, args []string) {
  2. if err := runCommand(cmd, args, opts); err != nil {
  3. fmt.Fprintf(os.Stderr, "%v\n", err)
  4. os.Exit(1)
  5. }
  6. },

上面涉及到2个知识点:

  • ApplyFeatureGates
  • Run 中的逻辑

我们下面分别来看看~

ApplyFeatureGates

这个函数跟进去可以看到如下几行简单的代码,这里很自然我们能够想到继续跟defaults.ApplyFeatureGates(),但是不能只看到这个函数哦,具体来看:

!FILENAME pkg/scheduler/algorithmprovider/plugins.go:17

  1. package algorithmprovider
  2. import (
  3. "k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults"
  4. )
  5. // ApplyFeatureGates applies algorithm by feature gates.
  6. func ApplyFeatureGates() {
  7. defaults.ApplyFeatureGates()
  8. }

到这里分2条路:

  • import defaults 这个 package 的时候有一个init()函数调用的逻辑
  • defaults.ApplyFeatureGates() 函数调用本身。

默认算法注册

!FILENAME pkg/scheduler/algorithmprovider/defaults/defaults.go:38

  1. func init() {
  2. // ……
  3. registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
  4. // ……
  5. }

init()函数中我们先关注 registerAlgorithmProvider() 函数,这里从字面上可以得到不少信息,大胆猜一下:是不是注册了默认的预选算法和优选算法?

!FILENAME pkg/scheduler/algorithmprovider/defaults/defaults.go:222

  1. func registerAlgorithmProvider(predSet, priSet sets.String) {
  2. // 注册 algorithm provider. 默认使用 DefaultProvider
  3. factory.RegisterAlgorithmProvider(factory.DefaultProvider, predSet, priSet)
  4. factory.RegisterAlgorithmProvider(ClusterAutoscalerProvider, predSet,
  5. copyAndReplace(priSet, "LeastRequestedPriority", "MostRequestedPriority"))
  6. }

看到这里可以关注到 AlgorithmProvider 这个概念,后面会讲到。

先看一下里面调用的注册函数是怎么实现的:

!FILENAME pkg/scheduler/factory/plugins.go:387

  1. func RegisterAlgorithmProvider(name string, predicateKeys, priorityKeys sets.String) string {
  2. schedulerFactoryMutex.Lock()
  3. defer schedulerFactoryMutex.Unlock()
  4. validateAlgorithmNameOrDie(name)
  5. // 很明显,关键逻辑在这里
  6. algorithmProviderMap[name] = AlgorithmProviderConfig{
  7. FitPredicateKeys: predicateKeys,
  8. PriorityFunctionKeys: priorityKeys,
  9. }
  10. return name
  11. }

首先,algorithmProviderMap 这个变量是一个包级变量,在86行做的定义:algorithmProviderMap = make(map[string]AlgorithmProviderConfig)

这里的 key 有2种情况:

  • “DefaultProvider”
  • “ClusterAutoscalerProvider”

混合云场景用得到 ClusterAutoscalerProvider,大家感兴趣可以研究一下 ClusterAutoscaler 特性,这块我们先不说。默认的情况是生效的 DefaultProvider,这块逻辑后面还会提到。

然后这个 map 的 value 的类型是一个简单的 struct:

!FILENAME pkg/scheduler/factory/plugins.go:99

  1. type AlgorithmProviderConfig struct {
  2. FitPredicateKeys sets.String
  3. PriorityFunctionKeys sets.String
  4. }

接着看一下defaultPredicates()函数

!FILENAME pkg/scheduler/algorithmprovider/defaults/defaults.go:106

  1. func defaultPredicates() sets.String {
  2. return sets.NewString(
  3. // Fit is determined by volume zone requirements.
  4. factory.RegisterFitPredicateFactory(
  5. predicates.NoVolumeZoneConflictPred,
  6. func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
  7. return predicates.NewVolumeZonePredicate(args.PVInfo, args.PVCInfo, args.StorageClassInfo)
  8. },
  9. ),
  10. // ……
  11. factory.RegisterFitPredicate(predicates.NoDiskConflictPred, predicates.NoDiskConflict),
  12. // ……
  13. )
  14. }

这个函数里面就2中类型的玩法,简化一些可以理解成上面这个样子,我们一个个来看。

先认识一下 sets.NewString()函数要干嘛:

!FILENAME vendor/k8s.io/apimachinery/pkg/util/sets/string.go:27

  1. type String map[string]Empty
  2. // NewString creates a String from a list of values.
  3. func NewString(items ...string) String {
  4. ss := String{}
  5. ss.Insert(items...)
  6. return ss
  7. }
  8. // ……
  9. // Insert adds items to the set.
  10. func (s String) Insert(items ...string) {
  11. for _, item := range items {
  12. s[item] = Empty{}
  13. }
  14. }

如上,很简单的类型封装。里面的Empty是:type Empty struct{},所以本质上就是要用map[string]struct{}这个类型罢了。

因此上面defaultPredicates()函数中sets.NewString()内每一个参数本质上就是一个 string 类型了,我们来看这一个个 string 是怎么来的。

!FILENAME pkg/scheduler/factory/plugins.go:195

  1. func RegisterFitPredicateFactory(name string, predicateFactory FitPredicateFactory) string {
  2. schedulerFactoryMutex.Lock()
  3. defer schedulerFactoryMutex.Unlock()
  4. validateAlgorithmNameOrDie(name)
  5. // 唯一值的关注的逻辑
  6. fitPredicateMap[name] = predicateFactory
  7. // 返回 name
  8. return name
  9. }

这个函数要返回一个 string 我们已经知道了,里面的逻辑也只有这一行需要我们关注:fitPredicateMap[name] = predicateFactory,这个 map 类型也是一个包级变量:fitPredicateMap = make(map[string]FitPredicateFactory),所以前面讲的注册本质也就是在填充这个变量而已。理解fitPredicateMap[name] = predicateFactoryfitPredicateMapkeyvalue,也就知道了这里的 Register 要做什么。

defaultPredicates()中的第二种注册方式 RegisterFitPredicate 区别不大,函数体也是调用的 RegisterFitPredicateFactory()

!FILENAME pkg/scheduler/factory/plugins.go:106

  1. func RegisterFitPredicate(name string, predicate algorithm.FitPredicate) string {
  2. return RegisterFitPredicateFactory(name, func(PluginFactoryArgs) algorithm.FitPredicate { return predicate })
  3. }

特性开关

!FILENAME pkg/scheduler/algorithmprovider/defaults/defaults.go:183

  1. func ApplyFeatureGates() {
  2. if utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition) {
  3. factory.RemoveFitPredicate(predicates.CheckNodeConditionPred)
  4. factory.RemoveFitPredicate(predicates.CheckNodeMemoryPressurePred)
  5. factory.RemoveFitPredicate(predicates.CheckNodeDiskPressurePred)
  6. factory.RemoveFitPredicate(predicates.CheckNodePIDPressurePred)
  7. factory.RemovePredicateKeyFromAlgorithmProviderMap(predicates.CheckNodeConditionPred)
  8. factory.RemovePredicateKeyFromAlgorithmProviderMap(predicates.CheckNodeMemoryPressurePred)
  9. factory.RemovePredicateKeyFromAlgorithmProviderMap(predicates.CheckNodeDiskPressurePred)
  10. factory.RemovePredicateKeyFromAlgorithmProviderMap(predicates.CheckNodePIDPressurePred)
  11. factory.RegisterMandatoryFitPredicate(predicates.PodToleratesNodeTaintsPred, predicates.PodToleratesNodeTaints)
  12. factory.RegisterMandatoryFitPredicate(predicates.CheckNodeUnschedulablePred, predicates.CheckNodeUnschedulablePredicate)
  13. factory.InsertPredicateKeyToAlgorithmProviderMap(predicates.PodToleratesNodeTaintsPred)
  14. factory.InsertPredicateKeyToAlgorithmProviderMap(predicates.CheckNodeUnschedulablePred)
  15. }
  16. if utilfeature.DefaultFeatureGate.Enabled(features.ResourceLimitsPriorityFunction) {
  17. factory.RegisterPriorityFunction2("ResourceLimitsPriority", priorities.ResourceLimitsPriorityMap, nil, 1)
  18. factory.InsertPriorityKeyToAlgorithmProviderMap(factory.RegisterPriorityFunction2("ResourceLimitsPriority", priorities.ResourceLimitsPriorityMap, nil, 1))
  19. }
  20. }

这个函数看着几十行,实际上只在重复一件事情,增加或删除一些预选和优选算法。我们看一下这里的一些逻辑:

utilfeature.DefaultFeatureGate.Enabled() 函数要做的事情是判断一个 feature 是否开启;函数参数本质只是一个字符串:

!FILENAME pkg/features/kube_features.go:25

  1. const (
  2. AppArmor utilfeature.Feature = "AppArmor"
  3. DynamicKubeletConfig utilfeature.Feature = "DynamicKubeletConfig"
  4. // ……
  5. )

这里定义了很多的 feature,然后定义了哪些 feature 是开启的,处在 alpha 还是 beta 或者 GA 等:

!FILENAME pkg/features/kube_features.go:405

  1. var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureSpec{
  2. AppArmor: {Default: true, PreRelease: utilfeature.Beta},
  3. DynamicKubeletConfig: {Default: true, PreRelease: utilfeature.Beta},
  4. ExperimentalHostUserNamespaceDefaultingGate: {Default: false, PreRelease: utilfeature.Beta},
  5. ExperimentalCriticalPodAnnotation: {Default: false, PreRelease: utilfeature.Alpha},
  6. DevicePlugins: {Default: true, PreRelease: utilfeature.Beta},
  7. TaintBasedEvictions: {Default: true, PreRelease: utilfeature.Beta},
  8. RotateKubeletServerCertificate: {Default: true, PreRelease: utilfeature.Beta},
  9. // ……
  10. }

所以回到前面ApplyFeatureGates()的逻辑,utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition)要判断的是 TaintNodesByCondition 这个特性是否开启了,如果开启了就把 predicates 中 “CheckNodeCondition”, “CheckNodeMemoryPressure”, “CheckNodePIDPressurePred”, “CheckNodeDiskPressure” 这几个算法去掉,把 “PodToleratesNodeTaints”, “CheckNodeUnschedulable” 加上。接着对于特性 “ResourceLimitsPriorityFunction” 的处理也是同一个逻辑。

Scheduler 的创建

我们换一条线,从 Scheduler 对象的创建再来看另外几个知识点。

前面分析到runCommand()函数的时候我们说到了需要关注最后一行return Run(cc, stopCh)的逻辑,在Run()函数中主要的逻辑就是创建 Scheduler 和启动 Scheduler;现在我们来看创建逻辑:

!FILENAME cmd/kube-scheduler/app/server.go:174

  1. sched, err := scheduler.New(cc.Client,
  2. cc.InformerFactory.Core().V1().Nodes(),
  3. cc.PodInformer,
  4. cc.InformerFactory.Core().V1().PersistentVolumes(),
  5. cc.InformerFactory.Core().V1().PersistentVolumeClaims(),
  6. cc.InformerFactory.Core().V1().ReplicationControllers(),
  7. cc.InformerFactory.Apps().V1().ReplicaSets(),
  8. cc.InformerFactory.Apps().V1().StatefulSets(),
  9. cc.InformerFactory.Core().V1().Services(),
  10. cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
  11. storageClassInformer,
  12. cc.Recorder,
  13. cc.ComponentConfig.AlgorithmSource,
  14. stopCh,
  15. scheduler.WithName(cc.ComponentConfig.SchedulerName),
  16. scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
  17. scheduler.WithEquivalenceClassCacheEnabled(cc.ComponentConfig.EnableContentionProfiling),
  18. scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
  19. scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
  20. scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))

这里调用了一个New()函数,传了很多参数进去。New()函数的定义如下:

!FILENAME pkg/scheduler/scheduler.go:131

  1. func New(client clientset.Interface,
  2. nodeInformer coreinformers.NodeInformer,
  3. podInformer coreinformers.PodInformer,
  4. pvInformer coreinformers.PersistentVolumeInformer,
  5. pvcInformer coreinformers.PersistentVolumeClaimInformer,
  6. replicationControllerInformer coreinformers.ReplicationControllerInformer,
  7. replicaSetInformer appsinformers.ReplicaSetInformer,
  8. statefulSetInformer appsinformers.StatefulSetInformer,
  9. serviceInformer coreinformers.ServiceInformer,
  10. pdbInformer policyinformers.PodDisruptionBudgetInformer,
  11. storageClassInformer storageinformers.StorageClassInformer,
  12. recorder record.EventRecorder,
  13. schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
  14. stopCh <-chan struct{},
  15. opts ...func(o *schedulerOptions)) (*Scheduler, error)

这里涉及到的东西有点小多,我们一点点看:

options := defaultSchedulerOptions 这行代码的 defaultSchedulerOptions 是一个 schedulerOptions 对象:

!FILENAME pkg/scheduler/scheduler.go:121

  1. // LINE 67
  2. type schedulerOptions struct {
  3. schedulerName string
  4. hardPodAffinitySymmetricWeight int32
  5. enableEquivalenceClassCache bool
  6. disablePreemption bool
  7. percentageOfNodesToScore int32
  8. bindTimeoutSeconds int64
  9. }
  10. // ……
  11. // LINE 121
  12. var defaultSchedulerOptions = schedulerOptions{
  13. // "default-scheduler"
  14. schedulerName: v1.DefaultSchedulerName,
  15. // 1
  16. hardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
  17. enableEquivalenceClassCache: false,
  18. disablePreemption: false,
  19. // 50
  20. percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
  21. // 100
  22. bindTimeoutSeconds: BindTimeoutSeconds,
  23. }

回到New()函数的逻辑:

!FILENAME pkg/scheduler/scheduler.go:148

  1. for _, opt := range opts {
  2. opt(&options)
  3. }

这里的 opts 定义在参数里:opts ...func(o *schedulerOptions),我们看一个实参来理解一下:scheduler.WithName(cc.ComponentConfig.SchedulerName)

!FILENAME pkg/scheduler/scheduler.go:80

  1. // 这个函数能够把给定的 schedulerName 赋值给 schedulerOptions.schedulerName
  2. func WithName(schedulerName string) Option {
  3. return func(o *schedulerOptions) {
  4. o.schedulerName = schedulerName
  5. }
  6. }

这种方式设置一个对象的属性还是挺有意思的。

调度算法源

我们继续往后面看New()函数的其他逻辑:

source := schedulerAlgorithmSource 这行代码里的 schedulerAlgorithmSource 代表了什么?

形参中有这个变量的定义:schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,跟进去可以看到:

!FILENAME pkg/scheduler/apis/config/types.go:97

  1. // 表示调度算法源,两个属性是互相排斥的,也就是二选一的意思
  2. type SchedulerAlgorithmSource struct {
  3. // Policy is a policy based algorithm source.
  4. Policy *SchedulerPolicySource
  5. // Provider is the name of a scheduling algorithm provider to use.
  6. Provider *string
  7. }

这两个属性肯定得理解一下了,目测挺重要的样子:

Policy

!FILENAME pkg/scheduler/apis/config/types.go:106

  1. type SchedulerPolicySource struct {
  2. // 文件方式配置生效的调度算法
  3. File *SchedulerPolicyFileSource
  4. // ConfigMap 方式配置生效的调度算法
  5. ConfigMap *SchedulerPolicyConfigMapSource
  6. }
  7. // 下面分别是2个属性的结构定义:
  8. // ……
  9. type SchedulerPolicyFileSource struct {
  10. // Path is the location of a serialized policy.
  11. Path string
  12. }
  13. // ……
  14. type SchedulerPolicyConfigMapSource struct {
  15. // Namespace is the namespace of the policy config map.
  16. Namespace string
  17. // Name is the name of hte policy config map.
  18. Name string
  19. }

大家还记得我们在讲调度器设计的时候提到的 Policy 文件不?大概长这个样子:

  1. {
  2. "kind" : "Policy",
  3. "apiVersion" : "v1",
  4. "predicates" : [
  5. {"name" : "PodFitsHostPorts"},
  6. {"name" : "HostName"}
  7. ],
  8. "priorities" : [
  9. {"name" : "LeastRequestedPriority", "weight" : 1},
  10. {"name" : "EqualPriority", "weight" : 1}
  11. ],
  12. "hardPodAffinitySymmetricWeight" : 10,
  13. "alwaysCheckAllPredicates" : false
  14. }

所以啊,这个 Policy原来是通过代码里的 SchedulerPolicySource 去配置的~

policy / provider 如何生效

前面讲到调度算法从何而来(源头),现在我们看一下这些算法配置如何生效的:

!FILENAME pkg/scheduler/scheduler.go:173

  1. source := schedulerAlgorithmSource
  2. switch {
  3. // 如果 Provider 配置了,就不用 policy 了
  4. case source.Provider != nil:
  5. // 根据给定的 Provider 创建 scheduler config
  6. sc, err := configurator.CreateFromProvider(*source.Provider)
  7. if err != nil {
  8. return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
  9. }
  10. config = sc
  11. // 如果 Policy 提供了,就没有上面的 provider 的事情了
  12. case source.Policy != nil:
  13. // 根据给定的 Policy 创建 scheduler config
  14. policy := &schedulerapi.Policy{}
  15. switch {
  16. // 是 File 的情况
  17. case source.Policy.File != nil:
  18. if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
  19. return nil, err
  20. }
  21. // 是 ConfigMap 的情况
  22. case source.Policy.ConfigMap != nil:
  23. if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
  24. return nil, err
  25. }
  26. }
  27. sc, err := configurator.CreateFromConfig(*policy)
  28. if err != nil {
  29. return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
  30. }
  31. config = sc
  32. default:
  33. return nil, fmt.Errorf("unsupported algorithm source: %v", source)
  34. }

上面代码涉及到的2个类型我们再来关注一下:

  • schedulerapi.Policy
  • factory.Config

这个 Policy 就是具体用于存放我们配置的 policy 的载体,对照着这个结构我们可以判断自己在配置 policy 的时候应该按照什么格式:

!FILENAME pkg/scheduler/api/types.go:47

  1. type Policy struct {
  2. metav1.TypeMeta
  3. Predicates []PredicatePolicy
  4. Priorities []PriorityPolicy
  5. ExtenderConfigs []ExtenderConfig
  6. HardPodAffinitySymmetricWeight int32
  7. AlwaysCheckAllPredicates bool
  8. }

这个结构内部封装的一层层结构我就不继续贴了,大家感兴趣可以点开看一下,跟到底的落点都是基础类型的,string啊,int啊,bool啊这些~

关于 factory.Config 可能大家有印象,这个结构就是 Scheduler 对象的唯一属性:

!FILENAME pkg/scheduler/scheduler.go:58

  1. type Scheduler struct {
  2. config *factory.Config
  3. }

Config 结构体的属性不外乎 Scheduler 在落实调度、抢占等动作时所需要的一系列方法(或对象);在New()函数的最后有一行sched := NewFromConfig(config),实现是简单地实例化 Scheduler,然后将 config 赋值给 Scheduler 的 config 属性,然后返回 Scheduler 对象的地址。

默认生效的算法

我们最后还是单独拎出来强调一下生效了哪些算法的具体逻辑吧,前面有提到一些了,我相信肯定有人很关注这个知识点。

前面提到 Scheduler 创建的时候使用的 New()函数,函数中 switch 判断 schedulerAlgorithmSource 是 Provider 还是 Policy,然后做了具体的初始化逻辑,我们具体看其中一个初始化, 串一下这些点:

sc, err := configurator.CreateFromProvider(*source.Provider)

如果我们配置的是 Provider,这时候代码逻辑调用的是这样一行,这个函数的实现如下:

!FILENAME pkg/scheduler/factory/factory.go:1156

  1. func (c *configFactory) CreateFromProvider(providerName string) (*Config, error) {
  2. // 比如说我们配置的 name 是 DefaultProvider,这个函数要获取一个 AlgorithmProviderConfig 类型的对象
  3. provider, err := GetAlgorithmProvider(providerName)
  4. if err != nil {
  5. return nil, err
  6. }
  7. // 下面详细看
  8. return c.CreateFromKeys(provider.FitPredicateKeys, provider.PriorityFunctionKeys, []algorithm.SchedulerExtender{})
  9. }

这个函数里有2个点需要关注,第一个是GetAlgorithmProvider()函数返回了什么:

!FILENAME pkg/scheduler/factory/plugins.go:99

  1. type AlgorithmProviderConfig struct {
  2. FitPredicateKeys sets.String
  3. PriorityFunctionKeys sets.String
  4. }

看到这个返回值类型,心里就明朗了。

我们继续看比较重要的CreateFromKeys()方法调用的具体逻辑,这个函数的实参中 provider.FitPredicateKeys, provider.PriorityFunctionKeys 很明显和具体的 provider 相关,不同 provider 定义的预置算法不同。继续来看函数实现:

!FILENAME pkg/scheduler/factory/factory.go:1255

  1. func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
  2. // ……
  3. // 根据 predicateKeys 得到 predicateFuncs
  4. predicateFuncs, err := c.GetPredicates(predicateKeys)
  5. // 根据 priorityKeys 得到 priorityConfigs
  6. priorityConfigs, err := c.GetPriorityFunctionConfigs(priorityKeys)
  7. // ……
  8. // 创建一个 genericScheduler,这个对象我们很熟悉。algo 也就是 Algorithm 的简写;
  9. algo := core.NewGenericScheduler(
  10. c.schedulerCache,
  11. c.equivalencePodCache,
  12. c.podQueue,
  13. predicateFuncs, // 和 predicateKeys 对应
  14. predicateMetaProducer,
  15. priorityConfigs, // 和 priorityKeys 对应
  16. priorityMetaProducer,
  17. // ……
  18. )
  19. podBackoff := util.CreateDefaultPodBackoff()
  20. return &Config{
  21. // ……
  22. Algorithm: algo, // 很清晰了
  23. // ……
  24. }, nil
  25. }

上面的NewGenericScheduler()函数接收了这些参数之后丢给了 genericScheduler 对象,这个对象中 predicates 属性对应参数 predicateFuncs,prioritizers 属性对应参数 priorityConfigs;

从这里的代码可以看出来我们配置的算法源可以影响到 Scheduler 的初始化,最终体现在改变了 Scheduler 对象的 config 属性的 Algorithm 属性的 prioritizers 和 prioritizers 上。我们最后回顾一下这2个属性的类型,就和以前的预选、优选过程分析的时候关注的点对上了:

  • predicates —> map[string]algorithm.FitPredicate
  • prioritizers —> []algorithm.PriorityConfig

是不是很熟悉呢?

行,今天就讲到这里~