Opportunistic Caching

EXPERIMENTAL FEATURE added in version 0.6.2 and above - see the disclaimer below.

Dask usually removes intermediate values as quickly as possible in order to make space for more data to flow through your computation. However, in some cases, we may want to hold onto intermediate values, because they might be useful for future computations in an interactive session.

We need to balance the following concerns:

  • Intermediate results might be useful in future unknown computations
  • Intermediate results also fill up memory, reducing space for the rest of our current computation

Negotiating between these two concerns helps us to leverage the memory that we have available to speed up future, unanticipated computations. Which intermediate results should we keep?

This document explains an experimental, opportunistic caching mechanism that automatically picks out and stores useful tasks.

Motivating Example

Consider computing the maximum value of a column in a CSV file:

  >>> import dask.dataframe as dd
  >>> df = dd.read_csv('myfile.csv')
  >>> df.columns
  ['first-name', 'last-name', 'amount', 'id', 'timestamp']

  >>> df.amount.max().compute()
  1000

Even though our full dataset may be too large to fit in memory, the single df.amount column may be small enough to hold in memory just in case it might be useful in the future. This is often the case during data exploration, because we investigate the same subset of our data repeatedly before moving on.

For example, we may now want to find the minimum of the amount column:

  >>> df.amount.min().compute()
  -1000

Under normal operations, this would need to read through the entire CSV file over again. This is somewhat wasteful and stymies interactive data exploration.

Two Simple Solutions

If we know ahead of time that we want both the maximum and minimum, we can compute them simultaneously. Dask will share intermediates intelligently, reading through the dataset only once:

  >>> dd.compute(df.amount.max(), df.amount.min())
  (1000, -1000)

If we know that this column fits in memory, then we can also explicitly compute the column and then continue forward with straight Pandas:

  >>> amount = df.amount.compute()
  >>> amount.max()
  1000
  >>> amount.min()
  -1000

If either of these solutions works for you, great. Otherwise, continue on for a third approach.

Automatic Opportunistic Caching

Another approach is to watch all intermediate computations and guess which ones might be valuable to keep for the future. Dask has an opportunistic caching mechanism that stores intermediate tasks that show the following characteristics:

  • Expensive to compute
  • Cheap to store
  • Frequently used

We can activate a fixed-size cache as a callback:

  >>> from dask.cache import Cache
  >>> cache = Cache(2e9)  # Leverage two gigabytes of memory
  >>> cache.register()    # Turn cache on globally

Now the cache will watch every small part of the computation and judge the value of that part based on the three characteristics listed above (expensive to compute, cheap to store, and frequently used).
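As a rough, hypothetical illustration of how those three characteristics might combine into a single score (this is not the exact formula the cache uses), consider:

  >>> def score(compute_time, nbytes, num_uses):
  ...     """Hypothetical score: reward results that were expensive to compute,
  ...     are cheap to store, and are used often."""
  ...     return compute_time / max(nbytes, 1) * num_uses

  >>> score(compute_time=2.0, nbytes=8_000_000, num_uses=3)     # expensive, small, popular
  7.5e-07
  >>> score(compute_time=0.01, nbytes=500_000_000, num_uses=1)  # cheap, huge, rarely used
  2e-11

Tasks with higher scores are better candidates to keep around.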

Dask will hold on to 2GB of the best intermediate results it can find, evicting older results as better results come in. If the df.amount column fits in 2GB, then probably all of it will be stored while we keep working on it.

If we start work on something else, then the df.amount column will likely be evicted to make space for other more timely results:

  >>> df.amount.max().compute()    # slow the first time
  1000
  >>> df.amount.min().compute()    # fast because df.amount is in the cache
  -1000
  >>> df.id.nunique().compute()    # starts to push out df.amount from cache

Cache tasks, not expressions

This caching happens at the low-level scheduling layer, not the high-level Dask DataFrame or Dask Array layer. We don't explicitly cache the column df.amount. Instead, we cache the hundreds of small pieces of that column that form the dask graph. It could be that we end up caching only a fraction of the column.

This means that the opportunistic caching mechanism described above works for all Dask computations, as long as those computations employ a consistent naming scheme (as all of Dask DataFrame, Dask Array, and Dask Delayed do).
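For example, the same registered cache can also help Dask Array workloads. The snippet below is a hedged sketch (the array shape and operations are made up for illustration); because both computations are built from the same graph, their shared intermediate tasks have the same names and can be served from the cache:

  >>> import dask.array as da
  >>> x = da.random.random((20000, 20000), chunks=(1000, 1000))  # hypothetical array
  >>> y = (x + x.T).sum(axis=0)
  >>> y.max().compute()   # first pass: useful chunk-level results may be cached
  >>> y.min().compute()   # second pass: shared tasks can be reused from the cache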

You can see which tasks are held by the cache by inspecting the following attributes of the cache object:

  >>> cache.cache.data
  <stored values>
  >>> cache.cache.heap.heap
  <scores of items in cache>
  >>> cache.cache.nbytes
  <number of bytes per item in cache>

The cache object is powered by cachey, a tiny library for opportunistic caching.
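If you are curious about the underlying machinery, cachey can also be used on its own. This is a small illustrative sketch (the key and cost values are made up), using cachey's put/get interface:

  >>> from cachey import Cache as CacheyCache  # avoid clashing with dask.cache.Cache
  >>> c = CacheyCache(1e9)       # roughly one gigabyte of available space
  >>> c.put('x', 123, cost=10)   # store a value along with how costly it was to compute
  >>> c.get('x')                 # returns the value if 'x' scored well enough to be kept
  123
  >>> c.data                     # dict of currently stored values
  {'x': 123}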

Disclaimer

This feature is still experimental, and can cause your computation to fill up RAM.

Restricting your cache to a fixed size like 2GB requires Dask to accurately count the size of each of our objects in memory. This can be tricky, particularly for Pythonic objects like lists and tuples, and for DataFrames that contain object dtypes.

It is entirely possible that the caching mechanism will undercount the size of objects, causing it to use up more memory than anticipated, which can lead to blowing up RAM and crashing your session.
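To get a feel for why object dtypes make size accounting hard, you can compare pandas' shallow and deep memory estimates yourself. The sketch below uses a made-up DataFrame and is independent of the caching mechanism:

  >>> import pandas as pd
  >>> pdf = pd.DataFrame({'name': ['alice', 'bob'] * 100000,  # object dtype column
  ...                     'amount': range(200000)})
  >>> pdf.memory_usage(deep=False).sum()  # shallow: counts only pointer-sized slots for strings
  >>> pdf.memory_usage(deep=True).sum()   # deep: also counts the string objects themselves

The deep count is typically much larger, which is why automatic size estimates for object-dtype data are only approximate.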