Data Locality

Data movement often needlessly limits performance.

This is especially true for analytic computations. Dask.distributed minimizes data movement when possible and enables the user to take control when necessary. This document describes current scheduling policies and user API around data locality.

Current Policies

Task Submission

In the common case distributed runs tasks on workers that already hold dependent data. If you have a task f(x) that requires some data x then that task will very likely be run on the worker that already holds x.
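
For illustration, a minimal sketch of this behavior (the scheduler address and the functions load and process are hypothetical placeholders):

  from dask.distributed import Client

  client = Client('scheduler-address:8786')  # hypothetical scheduler address

  x = client.submit(load, 'myfile.dat')  # runs on some worker, which then holds x
  y = client.submit(process, x)          # will very likely run on the worker holding x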

If a task requires data split among multiple workers, then the scheduler chooses to run the task on the worker that requires the least data transfer to it. The size of each data element is measured by the workers using the sys.getsizeof function, which depends on the __sizeof__ protocol generally available on most relevant Python objects.
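
As a rough sketch of this tie-breaking, suppose one dependency is large and another is small; the scheduler will usually place the combining task next to the larger piece (combine below is a hypothetical function and the scheduler address is a placeholder):

  import numpy as np
  from dask.distributed import Client

  client = Client('scheduler-address:8786')  # hypothetical scheduler address

  big = client.submit(np.arange, 1_000_000)  # large array held by one worker
  small = client.submit(sum, range(10))      # tiny integer held by another worker

  # The scheduler compares how many bytes it would have to move and will usually
  # run this task on the worker already holding `big`, moving only `small`.
  combined = client.submit(combine, big, small)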

Data Scatter

When a user scatters data from their local process to the distributed network, this data is distributed in a round-robin fashion grouping by number of cores. So, for example, if we have two workers Alice and Bob, each with two cores, and we scatter out the list range(10) as follows:

  futures = client.scatter(range(10))

Then Alice and Bob receive the following data

  • Alice: [0, 1, 4, 5, 8, 9]
  • Bob: [2, 3, 6, 7]
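
If you want to check where scattered data actually landed, you can ask the scheduler through the client; for example, Client.who_has reports which workers hold which keys:

  futures = client.scatter(range(10))
  client.who_has(futures)  # mapping of each future's key to the worker address(es) holding it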

User Control

Complex algorithms may require more user control.

For example, the existence of specialized hardware such as GPUs or database connections may restrict the set of valid workers for a particular task.

In these cases use the workers= keyword argument to the submit, map, or scatter functions, providing a hostname, IP address, or alias as follows:

  future = client.submit(func, *args, workers=['Alice'])
  • Alice: [0, 1, 4, 5, 8, 9, new_result]
  • Bob: [2, 3, 6, 7]

Required data will always be moved to these workers, even if the volume of that data is significant. If this restriction is only a preference and not a strict requirement, then add the allow_other_workers keyword argument to signal that in extreme cases, such as when no valid worker is present, another may be used.

  future = client.submit(func, *args, workers=['Alice'],
                         allow_other_workers=True)

Additionally the scatter function supports a broadcast= keyword argument to enforce that all data is sent to all workers rather than round-robined. If new workers arrive they will not automatically receive this data.

  futures = client.scatter([1, 2, 3], broadcast=True)  # send data to all workers
  • Alice: [1, 2, 3]
  • Bob: [1, 2, 3]
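
If you later do want newly-arrived workers to hold this data, one option not covered above is to copy the futures out again, for example with Client.replicate, which copies data onto additional workers:

  futures = client.scatter([1, 2, 3], broadcast=True)
  # ... new workers join the cluster ...
  client.replicate(futures)  # copy these pieces across the current set of workers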

Valid arguments for workers= include the following:

  • A single IP address, IP/Port pair, or hostname like the following:
    192.168.1.100, 192.168.1.100:8989, alice, alice:8989
  • A list or set of the above:
    ['alice'], ['192.168.1.100', '192.168.1.101:9999']

If only a hostname or IP is given then any worker on that machine will be considered valid. Additionally, you can provide aliases to workers upon creation:

  $ dask-worker scheduler_address:8786 --name worker_1

And then use this name when specifying workers instead.

  client.map(func, sequence, workers='worker_1')

Specify workers with Compute/Persist

The workers= keyword in scatter, submit, and map is fairly straightforward, taking either a worker hostname, host:port pair, or a sequence of those as valid inputs:

  client.submit(f, x, workers='127.0.0.1')
  client.submit(f, x, workers='127.0.0.1:55852')
  client.submit(f, x, workers=['192.168.1.101', '192.168.1.100'])

For more complex computations, such as occur with dask collections like dask.dataframe or dask.delayed, we sometimes want to specify that certain parts of the computation run on certain workers while other parts run on other workers.

  x = delayed(f)(1)
  y = delayed(f)(2)
  z = delayed(g)(x, y)

  future = client.compute(z, workers={z: '127.0.0.1',
                                      x: '192.168.0.1'})

Here the values of the dictionary are of the same form as before, a host, a host:port pair, or a list of these. The keys in this case are either dask collections or tuples of dask collections. All of the final keys of these collections will run on the specified machines; dependencies can run anywhere unless they are also listed in workers=. We explore this through a set of examples:

The computation z = g(x, y) runs on the host 127.0.0.1. The other two computations for x and y can run anywhere.

  future = client.compute(z, workers={z: '127.0.0.1'})

The computations for both z and x must run on 127.0.0.1.

  future = client.compute(z, workers={z: '127.0.0.1',
                                      x: '127.0.0.1'})

Use a tuple to group collections. This is shorthand for the above.

  future = client.compute(z, workers={(x, y): '127.0.0.1'})

Recall that all options for workers= in scatter/submit/map hold here as well.

  future = client.compute(z, workers={(x, y): ['192.168.1.100', '192.168.1.101:9999']})

Set allow_other_workers=True to make these loose restrictions rather than hard requirements.

  future = client.compute(z, workers={(x, y): '127.0.0.1'},
                          allow_other_workers=True)

Provide a collection to allow_other_workers=[...] to say that the keys for only some of the collections are loose. In the case below z must run on 127.0.0.1 while x should run on 127.0.0.1 but can run elsewhere if necessary:

  future = client.compute(z, workers={(x, y): '127.0.0.1'},
                          allow_other_workers=[x])

This works fine with persist and with any dask collection (any object with a .__dask_graph__() method):

  df = dd.read_csv('s3://...')
  df = client.persist(df, workers={df: ...})

See the efficiency page to learn about best practices.