API

Dask APIs generally follow their upstream counterparts (for example, dask.array follows NumPy and dask.dataframe follows pandas).

Additionally, Dask has its own functions to start computations, persist data in memory, check progress, and so forth that complement the APIs above. These more general Dask functions are described below:

  • compute(*args, **kwargs): Compute several dask collections at once.
  • is_dask_collection(x): Returns True if x is a dask collection.
  • optimize(*args, **kwargs): Optimize several dask collections at once.
  • persist(*args, **kwargs): Persist multiple Dask collections into memory.
  • visualize(*args, **kwargs): Visualize several dask graphs at once.

These functions work with any scheduler. More advanced operations are available when using the newer scheduler and starting a dask.distributed.Client (which, despite its name, runs nicely on a single machine). This API provides the ability to submit, cancel, and track work asynchronously, and includes many functions for complex inter-task workflows. These are not necessary for normal operation, but can be useful for real-time or advanced operation.

This more advanced API is available in the Dask distributed documentation.
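A minimal sketch of that workflow, assuming the optional dask.distributed package is installed (the submitted function is only an illustration):

  >>> from dask.distributed import Client  # doctest: +SKIP
  >>> client = Client()  # start a local scheduler and workers # doctest: +SKIP
  >>> future = client.submit(sum, [1, 2, 3])  # submit work asynchronously # doctest: +SKIP
  >>> future.result()  # block until the result is available # doctest: +SKIP
  6
  >>> client.close()  # doctest: +SKIP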

  • dask.compute(*args, **kwargs)
  • Compute several dask collections at once.

Parameters:

  • args:object
  • Any number of objects. If it is a dask object, it’s computed and the result is returned. By default, python builtin collections are also traversed to look for dask objects (for more information see the traverse keyword). Non-dask arguments are passed through unchanged.

  • traverse:bool, optional

  • By default dask traverses builtin python collections looking for dask objects passed to compute. For large collections this can be expensive. If none of the arguments contain any dask objects, set traverse=False to avoid doing this traversal.

  • scheduler:string, optional

  • Which scheduler to use like “threads”, “synchronous” or “processes”. If not provided, the default is to check the global settings first, and then fall back to the collection defaults.

  • optimize_graph:bool, optional

  • If True [default], the optimizations for each collection are applied before computation. Otherwise the graph is run as is. This can be useful for debugging.

  • kwargs

  • Extra keywords to forward to the scheduler function.

Examples

  >>> import dask.array as da
  >>> a = da.arange(10, chunks=2).sum()
  >>> b = da.arange(10, chunks=2).mean()
  >>> compute(a, b)
  (45, 4.5)

By default, dask objects inside python collections will also be computed:

  >>> compute({'a': a, 'b': b, 'c': 1}) # doctest: +SKIP
  ({'a': 45, 'b': 4.5, 'c': 1},)
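The scheduler keyword described above can also be passed directly; a small sketch reusing a and b from the example:

  >>> compute(a, b, scheduler='threads')
  (45, 4.5)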
  • dask.is_dask_collection(x)
  • Returns True if x is a dask collection
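A brief sketch of this check (the inputs below are chosen only for illustration):

  >>> from dask import is_dask_collection
  >>> import dask.array as da
  >>> is_dask_collection(da.ones(5, chunks=5))
  True
  >>> is_dask_collection([1, 2, 3])
  False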
  • dask.optimize(*args, **kwargs)
  • Optimize several dask collections at once.

Returns equivalent dask collections that all share the same merged and optimized underlying graph. This can be useful if converting multiple collections to delayed objects, or to manually apply the optimizations at strategic points.

Note that in most cases you shouldn’t need to call this method directly.

Parameters:

  • *args:objects
  • Any number of objects. If a dask object, its graph is optimized and merged with all those of all other dask objects before returning an equivalent dask collection. Non-dask arguments are passed through unchanged.

  • traverse:bool, optional

  • By default dask traverses builtin python collections looking for dask objects passed to optimize. For large collections this can be expensive. If none of the arguments contain any dask objects, set traverse=False to avoid doing this traversal.

  • optimizations:list of callables, optional

  • Additional optimization passes to perform.

  • **kwargs

  • Extra keyword arguments to forward to the optimization passes.

Examples

  >>> import dask.array as da
  >>> a = da.arange(10, chunks=2).sum()
  >>> b = da.arange(10, chunks=2).mean()
  >>> a2, b2 = optimize(a, b)

  >>> a2.compute() == a.compute()
  True
  >>> b2.compute() == b.compute()
  True
  • dask.persist(*args, **kwargs)
  • Persist multiple Dask collections into memory

This turns lazy Dask collections into Dask collections with the same metadata, but now with their results fully computed or actively computing in the background.

For example a lazy dask.array built up from many lazy calls will now be a dask.array of the same shape, dtype, chunks, etc., but now with all of those previously lazy tasks either computed in memory as many small numpy.array (in the single-machine case) or asynchronously running in the background on a cluster (in the distributed case).

This function operates differently if a dask.distributed.Client exists and is connected to a distributed scheduler. In this case this function will return as soon as the task graph has been submitted to the cluster, but before the computations have completed. Computations will continue asynchronously in the background. When using this function with the single machine scheduler it blocks until the computations have finished.

When using Dask on a single machine you should ensure that the dataset fitsentirely within memory.

Parameters:

  • *args: Dask collections
  • scheduler:string, optional
  • Which scheduler to use like “threads”, “synchronous” or “processes”. If not provided, the default is to check the global settings first, and then fall back to the collection defaults.

  • traverse:bool, optional

  • By default dask traverses builtin python collections looking for dask objects passed to persist. For large collections this can be expensive. If none of the arguments contain any dask objects, set traverse=False to avoid doing this traversal.

  • optimize_graph:bool, optional

  • If True [default], the graph is optimized before computation. Otherwise the graph is run as is. This can be useful for debugging.

  • **kwargs

  • Extra keywords to forward to the scheduler function.

Returns:

  • New dask collections backed by in-memory data

Examples

  >>> df = dd.read_csv('/path/to/*.csv') # doctest: +SKIP
  >>> df = df[df.name == 'Alice'] # doctest: +SKIP
  >>> df['in-debt'] = df.balance < 0 # doctest: +SKIP
  >>> df = df.persist() # triggers computation # doctest: +SKIP

  >>> df.value().min() # future computations are now fast # doctest: +SKIP
  -10
  >>> df.value().max() # doctest: +SKIP
  100

  >>> from dask import persist # use persist function on multiple collections
  >>> a, b = persist(a, b) # doctest: +SKIP
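As noted above, with a dask.distributed.Client active persist returns before the background work has finished; a sketch of blocking on that work (assuming a distributed cluster is available):

  >>> from dask.distributed import Client, wait  # doctest: +SKIP
  >>> client = Client()  # doctest: +SKIP
  >>> a, b = persist(a, b)  # returns quickly; work continues in the background # doctest: +SKIP
  >>> wait([a, b])  # optionally block until that work completes # doctest: +SKIP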
  • dask.visualize(*args, **kwargs)
  • Visualize several dask graphs at once.

Requires graphviz to be installed. All options that are not the dask graph(s) should be passed as keyword arguments.

Parameters:

  • dsk:dict(s) or collection(s)
  • The dask graph(s) to visualize.

  • filename:str or None, optional

  • The name (without an extension) of the file to write to disk. If filename is None, no file will be written, and we communicate with dot using only pipes.

  • format:{‘png’, ‘pdf’, ‘dot’, ‘svg’, ‘jpeg’, ‘jpg’}, optional

  • Format in which to write output file. Default is ‘png’.

  • optimize_graph:bool, optional

  • If True, the graph is optimized before rendering. Otherwise, the graph is displayed as is. Default is False.

  • color: {None, ‘order’}, optional

  • Options to color nodes. Provide cmap= keyword for additional colormap

  • **kwargs

  • Additional keyword arguments to forward to to_graphviz.

Returns:

  • result:IPython.display.Image, IPython.display.SVG, or None
  • See dask.dot.dot_graph for more information.

See also

  • dask.dot.dot_graph

Notes

For more information on optimization see here:

https://docs.dask.org/en/latest/optimize.html

Examples

  >>> x.visualize(filename='dask.pdf') # doctest: +SKIP
  >>> x.visualize(filename='dask.pdf', color='order') # doctest: +SKIP
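The module-level function can also render several collections into one combined graph; a sketch (the file name here is arbitrary):

  >>> import dask
  >>> import dask.array as da
  >>> a = da.arange(10, chunks=2).sum()
  >>> b = da.arange(10, chunks=2).mean()
  >>> dask.visualize(a, b, filename='combined', format='svg')  # doctest: +SKIP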

Datasets

Dask has a few helpers for generating demo datasets

  • dask.datasets.make_people(npartitions=10, records_per_partition=1000, seed=None, locale='en')
  • Make a dataset of random people

This makes a Dask Bag with dictionary records of randomly generated people. This requires the optional library mimesis to generate records.

Parameters:

  • npartitions:int
  • Number of partitions

  • records_per_partition:int

  • Number of records in each partition

  • seed:int, (optional)

  • Random seed

  • locale:str

  • Language locale, like ‘en’, ‘fr’, ‘zh’, or ‘ru’

Returns:

  • b: Dask Bag
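A short sketch of how this might be used (requires the optional mimesis library):

  >>> import dask
  >>> b = dask.datasets.make_people(npartitions=2, records_per_partition=5)  # doctest: +SKIP
  >>> b.count().compute()  # total number of generated records # doctest: +SKIP
  10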
  • dask.datasets.timeseries(start='2000-01-01', end='2000-01-31', freq='1s', partition_freq='1d', dtypes={'name': str, 'id': int, 'x': float, 'y': float}, seed=None, **kwargs)
  • Create timeseries dataframe with random data

Parameters:

  • start:datetime (or datetime-like string)
  • Start of time series

  • end:datetime (or datetime-like string)

  • End of time series

  • dtypes:dict

  • Mapping of column names to types. Valid types include {float, int, str, ‘category’}

  • freq:string

  • String like ‘2s’ or ‘1H’ or ‘12W’ for the time series frequency

  • partition_freq:string

  • String like ‘1M’ or ‘2Y’ to divide the dataframe into partitions

  • seed:int (optional)

  • Randomstate seed

  • kwargs:

  • Keywords to pass down to individual column creation functions. Keywords should be prefixed by the column name and then an underscore.

Examples

  >>> import dask
  >>> df = dask.datasets.timeseries()
  >>> df.head() # doctest: +SKIP
                         id     name         x         y
  timestamp
  2000-01-01 00:00:00   967    Jerry -0.031348 -0.040633
  2000-01-01 00:00:01  1066  Michael -0.262136  0.307107
  2000-01-01 00:00:02   988    Wendy -0.526331  0.128641
  2000-01-01 00:00:03  1016   Yvonne  0.620456  0.767270
  2000-01-01 00:00:04   998   Ursula  0.684902 -0.463278

  >>> df = dask.datasets.timeseries(
  ...     '2000', '2010',
  ...     freq='2H', partition_freq='1D', seed=1,  # data frequency
  ...     dtypes={'value': float, 'name': str, 'id': int},  # data types
  ...     id_lam=1000  # control number of items in id column
  ... )

Utilities

Dask has some public utility methods. These are primarily used for parsing configuration values.

  • dask.utils.format_bytes(n)
  • Format bytes as text

  >>> format_bytes(1)
  '1 B'
  >>> format_bytes(1234)
  '1.23 kB'
  >>> format_bytes(12345678)
  '12.35 MB'
  >>> format_bytes(1234567890)
  '1.23 GB'
  >>> format_bytes(1234567890000)
  '1.23 TB'
  >>> format_bytes(1234567890000000)
  '1.23 PB'
  • dask.utils.format_time(n)
  • format integers as time

  >>> format_time(1)
  '1.00 s'
  >>> format_time(0.001234)
  '1.23 ms'
  >>> format_time(0.00012345)
  '123.45 us'
  >>> format_time(123.456)
  '123.46 s'
  • dask.utils.parse_bytes(s)
  • Parse byte string to numbers

  >>> parse_bytes('100')
  100
  >>> parse_bytes('100 MB')
  100000000
  >>> parse_bytes('100M')
  100000000
  >>> parse_bytes('5kB')
  5000
  >>> parse_bytes('5.4 kB')
  5400
  >>> parse_bytes('1kiB')
  1024
  >>> parse_bytes('1e6')
  1000000
  >>> parse_bytes('1e6 kB')
  1000000000
  >>> parse_bytes('MB')
  1000000
  >>> parse_bytes(123)
  123
  >>> parse_bytes('5 foos') # doctest: +SKIP
  ValueError: Could not interpret 'foos' as a byte unit
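These helpers pair naturally with Dask's configuration system; a brief sketch, assuming the default array.chunk-size setting of '128MiB':

  >>> import dask
  >>> from dask.utils import parse_bytes
  >>> parse_bytes(dask.config.get('array.chunk-size'))  # doctest: +SKIP
  134217728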
  • dask.utils.parse_timedelta(s, default='seconds')
  • Parse timedelta string to number of seconds

Examples

  >>> parse_timedelta('3s')
  3
  >>> parse_timedelta('3.5 seconds')
  3.5
  >>> parse_timedelta('300ms')
  0.3
  >>> parse_timedelta(timedelta(seconds=3)) # also supports timedeltas
  3
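The default keyword names the unit assumed when a bare number is given; a small sketch of that behaviour:

  >>> parse_timedelta(10)  # bare numbers are read in the default unit, seconds
  10
  >>> parse_timedelta(10, default='ms')  # doctest: +SKIP
  0.01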