Journey of a Task

We follow a single task through the user interface, scheduler, worker nodes, and back. Hopefully this helps to illustrate the inner workings of the system.

User code

A user computes the addition of two variables already on the cluster, then pulls the result back to the local process.

  client = Client('host:port')
  x = client.submit(...)
  y = client.submit(...)

  z = client.submit(add, x, y)  # we follow z

  print(z.result())
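For readers who want to run something end to end, here is a minimal self-contained sketch of the snippet above. The local Client(), the add function, and the concrete inputs are stand-ins that do not appear in the original example:

  from distributed import Client

  def add(a, b):
      return a + b

  client = Client()                # starts a local cluster for testing
  x = client.submit(add, 1, 2)     # stand-in for data already on the cluster
  y = client.submit(add, 10, 20)

  z = client.submit(add, x, y)     # we follow z
  print(z.result())                # 33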

Step 1: Client

z begins its life when the Client.submit function sends the following message to the Scheduler:

  {'op': 'update-graph',
   'tasks': {'z': (add, x, y)},
   'keys': ['z']}

The client then creates a Future object with the key 'z' and returns that object back to the user. This happens even before the message has been received by the scheduler. The status of the future is 'pending'.
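You can observe this directly (assuming the same client and inputs as in the sketch above); the future reports 'pending' before the scheduler has even seen the message:

  z = client.submit(add, x, y)
  z.status      # 'pending' immediately, with no round trip to the scheduler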

Step 2: Arrive in the Scheduler

A few milliseconds later, the scheduler receives this message on an open socket.

The scheduler updates its state with this little graph that shows how to compute z:

  scheduler.update_graph(tasks=msg['tasks'], keys=msg['keys'])

The scheduler also updates a lot of other state. Notably, it has to identify that x and y are themselves variables, and connect all of those dependencies. This is a long and detail-oriented process that involves updating roughly 10 sets and dictionaries. Interested readers should investigate distributed/scheduler.py::update_graph(). While this is fairly complex and tedious to describe, rest assured that it all happens in constant time and in about a millisecond.
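As a rough mental model (not the scheduler's actual code, and with simplified, partly invented names), the bookkeeping amounts to maintaining a handful of dependency sets:

  # Toy sketch of the kind of state update_graph maintains; see
  # distributed/scheduler.py::update_graph() for the real implementation.
  dependencies = {'z': {'x', 'y'}}          # what z needs
  dependents   = {'x': {'z'}, 'y': {'z'}}   # who needs x and y
  waiting      = {'z': {'x', 'y'}}          # dependencies of z not yet in memory
  who_wants    = {'z': {'client-1234'}}     # clients that asked to keep z

  # As each dependency lands in memory the waiting set shrinks;
  # when it empties, z becomes runnable.
  waiting['z'].discard('x')
  waiting['z'].discard('y')
  ready = not waiting['z']                  # True: z may now be scheduled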

Step 3: Select a Worker

Once the latter of x and y finishes, the scheduler notices that all of z's dependencies are in memory and that z itself may now run. Which worker should z select? We consider a sequence of criteria:

  • First, we quickly downselect to only those workers that have either x or y in local memory.
  • Then, we select the worker that would have to gather the least number of bytes in order to get both x and y locally. For example, if two different workers have x and y, and y takes up more bytes than x, then we select the machine that holds y so that we don't have to communicate as much.
  • If there are multiple workers that require the minimum number of communication bytes then we select the worker that is the least busy (a toy sketch of this policy follows the list).
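The criteria above translate fairly directly into a small scoring function. The sketch below is a toy rendition of that policy, not the scheduler's real code, and the data structures are simplified stand-ins:

  # Toy version of the selection policy, with simplified stand-in structures:
  #   has_what:  worker address -> set of keys held in local memory
  #   nbytes:    key -> size of that key's data in bytes
  #   occupancy: worker address -> rough measure of how busy the worker is
  def choose_worker(deps, has_what, nbytes, occupancy):
      # 1. Only consider workers that already hold at least one dependency
      candidates = [w for w, held in has_what.items() if held & deps]

      def bytes_to_gather(w):
          # 2. How much data this worker would still have to fetch
          return sum(nbytes[k] for k in deps - has_what[w])

      # 3. Fewest bytes to gather wins; ties go to the least busy worker
      return min(candidates, key=lambda w: (bytes_to_gather(w), occupancy[w]))

In the example from the second bullet, the worker holding y would win because it only needs to fetch the smaller x.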

z considers the workers and chooses one based on the above criteria. In the common case the choice is pretty obvious after the first criterion. z waits on a stack associated with the chosen worker. The worker may still be busy though, so z may wait a while.

Note: This policy is in flux and this part of the document is quite possibly out of date.

Step 4: Transmit to the Worker

Eventually the worker finishes a task, has a spare core, and z finds itself at the top of the stack (note that this may be some time after the last section if other tasks placed themselves on top of the worker's stack in the meantime).

We place z into a worker_queue associated with that worker and a worker_core coroutine pulls it out. z's function, the keys associated with its arguments, and the locations of workers that hold those keys are packed up into a message that looks like this:

  {'op': 'compute',
   'function': execute_task,
   'args': ((add, 'x', 'y'),),
   'who_has': {'x': {(worker_host, port)},
               'y': {(worker_host, port), (worker_host, port)}},
   'key': 'z'}

This message is serialized and sent across a TCP socket to the worker.

Step 5: Execute on the Worker

The worker unpacks the message and notices that it needs to have both x and y. If the worker does not already have both of these then it gathers them from the workers listed in the who_has dictionary, also in the message. For each key that it doesn't have it selects a valid worker from who_has at random and gathers data from it.

After this exchange, the worker has both the value for x and the value for y. So it launches the computation add(x, y) in a local ThreadPoolExecutor and waits on the result.
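The pattern is roughly the following. This is a toy sketch rather than the real worker (which lives in distributed/worker.py and is asynchronous throughout), and fetch_from is a hypothetical helper standing in for the actual peer-to-peer transfer:

  import random
  from concurrent.futures import ThreadPoolExecutor

  pool = ThreadPoolExecutor()

  def run_task(key, func, dep_keys, who_has, data, fetch_from):
      # Gather each missing dependency from a randomly chosen holder
      for dep in dep_keys:
          if dep not in data:
              holder = random.choice(sorted(who_has[dep]))
              data[dep] = fetch_from(holder, dep)   # hypothetical transfer helper
      # Run the computation in a thread pool so the worker stays responsive
      args = [data[dep] for dep in dep_keys]
      data[key] = pool.submit(func, *args).result()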

In the meantime the worker repeats this process concurrently for other tasks. Nothing blocks.

Eventually the computation completes. The worker stores this result in its local memory:

  data['z'] = ...

It then transmits back a success message, along with the number of bytes of the result:

  Worker: Hey Scheduler, 'z' worked great.
          I'm holding onto it.
          It takes up 64 bytes.

The worker does not transmit back the actual value for z.
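Rendered as a message, that report might look roughly like the following; the op name and fields here are illustrative rather than the exact wire format:

  {'op': 'task-finished',
   'status': 'OK',
   'key': 'z',
   'nbytes': 64}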

Step 6: Scheduler Aftermath

The scheduler receives this message and does a few things:

  • It notes that the worker has a free core, and sends up another task if available.
  • If x or y are no longer needed then it sends a message out to relevant workers to delete them from local memory.
  • It sends a message to all of the clients that z is ready and so all client Future objects that are currently waiting should wake up. In particular, this wakes up the z.result() command executed by the user originally (a small illustration follows this list).
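From the client side this wake-up is easy to observe, again assuming the earlier local-cluster setup; distributed.wait blocks only until the scheduler reports the future finished:

  from distributed import wait

  wait([z])          # returns once the scheduler says z is finished
  z.status           # 'finished'
  print(z.result())  # now only the value itself still has to travel back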

Step 7: Gather

When the user calls z.result() they wait both on the completion of the computation and for the result to be sent back over the wire to the local process. Usually this isn't necessary; usually you don't want to move data back to the local process but instead want to keep it on the cluster.

But perhaps the user really wanted to know this value, so they called z.result().

The scheduler checks who has z and sends them a message asking for the result. This message doesn't wait in a queue or for other jobs to complete; it starts instantly. The value gets serialized, sent over TCP, and then deserialized and returned to the user (passing through a queue or two on the way).
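Two equivalent ways to trigger this exchange from the client:

  value = z.result()         # block until z is done, then pull it back locally
  value = client.gather(z)   # same effect; client.gather also accepts lists of futures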

Step 8: Garbage Collection

The user leaves this part of their code and the local variable z goes out of scope. The Python garbage collector cleans it up. This decrements the reference count on the client (we didn't mention this earlier, but when we created the Future we also started a reference count). If this is the only instance of a Future pointing to z then we send a message up to the scheduler that it is OK to release z. The user no longer requires it to persist.
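A sketch of how this looks from the user's side; releasing a future explicitly with Future.release() has the same effect as letting the last reference go out of scope:

  del z          # last local reference disappears; the client decrements its
                 # count and eventually tells the scheduler it may release 'z'
  # Or drop the reference explicitly instead of waiting for garbage collection:
  # z.release()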

The scheduler receives this message and, if there are no computations that might depend on z in the immediate future, it removes elements of this key from local scheduler state and adds the key to a list of keys to be deleted periodically. Every 500 ms a message goes out to relevant workers telling them which keys they can delete from their local memory. The graph/recipe to create the result of z persists in the scheduler for all time.

Overhead

The user experiences this in about 10 milliseconds, depending on network latency.
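A quick way to measure this round trip yourself; the exact number depends on network latency and how busy the scheduler is:

  import time

  start = time.perf_counter()
  client.submit(sum, [1, 2, 3]).result()   # one tiny task, full round trip
  print(time.perf_counter() - start)       # typically around 10 ms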