概念

译者:@ImPerat0R_

Airflow Platform是用于描述,执行和监控工作流的工具。

核心理念

DAGs

在Airflow中,DAG(或者叫有向无环图)是您要运行的所有任务的集合,以反映其关系和依赖关系的方式进行组织。

例如,一个简单的DAG可以包含三个任务:A,B和C.可以说A必须在B可以运行之前成功运行,但C可以随时运行。它可以说任务A在5分钟后超时,并且B可以重新启动最多5次以防它失败。它也可能会说工作流程将在每天晚上10点运行,但不应该在某个特定日期之前开始。

通过这种方式,DAG描述了您希望如何执行工作流程; 但请注意,我们还没有说过我们真正想做的事情!A,B和C可以是任何东西。当C发送电子邮件时,也许A准备B进行分析的数据。或者也许A监控你的位置,这样B可以打开你的车库门,而C打开你的房子灯。 重要的是,DAG并不关心其组成任务的作用;它的工作是确保无论他们做什么在正确的时间,或正确的顺序,或正确处理任何意外的问题。

DAG是在标准Python文件中定义的,这些文件放在Airflow的DAG_FOLDER中。Airflow将执行每个文件中的代码以动态构建DAG对象。 您可以拥有任意数量的DAG,每个DAG都可以描述任意数量的任务。通常,每个应该对应于单个逻辑工作流。

注意

搜索DAG时,Airflow将仅考虑字符串“airflow”和“DAG”都出现在.py文件内容中的文件。

范围

Airflow将加载它可以从DAG文件导入的任何DAG对象。重要的是,这意味着DAG必须出现在globals()。考虑以下两个DAG。只会加载dag_1 ; 另一个只出现在本地范围内。

  1. dag_1 = DAG('this_dag_will_be_discovered')
  2. def my_function():
  3. dag_2 = DAG('but_this_dag_will_not')
  4. my_function()

有时这可以很好地利用。例如,SubDagOperator的常见模式是定义函数内的子标记,以便Airflow不会尝试将其作为独立的DAG加载。

默认参数

如果将default_args字典传递给DAG,它将把它们应用于任何运算符。这使得很容易将公共参数应用于许多运算符而无需多次键入。

  1. default_args = {
  2. 'start_date': datetime(2016, 1, 1),
  3. 'owner': 'Airflow'
  4. }
  5. dag = DAG('my_dag', default_args=default_args)
  6. op = DummyOperator(task_id='dummy', dag=dag)
  7. print(op.owner) # Airflow

上下文管理器

在Airflow 1.8中添加

DAG可用作上下文管理器,以自动将新操作器分配给该DAG。

  1. with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
  2. op = DummyOperator('op')
  3. op.dag is dag # True

运营商

虽然DAG描述了如何运行工作流,但Operators确定实际完成的工作。

操作器描述工作流中的单个任务。 操作器通常(但并非总是)是原子的,这意味着他们可以独立运营,而不需要与任何其他操作器共享资源。DAG将确保操作器以正确的顺序运行; 除了这些依赖项之外,操作器通常独立运行。实际上,它们可能在两台完全不同的机器上运行。

这是一个微妙但非常重要的一点:通常,如果两个操作器需要共享信息,如文件名或少量数据,您应该考虑将它们组合到一个操作器中。如果绝对无法避免,Airflow确实具有操作员交叉通信的功能,称为XCom,本文档的其他部分对此进行了描述。

Airflow为许多常见任务提供操作器,包括:

  • BashOperator - 执行bash命令
  • PythonOperator - 调用任意Python函数
  • EmailOperator - 发送电子邮件
  • SimpleHttpOperator - 发送HTTP请求
  • MySqlOperatorSqliteOperatorPostgresOperatorMsSqlOperatorOracleOperatorJdbcOperator等 - 执行SQL命令
  • Sensor - 等待一定时间,文件,数据库行,S3键等…

除了这些基本构建块之外,还有许多特定的操作器: DockerOperatorHiveOperatorS3FileTransformOperatorPrestoToMysqlOperatorSlackOperator……你会明白的!

airflow/contrib/目录包含更多由社区构建的操作器。这些运算符并不总是像主发行版中那样完整或经过良好测试,但允许用户更轻松地向平台添加新功能。

如果将操作器分配给DAG,则操作器仅由Airflow加载。

请参阅使用操作器了解如何使用Airflow操作器。

DAG分配

在Airflow 1.8中添加

操作器不必立即分配给DAG(之前的dag是必需参数)。但是,一旦将操作器分配给DAG,就无法转移或取消分配。在创建运算符时,通过延迟赋值或甚至从其他运算符推断,可以显式地完成DAG分配。

  1. dag = DAG('my_dag', start_date=datetime(2016, 1, 1))
  2. # sets the DAG explicitly
  3. explicit_op = DummyOperator(task_id='op1', dag=dag)
  4. # deferred DAG assignment
  5. deferred_op = DummyOperator(task_id='op2')
  6. deferred_op.dag = dag
  7. # inferred DAG assignment (linked operators must be in the same DAG)
  8. inferred_op = DummyOperator(task_id='op3')
  9. inferred_op.set_upstream(deferred_op)

位运算符

在Airflow 1.8中添加

传统上,使用set_upstream()set_downstream()方法设置运算符关系。在Airflow 1.8中,这可以通过Python 位运算符>><<来完成。以下四个语句在功能上都是等效的:

  1. op1 >> op2
  2. op1.set_downstream(op2)
  3. op2 << op1
  4. op2.set_upstream(op1)

当使用位运算符时,关系设置在位运算符指向的方向上。例如,op1 >> op2表示op1先运行,op2第二运行。可以组成多个运算符 - 请记住,链从左到右执行,并且始终返回最右边的对象。 例如:

  1. op1 >> op2 >> op3 << op4

相当于:

  1. op1.set_downstream(op2)
  2. op2.set_downstream(op3)
  3. op3.set_upstream(op4)

为方便起见,位运算符也可以与DAG一起使用。 例如:

  1. dag >> op1 >> op2

相当于:

  1. op1.dag = dag
  2. op1.set_downstream(op2)

我们可以把这一切放在一起构建一个简单的管道:

  1. with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
  2. (
  3. DummyOperator(task_id='dummy_1')
  4. >> BashOperator(
  5. task_id='bash_1',
  6. bash_command='echo "HELLO!"')
  7. >> PythonOperator(
  8. task_id='python_1',
  9. python_callable=lambda: print("GOODBYE!"))
  10. )

任务

一旦操作器被实例化,它就被称为“任务”。实例化在调用抽象操作器时定义特定值,参数化任务成为DAG中的节点。

任务实例

任务实例表示任务的特定运行,其特征在于dag,任务和时间点的组合。 任务实例也有一个指示状态,可以是“运行”,“成功”,“失败”,“跳过”,“重试”等。

工作流程

您现在熟悉Airflow的核心构建模块。 有些概念可能听起来非常相似,但词汇表可以概念化如下:

  • DAG:描述工作应该发生的顺序
  • 操作器:作为执行某些工作的模板的类
  • 任务:操作器参数化实例
  • 任务实例:1)已分配给DAG的任务,2)具有与DAG的特定运行相关联的状态

通过组合DAGsOperators来创建TaskInstances,您可以构建复杂的工作流。

附加功能

除了核心Airflow对象之外,还有许多更复杂的功能可以实现限制同时访问资源,交叉通信,条件执行等行为。

钩子是外部平台和数据库的接口,如Hive,S3,MySQL,Postgres,HDFS和Pig。Hooks尽可能实现通用接口,并充当操作器的构建块。他们还使用airflow.models.Connection模型来检索主机名和身份验证信息。钩子将身份验证代码和信息保存在管道之外,集中在元数据数据库中。

钩子在Python脚本,Airflow airflow.operators.PythonOperator以及iPython或Jupyter Notebook等交互式环境中使用它们也非常有用。

当有太多进程同时运行时,某些系统可能会被淹没。Airflow池可用于限制任意任务集上的执行并行性 。通过为池命名并为其分配多个工作槽来在UI( Menu -> Admin -> Pools )中管理池列表。然后,在创建任务时(即,实例化运算符),可以使用pool参数将任务与其中一个现有池相关联。

  1. aggregate_db_message_job = BashOperator(
  2. task_id='aggregate_db_message_job',
  3. execution_timeout=timedelta(hours=3),
  4. pool='ep_data_pipeline_db_msg_agg',
  5. bash_command=aggregate_db_message_job_cmd,
  6. dag=dag)
  7. aggregate_db_message_job.set_upstream(wait_for_empty_queue)

pool参数可以与priority_weight结合使用,以定义队列中的优先级,以及在池中打开的槽时首先执行哪些任务。默认的priority_weight1,可以碰到任何数字。 在对队列进行排序以评估接下来应该执行哪个任务时,我们使用priority_weight与来自此任务下游任务的所有priority_weight值相加。您可以使用它来执行特定的重要任务,并相应地优先处理该任务的整个路径。

当插槽填满时,任务将照常安排。达到容量后,可运行的任务将排队,其状态将在UI中显示。当插槽空闲时,排队的任务将根据priority_weight (任务及其后代)开始运行。

请注意,默认情况下,任务不会分配给任何池,并且它们的执行并行性仅限于执行程序的设置。

连接

外部系统的连接信息存储在Airflow元数据数据库中,并在UI中进行管理(Menu -> Admin -> Connections)。在那里定义了conn_id ,并附加了主机名/登录/密码/结构信息。 Airflow管道可以简单地引用集中管理的conn_id而无需在任何地方硬编码任何此类信息。

可以定义具有相同conn_id许多连接,并且在这种情况下,并且当挂钩使用来自BaseHookget_connection方法时,Airflow将随机选择一个连接,允许在与重试一起使用时进行一些基本的负载平衡和容错。

Airflow还能够通过操作系统中的环境变量引用连接。但它只支持URI格式。如果您需要为连接指定extra信息,请使用Web UI。

如果在Airflow元数据数据库和环境变量中都定义了具有相同conn_id连接,则Airflow将仅引用环境变量中的连接(例如,给定conn_id postgres_master,在开始搜索元数据数据库之前,Airflow将优先在环境变量中搜索AIRFLOW_CONN_POSTGRES_MASTER并直接引用它)。

许多钩子都有一个默认的conn_id,使用该挂钩的操作器不需要提供显式连接ID。 例如,PostgresHook的默认conn_idpostgres_default

请参阅管理连接以了解如何创建和管理连接。

队列

使用CeleryExecutor时,可以指定发送任务到Celery队列。queue是BaseOperator的一个属性,因此任何任务都可以分配给任何队列。 环境的默认队列配置在airflow.cfgcelery -> default_queue。这定义了未指定任务时分配给的队列,以及Airflow工作程序在启动时侦听的队列。

Workers可以收听一个或多个任务队列。当工作程序启动时(使用命令airflow worker),可以指定一组逗号分隔的队列名称(例如, airflow worker -q spark)。然后,该worker将仅接收连接到指定队列的任务。

如果您需要特定的workers,从资源角度来看(例如,一个工作人员可以毫无问题地执行数千个任务),或者从环境角度(您希望工作人员从Spark群集中运行),这可能非常有用本身,因为它需要一个非常具体的环境和安全的权利)。

XComs

XComs允许任务交换消息,允许更细微的控制形式和共享状态。该名称是“交叉通信”的缩写。XComs主要由键,值和时间戳定义,但也跟踪创建XCom的任务/DAG以及何时应该可见的属性。任何可以被pickle的对象都可以用作XCom值,因此用户应该确保使用适当大小的对象。

可以“推”(发送)或“拉”(接收)XComs。当任务推送XCom时,它通常可用于其他任务。任务可以通过调用xcom_push()方法随时推送XComs。 此外,如果任务返回一个值(来自其Operator的execute()方法,或者来自PythonOperator的python_callable函数),则会自动推送包含该值的XCom。

任务调用xcom_pull()来检索XComs,可选地根据key,source task_ids和source dag_id等条件应用过滤器。默认情况下, xcom_pull()过滤掉从执行函数返回时被自动赋予XCom的键(与手动推送的XCom相反)。

如果为task_ids传递xcom_pull单个字符串,则返回该任务的最新XCom值; 如果传递了task_ids列表,则返回相应的XCom值列表。

  1. # inside a PythonOperator called 'pushing_task'
  2. def push_function():
  3. return value
  4. # inside another PythonOperator where provide_context=True
  5. def pull_function(**context):
  6. value = context['task_instance'].xcom_pull(task_ids='pushing_task')

也可以直接在模板中提取XCom,这是一个示例:

  1. SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}

请注意,XCom与变量类似,但专门用于任务间通信而非全局设置。

变量

变量是将任意内容或设置存储和检索为Airflow中的简单键值存储的通用方法。可以从UI(Admin -> Variables),代码或CLI列出,创建,更新和删除变量。此外,json设置文件可以通过UI批量上传。虽然管道代码定义和大多数常量和变量应该在代码中定义并存储在源代码控制中,但是通过UI可以访问和修改某些变量或配置项会很有用。

  1. from airflow.models import Variable
  2. foo = Variable.get("foo")
  3. bar = Variable.get("bar", deserialize_json=True)

第二个调用假设json内容,并将反序列化为bar。请注意, Variable是sqlalchemy模型,可以这样使用。

您可以使用jinja模板中的变量,其语法如下:

  1. echo {{ var.value.<variable_name> }}

或者如果需要从变量反序列化json对象:

  1. echo {{ var.json.<variable_name> }}

分枝

有时您需要一个工作流来分支,或者只根据任意条件走下某条路径,这通常与上游任务中发生的事情有关。 一种方法是使用BranchPythonOperator

BranchPythonOperator与PythonOperator非常相似,只是它需要一个返回task_id的python_callable。返回返回的task_id,并跳过所有其他路径。Python函数返回的task_id必须直接引用BranchPythonOperator任务下游的任务。

请注意,使用depends_on_past=True下游的任务在逻辑上是不合理的,因为skipped状态将总是导致依赖于过去成功的块任务。 skipped状态在所有直接上游任务被skipped地方传播。

如果你想跳过一些任务,请记住你不能有一个空路径,如果是这样,那就做一个虚拟任务。

像这样,跳过虚拟任务“branch_false”

https://airflow.apache.org/_images/branch_good.png

不喜欢这样,跳过连接任务

https://airflow.apache.org/_images/branch_bad.png

SubDAGs

SubDAG非常适合重复模式。在使用Airflow时,定义一个返回DAG对象的函数是一个很好的设计模式。

Airbnb在加载数据时使用阶段检查交换模式。数据在临时表中暂存,然后对该表执行数据质量检查。 一旦检查全部通过,分区就会移动到生产表中。

再举一个例子,考虑以下DAG:

https://airflow.apache.org/_images/subdag_before.png

我们可以将所有并行task-*运算符组合到一个SubDAG中,以便生成的DAG类似于以下内容:

https://airflow.apache.org/_images/subdag_after.png

请注意,SubDAG运算符应包含返回DAG对象的工厂方法。 这将阻止SubDAG在主UI中被视为单独的DAG。 例如:

  1. #dags/subdag.py
  2. from airflow.models import DAG
  3. from airflow.operators.dummy_operator import DummyOperator
  4. # Dag is returned by a factory method
  5. def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
  6. dag = DAG(
  7. '%s.%s' % (parent_dag_name, child_dag_name),
  8. schedule_interval=schedule_interval,
  9. start_date=start_date,
  10. )
  11. dummy_operator = DummyOperator(
  12. task_id='dummy_task',
  13. dag=dag,
  14. )
  15. return dag

然后可以在主DAG文件中引用此SubDAG:

  1. # main_dag.py
  2. from datetime import datetime, timedelta
  3. from airflow.models import DAG
  4. from airflow.operators.subdag_operator import SubDagOperator
  5. from dags.subdag import sub_dag
  6. PARENT_DAG_NAME = 'parent_dag'
  7. CHILD_DAG_NAME = 'child_dag'
  8. main_dag = DAG(
  9. dag_id=PARENT_DAG_NAME,
  10. schedule_interval=timedelta(hours=1),
  11. start_date=datetime(2016, 1, 1)
  12. )
  13. sub_dag = SubDagOperator(
  14. subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, main_dag.start_date,
  15. main_dag.schedule_interval),
  16. task_id=CHILD_DAG_NAME,
  17. dag=main_dag,
  18. )

您可以从主DAG的图形视图放大SubDagOperator,以显示SubDAG中包含的任务:

https://airflow.apache.org/_images/subdag_zoom.png

使用SubDAG时的一些其他提示:

  • 按照惯例,SubDAG的dag_id应以其父级和点为前缀。 和在parent.child
  • 通过将参数传递给SubDAG运算符来共享主DAG和SubDAG之间的参数(如上所示)
  • SubDAG必须有一个计划并启用。如果SubDAG的时间表设置为None@once ,SubDAG将成功完成而不做任何事情
  • 清除SubDagOperator也会清除其中的任务状态
  • 在SubDagOperator上标记成功不会影响其中的任务状态
  • 避免在任务中使用depends_on_past=True,因为这可能会造成混淆
  • 可以为SubDAG指定执行程序。 如果要在进程中运行SubDAG并有效地将其并行性限制为1,则通常使用SequentialExecutor。使用LocalExecutor可能会有问题,因为它可能会过度消耗您的workers,在单个插槽中运行多个任务

有关演示,请参阅airflow/example_dags

SLAs

服务级别协议或者叫任务或DAG应该成功的时间,可以在任务级别设置为timedelta。如果此时一个或多个实例未成功,则会发送警报电子邮件,详细说明错过其SLA的任务列表。该事件也记录在数据库中,并在Browse->Missed SLAs下的Web UI中显示,其中可以分析和记录事件。

触发规则

虽然正常的工作流行为是在所有直接上游任务都成功时触发任务,但Airflow允许更复杂的依赖项设置。

所有运算符都有一个trigger_rule参数,该参数定义生成的任务被触发的规则。trigger_rule的默认值是all_success,可以定义为“当所有直接上游任务都成功时触发此任务”。此处描述的所有其他规则都基于直接父任务,并且是在创建任务时可以传递给任何操作员的值:

  • all_success :(默认)所有父任务都成功了
  • all_failed :所有父all_failed都处于failedupstream_failed状态
  • all_done :所有父任务都完成了他们的执行
  • one_failed :一旦至少一个父任务就会触发,它不会等待所有父任务完成
  • one_success :一旦至少一个父任务成功就触发,它不会等待所有父母完成
  • dummy :依赖项仅用于显示,随意触发

请注意,这些可以与depends_on_past(boolean)结合使用,当设置为True,如果任务的先前计划未成功,则不会触发任务。

只运行最新的

标准工作流行为涉及为特定日期/时间范围运行一系列任务。但是,某些工作流执行的任务与运行时无关,但需要按计划运行,就像标准的cron作业一样。在这些情况下,暂停期间错过的回填或运行作业会浪费CPU周期。

对于这种情况,您可以使用LatestOnlyOperator跳过在DAG的最近计划运行期间未运行的任务。如果现在的时间不在其execution_time和下一个计划的execution_time之间,则LatestOnlyOperator跳过所有直接下游任务及其自身。

必须意识到跳过的任务和触发器规则之间的相互作用。跳过的任务将通过触发器规则all_successall_failed级联,但不是all_doneone_failedone_successdummy。如果您希望将LatestOnlyOperator与不级联跳过的触发器规则一起使用,则需要确保LatestOnlyOperator直接位于您要跳过的任务的上游。

通过使用触发器规则来混合应该在典型的日期/时间依赖模式下运行的任务和使用LatestOnlyOperator任务是可能的。

例如,考虑以下dag:

  1. #dags/latest_only_with_trigger.py
  2. import datetime as dt
  3. from airflow.models import DAG
  4. from airflow.operators.dummy_operator import DummyOperator
  5. from airflow.operators.latest_only_operator import LatestOnlyOperator
  6. from airflow.utils.trigger_rule import TriggerRule
  7. dag = DAG(
  8. dag_id='latest_only_with_trigger',
  9. schedule_interval=dt.timedelta(hours=4),
  10. start_date=dt.datetime(2016, 9, 20),
  11. )
  12. latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
  13. task1 = DummyOperator(task_id='task1', dag=dag)
  14. task1.set_upstream(latest_only)
  15. task2 = DummyOperator(task_id='task2', dag=dag)
  16. task3 = DummyOperator(task_id='task3', dag=dag)
  17. task3.set_upstream([task1, task2])
  18. task4 = DummyOperator(task_id='task4', dag=dag,
  19. trigger_rule=TriggerRule.ALL_DONE)
  20. task4.set_upstream([task1, task2])

在这个dag的情况下,对于除最新运行之外的所有运行,latest_only任务将显示为跳过。task1直接位于latest_only下游,并且除了最新的之外还将跳过所有运行。task2完全独立于latest_only,将在所有计划的时间段内运行。task3task1task2下游,由于默认的trigger_ruleall_success将从all_success接收级联跳过。task4task1task2下游,但由于其trigger_rule设置为all_done因此一旦跳过all_done(有效的完成状态)并且task2成功,它将立即触发。

https://airflow.apache.org/_images/latest_only_with_trigger.png

僵尸与不死

任务实例一直在死,通常是正常生命周期的一部分,但有时会出乎人到意料。

僵尸任务的特点是没有心跳(由工作定期发出)和数据库中的running状态。当工作节点无法访问数据库,Airflow进程在外部被终止或者节点重新启动时,它们可能会发生。僵尸任务查杀是由调度程序的进程定期执行。

不死进程的特点是存在进程和匹配的心跳,但Airflow不知道此任务在数据库中running。这种不匹配通常在数据库状态发生变化时发生,最有可能是通过删除UI中“任务实例”视图中的行。指示任务验证其作为心跳例程的一部分的状态,并在确定它们处于这种“不死”状态时终止自身。

集群策略

您的本地Airflow设置文件可以定义一个policy功能,该功能可以根据其他任务或DAG属性改变任务属性。它接收单个参数作为对任务对象的引用,并期望改变其属性。

例如,此函数可以在使用特定运算符时应用特定队列属性,或强制执行任务超时策略,确保没有任务运行超过48小时。 以下是airflow_settings.py

  1. def policy(task):
  2. if task.__class__.__name__ == 'HivePartitionSensor':
  3. task.queue = "sensor_queue"
  4. if task.timeout > timedelta(hours=48):
  5. task.timeout = timedelta(hours=48)

文档和注释

可以在Web界面中显示的dag和任务对象中添加文档或注释(dag为“Graph View”,任务为“Task Details”)。如果定义了一组特殊任务属性,它们将被呈现为丰富内容:

属性 渲染到
doc monospace
doc_json JSON
doc_yaml YAML
doc_md markdown
doc_rst reStructuredText

请注意,对于dags,doc_md是解释的唯一属性。

如果您的任务是从配置文件动态构建的,则此功能特别有用,它允许您公开Airflow中相关任务的配置。

  1. """
  2. ### My great DAG
  3. """
  4. dag = DAG('my_dag', default_args=default_args)
  5. dag.doc_md = __doc__
  6. t = BashOperator("foo", dag=dag)
  7. t.doc_md = """\
  8. #Title"
  9. Here's a [url](www.airbnb.com)
  10. """

此内容将分别在“图表视图”和“任务详细信息”页面中呈现为markdown。

Jinja模板

Airflow充分利用了Jinja Templating的强大功能,这可以成为与宏结合使用的强大工具(参见部分)。

例如,假设您希望使用BashOperator将执行日期作为环境变量传递给Bash脚本。

  1. # The execution date as YYYY-MM-DD
  2. date = "{{ ds }}"
  3. t = BashOperator(
  4. task_id='test_env',
  5. bash_command='/tmp/test.sh ',
  6. dag=dag,
  7. env={'EXECUTION_DATE': date})

这里, {{ ds }}是一个宏,并且由于BashOperatorenv参数是使用Jinja模板化的,因此执行日期将作为Bash脚本中名为EXECUTION_DATE的环境变量提供。

您可以将Jinja模板与文档中标记为“模板化”的每个参数一起使用。模板替换发生在调用运算符的pre_execute函数之前。

打包的dags

虽然通常会在单个.py文件中指定dags,但有时可能需要将dag及其依赖项组合在一起。 例如,您可能希望将多个dag组合在一起以将它们一起版本,或者您可能希望将它们一起管理,或者您可能需要一个额外的模块,默认情况下在您运行airflow的系统上不可用。 为此,您可以创建一个zip文件,其中包含zip文件根目录中的dag,并在目录中解压缩额外的模块。

例如,您可以创建一个如下所示的zip文件:

  1. my_dag1.py
  2. my_dag2.py
  3. package1/__init__.py
  4. package1/functions.py

Airflow将扫描zip文件并尝试加载my_dag1.pymy_dag2.py 。 它不会进入子目录,因为它们被认为是潜在的包。

如果您想将模块依赖项添加到DAG,您基本上也可以这样做,但是更多的是使用virtualenv和pip。

  1. virtualenv zip_dag
  2. source zip_dag/bin/activate
  3. mkdir zip_dag_contents
  4. cd zip_dag_contents
  5. pip install --install-option = "--install-lib= $PWD " my_useful_package
  6. cp ~/my_dag.py .
  7. zip -r zip_dag.zip *

注意

zip文件将插入模块搜索列表(sys.path)的开头,因此它将可用于驻留在同一解释器中的任何其他代码。

注意

打包的dags不能与打开pickling时一起使用。

注意

打包的dag不能包含动态库(例如libz.so),如果模块需要这些库,则需要在系统上使用这些库。换句话说,只能打包纯python模块。