Callbacks

A valuable component of logging and monitoring is the use of task callbacks to act upon changes in state of a given task, or across all tasks in a given DAG. For example, you may wish to alert when certain tasks have failed, or have the last task in your DAG invoke a callback when it succeeds.

Note

Callback functions are only invoked when the task state changes due to execution by a worker. As such, task changes set by the command line interface (CLI) or user interface (UI) do not execute callback functions.

Callback Types

There are five types of task events that can trigger a callback:

Name

Description

on_success_callback

Invoked when the task succeeds

on_failure_callback

Invoked when the task fails

sla_miss_callback

Invoked when a task misses its defined SLA

on_retry_callback

Invoked when the task is up for retry

on_execute_callback

Invoked right before the task begins executing.

Example

In the following example, failures in any task call the task_failure_alert function, and success in the last task calls the dag_success_alert function:

  1. import datetime
  2. import pendulum
  3. from airflow import DAG
  4. from airflow.operators.empty import EmptyOperator
  5. def task_failure_alert(context):
  6. print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")
  7. def dag_success_alert(context):
  8. print(f"DAG has succeeded, run_id: {context['run_id']}")
  9. with DAG(
  10. dag_id="example_callback",
  11. schedule=None,
  12. start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
  13. dagrun_timeout=datetime.timedelta(minutes=60),
  14. catchup=False,
  15. on_success_callback=None,
  16. on_failure_callback=task_failure_alert,
  17. tags=["example"],
  18. ) as dag:
  19. task1 = EmptyOperator(task_id="task1")
  20. task2 = EmptyOperator(task_id="task2")
  21. task3 = EmptyOperator(task_id="task3", on_success_callback=[dag_success_alert])
  22. task1 >> task2 >> task3

Note

As of Airflow 2.6.0, callbacks now supports a list of callback functions, allowing users to specify multiple functions to be executed in the desired event. Simply pass a list of callback functions to the callback args when defining your DAG/task callbacks: e.g on_failure_callback=[callback_func_1, callback_func_2]