Actors

Note

This is an experimental feature and is subject to change without notice.

Note

This is an advanced feature and may not be suitable for beginning users. It is rarely necessary for common workloads.

Actors enable stateful computations within a Dask workflow. They are useful for some rare algorithms that require additional performance and are willing to sacrifice resilience.

An actor is a pointer to a user-defined object living on a remote worker. Anyone with that actor can call methods on that remote object.

Example

Here we create a simple Counter class, instantiate that class on one worker, and then call methods on that class remotely.

    class Counter:
        """A simple class to manage an incrementing counter"""
        n = 0

        def __init__(self):
            self.n = 0

        def increment(self):
            self.n += 1
            return self.n

        def add(self, x):
            self.n += x
            return self.n

    from dask.distributed import Client  # Start a Dask Client
    client = Client()

    future = client.submit(Counter, actor=True)  # Create a Counter on a worker
    counter = future.result()  # Get back a pointer to that object

    counter
    # <Actor: Counter, key=Counter-1234abcd>

    future = counter.increment()  # Call remote method
    future.result()  # Get back result
    # 1

    future = counter.add(10)  # Call remote method
    future.result()  # Get back result
    # 11

Motivation

Actors are motivated by some of the challenges of using pure task graphs.

Normal Dask computations are composed of a graph of functions. This approach has a few limitations that are good for resilience, but can negatively affect performance:

  • State: The functions should not mutate their inputs in-place or rely on global state. They should instead operate in a pure-functional manner, consuming inputs and producing separate outputs.
  • Central Overhead: The execution location and order is determined by the centralized scheduler. Because the scheduler is involved in every decision it can sometimes create a central bottleneck.

Some workloads may need to update state directly, or may involve more tiny tasks than the scheduler can handle (the scheduler can coordinate about 4000 tasks per second).

Actors side-step both of these limitations:

  • State: Actors can hold on to and mutate state. They are allowed to update their state in-place.
  • Overhead: Operations on actors do not inform the central scheduler, and so do not contribute to the 4000 task/second overhead. They also avoid an extra network hop and so have lower latencies.

Create an Actor

You create an actor by submitting a Class to run on a worker using normal Dask computation functions like submit, map, compute, or persist, and using the actors= keyword (or actor= on submit).

    future = client.submit(Counter, actor=True)

You can use all other keywords to these functions like workers=, resources=, and so on to control where this actor ends up.
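
For example, a minimal sketch of pinning the actor to a particular worker with the workers= keyword (the address below is hypothetical; substitute one of your own workers' addresses, e.g. from client.scheduler_info()):

    future = client.submit(
        Counter,
        actor=True,
        workers=["tcp://127.0.0.1:34567"],  # hypothetical worker address
    )
    counter = future.result()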

This creates a normal Dask future on which you can call .result() to get the Actor once it has successfully run on a worker.

    >>> counter = future.result()
    >>> counter
    <Actor: Counter, key=...>

A Counter object has been instantiated on one of the workers, and this Actor object serves as our proxy to that remote object. It has the same methods and attributes.

    >>> dir(counter)
    ['add', 'increment', 'n']

Call Remote Methods

However, accessing an attribute or calling a method will trigger a communication to the remote worker, run the method on the remote worker in a separate thread pool, and then communicate the result back to the calling side. For attribute access these operations block and return when finished; for method calls they return an ActorFuture immediately.

    >>> future = counter.increment()  # Immediately returns an ActorFuture
    >>> future.result()               # Block until finished and result arrives
    1

ActorFuture objects are similar to normal Dask Future objects, but not as fully featured. They currently only support the result method and nothing else. They don’t currently work with any other Dask functions that expect futures, like as_completed, wait, or client.gather. They can’t be placed into additional submit or map calls to form dependencies. They communicate their results immediately (rather than waiting for result to be called) and cache the result on the future itself.
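
Because the result is cached on the ActorFuture itself, calling result more than once is cheap; a small sketch:

    future = counter.increment()
    n = future.result()  # may block until the result arrives
    n = future.result()  # returns the same cached result immediately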

Access Attributes

If you define an attribute at the class level then that attribute will be accessible to the actor.

    class Counter:
        n = 0  # Recall that we defined our class with `n` as a class variable

        ...

    >>> counter.n  # Blocks until finished
    1

Attribute access blocks automatically. It’s as though you called .result().

Execution on the Worker

When you call a method on an actor, your arguments get serialized and sent to the worker that owns the actor’s object. If you do this from a worker, this communication is direct. If you do this from a Client, then communication will be direct if the Client has direct access to the workers (create a client with Client(…, direct_to_workers=True) if direct connections are possible), or will be proxied through the scheduler if direct connections from the client to the workers are not possible.
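
A minimal sketch of enabling direct connections, assuming the network allows the client to reach the workers directly:

    from dask.distributed import Client

    # direct_to_workers=True makes the client talk to workers directly
    # rather than proxying traffic through the scheduler
    client = Client(direct_to_workers=True)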

The appropriate method of the Actor’s object is then called in a separate thread, the result captured, and then sent back to the calling side. Currently workers have only a single thread for actors, but this may change in the future.

The result is sent back immediately to the calling side, and is not stored on the worker with the actor. It is cached on the ActorFuture object.

Calling from coroutines and async/await

If you use actors within a coroutine or async/await function then actor methods and attribute access will return Tornado futures.

    async def f():
        counter = await client.submit(Counter, actor=True)

        await counter.increment()
        n = await counter.n

Coroutines and async/await on the Actor

If you define an async def function on the actor class then that method will run on the Worker’s event loop thread rather than a separate thread.

    import tornado.locks

    class Waiter:
        def __init__(self):
            self.event = tornado.locks.Event()

        async def set(self):
            self.event.set()

        async def wait(self):
            await self.event.wait()

    waiter = client.submit(Waiter, actor=True).result()
    waiter.wait().result()  # waits until set, without consuming a worker thread
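
To release the waiter, another task or client holding a handle to the same actor can call set; a sketch of the other half:

    # elsewhere, in another client or task that holds `waiter`
    waiter.set().result()  # wakes anything blocked in waiter.wait()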

Performance

Worker operations currently have about 1ms of latency, on top of any network latency that may exist. However, other activity on a worker can easily increase these latencies.
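
One hedged way to gauge this overhead yourself is to time a batch of round trips, assuming the Counter actor from the example above:

    import time

    start = time.time()
    for _ in range(100):
        counter.increment().result()  # one full round trip per call
    elapsed = time.time() - start
    print(f"{1000 * elapsed / 100:.2f} ms per actor call")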

Limitations

Actors offer advanced capabilities, but with some cost:

  • No Resilience: No effort is made to make actor workloads resilient to worker failure. If the worker dies while holding an actor, that actor is lost forever.
  • No Diagnostics: Because the scheduler is not informed about actor computations, no diagnostics are available about these computations.
  • No Load Balancing: Actors are allocated onto workers evenly, without serious consideration given to avoiding communication.
  • Experimental: Actors are a new feature and subject to change without warning.