Task Message Protocol v2 (Draft Spec.)

Notes

  • Support for multiple languages via the lang header.

    A worker may redirect the message to another worker that supports the language.

  • Metadata moved to headers.

    This means that workers and intermediates can inspect the message and make decisions based on the headers without decoding the payload, which may be language-specific (e.g. serialized by the Python-specific pickle serializer).

  • The body is only for language-specific data.

    • Python stores args/kwargs in the body.
    • If a message uses raw encoding, the raw data will be passed as a single argument to the function.
    • Java/C, etc. can use a Thrift/Protobuf document as the body.

  • Dispatches to an actor based on the c_type and c_meth headers.

    c_meth is unused by Python, but may be used in the future to specify class+method pairs.

  • Chain gains a dedicated field.

    Reducing the chain into a recursive callbacks argument causes problems when the recursion limit is exceeded.

    This is fixed in the new message protocol by specifying a list of signatures; each task then pops the next signature off the list when sending the next message:

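        from celery.canvas import maybe_signature

        execute_task(message)  # placeholder: run the task for this message
        chain = message.headers['chain']
        if chain:
            # the list is reversed, so the next signature is the last item
            sig = maybe_signature(chain.pop())
            sig.apply_async(chain=chain)

  • correlation_id replaces the task_id field.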

  • c_shadow lets you specify a different name for the task in logs and monitors. This can be used for, e.g., meta tasks that call an arbitrary function:

        from celery import Task
        from celery.utils.imports import qualname

        class PickleTask(Task):
            abstract = True

            def unpack_args(self, fun, args=()):
                return fun, args

            def apply_async(self, args, kwargs, **options):
                fun, real_args = self.unpack_args(*args)
                return super(PickleTask, self).apply_async(
                    (fun, real_args, kwargs), shadow=qualname(fun), **options
                )

        # `app` is assumed to be your Celery application instance
        @app.task(base=PickleTask)
        def call(fun, args, kwargs):
            return fun(*args, **kwargs)
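
The chain handling described in the notes can be simulated without a broker to show why a reversed list avoids recursion. This is a minimal sketch, not Celery's implementation: the `TASKS` registry, `handle` and `run` are hypothetical, "sending" the next message is modelled by returning it, and it assumes (as with Celery chains) that each task's result is prepended to the next signature's args.

```python
import json

# Hypothetical registry standing in for the worker's registered tasks.
TASKS = {'proj.tasks.add': lambda x, y: x + y}


def handle(headers, body):
    """Execute one message and build the next one from its chain header.

    Returns (result, next_message); next_message is None when the chain
    is exhausted. Publishing is modelled by returning the message.
    """
    args, kwargs = json.loads(body)
    result = TASKS[headers['c_type']](*args, **kwargs)
    chain = list(headers.get('chain') or [])  # copy: never mutate the input
    if not chain:
        return result, None
    # The list is stored reversed, so the next signature is at the end.
    sig = chain.pop()
    next_headers = {'lang': 'py', 'c_type': sig['task'], 'chain': chain}
    # Assumption: the parent result is prepended to the signature's args.
    next_args = [result, *sig.get('args', [])]
    next_body = json.dumps([next_args, sig.get('kwargs', {})])
    return result, (next_headers, next_body)


def run(headers, body):
    """Follow the chain in a loop, so the recursion limit never applies."""
    message = (headers, body)
    result = None
    while message is not None:
        result, message = handle(*message)
    return result


headers = {
    'lang': 'py',
    'c_type': 'proj.tasks.add',
    'chain': [  # reversed: add(8) runs last
        {'task': 'proj.tasks.add', 'args': [8]},
        {'task': 'proj.tasks.add', 'args': [4]},
    ],
}
print(run(headers, json.dumps([[2, 2], {}])))  # 16
```

Each step only carries the remaining signatures, so the message shrinks as the chain progresses and a chain of any length is processed with constant stack depth.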

Undecided

  • We may consider moving callbacks/errbacks/chain into the body.

    Will huge lists in headers cause overhead? The downside of keeping them in the body is that intermediates won’t be able to introspect these values.

Definition

    # protocol v2 implies UTC=True
    # 'class' header existing means protocol is v2
    properties = {
        'correlation_id': (uuid)task_id,
        'content_type': (string)mime,
        'content_encoding': (string)encoding,

        # optional
        'reply_to': (string)queue_or_url,
    }
    headers = {
        'lang': (string)'py',
        'c_type': (string)task,

        # optional
        'c_meth': (string)unused,
        'c_shadow': (string)replace_name,
        'eta': (iso8601)eta,
        'expires': (iso8601)expires,
        'callbacks': (list)Signature,
        'errbacks': (list)Signature,
        'chain': (list)Signature,  # non-recursive, reversed list of signatures
        'group': (uuid)group_id,
        'chord': (uuid)chord_id,
        'retries': (int)retries,
        'timelimit': (tuple)(soft, hard),
    }
    body = (args, kwargs)
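
Because all metadata lives in properties and headers, an intermediate can validate and route a message while treating the body as opaque bytes. The sketch below assumes a hypothetical `QUEUES_BY_LANG` mapping and hypothetical helper names; only the field names come from the definition above.

```python
# Hypothetical mapping from the 'lang' header to a queue served by
# workers for that language; these names are not part of the spec.
QUEUES_BY_LANG = {'py': 'python-workers', 'java': 'jvm-workers'}

REQUIRED_PROPERTIES = ('correlation_id', 'content_type', 'content_encoding')
REQUIRED_HEADERS = ('lang', 'c_type')


def check_message(properties, headers):
    """Raise ValueError if a required (non-optional) field is missing."""
    for key in REQUIRED_PROPERTIES:
        if key not in properties:
            raise ValueError('missing property: %s' % key)
    for key in REQUIRED_HEADERS:
        if key not in headers:
            raise ValueError('missing header: %s' % key)


def display_name(headers):
    """Name to show in logs and monitors: c_shadow if set, else c_type."""
    return headers.get('c_shadow') or headers['c_type']


def route(properties, headers, body, local_lang='py'):
    """Decide what to do with a message from its properties and headers only.

    ``body`` is passed through untouched: it may be pickle, Thrift,
    Protobuf, etc., and this function never decodes it.
    """
    check_message(properties, headers)
    lang = headers['lang']
    if lang == local_lang:
        return 'execute'
    if lang not in QUEUES_BY_LANG:
        raise ValueError('no worker for language %r' % lang)
    return QUEUES_BY_LANG[lang]


props = {
    'correlation_id': 'example-task-id',
    'content_type': 'application/x-protobuf',
    'content_encoding': 'binary',
}
hdrs = {'lang': 'java', 'c_type': 'com.example.Add'}
print(route(props, hdrs, b'opaque bytes'))  # jvm-workers
```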

Example

    # chain: add(add(add(2, 2), 4), 8) == 2 + 2 + 4 + 8
    import json
    from kombu.utils.uuid import uuid

    task_id = uuid()
    # basic_publish is a placeholder for the transport's publish call
    basic_publish(
        message=json.dumps([[2, 2], {}]),
        application_headers={
            'lang': 'py',
            'c_type': 'proj.tasks.add',
            'chain': [
                # reversed chain list
                {'task': 'proj.tasks.add', 'args': (8, )},
                {'task': 'proj.tasks.add', 'args': (4, )},
            ],
        },
        properties={
            'correlation_id': task_id,
            'content_type': 'application/json',
            'content_encoding': 'utf-8',
        },
    )
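
On the producer side, the reversal can be done by a small helper so that callers write the chain in execution order. This is a sketch under assumptions: `build_chain_message` and the signature-dict shape are hypothetical, and the actual publish call is left out; it only builds the body, headers and properties used in the example.

```python
import json
import uuid


def build_chain_message(first, rest):
    """Return (body, headers, properties) for a chain of signatures.

    ``first`` and the items of ``rest`` are dicts shaped like
    {'task': name, 'args': [...], 'kwargs': {...}} (hypothetical shape).
    ``rest`` is given in execution order; it is reversed for the 'chain'
    header so each worker can pop() the next step off the end.
    """
    body = json.dumps([list(first.get('args', [])), first.get('kwargs', {})])
    headers = {
        'lang': 'py',
        'c_type': first['task'],
        'chain': list(reversed(rest)),
    }
    properties = {
        'correlation_id': str(uuid.uuid4()),
        'content_type': 'application/json',
        'content_encoding': 'utf-8',
    }
    return body, headers, properties


# add(add(add(2, 2), 4), 8), written in execution order
body, headers, properties = build_chain_message(
    {'task': 'proj.tasks.add', 'args': [2, 2]},
    [
        {'task': 'proj.tasks.add', 'args': [4]},
        {'task': 'proj.tasks.add', 'args': [8]},
    ],
)
print(body)  # [[2, 2], {}]
print([sig['args'] for sig in headers['chain']])  # [[8], [4]]
```

Note that JSON has no tuple type, so the `(8, )` args in the example arrive at the worker as lists.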