Scheduling Policies

This document describes the policies used to select the preference of tasks and to select the preference of workers used by Dask’s distributed scheduler. For more information on how these policies are enacted efficiently see Scheduling State.

Choosing Workers

When a task transitions from waiting to a processing state we decide a suitable worker for that task. If the task has significant data dependencies or if the workers are under heavy load then this choice of worker can strongly impact global performance. Currently workers for tasks are determined as follows (a simplified sketch appears after this list):

  • If the task has no major dependencies and no restrictions then we find the least occupied worker.
  • Otherwise, if a task has user-provided restrictions (for example it must run on a machine with a GPU) then we restrict the available pool of workers to just that set; otherwise we consider all workers.
  • From among this pool of workers we determine the workers to whom the least amount of data would need to be transferred.
  • We break ties by choosing the worker that currently has the fewest tasks, counting both those tasks in memory and those tasks currently processing.
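The following is a rough, self-contained illustration of the rules above. The names Worker, Dep, Task, and choose_worker are invented for this sketch; the real logic (the decide_worker function in scheduler.py) handles many additional details such as occupancy estimates and network bandwidth.

    from dataclasses import dataclass, field

    @dataclass(frozen=True)
    class Worker:
        name: str
        occupancy: float   # estimated seconds of pending work
        n_tasks: int       # tasks in memory plus tasks currently processing

    @dataclass
    class Dep:
        nbytes: int
        who_has: frozenset  # workers already holding this piece of data

    @dataclass
    class Task:
        dependencies: list = field(default_factory=list)  # list of Dep
        restrictions: set = field(default_factory=set)    # allowed workers, if any

    def choose_worker(task, workers):
        # 1. No major dependencies and no restrictions: pick the least occupied worker
        if not task.dependencies and not task.restrictions:
            return min(workers, key=lambda w: w.occupancy)

        # 2. If the user restricted the task, only consider those workers
        pool = [w for w in workers if w in task.restrictions] if task.restrictions else list(workers)

        # 3. Prefer workers that would need the least data transferred to them
        def transfer_bytes(w):
            return sum(d.nbytes for d in task.dependencies if w not in d.who_has)

        best = min(transfer_bytes(w) for w in pool)
        candidates = [w for w in pool if transfer_bytes(w) == best]

        # 4. Break remaining ties by the fewest tasks in memory or processing
        return min(candidates, key=lambda w: w.n_tasks)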

This process is easy to change (and indeed this document may be outdated). We encourage readers to inspect the decide_worker function in scheduler.py.

decide_worker(ts, all_workers, …): Decide which worker should take task ts.
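To inspect the current implementation directly (assuming the distributed package is installed; the exact module path and signature can change between versions), something like the following works:

    import inspect
    from distributed.scheduler import decide_worker  # location may differ by version

    print(inspect.getsource(decide_worker))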

Choosing Tasks

We often have a choice between running many valid tasks. There are a few competing interests that might motivate our choice:

  • Run tasks on a first-come-first-served basis for fairness between multiple clients
  • Run tasks that are part of the critical path in an effort to reduce total running time and minimize straggler workloads
  • Run tasks that allow us to release many dependencies in an effort to keep the memory footprint small
  • Run tasks that are related so that large chunks of work can be completely eliminated before running new chunks of work

Accomplishing all of these objectives simultaneously is impossible. Optimizing for any one of these objectives perfectly can result in costly overhead. The heuristics within the scheduler do a decent but imperfect job of quickly optimizing for all of these (they all come up in important workloads).

Last in, first out

When a worker finishes a task the immediate dependents of that task get top priority. This encourages a behavior of finishing ongoing work immediately before starting new work. This often conflicts with the first-come-first-served objective but often results in shorter total runtimes and significantly reduced memory footprints.
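The toy below only illustrates the effect; it is not how the worker is implemented (in practice ready tasks are ordered by priority, as described later). Treating the ready queue as a stack means work that was just unlocked runs before older ready work.

    # Illustration only: a LIFO ready-stack makes a worker finish an ongoing
    # chain of work before starting new, unrelated work.
    ready = ["old-task-1", "old-task-2"]   # tasks that became ready earlier

    def on_task_finished(newly_ready_dependents):
        # Tasks that just became runnable go on top of the stack ...
        ready.extend(newly_ready_dependents)

    def next_task():
        # ... so they are picked before older ready work (last in, first out)
        return ready.pop()

    on_task_finished(["process-chunk-0"])
    assert next_task() == "process-chunk-0"   # continue the chain we just unlocked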

Break ties with children and depth

Often a task has multiple dependents and we need to break ties between them with some other objective. Breaking these ties has a surprisingly strong impact on performance and memory footprint.

When a client submits a graph we perform a few linear scans over the graph to determine something like the number of descendants of each node (not quite, because it’s a DAG rather than a tree, but this is a close proxy). This number can be used to break ties and helps us to prioritize nodes with longer critical paths and nodes with many children. The actual algorithms used are somewhat more complex and are described in detail in dask/order.py.
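For intuition, here is a toy version of such a descendant-count proxy. It is not the algorithm in dask/order.py, which is considerably more sophisticated; the graph format (task mapped to the list of its dependencies) and the function name are assumptions for this example.

    from collections import defaultdict

    def descendant_counts(graph):
        """graph maps each task to the list of tasks it depends on."""
        dependents = defaultdict(list)
        for task, deps in graph.items():
            for dep in deps:
                dependents[dep].append(task)

        counts = {}

        def count(task):
            # Memoized recursion over dependents; because the graph is a DAG
            # rather than a tree this over-counts shared descendants, which is
            # fine for a tie-breaking proxy.
            if task not in counts:
                counts[task] = sum(1 + count(child) for child in dependents[task])
            return counts[task]

        for task in graph:
            count(task)
        return counts

    graph = {"a": [], "b": ["a"], "c": ["a"], "d": ["b", "c"]}
    # descendant_counts(graph) == {"d": 0, "b": 1, "c": 1, "a": 4}
    # ("a" counts "d" twice, once through each path: a close but inexact proxy)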

Initial Task Placement

When a new large batch of tasks comes in and there are many idle workers we want to give each worker a set of tasks that are close together/related and unrelated to the tasks given to other workers. This usually avoids inter-worker communication down the line. The same depth-first-with-child-weights priority that we give to workers (described above) can usually be used to segment the leaves of a graph into decently well separated sub-graphs with relatively low inter-sub-graph connectedness.
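The sketch below shows the idea under simplifying assumptions (the function and argument names are invented, and this is not the scheduler’s actual placement code): because related leaves sit next to each other in the depth-first priority ordering, giving each idle worker a contiguous block of that ordering tends to hand each worker a well-separated sub-graph.

    def assign_initial_blocks(leaf_tasks, priorities, idle_workers):
        """leaf_tasks:   task keys with no dependencies
        priorities:   dict mapping task key -> dask.order-style priority
        idle_workers: list of worker addresses
        Returns a dict mapping each worker to a contiguous block of leaves."""
        ordered = sorted(leaf_tasks, key=priorities.__getitem__)
        block = -(-len(ordered) // len(idle_workers))   # ceiling division
        return {
            worker: ordered[i * block:(i + 1) * block]
            for i, worker in enumerate(idle_workers)
        }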

First-Come-First-Served, Coarsely

The last-in-first-out behavior used by the workers to minimize memory footprint can distort the task order provided by the clients. Tasks submitted recently may run sooner than tasks submitted long ago because they happen to be more convenient given the current data in memory. This behavior can be unfair but improves global runtimes and system efficiency, sometimes quite significantly.

However, workers inevitably run out of tasks related to the tasks they were just working on, and the last-in-first-out policy eventually exhausts itself. In these cases workers often pull tasks from the common task pool. The tasks in this pool are ordered on a first-come-first-served basis, and so workers do behave in a fair scheduling manner at a coarse level, if not a fine-grained one.

Dask’s scheduling policies are short-term-efficient and long-term-fair.

Where these decisions are made

The objectives above are mostly followed by small decisions made by the client, scheduler, and workers at various points in the computation.

  • As we submit a graph from the client to the scheduler we assign a numeric priority to each task of that graph. This priority focuses on computing deeply before broadly, preferring critical paths, preferring nodes with many dependencies, etc. This is the same logic used by the single-machine scheduler and lives in dask/order.py.
  • When the graph reaches the scheduler the scheduler changes each of these numeric priorities into a tuple of two numbers, the first of which is an increasing counter, the second of which is the client-generated priority described above. This per-graph counter encourages a first-in-first-out policy between computations. All tasks from a previous call to compute have a higher priority than all tasks from a subsequent call to compute (or submit, persist, map, or any operation that generates futures).
  • Whenever a task is ready to run the scheduler assigns it to a worker. The scheduler does not hold a ready task back based on priority.
  • However when the worker receives these tasks it considers their priorities when determining which tasks to prioritize for communication or for computation. The worker maintains a heap of all ready-to-run tasks ordered by this priority (a small sketch follows this list).
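A compact sketch of this two-level priority, with assumed names (ReadyQueue and scheduler_priority are invented for the example, and in this sketch lower tuples run first):

    import heapq
    import itertools

    generation = itertools.count()   # bumped once per compute/persist/submit batch

    def scheduler_priority(graph_counter, order_priority):
        # Earlier graphs beat later graphs (coarse first-come-first-served);
        # within a graph the dask.order priority decides.
        return (graph_counter, order_priority)

    class ReadyQueue:
        """Worker-side heap of ready-to-run tasks, popped in priority order."""
        def __init__(self):
            self._heap = []

        def add(self, key, priority):
            heapq.heappush(self._heap, (priority, key))

        def pop(self):
            priority, key = heapq.heappop(self._heap)
            return key

    # Two graphs submitted one after the other:
    g0, g1 = next(generation), next(generation)
    q = ReadyQueue()
    q.add("x-from-first-graph", scheduler_priority(g0, 3))
    q.add("y-from-second-graph", scheduler_priority(g1, 0))
    assert q.pop() == "x-from-first-graph"   # the earlier computation runs first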