Worker

Overview

Workers provide two functions:

  • Compute tasks as directed by the scheduler
  • Store and serve computed results to other workers or clients

Each worker contains a ThreadPool that it uses to evaluate tasks as requested by the scheduler. It stores the results of these tasks locally and serves them to other workers or clients on demand. If the worker is asked to evaluate a task for which it does not have all of the necessary data then it will reach out to its peer workers to gather the necessary dependencies.

A typical conversation between a scheduler and two workers Alice and Bob may look like the following (a client-side sketch of the same exchange appears after the transcript):

  1. Scheduler -> Alice: Compute ``x <- add(1, 2)``!
  2. Alice -> Scheduler: I've computed x and am holding on to it!
  3. Scheduler -> Bob: Compute ``y <- add(x, 10)``!
     You will need x. Alice has x.
  4. Bob -> Alice: Please send me x.
  5. Alice -> Bob: Sure. x is 3!
  6. Bob -> Scheduler: I've computed y and am holding on to it!
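
The same exchange, driven from the client API; a minimal sketch (the scheduler address and the add function are assumptions):

  from dask.distributed import Client

  client = Client("tcp://scheduler:8786")  # assumed scheduler address

  def add(a, b):
      return a + b

  x = client.submit(add, 1, 2)   # the scheduler asks a worker (Alice) to compute x
  y = client.submit(add, x, 10)  # another worker (Bob) fetches x from Alice and computes y
  print(y.result())              # 13; the result only travels back to the client when requested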

Storing Data

Data is stored locally in a dictionary in the .data attribute that maps keys to the results of function calls.

  >>> worker.data
  {'x': 3,
   'y': 13,
   ...
   '(df, 0)': pd.DataFrame(...),
   ...
   }

This .data attribute is a MutableMapping that is typically a combination of in-memory and on-disk storage with an LRU policy to move data between them.
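
To see what each worker currently holds, you can use Client.run, which passes the Worker object to functions that accept an argument named dask_worker; a minimal sketch (the scheduler address is an assumption):

  from dask.distributed import Client

  client = Client("tcp://scheduler:8786")  # assumed scheduler address

  def held_keys(dask_worker):
      # Client.run injects the Worker object when the function
      # accepts a parameter named ``dask_worker``
      return list(dask_worker.data.keys())

  client.run(held_keys)  # {worker_address: [keys stored on that worker], ...}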

Thread Pool

Each worker sends computations to a thread in a concurrent.futures.ThreadPoolExecutor for computation. These computations occur in the same process as the Worker communication server so that they can access and share data efficiently between each other. For the purposes of data locality all threads within a worker are considered the same worker.

If your computations are mostly numeric in nature (for example NumPy and Pandas computations) and release the GIL entirely then it is advisable to run dask-worker processes with many threads and one process. This reduces communication costs and generally simplifies deployment.

If your computations are mostly Python code and don't release the GIL then it is advisable to run dask-worker processes with many processes and one thread per process:

  $ dask-worker scheduler:8786 --nprocs 8 --nthreads 1

This will launch 8 worker processes each of which has its own ThreadPoolExecutor of size 1.

If your computations are external to Python and long-running and don't release the GIL then beware that while the computation is running the worker process will not be able to communicate to other workers or to the scheduler. This situation should be avoided. If you don't link in your own custom C/Fortran code then this topic probably doesn't apply.

Command Line tool

Use the dask-worker command line tool to start an individual worker. Here are the available options:

  $ dask-worker --help
  Usage: dask-worker [OPTIONS] SCHEDULER

  Options:
    --worker-port INTEGER  Serving worker port, defaults to randomly assigned
    --http-port INTEGER    Serving http port, defaults to randomly assigned
    --nanny-port INTEGER   Serving nanny port, defaults to randomly assigned
    --port INTEGER         Deprecated, see --nanny-port
    --host TEXT            Serving host. Defaults to an ip address that can
                           hopefully be visible from the scheduler network.
    --nthreads INTEGER     Number of threads per process. Defaults to number of
                           cores
    --nprocs INTEGER       Number of worker processes to launch. Defaults to one.
    --name TEXT            Alias
    --memory-limit TEXT    Maximum bytes of memory that this worker should use.
                           Use 0 for unlimited, or 'auto' for
                           TOTAL_MEMORY * min(1, nthreads / total_nthreads)
    --no-nanny
    --help                 Show this message and exit.

Internal Scheduling

Internally, tasks that come to the worker proceed through the following pipeline:

[Figure: Dask worker task states]

The worker also tracks data dependencies that are required to run the tasks above. These follow through a simpler pipeline:

[Figure: Dask worker dependency states]

As tasks arrive they are prioritized and put into a heap. They are then taken from this heap in turn to have any remote dependencies collected. For each dependency we select a worker at random that has that data and collect the dependency from that worker. To improve bandwidth we opportunistically gather other dependencies of other tasks that are known to be on that worker, up to a maximum of 200MB of data (too little data and bandwidth suffers, too much data and responsiveness suffers). We use a fixed number of connections (around 10-50) so as to avoid overly fragmenting our network bandwidth. After all dependencies for a task are in memory we transition the task to the ready state and put the task again into a heap of tasks that are ready to run.

We collect from this heap and put the task into a thread from a local thread pool to execute.

Optionally, this task may identify itself as a long-running task (see Tasks launching tasks), at which point it secedes from the thread pool.
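
A minimal sketch of a task that secedes while it waits on work it launched itself (the train function and its parameter list are assumptions; secede and get_client come from the distributed API):

  from distributed import get_client, secede

  def fit_and_wait(params):
      client = get_client()                # client usable from inside a running task
      futures = client.map(train, params)  # ``train`` is an assumed user function
      secede()                             # leave the worker's thread pool while waiting
      return client.gather(futures)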

A task either errs or its result is put into memory. In either case a response is sent back to the scheduler.

Memory Management

Workers are given a target memory limit to stay under with the command line --memory-limit keyword or the memory_limit= Python keyword argument, which sets the memory limit per worker process launched by dask-worker:

  $ dask-worker tcp://scheduler:port --memory-limit=auto  # TOTAL_MEMORY * min(1, nthreads / total_nthreads)
  $ dask-worker tcp://scheduler:port --memory-limit=4e9   # four gigabytes per worker process.
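
From Python the same limit can be passed through the memory_limit= keyword; a minimal sketch using a LocalCluster (the worker count and the 4 GB size are arbitrary choices for illustration):

  from dask.distributed import Client, LocalCluster

  # each of the two worker processes gets its own 4 GB limit
  cluster = LocalCluster(n_workers=2, threads_per_worker=1, memory_limit="4GB")
  client = Client(cluster)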

Workers use a few different heuristics to keep memory use beneath this limit:

  • At 60% of memory load (as estimated by sizeof), spill least recently used data to disk
  • At 70% of memory load, spill least recently used data to disk regardless of what is reported by sizeof
  • At 80% of memory load, stop accepting new work on local thread pool
  • At 95% of memory load, terminate and restart the worker

These values can be configured by modifying the ~/.config/dask/distributed.yaml file:
  distributed:
    worker:
      # Fractions of worker memory at which we take action to avoid memory blowup
      # Set any of the lower three values to False to turn off the behavior entirely
      memory:
        target: 0.60     # target fraction to stay below
        spill: 0.70      # fraction at which we spill to disk
        pause: 0.80      # fraction at which we pause worker threads
        terminate: 0.95  # fraction at which we terminate the worker
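
The same values can also be set programmatically with dask.config in the process that launches the workers; a minimal sketch:

  import dask

  # equivalent to editing ~/.config/dask/distributed.yaml
  dask.config.set({
      "distributed.worker.memory.target": 0.60,
      "distributed.worker.memory.spill": 0.70,
      "distributed.worker.memory.pause": 0.80,
      "distributed.worker.memory.terminate": 0.95,
  })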

Spill data to Disk

Every time the worker finishes a task it estimates the size in bytes that the result costs to keep in memory using the sizeof function. This function defaults to sys.getsizeof for arbitrary objects which uses the standard Python sizeof protocol, but also has special-cased implementations for common data types like NumPy arrays and Pandas dataframes.
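
You can query this estimate directly through the dask.sizeof dispatch that the worker relies on; a minimal sketch:

  import numpy as np
  from dask.sizeof import sizeof

  sizeof(np.ones(1_000_000))  # about 8e6 bytes, via the NumPy special case
  sizeof(object())            # arbitrary objects fall back to sys.getsizeof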

When the sum of the number of bytes of the data in memory exceeds 60% of the memory limit the worker will begin to dump the least recently used data to disk. You can control this location with the --local-directory keyword:

  $ dask-worker tcp://scheduler:port --memory-limit 4e9 --local-directory /scratch

That data is still available and will be read back from disk when necessary. On the diagnostic dashboard status page disk I/O will show up in the task stream plot as orange blocks. Additionally the memory plot in the upper left will become orange and then red.

Monitor process memory load

The approach above can fail for a few reasons:

  • Custom objects may not report their memory size accurately
  • User functions may take up more RAM than expected
  • Significant amounts of data may accumulate in network I/O buffers

To address this we periodically monitor the memory of the worker process every 200 ms. If the system reported memory use is above 70% of the target memory usage then the worker will start dumping unused data to disk, even if internal sizeof recording hasn't yet reached the normal 60% limit.

Halt worker threads

At 80% load the worker's thread pool will stop accepting new tasks. This gives time for the write-to-disk functionality to take effect even in the face of rapidly accumulating data.

Kill Worker

At 95% memory load a worker's nanny process will terminate it. This is to avoid having our worker job being terminated by an external job scheduler (like YARN, Mesos, SGE, etc.). After termination the nanny will restart the worker in a fresh state.

Nanny

Dask workers are by default launched, monitored, and managed by a small Nanny process.

  • class distributed.nanny.Nanny(scheduler_ip=None, scheduler_port=None, scheduler_file=None, worker_port=0, nthreads=None, ncores=None, loop=None, local_dir=None, local_directory='dask-worker-space', services=None, name=None, memory_limit='auto', reconnect=True, validate=False, quiet=False, resources=None, silence_logs=None, death_timeout=None, preload=None, preload_argv=None, security=None, contact_address=None, listen_address=None, worker_class=None, env=None, interface=None, host=None, port=None, protocol=None, config=None, **worker_kwargs)
  • A process to manage worker processes

The nanny spins up Worker processes, watches them, and kills or restarts them as necessary. It is necessary if you want to use the Client.restart method, or to restart the worker automatically if it gets to the terminate fraction of its memory limit.
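
For example, Client.restart asks each nanny to kill and relaunch its worker process; a minimal sketch (the scheduler address is an assumption):

  from dask.distributed import Client

  client = Client("tcp://scheduler:8786")  # assumed scheduler address
  client.restart()  # each Nanny terminates and relaunches its worker in a fresh state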

The parameters for the Nanny are mostly the same as those for the Worker.

See also

  • Worker

API Documentation

  • class distributed.worker.Worker(scheduler_ip=None, scheduler_port=None, scheduler_file=None, ncores=None, nthreads=None, loop=None, local_dir=None, local_directory=None, services=None, service_ports=None, service_kwargs=None, name=None, reconnect=True, memory_limit='auto', executor=None, resources=None, silence_logs=None, death_timeout=None, preload=None, preload_argv=None, security=None, contact_address=None, memory_monitor_interval='200ms', extensions=None, metrics={}, startup_information={}, data=None, interface=None, host=None, port=None, protocol=None, dashboard_address=None, nanny=None, plugins=(), low_level_profiler=False, validate=None, profile_cycle_interval=None, lifetime=None, lifetime_stagger=None, lifetime_restart=None, **kwargs)
  • Worker node in a Dask distributed cluster

Workers perform two functions:

  • Serve data from a local dictionary
  • Perform computation on that data and on data from peers

Workers keep the scheduler informed of their data and use that scheduler to gather data from other workers when necessary to perform a computation.

You can start a worker with the dask-worker command line application:

  $ dask-worker scheduler-ip:port

Use the --help flag to see more options:

  $ dask-worker --help

The rest of this docstring is about the internal state that the worker uses to manage and track internal computations.

State

Informational State

These attributes don’t change significantly during execution.

    • nthreads: int:
    • Number of threads used by this worker process
    • executor: concurrent.futures.ThreadPoolExecutor:
    • Executor used to perform computation
    • local_directory: path:
    • Path on local machine to store temporary files
    • scheduler: rpc:
    • Location of scheduler. See .ip/.port attributes.
    • name: string:
    • Alias
    • services: {str: Server}:
    • Auxiliary web servers running on this worker
    • service_ports: {str: port}:
    • total_out_connections: int
    • The maximum number of concurrent outgoing requests for data
    • total_in_connections: int
    • The maximum number of concurrent incoming requests for data
    • total_comm_nbytes: int
    • batched_stream: BatchedSend
    • A batched stream along which we communicate to the scheduler
    • log: [(message)]
    • A structured and queryable log. See Worker.story

Volatile State

These attributes track the progress of tasks that this worker is trying to complete. In the descriptions below a key is the name of a task that we want to compute and dep is the name of a piece of dependent data that we want to collect from others.

    • data: {key: object}:
    • Prefer using the host attribute instead of this, unless memory_limit and at least one of memory_target_fraction or memory_spill_fraction values are defined, in that case, this attribute is a zict.Buffer, from which information on LRU cache can be queried.
    • data.memory: {key: object}:
    • Dictionary mapping keys to actual values stored in memory. Only available if the condition for data being a zict.Buffer is met.
    • data.disk: {key: object}:
    • Dictionary mapping keys to actual values stored on disk. Only available if the condition for data being a zict.Buffer is met.
    • task_state: {key: string}:
    • The state of all tasks that the scheduler has asked us to compute. Valid states include waiting, constrained, executing, memory, erred
    • tasks: {key: dict}
    • The function, args, kwargs of a task. We run this when appropriate
    • dependencies: {key: {deps}}
    • The data needed by this key to run
    • dependents: {dep: {keys}}
    • The keys that use this dependency
    • data_needed: deque(keys)
    • The keys whose data we still lack, arranged in a deque
    • waiting_for_data: {key: {deps}}
    • A dynamic version of dependencies. All dependencies that we still don't have for a particular key.
    • ready: [keys]
    • Keys that are ready to run. Stored in a LIFO stack
    • constrained: [keys]
    • Keys for which we have the data to run, but are waiting on abstract resources like GPUs. Stored in a FIFO deque
    • executing: {keys}
    • Keys that are currently executing
    • executed_count: int
    • The number of tasks that this worker has run in its lifetime
    • long_running: {keys}
    • A set of keys of tasks that are running and have started their own long-running clients.
    • dep_state: {dep: string}:
    • The state of all dependencies required by our tasks. Valid states include waiting, flight, and memory
    • who_has: {dep: {worker}}
    • Workers that we believe have this data
    • has_what: {worker: {deps}}
    • The data that we care about that we think a worker has
    • pending_data_per_worker: {worker: [dep]}
    • The data on each worker that we still want, prioritized as a deque
    • in_flight_tasks: {task: worker}
    • All dependencies that are coming to us in current peer-to-peer connections and the workers from which they are coming.
    • in_flight_workers: {worker: {task}}
    • The workers from which we are currently gathering data and the dependencies we expect from those connections
    • comm_bytes: int
    • The total number of bytes in flight
    • suspicious_deps: {dep: int}
    • The number of times a dependency has not been where we expected it
    • nbytes: {key: int}
    • The size of a particular piece of data
    • types: {key: type}
    • The type of a particular piece of data
    • threads: {key: int}
    • The ID of the thread on which the task ran
    • active_threads: {int: key}
    • The keys currently running on active threads
    • exceptions: {key: exception}
    • The exception caused by running a task if it erred
    • tracebacks: {key: traceback}
    • The traceback from running a task if it erred
    • startstops: {key: [{startstop}]}
    • Log of transfer, load, and compute times for a task
    • priorities: {key: tuple}
    • The priority of a key given by the scheduler. Determines run order.
    • durations: {key: float}
    • Expected duration of a task
    • resource_restrictions: {key: {str: number}}
    • Abstract resources required to run a task

Parameters:

  • scheduler_ip: str
  • scheduler_port: int
  • ip: str, optional
  • data: MutableMapping, type, None
  • The object to use for storage, builds a disk-backed LRU dict by default

  • nthreads: int, optional

  • loop: tornado.ioloop.IOLoop
  • local_directory: str, optional
  • Directory where we place local resources

  • name: str, optional

  • memory_limit: int, float, string
  • Number of bytes of memory that this worker should use. Set to zero for no limit. Set to 'auto' to calculate as system.MEMORY_LIMIT * min(1, nthreads / total_cores). Use strings or numbers like 5GB or 5e9

  • memory_target_fraction: float

  • Fraction of memory to try to stay beneath

  • memory_spill_fraction: float

  • Fraction of memory at which we start spilling to disk

  • memory_pause_fraction: float

  • Fraction of memory at which we stop running new tasks

  • executor: concurrent.futures.Executor

  • resources: dict
  • Resources that this worker has like {'GPU': 2}

  • nanny: str

  • Address on which to contact nanny, if it exists

  • lifetime: str

  • Amount of time like "1 hour" after which we gracefully shut down the worker. This defaults to None, meaning no explicit shutdown time.

  • lifetime_stagger: str

  • Amount of time like "5 minutes" to stagger the lifetime value. The actual lifetime will be selected uniformly at random between lifetime +/- lifetime_stagger

  • lifetime_restart: bool

  • Whether or not to restart a worker after it has reached its lifetime. Default False

See also

  • Nanny

Examples

Use the command line to start a worker:

  $ dask-scheduler
  Start scheduler at 127.0.0.1:8786

  $ dask-worker 127.0.0.1:8786
  Start worker at:               127.0.0.1:1234
  Registered with scheduler at:  127.0.0.1:8786
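
A worker can also be started from Python inside an event loop; a minimal sketch, assuming a scheduler is already listening at the given address:

  import asyncio
  from dask.distributed import Worker

  async def main():
      # start a single-threaded worker connected to an existing scheduler
      worker = await Worker("tcp://127.0.0.1:8786", nthreads=1)
      await worker.finished()  # run until the worker is told to shut down

  asyncio.run(main())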