Asynchronous Operation

Dask can run fully asynchronously and so interoperate with other highly concurrent applications. Internally Dask is built on top of Tornado coroutines but also has a compatibility layer for asyncio (see below).

Basic Operation

When starting a client, provide the asynchronous=True keyword to tell Dask that you intend to use this client within an asynchronous context, such as a function defined with async/await syntax.

  async def f():
      client = await Client(asynchronous=True)

Operations that used to block now provide Tornado coroutines on which you can await.

Fast functions that only submit work remain fast and don’t need to be awaited. This includes all functions that submit work to the cluster, like submit, map, compute, and persist.

  future = client.submit(lambda x: x + 1, 10)

You can await futures directly:

  result = await future

  >>> print(result)
  11

Or you can use the normal client methods. Any operation that waited until it received information from the scheduler should now be awaited.

  result = await client.gather(future)
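The same pattern extends to many futures at once. The following is a minimal sketch, not a definitive recipe: the helper names square and gather_squares are hypothetical, and the processes=False and dashboard_address=None keywords are assumptions chosen only to keep the example on a small in-process cluster without a dashboard.

```python
import asyncio

from dask.distributed import Client


def square(x):
    # Hypothetical helper used only for illustration.
    return x * x


async def gather_squares(n):
    # Assumption: an in-process local cluster with the dashboard disabled,
    # so the sketch stays lightweight.
    async with Client(
        asynchronous=True, processes=False, dashboard_address=None
    ) as client:
        # map only submits work, so it is not awaited.
        futures = client.map(square, range(n))
        # gather waits on the scheduler, so it is awaited.
        return await client.gather(futures)


results = asyncio.run(gather_squares(5))
print(results)
```

Here client.map returns a list of futures immediately, and the only point where the coroutine yields control is the awaited client.gather call.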

If you want to use an asynchronous function with a synchronous Client (one made without the asynchronous=True keyword), then you can apply the asynchronous=True keyword at each method call and use the Client.sync function to run the asynchronous function:

  from dask.distributed import Client

  client = Client()  # normal blocking client

  async def f():
      future = client.submit(lambda x: x + 1, 10)
      result = await client.gather(future, asynchronous=True)
      return result

  client.sync(f)

Python 2 Compatibility

Everything here works with Python 2 if you replace await with yield and decorate the function with Tornado’s gen.coroutine. See the more extensive comparison in the example below.

Example

This self-contained example starts an asynchronous client, submits a trivial job, waits on the result, and then shuts down the client. You can see implementations for Python 2 and 3 and for Asyncio and Tornado.

Python 3 with Tornado or Asyncio

  from dask.distributed import Client

  async def f():
      client = await Client(asynchronous=True)
      future = client.submit(lambda x: x + 1, 10)
      result = await future
      await client.close()
      return result

  # Either use Tornado
  from tornado.ioloop import IOLoop
  IOLoop().run_sync(f)

  # Or use asyncio
  import asyncio
  asyncio.get_event_loop().run_until_complete(f())

Python 2/3 with Tornado

  from dask.distributed import Client
  from tornado import gen

  @gen.coroutine
  def f():
      client = yield Client(asynchronous=True)
      future = client.submit(lambda x: x + 1, 10)
      result = yield future
      yield client.close()
      raise gen.Return(result)

  from tornado.ioloop import IOLoop
  IOLoop().run_sync(f)

Use Cases

Historically this has been used in a few kinds of applications:

  • To integrate Dask into other asynchronous services (such as web backends), supplying a computational engine similar to Celery while still maintaining a high degree of concurrency and not blocking needlessly.
  • For computations that change or update state very rapidly, as is common in some advanced machine learning workloads.
  • To develop the internals of Dask’s distributed infrastructure, which is written entirely in this style.
  • For complex control and data structures in advanced applications.
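The first use case above, serving several concurrent callers from one asynchronous client, can be sketched roughly as follows. This is an illustration under stated assumptions rather than a reference design: inc and handle_request are hypothetical names standing in for real work and a real request handler, and the processes=False and dashboard_address=None keywords are assumptions that keep the cluster small and in-process.

```python
import asyncio

from dask.distributed import Client


def inc(x):
    # Hypothetical unit of work.
    return x + 1


async def handle_request(client, x):
    # Hypothetical handler: submit returns a future right away, and awaiting
    # that future yields to the event loop instead of blocking it.
    future = client.submit(inc, x)
    return await future


async def main():
    # Assumption: an in-process local cluster with the dashboard disabled.
    async with Client(
        asynchronous=True, processes=False, dashboard_address=None
    ) as client:
        # Three handlers share one client and run concurrently.
        return await asyncio.gather(
            *(handle_request(client, x) for x in range(3))
        )


results = asyncio.run(main())
print(results)
```

Because each handler only awaits its own future, other coroutines on the same event loop keep running while the cluster does the work.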