Diagnostics (local)

Profiling parallel code can be challenging, but dask.diagnostics provides functionality to aid in profiling and inspecting execution with the local task scheduler.

This page describes the following built-in options:

  • ProgressBar
  • Profiler
  • ResourceProfiler
  • CacheProfiler

Furthermore, this page then provides instructions on how to build your own custom diagnostic.

Progress Bar

ProgressBar([minimum, width, dt, out])    A progress bar for dask.

The ProgressBar class builds on the scheduler callbacks described below in Custom Callbacks to display a progress bar in the terminal or notebook during computation. This can give nice feedback during long-running graph execution. It can be used as a context manager around calls to get or compute to profile the computation:

  >>> import dask.array as da
  >>> from dask.diagnostics import ProgressBar
  >>> a = da.random.normal(size=(10000, 10000), chunks=(1000, 1000))
  >>> res = a.dot(a.T).mean(axis=0)

  >>> with ProgressBar():
  ...     out = res.compute()
  [########################################] | 100% Completed | 17.1 s

or registered globally using the register method:

  >>> pbar = ProgressBar()
  >>> pbar.register()
  >>> out = res.compute()
  [########################################] | 100% Completed | 17.1 s

To unregister from the global callbacks, call the unregister method:

  >>> pbar.unregister()

Profiler

Profiler()    A profiler for dask execution at the task level.

Dask provides a few tools for profiling execution. As with the ProgressBar, they can each be used as context managers or registered globally.

The Profiler class is used to profile Dask's execution at the task level. During execution, it records the following information for each task (a short usage sketch follows this list):

  • Key
  • Task
  • Start time in seconds since the epoch
  • Finish time in seconds since the epoch
  • Worker id
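
As a minimal sketch, reusing the res array from the Progress Bar example above (the recorded values will vary per run):

  >>> from dask.diagnostics import Profiler
  >>> with Profiler() as prof:
  ...     out = res.compute()
  >>> prof.results[0]  # a TaskData namedtuple: (key, task, start_time, end_time, worker_id)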

ResourceProfiler

ResourceProfiler([dt])    A profiler for resource use.

The ResourceProfiler class is used to profile Dask's execution at the resource level. During execution, it records the following information for each timestep:

  • Time in seconds since the epoch
  • Memory usage in MB
  • % CPU usage

The default timestep is 1 second, but can be set manually using the dt keyword:

  >>> from dask.diagnostics import ResourceProfiler
  >>> rprof = ResourceProfiler(dt=0.5)
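
Like the other diagnostics, it can then be used as a context manager; a minimal sketch reusing the res array from above (values vary per run):

  >>> with rprof:
  ...     out = res.compute()
  >>> rprof.results[0]  # a ResourceData namedtuple: (time, mem, cpu)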

CacheProfiler

CacheProfiler([metric, metric_name])    A profiler for dask execution at the scheduler cache level.

The CacheProfiler class is used to profile Dask's execution at the scheduler cache level. During execution, it records the following information for each task:

  • Key
  • Task
  • Size metric
  • Cache entry time in seconds since the epoch
  • Cache exit time in seconds since the epoch

Here the size metric is the output of a function called on the result of each task. The default metric is to count each task (metric is 1 for all tasks). Other functions may be used as a metric instead through the metric keyword. For example, the nbytes function found in cachey can be used to measure the number of bytes in the scheduler cache:

  >>> from dask.diagnostics import CacheProfiler
  >>> from cachey import nbytes
  >>> cprof = CacheProfiler(metric=nbytes)
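
Used as a context manager, the profiler then records one entry per task; a minimal sketch reusing the res array from above (values vary per run):

  >>> with cprof:
  ...     out = res.compute()
  >>> cprof.results[0]  # a CacheData namedtuple: (key, task, metric, cache_time, free_time)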

Example

As an example to demonstrate using the diagnostics, we'll profile some linear algebra done with Dask Array. We'll create a random array, take its QR decomposition, and then reconstruct the initial array by multiplying the Q and R components together. Note that since the profilers (and all diagnostics) are just context managers, multiple profilers can be used in a with block:

  >>> import dask.array as da
  >>> from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
  >>> a = da.random.random(size=(10000, 1000), chunks=(1000, 1000))
  >>> q, r = da.linalg.qr(a)
  >>> a2 = q.dot(r)

  >>> with Profiler() as prof, ResourceProfiler(dt=0.25) as rprof, \
  ...         CacheProfiler() as cprof:
  ...     out = a2.compute()

The results of each profiler are stored in their results attribute as a list of namedtuple objects:

  >>> prof.results[0]
  TaskData(key=('tsqr-8d16e396b237bf7a731333130d310cb9_QR_st1', 5, 0),
           task=(qr, (_apply_random, 'random_sample', 1060164455, (1000, 1000), (), {})),
           start_time=1454368444.493292,
           end_time=1454368444.902987,
           worker_id=4466937856)

  >>> rprof.results[0]
  ResourceData(time=1454368444.078748, mem=74.100736, cpu=0.0)

  >>> cprof.results[0]
  CacheData(key=('tsqr-8d16e396b237bf7a731333130d310cb9_QR_st1', 7, 0),
            task=(qr, (_apply_random, 'random_sample', 1310656009, (1000, 1000), (), {})),
            metric=1,
            cache_time=1454368444.49662,
            free_time=1454368446.769452)

These can be analyzed separately or viewed in a bokeh plot using the provided visualize method on each profiler:

  >>> prof.visualize()

To view multiple profilers at the same time, the dask.diagnostics.visualize function can be used. This takes a list of profilers and creates a vertical stack of plots aligned along the x-axis:

  >>> from dask.diagnostics import visualize
  >>> visualize([prof, rprof, cprof])
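
By default the plot opens in a browser and is saved to disk. Using the documented file_path and show keywords, a sketch of writing the stacked plot to a chosen file without opening it:

  >>> visualize([prof, rprof, cprof], file_path='profile.html', show=False)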

Looking at the above figure, from top to bottom:

  • The results from the Profiler object: This shows the execution time for each task as a rectangle, organized along the y-axis by worker (in this case threads). Similar tasks are grouped by color and, by hovering over each task, one can see the key and task that each block represents.
  • The results from the ResourceProfiler object: This shows two lines, one for total CPU percentage used by all the workers, and one for total memory usage.
  • The results from the CacheProfiler object: This shows a line for each task group, plotting the sum of the current metric in the cache against time. In this case it's the default metric (count), and the lines represent the number of each object in the cache over time. Note that the grouping and coloring are the same as for the Profiler plot, and that the task represented by each line can be found by hovering over the line.

From these plots we can see that the initial tasks (calls to numpy.random.random and numpy.linalg.qr for each chunk) are run concurrently, but use only slightly more than 100% CPU. This is because the call to numpy.linalg.qr currently doesn't release the Global Interpreter Lock (GIL), so those calls can't truly be done in parallel. Next, there's a reduction step where all the blocks are combined. This requires all the results from the first step to be held in memory, as shown by the increased number of results in the cache and the increase in memory usage. Immediately after this task ends, the number of elements in the cache decreases, showing that they were only needed for this step. Finally, there's an interleaved set of calls to dot and sum. Looking at the CPU plot, it shows that these run both concurrently and in parallel, as the CPU percentage spikes up to around 350%.

Custom Callbacks

Callback([start, start_state, pretask, …])    Base class for using the callback mechanism

Schedulers based on dask.local.get_async (currently dask.get, dask.threaded.get, and dask.multiprocessing.get) accept five callbacks, allowing for inspection of scheduler execution.

The callbacks are:

  1. start(dsk): Run at the beginning of execution, right before the state is initialized. Receives the Dask graph

  2. start_state(dsk, state): Run at the beginning of execution, right after the state is initialized. Receives the Dask graph and scheduler state

  3. pretask(key, dsk, state): Run every time a new task is started. Receives the key of the task to be run, the Dask graph, and the scheduler state

  4. posttask(key, result, dsk, state, id): Run every time a task is finished. Receives the key of the task that just completed, the result, the Dask graph, the scheduler state, and the id of the worker that ran the task

  5. finish(dsk, state, errored): Run at the end of execution, right before the result is returned. Receives the Dask graph, the scheduler state, and a boolean indicating whether or not the exit was due to an error

Custom diagnostics can be created either by instantiating the Callback class with some of the above methods as keywords or by subclassing the Callback class. Here we create a class that prints the name of every key as it's computed:

  from dask.callbacks import Callback

  class PrintKeys(Callback):
      def _pretask(self, key, dask, state):
          """Print the key of every task as it's started"""
          print("Computing: {0}!".format(repr(key)))

This can now be used as a context manager during computation:

  >>> from operator import add, mul
  >>> from dask.threaded import get
  >>> dsk = {'a': (add, 1, 2), 'b': (add, 3, 'a'), 'c': (mul, 'a', 'b')}

  >>> with PrintKeys():
  ...     get(dsk, 'c')
  Computing: 'a'!
  Computing: 'b'!
  Computing: 'c'!

Alternatively, functions may be passed in as keyword arguments to Callback:

  >>> def printkeys(key, dask, state):
  ...     print("Computing: {0}!".format(repr(key)))

  >>> with Callback(pretask=printkeys):
  ...     get(dsk, 'c')
  Computing: 'a'!
  Computing: 'b'!
  Computing: 'c'!
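
Hooks can also be combined. As an illustrative sketch (the Timer class below is our own example, not part of Dask), a subclass can pair the start and finish callbacks to time a whole graph; the timing output will vary per run:

  >>> import time

  >>> class Timer(Callback):
  ...     def _start(self, dsk):
  ...         # record the wall-clock time just before the state is initialized
  ...         self._t0 = time.time()
  ...     def _finish(self, dsk, state, errored):
  ...         # report elapsed time right before the result is returned
  ...         print("Graph with {0} keys computed in {1:.4f} s".format(len(dsk), time.time() - self._t0))

  >>> with Timer():
  ...     get(dsk, 'c')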

API

CacheProfiler([metric, metric_name])    A profiler for dask execution at the scheduler cache level.
Callback([start, start_state, pretask, …])    Base class for using the callback mechanism
Profiler()    A profiler for dask execution at the task level.
ProgressBar([minimum, width, dt, out])    A progress bar for dask.
ResourceProfiler([dt])    A profiler for resource use.
visualize(profilers[, file_path, show, save])    Visualize the results of profiling in a bokeh plot.
dask.diagnostics.ProgressBar(minimum=0, width=40, dt=0.1, out=None)

A progress bar for dask.

Parameters:

  • minimum : int, optional
    Minimum time threshold in seconds before displaying a progress bar. Default is 0 (always display)
  • width : int, optional
    Width of the bar
  • dt : float, optional
    Update resolution in seconds, default is 0.1 seconds
  • out : file object, optional
    File object to which the progress bar is written; default is standard output

Examples

Below we create a progress bar with a minimum threshold of 1 second before displaying. For cheap computations nothing is shown:

  >>> with ProgressBar(minimum=1.0):    # doctest: +SKIP
  ...     out = some_fast_computation.compute()

But for expensive computations a full progress bar is displayed:

  >>> with ProgressBar(minimum=1.0):    # doctest: +SKIP
  ...     out = some_slow_computation.compute()
  [########################################] | 100% Completed | 10.4 s

The duration of the last computation is available as an attribute:

  >>> pbar = ProgressBar()
  >>> with pbar:    # doctest: +SKIP
  ...     out = some_computation.compute()
  [########################################] | 100% Completed | 10.4 s
  >>> pbar.last_duration    # doctest: +SKIP
  10.4

You can also register a progress bar so that it displays for all computations:

  >>> pbar = ProgressBar()    # doctest: +SKIP
  >>> pbar.register()    # doctest: +SKIP
  >>> some_slow_computation.compute()    # doctest: +SKIP
  [########################################] | 100% Completed | 10.4 s
dask.diagnostics.Profiler()

A profiler for dask execution at the task level.

Records the following information for each task:

  • Key
  • Task
  • Start time in seconds since the epoch
  • Finish time in seconds since the epoch
  • Worker id

Examples
  >>> from operator import add, mul
  >>> from dask.threaded import get
  >>> dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}
  >>> with Profiler() as prof:
  ...     get(dsk, 'z')
  22

  >>> prof.results    # doctest: +SKIP
  [('y', (add, 'x', 10), 1435352238.48039, 1435352238.480655, 140285575100160),
   ('z', (mul, 'y', 2), 1435352238.480657, 1435352238.480803, 140285566707456)]

These results can be visualized in a bokeh plot using the visualize method. Note that this requires bokeh to be installed.

  >>> prof.visualize()    # doctest: +SKIP

You can activate the profiler globally

  >>> prof.register()    # doctest: +SKIP

If you use the profiler globally you will need to clear out old results manually.

  >>> prof.clear()
dask.diagnostics.ResourceProfiler(dt=1)

A profiler for resource use.

Records the following for each timestep:

  • Time in seconds since the epoch
  • Memory usage in MB
  • % CPU usage

Examples
  >>> from operator import add, mul
  >>> from dask.threaded import get
  >>> dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}
  >>> with ResourceProfiler() as prof:    # doctest: +SKIP
  ...     get(dsk, 'z')
  22

These results can be visualized in a bokeh plot using the visualize method. Note that this requires bokeh to be installed.

  >>> prof.visualize()    # doctest: +SKIP

You can activate the profiler globally

  >>> prof.register()    # doctest: +SKIP

If you use the profiler globally you will need to clear out old results manually.

  >>> prof.clear()    # doctest: +SKIP

Note that when used as a context manager, data will be collected throughout the duration of the enclosed block. In contrast, when registered globally, data will only be collected while a dask scheduler is active.
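
A brief sketch of the global pattern (some_computation stands in for any dask collection; resource data is recorded only while the compute call is running):

  >>> rprof = ResourceProfiler(dt=0.5)    # doctest: +SKIP
  >>> rprof.register()    # doctest: +SKIP
  >>> out = some_computation.compute()    # doctest: +SKIP
  >>> rprof.unregister()    # doctest: +SKIP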

dask.diagnostics.CacheProfiler(metric=None, metric_name=None)

A profiler for dask execution at the scheduler cache level.

Records the following information for each task:

  • Key
  • Task
  • Size metric
  • Cache entry time in seconds since the epoch
  • Cache exit time in seconds since the epoch

Examples
  >>> from operator import add, mul
  >>> from dask.threaded import get
  >>> dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}
  >>> with CacheProfiler() as prof:
  ...     get(dsk, 'z')
  22

  >>> prof.results    # doctest: +SKIP
  [CacheData('y', (add, 'x', 10), 1, 1435352238.48039, 1435352238.480655),
   CacheData('z', (mul, 'y', 2), 1, 1435352238.480657, 1435352238.480803)]

The default is to count each task (metric is 1 for all tasks). Other functions may be used as a metric instead through the metric keyword. For example, the nbytes function found in cachey can be used to measure the number of bytes in the cache.

  >>> from cachey import nbytes    # doctest: +SKIP
  >>> with CacheProfiler(metric=nbytes) as prof:    # doctest: +SKIP
  ...     get(dsk, 'z')

The profiling results can be visualized in a bokeh plot using the visualize method. Note that this requires bokeh to be installed.

  >>> prof.visualize()    # doctest: +SKIP

You can activate the profiler globally

  >>> prof.register()    # doctest: +SKIP

If you use the profiler globally you will need to clear out old results manually.

  >>> prof.clear()
dask.diagnostics.Callback(start=None, start_state=None, pretask=None, posttask=None, finish=None)

Base class for using the callback mechanism

Create a callback with functions of the following signatures:

  >>> def start(dsk):
  ...     pass
  >>> def start_state(dsk, state):
  ...     pass
  >>> def pretask(key, dsk, state):
  ...     pass
  >>> def posttask(key, result, dsk, state, worker_id):
  ...     pass
  >>> def finish(dsk, state, failed):
  ...     pass

You may then construct a callback object with any number of them

  >>> cb = Callback(pretask=pretask, finish=finish)    # doctest: +SKIP

And use it either as a context manager over a compute/get call

  >>> with cb:    # doctest: +SKIP
  ...     x.compute()    # doctest: +SKIP

Or globally with the register method

  >>> cb.register()    # doctest: +SKIP
  >>> cb.unregister()    # doctest: +SKIP

Alternatively subclass the Callback class with your own methods.

  >>> class PrintKeys(Callback):
  ...     def _pretask(self, key, dask, state):
  ...         print("Computing: {0}!".format(repr(key)))

  >>> with PrintKeys():    # doctest: +SKIP
  ...     x.compute()    # doctest: +SKIP
dask.diagnostics.visualize(profilers, file_path=None, show=True, save=True, **kwargs)

Visualize the results of profiling in a bokeh plot.

If multiple profilers are passed in, the plots are stacked vertically.

Parameters:

  • profilers : profiler or list
    Profiler or list of profilers.
  • file_path : string, optional
    Name of the plot output file.
  • show : boolean, optional
    If True (default), the plot is opened in a browser.
  • save : boolean, optional
    If True (default), the plot is saved to disk.
  • **kwargs
    Other keyword arguments, passed to bokeh.figure. These will override all defaults set by visualize.

Returns:

  • The completed bokeh plot object.
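
For example, to obtain the bokeh plot object directly without opening a browser or writing a file, a sketch using only the documented keywords (the profiler objects are assumed to come from an earlier run):

  >>> p = visualize([prof, rprof], show=False, save=False)    # doctest: +SKIP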