常见问题

为什么我的任务没有安排好?

您的任务可能无法安排的原因有很多。 以下是一些常见原因:

  • 您的脚本是否“编译”,Airflow引擎是否可以解析它并找到您的DAG对象。 要对此进行测试,您可以运行airflow list_dags并确认您的DAG显示在列表中。 您还可以运行airflow list_tasks foo_dag_id --tree并确认您的任务按预期显示在列表中。 如果您使用CeleryExecutor,您可能需要确认这既适用于调度程序运行的位置,也适用于工作程序运行的位置。
  • 包含DAG的文件是否在内容的某处包含字符串“airflow”和“DAG”? 在搜索DAG目录时,Airflow忽略不包含“airflow”和“DAG”的文件,以防止DagBag解析导入与用户的DAG并置的所有python文件。
  • 你的start_date设置正确吗? 在传递start_date + scheduler_interval之后,Airflow调度程序会立即触发任务。
  • 您的schedule_interval设置正确吗? 默认schedule_interval是一天( datetime.timedelta(1) )。 您必须直接为实例化的DAG对象指定不同的schedule_interval ,而不是default_param ,因为任务实例不会覆盖其父DAG的schedule_interval
  • 您的start_date超出了在UI中可以看到的位置吗? 如果将start_date设置为3个月之前的某个时间,您将无法在UI的主视图中看到它,但您应该能够在Menu -> Browse ->Task Instances看到它。
  • 是否满足任务的依赖性。 直接位于任务上游的任务实例需要处于success状态。 此外,如果已设置depends_on_past=True ,则上一个任务实例需要成功(除非它是该任务的第一次运行)。 此外,如果wait_for_downstream=True ,请确保您了解其含义。 您可以从Task Instance Details页面查看如何设置这些属性。
  • 您需要创建并激活DagRuns吗? DagRun表示整个DAG的特定执行,并具有状态(运行,成功,失败,……)。 调度程序在向前移动时创建新的DagRun,但永远不会及时创建新的DagRun。 调度程序仅评估running DagRuns以查看它可以触发的任务实例。 请注意,清除任务实例(从UI或CLI)确实将DagRun的状态设置为恢复运行。 您可以通过单击DAG的计划标记来批量查看DagRuns列表并更改状态。
  • 是否达到了DAG的concurrency参数? concurrency定义了允许DAG running任务实例的数量,超过这一点,事物就会排队。
  • 是否达到了DAG的max_active_runs参数? max_active_runs定义允许的DAG running并发实例的数量。

您可能还想阅读文档的“计划程序”部分,并确保完全了解其进度。

如何根据其他任务的失败触发任务?

查看文档“概念Trigger Rule部分中的“ Trigger Rule部分

安装airflow [crypto]后,为什么连接密码仍未在元数据db中加密?

查看文档“配置”部分中的“ Connections部分

start_date什么关系?

start_date是前DagRun时代的部分遗产,但它在很多方面仍然具有相关性。 创建新DAG时,您可能希望使用default_args为任务设置全局start_date 。 要创建的第一个DagRun将基于所有任务的min(start_date) 。 从那时起,调度程序根据您的schedule_interval创建新的DagRuns,并在满足您的依赖项时运行相应的任务实例。 在向DAG引入新任务时,您需要特别注意start_date ,并且可能希望重新激活非活动DagRuns以正确启用新任务。

我们建议不要使用动态值作为start_date ,尤其是datetime.now()因为它可能非常混乱。 一旦周期结束,任务就会被触发,理论上, @hourly DAG永远不会达到一小时后,因为now()会移动。

以前我们还建议使用与schedule_interval相关的舍入start_date 。 这意味着@hourly将在00:00分钟:秒,午夜的@monthly工作,在这个月的第一个月的@monthly工作。 这不再是必需的。 现在,Airflow将自动对齐start_dateschedule_interval ,方法是使用start_date作为开始查看的时刻。

您可以使用任何传感器或TimeDeltaSensor来延迟计划间隔内的任务执行。 虽然schedule_interval允许指定datetime.timedelta对象,但我们建议使用宏或cron表达式,因为它强制执行舍入计划的这种想法。

使用depends_on_past=True时,必须特别注意start_date因为过去的依赖关系不会仅针对为任务指定的start_date的特定计划强制执行。 除非您计划为新任务运行回填,否则在引入新的depends_on_past=True时及时观察DagRun活动状态也很重要。

另外需要注意的是,在回填CLI命令的上下文中,任务start_date会被回填命令start_date覆盖。 这允许对具有depends_on_past=True任务实际启动的回填,如果不是这样,则回填就不会启动。

如何动态创建DAG?

Airflow在DAGS_FOLDER查找其全局命名空间中包含DAG对象的模块,并在DagBag添加它找到的对象。 知道这一切我们需要的是一种在全局命名空间中动态分配变量的方法,这可以在python中使用globals()函数轻松完成,标准库的行为就像一个简单的字典。

  1. for i in range ( 10 ):
  2. dag_id = 'foo_ {} ' . format ( i )
  3. globals ()[ dag_id ] = DAG ( dag_id )
  4. # or better, call a function that returns a DAG object!

我的进程列表中的所有airflow run命令是什么?

airflow run命令有很多层,这意味着它可以调用自身。

  • 基本airflow run :启动执行程序,并告诉它运行airflow run --local命令。 如果使用Celery,这意味着它会在队列中放置一个命令,使其在worker上运行远程。 如果使用LocalExecutor,则转换为在子进程池中运行它。
  • 本地airflow run --local :启动airflow run --raw命令(如下所述)作为子airflow run --raw ,负责发出心跳,监听外部airflow run --raw信号,并确保在子进程失败时进行一些清理
  • 原始airflow run --raw运行实际操作员的执行方法并执行实际工作

我的气流dag如何运行得更快?

我们可以控制三个变量来改善气流dag性能:

  • parallelism :此变量控制气流工作者可以同时运行的任务实例的数量。 用户可以增加airflow.cfg的并行度变量。
  • concurrency :Airflow调度程序在任何给定时间都将为您的DAG运行不超过$concurrency任务实例。 并发性在Airflow DAG中定义。 如果未在DAG上设置并发性,则调度程序将使用dag_concurrency条目的缺省值。
  • max_active_runs :在给定时间,Airflow调度程序将运行不超过DAG的max_active_runs DagRuns。 如果未在DAG中设置max_active_runs ,则调度程序将使用airflow.cfg max_active_runs_per_dag条目的缺省值。

我们如何减少气流UI页面加载时间?

如果你的dag需要很长时间才能加载,你可以将airflow.cfgdefault_dag_run_display_number配置的值airflow.cfg到一个较小的值。 此可配置控制在UI中显示的dag run的数量,默认值为25。

如何修复异常:全局变量explicit_defaults_for_timestamp需要打开(1)?

这意味着在mysql服务器中禁用了explicit_defaults_for_timestamp ,您需要通过以下方式启用它:

  1. 在my.cnf文件的mysqld部分下设置explicit_defaults_for_timestamp = 1
  2. 重启Mysql服务器。

如何减少生产中的气流dag调度延迟?

  • max_threads :Scheduler将并行生成多个线程来安排dags。 这由max_threads控制,默认值为2.用户应在生产中将此值增加到更大的值(例如,调度程序运行的cpus的数量 - 1)。
  • scheduler_heartbeat_sec :用户应考虑将scheduler_heartbeat_sec配置增加到更高的值(例如60秒),该值控制气流调度程序获取心跳的频率并更新作业在数据库中的条目。