使用Operators(执行器)

译者:@ImPerat0R_@ThinkingChen

operator(执行器)代表一个理想情况下是幂等的任务。operator(执行器)决定了DAG运行时实际执行的内容。

有关更多信息,请参阅Operators Concepts文档和Operators API Reference

BashOperator

使用BashOperatorBash shell中执行命令。

  1. run_this = BashOperator(
  2. task_id='run_after_loop',
  3. bash_command='echo 1',
  4. dag=dag)

模板

您可以使用Jinja模板来参数化bash_command参数。

  1. also_run_this = BashOperator(
  2. task_id='also_run_this',
  3. bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
  4. dag=dag,
  5. )

故障排除

找不到Jinja模板

在使用bash_command参数直接调用Bash脚本时,需要在脚本名称后添加空格。这是因为Airflow尝试将Jinja模板应用于一个失败的脚本。

  1. t2 = BashOperator(
  2. task_id='bash_example',
  3. # 这将会出现`Jinja template not found`的错误
  4. # bash_command="/home/batcher/test.sh",
  5. # 在加了空格之后,这会正常工作
  6. bash_command="/home/batcher/test.sh ",
  7. dag=dag)

PythonOperator

使用PythonOperator执行Python回调。

  1. def print_context ( ds , ** kwargs ):
  2. pprint ( kwargs )
  3. print ( ds )
  4. return 'Whatever you return gets printed in the logs'
  5. run_this = PythonOperator (
  6. task_id = 'print_the_context' ,
  7. provide_context = True ,
  8. python_callable = print_context ,
  9. dag = dag )

传递参数

使用op_argsop_kwargs参数将额外参数传递给Python的回调函数。

  1. def my_sleeping_function(random_base):
  2. """这是一个将在DAG执行体中运行的函数"""
  3. time.sleep(random_base)
  4. # Generate 10 sleeping tasks, sleeping from 0 to 4 seconds respectively
  5. for i in range(5):
  6. task = PythonOperator(
  7. task_id='sleep_for_' + str(i),
  8. python_callable=my_sleeping_function,
  9. op_kwargs={'random_base': float(i) / 10},
  10. dag=dag,
  11. )
  12. run_this >> task

模板

当您将provide_context参数设置为True,Airflow会传入一组额外的关键字参数:一个用于每个Jinja模板变量和一个templates_dict参数。

templates_dict参数是模板化的,因此字典中的每个值都被评估为Jinja模板

Google 云平台 Operators(执行器)

GoogleCloudStorageToBigQueryOperator

使用GoogleCloudStorageToBigQueryOperator执行BigQuery加载作业。

GceInstanceStartOperator

允许启动一个已存在的Google Compute Engine实例。

在此示例中,参数值从Airflow变量中提取。此外,default_args字典用于将公共参数传递给单个DAG中的所有operator(执行器)。

  1. PROJECT_ID = models.Variable.get('PROJECT_ID', '')
  2. LOCATION = models.Variable.get('LOCATION', '')
  3. INSTANCE = models.Variable.get('INSTANCE', '')
  4. SHORT_MACHINE_TYPE_NAME = models.Variable.get('SHORT_MACHINE_TYPE_NAME', '')
  5. SET_MACHINE_TYPE_BODY = {
  6. 'machineType': 'zones/{}/machineTypes/{}'.format(LOCATION, SHORT_MACHINE_TYPE_NAME)
  7. }
  8. default_args = {
  9. 'start_date': airflow.utils.dates.days_ago(1)
  10. }

通过将所需的参数传递给构造函数来定义GceInstanceStartOperator

  1. gce_instance_start = GceInstanceStartOperator(
  2. project_id=PROJECT_ID,
  3. zone=LOCATION,
  4. resource_id=INSTANCE,
  5. task_id='gcp_compute_start_task'
  6. )

GceInstanceStopOperator

允许停止一个已存在的Google Compute Engine实例。

参数定义请参阅上面的GceInstanceStartOperator

通过将所需的参数传递给构造函数来定义GceInstanceStopOperator

  1. gce_instance_stop = GceInstanceStopOperator(
  2. project_id=PROJECT_ID,
  3. zone=LOCATION,
  4. resource_id=INSTANCE,
  5. task_id='gcp_compute_stop_task'
  6. )

GceSetMachineTypeOperator

允许把一个已停止实例的机器类型改变至特定的类型。

参数定义请参阅上面的GceInstanceStartOperator

通过将所需的参数传递给构造函数来定义GceSetMachineTypeOperator

  1. gce_set_machine_type = GceSetMachineTypeOperator(
  2. project_id=PROJECT_ID,
  3. zone=LOCATION,
  4. resource_id=INSTANCE,
  5. body=SET_MACHINE_TYPE_BODY,
  6. task_id='gcp_compute_set_machine_type'
  7. )

GcfFunctionDeleteOperator

使用default_args字典来传递参数给operator(执行器)。

  1. PROJECT_ID = models.Variable.get('PROJECT_ID', '')
  2. LOCATION = models.Variable.get('LOCATION', '')
  3. ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')
  4. # A fully-qualified name of the function to delete
  5. FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
  6. ENTRYPOINT)
  7. default_args = {
  8. 'start_date': airflow.utils.dates.days_ago(1)
  9. }

使用GcfFunctionDeleteOperator来从Google Cloud Functions删除一个函数。

  1. t1 = GcfFunctionDeleteOperator(
  2. task_id="gcf_delete_task",
  3. name=FUNCTION_NAME
  4. )

故障排除

如果你想要使用服务账号来运行或部署一个operator(执行器),但得到了一个403禁止的错误,这意味着你的服务账号没有正确的Cloud IAM权限。

  1. 指定该服务账号为Cloud Functions Developer角色。
  2. 授权Cloud Functions的运行账户为Cloud IAM Service Account User角色。

使用gcloud分配Cloud IAM权限的典型方法如下所示。只需将您的Google Cloud Platform项目ID替换为PROJECT_ID,将SERVICE_ACCOUNT_EMAIL替换为您的服务帐户的电子邮件ID即可。

  1. gcloud iam service-accounts add-iam-policy-binding \
  2. PROJECT_ID@appspot.gserviceaccount.com \
  3. --member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
  4. --role="roles/iam.serviceAccountUser"

细节请参阅Adding the IAM service agent user role to the runtime service

GcfFunctionDeployOperator

使用GcfFunctionDeployOperator来从Google Cloud Functions部署一个函数。

以下Airflow变量示例显示了您可以使用的default_args的各种变体和组合。变量定义如下:

  1. PROJECT_ID = models.Variable.get('PROJECT_ID', '')
  2. LOCATION = models.Variable.get('LOCATION', '')
  3. SOURCE_ARCHIVE_URL = models.Variable.get('SOURCE_ARCHIVE_URL', '')
  4. SOURCE_UPLOAD_URL = models.Variable.get('SOURCE_UPLOAD_URL', '')
  5. SOURCE_REPOSITORY = models.Variable.get('SOURCE_REPOSITORY', '')
  6. ZIP_PATH = models.Variable.get('ZIP_PATH', '')
  7. ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')
  8. FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
  9. ENTRYPOINT)
  10. RUNTIME = 'nodejs6'
  11. VALIDATE_BODY = models.Variable.get('VALIDATE_BODY', True)

使用这些变量,您可以定义请求的主体:

  1. body = {
  2. "name": FUNCTION_NAME,
  3. "entryPoint": ENTRYPOINT,
  4. "runtime": RUNTIME,
  5. "httpsTrigger": {}
  6. }

创建DAG时,default_args字典可用于传递正文和其他参数:

  1. default_args = {
  2. 'start_date': dates.days_ago(1),
  3. 'project_id': PROJECT_ID,
  4. 'location': LOCATION,
  5. 'body': body,
  6. 'validate_body': VALIDATE_BODY
  7. }

请注意,在上面的示例中,body和default_args都是不完整的。根据设置的变量,如何传递源代码相关字段可能有不同的变体。目前,您可以传递sourceArchiveUrl,sourceRepository或sourceUploadUrl,CloudFunction API规范中所述。此外,default_args可能包含zip_path参数,以在部署源代码之前运行上载源代码的额外步骤。在最后一种情况下,您还需要在正文中提供一个空的sourceUploadUrl参数。

基于上面定义的变量,此处显示了设置源代码相关字段的示例逻辑:

  1. if SOURCE_ARCHIVE_URL:
  2. body['sourceArchiveUrl'] = SOURCE_ARCHIVE_URL
  3. elif SOURCE_REPOSITORY:
  4. body['sourceRepository'] = {
  5. 'url': SOURCE_REPOSITORY
  6. }
  7. elif ZIP_PATH:
  8. body['sourceUploadUrl'] = ''
  9. default_args['zip_path'] = ZIP_PATH
  10. elif SOURCE_UPLOAD_URL:
  11. body['sourceUploadUrl'] = SOURCE_UPLOAD_URL
  12. else:
  13. raise Exception("Please provide one of the source_code parameters")

创建operator(执行器)的代码如下:

  1. deploy_task = GcfFunctionDeployOperator(
  2. task_id="gcf_deploy_task",
  3. name=FUNCTION_NAME
  4. )

Troubleshooting

如果你想要使用服务账号来运行或部署一个operator(执行器),但得到了一个403禁止的错误,这意味着你的服务账号没有正确的Cloud IAM权限。

  1. 指定该服务账号为Cloud Functions Developer角色。
  2. 授权Cloud Functions的运行账户为Cloud IAM Service Account User角色。

使用gcloud分配Cloud IAM权限的典型方法如下所示。只需将您的Google Cloud Platform项目ID替换为PROJECT_ID,将SERVICE_ACCOUNT_EMAIL的替换为您的服务帐户的电子邮件ID即可。

  1. gcloud iam service-accounts add-iam-policy-binding \
  2. PROJECT_ID@appspot.gserviceaccount.com \
  3. --member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
  4. --role="roles/iam.serviceAccountUser"

细节请参阅Adding the IAM service agent user role to the runtime service

如果您的函数的源代码位于Google Source Repository中,请确保您的服务帐户具有Source Repository Viewer角色,以便在必要时可以下载源代码。

CloudSqlInstanceDatabaseCreateOperator

在Cloud SQL实例中创建新数据库。

有关参数定义,请参阅上面的GceInstanceStartOperator

通过将所需的参数传递给构造函数来定义CloudSqlInstanceDatabaseCreateOperator

参数

示例DAG中的一些参数取自环境变量:

  1. PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
  2. INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
  3. DB_NAME = os.environ.get('DB_NAME', 'testdb')

使用operator(执行器)

  1. sql_db_create_task = CloudSqlInstanceDatabaseCreateOperator(
  2. project_id=PROJECT_ID,
  3. body=db_create_body,
  4. instance=INSTANCE_NAME,
  5. task_id='sql_db_create_task'
  6. )

示例请求体:

  1. db_create_body = {
  2. "instance": INSTANCE_NAME,
  3. "name": DB_NAME,
  4. "project": PROJECT_ID
  5. }

模版

  1. template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')

更多信息

有关数据库插入,请参阅Google Cloud SQL API文档

CloudSqlInstanceDatabaseDeleteOperator

在Cloud SQL实例中删除数据库。

有关参数定义,请参阅CloudSqlInstanceDatabaseDeleteOperator

参数

示例DAG中的一些参数取自环境变量:

  1. PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
  2. INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
  3. DB_NAME = os.environ.get('DB_NAME', 'testdb')

使用operator(执行器)

  1. sql_db_delete_task = CloudSqlInstanceDatabaseDeleteOperator(
  2. project_id=PROJECT_ID,
  3. instance=INSTANCE_NAME,
  4. database=DB_NAME,
  5. task_id='sql_db_delete_task'
  6. )

模版

  1. template_fields = ('project_id', 'instance', 'database', 'gcp_conn_id',
  2. 'api_version')

更多信息

有关数据库删除,请参阅Google Cloud SQL API文档

CloudSqlInstanceDatabasePatchOperator

使用修补程序语义更新包含有关Cloud SQL实例内数据库的信息的资源。请参阅: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch

有关参数定义,请参阅CloudSqlInstanceDatabasePatchOperator

参数

示例DAG中的一些参数取自环境变量:

  1. PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
  2. INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
  3. DB_NAME = os.environ.get('DB_NAME', 'testdb')

使用operator(执行器)

  1. sql_db_patch_task = CloudSqlInstanceDatabasePatchOperator(
  2. project_id=PROJECT_ID,
  3. body=db_patch_body,
  4. instance=INSTANCE_NAME,
  5. database=DB_NAME,
  6. task_id='sql_db_patch_task'
  7. )

示例请求体:

  1. db_patch_body = {
  2. "charset": "utf16",
  3. "collation": "utf16_general_ci"
  4. }

模版

  1. template_fields = ('project_id', 'instance', 'database', 'gcp_conn_id',
  2. 'api_version')

更多信息

有关数据库修改,请参阅Google Cloud SQL API文档

CloudSqlInstanceDeleteOperator

示例DAG中的一些参数取自环境变量:

  1. PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
  2. INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
  3. DB_NAME = os.environ.get('DB_NAME', 'testdb')

使用operator(执行器)

  1. sql_instance_delete_task = CloudSqlInstanceDeleteOperator(
  2. project_id=PROJECT_ID,
  3. instance=INSTANCE_NAME,
  4. task_id='sql_instance_delete_task'
  5. )

模版

  1. template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')

更多信息

有关删除,请参阅Google Cloud SQL API文档

CloudSqlInstanceCreateOperator

在Google Cloud Platform中创建新的Cloud SQL实例。

有关参数定义,请参阅CloudSqlInstanceCreateOperator

如果存在具有相同名称的实例,则不会执行任何操作,并且operator(执行器)将成功执行。

参数

示例DAG中的一些参数取自环境变量:

  1. PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
  2. INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
  3. DB_NAME = os.environ.get('DB_NAME', 'testdb')

定义实例的示例:

  1. body = {
  2. "name": INSTANCE_NAME,
  3. "settings": {
  4. "tier": "db-n1-standard-1",
  5. "backupConfiguration": {
  6. "binaryLogEnabled": True,
  7. "enabled": True,
  8. "startTime": "05:00"
  9. },
  10. "activationPolicy": "ALWAYS",
  11. "dataDiskSizeGb": 30,
  12. "dataDiskType": "PD_SSD",
  13. "databaseFlags": [],
  14. "ipConfiguration": {
  15. "ipv4Enabled": True,
  16. "requireSsl": True,
  17. },
  18. "locationPreference": {
  19. "zone": "europe-west4-a"
  20. },
  21. "maintenanceWindow": {
  22. "hour": 5,
  23. "day": 7,
  24. "updateTrack": "canary"
  25. },
  26. "pricingPlan": "PER_USE",
  27. "replicationType": "ASYNCHRONOUS",
  28. "storageAutoResize": False,
  29. "storageAutoResizeLimit": 0,
  30. "userLabels": {
  31. "my-key": "my-value"
  32. }
  33. },
  34. "databaseVersion": "MYSQL_5_7",
  35. "region": "europe-west4",
  36. }

使用operator(执行器)

  1. sql_instance_create_task = CloudSqlInstanceCreateOperator(
  2. project_id=PROJECT_ID,
  3. body=body,
  4. instance=INSTANCE_NAME,
  5. task_id='sql_instance_create_task'
  6. )

模版

  1. template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')

更多信息

有关插入,请参阅Google Cloud SQL API文档

CloudSqlInstancePatchOperator

更新Google Cloud Platform中的Cloud SQL实例的设置(部分更新)。

有关参数定义,请参阅CloudSqlInstancePatchOperator

这是部分更新,因此仅设置/更新正文中指定的设置的值。现有实例的其余部分将保持不变。

参数

示例DAG中的一些参数取自环境变量:

  1. PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
  2. INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
  3. DB_NAME = os.environ.get('DB_NAME', 'testdb')

定义实例的示例:

  1. patch_body = {
  2. "name": INSTANCE_NAME,
  3. "settings": {
  4. "dataDiskSizeGb": 35,
  5. "maintenanceWindow": {
  6. "hour": 3,
  7. "day": 6,
  8. "updateTrack": "canary"
  9. },
  10. "userLabels": {
  11. "my-key-patch": "my-value-patch"
  12. }
  13. }
  14. }

使用operator(执行器)

  1. sql_instance_patch_task = CloudSqlInstancePatchOperator(
  2. project_id=PROJECT_ID,
  3. body=patch_body,
  4. instance=INSTANCE_NAME,
  5. task_id='sql_instance_patch_task'
  6. )

模版

  1. template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')

更多信息

有关部分更新,请参阅Google Cloud SQL API文档