Work Stealing

Some tasks prefer to run on certain workers. This may be because that worker holds data dependencies of the task or because the user has expressed a loose desire that the task run in a particular place. Occasionally this results in a few very busy workers and several idle workers. In this situation the idle workers may choose to steal work from the busy workers, even if stealing work requires the costly movement of data.

This is a performance optimization and is not required for correctness. Work stealing provides robustness in many ad-hoc cases, but can also backfire when we steal the wrong tasks and reduce performance.

Criteria for stealing

Computation to Communication Ratio

Stealing is profitable when the computation time for a task is much longer than the communication time of the task's dependencies.

Bad example

We do not want to steal tasks that require moving a large dependent piece of data across a wire from the victim to the thief if the computation is fast. We would end up spending far more time in communication than we would by just waiting a bit longer and giving up on parallelism.

  import numpy as np
  # `client` is an existing distributed.Client
  [data] = client.scatter([np.arange(1000000000)])
  x = client.submit(np.sum, data)

Good example

We do want to steal tasks that only need to move small dependent pieces of data, especially when the computation time is expensive (here 100 seconds).

  from time import sleep
  [data] = client.scatter([100])
  x = client.submit(sleep, data)

Fortunately we often know both the number of bytes of dependencies (as reported by calling sys.getsizeof on the workers) and the runtime cost of previously seen functions, which is maintained as an exponentially weighted moving average.
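As a rough illustration of how these two quantities combine, the sketch below estimates a compute-to-communicate ratio and updates a runtime average. The function names, the default bandwidth of 100 MB/s, the smoothing factor, and the 1-second runtime assumed for np.sum are all hypothetical choices made for the example, not values taken from the scheduler.

```python
def update_ewma(old, new, alpha=0.5):
    """Exponentially weighted moving average of observed runtimes.

    alpha is an assumed smoothing factor, chosen only for illustration.
    """
    return alpha * new + (1 - alpha) * old


def compute_to_communicate_ratio(compute_seconds, dependency_nbytes, bandwidth=100e6):
    """Estimated compute time divided by estimated transfer time.

    bandwidth is an assumed network throughput in bytes per second.
    """
    transfer_seconds = dependency_nbytes / bandwidth
    if transfer_seconds == 0:
        return float("inf")
    return compute_seconds / transfer_seconds


# Bad example: one billion 8-byte integers, assumed 1 s for np.sum
bad = compute_to_communicate_ratio(1.0, 8_000_000_000)

# Good example: a tiny integer (assumed 28 bytes), 100 s of sleep
good = compute_to_communicate_ratio(100.0, 28)
```

Under these assumptions the first ratio is far below 1 and the second is far above 8, which matches the intuition in the two examples above.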

Saturated Worker Burden

Stealing may be profitable even when the computation-time to communication-time ratio is poor. This occurs when the saturated workers have a very long backlog of tasks and there are a large number of idle workers. We decide whether it is acceptable to steal a task by asking whether the last task to be run by the saturated worker would finish sooner if stolen than if it remained on the original (victim) worker.

A longer backlog of stealable tasks and a smaller number of active workers both increase our willingness to steal. This is balanced against the compute-to-communicate cost ratio.
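A deliberately simplified sketch of this check follows. It compares when the last task in the victim's queue would finish if left in place against when it would finish on an idle thief after its dependencies are transferred. The function and parameter names are hypothetical, and the real decision weighs more factors than this.

```python
def worth_stealing(victim_backlog_seconds, compute_seconds, transfer_seconds):
    """Would the task finish sooner on an idle thief than on the victim?

    victim_backlog_seconds: estimated work queued ahead of the task.
    transfer_seconds: estimated time to move the task's dependencies.
    """
    finish_on_victim = victim_backlog_seconds + compute_seconds
    finish_on_thief = transfer_seconds + compute_seconds
    return finish_on_thief < finish_on_victim


# Long backlog: stealing pays off even with a 10 s transfer for a 1 s task
long_backlog = worth_stealing(60.0, 1.0, 10.0)

# Short backlog: the same transfer cost is not worth it
short_backlog = worth_stealing(2.0, 1.0, 10.0)
```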

It is also good long term if stealing causes highly-sought-after data to be replicated on more workers.

Steal from the Rich

We would like to steal tasks from particularly over-burdened workers rather than workers with just a few excess tasks.

Restrictions

If a task has been specifically restricted to run on particular workers (such as is the case when special hardware is required) then we do not steal.

Choosing tasks to steal

We maintain a list of sets of stealable tasks, ordered into bins by computation-to-communication time ratio. The first bin contains all tasks with a compute-to-communicate ratio greater than or equal to 8 (considered high enough to always steal), the next bin with a ratio of 4, the next bin with a ratio of 2, and so on, all the way down to a ratio of 1/256, which we will never steal.

This data structure provides a somewhat-ordered view of all stealable tasks that we can add to and remove from in constant time, rather than in log(n) time as with more traditional data structures, like a heap.
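One way to picture this structure is the sketch below, which maps a ratio to a bin by taking the floor of its base-2 logarithm and clamping it to the range described above (8 down to 1/256, giving 12 bins). The names stealable, level_of, bin_index, add_task, and remove_task are hypothetical, and the bin count follows the prose here rather than any particular scheduler version.

```python
import math

MAX_EXP = 3    # 2**3 == 8, the best bin
MIN_EXP = -8   # 2**-8 == 1/256, the worst bin
N_BINS = MAX_EXP - MIN_EXP + 1

stealable = [set() for _ in range(N_BINS)]  # index 0 holds the best ratios
level_of = {}                               # task key -> bin index


def bin_index(ratio):
    """Map a compute-to-communicate ratio to a bin (0 is best)."""
    if ratio <= 0:
        return N_BINS - 1
    exp = math.floor(math.log2(ratio))
    exp = max(MIN_EXP, min(MAX_EXP, exp))
    return MAX_EXP - exp


def add_task(key, ratio):
    """Constant-time insert: one set add and one dict write."""
    level = bin_index(ratio)
    stealable[level].add(key)
    level_of[key] = level


def remove_task(key):
    """Constant-time removal: one dict pop and one set discard."""
    level = level_of.pop(key, None)
    if level is not None:
        stealable[level].discard(key)
```

Tasks inside one bin are unordered, which is why the view is only "somewhat ordered": two tasks in the same bin can differ in ratio by up to a factor of two.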

During any stage when we submit tasks to workers we check if there are both idle and saturated workers and if so we quickly run through this list of sets, selecting tasks from the best buckets first, working our way down to the buckets of less desirable stealable tasks. We stop either when there are no more stealable tasks, no more idle workers, or when the quality of the task-to-be-stolen is not high enough given the current backlog.

This approach is fast, optimizes to steal the tasks with the best computation-to-communication cost ratio (up to a factor of two) and tends to steal from the workers that have the largest backlogs, simply because random selection tends to draw from the largest population.
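The selection pass can be sketched as a walk over the bins from best to worst. In this simplified version each idle worker receives at most one task, and the "quality is high enough given the current backlog" test is reduced to a single hypothetical cutoff, worst_acceptable_level. Both simplifications are assumptions made for the example.

```python
def select_tasks_to_steal(stealable, n_idle_workers, worst_acceptable_level):
    """Pick task keys best-bin-first until a stopping condition is met.

    stealable: list of sets of task keys, index 0 holding the best ratios.
    worst_acceptable_level: hypothetical cutoff standing in for the
        backlog-dependent quality check.
    """
    chosen = []
    for level, tasks in enumerate(stealable):
        if level > worst_acceptable_level:
            break  # remaining bins are not worth stealing from
        for key in list(tasks):
            if len(chosen) >= n_idle_workers:
                return chosen  # no more idle workers
            tasks.discard(key)
            chosen.append(key)
    return chosen  # ran out of acceptable stealable tasks


bins = [{"a"}, set(), {"b", "c"}, {"d"}]
picked = select_tasks_to_steal(bins, n_idle_workers=2, worst_acceptable_level=2)
```

Here picked starts with "a" from the best bin, followed by one of "b" or "c"; "d" is never considered because its bin lies below the cutoff.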

Transactional Work Stealing

To avoid running the same task twice, Dask implements transactional work stealing. When the scheduler identifies a task that should be moved it first sends a request to the busy worker. The worker inspects the current state of the task and sends a response to the scheduler:

  • If the task is not yet running, then the worker cancels the task and informs the scheduler that it can reroute the task elsewhere.
  • If the task is already running or complete then the worker tells the scheduler that it should not replicate the task elsewhere.

This avoids redundant work, and also the duplication of side effects for more exotic tasks. However, concurrent or repeated execution of the same task is still possible in the event of worker death or a disrupted network connection.
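The request and response exchange can be modeled in a few lines. The sketch below is a single-process toy: the state names ("waiting", "ready", "executing", "memory"), the response format, and both function names are hypothetical and only mirror the two cases in the list above.

```python
def handle_steal_request(worker_tasks, key):
    """Worker side: release the task only if it has not started.

    worker_tasks maps task key -> state string (hypothetical names).
    """
    state = worker_tasks.get(key)
    if state in ("waiting", "ready"):
        del worker_tasks[key]            # cancel locally
        return {"key": key, "ok": True}  # scheduler may reroute
    # Running, finished, or unknown: keep it where it is
    return {"key": key, "ok": False}


def scheduler_move_task(worker_tasks, key, assignments, thief):
    """Scheduler side: reassign only after the victim confirms."""
    response = handle_steal_request(worker_tasks, key)
    if response["ok"]:
        assignments[key] = thief
    return response["ok"]


victim_tasks = {"t1": "ready", "t2": "executing"}
assignments = {"t1": "victim", "t2": "victim"}

moved_t1 = scheduler_move_task(victim_tasks, "t1", assignments, "thief")
moved_t2 = scheduler_move_task(victim_tasks, "t2", assignments, "thief")
```

The task that was still queued moves to the thief, while the one already executing stays with the victim, so neither runs twice.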

Disabling Work Stealing

Work stealing is a toggleable setting on the Dask Scheduler. To disable work stealing, set the scheduler work-stealing configuration option to "False", either by setting DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING="False" or through your Dask configuration file.
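A minimal sketch of the environment-variable route from Python follows. It assumes the variable is set in the process that starts the scheduler, and that this happens before dask or distributed is imported, since Dask reads DASK_* variables when its configuration is loaded.

```python
import os

# Set this before importing dask or distributed in the scheduler process.
# The double underscores separate the nested configuration levels
# distributed -> scheduler -> work-stealing.
os.environ["DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING"] = "False"
```

In a configuration file, the same option lives under the nested key distributed.scheduler.work-stealing.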