Development Guidelines

This repository is part of the Dask projects. General development guidelinesincluding where to ask for help, a layout of repositories, testing practices,and documentation and style standards are available at the Dask developerguidelines in the main documentation.

Install

After setting up an environment as described in the Dask developerguidelines you can clone this repository with git:

  1. git clone git@github.com:dask/distributed.git

and install it from source:

  1. cd distributed
  2. python setup.py install

Test

Test using py.test:

  1. py.test distributed --verbose

Tornado

Dask.distributed is a Tornado TCP application. Tornado provides us with both acommunication layer on top of sockets, as well as a syntax for writingasynchronous coroutines, similar to asyncio. You can make modest changes tothe policies within this library without understanding much about Tornado,however moderate changes will probably require you to understand TornadoIOLoops, coroutines, and a little about non-blocking communication.. TheTornado API documentation is quite good and we recommend that you read thefollowing resources:

Additionally, if you want to interact at a low level with the communicationbetween workers and scheduler then you should understand the TornadoTCPServer and IOStream available here:

Dask.distributed wraps a bit of logic around Tornado. SeeFoundations for more information.

Writing Tests

Testing distributed systems is normally quite difficult because it is difficultto inspect the state of all components when something goes wrong. Fortunately,the non-blocking asynchronous model within Tornado allows us to run ascheduler, multiple workers, and multiple clients all within a single thread.This gives us predictable performance, clean shutdowns, and the ability to dropinto any point of the code during execution.At the same time, sometimes we want everything to run in different processes inorder to simulate a more realistic setting.

The test suite contains three kinds of tests

  • @gen_cluster: Fully asynchronous tests where all components live in thesame event loop in the main thread. These are good for testing complexlogic and inspecting the state of the system directly. They are alsoeasier to debug and cause the fewest problems with shutdowns.
  • def test_foo(client): Tests with multiple processes forked from the masterprocess. These are good for testing the synchronous (normal user) API andwhen triggering hard failures for resilience tests.
  • popen: Tests that call out to the command line to start the system.These are rare and mostly for testing the command line interface.If you are comfortable with the Tornado interface then you will be happiestusing the @gen_cluster style of test
  1. from distributed.utils_test import gen_cluster
  2.  
  3. @gen_cluster(client=True)
  4. def test_submit(c, s, a, b):
  5. assert isinstance(c, Client)
  6. assert isinstance(s, Scheduler)
  7. assert isinstance(a, Worker)
  8. assert isinstance(b, Worker)
  9.  
  10. future = c.submit(inc, 1)
  11. assert future.key in c.futures
  12.  
  13. # result = future.result() # This synchronous API call would block
  14. result = yield future
  15. assert result == 2
  16.  
  17. assert future.key in s.tasks
  18. assert future.key in a.data or future.key in b.data

The @gencluster decorator sets up a scheduler, client, and workers foryou and cleans them up after the test. It also allows you to directly inspectthe state of every element of the cluster directly. However, you can not usethe normal synchronous API (doing so will cause the test to wait forever) andinstead you need to use the coroutine API, where all blocking functions areprepended with an underscore (). Beware, it is a common mistake to usethe blocking interface within these tests.

If you want to test the normal synchronous API you can use the clientpytest fixture style test, which sets up a scheduler and workers for you indifferent forked processes:

  1. from distributed.utils_test import client
  2.  
  3. def test_submit(client):
  4. future = client.submit(inc, 10)
  5. assert future.result() == 11

Additionally, if you want access to the scheduler and worker processes you canalso add the s, a, b fixtures as well.

  1. from distributed.utils_test import client
  2.  
  3. def test_submit(client, s, a, b):
  4. future = client.submit(inc, 10)
  5. assert future.result() == 11 # use the synchronous/blocking API here
  6.  
  7. a['proc'].terminate() # kill one of the workers
  8.  
  9. result = future.result() # test that future remains valid
  10. assert result == 2

In this style of test you do not have access to the scheduler or workers. Thevariables s, a, b are now dictionaries holding amultiprocessing.Process object and a port integer. However, you can nowuse the normal synchronous API (never use yield in this style of test) and youcan close processes easily by terminating them.

Typically for most user-facing functions you will find both kinds of tests.The @gen_cluster tests test particular logic while the client pytestfixture tests test basic interface and resilience.

You should avoid popen style tests unless absolutely necessary, such as ifyou need to test the command line interface.