Task Graphs

Internally, Dask encodes algorithms in a simple format involving Python dicts, tuples, and functions. This graph format can be used in isolation from the Dask collections. Working directly with Dask graphs is rare, unless you intend to develop new modules with Dask. Even then, dask.delayed is often a better choice. If you are a core developer, then you should start here.

Motivation

Normally, humans write programs and then compilers/interpreters interpret them (for example, python, javac, clang). Sometimes humans disagree with how these compilers/interpreters choose to interpret and execute their programs. In these cases, humans often bring the analysis, optimization, and execution of code into the code itself.

Commonly a desire for parallel execution causes this shift of responsibility from compiler to human developer. In these cases, we often represent the structure of our program explicitly as data within the program itself.

A common approach to parallel execution in user-space is task scheduling. In task scheduling we break our program into many medium-sized tasks or units of computation, often a function call on a non-trivial amount of data. We represent these tasks as nodes in a graph with edges between nodes if one task depends on data produced by another. We call upon a task scheduler to execute this graph in a way that respects these data dependencies and leverages parallelism where possible, so that multiple independent tasks can be run simultaneously.

Many solutions exist. This is a common approach in parallel execution frameworks. Often task scheduling logic hides within other larger frameworks (Luigi, Storm, Spark, IPython Parallel, and so on) and so is often reinvented.

Dask is a specification that encodes task schedules with minimal incidental complexity using terms common to all Python projects, namely dicts, tuples, and callables. Ideally this minimum solution is easy to adopt and understand by a broad community.

Example

Consider the following simple program:

  def inc(i):
      return i + 1

  def add(a, b):
      return a + b

  x = 1
  y = inc(x)
  z = add(y, 10)

We encode this as a dictionary in the following way:

  d = {'x': 1,
       'y': (inc, 'x'),
       'z': (add, 'y', 10)}

While less pleasant than our original code, this representation can be analyzed and executed by other Python code, not just the CPython interpreter. We don't recommend that users write code in this way, but rather that it is an appropriate target for automated systems. Also, in non-toy examples, the execution times are likely much larger than for inc and add, warranting the extra complexity.
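To make the execution model concrete, here is a minimal sketch of an evaluator for this format. The function simple_get is hypothetical and written only for illustration; it is not Dask's scheduler, which handles many cases this toy version ignores:

```python
def inc(i):
    return i + 1

def add(a, b):
    return a + b

d = {'x': 1,
     'y': (inc, 'x'),
     'z': (add, 'y', 10)}

def simple_get(dsk, key):
    """Toy recursive evaluator for a dask-style graph (illustrative only).

    A tuple whose first element is callable is a task; any argument
    that is also a key of the graph refers to another entry, and any
    other value is plain data.
    """
    value = dsk[key]
    if isinstance(value, tuple) and callable(value[0]):
        func, args = value[0], value[1:]
        # Resolve each argument: keys refer to other graph entries
        resolved = [simple_get(dsk, a) if a in dsk else a for a in args]
        return func(*resolved)
    return value
```

Calling simple_get(d, 'z') walks the dependencies, computing inc(1) for 'y' and then add(2, 10) for 'z', returning 12.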

Schedulers

The Dask library currently contains a few schedulers to execute these graphs. Each scheduler works differently, providing different performance guarantees and operating in different contexts. These implementations are not special, and others can easily write different schedulers better suited to other applications or architectures. Systems that emit dask graphs (like Dask Array, Dask Bag, and so on) may leverage the appropriate scheduler for the application and hardware.
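As an illustration of how a scheduler might exploit the parallelism in such a graph, here is a toy threaded executor. The name threaded_get and the wave-based strategy are illustrative assumptions, not Dask's actual implementation, and the sketch assumes the graph has no cycles:

```python
from concurrent.futures import ThreadPoolExecutor

def inc(i):
    return i + 1

def add(a, b):
    return a + b

# 'x' and 'y' are independent tasks, so they can run in parallel
d = {'a': 1,
     'b': 2,
     'x': (inc, 'a'),
     'y': (inc, 'b'),
     'z': (add, 'x', 'y')}

def threaded_get(dsk, key):
    """Toy scheduler: repeatedly run every task whose inputs are ready."""
    # Plain data entries are already "computed"
    results = {k: v for k, v in dsk.items()
               if not (isinstance(v, tuple) and callable(v[0]))}
    pending = {k: v for k, v in dsk.items() if k not in results}
    with ThreadPoolExecutor() as pool:
        while key not in results:
            # A task is ready when every argument is either plain data
            # or a key whose result has already been computed
            ready = {k: v for k, v in pending.items()
                     if all(a in results or a not in dsk for a in v[1:])}
            futures = {k: pool.submit(v[0],
                                      *[results.get(a, a) for a in v[1:]])
                       for k, v in ready.items()}
            for k, fut in futures.items():
                results[k] = fut.result()
                del pending[k]
    return results[key]
```

In the first wave both inc tasks run concurrently; the add task runs in the second wave once their results exist.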

Task Expectations

When a task is submitted to Dask for execution, a number of assumptions are made about that task.

Don’t Modify Data In-Place

In general, tasks with side effects that alter the state of a future in-place are not recommended. Modifying data that is stored in Dask in-place can have unintended consequences. For example, consider a workflow involving a NumPy array:

  from dask.distributed import Client
  import numpy as np

  client = Client()
  x = client.submit(np.arange, 10)  # [0, 1, 2, 3, ...]

  def f(arr):
      arr[arr > 5] = 0  # modifies input directly without making a copy
      arr += 1          # modifies input directly without making a copy
      return arr

  y = client.submit(f, x)

In the example above, Dask will update the values of the NumPy array x in-place. While efficient, this behavior can have unintended consequences, particularly if other tasks need to use x, or if Dask needs to rerun this computation multiple times because of worker failure.
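A safer pattern is to copy the input before mutating it. The sketch below shows the effect with plain NumPy (no client needed), assuming NumPy is installed:

```python
import numpy as np

x = np.arange(10)

def f(arr):
    arr = arr.copy()   # work on a private copy
    arr[arr > 5] = 0   # safe: only the copy is modified
    arr += 1
    return arr

y = f(x)
# x still holds 0..9; only y holds the transformed values
```

The extra copy costs memory, but it keeps x valid for other tasks and makes f safe to rerun after a worker failure.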

Avoid Holding the GIL

Some Python functions that wrap external C/C++ code can hold onto the GIL, which stops other Python code from running in the background. This is troublesome because while Dask workers run your function, they also need to communicate with each other in the background.

If you wrap external code, then please try to release the GIL. This is usually easy to do if you are using any of the common solutions to code wrapping like Cython, Numba, ctypes, or others.