kube-scheduler Source Code Analysis (Part 4): findNodesThatFit

The code analysis below is based on Kubernetes v1.12.0.

This article analyzes the predicate (filtering) phase of the scheduling logic, i.e. the first step, which selects the nodes that satisfy the pod's scheduling requirements.

1. Call entry point

In the predicate phase, each node is run through the predicate functions to decide whether it is suitable for the pod being scheduled.

genericScheduler.Schedule calls findNodesThatFit as follows:

This code is located in pkg/scheduler/core/generic_scheduler.go.

    func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
        ...
        // List all nodes.
        nodes, err := nodeLister.List()
        if err != nil {
            return "", err
        }
        if len(nodes) == 0 {
            return "", ErrNoNodesAvailable
        }

        // Used for all fit and priority funcs.
        err = g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
        if err != nil {
            return "", err
        }

        trace.Step("Computing predicates")
        startPredicateEvalTime := time.Now()
        // Call findNodesThatFit to filter out the feasible nodes.
        filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
        if err != nil {
            return "", err
        }

        if len(filteredNodes) == 0 {
            return "", &FitError{
                Pod:              pod,
                NumAllNodes:      len(nodes),
                FailedPredicates: failedPredicateMap,
            }
        }

        // metrics
        metrics.SchedulingAlgorithmPredicateEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPredicateEvalTime))
        metrics.SchedulingLatency.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))
        ...
    }

The key call:

    // Call findNodesThatFit to filter out the feasible nodes.
    filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)

2. findNodesThatFit

findNodesThatFit filters the nodes based on the given predicate functions; each node is passed through the predicate functions to determine whether it fits.

Its inputs are the pod being scheduled and the current node list; it returns the list of feasible nodes, the map of failed predicates per node, and an error.

The basic flow of findNodesThatFit is:

  1. Determine the number of feasible nodes to find, which sets the capacity of the result slice and avoids filtering far more nodes than needed in large clusters.
  2. Repeatedly fetch the next node from the NodeTree and check whether it satisfies the pod's scheduling requirements.
  3. Run the previously registered predicate functions against the current node to decide whether it fits the pod.
  4. Return the list of nodes that fit, to be used in the subsequent priority (scoring) phase.

The complete code of findNodesThatFit is as follows:

This code is located in pkg/scheduler/core/generic_scheduler.go.

    // Filters the nodes to find the ones that fit based on the given predicate functions
    // Each node is passed through the predicate functions to determine if it is a fit
    func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
        var filtered []*v1.Node
        failedPredicateMap := FailedPredicateMap{}

        if len(g.predicates) == 0 {
            filtered = nodes
        } else {
            allNodes := int32(g.cache.NodeTree().NumNodes)
            numNodesToFind := g.numFeasibleNodesToFind(allNodes)

            // Create filtered list with enough space to avoid growing it
            // and allow assigning.
            filtered = make([]*v1.Node, numNodesToFind)
            errs := errors.MessageCountMap{}
            var (
                predicateResultLock sync.Mutex
                filteredLen         int32
                equivClass          *equivalence.Class
            )

            ctx, cancel := context.WithCancel(context.Background())

            // We can use the same metadata producer for all nodes.
            meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)

            if g.equivalenceCache != nil {
                // getEquivalenceClassInfo will return immediately if no equivalence pod found
                equivClass = equivalence.NewClass(pod)
            }

            checkNode := func(i int) {
                var nodeCache *equivalence.NodeCache
                nodeName := g.cache.NodeTree().Next()
                if g.equivalenceCache != nil {
                    nodeCache, _ = g.equivalenceCache.GetNodeCache(nodeName)
                }
                fits, failedPredicates, err := podFitsOnNode(
                    pod,
                    meta,
                    g.cachedNodeInfoMap[nodeName],
                    g.predicates,
                    g.cache,
                    nodeCache,
                    g.schedulingQueue,
                    g.alwaysCheckAllPredicates,
                    equivClass,
                )
                if err != nil {
                    predicateResultLock.Lock()
                    errs[err.Error()]++
                    predicateResultLock.Unlock()
                    return
                }
                if fits {
                    length := atomic.AddInt32(&filteredLen, 1)
                    if length > numNodesToFind {
                        cancel()
                        atomic.AddInt32(&filteredLen, -1)
                    } else {
                        filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
                    }
                } else {
                    predicateResultLock.Lock()
                    failedPredicateMap[nodeName] = failedPredicates
                    predicateResultLock.Unlock()
                }
            }

            // Stops searching for more nodes once the configured number of feasible nodes
            // are found.
            workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

            filtered = filtered[:filteredLen]
            if len(errs) > 0 {
                return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
            }
        }

        if len(filtered) > 0 && len(g.extenders) != 0 {
            for _, extender := range g.extenders {
                if !extender.IsInterested(pod) {
                    continue
                }
                filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap)
                if err != nil {
                    if extender.IsIgnorable() {
                        glog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
                            extender, err)
                        continue
                    } else {
                        return []*v1.Node{}, FailedPredicateMap{}, err
                    }
                }

                for failedNodeName, failedMsg := range failedMap {
                    if _, found := failedPredicateMap[failedNodeName]; !found {
                        failedPredicateMap[failedNodeName] = []algorithm.PredicateFailureReason{}
                    }
                    failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
                }
                filtered = filteredList
                if len(filtered) == 0 {
                    break
                }
            }
        }
        return filtered, failedPredicateMap, nil
    }

The following sections analyze findNodesThatFit piece by piece.

3. numFeasibleNodesToFind

findNodesThatFit first determines, from the total number of nodes, how many feasible nodes it needs to find. numFeasibleNodesToFind exists mainly to keep scheduling efficient when the cluster has many nodes (more than 100).

    allNodes := int32(g.cache.NodeTree().NumNodes)
    numNodesToFind := g.numFeasibleNodesToFind(allNodes)

    // Create filtered list with enough space to avoid growing it
    // and allow assigning.
    filtered = make([]*v1.Node, numNodesToFind)

The basic flow of numFeasibleNodesToFind is:

  • If the total number of nodes is less than minFeasibleNodesToFind (currently 100 by default), return the total node count.
  • If there are more than 100 nodes, take percentageOfNodesToScore percent of the total; if that number is still less than minFeasibleNodesToFind, return minFeasibleNodesToFind.
  • Otherwise return the computed percentage of the total node count.

    // numFeasibleNodesToFind returns the number of feasible nodes that once found, the scheduler stops
    // its search for more feasible nodes.
    func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) int32 {
        if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore <= 0 ||
            g.percentageOfNodesToScore >= 100 {
            return numAllNodes
        }

        numNodes := numAllNodes * g.percentageOfNodesToScore / 100
        if numNodes < minFeasibleNodesToFind {
            return minFeasibleNodesToFind
        }

        return numNodes
    }
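
To see how the cut-off behaves, here is a minimal, self-contained sketch that reproduces the same arithmetic outside the scheduler; the constant and parameter names mirror the scheduler's, but the program itself is only an illustration:

    package main

    import "fmt"

    const minFeasibleNodesToFind = 100

    // numFeasibleNodesToFind reproduces the scheduler's cut-off logic for illustration.
    func numFeasibleNodesToFind(numAllNodes, percentageOfNodesToScore int32) int32 {
        if numAllNodes < minFeasibleNodesToFind || percentageOfNodesToScore <= 0 ||
            percentageOfNodesToScore >= 100 {
            return numAllNodes
        }
        numNodes := numAllNodes * percentageOfNodesToScore / 100
        if numNodes < minFeasibleNodesToFind {
            return minFeasibleNodesToFind
        }
        return numNodes
    }

    func main() {
        fmt.Println(numFeasibleNodesToFind(80, 50))   // 80: small cluster, every node is checked
        fmt.Println(numFeasibleNodesToFind(150, 50))  // 100: 75 is below the floor, so the floor applies
        fmt.Println(numFeasibleNodesToFind(5000, 50)) // 2500: 50% of a large cluster
    }

With percentageOfNodesToScore set to 50, a 150-node cluster is still searched up to the 100-node floor, while a 5000-node cluster stops after 2500 feasible nodes have been found.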

4. checkNode

checkNode is a closure that checks whether a single node fits; the core function it actually calls is podFitsOnNode. checkNode itself is then executed concurrently via workqueue.

The main flow of checkNode is:

  1. Fetch the next node name from the nodeTree in the cache.
  2. Pass the current node and the pod to podFitsOnNode to decide whether the node fits.
  3. If the node fits, add it to the filtered slice of feasible nodes.
  4. If the node does not fit, record it in the failure map together with the failure reasons.
  5. checkNode is run concurrently via workqueue.ParallelizeUntil, and the search stops once the configured number of feasible nodes has been found.

    checkNode := func(i int) {
        var nodeCache *equivalence.NodeCache
        nodeName := g.cache.NodeTree().Next()
        if g.equivalenceCache != nil {
            nodeCache, _ = g.equivalenceCache.GetNodeCache(nodeName)
        }
        fits, failedPredicates, err := podFitsOnNode(
            pod,
            meta,
            g.cachedNodeInfoMap[nodeName],
            g.predicates,
            g.cache,
            nodeCache,
            g.schedulingQueue,
            g.alwaysCheckAllPredicates,
            equivClass,
        )
        if err != nil {
            predicateResultLock.Lock()
            errs[err.Error()]++
            predicateResultLock.Unlock()
            return
        }
        if fits {
            length := atomic.AddInt32(&filteredLen, 1)
            if length > numNodesToFind {
                cancel()
                atomic.AddInt32(&filteredLen, -1)
            } else {
                filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
            }
        } else {
            predicateResultLock.Lock()
            failedPredicateMap[nodeName] = failedPredicates
            predicateResultLock.Unlock()
        }
    }

The concurrent execution via workqueue:

    // Stops searching for more nodes once the configured number of feasible nodes
    // are found.
    workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

The implementation of ParallelizeUntil is as follows:

    // ParallelizeUntil is a framework that allows for parallelizing N
    // independent pieces of work until done or the context is canceled.
    func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {
        var stop <-chan struct{}
        if ctx != nil {
            stop = ctx.Done()
        }

        toProcess := make(chan int, pieces)
        for i := 0; i < pieces; i++ {
            toProcess <- i
        }
        close(toProcess)

        if pieces < workers {
            workers = pieces
        }

        wg := sync.WaitGroup{}
        wg.Add(workers)
        for i := 0; i < workers; i++ {
            go func() {
                defer utilruntime.HandleCrash()
                defer wg.Done()
                for piece := range toProcess {
                    select {
                    case <-stop:
                        return
                    default:
                        doWorkPiece(piece)
                    }
                }
            }()
        }
        wg.Wait()
    }
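
To make the fan-out-and-cancel pattern concrete, below is a minimal sketch using only the standard library (it is not scheduler code): a fixed pool of workers drains a buffered channel of work items, and the context is canceled once enough "feasible" results have been collected, mirroring how checkNode calls cancel() when filteredLen reaches numNodesToFind.

    package main

    import (
        "context"
        "fmt"
        "sync"
        "sync/atomic"
    )

    func main() {
        const pieces, workers, wanted = 1000, 16, 10

        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        // Pre-fill the work channel, just like ParallelizeUntil does.
        toProcess := make(chan int, pieces)
        for i := 0; i < pieces; i++ {
            toProcess <- i
        }
        close(toProcess)

        var found int32
        var wg sync.WaitGroup
        wg.Add(workers)
        for w := 0; w < workers; w++ {
            go func() {
                defer wg.Done()
                for piece := range toProcess {
                    select {
                    case <-ctx.Done(): // stop early, like the scheduler once enough nodes fit
                        return
                    default:
                        if piece%7 == 0 { // stand-in for "this node passes the predicates"
                            if atomic.AddInt32(&found, 1) >= wanted {
                                cancel()
                            }
                        }
                    }
                }
            }()
        }
        wg.Wait()
        fmt.Println("feasible pieces found:", atomic.LoadInt32(&found))
    }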

5. podFitsOnNode

podFitsOnNode does the following:

  • podFitsOnNode checks whether the node given by NodeInfo satisfies the predicate functions.

  • For a given pod, podFitsOnNode checks whether an equivalent pod exists and tries to reuse cached predicate results where possible.

podFitsOnNode is called from two places: Schedule (scheduling) and Preempt (preemption).

When called from Schedule, it tests whether the pod is schedulable on the node given all pods already on the node plus the nominated pods of equal or higher priority that are expected to run there.

When called from Preempt, i.e. during preemption, SelectVictimsOnNode has already selected the victim pods to be removed; with the victims removed, podFitsOnNode then checks whether the pending pod can be scheduled onto that node.

The basic flow of podFitsOnNode is:

  1. Iterate over the registered predicate keys in predicates.Ordering() and look up the corresponding predicate function.
  2. Execute each predicate function, which returns whether the node fits, the failure reasons if not, and an error.
  3. If a predicate reports the node as unfit, append its failure reasons to the failure list.
  4. Finally return whether the failure list is empty, together with the failure reasons.

Inputs:

  • pod
  • PredicateMetadata
  • NodeInfo
  • predicateFuncs
  • schedulercache.Cache
  • nodeCache
  • SchedulingQueue
  • alwaysCheckAllPredicates
  • equivClass

Outputs:

  • fit
  • PredicateFailureReason

The complete code is as follows:

This code is located in pkg/scheduler/core/generic_scheduler.go.

    // podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
    // For given pod, podFitsOnNode will check if any equivalent pod exists and try to reuse its cached
    // predicate results as possible.
    // This function is called from two different places: Schedule and Preempt.
    // When it is called from Schedule, we want to test whether the pod is schedulable
    // on the node with all the existing pods on the node plus higher and equal priority
    // pods nominated to run on the node.
    // When it is called from Preempt, we should remove the victims of preemption and
    // add the nominated pods. Removal of the victims is done by SelectVictimsOnNode().
    // It removes victims from meta and NodeInfo before calling this function.
    func podFitsOnNode(
        pod *v1.Pod,
        meta algorithm.PredicateMetadata,
        info *schedulercache.NodeInfo,
        predicateFuncs map[string]algorithm.FitPredicate,
        cache schedulercache.Cache,
        nodeCache *equivalence.NodeCache,
        queue SchedulingQueue,
        alwaysCheckAllPredicates bool,
        equivClass *equivalence.Class,
    ) (bool, []algorithm.PredicateFailureReason, error) {
        var (
            eCacheAvailable  bool
            failedPredicates []algorithm.PredicateFailureReason
        )

        podsAdded := false
        // We run predicates twice in some cases. If the node has greater or equal priority
        // nominated pods, we run them when those pods are added to meta and nodeInfo.
        // If all predicates succeed in this pass, we run them again when these
        // nominated pods are not added. This second pass is necessary because some
        // predicates such as inter-pod affinity may not pass without the nominated pods.
        // If there are no nominated pods for the node or if the first run of the
        // predicates fail, we don't run the second pass.
        // We consider only equal or higher priority pods in the first pass, because
        // those are the current "pod" must yield to them and not take a space opened
        // for running them. It is ok if the current "pod" take resources freed for
        // lower priority pods.
        // Requiring that the new pod is schedulable in both circumstances ensures that
        // we are making a conservative decision: predicates like resources and inter-pod
        // anti-affinity are more likely to fail when the nominated pods are treated
        // as running, while predicates like pod affinity are more likely to fail when
        // the nominated pods are treated as not running. We can't just assume the
        // nominated pods are running because they are not running right now and in fact,
        // they may end up getting scheduled to a different node.
        for i := 0; i < 2; i++ {
            metaToUse := meta
            nodeInfoToUse := info
            if i == 0 {
                podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(util.GetPodPriority(pod), meta, info, queue)
            } else if !podsAdded || len(failedPredicates) != 0 {
                break
            }
            // Bypass eCache if node has any nominated pods.
            // TODO(bsalamat): consider using eCache and adding proper eCache invalidations
            // when pods are nominated or their nominations change.
            eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded
            for _, predicateKey := range predicates.Ordering() {
                var (
                    fit     bool
                    reasons []algorithm.PredicateFailureReason
                    err     error
                )
                //TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
                if predicate, exist := predicateFuncs[predicateKey]; exist {
                    if eCacheAvailable {
                        fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivClass, cache)
                    } else {
                        fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
                    }
                    if err != nil {
                        return false, []algorithm.PredicateFailureReason{}, err
                    }

                    if !fit {
                        // eCache is available and valid, and predicates result is unfit, record the fail reasons
                        failedPredicates = append(failedPredicates, reasons...)
                        // if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
                        if !alwaysCheckAllPredicates {
                            glog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
                                "evaluation is short circuited and there are chances " +
                                "of other predicates failing as well.")
                            break
                        }
                    }
                }
            }
        }

        return len(failedPredicates) == 0, failedPredicates, nil
    }
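
The two-pass logic around nominated pods is the subtlest part of podFitsOnNode. The sketch below reproduces only that control flow, with the predicate loop and addNominatedPods replaced by hypothetical stubs (fakeState, addNominated, and runPredicates are made up for illustration):

    package main

    import "fmt"

    // fakeState stands in for the (meta, nodeInfo) pair; it only records whether
    // the nominated pods have been added.
    type fakeState struct{ nominatedAdded bool }

    // addNominated is a hypothetical stand-in for addNominatedPods: it reports whether
    // any equal-or-higher-priority nominated pods exist and, if so, returns a state
    // that includes them.
    func addNominated(base fakeState, hasNominated bool) (bool, fakeState) {
        if !hasNominated {
            return false, base
        }
        return true, fakeState{nominatedAdded: true}
    }

    // runPredicates stands in for the ordered predicate loop; in this sketch every
    // predicate passes, so it never returns failure reasons.
    func runPredicates(state fakeState) []string {
        _ = state.nominatedAdded // a real predicate would inspect the state
        return nil
    }

    // fitsOnNode mirrors only the two-pass control flow of podFitsOnNode.
    func fitsOnNode(hasNominated bool) bool {
        var failed []string
        podsAdded := false
        base := fakeState{}
        for i := 0; i < 2; i++ {
            stateToUse := base
            if i == 0 {
                // First pass: pretend the nominated pods are already on the node.
                podsAdded, stateToUse = addNominated(base, hasNominated)
            } else if !podsAdded || len(failed) != 0 {
                // No nominated pods, or the first pass already failed: skip the second pass.
                break
            }
            failed = append(failed, runPredicates(stateToUse)...)
        }
        return len(failed) == 0
    }

    func main() {
        fmt.Println(fitsOnNode(true))  // two passes: with and then without nominated pods
        fmt.Println(fitsOnNode(false)) // single pass
    }

Pass one treats equal-or-higher-priority nominated pods as if they were already running; pass two, run only when pass one added pods and succeeded, re-checks without them, so that predicates such as pod affinity cannot pass solely because of pods that may never actually land on this node.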

5.1. predicateFuncs

The predicate functions registered earlier are executed in order to decide whether the node is schedulable.

    for _, predicateKey := range predicates.Ordering() {
        if predicate, exist := predicateFuncs[predicateKey]; exist {
            if eCacheAvailable {
                fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivClass, cache)
            } else {
                fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
            }

The predicate ordering is as follows:

    var (
        predicatesOrdering = []string{CheckNodeConditionPred, CheckNodeUnschedulablePred,
            GeneralPred, HostNamePred, PodFitsHostPortsPred,
            MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred,
            PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred,
            CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred, MaxCSIVolumeCountPred,
            MaxAzureDiskVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred,
            CheckNodeMemoryPressurePred, CheckNodePIDPressurePred, CheckNodeDiskPressurePred, MatchInterPodAffinityPred}
    )

6. PodFitsResources

The following takes PodFitsResources as an example; other important predicate functions will be analyzed separately in later articles.

PodFitsResources checks whether a node has sufficient resources, such as CPU, memory, and GPUs, to run the current pod.

The basic flow of PodFitsResources is:

  1. If the number of pods already on the node plus the pod being scheduled exceeds the node's allowed pod count, the node does not fit.
  2. If all of the pod's resource requests are zero, the node fits and the check returns immediately.
  3. If the pod's requests plus the total requests of all pods already on the node exceed the node's allocatable resources, the node does not fit.
  4. For each extended resource, if the pod's request plus the corresponding requests of all pods on the node exceeds the node's allocatable amount of that resource, the node does not fit.

PodFitsResources is registered as follows:

    factory.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)

Inputs of PodFitsResources:

  • pod

  • nodeInfo

  • PredicateMetadata

Outputs of PodFitsResources:

  • fit
  • PredicateFailureReason

The complete code of PodFitsResources:

This code is located in pkg/scheduler/algorithm/predicates/predicates.go.

    // PodFitsResources checks if a node has sufficient resources, such as cpu, memory, gpu, opaque int resources etc to run a pod.
    // First return value indicates whether a node has sufficient resources to run a pod while the second return value indicates the
    // predicate failure reasons if the node has insufficient resources to run the pod.
    func PodFitsResources(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
        node := nodeInfo.Node()
        if node == nil {
            return false, nil, fmt.Errorf("node not found")
        }

        var predicateFails []algorithm.PredicateFailureReason
        allowedPodNumber := nodeInfo.AllowedPodNumber()
        if len(nodeInfo.Pods())+1 > allowedPodNumber {
            predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
        }

        // No extended resources should be ignored by default.
        ignoredExtendedResources := sets.NewString()

        var podRequest *schedulercache.Resource
        if predicateMeta, ok := meta.(*predicateMetadata); ok {
            podRequest = predicateMeta.podRequest
            if predicateMeta.ignoredExtendedResources != nil {
                ignoredExtendedResources = predicateMeta.ignoredExtendedResources
            }
        } else {
            // We couldn't parse metadata - fallback to computing it.
            podRequest = GetResourceRequest(pod)
        }
        if podRequest.MilliCPU == 0 &&
            podRequest.Memory == 0 &&
            podRequest.EphemeralStorage == 0 &&
            len(podRequest.ScalarResources) == 0 {
            return len(predicateFails) == 0, predicateFails, nil
        }

        allocatable := nodeInfo.AllocatableResource()
        if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
            predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
        }
        if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
            predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))
        }
        if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
            predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceEphemeralStorage, podRequest.EphemeralStorage, nodeInfo.RequestedResource().EphemeralStorage, allocatable.EphemeralStorage))
        }

        for rName, rQuant := range podRequest.ScalarResources {
            if v1helper.IsExtendedResourceName(rName) {
                // If this resource is one of the extended resources that should be
                // ignored, we will skip checking it.
                if ignoredExtendedResources.Has(string(rName)) {
                    continue
                }
            }
            if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
                predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.ScalarResources[rName], nodeInfo.RequestedResource().ScalarResources[rName], allocatable.ScalarResources[rName]))
            }
        }

        if glog.V(10) {
            if len(predicateFails) == 0 {
                // We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
                // not logged. There is visible performance gain from it.
                glog.Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.",
                    podName(pod), node.Name, len(nodeInfo.Pods()), allowedPodNumber)
            }
        }
        return len(predicateFails) == 0, predicateFails, nil
    }

6.1. NodeInfo

NodeInfo is node-level aggregated information; its main fields are:

  • node: the Kubernetes node object itself
  • pods: the pods currently on the node
  • requestedResource: the sum of the resource requests of all pods on the node
  • allocatableResource: the node's actual allocatable resources (corresponding to Node.Status.Allocatable.*), which can be thought of as the node's total schedulable capacity

This code is located in pkg/scheduler/cache/node_info.go.

    // NodeInfo is node level aggregated information.
    type NodeInfo struct {
        // Overall node information.
        node *v1.Node

        pods             []*v1.Pod
        podsWithAffinity []*v1.Pod
        usedPorts        util.HostPortInfo

        // Total requested resource of all pods on this node.
        // It includes assumed pods which scheduler sends binding to apiserver but
        // didn't get it as scheduled yet.
        requestedResource *Resource
        nonzeroRequest    *Resource
        // We store allocatedResources (which is Node.Status.Allocatable.*) explicitly
        // as int64, to avoid conversions and accessing map.
        allocatableResource *Resource

        // Cached taints of the node for faster lookup.
        taints    []v1.Taint
        taintsErr error

        // imageStates holds the entry of an image if and only if this image is on the node. The entry can be used for
        // checking an image's existence and advanced usage (e.g., image locality scheduling policy) based on the image
        // state information.
        imageStates map[string]*ImageStateSummary

        // TransientInfo holds the information pertaining to a scheduling cycle. This will be destructed at the end of
        // scheduling cycle.
        // TODO: @ravig. Remove this once we have a clear approach for message passing across predicates and priorities.
        TransientInfo *transientSchedulerInfo

        // Cached conditions of node for faster lookup.
        memoryPressureCondition v1.ConditionStatus
        diskPressureCondition   v1.ConditionStatus
        pidPressureCondition    v1.ConditionStatus

        // Whenever NodeInfo changes, generation is bumped.
        // This is used to avoid cloning it if the object didn't change.
        generation int64
    }

6.2. Resource

Resource is a collection of compute resources. Its main fields are:

  • MilliCPU
  • Memory
  • EphemeralStorage
  • AllowedPodNumber: the maximum number of pods allowed (corresponding to Node.Status.Allocatable.Pods().Value()), typically 110
  • ScalarResources

    // Resource is a collection of compute resource.
    type Resource struct {
        MilliCPU         int64
        Memory           int64
        EphemeralStorage int64
        // We store allowedPodNumber (which is Node.Status.Allocatable.Pods().Value())
        // explicitly as int, to avoid conversions and improve performance.
        AllowedPodNumber int
        // ScalarResources
        ScalarResources map[v1.ResourceName]int64
    }
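
To illustrate where podRequest comes from, the sketch below sums per-container requests into a Resource-like struct. It is a simplified stand-in for the scheduler's GetResourceRequest (init containers and other details are ignored here), and the container values are hypothetical:

    package main

    import "fmt"

    // resource mirrors the two Resource fields that matter in this sketch.
    type resource struct {
        MilliCPU int64 // CPU in millicores
        Memory   int64 // memory in bytes
    }

    // container is a simplified stand-in for a pod container's resource requests.
    type container struct {
        RequestMilliCPU int64
        RequestMemory   int64
    }

    // podRequest sums the requests of all containers, which is roughly what the
    // scheduler does when building podRequest for PodFitsResources.
    func podRequest(containers []container) resource {
        var r resource
        for _, c := range containers {
            r.MilliCPU += c.RequestMilliCPU
            r.Memory += c.RequestMemory
        }
        return r
    }

    func main() {
        req := podRequest([]container{
            {RequestMilliCPU: 100, RequestMemory: 200 << 20}, // 100m CPU, 200Mi
            {RequestMilliCPU: 250, RequestMemory: 512 << 20}, // 250m CPU, 512Mi
        })
        fmt.Printf("pod requests %dm CPU and %d bytes of memory\n", req.MilliCPU, req.Memory)
    }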

The following walks through the concrete flow of PodFitsResources.

6.3. allowedPodNumber

First the node information is retrieved. The function then checks whether the number of pods currently on the node plus the pod being scheduled would exceed the node's allowed pod count (typically 110). If it would, an entry is appended to the predicateFails slice, i.e. the node is not suitable for this pod.

    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }

    var predicateFails []algorithm.PredicateFailureReason
    allowedPodNumber := nodeInfo.AllowedPodNumber()
    if len(nodeInfo.Pods())+1 > allowedPodNumber {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
    }

6.4. podRequest

If all fields of podRequest are zero, the pod can be scheduled onto the node and the result is returned immediately.

    if podRequest.MilliCPU == 0 &&
        podRequest.Memory == 0 &&
        podRequest.EphemeralStorage == 0 &&
        len(podRequest.ScalarResources) == 0 {
        return len(predicateFails) == 0, predicateFails, nil
    }

6.5. AllocatableResource

If the resource requests of the pod being scheduled plus the total requests of all pods already on the node exceed the node's allocatable resources, the pod cannot be scheduled onto the node and the corresponding InsufficientResourceError is recorded. The resources checked here are CPU, memory, and ephemeral storage.

    allocatable := nodeInfo.AllocatableResource()
    if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
    }
    if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))
    }
    if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceEphemeralStorage, podRequest.EphemeralStorage, nodeInfo.RequestedResource().EphemeralStorage, allocatable.EphemeralStorage))
    }
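
As a concrete, hypothetical example of the CPU check above: a node with 4000m allocatable CPU whose existing pods already request 3500m cannot accept a pod requesting 600m, because 600 + 3500 > 4000. A minimal sketch of just the comparison:

    package main

    import "fmt"

    func main() {
        var (
            allocatableMilliCPU int64 = 4000 // node allocatable CPU (hypothetical)
            requestedMilliCPU   int64 = 3500 // sum of requests of pods already on the node
            podMilliCPU         int64 = 600  // request of the pod being scheduled
        )
        if allocatableMilliCPU < podMilliCPU+requestedMilliCPU {
            fmt.Printf("Insufficient cpu: requested %dm, already used %dm, allocatable %dm\n",
                podMilliCPU, requestedMilliCPU, allocatableMilliCPU)
        }
    }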

6.6. ScalarResources

For the other extended scalar resources: if the pod's request for a resource plus the total amount of that resource requested by all pods already on the node exceeds the node's allocatable amount, the node does not fit and a failure reason is recorded. Extended resources in the ignored set are skipped.

    for rName, rQuant := range podRequest.ScalarResources {
        if v1helper.IsExtendedResourceName(rName) {
            // If this resource is one of the extended resources that should be
            // ignored, we will skip checking it.
            if ignoredExtendedResources.Has(string(rName)) {
                continue
            }
        }
        if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
            predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.ScalarResources[rName], nodeInfo.RequestedResource().ScalarResources[rName], allocatable.ScalarResources[rName]))
        }
    }

7. Summary

findNodesThatFit filters the nodes based on the given predicate functions; each node is passed through the predicate functions to determine whether it fits.

Its inputs are the pod being scheduled and the current node list; it returns the list of feasible nodes, the map of failed predicates per node, and an error.

The basic flow of findNodesThatFit is:

  1. Determine the number of feasible nodes to find, which sets the capacity of the result slice and keeps filtering efficient when the cluster has many nodes.
  2. Repeatedly fetch the next node from the NodeTree and check whether it satisfies the pod's scheduling requirements.
  3. Run the previously registered predicate functions to decide whether the current node fits the pod.
  4. Return the list of nodes that fit, to be used in the subsequent priority (scoring) phase.

7.1. checkNode

checkNode is a closure that checks whether a single node fits; the core function it actually calls is podFitsOnNode. checkNode itself is executed concurrently via workqueue.

The main flow of checkNode is:

  1. Fetch the next node name from the nodeTree in the cache.
  2. Pass the current node and the pod to podFitsOnNode to decide whether the node fits.
  3. If the node fits, add it to the filtered slice of feasible nodes.
  4. If the node does not fit, record it in the failure map together with the failure reasons.
  5. checkNode is run concurrently via workqueue.ParallelizeUntil, and the search stops once the configured number of feasible nodes has been found.

7.2. podFitsOnNode

checkNode in turn calls the core function podFitsOnNode.

podFitsOnNode does the following:

  • podFitsOnNode checks whether the node given by NodeInfo satisfies the predicate functions.

  • For a given pod, podFitsOnNode checks whether an equivalent pod exists and tries to reuse cached predicate results where possible.

podFitsOnNode is called from two places: Schedule (scheduling) and Preempt (preemption).

When called from Schedule, it tests whether the pod is schedulable on the node given all pods already on the node plus the nominated pods of equal or higher priority that are expected to run there.

When called from Preempt, i.e. during preemption, SelectVictimsOnNode selects the victim pods to be removed; with the victims removed, the pending pod is then checked for scheduling onto that node.

The basic flow of podFitsOnNode is:

  1. Iterate over the registered predicate keys in predicates.Ordering() and look up the corresponding predicate function.
  2. Execute each predicate function, which returns whether the node fits, the failure reasons if not, and an error.
  3. If a predicate reports the node as unfit, append its failure reasons to the failure list.
  4. Finally return whether the failure list is empty, together with the failure reasons.

7.3. PodFitsResources

This article analyzed only one of the important predicate functions as an example: PodFitsResources.

PodFitsResources checks whether a node has sufficient resources, such as CPU, memory, and GPUs, to run the current pod.

The basic flow of PodFitsResources is:

  1. If the number of pods already on the node plus the pod being scheduled exceeds the node's allowed pod count, the node does not fit.
  2. If all of the pod's resource requests are zero, the node fits and the check returns immediately.
  3. If the pod's requests plus the total requests of all pods already on the node exceed the node's allocatable resources, the node does not fit.
  4. For each extended resource, if the pod's request plus the corresponding requests of all pods on the node exceeds the node's allocatable amount of that resource, the node does not fit.
