You can run this notebook in a live session on Binder, or view it on GitHub.

Dask DataFrames

We finished Chapter 1 by building a parallel dataframe computation over a directory of CSV files using dask.delayed. In this section we use dask.dataframe to automatically build similar computations, for the common case of tabular computations. Dask dataframes look and feel like Pandas dataframes, but they run on the same infrastructure that powers dask.delayed.

In this notebook we use the same airline data as before, but now rather than write for-loops we let dask.dataframe construct our computations for us. The dask.dataframe.read_csv function can take a globstring like "data/nycflights/*.csv" and build parallel computations on all of our data at once.

When to use dask.dataframe

Pandas is great for tabular datasets that fit in memory. Dask becomes useful when the dataset you want to analyze is larger than your machine’s RAM. The demo dataset we’re working with is only about 200MB, so that you can download it in a reasonable time, but dask.dataframe will scale to datasets much larger than memory.

The dask.dataframe module implements a blocked parallel DataFrame object that mimics a large subset of the Pandas DataFrame. One Dask DataFrame comprises many in-memory pandas DataFrames separated along the index. One operation on a Dask DataFrame triggers many pandas operations on the constituent pandas DataFrames, in a way that is mindful of potential parallelism and memory constraints.

Main Take-aways

  • Dask DataFrame should be familiar to Pandas users

  • The partitioning of dataframes is important for efficient execution

Create data

  1. [1]:
  1. %run prep.py -d flights

Setup

  1. [2]:
  1. from dask.distributed import Client
  2.  
  3. client = Client(n_workers=4)

We create some artificial data.

  1. [3]:
  1. from prep import accounts_csvs
  2. accounts_csvs()
  3.  
  4. import os
  5. import dask
  6. filename = os.path.join('data', 'accounts.*.csv')
  7. filename
  1. [3]:
  1. 'data/accounts.*.csv'

The filename includes a glob pattern (*), so all files in the path matching that pattern will be read into the same Dask DataFrame.
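Under the hood this is ordinary glob expansion. A self-contained sketch with throwaway files (the tutorial's real files come from prep.py):

```python
import glob
import os
import tempfile

# Throwaway files standing in for the ones prep.py generates.
d = tempfile.mkdtemp()
for i in range(3):
    with open(os.path.join(d, 'accounts.%d.csv' % i), 'w') as f:
        f.write('id,names,amount\n')

# The same expansion dd.read_csv performs on the globstring:
matches = sorted(glob.glob(os.path.join(d, 'accounts.*.csv')))
print(len(matches))  # 3 -- one partition per matched file
```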

  1. [4]:
  1. import dask.dataframe as dd
  2. df = dd.read_csv(filename)
  3. df.head()
  1. [4]:
   id  names    amount
0  91  Kevin       575
1  10  Charlie     421
2  38  Ursula     2056
3  98  Frank      2818
4  95  Ingrid      197
  1. [5]:
  1. # load and count number of rows
  2. len(df)
  1. [5]:
  1. 30000

What happened here?

  • Dask investigated the input path and found that there are three matching files

  • a set of jobs was intelligently created for each chunk - one per original CSV file in this case

  • each file was loaded into a pandas dataframe and had len() applied to it

  • the subtotals were combined to give you the final grand total
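Those steps are roughly what you would write by hand with dask.delayed. A self-contained sketch using throwaway CSVs (the real files are data/accounts.*.csv):

```python
import os
import tempfile
import pandas as pd
import dask

# Throwaway CSVs standing in for data/accounts.*.csv.
d = tempfile.mkdtemp()
filenames = []
for i in range(3):
    fn = os.path.join(d, 'accounts.%d.csv' % i)
    pd.DataFrame({'amount': range(10)}).to_csv(fn, index=False)
    filenames.append(fn)

# One delayed pandas len() per file, then a delayed sum of the subtotals.
counts = [dask.delayed(lambda f: len(pd.read_csv(f)))(fn) for fn in filenames]
total = dask.delayed(sum)(counts).compute()
print(total)  # 30
```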

Real Data

Let's try this with an extract of flights in the USA across several years. This data is specific to flights out of the three airports in the New York City area.

  1. [6]:
  1. df = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'),
  2.                  parse_dates={'Date': [0, 1, 2]})

Notice that the representation of the dataframe object contains no data - Dask has just done enough to read the start of the first file, and infer the column names and dtypes.

  1. [7]:
  1. df
  1. [7]:

Dask DataFrame Structure:

npartitions=10

Column             dtype
-----------------  --------------
Date               datetime64[ns]
DayOfWeek          int64
DepTime            float64
CRSDepTime         int64
ArrTime            float64
CRSArrTime         int64
UniqueCarrier      object
FlightNum          int64
TailNum            float64
ActualElapsedTime  float64
CRSElapsedTime     int64
AirTime            float64
ArrDelay           float64
DepDelay           float64
Origin             object
Dest               object
Distance           float64
TaxiIn             float64
TaxiOut            float64
Cancelled          int64
Diverted           int64

Dask Name: from-delayed, 30 tasks

We can view the start and end of the data

  1. [8]:
  1. df.head()
  1. [8]:
   Date        DayOfWeek  DepTime  CRSDepTime  ArrTime  CRSArrTime  UniqueCarrier  FlightNum  TailNum  ActualElapsedTime  AirTime  ArrDelay  DepDelay  Origin  Dest  Distance  TaxiIn  TaxiOut  Cancelled  Diverted
0  1990-01-01  1          1621.0   1540        1747.0   1701        US             33         NaN      86.0               NaN      46.0      41.0      EWR     PIT   319.0     NaN     NaN      0          0
1  1990-01-02  2          1547.0   1540        1700.0   1701        US             33         NaN      73.0               NaN      -1.0      7.0       EWR     PIT   319.0     NaN     NaN      0          0
2  1990-01-03  3          1546.0   1540        1710.0   1701        US             33         NaN      84.0               NaN      9.0       6.0       EWR     PIT   319.0     NaN     NaN      0          0
3  1990-01-04  4          1542.0   1540        1710.0   1701        US             33         NaN      88.0               NaN      9.0       2.0       EWR     PIT   319.0     NaN     NaN      0          0
4  1990-01-05  5          1549.0   1540        1706.0   1701        US             33         NaN      77.0               NaN      5.0       9.0       EWR     PIT   319.0     NaN     NaN      0          0

5 rows × 21 columns

  1. [9]:
  1. df.tail() # this fails
  1. ---------------------------------------------------------------------------
  2. ValueError Traceback (most recent call last)
  3. <ipython-input-9-430ef93b601c> in <module>
  4. ----> 1 df.tail() # this fails
  5.  
  6. ~/miniconda/envs/test/lib/python3.7/site-packages/dask/dataframe/core.py in tail(self, n, compute)
  7. 996
  8. 997 if compute:
  9. --> 998 result = result.compute()
  10. 999 return result
  11. 1000
  12.  
  13. ~/miniconda/envs/test/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
  14. 163 dask.base.compute
  15. 164 """
  16. --> 165 (result,) = compute(self, traverse=False, **kwargs)
  17. 166 return result
  18. 167
  19.  
  20. ~/miniconda/envs/test/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
  21. 434 keys = [x.__dask_keys__() for x in collections]
  22. 435 postcomputes = [x.__dask_postcompute__() for x in collections]
  23. --> 436 results = schedule(dsk, keys, **kwargs)
  24. 437 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
  25. 438
  26.  
  27. ~/miniconda/envs/test/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
  28. 2570 should_rejoin = False
  29. 2571 try:
  30. -> 2572 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  31. 2573 finally:
  32. 2574 for f in futures.values():
  33.  
  34. ~/miniconda/envs/test/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
  35. 1870 direct=direct,
  36. 1871 local_worker=local_worker,
  37. -> 1872 asynchronous=asynchronous,
  38. 1873 )
  39. 1874
  40.  
  41. ~/miniconda/envs/test/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
  42. 765 else:
  43. 766 return sync(
  44. --> 767 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  45. 768 )
  46. 769
  47.  
  48. ~/miniconda/envs/test/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
  49. 332 if error[0]:
  50. 333 typ, exc, tb = error[0]
  51. --> 334 raise exc.with_traceback(tb)
  52. 335 else:
  53. 336 return result[0]
  54.  
  55. ~/miniconda/envs/test/lib/python3.7/site-packages/distributed/utils.py in f()
  56. 316 if callback_timeout is not None:
  57. 317 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
  58. --> 318 result[0] = yield future
  59. 319 except Exception as exc:
  60. 320 error[0] = sys.exc_info()
  61.  
  62. ~/miniconda/envs/test/lib/python3.7/site-packages/tornado/gen.py in run(self)
  63. 733
  64. 734 try:
  65. --> 735 value = future.result()
  66. 736 except Exception:
  67. 737 exc_info = sys.exc_info()
  68.  
  69. ~/miniconda/envs/test/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
  70. 1726 exc = CancelledError(key)
  71. 1727 else:
  72. -> 1728 raise exception.with_traceback(traceback)
  73. 1729 raise exc
  74. 1730 if errors == "skip":
  75.  
  76. ~/miniconda/envs/test/lib/python3.7/site-packages/dask/utils.py in apply()
  77. 27 def apply(func, args, kwargs=None):
  78. 28 if kwargs:
  79. ---> 29 return func(*args, **kwargs)
  80. 30 else:
  81. 31 return func(*args)
  82.  
  83. ~/miniconda/envs/test/lib/python3.7/site-packages/dask/dataframe/io/csv.py in pandas_read_text()
  84. 68 df = reader(bio, **kwargs)
  85. 69 if dtypes:
  86. ---> 70 coerce_dtypes(df, dtypes)
  87. 71
  88. 72 if enforce and columns and (list(df.columns) != list(columns)):
  89.  
  90. ~/miniconda/envs/test/lib/python3.7/site-packages/dask/dataframe/io/csv.py in coerce_dtypes()
  91. 172 rule.join(filter(None, [dtype_msg, date_msg]))
  92. 173 )
  93. --> 174 raise ValueError(msg)
  94. 175
  95. 176
  96.  
  97. ValueError: Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.
  98.  
  99. +----------------+---------+----------+
  100. | Column | Found | Expected |
  101. +----------------+---------+----------+
  102. | CRSElapsedTime | float64 | int64 |
  103. | TailNum | object | float64 |
  104. +----------------+---------+----------+
  105.  
  106. The following columns also raised exceptions on conversion:
  107.  
  108. - TailNum
  109. ValueError("could not convert string to float: 'N54711'")
  110.  
  111. Usually this is due to dask's dtype inference failing, and
  112. *may* be fixed by specifying dtypes manually by adding:
  113.  
  114. dtype={'CRSElapsedTime': 'float64',
  115. 'TailNum': 'object'}
  116.  
  117. to the call to `read_csv`/`read_table`.

What just happened?

Unlike pandas.read_csv which reads in the entire file before inferring datatypes, dask.dataframe.read_csv only reads in a sample from the beginning of the file (or first file if using a glob). These inferred datatypes are then enforced when reading all partitions.

In this case, the datatypes inferred from the sample are incorrect: the first rows of TailNum are all missing, so pandas infers float64, but later rows contain strings (object dtype); similarly, CRSElapsedTime is inferred as int64 from the sample, but has missing values later on, which forces float64. Note that Dask gives an informative error message about the mismatch. When this happens you have a few options:

  • Specify dtypes directly using the dtype keyword. This is the recommended solution, as it’s the least error prone (better to be explicit than implicit) and also the most performant.

  • Increase the number of bytes read for inference using the sample keyword (in bytes)

  • Use assume_missing to make dask assume that columns inferred to be int (which don’t allow missing values) are actually floats (which do allow missing values). In our particular case this doesn’t apply.

In our case we’ll use the first option and directly specify the dtypes of the offending columns.

  1. [10]:
  1. df = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'),
  2.                  parse_dates={'Date': [0, 1, 2]},
  3.                  dtype={'TailNum': str,
  4.                         'CRSElapsedTime': float,
  5.                         'Cancelled': bool})
  1. [11]:
  1. df.tail() # now works
  1. [11]:
     Date        DayOfWeek  DepTime  CRSDepTime  ArrTime  CRSArrTime  UniqueCarrier  FlightNum  TailNum  ActualElapsedTime  AirTime  ArrDelay  DepDelay  Origin  Dest  Distance  TaxiIn  TaxiOut  Cancelled  Diverted
994  1999-01-25  1          632.0    635         803.0    817         CO             437        N27213   91.0               68.0     -14.0     -3.0      EWR     RDU   416.0     4.0     19.0     False      0
995  1999-01-26  2          632.0    635         751.0    817         CO             437        N16217   79.0               62.0     -26.0     -3.0      EWR     RDU   416.0     3.0     14.0     False      0
996  1999-01-27  3          631.0    635         756.0    817         CO             437        N12216   85.0               66.0     -21.0     -4.0      EWR     RDU   416.0     4.0     15.0     False      0
997  1999-01-28  4          629.0    635         803.0    817         CO             437        N26210   94.0               69.0     -14.0     -6.0      EWR     RDU   416.0     5.0     20.0     False      0
998  1999-01-29  5          632.0    635         802.0    817         CO             437        N12225   90.0               67.0     -15.0     -3.0      EWR     RDU   416.0     5.0     18.0     False      0

5 rows × 21 columns

Computations with dask.dataframe

We compute the maximum of the DepDelay column. With just pandas, we would loop over each file to find the individual maximums, then find the final maximum over all the individual maximums:

  1. maxes = []
  2. for fn in filenames:
  3.     df = pd.read_csv(fn)
  4.     maxes.append(df.DepDelay.max())
  5.  
  6. final_max = max(maxes)

We could wrap that pd.read_csv with dask.delayed so that it runs in parallel. Regardless, we would still be thinking about loops, intermediate results (one per file), and the final reduction (the max of the intermediate maxes). This is just noise around the real task, which pandas solves with:

  1. df = pd.read_csv(filename, dtype=dtype)
  2. df.DepDelay.max()

dask.dataframe lets us write pandas-like code that operates on larger-than-memory datasets in parallel.

  1. [12]:
  1. %time df.DepDelay.max().compute()
  1. CPU times: user 73.1 ms, sys: 6.91 ms, total: 80 ms
  2. Wall time: 702 ms
  1. [12]:
  1. 409.0

This writes the delayed computation for us and then runs it.

Some things to note:

  • As with dask.delayed, we need to call .compute() when we’re done. Up until this point everything is lazy.

  • Dask will delete intermediate results (like the full pandas dataframe for each file) as soon as possible.

    • This lets us handle datasets that are larger than memory

    • This means that repeated computations will have to load all of the data in each time (run the code above again, is it faster or slower than you would expect?)

As with Delayed objects, you can view the underlying task graph using the .visualize method:

  1. [13]:
  1. # notice the parallelism
  2. df.DepDelay.max().visualize()
  1. [13]:

_images/04_dataframe_25_0.png

Exercises

In this section we do a few dask.dataframe computations. If you are comfortable with Pandas then these should be familiar. You will have to think about when to call compute.

1.) How many rows are in our dataset?

If you aren’t familiar with pandas, how would you check how many records are in a list of tuples?

  1. [14]:
  1. # Your code here
  1. [15]:
  1. len(df)
  1. [15]:
  1. 9990

2.) In total, how many non-canceled flights were taken?

With pandas, you would use boolean indexing.

  1. [16]:
  1. # Your code here
  1. [17]:
  1. len(df[~df.Cancelled])
  1. [17]:
  1. 9383

3.) In total, how many non-cancelled flights were taken from each airport?

Hint: use [df.groupby](https://pandas.pydata.org/pandas-docs/stable/groupby.html).

  1. [18]:
  1. # Your code here
  1. [19]:
  1. df[~df.Cancelled].groupby('Origin').Origin.count().compute()
  1. [19]:
  1. Origin
  2. EWR 4132
  3. JFK 1085
  4. LGA 4166
  5. Name: Origin, dtype: int64

4.) What was the average departure delay from each airport?

Note, this is the same computation you did in the previous notebook (is this approach faster or slower?)

  1. [20]:
  1. # Your code here
  1. [21]:
  1. df.groupby("Origin").DepDelay.mean().compute()
  1. [21]:
  1. Origin
  2. EWR 12.500968
  3. JFK 17.053456
  4. LGA 10.169227
  5. Name: DepDelay, dtype: float64

5.) What day of the week has the worst average departure delay?

  1. [22]:
  1. # Your code here
  1. [23]:
  1. df.groupby("DayOfWeek").DepDelay.mean().compute()
  1. [23]:
  1. DayOfWeek
  2. 1 10.677698
  3. 2 8.633310
  4. 3 14.208160
  5. 4 14.187853
  6. 5 15.209929
  7. 6 9.540307
  8. 7 10.609375
  9. Name: DepDelay, dtype: float64

Sharing Intermediate Results

When computing all of the above, we sometimes did the same operation more than once. For most operations, dask.dataframe hashes the arguments, allowing duplicate computations to be shared, and only computed once.

For example, let's compute the mean and standard deviation for departure delay of all non-canceled flights. Since dask operations are lazy, those values aren't the final results yet. They're just the recipe required to get the result.

If we compute them with two calls to compute, there is no sharing of intermediate computations.

  1. [24]:
  1. non_cancelled = df[~df.Cancelled]
  2. mean_delay = non_cancelled.DepDelay.mean()
  3. std_delay = non_cancelled.DepDelay.std()
  1. [25]:
  1. %%time
  2.  
  3. mean_delay_res = mean_delay.compute()
  4. std_delay_res = std_delay.compute()
  1. CPU times: user 138 ms, sys: 6.99 ms, total: 145 ms
  2. Wall time: 440 ms

But let's try passing both to a single compute call.

  1. [26]:
  1. %%time
  2.  
  3. mean_delay_res, std_delay_res = dask.compute(mean_delay, std_delay)
  1. CPU times: user 90.9 ms, sys: 3.84 ms, total: 94.8 ms
  2. Wall time: 270 ms

Using dask.compute takes roughly 1/2 the time. This is because the task graphs for both results are merged when calling dask.compute, allowing shared operations to only be done once instead of twice. In particular, using dask.compute only does the following once:

  • the calls to read_csv

  • the filter (df[~df.Cancelled])

  • some of the necessary reductions (sum, count)

To see what the merged task graphs between multiple results look like (and what’s shared), you can use the dask.visualize function (we might want to use filename='graph.pdf' to zoom in on the graph better):

  1. [27]:
  1. dask.visualize(mean_delay, std_delay)
  1. [27]:

_images/04_dataframe_47_0.png

How does this compare to Pandas?

Pandas is more mature and fully featured than dask.dataframe. If your data fits in memory then you should use Pandas. The dask.dataframe module gives you a limited pandas experience when you operate on datasets that don’t fit comfortably in memory.

During this tutorial we provide a small dataset consisting of a few CSV files. This dataset is 45MB on disk that expands to about 400MB in memory. This dataset is small enough that you would normally use Pandas.

We’ve chosen this size so that exercises finish quickly. Dask.dataframe only really becomes meaningful for problems significantly larger than this, when Pandas breaks with the dreaded

  1. MemoryError: ...

Furthermore, the distributed scheduler allows the same dataframe expressions to be executed across a cluster. For massive "big data" processing, one could run ingestion functions such as read_csv against storage accessible to every worker node (e.g., Amazon's S3); because most workloads begin by selecting only some columns and then transforming and filtering the data, only relatively small amounts of data need to be communicated between the machines.

Dask.dataframe operations use pandas operations internally. Generally they run at about the same speed except in the following two cases:

  • Dask introduces a bit of overhead, around 1ms per task. This is usually negligible.

  • When Pandas releases the GIL (coming to groupby in the next version) dask.dataframe can call several pandas operations in parallel within a process, increasing speed somewhat proportional to the number of cores. For operations which don’t release the GIL, multiple processes would be needed to get the same speedup.

Dask DataFrame Data Model

For the most part, a Dask DataFrame feels like a pandas DataFrame. So far, the biggest difference we’ve seen is that Dask operations are lazy; they build up a task graph instead of executing immediately (more details coming in Schedulers). This lets Dask do operations in parallel and out of core.

In Dask Arrays, we saw that a dask.array was composed of many NumPy arrays, chunked along one or more dimensions. It’s similar for dask.dataframe: a Dask DataFrame is composed of many pandas DataFrames. For dask.dataframe the chunking happens only along the index.

We call each chunk a partition, and the upper/lower bounds are divisions. Dask can store information about the divisions. For now, partitions come up when you write custom functions to apply to Dask DataFrames.

Converting CRSDepTime to a timestamp

This dataset stores timestamps as HHMM, which are read in as integers in read_csv:

  1. [28]:
  1. crs_dep_time = df.CRSDepTime.head(10)
  2. crs_dep_time
  1. [28]:
  1. 0 1540
  2. 1 1540
  3. 2 1540
  4. 3 1540
  5. 4 1540
  6. 5 1540
  7. 6 1540
  8. 7 1540
  9. 8 1540
  10. 9 1540
  11. Name: CRSDepTime, dtype: int64

To convert these to timestamps of scheduled departure time, we need to convert these integers into pd.Timedelta objects, and then combine them with the Date column.

In pandas we’d do this using the pd.to_timedelta function, and a bit of arithmetic:

  1. [29]:
  1. import pandas as pd
  2.  
  3. # Get the first 10 dates to complement our `crs_dep_time`
  4. date = df.Date.head(10)
  5.  
  6. # Get hours as an integer, convert to a timedelta
  7. hours = crs_dep_time // 100
  8. hours_timedelta = pd.to_timedelta(hours, unit='h')
  9.  
  10. # Get minutes as an integer, convert to a timedelta
  11. minutes = crs_dep_time % 100
  12. minutes_timedelta = pd.to_timedelta(minutes, unit='m')
  13.  
  14. # Apply the timedeltas to offset the dates by the departure time
  15. departure_timestamp = date + hours_timedelta + minutes_timedelta
  16. departure_timestamp
  1. [29]:
  1. 0 1990-01-01 15:40:00
  2. 1 1990-01-02 15:40:00
  3. 2 1990-01-03 15:40:00
  4. 3 1990-01-04 15:40:00
  5. 4 1990-01-05 15:40:00
  6. 5 1990-01-06 15:40:00
  7. 6 1990-01-07 15:40:00
  8. 7 1990-01-08 15:40:00
  9. 8 1990-01-09 15:40:00
  10. 9 1990-01-10 15:40:00
  11. dtype: datetime64[ns]

Custom code and Dask Dataframe

We could swap out pd.to_timedelta for dd.to_timedelta and do the same operations on the entire dask DataFrame. But let’s say that Dask hadn’t implemented a dd.to_timedelta that works on Dask DataFrames. What would you do then?

dask.dataframe provides a few methods to make applying custom functions to Dask DataFrames easier:

  • map_partitions: https://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.map_partitions

  • map_overlap: https://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.map_overlap

  • reduction: https://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.reduction

Here we’ll just be discussing map_partitions, which we can use to implement to_timedelta on our own:

  1. [30]:
  1. # Look at the docs for `map_partitions`
  2.  
  3. help(df.CRSDepTime.map_partitions)
  1. Help on method map_partitions in module dask.dataframe.core:
  2.  
  3. map_partitions(func, *args, **kwargs) method of dask.dataframe.core.Series instance
  4. Apply Python function on each DataFrame partition.
  5.  
  6. Note that the index and divisions are assumed to remain unchanged.
  7.  
  8. Parameters
  9. ----------
  10. func : function
  11. Function applied to each partition.
  12. args, kwargs :
  13. Arguments and keywords to pass to the function. The partition will
  14. be the first argument, and these will be passed *after*. Arguments
  15. and keywords may contain ``Scalar``, ``Delayed`` or regular
  16. python objects. DataFrame-like args (both dask and pandas) will be
  17. repartitioned to align (if necessary) before applying the function.
  18. meta : pd.DataFrame, pd.Series, dict, iterable, tuple, optional
  19. An empty ``pd.DataFrame`` or ``pd.Series`` that matches the dtypes
  20. and column names of the output. This metadata is necessary for
  21. many algorithms in dask dataframe to work. For ease of use, some
  22. alternative inputs are also available. Instead of a ``DataFrame``,
  23. a ``dict`` of ``{name: dtype}`` or iterable of ``(name, dtype)``
  24. can be provided (note that the order of the names should match the
  25. order of the columns). Instead of a series, a tuple of ``(name,
  26. dtype)`` can be used. If not provided, dask will try to infer the
  27. metadata. This may lead to unexpected results, so providing
  28. ``meta`` is recommended. For more information, see
  29. ``dask.dataframe.utils.make_meta``.
  30.  
  31. Examples
  32. --------
  33. Given a DataFrame, Series, or Index, such as:
  34.  
  35. >>> import dask.dataframe as dd
  36. >>> df = pd.DataFrame({'x': [1, 2, 3, 4, 5],
  37. ... 'y': [1., 2., 3., 4., 5.]})
  38. >>> ddf = dd.from_pandas(df, npartitions=2)
  39.  
  40. One can use ``map_partitions`` to apply a function on each partition.
  41. Extra arguments and keywords can optionally be provided, and will be
  42. passed to the function after the partition.
  43.  
  44. Here we apply a function with arguments and keywords to a DataFrame,
  45. resulting in a Series:
  46.  
  47. >>> def myadd(df, a, b=1):
  48. ... return df.x + df.y + a + b
  49. >>> res = ddf.map_partitions(myadd, 1, b=2)
  50. >>> res.dtype
  51. dtype('float64')
  52.  
  53. By default, dask tries to infer the output metadata by running your
  54. provided function on some fake data. This works well in many cases, but
  55. can sometimes be expensive, or even fail. To avoid this, you can
  56. manually specify the output metadata with the ``meta`` keyword. This
  57. can be specified in many forms, for more information see
  58. ``dask.dataframe.utils.make_meta``.
  59.  
  60. Here we specify the output is a Series with no name, and dtype
  61. ``float64``:
  62.  
  63. >>> res = ddf.map_partitions(myadd, 1, b=2, meta=(None, 'f8'))
  64.  
  65. Here we map a function that takes in a DataFrame, and returns a
  66. DataFrame with a new column:
  67.  
  68. >>> res = ddf.map_partitions(lambda df: df.assign(z=df.x * df.y))
  69. >>> res.dtypes
  70. x int64
  71. y float64
  72. z float64
  73. dtype: object
  74.  
  75. As before, the output metadata can also be specified manually. This
  76. time we pass in a ``dict``, as the output is a DataFrame:
  77.  
  78. >>> res = ddf.map_partitions(lambda df: df.assign(z=df.x * df.y),
  79. ... meta={'x': 'i8', 'y': 'f8', 'z': 'f8'})
  80.  
  81. In the case where the metadata doesn't change, you can also pass in
  82. the object itself directly:
  83.  
  84. >>> res = ddf.map_partitions(lambda df: df.head(), meta=df)
  85.  
  86. Also note that the index and divisions are assumed to remain unchanged.
  87. If the function you're mapping changes the index/divisions, you'll need
  88. to clear them afterwards:
  89.  
  90. >>> ddf.map_partitions(func).clear_divisions() # doctest: +SKIP
  91.  

The basic idea is to apply a function that operates on a DataFrame to each partition. In this case, we’ll apply pd.to_timedelta.

  1. [31]:
  1. hours = df.CRSDepTime // 100
  2. # hours_timedelta = pd.to_timedelta(hours, unit='h')
  3. hours_timedelta = hours.map_partitions(pd.to_timedelta, unit='h')
  4.  
  5. minutes = df.CRSDepTime % 100
  6. # minutes_timedelta = pd.to_timedelta(minutes, unit='m')
  7. minutes_timedelta = minutes.map_partitions(pd.to_timedelta, unit='m')
  8.  
  9. departure_timestamp = df.Date + hours_timedelta + minutes_timedelta
  1. [32]:
  1. departure_timestamp
  1. [32]:
  1. Dask Series Structure:
  2. npartitions=10
  3. datetime64[ns]
  4. ...
  5. ...
  6. ...
  7. ...
  8. dtype: datetime64[ns]
  9. Dask Name: add, 110 tasks
  1. [33]:
  1. departure_timestamp.head()
  1. [33]:
  1. 0 1990-01-01 15:40:00
  2. 1 1990-01-02 15:40:00
  3. 2 1990-01-03 15:40:00
  4. 3 1990-01-04 15:40:00
  5. 4 1990-01-05 15:40:00
  6. dtype: datetime64[ns]

Exercise: Rewrite above to use a single call to map_partitions

This will be slightly more efficient than two separate calls, as it reduces the number of tasks in the graph.

  1. [34]:
  1. def compute_departure_timestamp(df):
  2.     pass  # TODO: implement this
  1. [35]:
  1. departure_timestamp = df.map_partitions(compute_departure_timestamp)
  2.  
  3. departure_timestamp.head()
  1. ---------------------------------------------------------------------------
  2. AttributeError Traceback (most recent call last)
  3. <ipython-input-35-b8c8b4861a84> in <module>
  4. 1 departure_timestamp = df.map_partitions(compute_departure_timestamp)
  5. 2
  6. ----> 3 departure_timestamp.head()
  7.  
  8. ~/miniconda/envs/test/lib/python3.7/site-packages/dask/dataframe/core.py in head(self, n, npartitions, compute)
  9. 948 Whether to compute the result, default is True.
  10. 949 """
  11. --> 950 return self._head(n=n, npartitions=npartitions, compute=compute, safe=True)
  12. 951
  13. 952 def _head(self, n, npartitions, compute, safe):
  14.  
  15. ~/miniconda/envs/test/lib/python3.7/site-packages/dask/dataframe/core.py in _head(self, n, npartitions, compute, safe)
  16. 981
  17. 982 if compute:
  18. --> 983 result = result.compute()
  19. 984 return result
  20. 985
  21.  
  22. ~/miniconda/envs/test/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
  23. 163 dask.base.compute
  24. 164 """
  25. --> 165 (result,) = compute(self, traverse=False, **kwargs)
  26. 166 return result
  27. 167
  28.  
  29. ~/miniconda/envs/test/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
  30. 434 keys = [x.__dask_keys__() for x in collections]
  31. 435 postcomputes = [x.__dask_postcompute__() for x in collections]
  32. --> 436 results = schedule(dsk, keys, **kwargs)
  33. 437 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
  34. 438
  35.  
  36. ~/miniconda/envs/test/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
  37. 2570 should_rejoin = False
  38. 2571 try:
  39. -> 2572 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  40. 2573 finally:
  41. 2574 for f in futures.values():
  42.  
  43. ~/miniconda/envs/test/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
  44. 1870 direct=direct,
  45. 1871 local_worker=local_worker,
  46. -> 1872 asynchronous=asynchronous,
  47. 1873 )
  48. 1874
  49.  
  50. ~/miniconda/envs/test/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
  51. 765 else:
  52. 766 return sync(
  53. --> 767 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  54. 768 )
  55. 769
  56.  
  57. ~/miniconda/envs/test/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
  58. 332 if error[0]:
  59. 333 typ, exc, tb = error[0]
  60. --> 334 raise exc.with_traceback(tb)
  61. 335 else:
  62. 336 return result[0]
  63.  
  64. ~/miniconda/envs/test/lib/python3.7/site-packages/distributed/utils.py in f()
  65. 316 if callback_timeout is not None:
  66. 317 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
  67. --> 318 result[0] = yield future
  68. 319 except Exception as exc:
  69. 320 error[0] = sys.exc_info()
  70.  
  71. ~/miniconda/envs/test/lib/python3.7/site-packages/tornado/gen.py in run(self)
  72. 733
  73. 734 try:
  74. --> 735 value = future.result()
  75. 736 except Exception:
  76. 737 exc_info = sys.exc_info()
  77.  
  78. ~/miniconda/envs/test/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
  79. 1726 exc = CancelledError(key)
  80. 1727 else:
  81. -> 1728 raise exception.with_traceback(traceback)
  82. 1729 raise exc
  83. 1730 if errors == "skip":
  84.  
  85. ~/miniconda/envs/test/lib/python3.7/site-packages/dask/dataframe/core.py in safe_head()
  86. 5739
  87. 5740 def safe_head(df, n):
  88. -> 5741 r = M.head(df, n)
  89. 5742 if len(r) != n:
  90. 5743 msg = (
  91.  
  92. ~/miniconda/envs/test/lib/python3.7/site-packages/dask/utils.py in __call__()
  93. 879
  94. 880 def __call__(self, obj, *args, **kwargs):
  95. --> 881 return getattr(obj, self.method)(*args, **kwargs)
  96. 882
  97. 883 def __reduce__(self):
  98.  
  99. AttributeError: 'NoneType' object has no attribute 'head'
  1. [36]:
  1. def compute_departure_timestamp(df):
  2.     hours = df.CRSDepTime // 100
  3.     hours_timedelta = pd.to_timedelta(hours, unit='h')
  4.  
  5.     minutes = df.CRSDepTime % 100
  6.     minutes_timedelta = pd.to_timedelta(minutes, unit='m')
  7.  
  8.     return df.Date + hours_timedelta + minutes_timedelta
  9.  
  10. departure_timestamp = df.map_partitions(compute_departure_timestamp)
  11. departure_timestamp.head()
  1. [36]:
  1. 0 1990-01-01 15:40:00
  2. 1 1990-01-02 15:40:00
  3. 2 1990-01-03 15:40:00
  4. 3 1990-01-04 15:40:00
  5. 4 1990-01-05 15:40:00
  6. dtype: datetime64[ns]

Limitations

What doesn’t work?

Dask.dataframe only covers a small but well-used portion of the Pandas API. This limitation is for two reasons:

  • The Pandas API is huge

  • Some operations are genuinely hard to do in parallel (e.g. sort)

Additionally, some important operations like set_index work, but are slower than in Pandas because they include substantial shuffling of data, and may write out to disk.

Learn More

  1. [37]:
  1. client.shutdown()