Listener Plugin of Airflow

Airflow has a feature that allows you to add listeners to monitor and track task state using plugins.

This is a simple example listener plugin for Airflow that tracks task state and collects useful metadata about the task, the dag run, and the dag.

The plugin works through SQLAlchemy's event mechanism: it watches for task instance state changes at the table level and fires an event. The listener is therefore notified for all tasks across all DAGs.

In this plugin, the listener is registered by a class derived from the base class airflow.plugins_manager.AirflowPlugin.

Listener plugins use pluggy under the hood. pluggy is a framework for plugin management and hook calling, originally built for pytest. It enables function hooking, which lets you build "pluggable" systems with your own customization layered over those hooks.
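To make the hooking model concrete, here is a minimal, self-contained pluggy sketch. It is independent of Airflow; the project name "myproject", the MySpec/MyPlugin classes, and the on_event hook are all illustrative:

    import pluggy

    hookspec = pluggy.HookspecMarker("myproject")
    hookimpl = pluggy.HookimplMarker("myproject")


    class MySpec:
        # Hook specifications: the signatures that plugins may implement.
        @hookspec
        def on_event(self, payload):
            """Called whenever an event fires."""


    class MyPlugin:
        # A plugin providing an implementation of the on_event hook.
        @hookimpl
        def on_event(self, payload):
            print(f"plugin received: {payload}")


    pm = pluggy.PluginManager("myproject")
    pm.add_hookspecs(MySpec)
    pm.register(MyPlugin())
    pm.hook.on_event(payload="hello")  # pluggy calls every registered implementation by keyword

Airflow does the PluginManager wiring for you: you only supply @hookimpl functions and register them through an AirflowPlugin, as shown below.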

Using this plugin, the following events can be listened to:

  • a task instance enters the running state.

  • a task instance enters the success state.

  • a task instance enters the failure state.

  • a dag run enters the running state.

  • a dag run enters the success state.

  • a dag run enters the failure state.

  • an Airflow job, such as the scheduler or a backfill job, is starting.

  • an Airflow job, such as the scheduler or a backfill job, is about to stop (both lifecycle hooks are sketched below).
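For the two job lifecycle events, a minimal sketch of the hook implementations could look like this. It assumes the on_starting and before_stopping hook names from the Airflow listener spec; verify them against your Airflow version:

    from airflow.listeners import hookimpl


    @hookimpl
    def on_starting(component):
        # Called before an Airflow component such as the scheduler or a backfill job starts.
        print(f"Component starting: {component}")


    @hookimpl
    def before_stopping(component):
        # Called right before the component stops.
        print(f"Component stopping: {component}")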

Listener Registration

A listener plugin is registered as part of an Airflow plugin by giving it an object reference to the listener module. The following is a skeleton for implementing a new listener:

    from airflow.plugins_manager import AirflowPlugin

    # This is the listener file created where custom code to monitor is added over hookimpl
    import listener


    class MetadataCollectionPlugin(AirflowPlugin):
        name = "MetadataCollectionPlugin"
        listeners = [listener]

Next, we can look at the code added to listener and the implementation methods for each of these events. Once implemented, the listener is executed during every task execution across all DAGs.

For reference, here is the listener code in listener.py that implements these hooks.

This example listens for the task instance entering the running state:

airflow/example_dags/plugins/event_listener.py

    from airflow.listeners import hookimpl
    from airflow.models.taskinstance import TaskInstance
    from airflow.utils.state import TaskInstanceState


    @hookimpl
    def on_task_instance_running(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
        """
        This method is called when task state changes to RUNNING.
        Through the callback, parameters like previous_state and the task_instance object can be accessed.
        This gives more information about the current task_instance that is running: its dag_run,
        task, and dag information.
        """
        print("Task instance is in running state")
        print(" Previous state of the Task instance:", previous_state)

        state: TaskInstanceState = task_instance.state
        name: str = task_instance.task_id
        start_date = task_instance.start_date

        dagrun = task_instance.dag_run
        dagrun_status = dagrun.state

        task = task_instance.task
        dag = task.dag
        dag_name = None
        if dag:
            dag_name = dag.dag_id

        print(f"Current task name:{name} state:{state} start_date:{start_date}")
        print(f"Dag name:{dag_name} and current dag run status:{dagrun_status}")

Similarly, listeners for task_instance success and failure can be implemented.
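A hedged sketch, assuming the same signature as on_task_instance_running above (the hook names come from the Airflow listener spec; verify them against your Airflow version):

    from airflow.listeners import hookimpl
    from airflow.models.taskinstance import TaskInstance
    from airflow.utils.state import TaskInstanceState


    @hookimpl
    def on_task_instance_success(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
        # Called when the task instance reaches the SUCCESS state.
        print(f"Task {task_instance.task_id} succeeded (previous state: {previous_state})")


    @hookimpl
    def on_task_instance_failed(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
        # Called when the task instance reaches the FAILED state.
        print(f"Task {task_instance.task_id} failed (previous state: {previous_state})")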

This example listens for the dag run changing to the failed state:

airflow/example_dags/plugins/event_listener.py

    from airflow.listeners import hookimpl
    from airflow.models.dagrun import DagRun


    @hookimpl
    def on_dag_run_failed(dag_run: DagRun, msg: str):
        """
        This method is called when dag run state changes to FAILED.
        Note: the argument name must be msg to match the listener hookspec,
        since pluggy passes hook arguments by keyword.
        """
        print("Dag run in failure state")
        dag_id = dag_run.dag_id
        run_id = dag_run.run_id
        external_trigger = dag_run.external_trigger

        print(f"Dag information:{dag_id} Run id: {run_id} external trigger: {external_trigger}")

Similarly, listeners for dag_run success and for the running state can be implemented.
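A hedged sketch, assuming the same (dag_run, msg) signature as on_dag_run_failed above (verify the hook names against your Airflow version's listener spec):

    from airflow.listeners import hookimpl
    from airflow.models.dagrun import DagRun


    @hookimpl
    def on_dag_run_running(dag_run: DagRun, msg: str):
        # Called when the dag run enters the RUNNING state.
        print(f"Dag run {dag_run.dag_id} run_id:{dag_run.run_id} is running")


    @hookimpl
    def on_dag_run_success(dag_run: DagRun, msg: str):
        # Called when the dag run reaches the SUCCESS state.
        print(f"Dag run {dag_run.dag_id} run_id:{dag_run.run_id} succeeded")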

The files required for the listener implementation are added as part of the Airflow plugin into the $AIRFLOW_HOME/plugins/ folder and are loaded during Airflow startup.
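As an illustration, using the hypothetical file names from this page, the plugins folder could be laid out like this (the plugins folder is on sys.path, so listener.py is importable by the plugin module):

    $AIRFLOW_HOME/plugins/
        listener.py                      # the @hookimpl functions shown above
        metadata_collection_plugin.py    # defines MetadataCollectionPlugin and registers listener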