Communications

Workers, the Scheduler, and Clients communicate by sending each other Python objects (such as Protocol messages or user data). The communication layer handles appropriate encoding and shipping of those Python objects between the distributed endpoints. The communication layer is able to select between different transport implementations, depending on user choice or (possibly) internal optimizations.

The communication layer lives in the distributed.comm package.

Addresses

Communication addresses are canonically represented as URIs, such as tcp://127.0.0.1:1234. For compatibility with existing code, if the URI scheme is omitted, a default scheme of tcp is assumed (so 127.0.0.1:456 is really the same as tcp://127.0.0.1:456). The default scheme may change in the future.

The following schemes are currently implemented in the distributed source tree:

  • tcp is the main transport; it uses TCP sockets and allows for IPv4 and IPv6 addresses.
  • tls is a secure transport using the well-known TLS protocol over TCP sockets. Using it requires specifying keys and certificates as outlined in TLS/SSL.
  • inproc is an in-process transport using simple object queues; it eliminates serialization and I/O overhead, providing almost zero-cost communication between endpoints as long as they are situated in the same process.
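
The idea behind an in-process transport can be illustrated with a pair of queues. The following is a minimal sketch under stated assumptions, not the code used by distributed: QueuePairComm and make_pair are hypothetical names, and only the read/write shape is borrowed from the Comm interface. Because the message is handed over by reference, nothing is serialized and the receiver gets the very same object.

```python
import asyncio


class QueuePairComm:
    """Toy in-process endpoint (hypothetical; not distributed's inproc code)."""

    def __init__(self, inbox, outbox):
        self._inbox = inbox    # messages the peer wrote for us
        self._outbox = outbox  # messages we write for the peer

    async def write(self, msg):
        # The object itself is enqueued: no encoding, no copy, no I/O.
        await self._outbox.put(msg)

    async def read(self):
        return await self._inbox.get()


def make_pair():
    """Return two connected endpoints sharing a pair of queues."""
    a_to_b, b_to_a = asyncio.Queue(), asyncio.Queue()
    return QueuePairComm(b_to_a, a_to_b), QueuePairComm(a_to_b, b_to_a)


async def demo():
    a, b = make_pair()
    payload = {"op": "update", "data": [1, 2, 3]}
    await a.write(payload)
    received = await b.read()
    # Same object on both sides, since nothing was serialized.
    return received is payload


same_object = asyncio.run(demo())
```

A consequence of this design is that the receiver can observe later mutations made by the sender, which is one reason such a transport only makes sense when both endpoints live in the same process.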

Some URIs may be valid for listening but not for connecting. For example, the URI tcp:// will listen on all IPv4 and IPv6 addresses and on an arbitrary port, but you cannot connect to that address.

Higher-level APIs in distributed may accept other address formats for convenience or compatibility, for example a (host, port) pair. However, the abstract communications layer always deals with URIs.
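
As an illustration of how a convenience format might be coerced into the URI form, here is a small sketch. The helper to_uri is hypothetical (it is not a distributed function), and the tcp default is the assumption stated in this section.

```python
def to_uri(addr, default_scheme="tcp"):
    """Coerce a (host, port) pair or a string into a URI (illustrative only)."""
    if isinstance(addr, tuple):
        host, port = addr
        # IPv6 literals must be bracketed so the port separator is unambiguous.
        if ":" in host and not host.startswith("["):
            host = f"[{host}]"
        addr = f"{host}:{port}"
    if "://" not in addr:
        # No scheme given: fall back to the default one.
        addr = f"{default_scheme}://{addr}"
    return addr


print(to_uri(("127.0.0.1", 8786)))    # tcp://127.0.0.1:8786
print(to_uri(("::1", 8786)))          # tcp://[::1]:8786
print(to_uri("tls://10.0.0.5:8786"))  # tls://10.0.0.5:8786
```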

Functions

There are a number of top-level functions in distributed.comm to help deal with addresses:

  • distributed.comm.parse_address(addr, strict=False)
  • Split address into its scheme and scheme-dependent location string.

      >>> parse_address('tcp://127.0.0.1')
      ('tcp', '127.0.0.1')

If strict is set to true the address must have a scheme.

  • distributed.comm.unparse_address(scheme, loc)
  • Undo parse_address().

      >>> unparse_address('tcp', '127.0.0.1')
      'tcp://127.0.0.1'

  • distributed.comm.normalize_address(addr)
  • Canonicalize address, adding a default scheme if necessary.

      >>> normalize_address('tls://[::1]')
      'tls://[::1]'
      >>> normalize_address('[::1]')
      'tcp://[::1]'

  • distributed.comm.resolve_address(addr)
  • Apply scheme-specific address resolution to addr, replacing all symbolic references with concrete location specifiers.

In practice, this can mean hostnames are resolved to IP addresses.

      >>> resolve_address('tcp://localhost:8786')
      'tcp://127.0.0.1:8786'

  • distributed.comm.get_address_host(addr)
  • Return a hostname / IP address identifying the machine this address is located on.

In contrast to get_address_host_port(), this function should always succeed for well-formed addresses.

      >>> get_address_host('tcp://1.2.3.4:80')
      '1.2.3.4'
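
The relationship between parsing, unparsing and normalizing can be sketched in a few lines of plain Python. This is a simplified stand-in written for illustration, not the library source: the function names carry a _sketch suffix to make that explicit, and the tcp default is taken from the Addresses section.

```python
DEFAULT_SCHEME = "tcp"  # assumed default, per the Addresses section


def parse_address_sketch(addr, strict=False):
    """Split an address into (scheme, location)."""
    scheme, sep, loc = addr.rpartition("://")
    if not sep:
        # No "://" found: the whole string is the location.
        if strict:
            raise ValueError(f"address {addr!r} has no scheme")
        scheme = DEFAULT_SCHEME
    return scheme, loc


def unparse_address_sketch(scheme, loc):
    """Inverse of parse_address_sketch."""
    return f"{scheme}://{loc}"


def normalize_address_sketch(addr):
    """Round-trip through parse/unparse, which adds the default scheme."""
    return unparse_address_sketch(*parse_address_sketch(addr))


print(parse_address_sketch("tcp://127.0.0.1"))  # ('tcp', '127.0.0.1')
print(normalize_address_sketch("[::1]"))        # tcp://[::1]
```

Normalizing is simply parsing followed by unparsing, which is why an address that already has a scheme comes back unchanged.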

Communications API

The basic unit for dealing with established communications is the Comm object:

  • class distributed.comm.Comm
  • A message-oriented communication object, representing an established communication channel. There should be only one reader and one writer at a time: to manage concurrent communications, even with a single peer, you must create distinct Comm objects.

Messages are arbitrary Python objects. Concrete implementations of this class can implement different serialization mechanisms depending on the underlying transport’s characteristics.

  • abort(self)
  • Close the communication immediately and abruptly. Useful in destructors or generators’ finally blocks.

  • close(self)

  • Close the communication cleanly. This will attempt to flush outgoing buffers before actually closing the underlying transport.

This method is a coroutine.

  • closed(self)
  • Return whether the stream is closed.

  • extra_info

  • Return backend-specific information about the communication, as a dict. Typically, this is information which is initialized when the communication is established and doesn’t vary afterwards.

  • local_address

  • The local address. For logging and debugging purposes only.

  • peer_address

  • The peer’s address. For logging and debugging purposes only.

  • read(self, deserializers=None)

  • Read and return a message (a Python object).

This method is a coroutine.

Parameters:

  • deserializers : Optional[Dict[str, Tuple[Callable, Callable, bool]]]
    An optional dict appropriate for distributed.protocol.deserialize. See Serialization for more.

  • write(self, msg, serializers=None, on_error=None)
  • Write a message (a Python object).

This method is a coroutine.

Parameters:

  • msg
  • on_error : Optional[str]
    The behavior when serialization fails. See distributed.protocol.core.dumps for valid values.

You don’t create Comm objects directly: you either listen for incoming communications, or connect to a peer listening for connections:

  • distributed.comm.connect(addr, timeout=None, deserialize=True, connection_args=None)
  • Connect to the given address (a URI such as tcp://127.0.0.1:1234) and yield a Comm object. If the connection attempt fails, it is retried until the timeout is expired.
  • distributed.comm.listen(addr, handle_comm, deserialize=True, connection_args=None)
  • Create a listener object with the given parameters. When its start() method is called, the listener will listen on the given address (a URI such as tcp://0.0.0.0) and call handle_comm with a Comm object for each incoming connection.

handle_comm can be a regular function or a coroutine.

Listener objects expose the following interface:

  • class distributed.comm.core.Listener
    • contact_address
    • An address this listener can be contacted on. This can be different from listen_address if the latter is some wildcard address such as ‘tcp://0.0.0.0:123’.

    • listen_address

    • The listening address as a URI string.

    • start(self)

    • Start listening for incoming connections.

    • stop(self)

    • Stop listening. This does not shut down already established communications, but prevents accepting new ones.
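
The listen / connect / handle_comm pattern described above can be sketched with the standard library alone. This is not how distributed implements its transports: StreamComm is a hypothetical class, pickle with an 8-byte length prefix is an assumed stand-in for the real protocol, and asyncio streams replace the actual backends. The sketch only shows the message-oriented shape: whole Python objects go in, whole Python objects come out.

```python
import asyncio
import pickle
import struct


class StreamComm:
    """Toy message-oriented channel over an asyncio stream (illustrative only)."""

    def __init__(self, reader, writer):
        self._reader, self._writer = reader, writer

    async def write(self, msg):
        payload = pickle.dumps(msg)
        # 8-byte big-endian length prefix, then the pickled message.
        self._writer.write(struct.pack(">Q", len(payload)) + payload)
        await self._writer.drain()

    async def read(self):
        (size,) = struct.unpack(">Q", await self._reader.readexactly(8))
        return pickle.loads(await self._reader.readexactly(size))

    async def close(self):
        self._writer.close()
        await self._writer.wait_closed()


async def handle_comm(comm):
    # Called once per incoming connection: wrap one message and send it back.
    msg = await comm.read()
    await comm.write({"echo": msg})
    await comm.close()


async def demo():
    async def on_connect(reader, writer):
        await handle_comm(StreamComm(reader, writer))

    # Port 0 asks the OS for an arbitrary free port.
    server = await asyncio.start_server(on_connect, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    comm = StreamComm(reader, writer)
    await comm.write({"op": "ping"})
    reply = await comm.read()
    await comm.close()

    server.close()  # stop accepting new connections
    return reply


reply = asyncio.run(demo())
```

Note that the port is only known after the server has started, which mirrors the distinction between a wildcard listen_address and a concrete contact_address.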

Extending the Communication Layer

Each transport is represented by a URI scheme (such as tcp) and backed by a dedicated Backend implementation, which provides entry points into all transport-specific routines.

Out-of-tree backends can be registered under the group distributed.comm.backends in setuptools entry_points. For example, a hypothetical dask_udp package would register its UDP backend class by including the following in its setup.py file:

    setup(name="dask_udp",
          entry_points={
              "distributed.comm.backends": [
                  "udp=dask_udp.backend:UDPBackend",
              ]
          },
          ...
    )

  • class distributed.comm.registry.Backend
  • A communication backend, selected by a given URI scheme (e.g. ‘tcp’).

    • get_address_host(self, loc)
    • Get a host name (normally an IP address) identifying the host the address is located on. loc is a scheme-less address.

    • get_address_host_port(self, loc)

    • Get the (host, port) tuple of the scheme-less address loc. This should only be implemented by IP-based transports.

    • get_connector(self)

    • Get a connector object usable for connecting to addresses.

    • get_listener(self, loc, handle_comm, deserialize, **connection_args)

    • Get a listener object for the scheme-less address loc.

    • get_local_address_for(self, loc)

    • Get the local listening address suitable for reaching loc.

    • resolve_address(self, loc)

    • Resolve the address into a canonical form. loc is a scheme-less address.

Simple implementations may return loc unchanged.
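
The scheme-to-backend dispatch can be pictured with a toy registry. Everything in this sketch is hypothetical (ToyBackend, the backends dict, the toy scheme and the helper functions); it does not reproduce distributed.comm.registry and implements only two of the hooks listed above, assuming well-formed locations with bracketed IPv6 literals.

```python
class ToyBackend:
    """Hypothetical backend exposing two of the hooks listed above."""

    def get_address_host(self, loc):
        # Drop a trailing ":port"; IPv6 hosts are assumed to be bracketed.
        host, sep, port = loc.rpartition(":")
        if sep and port.isdigit():
            return host.strip("[]")
        return loc.strip("[]")

    def resolve_address(self, loc):
        # Simple implementations may return loc unchanged.
        return loc


backends = {"toy": ToyBackend()}  # scheme -> backend instance


def get_backend(scheme):
    """Look up the backend registered for a URI scheme."""
    try:
        return backends[scheme]
    except KeyError:
        raise ValueError(f"unknown address scheme {scheme!r}") from None


def get_address_host(addr):
    """Split off the scheme, then delegate to the matching backend."""
    scheme, _, loc = addr.rpartition("://")
    return get_backend(scheme).get_address_host(loc)


print(get_address_host("toy://1.2.3.4:80"))  # 1.2.3.4
```

The top-level function stays transport-agnostic: it only knows how to split off the scheme, and every transport-specific decision lives in the backend object.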