Managing Computation

Data and Computation in Dask.distributed are always in one of three states

  • Concrete values in local memory. Examples include the integer 1 or a NumPy array in the local process.
  • Lazy computations in a dask graph, perhaps stored in a dask.delayed or dask.dataframe object.
  • Running computations or remote data, represented by Future objects pointing to computations currently in flight.

All three of these forms are important and there are functions that convert between all three states.
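
As a rough sketch of all three states side by side (assuming a connected distributed Client named client, as used in the examples below):

  >>> import dask
  >>> x = 1                                    # concrete value in local memory
  >>> lazy = dask.delayed(lambda v: v + 1)(x)  # lazy computation in a dask graph
  >>> future = client.compute(lazy)            # running computation, tracked by a Future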

Dask Collections to Concrete Values

You can turn any dask collection into a concrete value by calling the .compute() method or the dask.compute(…) function. This function will block until the computation is finished, going straight from a lazy dask collection to a concrete value in local memory.

This approach is the most familiar and straightforward, especially for people coming from the standard single-machine Dask experience or from just normal programming. It is great when you have data already in memory and want to get small fast results right to your local process.

  >>> df = dd.read_csv('s3://...')
  >>> df.value.sum().compute()
  100000000

However, this approach often breaks down if you try to bring the entire dataset back to local RAM:

  >>> df.compute()
  MemoryError(...)

It also forces you to wait until the computation finishes before handing back control of the interpreter.

Dask Collections to Futures

You can asynchronously submit lazy dask graphs to run on the cluster with the client.compute and client.persist methods. These functions return Future objects immediately. These futures can then be queried to determine the state of the computation.

client.compute

The .compute method takes a collection and returns a single future.

  >>> df = dd.read_csv('s3://...')
  >>> total = client.compute(df.sum())  # Return a single future
  >>> total
  Future(..., status='pending')

  >>> total.result()  # Block until finished
  100000000

Because this is a single future the result must fit on a single worker machine. Like dask.compute above, the client.compute method is only appropriate when results are small enough to fit in memory. The following would likely fail:

  >>> future = client.compute(df)  # Blows up memory

Instead, you should use client.persist.

client.persist

The .persist method submits the task graph behind the Dask collection to the scheduler, obtaining Futures for all of the top-most tasks (for example one Future for each Pandas DataFrame in a Dask DataFrame). It then returns a copy of the collection pointing to these futures instead of the previous graph. This new collection is semantically equivalent but now points to actively running data rather than a lazy graph. If you look at the dask graph within the collection you will see the Future objects directly:

  >>> df = dd.read_csv('s3://...')
  >>> df.dask  # Recipe to compute df in chunks
  {('read', 0): (load_s3_bytes, ...),
   ('parse', 0): (pd.read_csv, ('read', 0)),
   ('read', 1): (load_s3_bytes, ...),
   ('parse', 1): (pd.read_csv, ('read', 1)),
   ...
  }

  >>> df = client.persist(df)  # Start computation
  >>> df.dask  # Now points to running futures
  {('parse', 0): Future(..., status='finished'),
   ('parse', 1): Future(..., status='pending'),
   ...
  }

The collection is returned immediately and the computation happens in the background on the cluster. Eventually all of the futures of this collection will be completed, at which point further queries on this collection will likely be very fast.

Typically the workflow is to define a computation with a tool like dask.dataframe or dask.delayed until a point where you have a nice dataset to work from, then persist that collection to the cluster, and finally perform many fast queries off of the resulting collection.
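
For example, a minimal sketch of this persist-then-query pattern (reusing the value column from the examples above; actual speedups depend on your data and cluster):

  >>> df = dd.read_csv('s3://...')
  >>> df = client.persist(df)   # start computing on the cluster
  >>> df.value.sum().compute()  # later queries start from the persisted
  >>> df.value.mean().compute() # partitions and return quickly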

Concrete Values to Futures

We obtain futures in a few different ways. One is the mechanism above, by wrapping Futures within Dask collections. Another is by submitting data or tasks directly to the cluster with client.scatter, client.submit, or client.map.

  futures = client.scatter(args)                     # Send data
  future = client.submit(function, *args, **kwargs)  # Send single task
  futures = client.map(function, sequence, **kwargs) # Send many tasks

In this case *args or **kwargs can be normal Python objects, like 1 or 'hello', or they can be other Future objects if you want to link tasks together with dependencies.

Unlike Dask collections like dask.delayed, these task submissions happen immediately. The concurrent.futures interface is very similar to dask.delayed except that execution is immediate rather than lazy.
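
For example, a small sketch of linking tasks through futures; inc and add here are illustrative helper functions, not part of the API:

  >>> def inc(x):
  ...     return x + 1
  >>> def add(x, y):
  ...     return x + y
  >>> a = client.submit(inc, 1)         # starts running immediately
  >>> b = client.submit(inc, 2)
  >>> total = client.submit(add, a, b)  # passing futures links tasks as dependencies
  >>> total.result()
  5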

Futures to Concrete Values

You can turn an individual Future into a concrete value in the local process by calling the Future.result() method. You can convert a collection of futures into concrete values by calling the client.gather method.

  >>> future.result()
  1

  >>> client.gather(futures)
  [1, 2, 3, 4, ...]

Futures to Dask Collections

As seen in the Dask Collections to Futures section above, it is common to have currently computing Future objects within Dask graphs. This lets us build further computations on top of currently running computations. This is most often done with dask.delayed workflows on custom computations:

  >>> x = delayed(sum)(futures)
  >>> y = delayed(product)(futures)
  >>> future = client.compute(x + y)

Mixing the two forms allows you to build and submit a computation in stages, like sum(…) + product(…). This is often valuable if you want to wait to see the values of certain parts of the computation before determining how to proceed. Submitting many computations at once allows the scheduler to be slightly more intelligent when determining what gets run.
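
A hedged sketch of that staged pattern, continuing the sum/product example above (the threshold of 100 is purely illustrative):

  >>> total = client.compute(delayed(sum)(futures))  # submit the first stage
  >>> if total.result() > 100:                       # block, inspect the value,
  ...     final = client.compute(delayed(product)(futures))  # then decide what to submit next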

If this page interests you then you may also want to check out the doc page on Managing Memory.