Protocol

The scheduler, workers, and clients pass messages between each other. Semantically these messages encode commands, status updates, and data, like the following:

  • Please compute the function sum on the data x and store in y
  • The computation y has been completed
  • Be advised that a new worker named alice is available for use
  • Here is the data for the keys 'x' and 'y'

In practice we represent these messages with dictionaries/mappings:

    {'op': 'compute',
     'function': ...,
     'args': ['x']}

    {'op': 'task-complete',
     'key': 'y',
     'nbytes': 26}

    {'op': 'register-worker',
     'address': '192.168.1.42',
     'name': 'alice',
     'nthreads': 4}

    {'x': b'...',
     'y': b'...'}

When we communicate these messages between nodes we need to serialize them down to a string of bytes that can then be deserialized on the other end to their in-memory dictionary form. For simple cases several options exist, such as JSON, MsgPack, Protocol Buffers, and Thrift. The situation is made more complex by concerns like serializing Python functions and Python objects, optional compression, cross-language support, large messages, and efficiency.

This document describes the protocol used by dask.distributed today. Be advised that this protocol changes rapidly as we continue to optimize for performance.

Overview

We may split a single message into multiple message parts to suit different protocols. Generally small bits of data are encoded with MsgPack while large bytestrings and complex datatypes are handled by a custom format. Each message part gets its own header, which is always encoded as msgpack. After serializing all message parts we have a sequence of bytestrings, or frames, which we send along the wire, prepended with length information.

The application doesn’t know any of this; it just sends us Python dictionaries with various datatypes and we produce a list of bytestrings that get written to a socket. This format is fast both for many frequent messages and for large messages.

MsgPack for Messages

Most messages are encoded with MsgPack, a self-describing semi-structured serialization format that is very similar to JSON, but smaller, faster, not human-readable, and with support for bytestrings and (soon) timestamps. We chose MsgPack as a base serialization format for the following reasons:

  • It does not require separate headers, and so is easy and flexible to use, which is particularly important in an early-stage project like dask.distributed
  • It is very fast, much faster than JSON, and there are nicely optimized implementations. With few exceptions (described later) MsgPack does not come anywhere near being a bottleneck, even under heavy use.
  • Unlike JSON it supports bytestrings
  • It covers the standard set of types necessary to encode most information
  • It is widely implemented in a number of languages (see the cross-language section below)

However, MsgPack fails (correctly) in the following ways:

  • It does not provide any way for us to encode Python functions or user-defined data types
  • It does not support bytestrings greater than 4 GB and is generally inefficient for very large messages.

Because of these failings we supplement it with a language-specific protocol and a special case for large bytestrings.

CloudPickle for Functions and Some Data

Pickle and CloudPickle are Python libraries to serialize almost any Python object, including functions. We use these libraries to transform the users’ functions and data into bytes before we include them in the dictionary/map that we pass off to msgpack. In the introductory example you may have noticed that we skipped providing an example for the function argument:

    {'op': 'compute',
     'function': ...,
     'args': ['x']}

That is because this value will actually be the result of calling cloudpickle.dumps(myfunction). Those bytes will then be included in the dictionary that we send off to msgpack, which will only have to deal with bytes rather than obscure Python functions.

Note: we actually call some combination of pickle and cloudpickle, depending on the situation. This is for performance reasons.
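As a rough sketch of this idea, the snippet below pickles a function and places the resulting bytes in the message. It uses the standard-library pickle module as a stand-in for cloudpickle (an assumption, so that the example has no third-party dependencies) and the built-in sum as the user's function. Plain pickle stores importable functions by reference, which is enough here; cloudpickle is what makes lambdas and interactively defined functions work.

```python
import pickle

# The built-in `sum` stands in for a user-supplied function.
# pickle is used here in place of cloudpickle (illustrative assumption).
function_bytes = pickle.dumps(sum, protocol=pickle.HIGHEST_PROTOCOL)

# The message now holds only simple types: str, list, and bytes.
msg = {'op': 'compute',
       'function': function_bytes,
       'args': ['x']}

# On the receiving side the bytes are turned back into a callable.
function = pickle.loads(msg['function'])
result = function([1, 2, 3])
```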

Cross Language Specialization

The Client and Workers must agree on a language-specific serialization format. In the standard dask.distributed client and worker objects this ends up being the following:

    bytes = cloudpickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    obj = cloudpickle.loads(bytes)

This varies between Python 2 and 3 and so your client and workers must match their Python versions and software environments.

However, the Scheduler never uses the language-specific serialization and instead only deals with MsgPack. If the client sends a pickled function up to the scheduler, the scheduler will not unpack the function but will instead keep it as bytes. Eventually those bytes will be sent to a worker, which will then unpack the bytes into a proper Python function. Because the Scheduler never unpacks language-specific serialized bytes it may be in a different language.

The client and workers must share the same language and software environment; the scheduler may differ.

This has a few advantages:

  • The Scheduler is protected from unpickling unsafe code
  • The Scheduler can be run under pypy for improved performance. This is only useful for larger clusters.
  • We could conceivably implement workers and clients for other languages (like R or Julia) and reuse the Python scheduler. The worker and client code is fairly simple and much easier to reimplement than the scheduler, which is complex.
  • The scheduler might some day be rewritten in more heavily optimized C or Go
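The division of labor described in this section can be sketched as three small functions. All names here (client_submit, scheduler_route, worker_run) are hypothetical and exist only for this illustration, and the standard-library pickle module again stands in for cloudpickle. The point is that the scheduler step never calls pickle.loads.

```python
import pickle

def client_submit(func, args):
    """Client side: pickle the function before it leaves the process."""
    return {'op': 'compute', 'function': pickle.dumps(func), 'args': args}

def scheduler_route(msg, worker_inbox):
    """Scheduler side: read only plain fields and forward the bytes untouched."""
    if not isinstance(msg['function'], bytes):
        raise TypeError('expected the function as opaque bytes')
    worker_inbox.append(msg)

def worker_run(msg, data):
    """Worker side: unpickle the function and apply it to locally held data."""
    func = pickle.loads(msg['function'])
    return func(*(data[key] for key in msg['args']))

inbox = []
scheduler_route(client_submit(sum, ['x']), inbox)
result = worker_run(inbox.pop(), {'x': [1, 2, 3]})
```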

Compression

Fast compression libraries like LZ4 or Snappy may increase effective bandwidth by compressing data before sending and decompressing it after reception. This is especially valuable on lower-bandwidth networks.

If either of these libraries is available (we prefer LZ4 to Snappy) then for every message greater than 1 kB we try to compress the message and, if the compression is at least a 10% improvement, we send the compressed bytes rather than the original payload. We record the compression used within the header as a string like 'lz4' or 'snappy'.
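The decision rule above can be sketched roughly as follows. The function name maybe_compress and the two constants are illustrative, not part of dask.distributed, and zlib from the standard library stands in for LZ4 or Snappy so that the example runs without extra packages.

```python
import os
import zlib

MIN_SIZE = 1000        # assumed reading of the "greater than 1 kB" threshold
MAX_RATIO = 0.9        # compressed size must be at most 90% of the original

def maybe_compress(payload):
    """Return (compression_name, bytes), compressing only when it pays off."""
    if len(payload) <= MIN_SIZE:
        return None, payload
    compressed = zlib.compress(payload)  # zlib is a stand-in for lz4/snappy
    if len(compressed) > MAX_RATIO * len(payload):
        return None, payload
    return 'zlib', compressed

small = maybe_compress(b'abc')                      # too small to bother
repetitive = maybe_compress(b'a' * 100_000)         # compresses very well
random_bytes = maybe_compress(os.urandom(100_000))  # effectively incompressible
```

The first element of the returned pair plays the role of the compression string recorded in the header.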

To avoid compressing large amounts of incompressible data we first try to compress a sample. We take 10 kB chunks from five locations in the dataset, arrange them together, and try compressing the result. If this doesn’t result in significant compression then we don’t try to compress the full result.
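A minimal sketch of the sampling step is shown below. The helper name looks_compressible, the evenly spaced chunk positions, and the 90% cutoff are assumptions made for the example (the text does not say how the five locations are chosen), and zlib again stands in for LZ4 or Snappy.

```python
import os
import zlib

def looks_compressible(payload, chunk_size=10_000, n_chunks=5, max_ratio=0.9):
    """Compress a small sample to guess whether the full payload is worth compressing."""
    if len(payload) <= chunk_size * n_chunks:
        sample = payload  # small enough to test directly
    else:
        # Evenly spaced start offsets (assumed; the real choice may differ).
        step = (len(payload) - chunk_size) // (n_chunks - 1)
        sample = b''.join(payload[i * step:i * step + chunk_size]
                          for i in range(n_chunks))
    return len(zlib.compress(sample)) <= max_ratio * len(sample)

worth_it = looks_compressible(b'a' * 1_000_000)
not_worth_it = looks_compressible(os.urandom(1_000_000))
```

Only about 50 kB is compressed in either case, so the check stays cheap even for very large payloads.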

Header

The header is a small dictionary encoded in msgpack that includes some metadata about the message, such as compression.

Serializing Data

For administrative messages like updating status, msgpack is sufficient. However, for large results or Python-specific data, like NumPy arrays or Pandas DataFrames, we need to use something else to convert Python objects to bytestrings. Exactly how we do this is described in the Serialization documentation.

The application code marks Python-specific results with the to_serialize function:

    >>> import numpy as np
    >>> x = np.ones(5)

    >>> from distributed.protocol import to_serialize
    >>> msg = {'status': 'OK', 'data': to_serialize(x)}
    >>> msg
    {'data': <Serialize: [ 1. 1. 1. 1. 1.]>, 'status': 'OK'}

We separate the message into two messages, one encoding all of the data to be serialized, and one encoding everything else:

    {'status': 'OK'}
    {'data': <Serialize: [ 1. 1. 1. 1. 1.]>}

The first message we pass normally with msgpack. The second we pass in multiple parts, one part for each serialized piece of data (see serialization) and one header including types, compression, etc. used for each value:

    {'keys': ['data'],
     'compression': ['lz4']}
    b'...'
    b'...'
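To make the split concrete, here is a simplified sketch. The Marked class is a hypothetical stand-in for the wrapper that to_serialize produces, pickle stands in for the custom serializers, and no compression is applied; none of this is the actual dask.distributed implementation.

```python
import pickle

class Marked:
    """Hypothetical stand-in for the wrapper returned by to_serialize."""
    def __init__(self, value):
        self.value = value

def split_message(msg):
    """Split msg into (administrative dict, payload header, payload frames)."""
    admin = {k: v for k, v in msg.items() if not isinstance(v, Marked)}
    keys = [k for k, v in msg.items() if isinstance(v, Marked)]
    frames = [pickle.dumps(msg[k].value) for k in keys]  # one frame per value
    header = {'keys': keys, 'compression': [None] * len(keys)}
    return admin, header, frames

def merge_message(admin, header, frames):
    """Inverse of split_message: decode each frame and reinsert it by key."""
    msg = dict(admin)
    for key, frame in zip(header['keys'], frames):
        msg[key] = pickle.loads(frame)
    return msg

admin, header, frames = split_message({'status': 'OK', 'data': Marked([1.0] * 5)})
restored = merge_message(admin, header, frames)
```

The receiving side needs only the header and the frames to rebuild the original dictionary, which is why the header travels with the payload.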

Frames

At the end of the pipeline we have a sequence of bytestrings or frames. We need to tell the receiving end how many frames there are and how long each of these frames is. We order the frames and lengths of frames as follows:

  • The number of frames, stored as an 8 byte unsigned integer
  • The length of each frame, each stored as an 8 byte unsigned integer
  • Each of the frames

In the following sections we describe how we create these frames.
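The layout above can be sketched with the standard-library struct module. The function names pack_frames and unpack_frames are illustrative, and little-endian byte order is an assumption, since the text specifies only the 8 byte unsigned integer width.

```python
import struct

def pack_frames(frames):
    """Prefix the frames with their count and lengths, each as a uint64."""
    lengths = [len(frame) for frame in frames]
    # '<' selects little-endian (assumed); 'Q' is an 8 byte unsigned integer.
    prelude = struct.pack(f'<{len(frames) + 1}Q', len(frames), *lengths)
    return prelude + b''.join(frames)

def unpack_frames(data):
    """Recover the list of frames from bytes produced by pack_frames."""
    (n_frames,) = struct.unpack_from('<Q', data, 0)
    lengths = [struct.unpack_from('<Q', data, 8 + 8 * i)[0]
               for i in range(n_frames)]
    offset = 8 + 8 * n_frames
    frames = []
    for length in lengths:
        frames.append(data[offset:offset + length])
        offset += length
    return frames

original = [b'', b'abc', b'\x00' * 10]
wire = pack_frames(original)
roundtrip = unpack_frames(wire)
```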

Technical Version

A message is broken up into the following components:

  • 8 bytes encoding how many frames there are in the message (N) as a uint64
  • 8 * N bytes encoding the length of each frame as uint64s
  • Header for the administrative message
  • The administrative message, msgpack encoded, possibly compressed
  • Header for all payload messages
  • Payload messages

Header for Administrative Message

The administrative message is arbitrary msgpack-encoded data, usually a dictionary. It may optionally be compressed. If so, the compression type will be in the header.

Payload frames and Header

These frames are optional.

Payload frames are used to send large or language-specific data. These values will be inserted into the administrative message after they are decoded. The header is msgpack encoded and contains encoding and compression information for all subsequent payload messages.

A Payload may be spread across many frames. Each frame may be separately compressed.

Simple Example

This simple example shows a minimal message. There is only an empty header and a small msgpack message. There are no additional payload frames.

Message: {'status': 'OK'}

Frames:

  • Header: {}
  • Administrative Message: {'status': 'OK'}
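For illustration, the bytes on the wire for this message might look like the following. The two msgpack encodings are written out by hand so that the snippet needs only the standard library, and little-endian byte order for the length prefix is an assumption.

```python
import struct

header_frame = b'\x80'                  # msgpack encoding of {}
admin_frame = b'\x81\xa6status\xa2OK'   # msgpack encoding of {'status': 'OK'}
frames = [header_frame, admin_frame]

# Frame count followed by one length per frame, all uint64 (little-endian assumed).
prelude = struct.pack('<3Q', len(frames), len(header_frame), len(admin_frame))
wire = prelude + header_frame + admin_frame
```

Here the 24 byte prelude is twice the size of the 12 bytes of actual content, a fixed cost that becomes negligible for larger messages.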

Example with Custom Data

This example contains a single payload message composed of a single frame. It uses a special serialization for NumPy arrays.

Message: {'op': 'get-data', 'data': np.ones(5)}

Frames:

  • Header: {}
  • Administrative Message: {'op': 'get-data'}
  • Payload header:

    {'headers': [{'type': 'numpy.ndarray',
                  'compression': 'lz4',
                  'count': 1,
                  'lengths': [40],
                  'dtype': '<f8',
                  'strides': (8,),
                  'shape': (5,)}],
     'keys': [('data',)]}

  • Payload Frame: b'(\x00\x00\x00\x11\x00\x01\x00!\xf0?\x07\x00\x0f\x08\x00\x03P\x00\x00\x00\xf0?'
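The numbers in this payload header can be checked against each other with a short standard-library sketch. The raw buffer is rebuilt with struct rather than NumPy to keep the example dependency-free. Reading the first four bytes of the payload frame as a little-endian size prefix is an assumption about the compressed format, though the value it yields is consistent with the header.

```python
import struct

shape, strides = (5,), (8,)   # values taken from the payload header above

# Five little-endian float64 ones: the uncompressed buffer of np.ones(5).
raw = struct.pack('<5d', *[1.0] * shape[0])
raw_length = len(raw)         # matches 'lengths': [40] (5 items * 8 bytes)

payload_frame = (b'(\x00\x00\x00\x11\x00\x01\x00!\xf0?\x07\x00\x0f'
                 b'\x08\x00\x03P\x00\x00\x00\xf0?')

# Interpreting the leading 4 bytes as a uint32 size prefix is an assumption.
(size_prefix,) = struct.unpack_from('<I', payload_frame, 0)
```

The byte pair \xf0? that appears in the payload frame is the tail of the float64 encoding of 1.0, which ends in 0xF0 0x3F in little-endian order.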