Adaptive Deployments

It is possible to grow and shrink Dask clusters based on current use. This allows you to run Dask permanently on your cluster and have it only take up resources when necessary. Dask contains the logic about when to grow and shrink but relies on external cluster managers to launch and kill dask-worker jobs. This page describes the policies about adaptively resizing Dask clusters based on load, how to connect these policies to a particular job scheduler, and an example implementation.

Dynamically scaling a Dask cluster up and down requires tight integration with an external cluster management system that can deploy dask-worker jobs throughout the cluster. Several systems are in wide use today, including common examples like SGE, SLURM, Torque, Condor, LSF, Yarn, Mesos, Marathon, Kubernetes, etc. These systems can be quite different from each other, but all are used to manage distributed services throughout different kinds of clusters.

The large number of relevant systems, the challenges of rigorously testing each, and finite development time preclude the systematic inclusion of all solutions within the dask/distributed repository. Instead, we include a generic interface that can be extended by anyone with a basic understanding of their cluster management tool. We encourage these as third party modules.

Policies

We control the number of workers based on current load and memory use. The scheduler checks itself periodically to determine if more or fewer workers are needed.

If there are excess unclaimed tasks, or if the memory of the current workers is nearing full, then the scheduler tries to increase the number of workers by a fixed factor, defaulting to 2. This causes exponential growth while growth is useful.

If there are idle workers and the memory of the current workers is nearing empty, then we gracefully retire the idle workers with the least amount of data in memory. We first move these results to the surviving workers and then remove the idle workers from the cluster. This shrinks the cluster when its excess size is not useful, while gracefully preserving intermediate results.
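
As a rough illustration (this is a sketch of the policy described above, not the actual scheduler code), the growth decision can be thought of as:

    def target_worker_count(current, unclaimed_tasks, memory_nearly_full, factor=2):
        """Illustrative sketch of the growth policy described above."""
        if unclaimed_tasks or memory_nearly_full:
            # Grow by a fixed factor (default 2), giving exponential
            # growth for as long as growth remains useful
            return max(1, current) * factor
        return current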

Adaptive class interface

The distributed.deploy.Adaptive class contains the logic about when to ask for new workers, and when to close idle ones. This class requires both a scheduler and a cluster object.

The cluster object must support two methods: scale_up(n, **kwargs), which takes a target total number of workers for the cluster, and scale_down(workers), which takes a list of worker addresses to remove from the cluster. The Adaptive class will call these methods with the correct values at the correct times.

    class MyCluster(object):
        async def scale_up(self, n, **kwargs):
            """
            Bring the total count of workers up to ``n``

            This function/coroutine should bring the total number of workers up to
            the number ``n``.

            This can be implemented either as a function or as a Tornado coroutine.
            """
            raise NotImplementedError()

        async def scale_down(self, workers):
            """
            Remove ``workers`` from the cluster

            Given a list of worker addresses this function should remove those
            workers from the cluster.  This may require tracking which jobs are
            associated to which worker address.

            This can be implemented either as a function or as a Tornado coroutine.
            """
            raise NotImplementedError()


    from distributed import Scheduler
    from distributed.deploy import Adaptive

    scheduler = Scheduler()
    cluster = MyCluster()
    adaptive_cluster = Adaptive(scheduler, cluster)
    scheduler.start()

Implementing these scale_up and scale_down functions depends strongly on the cluster management system. See LocalCluster for an example.
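
As a rough sketch of what such an object can look like, the hypothetical SubprocessCluster below launches and terminates dask-worker processes on the local machine with the subprocess module. The class name and its bookkeeping are illustrative and not part of distributed:

    import uuid
    import subprocess


    class SubprocessCluster(object):
        """Toy cluster object that runs dask-worker as local subprocesses."""

        def __init__(self, scheduler):
            self.scheduler = scheduler
            self.processes = {}  # worker name -> subprocess.Popen

        def scale_up(self, n, **kwargs):
            # Launch new workers until the total count reaches ``n``
            while len(self.processes) < n:
                name = 'worker-%s' % uuid.uuid4()
                proc = subprocess.Popen(['dask-worker', self.scheduler.address,
                                         '--name', name])
                self.processes[name] = proc

        def scale_down(self, workers):
            # ``workers`` is a list of addresses; look up their names the same
            # way the Marathon example below does, then terminate the processes
            for address in workers:
                name = self.scheduler.worker_info[address]['name']
                proc = self.processes.pop(name, None)
                if proc is not None:
                    proc.terminate()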

Marathon: an example

We now present an example project that implements this cluster interface backed by the Marathon cluster management tool on Mesos. Full source code and testing apparatus are available here: http://github.com/mrocklin/dask-marathon

The implementation is small. It uses the Marathon HTTP API through the marathon Python client library. We reproduce the full body of the implementation below as an example:

    import uuid

    from marathon import MarathonClient, MarathonApp
    from marathon.models.container import MarathonContainer


    class MarathonCluster(object):
        def __init__(self, scheduler,
                     executable='dask-worker',
                     docker_image='mrocklin/dask-distributed',
                     marathon_address='http://localhost:8080',
                     name=None, **kwargs):
            self.scheduler = scheduler

            # Create Marathon App to run dask-worker
            args = [executable, scheduler.address,
                    '--name', '$MESOS_TASK_ID']  # use Mesos task ID as worker name
            if 'mem' in kwargs:
                args.extend(['--memory-limit',
                             str(int(kwargs['mem'] * 0.6 * 1e6))])
            kwargs['cmd'] = ' '.join(args)
            container = MarathonContainer({'image': docker_image})

            app = MarathonApp(instances=0, container=container, **kwargs)

            # Connect and register app
            self.client = MarathonClient(marathon_address)
            self.app = self.client.create_app(name or 'dask-%s' % uuid.uuid4(), app)

        def scale_up(self, instances):
            self.client.scale_app(self.app.id, instances=instances)

        def scale_down(self, workers):
            for w in workers:
                self.client.kill_task(self.app.id,
                                      self.scheduler.worker_info[w]['name'],
                                      scale=True)
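
Hooked up to the scheduler and Adaptive, this class might be used roughly as follows. This is a sketch only; keyword arguments such as cpus and mem are forwarded to MarathonApp, and the exact values shown here are illustrative:

    from distributed import Scheduler
    from distributed.deploy import Adaptive

    scheduler = Scheduler()
    scheduler.start()

    cluster = MarathonCluster(scheduler, cpus=1, mem=4000)
    adaptive = Adaptive(scheduler, cluster)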

Subclassing Adaptive

The default behaviors of Adaptive, controlling when to scale up or down and by how much, may not be appropriate for your cluster manager or workload. For example, you may have tasks that require a worker with more memory than usual. This means we need to pass some additional keyword arguments through to the cluster.scale_up call.

    from distributed.deploy import Adaptive


    class MyAdaptive(Adaptive):
        def get_scale_up_kwargs(self):
            kwargs = super(MyAdaptive, self).get_scale_up_kwargs()
            # resource_restrictions maps task keys to a dict of restrictions
            restrictions = self.scheduler.resource_restrictions.values()
            memory_restrictions = [x.get('memory') for x in restrictions
                                   if 'memory' in x]

            if memory_restrictions:
                kwargs['memory'] = max(memory_restrictions)

            return kwargs

So if there are any tasks that are waiting to be run on a worker with enough memory, the kwargs dictionary passed to cluster.scale_up will include a key and value for 'memory' (your Cluster.scale_up method needs to be able to support this).
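
On the cluster side, supporting this might look something like the sketch below. The signature is only an illustration; a real implementation would forward the value to whatever call your cluster manager provides:

    class MyCluster(object):
        def scale_up(self, n, memory=None, **kwargs):
            """Scale to ``n`` workers, optionally with ``memory`` bytes each."""
            # ``memory`` arrives via MyAdaptive.get_scale_up_kwargs above.
            # Forward it to your cluster manager here, e.g. by requesting
            # larger containers or jobs for the new workers.
            raise NotImplementedError()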