You can run this notebook in a live session on Binder or view it on GitHub.

Parallelize code with dask.delayed

In this section we parallelize simple for-loop style code with Dask and dask.delayed. Often, dask.delayed is the only function you will need in order to convert existing functions for use with Dask.

This is a simple way to use Dask to parallelize existing codebases or build complex systems. It will also help us build an understanding that we will rely on in later sections.

As we'll see in the distributed scheduler notebook, Dask has several ways of executing code in parallel. We'll use the distributed scheduler by creating a dask.distributed.Client. For now, this will provide us with some nice diagnostics. We'll talk about schedulers in depth later.

[1]:

from dask.distributed import Client

client = Client(n_workers=4)

Basics

First let’s make some toy functions, inc and add, that sleep for a while to simulate work. We’ll then time running these functions normally.

In the next section we’ll parallelize this code.

[2]:

from time import sleep

def inc(x):
    sleep(1)
    return x + 1

def add(x, y):
    sleep(1)
    return x + y

We time the execution of this normal code using the %%time magic, which is a special function of the Jupyter Notebook.

[3]:

%%time
# This takes three seconds to run because we call each
# function sequentially, one after the other

x = inc(1)
y = inc(2)
z = add(x, y)

CPU times: user 70.2 ms, sys: 5.99 ms, total: 76.2 ms
Wall time: 3 s

Parallelize with the dask.delayed decorator

Those two increment calls could be called in parallel, because they are totally independent of one another.

We'll transform the inc and add functions using the dask.delayed function. When we call the delayed version by passing the arguments, exactly as before, the original function isn't actually called yet - which is why the cell execution finishes very quickly. Instead, a delayed object is made, which keeps track of the function to call and the arguments to pass to it.

[4]:

from dask import delayed

[5]:

%%time
# This runs immediately, all it does is build a graph

x = delayed(inc)(1)
y = delayed(inc)(2)
z = delayed(add)(x, y)

CPU times: user 381 µs, sys: 72 µs, total: 453 µs
Wall time: 382 µs

This ran immediately, since nothing has really happened yet.

To get the result, call compute. Notice that this runs faster than the original code.

[6]:

%%time
# This actually runs our computation using a local process pool

z.compute()

CPU times: user 53.9 ms, sys: 10.6 ms, total: 64.5 ms
Wall time: 2.02 s

[6]:

5

What just happened?

The z object is a lazy Delayed object. This object holds everything we need to compute the final result, including references to all of the functions that are required, their inputs, and their relationships to one another. We can evaluate the result with .compute() as above, or we can visualize the task graph for this value with .visualize().

[7]:

z

[7]:

Delayed('add-ccf98516-3240-473e-80b2-a7fdbe6c7557')

[8]:

# Look at the task graph for `z`
z.visualize()

[8]:

[Task graph image: the outputs of the two inc tasks feed into a single add task.]

Notice that this includes the names of the functions from before, and the logical flow of the outputs of the inc functions to the inputs of add.

Some questions to consider:

  • Why did we go from 3s to 2s? Why weren’t we able to parallelize down to 1s?

  • What would have happened if the inc and add functions didn’t include the sleep(1)? Would Dask still be able to speed up this code?

  • What if we have multiple outputs or also want to get access to x or y? (See the sketch below.)
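
For the last question, here is a minimal sketch of pulling several results out at once with dask.compute (reusing the same delayed x, y, and z objects from above):

    import dask

    # Compute several delayed objects in one call; shared work is run only once
    x_val, y_val, z_val = dask.compute(x, y, z)
    print(x_val, y_val, z_val)  # 2 3 5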

Exercise: Parallelize a for loop

for loops are one of the most common things that we want to parallelize. Use dask.delayed on inc and sum to parallelize the computation below:

[9]:

data = [1, 2, 3, 4, 5, 6, 7, 8]

[10]:

%%time
# Sequential code

results = []
for x in data:
    y = inc(x)
    results.append(y)

total = sum(results)

CPU times: user 177 ms, sys: 31.8 ms, total: 209 ms
Wall time: 8.01 s

[11]:

total

[11]:

44
[12]:

%%time
# Your parallel code here...

CPU times: user 1e+03 ns, sys: 0 ns, total: 1e+03 ns
Wall time: 4.05 µs
[13]:

results = []

for x in data:
    y = delayed(inc)(x)
    results.append(y)

total = delayed(sum)(results)
print("Before computing:", total)  # Let's see what type of thing total is
result = total.compute()
print("After computing :", result)  # After it's computed

Before computing: Delayed('sum-9392299b-8a8e-4185-a6c5-a22f922b57f0')
After computing : 44

How does the graph visualization of the given solution compare with that of a version where the sum function is used directly, rather than wrapped with delayed? Can you explain the latter version? You might find the result of the following expression illuminating:

    delayed(inc)(1) + delayed(inc)(2)
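
As a hint, operators on Delayed objects are themselves lazy, so the expression above builds another Delayed object rather than running anything. A minimal sketch of inspecting it:

    s = delayed(inc)(1) + delayed(inc)(2)
    print(type(s))      # a Delayed object; the `+` became another lazy task
    print(s.compute())  # 5, i.e. inc(1) + inc(2)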

Exercise: Parallelizing for-loop code with control flow

Often we want to delay only some functions, running a few of them immediately. This is especially helpful when those functions are fast and help us to determine what other slower functions we should call. This decision, to delay or not to delay, is usually where we need to be thoughtful when using dask.delayed.

In the example below we iterate through a list of inputs. If that input is even then we want to call inc. If the input is odd then we want to call double. This is_even decision to call inc or double has to be made immediately (not lazily) in order for our graph-building Python code to proceed.

[14]:

def double(x):
    sleep(1)
    return 2 * x

def is_even(x):
    return not x % 2

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[15]:

%%time
# Sequential code

results = []
for x in data:
    if is_even(x):
        y = double(x)
    else:
        y = inc(x)
    results.append(y)

total = sum(results)
print(total)

90
CPU times: user 213 ms, sys: 44.4 ms, total: 257 ms
Wall time: 10 s
[16]:

%%time
# Your parallel code here...
# TODO: parallelize the sequential code above using dask.delayed
# You will need to delay some functions, but not all

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 3.81 µs
[17]:

results = []
for x in data:
    if is_even(x):  # even
        y = delayed(double)(x)
    else:  # odd
        y = delayed(inc)(x)
    results.append(y)

total = delayed(sum)(results)
[18]:

%time total.compute()

CPU times: user 82.8 ms, sys: 8.86 ms, total: 91.7 ms
Wall time: 3.02 s

[18]:

90

[19]:

total.visualize()

[19]:

[Task graph image: the delayed inc and double tasks all feed into a single sum task.]

Some questions to consider:

  • What are other examples of control flow where we can’t use delayed?

  • What would have happened if we had delayed the evaluation of is_even(x) in the example above?

  • What are your thoughts on delaying sum? This function is both computational and fast to run. (See the sketch below.)
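
To make that last question concrete, here is a small sketch (reusing the results list of delayed values built above) contrasting the two ways of summing. Both give the same answer but build different graphs:

    # A single task that receives the whole list and calls the built-in sum once
    total_a = delayed(sum)(results)

    # Using sum directly also works, because `+` on Delayed objects is lazy,
    # but it builds a chain of pairwise add tasks instead of one sum task
    total_b = sum(results)

    print(total_a.compute(), total_b.compute())  # 90 90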

Exercise: Parallelizing a Pandas Groupby Reduction

In this exercise we read several CSV files and perform a groupby operation in parallel. We are given sequential code to do this and will parallelize it with dask.delayed.

The computation we will parallelize is to compute the mean departure delay per airport from some historical flight data. We will do this by using dask.delayed together with pandas. In a future section we will do this same exercise with dask.dataframe.

Create data

Run this code to prep some data.

This downloads and extracts some historical flight data for flights out of NYC between 1990 and 2000. The data is originally from here.

[20]:

%run prep.py -d flights

Inspect data

[21]:

import os
sorted(os.listdir(os.path.join('data', 'nycflights')))

[21]:

['1990.csv',
 '1991.csv',
 '1992.csv',
 '1993.csv',
 '1994.csv',
 '1995.csv',
 '1996.csv',
 '1997.csv',
 '1998.csv',
 '1999.csv']

Read one file with pandas.read_csv and compute mean departure delay

[22]:

import pandas as pd
df = pd.read_csv(os.path.join('data', 'nycflights', '1990.csv'))
df.head()

[22]:

   Year  Month  DayofMonth  DayOfWeek  DepTime  CRSDepTime  ArrTime  CRSArrTime UniqueCarrier  FlightNum  AirTime  ArrDelay  DepDelay Origin Dest  Distance  TaxiIn  TaxiOut  Cancelled  Diverted
0  1990      1           1          1    621.0        1540   1747.0        1701            US         33      NaN      46.0      41.0    EWR  PIT     319.0     NaN      NaN          0         0
1  1990      1           2          2   1547.0        1540   1700.0        1701            US         33      NaN      -1.0       7.0    EWR  PIT     319.0     NaN      NaN          0         0
2  1990      1           3          3   1546.0        1540   1710.0        1701            US         33      NaN       9.0       6.0    EWR  PIT     319.0     NaN      NaN          0         0
3  1990      1           4          4   1542.0        1540   1710.0        1701            US         33      NaN       9.0       2.0    EWR  PIT     319.0     NaN      NaN          0         0
4  1990      1           5          5   1549.0        1540   1706.0        1701            US         33      NaN       5.0       9.0    EWR  PIT     319.0     NaN      NaN          0         0

5 rows × 23 columns

[23]:

# What is the schema?
df.dtypes

[23]:

Year                   int64
Month                  int64
DayofMonth             int64
DayOfWeek              int64
DepTime              float64
CRSDepTime             int64
ArrTime              float64
CRSArrTime             int64
UniqueCarrier         object
FlightNum              int64
TailNum              float64
ActualElapsedTime    float64
CRSElapsedTime         int64
AirTime              float64
ArrDelay             float64
DepDelay             float64
Origin                object
Dest                  object
Distance             float64
TaxiIn               float64
TaxiOut              float64
Cancelled              int64
Diverted               int64
dtype: object
[24]:

# What originating airports are in the data?
df.Origin.unique()

[24]:

array(['EWR', 'LGA', 'JFK'], dtype=object)

[25]:

# Mean departure delay per-airport for one year
df.groupby('Origin').DepDelay.mean()

[25]:

Origin
EWR    10.854962
JFK    17.027397
LGA    10.895592
Name: DepDelay, dtype: float64

Sequential code: Mean Departure Delay Per Airport

The above cell computes the mean departure delay per-airport for one year. Here we expand that to all years using a sequential for loop.

[26]:

from glob import glob
filenames = sorted(glob(os.path.join('data', 'nycflights', '*.csv')))

[27]:

%%time

sums = []
counts = []
for fn in filenames:
    # Read in file
    df = pd.read_csv(fn)

    # Groupby origin airport
    by_origin = df.groupby('Origin')

    # Sum of all departure delays by origin
    total = by_origin.DepDelay.sum()

    # Number of flights by origin
    count = by_origin.DepDelay.count()

    # Save the intermediates
    sums.append(total)
    counts.append(count)

# Combine intermediates to get total mean-delay-per-origin
total_delays = sum(sums)
n_flights = sum(counts)
mean = total_delays / n_flights

CPU times: user 52.7 ms, sys: 4.36 ms, total: 57 ms
Wall time: 55 ms
[28]:

mean

[28]:

Origin
EWR    12.500968
JFK          NaN
LGA    10.169227
Name: DepDelay, dtype: float64

Parallelize the code above

Use dask.delayed to parallelize the code above. Some extra things you will need to know.

  • Methods and attribute access on delayed objects work automatically, so if you have a delayed object you can perform normal arithmetic, slicing, and method calls on it and it will produce the correct delayed calls.

        x = delayed(np.arange)(10)
        y = (x + 1)[::2].sum()  # everything here was delayed

  • Calling the .compute() method works well when you have a single output. When you have multiple outputs you might want to use the dask.compute function:

        >>> x = delayed(np.arange)(10)
        >>> y = x ** 2
        >>> min_, max_ = compute(y.min(), y.max())
        >>> min_, max_
        (0, 81)

This way Dask can share the intermediate values (like y = x**2).
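
To illustrate that sharing, here is a small sketch (assuming NumPy is available as np) comparing two separate .compute() calls, each of which runs its own graph, with a single dask.compute call:

    import numpy as np
    from dask import compute, delayed

    arr = delayed(np.arange)(10)
    sq = arr ** 2

    # Two separate compute calls: arr and sq are built and computed twice
    min_a = sq.min().compute()
    max_a = sq.max().compute()

    # One compute call: the shared intermediates arr and sq are computed once
    min_b, max_b = compute(sq.min(), sq.max())

    print((min_a, max_a), (min_b, max_b))  # (0, 81) either way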

So your goal is to parallelize the code above (which has been copied below) using dask.delayed. You may also want to visualize a bit of the computation to see if you’re doing it correctly.

[29]:

from dask import compute

[30]:

%%time

# copied sequential code

sums = []
counts = []
for fn in filenames:
    # Read in file
    df = pd.read_csv(fn)

    # Groupby origin airport
    by_origin = df.groupby('Origin')

    # Sum of all departure delays by origin
    total = by_origin.DepDelay.sum()

    # Number of flights by origin
    count = by_origin.DepDelay.count()

    # Save the intermediates
    sums.append(total)
    counts.append(count)

# Combine intermediates to get total mean-delay-per-origin
total_delays = sum(sums)
n_flights = sum(counts)
mean = total_delays / n_flights

CPU times: user 48.5 ms, sys: 7.41 ms, total: 55.9 ms
Wall time: 53.6 ms
[31]:

mean

[31]:

Origin
EWR    12.500968
JFK          NaN
LGA    10.169227
Name: DepDelay, dtype: float64

[32]:

%%time
# your code here

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 4.29 µs

If you load the solution, add %%time to the top of the cell to measure the running time.

[33]:

# This is just one possible solution, there are
# several ways to do this using `delayed`

sums = []
counts = []
for fn in filenames:
    # Read in file
    df = delayed(pd.read_csv)(fn)

    # Groupby origin airport
    by_origin = df.groupby('Origin')

    # Sum of all departure delays by origin
    total = by_origin.DepDelay.sum()

    # Number of flights by origin
    count = by_origin.DepDelay.count()

    # Save the intermediates
    sums.append(total)
    counts.append(count)

# Compute the intermediates
sums, counts = compute(sums, counts)

# Combine intermediates to get total mean-delay-per-origin
total_delays = sum(sums)
n_flights = sum(counts)
mean = total_delays / n_flights
[34]:

# ensure the results still match
mean

[34]:

Origin
EWR    12.500968
JFK          NaN
LGA    10.169227
Name: DepDelay, dtype: float64

Some questions to consider:

  • How much speedup did you get? Is this how much speedup you’d expect?

  • Experiment with where to call compute. What happens when you call it on sums and counts? What happens if you wait and call it on mean? (One fully lazy variant is sketched after this list.)

  • Experiment with delaying the call to sum. What does the graph look like if sum is delayed? What does the graph look like if it isn’t?

  • Can you think of any reason why you’d want to do the reduction one way over the other?
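
Touching on the second and third questions, here is one possible sketch (not the only correct approach) in which the reduction is delayed as well and compute is called only on the final mean, so Dask sees the entire graph of reads, groupbys, and sums at once:

    sums = []
    counts = []
    for fn in filenames:
        df = delayed(pd.read_csv)(fn)
        by_origin = df.groupby('Origin')
        sums.append(by_origin.DepDelay.sum())
        counts.append(by_origin.DepDelay.count())

    # Delay the reductions too; nothing runs until compute is called on the result
    total_delays = delayed(sum)(sums)
    n_flights = delayed(sum)(counts)
    mean = (total_delays / n_flights).compute()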

Learn More

Visit the Delayed documentation. In particular, this delayed screencast will reinforce the concepts you learned here and the delayed best practices document collects advice on using dask.delayed well.

Close the Client

Before moving on to the next exercise, make sure to close your client or stop this kernel.

[35]:

client.close()