Launch Tasks from Tasks

Sometimes it is convenient to launch tasks from other tasks. For example, you may not know what computations to run until you have the results of some initial computations.

Motivating example

We want to download one piece of data and turn it into a list. Then we want to submit one task for every element of that list. We don’t know how long the list will be until we have the data.

So we send off our original download_and_convert_to_list function, which downloads the data and converts it to a list on one of our worker machines:

future = client.submit(download_and_convert_to_list, uri)

But now we need to submit new tasks for individual parts of this data. We have three options.

  • Gather the data back to the local process and then submit new jobs from the local process
  • Gather only enough information about the data back to the local process and submit jobs from the local process
  • Submit a task to the cluster that will submit other tasks directly from that worker

Gather the data locally

If the data is not large then we can bring it back to the client to perform the necessary logic on our local machine:

>>> data = future.result()                        # gather data to local process
>>> data                                          # data is a list
[...]

>>> futures = client.map(process_element, data)   # submit new tasks on data
>>> analysis = client.submit(aggregate, futures)  # submit final aggregation task

This is straightforward and, if the data is small, it is probably the simplest, and therefore the correct, choice. However, if the data is large then we have to choose another option.
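For concreteness, a minimal, self-contained sketch of this pattern might look like the following. The bodies of download_and_convert_to_list, process_element, and aggregate here are placeholders standing in for your own logic, and "some-uri" is a hypothetical argument:

from distributed import Client


def download_and_convert_to_list(uri):
    # Placeholder: pretend we downloaded `uri` and parsed it into a list
    return list(range(10))


def process_element(element):
    # Placeholder per-element computation
    return element * 2


def aggregate(elements):
    # Placeholder aggregation over the processed elements
    return sum(elements)


if __name__ == "__main__":
    client = Client()

    future = client.submit(download_and_convert_to_list, "some-uri")
    data = future.result()                        # gather data to local process

    futures = client.map(process_element, data)   # submit new tasks on data
    analysis = client.submit(aggregate, futures)  # submit final aggregation task
    print(analysis.result())                      # prints 90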

Submit tasks from client

We can run small functions on our remote data to determine enough to submit the right kinds of tasks. In the following example we compute the len function on data remotely and then break up data into its various elements.

>>> n = client.submit(len, future)    # compute number of elements on the remote data
>>> n = n.result()                    # gather n (small) locally

>>> from operator import getitem
>>> elements = [client.submit(getitem, future, i) for i in range(n)]  # split data

>>> futures = client.map(process_element, elements)
>>> analysis = client.submit(aggregate, futures)

We compute the length remotely, gather back this very small result, and then use it to submit more tasks to break up the data and process it on the cluster. This is more complex because we had to go back and forth a couple of times between the cluster and the local process, but the data moved was very small, and so this only added a few milliseconds to our total processing time.

Extended Example

Computing the Fibonacci numbers involves a recursive function. When the function is run, it calls itself using values it has already computed. We will use this as an example throughout this documentation to illustrate different techniques for submitting tasks from tasks.

def fib(n):
    if n < 2:
        return n
    a = fib(n - 1)
    b = fib(n - 2)
    return a + b

print(fib(10))  # prints "55"

We will use this example to show the different interfaces.

Submit tasks from worker

Note: this interface is new and experimental. It may be changed without warning in future versions.

We can submit tasks from other tasks. This allows us to make decisions while on worker nodes.

To submit new tasks from a worker, that worker must first create a new client object that connects to the scheduler. There are three options for this:

  • dask.delayed and dask.compute
  • get_client with secede and rejoin
  • worker_client

dask.delayed

Dask delayed behaves as normal: it submits the functions to the graph, optimizes to reduce bandwidth/computation, and gathers the results. For more detail, see dask.delayed.

from distributed import Client
from dask import delayed, compute


@delayed
def fib(n):
    if n < 2:
        return n
    # We can use dask.delayed and dask.compute to launch
    # computation from within tasks
    a = fib(n - 1)  # these calls are delayed
    b = fib(n - 2)
    a, b = compute(a, b)  # execute both in parallel
    return a + b

if __name__ == "__main__":
    # these features require the dask.distributed scheduler
    client = Client()

    result = fib(10).compute()
    print(result)  # prints "55"

Getting the client on a worker

The get_client function provides a normal Client object that gives full access to the dask cluster, including the ability to submit, scatter, and gather results.

from distributed import Client, get_client, secede, rejoin

def fib(n):
    if n < 2:
        return n
    client = get_client()
    a_future = client.submit(fib, n - 1)
    b_future = client.submit(fib, n - 2)
    a, b = client.gather([a_future, b_future])
    return a + b

if __name__ == "__main__":
    client = Client()
    future = client.submit(fib, 10)
    result = future.result()
    print(result)  # prints "55"

However, this can deadlock the scheduler if too many tasks request jobs at once. Each task does not communicate to the scheduler that it is merely waiting on results and that its slot is free to compute other tasks. This can deadlock the cluster if every scheduling slot is running a task and they all request more tasks.

To avoid this deadlocking issue we can use secede and rejoin. These functions will, respectively, remove the current task from the cluster's pool of running tasks and add it back again.

def fib(n):
    if n < 2:
        return n
    client = get_client()
    a_future = client.submit(fib, n - 1)
    b_future = client.submit(fib, n - 2)
    secede()   # leave the worker's pool of running tasks while we wait
    a, b = client.gather([a_future, b_future])
    rejoin()   # rejoin the pool before continuing
    return a + b
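As before, this version of fib can be driven from a regular client. A minimal driver, mirroring the earlier examples, might look like this:

if __name__ == "__main__":
    client = Client()
    future = client.submit(fib, 10)
    result = future.result()
    print(result)  # prints "55"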

Connection with context manager

The worker_client function performs the same task as get_client, but is implemented as a context manager. Using worker_client as a context manager ensures proper cleanup on the worker.

from dask.distributed import Client, worker_client


def fib(n):
    if n < 2:
        return n
    with worker_client() as client:
        a_future = client.submit(fib, n - 1)
        b_future = client.submit(fib, n - 2)
        a, b = client.gather([a_future, b_future])
    return a + b

if __name__ == "__main__":
    client = Client()
    future = client.submit(fib, 10)
    result = future.result()
    print(result)  # prints "55"

Tasks that invoke worker_client are conservatively assumed to be long running. They can take a long time waiting for other tasks to finish, gathering results, etc. In order to avoid having them take up processing slots, the following actions occur whenever a task invokes worker_client:

  • The thread on the worker running this function secedes from the thread pool and goes off on its own. This allows the thread pool to populate that slot with a new thread and continue processing additional tasks without counting this long running task against its normal quota.
  • The Worker sends a message back to the scheduler temporarily increasing its allowed number of tasks by one. This likewise lets the scheduler allocate more tasks to this worker, not counting this long running task against it.
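Conceptually, this is close to wrapping the manual get_client pattern with secede and rejoin, as shown earlier. The following is only a rough sketch of that idea, not the actual worker_client implementation, which performs additional bookkeeping:

from contextlib import contextmanager

from distributed import get_client, rejoin, secede


@contextmanager
def simplified_worker_client():
    # Rough illustration only: obtain a client, step out of the worker's
    # thread pool while we wait, and rejoin it when the block exits.
    client = get_client()
    secede()
    try:
        yield client
    finally:
        rejoin()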

Establishing a connection to the scheduler takes a few milliseconds, and so it is wise for computations that use this feature to be at least a few times longer in duration than this.