Azkaban Flow 2.0的使用

一、Flow 2.0 简介

1.1 Flow 2.0 的产生

Azkaban 目前同时支持 Flow 1.0 和 Flow2.0 ,但是官方文档上更推荐使用 Flow 2.0,因为 Flow 1.0 会在将来的版本被移除。Flow 2.0 的主要设计思想是提供 1.0 所没有的流级定义。用户可以将属于给定流的所有 job / properties 文件合并到单个流定义文件中,其内容采用 YAML 语法进行定义,同时还支持在流中再定义流,称为为嵌入流或子流。

1.2 基本结构

项目 zip 将包含多个流 YAML 文件,一个项目 YAML 文件以及可选库和源代码。Flow YAML 文件的基本结构如下:

  • 每个 Flow 都在单个 YAML 文件中定义;
  • 流文件以流名称命名,如:my-flow-name.flow
  • 包含 DAG 中的所有节点;
  • 每个节点可以是作业或流程;
  • 每个节点 可以拥有 name, type, config, dependsOn 和 nodes sections 等属性;
  • 通过列出 dependsOn 列表中的父节点来指定节点依赖性;
  • 包含与流相关的其他配置;
  • 当前 properties 文件中流的所有常见属性都将迁移到每个流 YAML 文件中的 config 部分。

官方提供了一个比较完善的配置样例,如下:

  1. config:
  2. user.to.proxy: azktest
  3. param.hadoopOutData: /tmp/wordcounthadoopout
  4. param.inData: /tmp/wordcountpigin
  5. param.outData: /tmp/wordcountpigout
  6. # This section defines the list of jobs
  7. # A node can be a job or a flow
  8. # In this example, all nodes are jobs
  9. nodes:
  10. # Job definition
  11. # The job definition is like a YAMLified version of properties file
  12. # with one major difference. All custom properties are now clubbed together
  13. # in a config section in the definition.
  14. # The first line describes the name of the job
  15. - name: AZTest
  16. type: noop
  17. # The dependsOn section contains the list of parent nodes the current
  18. # node depends on
  19. dependsOn:
  20. - hadoopWC1
  21. - NoOpTest1
  22. - hive2
  23. - java1
  24. - jobCommand2
  25. - name: pigWordCount1
  26. type: pig
  27. # The config section contains custom arguments or parameters which are
  28. # required by the job
  29. config:
  30. pig.script: src/main/pig/wordCountText.pig
  31. - name: hadoopWC1
  32. type: hadoopJava
  33. dependsOn:
  34. - pigWordCount1
  35. config:
  36. classpath: ./*
  37. force.output.overwrite: true
  38. input.path: ${param.inData}
  39. job.class: com.linkedin.wordcount.WordCount
  40. main.args: ${param.inData} ${param.hadoopOutData}
  41. output.path: ${param.hadoopOutData}
  42. - name: hive1
  43. type: hive
  44. config:
  45. hive.script: src/main/hive/showdb.q
  46. - name: NoOpTest1
  47. type: noop
  48. - name: hive2
  49. type: hive
  50. dependsOn:
  51. - hive1
  52. config:
  53. hive.script: src/main/hive/showTables.sql
  54. - name: java1
  55. type: javaprocess
  56. config:
  57. Xms: 96M
  58. java.class: com.linkedin.foo.HelloJavaProcessJob
  59. - name: jobCommand1
  60. type: command
  61. config:
  62. command: echo "hello world from job_command_1"
  63. - name: jobCommand2
  64. type: command
  65. dependsOn:
  66. - jobCommand1
  67. config:
  68. command: echo "hello world from job_command_2"

二、YAML语法

想要使用 Flow 2.0 进行工作流的配置,首先需要了解 YAML 。YAML 是一种简洁的非标记语言,有着严格的格式要求的,如果你的格式配置失败,上传到 Azkaban 的时候就会抛出解析异常。

2.1 基本规则

  1. 大小写敏感 ;
  2. 使用缩进表示层级关系 ;
  3. 缩进长度没有限制,只要元素对齐就表示这些元素属于一个层级;
  4. 使用#表示注释 ;
  5. 字符串默认不用加单双引号,但单引号和双引号都可以使用,双引号表示不需要对特殊字符进行转义;
  6. YAML 中提供了多种常量结构,包括:整数,浮点数,字符串,NULL,日期,布尔,时间。

2.2 对象的写法

  1. # value 与 : 符号之间必须要有一个空格
  2. key: value

2.3 map的写法

  1. # 写法一 同一缩进的所有键值对属于一个map
  2. key:
  3. key1: value1
  4. key2: value2
  5. # 写法二
  6. {key1: value1, key2: value2}

2.3 数组的写法

  1. # 写法一 使用一个短横线加一个空格代表一个数组项
  2. - a
  3. - b
  4. - c
  5. # 写法二
  6. [a,b,c]

2.5 单双引号

支持单引号和双引号,但双引号不会对特殊字符进行转义:

  1. s1: '内容\n 字符串'
  2. s2: "内容\n 字符串"
  3. 转换后:
  4. { s1: '内容\\n 字符串', s2: '内容\n 字符串' }

2.6 特殊符号

一个 YAML 文件中可以包括多个文档,使用 --- 进行分割。

2.7 配置引用

Flow 2.0 建议将公共参数定义在 config 下,并通过 ${} 进行引用。

三、简单任务调度

3.1 任务配置

新建 flow 配置文件:

  1. nodes:
  2. - name: jobA
  3. type: command
  4. config:
  5. command: echo "Hello Azkaban Flow 2.0."

在当前的版本中,Azkaban 同时支持 Flow 1.0 和 Flow 2.0,如果你希望以 2.0 的方式运行,则需要新建一个 project 文件,指明是使用的是 Flow 2.0:

  1. azkaban-flow-version: 2.0

3.2 打包上传

Azkaban Flow 2.0 的使用 - 图1

3.3 执行结果

由于在 1.0 版本中已经介绍过 Web UI 的使用,这里就不再赘述。对于 1.0 和 2.0 版本,只有配置方式有所不同,其他上传执行的方式都是相同的。执行结果如下:

Azkaban Flow 2.0 的使用 - 图2

四、多任务调度

和 1.0 给出的案例一样,这里假设我们有五个任务(jobA——jobE), D 任务需要在 A,B,C 任务执行完成后才能执行,而 E 任务则需要在 D 任务执行完成后才能执行,相关配置文件应如下。可以看到在 1.0 中我们需要分别定义五个配置文件,而在 2.0 中我们只需要一个配置文件即可完成配置。

  1. nodes:
  2. - name: jobE
  3. type: command
  4. config:
  5. command: echo "This is job E"
  6. # jobE depends on jobD
  7. dependsOn:
  8. - jobD
  9. - name: jobD
  10. type: command
  11. config:
  12. command: echo "This is job D"
  13. # jobD depends on jobA、jobB、jobC
  14. dependsOn:
  15. - jobA
  16. - jobB
  17. - jobC
  18. - name: jobA
  19. type: command
  20. config:
  21. command: echo "This is job A"
  22. - name: jobB
  23. type: command
  24. config:
  25. command: echo "This is job B"
  26. - name: jobC
  27. type: command
  28. config:
  29. command: echo "This is job C"

五、内嵌流

Flow2.0 支持在一个 Flow 中定义另一个 Flow,称为内嵌流或者子流。这里给出一个内嵌流的示例,其 Flow 配置如下:

  1. nodes:
  2. - name: jobC
  3. type: command
  4. config:
  5. command: echo "This is job C"
  6. dependsOn:
  7. - embedded_flow
  8. - name: embedded_flow
  9. type: flow
  10. config:
  11. prop: value
  12. nodes:
  13. - name: jobB
  14. type: command
  15. config:
  16. command: echo "This is job B"
  17. dependsOn:
  18. - jobA
  19. - name: jobA
  20. type: command
  21. config:
  22. command: echo "This is job A"

内嵌流的 DAG 图如下:

Azkaban Flow 2.0 的使用 - 图3

执行情况如下:

Azkaban Flow 2.0 的使用 - 图4

参考资料

  1. Azkaban Flow 2.0 Design
  2. Getting started with Azkaban Flow 2.0