Foundations

You should read through the quickstart before reading this document.

Distributed computing is hard for two reasons:

  • Consistent coordination of distributed systems requires sophistication
  • Concurrent network programming is tricky and error prone

The foundations of dask.distributed provide abstractions to hide some complexity of concurrent network programming (#2). These abstractions ease the construction of sophisticated parallel systems (#1) in a safer environment. However, as with all layered abstractions, ours has flaws. Critical feedback is welcome.

Concurrency with Tornado Coroutines

Worker and Scheduler nodes operate concurrently. They serve several overlapping requests and perform several overlapping computations at the same time without blocking. There are several approaches to concurrent programming; we've chosen to use Tornado for the following reasons (a brief coroutine sketch follows the list):

  • Developing and debugging is more comfortable without threads
  • Tornado’s documentation is excellent
  • Stackoverflow coverage is excellent
  • Performance is satisfactory
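
As a taste of this style, here is a minimal Tornado coroutine. This is an illustrative sketch, not code from the distributed codebase:

    from tornado import gen
    from tornado.ioloop import IOLoop

    @gen.coroutine
    def greet(name):
        # yield suspends this coroutine without blocking the process,
        # leaving the IOLoop free to run other handlers in the meantime
        yield gen.sleep(0.1)
        raise gen.Return('hello, %s' % name)  # coroutines return values via gen.Return

    # run_sync starts the loop, runs the coroutine to completion, and stops
    result = IOLoop.current().run_sync(lambda: greet('world'))
    print(result)  # hello, world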

Endpoint-to-endpoint Communication

The various distributed endpoints (Client, Scheduler, Worker) communicate by sending each other arbitrary Python objects. Encoding, sending and then decoding those objects is the job of the communication layer.
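
In rough outline, a comm is opened with connect, and arbitrary serializable objects are exchanged with write and read. The sketch below uses placeholder address and message values; a complete, runnable example appears later on this page:

    import asyncio
    from distributed.core import connect

    async def roundtrip():
        comm = await connect('tcp://127.0.0.1:8888')  # establish a comm to a peer
        await comm.write({'op': 'ping'})              # send any serializable object
        reply = await comm.read()                     # receive the decoded reply
        await comm.close()                            # release the connection
        return reply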

Ancillary services such as a Bokeh-based Web interface, however, have theirown implementation and semantics.

Protocol Handling

While the abstract communication layer can transfer arbitrary Python objects (as long as they are serializable), participants in a distributed cluster concretely obey the distributed Protocol, which specifies request-response semantics using a well-defined message format.

Dedicated infrastructure in distributed handles the various aspects of the protocol, such as dispatching the operations supported by an endpoint.
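
The essential dispatch idea can be sketched in a few lines. This is a simplified illustration of the concept, not distributed's actual implementation:

    def dispatch(handlers, comm, msg):
        # each protocol message is a dict carrying the operation name under 'op'
        op = msg.pop('op')
        handler = handlers[op]       # look up the registered operation
        return handler(comm, **msg)  # remaining keys become keyword arguments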

Servers

Worker, Scheduler, and Nanny objects all inherit from a Server class.

class distributed.core.Server(handlers, blocked_handlers=None, stream_handlers=None, connection_limit=512, deserialize=True, io_loop=None)

Dask Distributed Server

Superclass for endpoints in a distributed cluster, such as Worker and Scheduler objects.

Handlers

Servers define operations with a handlers dict mapping operation names to functions. The first argument of a handler function will be a Comm for the communication established with the client. Other arguments will receive inputs from the keys of the incoming message, which will always be a dictionary.

    >>> def pingpong(comm):
    ...     return b'pong'

    >>> def add(comm, x, y):
    ...     return x + y

    >>> handlers = {'ping': pingpong, 'add': add}
    >>> server = Server(handlers)  # doctest: +SKIP
    >>> server.listen('tcp://0.0.0.0:8000')  # doctest: +SKIP

Message Format

The server expects messages to be dictionaries with a special key, 'op', that corresponds to the name of the operation, and other key-value pairs as required by the function.

In the example above, the following would be valid messages (a sketch of sending one follows the list):

  • {'op': 'ping'}
  • {'op': 'add', 'x': 10, 'y': 20}
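
For instance, the first message could be sent over a raw comm like this (a sketch that assumes the server above is listening on port 8000):

    import asyncio
    from distributed.core import connect

    async def ping():
        comm = await connect('tcp://127.0.0.1:8000')
        await comm.write({'op': 'ping'})  # a dict with the operation name under 'op'
        print(await comm.read())          # b'pong'
        await comm.close()

    asyncio.get_event_loop().run_until_complete(ping())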

RPC

To interact with remote servers we typically use rpc objects, which expose a familiar method-call interface to invoke remote operations.

class distributed.core.rpc(arg=None, comm=None, deserialize=True, timeout=None, connection_args=None, serializers=None, deserializers=None)

Conveniently interact with a remote server

    >>> remote = rpc(address)  # doctest: +SKIP
    >>> response = yield remote.add(x=10, y=20)  # doctest: +SKIP

One rpc object can be reused for several interactions. Additionally, this object creates and destroys many comms as necessary, and so is safe to use in multiple overlapping communications; a sketch of such overlapping calls follows below.

When done, close comms explicitly.

    >>> remote.close_comms()  # doctest: +SKIP
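
For example, because each call opens its own comm, several calls through a single rpc object can safely overlap. The sketch below assumes a server with an 'add' handler, as in the examples on this page, and a Tornado version recent enough (5.0+) to share the asyncio event loop:

    import asyncio
    from distributed.core import rpc

    async def f():
        r = rpc('tcp://127.0.0.1:8000')
        results = await asyncio.gather(  # three overlapping remote calls,
            r.add(x=1, y=2),             # each on its own comm
            r.add(x=3, y=4),
            r.add(x=5, y=6),
        )
        print(results)  # [3, 7, 11]
        r.close_comms()

    asyncio.get_event_loop().run_until_complete(f())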

Examples

Here is a small example using distributed.core to create and interact with a custom server.

Server Side

    import asyncio
    from distributed.core import Server

    def add(comm, x=None, y=None):  # simple handler, just a function
        return x + y

    async def stream_data(comm, interval=1):  # complex handler, multiple responses
        data = 0
        while True:
            await asyncio.sleep(interval)
            data += 1
            await comm.write(data)

    s = Server({'add': add, 'stream_data': stream_data})
    s.listen('tcp://:8888')  # listen on TCP port 8888

    asyncio.get_event_loop().run_forever()

Client Side

    import asyncio
    from distributed.core import connect

    async def f():
        comm = await connect('tcp://127.0.0.1:8888')
        await comm.write({'op': 'add', 'x': 1, 'y': 2})
        result = await comm.read()
        await comm.close()
        print(result)

    >>> asyncio.get_event_loop().run_until_complete(f())
    3

    async def g():
        comm = await connect('tcp://127.0.0.1:8888')
        await comm.write({'op': 'stream_data', 'interval': 1})
        while True:
            result = await comm.read()
            print(result)

    >>> asyncio.get_event_loop().run_until_complete(g())
    1
    2
    3
    ...

Client Side with rpc

RPC provides a more Pythonic interface. It also provides other benefits, such as using multiple streams in concurrent cases. Most distributed code uses rpc. The exception is when we need to perform multiple reads or writes, as with the stream data case above.

    import asyncio
    from distributed.core import rpc

    async def f():
        # comm = await connect('tcp://127.0.0.1:8888')
        # await comm.write({'op': 'add', 'x': 1, 'y': 2})
        # result = await comm.read()
        with rpc('tcp://127.0.0.1:8888') as r:
            result = await r.add(x=1, y=2)

        print(result)

    >>> asyncio.get_event_loop().run_until_complete(f())
    3