Debugging

Debugging parallel programs is hard. Normal debugging tools like logging and using pdb to interact with tracebacks stop working normally when exceptions occur in far-away machines, different processes, or threads.

Dask has a variety of mechanisms to make this process easier. Depending on your situation, some of these approaches may be more appropriate than others.

These approaches are ordered from lightweight or easy solutions to more involved solutions.

Exceptions

When a task in your computation fails, the standard way of understanding what went wrong is to look at the exception and traceback. Often people do this with the pdb module, IPython %debug or %pdb magics, or by just looking at the traceback and investigating where in their code the exception occurred.

Normally when a computation executes in a separate thread or a different machine, these approaches break down. To address this, Dask provides a few mechanisms to recreate the normal Python debugging experience.

Inspect Exceptions and Tracebacks

By default, Dask already copies the exception and traceback wherever they occur and reraises that exception locally. If your task failed with a ZeroDivisionError remotely, then you’ll get a ZeroDivisionError in your interactive session. Similarly you’ll see a full traceback of where this error occurred, which, just like in normal Python, can help you to identify the troublesome spot in your code.

However, you cannot use the pdb module or %debug IPython magics with these tracebacks to look at the value of variables during failure. You can only inspect things visually. Additionally, the top of the traceback may be filled with Dask-specific functions that are not relevant to your problem; you can safely ignore these.

Both the single-machine and distributed schedulers do this.
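As a minimal sketch of this behaviour, the snippet below runs a failing task on the threaded single-machine scheduler and catches the reraised exception in the calling code. The `divide` function is a hypothetical stand-in for your own task, and `dask.delayed` is used only to keep the example small.

```python
import traceback

import dask


@dask.delayed
def divide(a, b):
    # Hypothetical task that fails when b == 0.
    return a / b


caught = None
frames = []
try:
    # The threaded scheduler executes the task in a worker thread.
    divide(1, 0).compute(scheduler="threads")
except ZeroDivisionError as exc:
    # The original exception type arrives in the calling thread.
    caught = exc
    # Names of the functions recorded in the reraised traceback.
    frames = [frame.name for frame in traceback.extract_tb(exc.__traceback__)]

print(type(caught).__name__)
print("divide" in frames)
```

The traceback still includes the frame of the user function, which is what lets you locate the failing line by reading it.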

Use the Single-Threaded Scheduler

Dask ships with a simple single-threaded scheduler. This doesn’t offer any parallel performance improvements but does run your Dask computation faithfully in your local thread, allowing you to use normal tools like pdb, %debug IPython magics, and profiling tools like the cProfile module and snakeviz. This allows you to use all of your normal Python debugging tricks in Dask computations, as long as you don’t need parallelism.

The single-threaded scheduler can be used, for example, by setting scheduler='single-threaded' in a compute call:

  >>> x.compute(scheduler='single-threaded')

For more ways to configure schedulers, see the scheduler configuration documentation.

This only works for single-machine schedulers. It does not work with dask.distributed unless you are comfortable using the Tornado API (look at the testing infrastructure docs, which accomplish this). Also, because this operates on a single machine, it assumes that your computation can run on a single machine without exceeding memory limits. It may be wise to use this approach on smaller versions of your problem if possible.
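To check that tasks really do run in the calling thread, the sketch below compares the thread name seen by a task under the single-threaded and threaded schedulers. It selects the scheduler with `dask.config.set` rather than the `scheduler=` keyword; `which_thread` is a hypothetical helper written for this illustration.

```python
import threading

import dask


@dask.delayed
def which_thread():
    # Report the name of the thread that executes this task.
    return threading.current_thread().name


caller = threading.current_thread().name

# Inside this block every compute call uses the single-threaded scheduler.
with dask.config.set(scheduler="single-threaded"):
    single = which_thread().compute()

# For comparison, the threaded scheduler runs the task in a pool thread.
threaded = which_thread().compute(scheduler="threads")

print(single == caller)
print(threaded == caller)
```

Because the task shares a thread with your session, a breakpoint or a `%debug` post-mortem lands directly in the task's frame.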

Rerun Failed Task Locally

If a remote task fails, we can collect the function and all inputs, bring them to the local thread, and then rerun the function in hopes of triggering the same exception locally where normal debugging tools can be used.

With the single-machine schedulers, use the rerun_exceptions_locally=True keyword:

  >>> x.compute(rerun_exceptions_locally=True)
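The following sketch illustrates what the keyword does on the threaded scheduler: the failing task first runs in a worker thread and is then run again in the calling thread, where the exception is raised. The `divide` function and the `thread_names` list are hypothetical and exist only to record where each attempt ran.

```python
import threading

import dask

thread_names = []  # records which thread ran each attempt


@dask.delayed
def divide(a, b):
    thread_names.append(threading.current_thread().name)
    return a / b


caller = threading.current_thread().name
try:
    divide(1, 0).compute(scheduler="threads", rerun_exceptions_locally=True)
except ZeroDivisionError:
    # The exception now originates from the rerun in the calling thread,
    # so pdb or %debug can step into the failing frame.
    pass

print(thread_names[0] == caller)   # first attempt: worker thread
print(thread_names[-1] == caller)  # rerun: calling thread
```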

On the distributed scheduler, use the recreate_error_locally method on anything that contains Futures:

  >>> x.compute()
  ZeroDivisionError(...)

  >>> %pdb
  >>> future = client.compute(x)
  >>> client.recreate_error_locally(future)

Remove Failed Futures Manually

Sometimes only parts of your computations fail, for example, if some rows of a CSV dataset are faulty in some way. When running with the distributed scheduler, you can remove chunks of your data that have produced bad results if you switch to dealing with Futures:

  >>> import dask.dataframe as dd
  >>> df = ...  # create dataframe
  >>> df = df.persist()  # start computing on the cluster

  >>> from distributed.client import futures_of
  >>> futures = futures_of(df)  # get futures behind dataframe
  >>> futures
  [<Future: status: finished, type: pd.DataFrame, key: load-1>,
   <Future: status: finished, type: pd.DataFrame, key: load-2>,
   <Future: status: error, key: load-3>,
   <Future: status: pending, key: load-4>,
   <Future: status: error, key: load-5>]

  >>> # wait until computation is done
  >>> from time import sleep
  >>> while any(f.status == 'pending' for f in futures):
  ...     sleep(0.1)

  >>> # pick out only the successful futures and reconstruct the dataframe
  >>> good_futures = [f for f in futures if f.status == 'finished']
  >>> df = dd.from_delayed(good_futures, meta=df._meta)

This is a bit of a hack, but often practical when first exploring messy data. If you are using the concurrent.futures API (map, submit, gather), then this approach is more natural.
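With the futures API, the same filtering can be sketched roughly as follows. The `parse` function and the `records` list are hypothetical stand-ins for messy input data, and the in-process cluster created by `Client(processes=False)` is an assumption made only to keep the example self-contained.

```python
from dask.distributed import Client, wait


def parse(record):
    # Hypothetical parser: raises ValueError on malformed input.
    return int(record)


records = ["1", "2", "oops", "4"]

# In-process scheduler and worker, with the dashboard disabled.
client = Client(processes=False, dashboard_address=None)
try:
    futures = client.map(parse, records)
    wait(futures)  # block until every future has finished or errored

    # Keep only the futures whose tasks succeeded, then fetch their results.
    good_futures = [f for f in futures if f.status == "finished"]
    results = client.gather(good_futures)
finally:
    client.close()

print(results)
```

Here `wait` replaces the polling loop from the dataframe example, and the failed futures are simply never gathered.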

Inspect Scheduling State

Not all errors present themselves as exceptions. For example, in a distributed system workers may die unexpectedly, your computation may be unreasonably slow due to inter-worker communication or scheduler overhead, or one of several other issues. Getting feedback about what’s going on can help to identify both failures and general performance bottlenecks.

For the single-machine scheduler, see the diagnostics documentation. The rest of this section will assume that you are using the distributed scheduler, where these issues arise more commonly.

Web Diagnostics

First, the distributed scheduler has a number of diagnostic web pages showing dozens of recorded metrics like CPU, memory, network, and disk use, a history of previous tasks, allocation of tasks to workers, worker memory pressure, work stealing, open file handle limits, etc. Many problems can be correctly diagnosed by inspecting these pages. By default, these are available at http://scheduler:8787/, http://scheduler:8788/, and http://worker:8789/, where scheduler and worker should be replaced by the addresses of the scheduler and each of the workers. See web diagnostic docs for more information.

Logs

The scheduler, workers, and client all emit logs using Python’s standard logging module. By default, these emit to standard error. When Dask is launched by a cluster job scheduler (SGE/SLURM/YARN/Mesos/Marathon/Kubernetes/whatever), that system will track these logs and will have an interface to help you access them. If you are launching Dask on your own, they will probably dump to the screen unless you redirect stderr to a file.

You can control the logging verbosity in the ~/.dask/config.yaml file. Defaults currently look like the following:

  logging:
    distributed: info
    distributed.client: warning
    bokeh: error

So, for example, you could add a line like distributed.worker: debug to get very verbose output from the workers.
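Because these are ordinary loggers from the standard logging module, their levels can also be adjusted from Python. The sketch below assumes that the logger names match the keys shown in the configuration file, and it only affects the process in which it runs; workers in other processes keep their own settings.

```python
import logging

# Logger names are assumed to mirror the keys in the config file.
worker_logger = logging.getLogger("distributed.worker")
worker_logger.setLevel(logging.DEBUG)  # very verbose worker output

client_logger = logging.getLogger("distributed.client")
client_logger.setLevel(logging.WARNING)  # warnings and errors only

print(worker_logger.isEnabledFor(logging.DEBUG))
print(client_logger.isEnabledFor(logging.INFO))
```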

LocalCluster

If you are using the distributed scheduler from a single machine, you may be setting up workers manually using the command line interface or you may be using LocalCluster, which is what runs when you just call Client():

  >>> from dask.distributed import Client, LocalCluster
  >>> client = Client()  # This is actually the following two commands

  >>> cluster = LocalCluster()
  >>> client = Client(cluster.scheduler.address)

LocalCluster is useful because the scheduler and workers are in the same process with you, so you can easily inspect their state while they run (they are running in a separate thread):

  >>> cluster.scheduler.processing
  {'worker-one:59858': {'inc-123', 'add-443'},
   'worker-two:48248': {'inc-456'}}

You can also do this for the workers if you run them without nanny processes:

  >>> cluster = LocalCluster(nanny=False)
  >>> client = Client(cluster)

This can be very helpful if you want to use the Dask distributed API and still want to investigate what is going on directly within the workers. Information is not distilled for you like it is in the web diagnostics, but you have full low-level access.

Inspect State with IPython

Sometimes you want to inspect the state of your cluster but you don’t have the luxury of operating on a single machine. In these cases you can launch an IPython kernel on the scheduler and on every worker, which lets you inspect state on the scheduler and workers as computations are completing.

This does not give you the ability to run %pdb or %debug on remote machines. The tasks are still running in separate threads, and so are not easily accessible from an interactive IPython session.

For more details, see the Dask distributed IPython docs.