You can run this notebook in a live session on Binder or view it on GitHub.

Distributed

As we have seen so far, Dask allows you to construct graphs of tasks with dependencies by hand, as well as have graphs created automatically for you using functional, NumPy or pandas syntax on data collections. None of this would be very useful if there weren’t also a way to execute these graphs in a parallel, memory-aware way. So far we have been calling thing.compute() or dask.compute(thing) without worrying what this entails. Now we will discuss the options available for that execution, and in particular the distributed scheduler, which comes with additional functionality.

Dask comes with four available schedulers:

  • “threaded”: a scheduler backed by a thread pool

  • “processes”: a scheduler backed by a process pool

  • “single-threaded” (aka “sync”): a synchronous scheduler, good for debugging

  • distributed: a distributed scheduler for executing graphs on multiple machines; see below.

To select one of these for computation, you can specify at the time of asking for a result, e.g.,

myvalue.compute(scheduler="single-threaded")  # for debugging

or set the current default, either temporarily or globally

with dask.config.set(scheduler='processes'):
    # set temporarily for this block only
    myvalue.compute()

dask.config.set(scheduler='processes')
# set until further notice

Let’s see the difference for the familiar case of the flights data.

[1]:
%run prep.py -d flights
[2]:
import dask.dataframe as dd
import os

df = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'),
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={'TailNum': object,
                        'CRSElapsedTime': float,
                        'Cancelled': bool})

# Maximum non-cancelled delay
largest_delay = df[~df.Cancelled].DepDelay.max()
largest_delay
[2]:
dd.Scalar<series-..., dtype=float64>
[3]:
# each of the following gives the same results (you can check!)
# any surprises?
import time

for sch in ['threading', 'processes', 'sync']:
    t0 = time.time()
    _ = largest_delay.compute(scheduler=sch)
    t1 = time.time()
    print(f"{sch:>10}, {t1 - t0:0.4f}")
 threading, 0.1329
 processes, 0.1664
      sync, 0.1307

Some Questions to Consider:

  • How much speedup is possible for this task (hint: look at the graph)?

  • Given how many cores are on this machine, how much faster could the parallel schedulers be than the single-threaded scheduler?

  • How much faster was using threads over a single thread? Why does this differ from the optimal speedup?

  • Why is the multiprocessing scheduler so much slower here?

The threaded scheduler is a fine choice for working with large datasets out-of-core on a single machine, as long as the functions being used release the GIL most of the time. NumPy and pandas release the GIL in most places, so the threaded scheduler is the default for dask.array and dask.dataframe. The distributed scheduler, perhaps with processes=False, will also work well for these workloads on a single machine.
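As a quick illustration of that advice, here is a minimal sketch using dask.array (the array shape and chunk sizes are arbitrary, chosen only for illustration):

import dask.array as da

# NumPy releases the GIL during most numerical work, so the
# default threaded scheduler parallelizes this well
x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
x.mean().compute()                       # threaded scheduler by default
x.mean().compute(scheduler="processes")  # typically slower: chunks must be
                                         # serialized between processes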

For workloads that do hold the GIL, as is common with dask.bag and custom code wrapped with dask.delayed, we recommend using the distributed scheduler, even on a single machine. Generally speaking, it’s more intelligent and provides better diagnostics than the processes scheduler.

https://docs.dask.org/en/latest/scheduling.html provides some additional details on choosing a scheduler.

For scaling out work across a cluster, the distributed scheduler is required.

Making a cluster

Simple method

The dask.distributed system is composed of a single centralized scheduler and one or more worker processes. Deploying a remote Dask cluster involves some additional effort, but doing things locally just involves creating a Client object, which lets you interact with the “cluster” (local threads or processes on your machine). For more information, see the Dask documentation on deploying clusters.

Note that Client() takes a lot of optional arguments to configure the number of processes/threads, memory limits and other settings.
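For example, a sketch of an explicitly configured local client; the particular numbers are illustrative only, not recommendations, and this is an alternative to the default Client() used in the next cell:

from dask.distributed import Client

# explicit local configuration: 4 single-threaded workers,
# each capped at 2 GB of memory (values are illustrative)
client = Client(n_workers=4, threads_per_worker=1, memory_limit='2GB')

# connecting to an already-running remote scheduler is also one line;
# the address below is a placeholder, not a real scheduler
# client = Client('tcp://scheduler-address:8786')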

[4]:
from dask.distributed import Client

# Set up a local cluster.
# By default this sets up 1 worker per core
client = Client()
client.cluster

If you aren’t in JupyterLab with the dask-labextension, be sure to click the Dashboard link to open up the diagnostics dashboard.
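If you need the dashboard address programmatically, for example to open it in a separate browser window, the client exposes it:

# the dashboard URL, typically something like http://127.0.0.1:8787/status
print(client.dashboard_link)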

Executing with the distributed client

Consider some trivial calculation, such as we’ve used before, where we have added sleep statements in order to simulate real work being done.

[5]:
from dask import delayed
import time

def inc(x):
    time.sleep(5)
    return x + 1

def dec(x):
    time.sleep(3)
    return x - 1

def add(x, y):
    time.sleep(7)
    return x + y

By default, creating a Client makes it the default scheduler. Any calls to .compute will use the cluster your client is attached to, unless you specify otherwise, as above.

[6]:
x = delayed(inc)(1)
y = delayed(dec)(2)
total = delayed(add)(x, y)
total.compute()

[6]:
3
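Note that even with the client registered as the default, any single call can still opt out by naming a scheduler explicitly, as we did earlier. For example:

# bypass the cluster for this one call and run synchronously instead,
# which is handy when you want to drop into a debugger
total.compute(scheduler="sync")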

The tasks will appear in the web UI as they are processed by the cluster and, eventually, a result will be printed as the output of the compute cell above. Note that the kernel is blocked while waiting for the result. The resulting task-stream graph might look something like the one below. Hovering over each block shows which function it relates to and how long it took to execute.

You can also see a simplified version of the graph being executed on the Graph pane of the dashboard, so long as the calculation is in flight.

Let’s return to the flights computation from before, and see what happens on the dashboard (you may wish to have both the notebook and dashboard side-by-side). How does this perform compared to before?

[7]:
%time largest_delay.compute()

CPU times: user 93.7 ms, sys: 972 µs, total: 94.7 ms
Wall time: 1.05 s

[7]:
409.0

In this particular case, this should be as fast as or faster than the best case above, threading. Why do you suppose this is? You should start your reading with the distributed scheduler documentation, and in particular note that the distributed scheduler was a complete rewrite with more intelligence around sharing of intermediate results and which tasks run on which worker. This will result in better performance in some cases, but still larger latency and overhead compared to the threaded scheduler, so there will be rare cases where it performs worse. Fortunately, the dashboard now gives us a lot more diagnostic information. Look at the Profile page of the dashboard to find out what takes the biggest fraction of CPU time for the computation we just performed.
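If you want to keep such diagnostics after the computation finishes, one option is the performance_report context manager, which saves a standalone HTML snapshot of the dashboard. A minimal sketch (the filename is arbitrary):

from dask.distributed import performance_report

# record scheduler and worker diagnostics for everything
# computed inside the block
with performance_report(filename="flights-report.html"):
    largest_delay.compute()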

If all you want to do is execute computations created using delayed, or run calculations based on the higher-level data collections (see the coming sections), then that is about all you need to know to scale your work up to a cluster. However, there is more detail to know about the distributed scheduler that will help with efficient usage; see the chapter Distributed, Advanced.

Exercise

Run the following computations while looking at the diagnostics page. In each case what is taking the most time?

[8]:
# Number of flights
_ = len(df)

[9]:
# Number of non-cancelled flights
_ = len(df[~df.Cancelled])

[10]:
# Number of non-cancelled flights per-airport
_ = df[~df.Cancelled].groupby('Origin').Origin.count().compute()

[11]:
# Average departure delay from each airport?
_ = df[~df.Cancelled].groupby('Origin').DepDelay.mean().compute()

[12]:
# Average departure delay per day-of-week
_ = df.groupby(df.Date.dt.dayofweek).DepDelay.mean().compute()

[13]:
client.shutdown()