Creating Custom @task Decorators

As of Airflow 2.2, it is possible to add custom decorators to the TaskFlow interface from within a provider package and have those decorators appear natively as part of the @task.____ design.

For example, let's say you want to create an easier mechanism to run Python functions as "foo" tasks. The steps to create and register @task.foo are:

  1. Create a FooDecoratedOperator

    In this case, we assume that you have an existing FooOperator that takes a Python function as an argument. By creating a FooDecoratedOperator that inherits from both FooOperator and airflow.decorators.base.DecoratedOperator, you let Airflow supply much of the functionality required to treat your new class as a native TaskFlow class.

    You should also override the custom_operator_name attribute to provide a custom name for the task. For example, _DockerDecoratedOperator in the apache-airflow-providers-docker provider sets this to @task.docker to indicate the decorator name it implements.
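The real base classes ship with Airflow, but the shape of this inheritance pattern can be sketched in isolation. The following is a self-contained toy model, not real Airflow code: FooOperator, DecoratedOperator, and their attribute names are stand-ins chosen purely for illustration.

```python
# Toy stand-ins for illustration only; the real DecoratedOperator comes from
# airflow.decorators.base and supplies far more functionality.
class FooOperator:
    """Pretend pre-existing operator that runs a Python callable."""

    def __init__(self, *, python_callable, **kwargs):
        self.python_callable = python_callable

    def execute(self):
        return self.python_callable(*self.op_args, **self.op_kwargs)


class DecoratedOperator:
    """Pretend decorator base: stores the arguments the task was called with."""

    def __init__(self, *, op_args=None, op_kwargs=None, **kwargs):
        self.op_args = tuple(op_args or ())
        self.op_kwargs = dict(op_kwargs or {})
        super().__init__(**kwargs)  # cooperative init hands the rest to FooOperator


class FooDecoratedOperator(DecoratedOperator, FooOperator):
    """The new class: execution logic from FooOperator, callable plumbing
    from DecoratedOperator."""

    custom_operator_name = "@task.foo"


result = FooDecoratedOperator(python_callable=lambda x, y: x + y, op_args=(1, 2)).execute()
```

The key design point the toy captures is the method resolution order: the decorator base runs first, stores the call arguments, and cooperatively delegates the remaining keyword arguments to the operator's own constructor.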

  2. Create a foo_task function

    Once you have your decorated class, create a function like this to convert the new FooDecoratedOperator into a TaskFlow function decorator:

    ```python
    from typing import TYPE_CHECKING, Callable

    from airflow.decorators.base import task_decorator_factory

    if TYPE_CHECKING:
        from airflow.decorators.base import TaskDecorator


    def foo_task(
        python_callable: Callable | None = None,
        multiple_outputs: bool | None = None,
        **kwargs,
    ) -> "TaskDecorator":
        return task_decorator_factory(
            python_callable=python_callable,
            multiple_outputs=multiple_outputs,
            decorated_operator_class=FooDecoratedOperator,
            **kwargs,
        )
    ```
  3. Register your new decorator in get_provider_info of your provider

    Finally, add a task-decorators key to the dict returned from the provider entrypoint. Its value should be a list in which each item contains name and class-name keys. When Airflow starts, the ProviderManager class will automatically import this value, and task.foo will work as a new decorator!

    ```python
    def get_provider_info():
        return {
            "package-name": "foo-provider-airflow",
            "name": "Foo",
            "task-decorators": [
                {
                    "name": "foo",
                    # Import path and function name of the foo_task function.
                    "class-name": "name.of.python.package.foo_task",
                }
            ],
            # ...
        }
    ```

    Please note that the name must be a valid Python identifier.
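As a rough illustration of what that discovery step involves (this is an approximation, not the actual ProviderManager code), a dotted class-name string can be resolved to a Python object as shown below; json.dumps is used as a stand-in target instead of a real provider function:

```python
import importlib


def resolve_dotted_path(class_name: str):
    """Resolve a dotted "class-name" string (e.g. "name.of.python.package.foo_task")
    to the object it names: import the module part, then fetch the attribute."""
    module_path, _, attr_name = class_name.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, attr_name)


# Demonstrated with a stdlib callable instead of a real provider decorator:
dumps = resolve_dotted_path("json.dumps")
```

This is why the name must be a valid Python identifier and the class-name must be a full import path: the string is the only handle Airflow has for locating your foo_task function.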

(Optional) Adding IDE auto-completion support

Note

This section mostly applies to the apache-airflow managed providers. We have not decided if we will allow third-party providers to register auto-completion in this way.

For better or worse, Python IDEs cannot auto-complete dynamically generated methods (see JetBrains' write-up on the subject).

To hack around this problem, a type stub airflow/decorators/__init__.pyi is provided to statically declare the type signature of each task decorator. A newly added task decorator should declare its signature stub like this:

airflow/decorators/__init__.pyi

```python
def docker(
    self,
    *,
    multiple_outputs: bool | None = None,
    use_dill: bool = False,  # Added by _DockerDecoratedOperator.
    python_command: str = "python3",
    # 'command', 'retrieve_output', and 'retrieve_output_path' are filled by
    # _DockerDecoratedOperator.
    image: str,
    api_version: str | None = None,
    container_name: str | None = None,
    cpus: float = 1.0,
    docker_url: str = "unix://var/run/docker.sock",
    environment: dict[str, str] | None = None,
    private_environment: dict[str, str] | None = None,
    force_pull: bool = False,
    mem_limit: float | str | None = None,
    host_tmp_dir: str | None = None,
    network_mode: str | None = None,
    tls_ca_cert: str | None = None,
    tls_client_cert: str | None = None,
    tls_client_key: str | None = None,
    tls_hostname: str | bool | None = None,
    tls_ssl_version: str | None = None,
    tmp_dir: str = "/tmp/airflow",
    user: str | int | None = None,
    mounts: list[str] | None = None,
    working_dir: str | None = None,
    xcom_all: bool = False,
    docker_conn_id: str | None = None,
    dns: list[str] | None = None,
    dns_search: list[str] | None = None,
    auto_remove: bool = False,
    shm_size: int | None = None,
    tty: bool = False,
    privileged: bool = False,
    cap_add: str | None = None,
    **kwargs,
) -> TaskDecorator:
    """Create a decorator to convert the decorated callable to a Docker task.

    :param multiple_outputs: If set, function return value will be unrolled to multiple XCom values.
        Dict will unroll to XCom values with keys as XCom keys. Defaults to False.
    :param use_dill: Whether to use dill or pickle for serialization.
    :param python_command: Python command for executing functions. Default: python3.
    :param image: Docker image from which to create the container.
        If image tag is omitted, "latest" will be used.
    :param api_version: Remote API version. Set to ``auto`` to automatically
        detect the server's version.
    :param container_name: Name of the container. Optional (templated).
    :param cpus: Number of CPUs to assign to the container. This value gets multiplied with 1024.
    :param docker_url: URL of the host running the docker daemon.
        Default is unix://var/run/docker.sock.
    :param environment: Environment variables to set in the container. (templated)
    :param private_environment: Private environment variables to set in the container.
        These are not templated, and hidden from the website.
    :param force_pull: Pull the docker image on every run. Default is False.
    :param mem_limit: Maximum amount of memory the container can use.
        Either a float value, which represents the limit in bytes,
        or a string like ``128m`` or ``1g``.
    :param host_tmp_dir: Specify the location of the temporary directory on the host which will
        be mapped to tmp_dir. If not provided, defaults to using the standard system temp directory.
    :param network_mode: Network mode for the container. It can be one of the following:

        - ``bridge``: Create a new network stack for the container with the default docker bridge network
        - ``None``: No networking for this container
        - ``container:<name|id>``: Use the network stack of another container specified via <name|id>
        - ``host``: Use the host network stack. Incompatible with ``port_bindings``
        - ``<network-name>|<network-id>``: Connects the container to a user-created network
          (created with the ``docker network create`` command)
    :param tls_ca_cert: Path to a PEM-encoded certificate authority
        to secure the docker connection.
    :param tls_client_cert: Path to the PEM-encoded certificate
        used to authenticate the docker client.
    :param tls_client_key: Path to the PEM-encoded key used to authenticate the docker client.
    :param tls_hostname: Hostname to match against
        the docker server certificate, or False to disable the check.
    :param tls_ssl_version: Version of SSL to use when communicating with the docker daemon.
    :param tmp_dir: Mount point inside the container to
        a temporary directory created on the host by the operator.
        The path is also made available via the environment variable
        ``AIRFLOW_TMP_DIR`` inside the container.
    :param user: Default user inside the docker container.
    :param mounts: List of mounts to mount into the container, e.g.
        ``['/host/path:/container/path', '/host/path2:/container/path2:ro']``.
    :param working_dir: Working directory to
        set on the container (equivalent to the -w switch of the docker client).
    :param xcom_all: Push all the stdout or just the last line.
        The default is False (last line).
    :param docker_conn_id: ID of the Airflow connection to use.
    :param dns: Docker custom DNS servers.
    :param dns_search: Docker custom DNS search domain.
    :param auto_remove: Auto-removal of the container on daemon side when the
        container's process exits. The default is False.
    :param shm_size: Size of ``/dev/shm`` in bytes. The size must be
        greater than 0. If omitted, uses the system default.
    :param tty: Allocate a pseudo-TTY for the container.
        This needs to be set to see logs of the Docker container.
    :param privileged: Give extended privileges to this container.
    :param cap_add: Include container capabilities.
    """
```

The signature should allow only keyword-only arguments, including one named multiple_outputs that is provided automatically. All other arguments should be copied directly from the real FooOperator, and we recommend adding a comment to explain which arguments are filled in automatically by FooDecoratedOperator and thus not included.

If the new decorator can be used without arguments (e.g. @task.python instead of @task.python()), you should also add an overload that takes a single callable immediately after the "real" definition so mypy can recognize the function as a "bare decorator":

airflow/decorators/__init__.pyi

```python
@overload
def python(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
```
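To see why the extra overload matters, here is a minimal, self-contained decorator (a toy, not Airflow's implementation) that supports both the bare and the called form, with runtime overloads mirroring the stub pattern above:

```python
from typing import Any, Callable, TypeVar, overload

F = TypeVar("F", bound=Callable[..., Any])


@overload
def task(python_callable: F) -> F: ...  # bare form: @task
@overload
def task(*, multiple_outputs: bool = ...) -> Callable[[F], F]: ...  # called form: @task(...)


def task(python_callable=None, *, multiple_outputs=False):
    def wrap(fn):
        # Stand-in for real wrapping logic: just tag the function.
        fn.is_task = True
        fn.multiple_outputs = multiple_outputs
        return fn

    if python_callable is not None:  # used as a bare decorator
        return wrap(python_callable)
    return wrap  # used with parentheses


@task
def extract():
    return 1


@task(multiple_outputs=True)
def transform():
    return {"x": 1}
```

Without the single-callable overload, a type checker sees only the keyword-only signature and rejects the bare @task form, even though it works at runtime.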

Once the change is merged and the next Airflow (minor or patch) release comes out, users will be able to see your decorator in IDE auto-complete. This auto-complete will change based on the version of the provider that the user has installed.

Please note that this step is not required to create a working decorator, but does create a better experience for users of the provider.