Delayed

Sometimes problems don’t fit into one of the collections like dask.array or dask.dataframe. In these cases, users can parallelize custom algorithms using the simpler dask.delayed interface. This lets you create task graphs directly by lightly annotating normal Python code:

```python
>>> import dask
>>> from operator import add
>>> def inc(x):
...     return x + 1
>>> x = dask.delayed(inc)(1)
>>> y = dask.delayed(inc)(2)
>>> z = dask.delayed(add)(x, y)
>>> z.compute()
5
>>> z.visualize()
```

[Figure: simple task graph created with dask.delayed]
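To make the laziness concrete, here is a minimal sketch that records each call in a list. The names `calls` and `logged_inc` are hypothetical, introduced only for illustration. The list stays empty while the graph is being built and is filled only when `compute()` actually runs the tasks.

```python
import dask
from operator import add

calls = []  # hypothetical call log, used only to observe when work happens


def logged_inc(x):
    calls.append(x)  # record that the function actually ran
    return x + 1


x = dask.delayed(logged_inc)(1)
y = dask.delayed(logged_inc)(2)
z = dask.delayed(add)(x, y)

print(type(z).__name__)  # Delayed
print(calls)             # [] (building the graph executed nothing)
print(z.compute())       # 5
print(sorted(calls))     # [1, 2] (each logged_inc task ran once)
```

The log is sorted before printing because the default scheduler for Delayed objects uses a thread pool, so the two independent `logged_inc` tasks may finish in either order.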

Example

Visit https://examples.dask.org/delayed.html to see and run examples using Dask Delayed.

Sometimes we face problems that are parallelizable but don’t fit into high-level abstractions like Dask Array or Dask DataFrame. Consider the following example:

```python
def inc(x):
    return x + 1

def double(x):
    return x + 2

def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = sum(output)
```

There is clearly parallelism in this problem (many of the inc, double, and add calls can be evaluated independently), but it’s not clear how to convert this into a big array or big DataFrame computation.

As written, this code runs sequentially in a single thread, even though much of it could be executed in parallel.

The Dask delayed function decorates your functions so that they operate lazily. Rather than executing your function immediately, it defers execution, placing the function and its arguments into a task graph.

delayed([obj, name, pure, nout, traverse]): Wraps a function or object to produce a Delayed.
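A short sketch of two of the parameters listed above, assuming a recent Dask release: `nout` declares how many values a call returns so that the Delayed result can be unpacked, and `pure=True` derives the task key from a hash of the function and its arguments. The helper `inc` is the same toy function used throughout this page.

```python
import dask


def inc(x):
    return x + 1


# nout=2: the call returns a 2-tuple, so the Delayed result can be unpacked.
quotient, remainder = dask.delayed(divmod, nout=2)(17, 5)
print(quotient.compute(), remainder.compute())  # 3 2

# pure=True: the key is a hash of the function and its arguments,
# so two identical calls refer to the same task.
a = dask.delayed(inc, pure=True)(1)
b = dask.delayed(inc, pure=True)(1)
print(a.key == b.key)  # True

# Default (impure) calls get a fresh random key each time.
c = dask.delayed(inc)(1)
d = dask.delayed(inc)(1)
print(c.key == d.key)  # False
```

Deterministic keys let Dask recognize repeated identical calls as a single task, which is only safe when the function really has no side effects.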

We slightly modify our code by wrapping functions in delayed. This delays the execution of the function and generates a Dask graph instead:

```python
import dask

output = []
for x in data:
    a = dask.delayed(inc)(x)
    b = dask.delayed(double)(x)
    c = dask.delayed(add)(a, b)
    output.append(c)

total = dask.delayed(sum)(output)
```

We used the dask.delayed function to wrap the function calls that we want to turn into tasks. None of the inc, double, add, or sum calls have happened yet. Instead, the object total is a Delayed result that contains a task graph of the entire computation. Looking at the graph, we see clear opportunities for parallel execution. The Dask schedulers will exploit this parallelism, generally improving performance (although not in this example, because these functions are already very small and fast).
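One way to confirm that total carries the whole computation is to look at its graph. This sketch rebuilds the example and calls `__dask_graph__()`, part of Dask's collection protocol, which returns a mapping from task keys to tasks. Checking each intermediate key for membership is purely illustrative; normal code does not need to inspect the graph.

```python
import dask


def inc(x):
    return x + 1


def double(x):
    return x + 2


def add(x, y):
    return x + y


output = []
for x in [1, 2, 3, 4, 5]:
    c = dask.delayed(add)(dask.delayed(inc)(x), dask.delayed(double)(x))
    output.append(c)
total = dask.delayed(sum)(output)

# The graph behind `total` is a mapping from task keys to tasks.
graph = total.__dask_graph__()
print(len(graph))  # number of tasks Dask will schedule
# Every per-element `add` task is part of the graph of `total`.
print(all(c.key in graph for c in output))  # True
```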

```python
total.visualize()  # render the task graph as an image (requires graphviz)
```

[Figure: simple task graph created with dask.delayed]

We can now compute this lazy result to execute the graph in parallel:

```python
>>> total.compute()
45
```
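The same lazy result can be evaluated in a few different ways. This sketch rebuilds the example, picks a scheduler per call with the `scheduler=` keyword, and uses `dask.compute` to evaluate several Delayed objects in one pass. The two schedulers shown are a subset chosen for illustration.

```python
import dask


def inc(x):
    return x + 1


def double(x):
    return x + 2


def add(x, y):
    return x + y


output = [dask.delayed(add)(dask.delayed(inc)(x), dask.delayed(double)(x))
          for x in [1, 2, 3, 4, 5]]
total = dask.delayed(sum)(output)

# "synchronous" runs every task in the current thread (handy for debugging);
# "threads" uses a thread pool. The result is the same either way.
for scheduler in ["synchronous", "threads"]:
    print(scheduler, total.compute(scheduler=scheduler))  # 45 both times

# dask.compute evaluates several Delayed objects together and returns a tuple.
print(dask.compute(*output))  # (5, 7, 9, 11, 13)
```

Passing several objects to a single `dask.compute` call lets Dask merge their graphs, so shared inputs are computed once rather than once per `.compute()` call.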

Decorator

It is also common to see the delayed function used as a decorator. Here is a reproduction of our original problem as parallel code:

```python
import dask

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def double(x):
    return x + 2

@dask.delayed
def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = dask.delayed(sum)(output)
```
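The decorator can also take arguments. The sketch below assumes the decorator-with-arguments form, `@dask.delayed(pure=True)` and `@dask.delayed(nout=2)`; `min_max` is a hypothetical helper introduced only for illustration. It also shows that ordinary operators such as `-` applied to Delayed objects build new lazy tasks.

```python
import dask


@dask.delayed(pure=True)  # deterministic keys: identical calls share one key
def inc(x):
    return x + 1


@dask.delayed(nout=2)  # the call returns a 2-tuple that can be unpacked lazily
def min_max(values):  # hypothetical helper for illustration
    return min(values), max(values)


data = [1, 2, 3, 4, 5]
low, high = min_max(data)
spread = high - low  # operators on Delayed objects also build lazy tasks

print(inc(1).key == inc(1).key)  # True
print(spread.compute())          # 4
```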

Real time

Sometimes you want to create and destroy work during execution, launch tasks from other tasks, etc. For this, see the Futures interface.

Best Practices

For a list of common problems and recommendations, see Delayed Best Practices.