User Interfaces

Dask supports several user interfaces:

  • High-Level
    • Arrays: parallel NumPy
    • Bags: parallel lists
    • DataFrames: parallel pandas
  • Low-Level
    • Delayed: lazy parallel function evaluation
    • Futures: real-time parallel function evaluation

Each of these user interfaces employs the same underlying parallel computing machinery, and so has the same scaling, diagnostics, resilience, and so on, but each provides a different set of parallel algorithms and programming style.

This document helps you to decide which user interface best suits your needs, and gives some general information that applies to all interfaces. The pages linked above give more information about each interface in greater depth.

High-Level Collections

Many people who start using Dask are explicitly looking for a scalable version of NumPy, Pandas, or Scikit-Learn. For these situations, the starting point within Dask is usually fairly clear. If you want scalable NumPy arrays, then start with Dask array; if you want scalable Pandas DataFrames, then start with Dask DataFrame, and so on.

These high-level interfaces copy the standard interface with slight variations. They automatically parallelize a large subset of the original project's API over larger datasets for you.

    # Arrays
    import dask.array as da
    x = da.random.uniform(low=0, high=10, size=(10000, 10000),  # normal numpy code
                          chunks=(1000, 1000))                   # break into chunks of size 1000x1000

    y = x + x.T - x.mean(axis=0)  # Use normal syntax for high level algorithms

    # DataFrames
    import dask.dataframe as dd
    df = dd.read_csv('2018-*-*.csv', parse_dates=['timestamp'],  # normal Pandas code
                     blocksize=64000000)                          # break text into 64MB chunks

    s = df.groupby('name').balance.mean()  # Use normal syntax for high level algorithms

    # Bags / lists
    import json
    import dask.bag as db
    b = db.read_text('*.json').map(json.loads)
    total = (b.filter(lambda d: d['name'] == 'Alice')
              .map(lambda d: d['balance'])
              .sum())

It is important to remember that, while APIs may be similar, some differences do exist. Additionally, the performance of some algorithms may differ from their in-memory counterparts due to the advantages and disadvantages of parallel programming. Some thought and attention is still required when using Dask.
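
For example, operations that move rows between partitions, such as setting a new index, require a shuffle of the data and can be far more expensive than their in-memory pandas equivalents. A minimal sketch, assuming CSV files like those above with a timestamp column:

    import dask.dataframe as dd

    df = dd.read_csv('2018-*-*.csv', parse_dates=['timestamp'])

    # Cheap in pandas, but in Dask this repartitions the entire dataset by the
    # new index, which involves an expensive shuffle across partitions
    df = df.set_index('timestamp')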

Low-Level Interfaces

Often when parallelizing existing code bases or building custom algorithms, you run into code that is parallelizable, but isn't just a big DataFrame or array. Consider the for-loopy code below:

    results = []
    for a in A:
        for b in B:
            if a < b:
                c = f(a, b)
            else:
                c = g(a, b)
            results.append(c)

There is potential parallelism in this code (the many calls to f and g can be done in parallel), but it's not clear how to rewrite it into a big array or DataFrame so that it can use a higher-level API. Even if you could rewrite it into one of these paradigms, it's not clear that this would be a good idea. Much of the meaning would likely be lost in translation, and this process would become much more difficult for more complex systems.

Instead, Dask's lower-level APIs let you write parallel code one function call at a time within the context of your existing for loops. A common solution here is to use Dask delayed to wrap individual function calls into a lazily constructed task graph:

    import dask

    lazy_results = []
    for a in A:
        for b in B:
            if a < b:
                c = dask.delayed(f)(a, b)  # add lazy task
            else:
                c = dask.delayed(g)(a, b)  # add lazy task
            lazy_results.append(c)

    results = dask.compute(*lazy_results)  # compute all in parallel

Combining High- and Low-Level Interfaces

It is common to combine high- and low-level interfaces. For example, you might use Dask array/bag/dataframe to load in data and do initial pre-processing, then switch to Dask delayed for a custom algorithm that is specific to your domain, then switch back to Dask array/dataframe to clean up and store results. Understanding both sets of user interfaces, and how to switch between them, can be a productive combination.

    # Convert to a list of delayed Pandas dataframes
    delayed_values = df.to_delayed()

    # Manipulate delayed values arbitrarily as you like

    # Convert many delayed Pandas DataFrames back to a single Dask DataFrame
    df = dd.from_delayed(delayed_values)
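
For instance, the "manipulate" step above might apply a domain-specific function to each delayed pandas DataFrame before reassembling a Dask DataFrame. A sketch, where my_custom_algorithm is a hypothetical function that operates on one in-memory pandas DataFrame:

    import dask
    import dask.dataframe as dd

    def my_custom_algorithm(partition):
        # hypothetical domain-specific logic on one in-memory pandas DataFrame
        return partition

    # Convert to a list of delayed pandas DataFrames
    delayed_values = df.to_delayed()

    # Wrap the custom function with dask.delayed and apply it to each partition
    processed = [dask.delayed(my_custom_algorithm)(part) for part in delayed_values]

    # Reassemble the results into a single Dask DataFrame
    df = dd.from_delayed(processed)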

Laziness and Computing

Most Dask user interfaces are lazy, meaning that they do not evaluate until you explicitly ask for a result using the compute method:

    # This array syntax doesn't cause computation
    y = x + x.T - x.mean(axis=0)

    # Trigger computation by explicitly calling the compute method
    y = y.compute()

If you have multiple results that you want to compute at the same time, use the dask.compute function. This can share intermediate results and so be more efficient:

    # Compute multiple results at the same time with the compute function
    minimum, maximum = dask.compute(y.min(), y.max())

Note that the compute() function returns in-memory results. It converts Dask DataFrames to Pandas DataFrames, Dask arrays to NumPy arrays, and Dask bags to lists. You should only call compute on results that will fit comfortably in memory. If your result does not fit in memory, then you might consider writing it to disk instead.

    # Write larger results out to disk rather than store them in memory
    my_dask_dataframe.to_parquet('myfile.parquet')
    my_dask_array.to_hdf5('myfile.hdf5', '/data')  # '/data' is the dataset path within the file
    my_dask_bag.to_textfiles('myfile.*.txt')

Persist into Distributed Memory

Alternatively, if you are on a cluster, then you may want to trigger a computation and store the results in distributed memory. In this case you do not want to call compute, which would create a single Pandas, NumPy, or list result. Instead, you want to call persist, which returns a new Dask object that points to actively computing, or already computed, results spread around your cluster's memory.

    # Compute returns an in-memory non-Dask object
    y = y.compute()

    # Persist returns an in-memory Dask object that uses distributed storage if available
    y = y.persist()

This is common to see after data loading and preprocessing steps, but before rapid iteration, exploration, or complex algorithms. For example, we might read in a lot of data, filter down to a more manageable subset, and then persist data into memory so that we can iterate quickly.

    import dask.dataframe as dd
    df = dd.read_parquet('...')
    df = df[df.name == 'Alice']  # select important subset of data
    df = df.persist()            # trigger computation in the background

    # These are all relatively fast now that the relevant data is in memory
    df.groupby(df.id).balance.sum().compute()   # explore data quickly
    df.groupby(df.id).balance.mean().compute()  # explore data quickly
    df.id.nunique().compute()                   # explore data quickly

Lazy vs Immediate

As mentioned above, most Dask workloads are lazy, that is, they don't start any work until you explicitly trigger them with a call to compute(). However, sometimes you do want to submit work as quickly as possible, track it over time, submit new work or cancel work depending on partial results, and so on. This can be useful when tracking or responding to real-time events, handling streaming data, or when building complex and adaptive algorithms.

For these situations, people typically turn to the futures interface, which is a low-level interface like Dask delayed, but operates immediately rather than lazily.

Here is the same example with Dask delayed and Dask futures to illustrate the difference.

Delayed: Lazy

    import dask

    @dask.delayed
    def inc(x):
        return x + 1

    @dask.delayed
    def add(x, y):
        return x + y

    a = inc(1)       # no work has happened yet
    b = inc(2)       # no work has happened yet
    c = add(a, b)    # no work has happened yet

    c = c.compute()  # This triggers all of the above computations

Futures: Immediate

    from dask.distributed import Client
    client = Client()

    def inc(x):
        return x + 1

    def add(x, y):
        return x + y

    a = client.submit(inc, 1)     # work starts immediately
    b = client.submit(inc, 2)     # work starts immediately
    c = client.submit(add, a, b)  # work starts immediately

    c = c.result()  # block until work finishes, then gather result

You can also trigger work with the high-level collections using the persist function. This will cause work to happen in the background when using the distributed scheduler.
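
For example, a sketch of persisting a Dask array on a cluster, assuming a dask.distributed scheduler is available:

    from dask.distributed import Client
    import dask.array as da

    client = Client()  # connect to (or start) a distributed scheduler

    x = da.random.uniform(size=(10000, 10000), chunks=(1000, 1000))
    x = x.persist()  # work starts in the background across the cluster

    # Later operations reuse the chunks already held in distributed memory
    x.sum().compute()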

Combining Interfaces

There are established ways to combine the interfaces above:

  • The high-level interfaces (array, bag, dataframe) have a to_delayed method that can convert to a sequence (or grid) of Dask delayed objects

    delayeds = df.to_delayed()
  • The high-level interfaces (array, bag, dataframe) have a from_delayed method that can convert from either Delayed or Future objects

    df = dd.from_delayed(delayeds)
    df = dd.from_delayed(futures)
  • The Client.compute method converts Delayed objects into Futures
    futures = client.compute(delayeds)
  • The dask.distributed.futures_of function gathers futures from persisted collections

    from dask.distributed import futures_of

    df = df.persist()  # start computation in the background
    futures = futures_of(df)
  • The dask.delayed function converts Futures into Delayed objects

    delayed_value = dask.delayed(future)

The approaches above should suffice to convert any interface into any other.We often see some anti-patterns that do not work as well:

  • Calling low-level APIs (delayed or futures) on high-level objects (like Dask arrays or DataFrames). This downgrades those objects to their NumPy or Pandas equivalents, which may not be desired. Often people are looking for APIs like dask.array.map_blocks or dask.dataframe.map_partitions instead (see the sketch after this list).
  • Calling compute() on Future objects. Often people want the .result() method instead.
  • Calling NumPy/Pandas functions on high-level Dask objects or high-level Dask functions on NumPy/Pandas objects
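
As an illustration of the first anti-pattern and its alternative, here is a sketch that runs a custom per-partition function with dask.dataframe.map_partitions rather than wrapping the whole Dask DataFrame in dask.delayed; clean_partition is a hypothetical helper:

    import dask
    import dask.dataframe as dd

    def clean_partition(partition):
        # hypothetical cleanup applied to one in-memory pandas DataFrame
        return partition.dropna(subset=['balance'])

    df = dd.read_csv('2018-*-*.csv')

    # Preferred: apply the function to each partition while staying in Dask DataFrame
    df = df.map_partitions(clean_partition)

    # Anti-pattern: this would first turn the whole Dask DataFrame into a single
    # in-memory pandas DataFrame inside the task graph
    # result = dask.delayed(clean_partition)(df)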

Conclusion

Most people who use Dask start with only one of the interfaces above but eventually learn how to use a few interfaces together. This helps them leverage the sophisticated algorithms in the high-level interfaces while also working around tricky problems with the low-level interfaces.

For more information, see the in-depth documentation for each of the user interfaces listed above.