Client

The Client is the primary entry point for users of dask.distributed.

After we set up a cluster, we initialize a Client by pointing it to the address of a Scheduler:

  >>> from distributed import Client
  >>> client = Client('127.0.0.1:8786')

There are a few different ways to interact with the cluster through the client:

  • The Client satisfies most of the standard concurrent.futures (PEP 3148) interface with the .submit and .map methods and Future objects, allowing the immediate and direct submission of tasks.
  • The Client registers itself as the default Dask scheduler, and so runs all dask collections like dask.array, dask.bag, dask.dataframe and dask.delayed.
  • The Client has additional methods for manipulating data remotely. See the full API for a thorough list.
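Because the Client follows the PEP 3148 interface, the submit/result pattern can be tried locally with the standard library. The sketch below uses `ThreadPoolExecutor` purely as a stand-in for the Client; it is not part of dask.distributed. One difference to keep in mind: `Executor.map` returns an iterator of results, whereas `client.map` returns a list of Futures, so the sketch builds that list with `submit` instead.

```python
from concurrent.futures import ThreadPoolExecutor


def inc(x):
    return x + 1


# ThreadPoolExecutor is only a local stand-in for the Client (PEP 3148 interface).
with ThreadPoolExecutor(max_workers=4) as executor:
    # submit() schedules one call and immediately returns a Future.
    future = executor.submit(inc, 10)
    # A list of Futures, similar in shape to what client.map returns.
    futures = [executor.submit(inc, i) for i in range(5)]
    # result() blocks until each value is available.
    single = future.result()
    many = [f.result() for f in futures]

print(single)  # 11
print(many)    # [1, 2, 3, 4, 5]
```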

Concurrent.futures

We can submit individual function calls with the client.submit method or many function calls with the client.map method:

  >>> def inc(x):
  ...     return x + 1

  >>> x = client.submit(inc, 10)
  >>> x
  <Future - key: inc-e4853cffcc2f51909cdb69d16dacd1a5>

  >>> L = client.map(inc, range(1000))
  >>> L
  [<Future - key: inc-...>,
   <Future - key: inc-...>,
   <Future - key: inc-...>,
   <Future - key: inc-...>, ...]

These results live on distributed workers.

We can submit tasks on futures. The function will go to the machine where the futures are stored and run on the result once it has completed.

  >>> y = client.submit(inc, x)  # Submit on x, a Future
  >>> total = client.submit(sum, L)  # Submit on L, a list of Futures
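Conceptually, submitting on a Future means the function is not called until its inputs are ready, and then it receives their results instead of the Futures. The following is a minimal local sketch of that idea using only the standard library; `submit_with_deps` and `_resolve` are hypothetical helpers. dask.distributed does this on the cluster, moving data between workers, rather than by blocking a local thread as done here.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def inc(x):
    return x + 1


def _resolve(arg):
    # Hypothetical helper: replace Futures (also inside lists) with their results.
    if isinstance(arg, Future):
        return arg.result()
    if isinstance(arg, list):
        return [_resolve(a) for a in arg]
    return arg


def submit_with_deps(executor, fn, *args):
    # Hypothetical helper: the task waits for its Future arguments, then calls fn.
    def task():
        return fn(*[_resolve(a) for a in args])
    return executor.submit(task)


with ThreadPoolExecutor(max_workers=4) as executor:
    # Dependencies are submitted first; this executor's queue is FIFO, so a
    # waiting task never sits ahead of the work it depends on.
    x = executor.submit(inc, 10)
    L = [executor.submit(inc, i) for i in range(1000)]
    y = submit_with_deps(executor, inc, x)      # inc applied to x's result
    total = submit_with_deps(executor, sum, L)  # sum over a list of Futures
    y_value, total_value = y.result(), total.result()

print(y_value)      # 12
print(total_value)  # 500500
```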

We gather back the results using either the Future.result method for single futures or the client.gather method for many futures at once:

  >>> x.result()
  11

  >>> client.gather(L)
  [1, 2, 3, 4, 5, ...]
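`client.gather` takes a collection of Futures and returns their results in the same shape. A rough standard-library approximation is sketched below, assuming only lists, tuples and dicts need to be traversed; `gather_local` is a hypothetical name and the executor again stands in for the cluster.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait


def inc(x):
    return x + 1


def gather_local(obj):
    # Hypothetical helper: replace every Future in a nested list/tuple/dict
    # with its result, keeping the container structure intact.
    if isinstance(obj, Future):
        return obj.result()
    if isinstance(obj, (list, tuple)):
        return type(obj)(gather_local(o) for o in obj)
    if isinstance(obj, dict):
        return {k: gather_local(v) for k, v in obj.items()}
    return obj


with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(inc, i) for i in range(5)]
    wait(futures)  # block until every Future has finished
    results = gather_local(futures)
    nested = gather_local({"first": futures[0], "rest": tuple(futures[1:])})

print(results)  # [1, 2, 3, 4, 5]
print(nested)   # {'first': 1, 'rest': (2, 3, 4, 5)}
```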

But, as always, we want to minimize communicating results back to the local process. It’s often best to leave data on the cluster and operate on it remotely with functions like submit, map, get and compute. See Efficiency for more information on using distributed efficiently.

Dask

The parent library Dask contains objects like dask.array, dask.dataframe, dask.bag, and dask.delayed, which automatically produce parallel algorithms on larger datasets. All dask collections work smoothly with the distributed scheduler.

When we create a Client object it registers itself as the default Dask scheduler. All .compute() methods will automatically start using the distributed system.

  client = Client('scheduler:8786')

  my_dataframe.sum().compute()  # Now uses the distributed system by default

We can stop this behavior by using the set_as_default=False keyword argument when starting the Client.
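The "register as default" behavior can be pictured as a module-level slot that .compute() consults. The sketch below is a hypothetical toy model of that pattern (`ToyClient`, `ToyCollection` and `_default_client` are invented names); it is not how dask stores its scheduler setting, only an illustration of what set_as_default=False opts out of.

```python
_default_client = None  # hypothetical global slot read by .compute()


class ToyClient:
    def __init__(self, name, set_as_default=True):
        global _default_client
        self.name = name
        if set_as_default:
            # Mirrors the idea of a Client registering itself on creation.
            _default_client = self

    def run(self, fn, *args):
        return f"{self.name}: {fn(*args)}"


class ToyCollection:
    def __init__(self, fn, *args):
        self.fn, self.args = fn, args

    def compute(self):
        if _default_client is None:
            # No registered client, so compute locally.
            return f"local: {self.fn(*self.args)}"
        return _default_client.run(self.fn, *self.args)


total = ToyCollection(sum, [1, 2, 3])
before = total.compute()
ToyClient("other", set_as_default=False)  # does not change the default
still_local = total.compute()
ToyClient("cluster")                       # registers itself as the default
after = total.compute()

print(before, still_local, after)  # local: 6 local: 6 cluster: 6
```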

Dask’s normal .compute() methods are synchronous, meaning that they block the interpreter until they complete. Dask.distributed adds the ability to compute asynchronously: we can trigger computations to occur in the background and persist in memory while we continue doing other work. This is typically handled with the Client.persist and Client.compute methods, which are used for larger and smaller result sets respectively.

  >>> df = client.persist(df)  # trigger all computations, keep df in memory
  >>> type(df)
  dask.DataFrame

For more information see the page on Managing Computation.

Pure Functions by Default

By default we assume that all functions are pure. If this is not the case we should use the pure=False keyword argument.

The client associates a key with every computation. This key is accessible on the Future object.

  >>> from operator import add
  >>> x = client.submit(add, 1, 2)
  >>> x.key
  'add-ebf39f96ad7174656f97097d658f3fa2'

This key should be the same across all computations with the same inputs and across all machines. If we run the computation above on any computer with the same environment then we should get the exact same key.
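dask derives these keys with its own tokenization machinery (see `dask.base.tokenize`), whose details are an implementation matter. The principle can still be sketched with `hashlib`: build the key from the function name plus a hash of the inputs, so identical calls produce identical keys. `make_key` is a hypothetical helper, it assumes the inputs have a stable `repr`, and its output only resembles dask's keys in shape, not in value.

```python
import hashlib
from operator import add


def make_key(func, *args):
    # Hypothetical helper producing "<function name>-<hash of the inputs>".
    # Assumes repr() of the inputs is stable across processes and machines.
    payload = repr((func.__module__, func.__name__, args)).encode()
    return f"{func.__name__}-{hashlib.sha256(payload).hexdigest()[:32]}"


k1 = make_key(add, 1, 2)
k2 = make_key(add, 1, 2)
k3 = make_key(add, 2, 1)

print(k1 == k2)  # True: same function and inputs give the same key
print(k1 == k3)  # False: different inputs give a different key
```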

The scheduler avoids redundant computations. If the result is already in memory from a previous call then that old result will be used rather than recomputing it. Calls to submit or map are idempotent in the common case.

While convenient, this feature may be undesired for impure functions, like random. In these cases two calls to the same function with the same inputs should produce different results. We accomplish this with the pure=False keyword argument, in which case keys are randomly generated (by uuid4).

  >>> import numpy as np
  >>> client.submit(np.random.random, 1000, pure=False).key
  'random_sample-fc814a39-ee00-42f3-8b6f-cac65bcb5556'
  >>> client.submit(np.random.random, 1000, pure=False).key
  'random_sample-a24e7220-a113-47f2-a030-72209439f093'
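Reuse of results and pure=False fit together naturally: a toy in-process model can keep a dictionary from key to Future, reuse the entry when a pure call produces the same key, and fall back to a uuid4 key for impure calls so they are never deduplicated. `MemoClient` is a hypothetical class for illustration only; the real scheduler tracks keys across the whole cluster.

```python
import hashlib
import random
import uuid
from concurrent.futures import ThreadPoolExecutor


class MemoClient:
    """Hypothetical toy client that reuses Futures for pure calls with equal keys."""

    def __init__(self):
        self._executor = ThreadPoolExecutor(max_workers=2)
        self._futures = {}  # key -> Future

    def submit(self, func, *args, pure=True):
        if pure:
            # Deterministic key: function name plus a hash of the inputs.
            digest = hashlib.sha256(repr((func.__name__, args)).encode()).hexdigest()
            key = f"{func.__name__}-{digest[:32]}"
        else:
            # Impure calls get a random key, so they are never deduplicated.
            key = f"{func.__name__}-{uuid.uuid4()}"
        if key not in self._futures:
            future = self._executor.submit(func, *args)
            future.key = key  # expose the key, loosely mirroring Future.key
            self._futures[key] = future
        return self._futures[key]

    def close(self):
        self._executor.shutdown(wait=True)


toy = MemoClient()
a = toy.submit(pow, 2, 10)
b = toy.submit(pow, 2, 10)  # same key, so the first Future is reused
r1 = toy.submit(random.random, pure=False)
r2 = toy.submit(random.random, pure=False)  # fresh uuid4 key each time
toy.close()

print(a is b, a.result())  # True 1024
print(r1.key == r2.key)    # False
```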

Async/await Operation

If we are operating in an asynchronous environment then the blocking functions listed above become asynchronous equivalents. You must start your client with the asynchronous=True keyword and yield or await blocking functions.

  from distributed import Client

  async def f(func, *args):
      client = await Client(asynchronous=True)
      future = client.submit(func, *args)
      result = await future
      await client.close()
      return result
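The standard library has a comparable "await instead of block" pattern: `asyncio` can run a blocking function in an executor and await its completion. This is only an analogy built on `loop.run_in_executor`, not the dask.distributed API, and `func` here is a placeholder defined for the example.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def func(x, y):
    return x * y


async def f(*args):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=2) as pool:
        # run_in_executor returns an awaitable, so the event loop stays free
        # to run other coroutines while func executes in a worker thread.
        result = await loop.run_in_executor(pool, func, *args)
    return result


result = asyncio.run(f(6, 7))
print(result)  # 42
```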

If you want to reuse the same client in asynchronous and synchronous environments you can apply the asynchronous=True keyword at each method call.

  client = Client()  # normal blocking client

  async def f():
      futures = client.map(func, L)
      results = await client.gather(futures, asynchronous=True)
      return results
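A similar bridge exists in the standard library for using one executor from both blocking and async code: `asyncio.wrap_future` turns a `concurrent.futures.Future` into an awaitable. In this analogy a blocking `ThreadPoolExecutor` plays the role of the normal Client and wrapping plays the role of asynchronous=True on a single call; it is not a description of dask's internals, and `func` is a placeholder.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def func(x):
    return x + 1


executor = ThreadPoolExecutor(max_workers=4)  # the normal, blocking "client"

# Blocking use: wait on each Future directly.
blocking_results = [executor.submit(func, i).result() for i in range(3)]


async def f(L):
    futures = [executor.submit(func, x) for x in L]
    # wrap_future makes each concurrent Future awaitable without blocking the loop.
    return await asyncio.gather(*(asyncio.wrap_future(fut) for fut in futures))


async_results = asyncio.run(f(range(3)))
executor.shutdown()

print(blocking_results)  # [1, 2, 3]
print(async_results)     # [1, 2, 3]
```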

See the Asynchronous documentation for more information.

For more information on how to use dask.distributed you may want to look at the following pages: