Diagnosing Performance

Understanding the performance of a distributed computation can be difficult.This is due in part to the many components of a distributed computer that mayimpact performance:

  • Compute time
  • Memory bandwidth
  • Network bandwidth
  • Disk bandwidth
  • Scheduler overhead
  • Serialization costs

This difficulty is compounded because the information about these costs isspread among many machines and so there is no central place to collect data toidentify performance issues.

Fortunately, Dask collects a variety of diagnostic information duringexecution. It does this both to provide performance feedback to users, butalso for its own internal scheduling decisions. The primary place to observethis feedback is the diagnostic dashboard. This documentdescribes the various pieces of performance information available and how toaccess them.

Task start and stop times

Workers capture durations associated to tasks. For each task that passesthrough a worker we record start and stop times for each of the following:

  • Serialization (gray)
  • Dependency gathering from peers (red)
  • Disk I/O to collect local data (orange)
  • Execution times (colored by task)

The main way to observe these times is with the task stream plot on thescheduler’s /status page where the colors of the bars correspond to thecolors listed above. Dask task stream Alternatively if you want to do your own diagnostics on every task event youmight want to create a Scheduler plugin. All of thisinformation will be available when a task transitions from processing tomemory or erred.

Statistical Profiling

For single-threaded profiling Python users typically depend on the CProfilemodule in the standard library (Dask developers recommend the snakeviz tool for single-threaded profiling).Unfortunately the standard CProfile module does not work with multi-threaded ordistributed computations.

To address this Dask implements its own distributed statistical profiler#Statistical_profilers).Every 10ms each worker process checks what each of its worker threads aredoing. It captures the call stack and adds this stack to a counting datastructure. This counting data structure is recorded and cleared every secondin order to establish a record of performance over time.

Users typically observe this data through the /profile plot on either theworker or scheduler diagnostic dashboards. On the scheduler page they observethe total profile aggregated over all workers over all threads. Clicking onany of the bars in the profile will zoom the user into just that section, as istypical with most profiling tools. There is a timeline at the bottom of thepage to allow users to select different periods in time. Dask profiler Profiles are also grouped by the task that was being run at the time. You canselect a task name from the selection menu at the top of the page. You canalso click on the rectangle corresponding to the task in the main task streamplot on the /status page.

Users can also query this data directly using the Client.profilefunction. This will deliver the raw data structure used to produce theseplots. They can also pass a filename to save the plot as an HTML filedirectly. Note that this file will have to be served from a webserver likepython -m http.server to be visible.

The 10ms and 1s parameters can be controlled by the profile-interval andprofile-cycle-interval entries in the config.yaml file.

Bandwidth

Dask workers track every incoming and outgoing transfer in theWorker.outgoing_transfer_log and Worker.incoming_transfer_logattributes including

  • Total bytes transferred
  • Compressed bytes transferred
  • Start/stop times
  • Keys moved
  • Peer

These are made available to users through the /status page of the Worker’sdiagnostic dashboard. You can capture their state explicitly by running acommand on the workers:

  1. client.run(lambda dask_worker: dask_worker.outgoing_transfer_log)
  2. client.run(lambda dask_worker: dask_worker.incoming_transfer_log)

Performance Reports

Often when benchmarking and/or profiling, users may want to record aparticular computation or even a full workflow. Dask can save the bokehdashboards as static HTML plots including the task stream, worker profiles,bandwidths, etc. This is done wrapping a computation with the performance_report context manager:

  1. from dask.distributed import performance_report
  2.  
  3. with performance_report(filename="dask-report.html"):
  4. ## some dask computation

The following video demonstrates the performance_report context manager in greaterdetail:

A note about times

Different computers maintain different clocks which may not match perfectly.To address this the Dask scheduler sends its current time in response to everyworker heartbeat. Workers compare their local time against this time to obtainan estimate of differences. All times recorded in workers take this estimateddelay into account. This helps, but still, imprecise measurements may exist.

All times are intended to be from the scheduler’s perspective.