Lineage

注意

Lineage 支持是非常实验性的,可能会发生变化。

Airflow可以帮助跟踪数据的来源,发生的事情以及数据随时间的变化。 这有助于实现审计跟踪和数据治理,还可以调试数据流。

气流通过任务的入口和出口跟踪数据。 让我们从一个例子开始,看看它是如何工作的。

  1. from airflow.operators.bash_operator import BashOperator
  2. from airflow.operators.dummy_operator import DummyOperator
  3. from airflow.lineage.datasets import File
  4. from airflow.models import DAG
  5. from datetime import timedelta
  6. FILE_CATEGORIES = [ "CAT1" , "CAT2" , "CAT3" ]
  7. args = {
  8. 'owner' : 'airflow' ,
  9. 'start_date' : airflow . utils . dates . days_ago ( 2 )
  10. }
  11. dag = DAG (
  12. dag_id = 'example_lineage' , default_args = args ,
  13. schedule_interval = '0 0 * * *' ,
  14. dagrun_timeout = timedelta ( minutes = 60 ))
  15. f_final = File ( "/tmp/final" )
  16. run_this_last = DummyOperator ( task_id = 'run_this_last' , dag = dag ,
  17. inlets = { "auto" : True },
  18. outlets = { "datasets" : [ f_final ,]})
  19. f_in = File ( "/tmp/whole_directory/" )
  20. outlets = []
  21. for file in FILE_CATEGORIES :
  22. f_out = File ( "/tmp/ {} /{{{{ execution_date }}}}" . format ( file ))
  23. outlets . append ( f_out )
  24. run_this = BashOperator (
  25. task_id = 'run_me_first' , bash_command = 'echo 1' , dag = dag ,
  26. inlets = { "datasets" : [ f_in ,]},
  27. outlets = { "datasets" : outlets }
  28. )
  29. run_this . set_downstream ( run_this_last )

任务采用参数<cite>入口</cite>和<cite>出口</cite> 。 入口可以由数据集列表<cite>{“数据集”:[dataset1,dataset2]}</cite>手动定义,也可以配置为从上游任务中查找出口<cite>{“task_ids”:[“task_id1”,“task_id2”]}</cite>或者可以配置为从直接上游任务<cite>{“auto”:True}</cite>或它们的组合中获取出口。 出口被定义为数据集列表<cite>{“数据集”:[dataset1,dataset2]}</cite> 。 在执行任务时,数据集的任何字段都使用上下文进行模板化。

注意

如果操作员支持,操作员可以自动添加入口和出口。

在示例DAG任务中, <cite>run_me_first</cite>是一个BashOperator,它接收从列表生成的3个入口: <cite>CAT1</cite> , <cite>CAT2</cite> , <cite>CAT3</cite> 。 请注意, <cite>execution_date</cite>是一个模板化字段,将在任务运行时呈现。

注意

在幕后,Airflow将沿袭元数据作为任务的<cite>pre_execute</cite>方法的一部分进行准备。 当任务完成执行<cite>时,</cite>将调用<cite>post_execute</cite>并将lineage元数据推送到XCOM中。 因此,如果您要创建自己的覆盖此方法的运算符,请确保分别使用<cite>prepare_lineage</cite>和<cite>apply_lineage</cite>修饰您的方法。

Apache Atlas

Airflow可以将其沿袭元数据发送到Apache Atlas。 您需要启用<cite>atlas</cite>后端并正确配置它,例如在<cite>airflow.cfg中</cite> :

  1. [ lineage ]
  2. backend = airflow . lineage . backend . atlas
  3. [ atlas ]
  4. username = my_username
  5. password = my_password
  6. host = host
  7. port = 21000

请确保安装了<cite>atlasclient</cite>软件包。