Serialization

When we communicate data between computers we first convert that data into a sequence of bytes that can be communicated across the network. Choices made in serialization can affect performance and security.

The standard Python solution to this, Pickle, is often but not always the right solution. Dask uses a number of different serialization schemes in different situations. These are extensible, giving users control in sensitive situations and enabling library developers to plug in more performant serialization solutions.

This document first describes Dask’s default solution for serialization and then discusses ways to control and extend that serialization.

Defaults

There are three kinds of messages passed through the Dask network:

  • Small administrative messages like “Worker A has finished task X” or “I’m running out of memory”. These are always serialized with msgpack (see the sketch after this list).
  • Movement of program data, such as Numpy arrays and Pandas dataframes. This uses a combination of pickle and custom serializers and is the topic of the next section.
  • Computational tasks like f(x) that are defined and serialized on client processes and deserialized and run on worker processes. These are serialized using a fixed scheme decided on by those libraries. Today this is a combination of pickle and cloudpickle.
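
As a minimal sketch of the first case, here is a msgpack round trip for a small administrative-style message. The message fields are made up for illustration and are not Dask’s actual wire format:

    import msgpack

    # A small, msgpack-friendly administrative message (illustrative fields)
    msg = {'op': 'task-finished', 'key': 'x', 'status': 'OK'}

    packed = msgpack.packb(msg)            # bytes, ready for the wire
    assert msgpack.unpackb(packed) == msg  # assumes msgpack >= 1.0 (str keys)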

Serialization families

Use

For the movement of program data (item 2 above) we can use a few different families of serializers. By default the following families are built in:

  • Pickle and cloudpickle
  • Msgpack
  • Custom per-type serializers that come with Dask for the special serialization of important classes of data like Numpy arrays

You can choose which families you want to use to serialize data and to deserialize data when you create a Client:

    from dask.distributed import Client
    client = Client('tcp://scheduler-address:8786',
                    serializers=['dask', 'pickle'],
                    deserializers=['dask', 'msgpack'])

This can be useful if, for example, you are sensitive about receiving Pickle-serialized data for security reasons.

Dask uses the serializers ['dask', 'pickle'] by default, trying to use Dask custom serializers (described below) if they work and then falling back to pickle/cloudpickle.
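
For example, an instance of a plain user-defined class has no Dask custom serializer registered, so serialization falls through to pickle. A minimal sketch (the Point class is hypothetical):

    from distributed.protocol import serialize, deserialize

    class Point:
        # No Dask custom serializer is registered for this class
        def __init__(self, x, y):
            self.x, self.y = x, y

    # The 'dask' family has nothing registered for Point, so this
    # falls back to pickle
    header, frames = serialize(Point(1, 2), serializers=['dask', 'pickle'])
    point = deserialize(header, frames)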

Extend

These families can be extended by creating two functions, dumps and loads, which return and consume a msgpack-encodable header and a list of byte-like objects. These must then be registered under an appropriate name with register_serialization_family from distributed.protocol.serialize. Here is the definition of pickle_dumps and pickle_loads to serve as an example.

    import pickle

    def pickle_dumps(x):
        header = {'serializer': 'pickle'}
        frames = [pickle.dumps(x)]
        return header, frames

    def pickle_loads(header, frames):
        if len(frames) > 1:  # this may be cut up for network reasons
            frame = b''.join(frames)  # frames hold bytes, so join on b''
        else:
            frame = frames[0]
        return pickle.loads(frame)

    from distributed.protocol.serialize import register_serialization_family
    register_serialization_family('pickle', pickle_dumps, pickle_loads)

After this the name 'pickle' can be used in the serializers= and deserializers= keywords in Client and other parts of Dask.

Communication Context

Note

This is an experimental feature and may change without notice

Dask Comms can provide additional context to serialization family functions if they provide a context= keyword. This allows serialization to behave differently according to how it is being used.

    def my_dumps(x, context=None):
        if context and 'recipient' in context:
            # check if we're sending to the same host or not
            ...

The context depends on the kind of communication. For example, when sending over TCP, the address of the sender (us) and the recipient are available in a dictionary.

    >>> context
    {'sender': 'tcp://127.0.0.1:1234', 'recipient': 'tcp://127.0.0.1:5678'}

Other comms may provide other information.
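
Putting the pieces together, a context-aware dumps might record whether both endpoints share a host. This is a hedged sketch: the 'my-pickle' name and the same-host check are illustrative, and the address parsing assumes TCP-style addresses like those shown above:

    import pickle

    def my_dumps(x, context=None):
        header = {'serializer': 'my-pickle'}
        if context and 'sender' in context and 'recipient' in context:
            # Addresses look like 'tcp://127.0.0.1:1234'; strip the port
            # and compare the host parts
            sender_host = context['sender'].rsplit(':', 1)[0]
            recipient_host = context['recipient'].rsplit(':', 1)[0]
            header['same-host'] = sender_host == recipient_host
        frames = [pickle.dumps(x)]
        return header, frames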

Dask Serialization Family

Use

Dask maintains its own custom serialization family that special-cases a few important types, like Numpy arrays. These serializers either operate more efficiently than Pickle or serialize types that Pickle cannot handle.

You don’t need to do anything special to use this family of serializers. It is on by default (along with pickle). Note that Dask custom serializers may use pickle internally in some cases, so this family should not be considered more secure.
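
For example, a Numpy array can be round-tripped with only the dask family. A brief sketch (the exact header contents are an implementation detail):

    import numpy as np
    from distributed.protocol import serialize, deserialize

    x = np.arange(1_000_000)
    header, frames = serialize(x, serializers=['dask'])
    # The dask family typically ships the array's buffer directly
    # rather than pickling the whole object
    y = deserialize(header, frames)
    assert (x == y).all()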

Extend

dask_serialize(arg, *args, **kwargs)      Single Dispatch for dask_serialize
dask_deserialize(arg, *args, **kwargs)    Single Dispatch for dask_deserialize

As with serialization families in general, the Dask family in particular is also extensible. This is a good way to support custom serialization of a single type of object. The method is similar: you create serialize and deserialize functions that create and consume a header and frames, and then register them with Dask.

    from typing import Dict, List, Tuple

    class Human(object):
        def __init__(self, name):
            self.name = name

    from distributed.protocol import dask_serialize, dask_deserialize

    @dask_serialize.register(Human)
    def serialize(human: Human) -> Tuple[Dict, List[bytes]]:
        header = {}
        frames = [human.name.encode()]
        return header, frames

    @dask_deserialize.register(Human)
    def deserialize(header: Dict, frames: List[bytes]) -> Human:
        return Human(frames[0].decode())
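
Once registered, the ordinary serialize/deserialize round trip should dispatch to these handlers. A brief usage sketch:

    from distributed.protocol import serialize, deserialize

    header, frames = serialize(Human('Alice'))
    person = deserialize(header, frames)
    assert person.name == 'Alice'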

Traverse attributes

register_generic(cls)    Register dask_(de)serialize to traverse through __dict__

A common case is that your object just wraps Numpy arrays or other objects that Dask already serializes well. For example, Scikit-Learn estimators mostly surround Numpy arrays with a bit of extra metadata. In these cases you can register your class for custom Dask serialization with the register_generic function.
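
For example (a minimal sketch; the Wrapper class here is hypothetical):

    import numpy as np
    from distributed.protocol import register_generic, serialize, deserialize

    class Wrapper:
        # Mostly a large Numpy array plus a little metadata
        def __init__(self, data, units):
            self.data = data
            self.units = units

    register_generic(Wrapper)

    # The array attribute should be handled by Dask's Numpy serializer;
    # small attributes travel in the header
    header, frames = serialize(Wrapper(np.ones(1_000_000), 'm'))
    obj = deserialize(header, frames)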

API

serialize(x[, serializers, on_error, context])    Convert object to a header and list of bytestrings
deserialize(header, frames[, deserializers])      Convert serialized header and list of bytestrings back to a Python object
dask_serialize(arg, *args, **kwargs)              Single Dispatch for dask_serialize
dask_deserialize(arg, *args, **kwargs)            Single Dispatch for dask_deserialize
register_generic(cls)                             Register dask_(de)serialize to traverse through __dict__
  • distributed.protocol.serialize.serialize(x, serializers=None, on_error='message', context=None)
  • Convert object to a header and list of bytestrings

This takes in an arbitrary Python object and returns a msgpack-serializable header and a list of bytes or memoryview objects.

The serialization protocols to use are configurable: a list of names defines the set of serializers to use, in order. These names are keys in the serializer_registry dict (e.g., 'pickle', 'msgpack'), which maps to the de/serialize functions. The name 'dask' is special, and will use the per-class serialization methods. None gives the default list ['dask', 'pickle'].

Returns:

  • header: dictionary containing any msgpack-serializable metadata
  • frames: list of bytes or memoryviews, commonly of length one

See also

  • deserialize: Convert header and frames back to object
  • to_serialize: Mark that data in a message should be serialized
  • register_serialization: Register custom serialization functions

Examples

    >>> serialize(1)
    ({}, [b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00K\x01.'])

    >>> serialize(b'123')  # some special types get custom treatment
    ({'type': 'builtins.bytes'}, [b'123'])

    >>> deserialize(*serialize(1))
    1
  • distributed.protocol.serialize.deserialize(header, frames, deserializers=None)
  • Convert serialized header and list of bytestrings back to a Python object

Parameters:

  • header: dict
  • frames: list of bytes
  • deserializers: Optional[Dict[str, Tuple[Callable, Callable, bool]]]
  • An optional dict mapping a name to a (de)serializer. See dask_serialize and dask_deserialize for more.

  • distributed.protocol.serialize.dask_serialize(arg, *args, **kwargs)
  • Single Dispatch for dask_serialize
  • distributed.protocol.serialize.dask_deserialize(arg, *args, **kwargs)
  • Single Dispatch for dask_deserialize
  • distributed.protocol.serialize.register_generic(cls)
  • Register dask_(de)serialize to traverse through __dict__

Normally when registering new classes for Dask’s custom serialization you need to manage headers and frames, which can be tedious. If all you want to do is traverse through your object and apply serialize to all of your object’s attributes then this function may provide an easier path.

This registers a class for the custom Dask serialization family. It serializes it by traversing through its __dict__ of attributes and applying serialize and deserialize recursively. It collects a set of frames and keeps small attributes in the header. Deserialization reverses this process.

This is a good idea if the following hold:

  • Most of the bytes of your object are composed of data types that Dask’s custom serialization already handles well, like Numpy arrays.
  • Your object doesn’t require any special constructor logic, other than object.__new__(cls)

Examples

    >>> import sklearn.base
    >>> from distributed.protocol import register_generic
    >>> register_generic(sklearn.base.BaseEstimator)