Scheduling

All of the large-scale Dask collections, like Dask Array, Dask DataFrame, and Dask Bag, and the fine-grained APIs, like delayed and futures, generate task graphs. Each node in the graph is a normal Python function, and edges between nodes are normal Python objects that are created by one task as outputs and used as inputs in another task. After Dask generates these task graphs, it needs to execute them on parallel hardware. This is the job of a task scheduler. Different task schedulers exist, and each will consume a task graph and compute the same result, but with different performance characteristics.

[Figure: Dask collections and schedulers]

Dask has two families of task schedulers:

  • Single machine scheduler: This scheduler provides basic features on a local process or thread pool. This scheduler was made first and is the default. It is simple and cheap to use, although it can only be used on a single machine and does not scale.
  • Distributed scheduler: This scheduler is more sophisticated and offers more features, but also requires a bit more effort to set up. It can run locally or distributed across a cluster.

For different computations you may find better performance with particular scheduler settings. This document helps you understand how to choose between and configure different schedulers, and provides guidelines on when one might be more appropriate.

Local Threads

    import dask
    dask.config.set(scheduler='threads')  # overwrite default with threaded scheduler

The threaded scheduler executes computations with a local multiprocessing.pool.ThreadPool. It is lightweight and requires no setup. It introduces very little task overhead (around 50 µs per task) and, because everything occurs in the same process, it incurs no costs to transfer data between tasks. However, due to Python’s Global Interpreter Lock (GIL), this scheduler only provides parallelism when your computation is dominated by non-Python code, as is primarily the case when operating on numeric data in NumPy arrays, Pandas DataFrames, or using any of the other C/C++/Cython based projects in the ecosystem.

The threaded scheduler is the default choice for Dask Array, Dask DataFrame, and Dask Delayed. However, if your computation is dominated by processing pure Python objects like strings, dicts, or lists, then you may want to try one of the process-based schedulers below (we currently recommend the distributed scheduler on a local machine).
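As a minimal sketch of the kind of workload that suits the threaded scheduler, the example below runs several NumPy-heavy tasks through dask.delayed. The function name chunk_sum_of_squares, the seeds, and the chunk size are illustrative assumptions, not part of Dask. NumPy routines such as np.dot generally release the GIL while they run, which is what lets the threads make progress in parallel.

```python
import dask
import numpy as np


@dask.delayed
def chunk_sum_of_squares(seed, n=200_000):
    # Hypothetical task: most of the time is spent inside NumPy's
    # compiled code rather than in the Python interpreter.
    rng = np.random.default_rng(seed)
    x = rng.standard_normal(n)
    return float(np.dot(x, x))


# Build a small task graph: eight independent chunks feeding one sum.
parts = [chunk_sum_of_squares(seed) for seed in range(8)]
total = dask.delayed(sum)(parts)

# Run the graph on the local thread pool.
result = total.compute(scheduler="threads")
print(result)
```

If the tasks instead spent their time in pure Python loops, the same graph would still produce the same result, but the threads would mostly take turns holding the GIL.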

Local Processes

Note

The distributed scheduler described a couple of sections below is often a better choice today. We encourage readers to continue reading after this section.

    import dask.multiprocessing
    dask.config.set(scheduler='processes')  # overwrite default with multiprocessing scheduler

The multiprocessing scheduler executes computations with a local multiprocessing.Pool. It is lightweight to use and requires no setup. Every task and all of its dependencies are shipped to a local process, executed, and then their result is shipped back to the main process. This means that it is able to bypass issues with the GIL and provide parallelism even on computations that are dominated by pure Python code, such as those that process strings, dicts, and lists.

However, moving data to remote processes and back can introduce performance penalties, particularly when the data being transferred between processes is large. The multiprocessing scheduler is an excellent choice when workflows are relatively linear, and so do not involve significant inter-task data transfer, as well as when inputs and outputs are both small, like filenames and counts.

This is common in basic data ingestion workloads, such as those common in Dask Bag, where the multiprocessing scheduler is the default:

    >>> import json
    >>> import dask.bag as db
    >>> dict(db.read_text('*.json').map(json.loads).pluck('name').frequencies().compute())
    {'alice': 100, 'bob': 200, 'charlie': 300}
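The ingestion pattern above reads files from disk. A self-contained sketch of the same pipeline, using hypothetical in-memory records in place of JSON files, might look like the following. The record contents and partition count are assumptions made for illustration. The `if __name__ == "__main__":` guard is there because process-based schedulers may start worker processes that re-import the main module.

```python
import json

import dask.bag as db

# Hypothetical records standing in for lines read from JSON files.
people = ["alice", "bob", "alice", "charlie", "alice", "bob"]
records = [json.dumps({"name": name}) for name in people]

# Inputs (short strings) and outputs (counts) are both small, so little
# data has to move between processes.
names = db.from_sequence(records, npartitions=3).map(json.loads).pluck("name")
counts = names.frequencies()

if __name__ == "__main__":
    # frequencies() yields (value, count) pairs; dict() collects them.
    print(dict(counts.compute(scheduler="processes")))
```

Passing scheduler="processes" explicitly is redundant for Dask Bag, where it is the default, but it makes the choice visible.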

For more complex workloads, where large intermediate results may be depended upon by multiple downstream tasks, we generally recommend the use of the distributed scheduler on a local machine. The distributed scheduler is more intelligent about moving around large intermediate results.

Single Thread

    import dask
    dask.config.set(scheduler='synchronous')  # overwrite default with single-threaded scheduler

The single-threaded synchronous scheduler executes all computations in the local thread with no parallelism at all. This is particularly valuable for debugging and profiling, which are more difficult when using threads or processes.

For example, when using IPython or Jupyter notebooks, the %debug, %pdb, or %prun magics will not work well when using the parallel Dask schedulers (they were not designed to be used in a parallel computing context). However, if you run into an exception and want to step into the debugger, you may wish to rerun your computation under the single-threaded scheduler where these tools will function properly.
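The sketch below illustrates why the synchronous scheduler is convenient for debugging: tasks run in the calling thread, so an exception is raised there like in ordinary Python code. The functions running_thread and divide are hypothetical helpers written for this example, and the comparison against the threaded scheduler assumes its tasks run on pool threads rather than on the caller.

```python
import threading

import dask


@dask.delayed
def running_thread():
    # Report which thread actually executed this task.
    return threading.get_ident()


@dask.delayed
def divide(a, b):
    return a / b


caller = threading.get_ident()

# Under the synchronous scheduler the task runs in the calling thread.
ran_in_caller = running_thread().compute(scheduler="synchronous") == caller

# Under the threaded scheduler it runs on a pool thread instead.
ran_in_pool = running_thread().compute(scheduler="threads") != caller

# A failing task raises directly in the calling thread, so tools such as
# pdb or %debug can inspect it.
caught = None
try:
    divide(1, 0).compute(scheduler="synchronous")
except ZeroDivisionError as exc:
    caught = exc

print(ran_in_caller, ran_in_pool, type(caught).__name__)
```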

Dask Distributed (local)

    from dask.distributed import Client
    client = Client()
    # or
    client = Client(processes=False)

The Dask distributed scheduler can either be set up on a cluster or run locally on a personal machine. Despite having the name “distributed”, it is often pragmatic on local machines for a few reasons:

  • It provides access to an asynchronous API, notably Futures
  • It provides a diagnostic dashboard that can provide valuable insight on performance and progress
  • It handles data locality with more sophistication, and so can be more efficient than the multiprocessing scheduler on workloads that require multiple processes

You can read more about using the Dask distributed scheduler on a single machine in these docs.
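As a rough sketch of the Futures interface mentioned in the list above, the example below starts a local client with processes=False, submits work, and gathers a result. The function square and the worker counts are illustrative assumptions, and the example assumes the distributed package is installed.

```python
from dask.distributed import Client


def square(x):
    return x * x


# processes=False keeps the scheduler and workers inside this process,
# which avoids inter-process data transfer for a small example.
with Client(processes=False, n_workers=1, threads_per_worker=2) as client:
    # map() returns one future per input immediately; work runs in the background.
    futures = client.map(square, range(5))

    # Futures can be passed as inputs to further tasks.
    total = client.submit(sum, futures)

    # result() blocks until the value is available.
    result = total.result()

print(result)
```

Using the client as a context manager shuts the local scheduler and workers down once the block exits.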

Dask Distributed (Cluster)

You can also run Dask on a distributed cluster. There are a variety of ways to set this up depending on your cluster. We recommend referring to the setup documentation for more information.

Configuration

You can configure the global default scheduler by using the dask.config.set(scheduler…) command. This can be done globally:

    dask.config.set(scheduler='threads')

    x.compute()

or as a context manager:

    with dask.config.set(scheduler='threads'):
        x.compute()

or within a single compute call:

    x.compute(scheduler='threads')

Additionally, some of the schedulers support other keyword arguments. For example, the pool-based single-machine schedulers allow you to provide custom pools or specify the desired number of workers:

    from multiprocessing.pool import ThreadPool

    import dask

    with dask.config.set(pool=ThreadPool(4)):
        ...

    with dask.config.set(num_workers=4):
        ...
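To make the effect of these two settings concrete, the sketch below records which threads execute a batch of tasks. The helper worker_thread and the choice of two workers are assumptions made for illustration. In both cases no more than two distinct threads should appear.

```python
import threading
import time
from multiprocessing.pool import ThreadPool

import dask


@dask.delayed
def worker_thread(i):
    # Sleep briefly so that other pool threads get a chance to pick up tasks.
    time.sleep(0.01)
    return threading.get_ident()


tasks = [worker_thread(i) for i in range(8)]

# Option 1: hand Dask a custom pool with two threads.
with ThreadPool(2) as pool:
    with dask.config.set(scheduler="threads", pool=pool):
        threads_custom_pool = set(dask.compute(*tasks))

# Option 2: let Dask create a pool with the requested number of workers.
with dask.config.set(scheduler="threads", num_workers=2):
    threads_num_workers = set(dask.compute(*tasks))

print(len(threads_custom_pool), len(threads_num_workers))
```

A custom pool is useful when the application already manages its own threads; num_workers is the simpler choice when only the degree of parallelism matters.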