概念

工作流(Workflow)是一个数据接收、计算、导出工具,把业务流程映射到页面上,在这里您的数据业务得到可视化,方便您更直观地来进行大数据分析流程管理。它的操作方式类似于思维脑图,直接在组件上右键或者通过拖拽在组件之间进行连线即可。

工作流 - 图1

您可以通过访问七牛资源主页的 大数据工作流引擎 ,点击体验新版本进入工作流管理界面。

工作流 - 图2

您可以根据需要创建一个或多个工作流,自行控制每一个工作流的启动停止,方便您便捷地管理数据流。如果您给您的计算任务设置的是手动执行,那么您需要在工作流管理界面点击执行按钮,计算任务才会启动。通过多种功能自由组合,满足您的各种计算需求。

工作流 - 图3

点击创建工作流进入工作流编辑界面。

工作流提供3种组件:数据源计算导出帮助您打通数据业务,一个完整的工作流至少要包含一个数据源组件和一个数据导出组件。定时器工具在您创建批量计算任务的时候可以设置定时启动/循环启动。后面会详述。在界面右上角您可以看到功能按钮:更新&启动更新退出。当您业务逻辑还没整理好的时候,但是想保存现有的操作,您可以点击更新来保存您当前所做的操作,这样做可以大大节省您的工作量。

工作流 - 图4

工作流中字段的数据结构可以是以下几种类型:

类型 解释 数据样例
date 日期类型,格式为RFC3339 2017-01-01T15:00:25Z07:00
string 字符串类型 “qiniu.com”
long 64位整数 1024
float 单精度64位浮点 322.00
boolean 布尔类型,值为truefalse false

工作流界面操作方法请观看视频教程。

视频教程:


数据源

数据源是工作流的起始节点,它可以接收实时上传的数据或读取离线存储的数据,在工作流中,目前支持以下几种类型的数据源:

名称 流式计算 批量计算 备注
消息队列 yes no 只能作用于流式计算,实时接收用户上传的数据;每一条进入消息队列的数据,都会被存储2天时间,过期自动删除
对象存储 no yes 只能作用于批量计算,可以一次性加载大量数据
CDN no yes 只能作用于批量计算,数据来源于七牛CDN服务
HDFS no yes 只能作用于批量计算,仅支持私有云,公有云不提供此服务

!> 注意:创建好工作流之后,无论是否启动该工作流,消息队列节点都可以接收数据。

消息队列节点相关参数填写

参数 必填 说明
名称 消息队列名称
字段信息 字段名称和字段类型
IP来源 数据来源的IP信息
时间字段 数据接收的时间
服务器内部反转译 针对为了写入而被序列化产生的\t和\n进行反转译,恢复为\t和\n

!> 注意:如果您的数据源新增了一些字段,可以使用添加新字段功能,更新消息队列。

对象存储节点相关参数填写

参数 必填 说明
名称 对象存储数据源节点名称
空间名称 您要读取的文件所在的bucket名称
文件类型 您要读取文件的格式
文件前缀 您要读取的文件名称的前缀

CDN日志节点相关参数填写

参数 必填 说明
名称 CDN日志数据源节点名称
域名 您的CDN服务的域名
文件过滤条件类型 日志产生的时间范围的选择方式(固定时间/相对时间)

当文件过滤条件类型选择“相对时间”时,过滤条件里可以引入魔法变量。魔法变量后文会详述。

工作流 - 图5

CDN日志数据源的字段类型不可更改,与七牛CDN服务产生的日志格式一致。

数据源节点的具体用法请观看视频教程。

视频教程:


数据计算

工作流提供两种计算模式供您选择:流式计算批量计算;流式计算即实时计算,批量计算即离线计算。

计算的触发模式分为调度执行和手动执行,调度执行是指将工作流启动后,系统自动将自动开始运行,并按照条件执行调度;手动执行是指将工作流运行一遍后停止。流式计算的计算任务属于调度执行模式,批量计算的计算任务可以是调度执行模式(定时执行循环执行)或者手动执行模式。

!> 注意1:一个工作流不能同时包含调度模式(流式计算/定时批量计算/循环批量计算)和手动执行模式(手动执行)。

!> 注意2:每一个工作流都可以同时包含一个或多个流式计算与批量计算,并且计算之间可以串行。

流式计算相关参数填写

参数 必填 说明
名称 计算任务名称
容器类型 计算任务需要的物理资源,不同的计算任务之间的计算资源互相隔离,互不影响
计算模式 SQL语句计算/自定义计算,两种计算方式可以并存,并且优先执行自定义计算
间隔时间 计算任务的运行时间间隔
数据起始位置 从最早还是最新数据开始计算

批量计算相关参数填写

参数 必填 说明
名称 计算任务名称
容器类型 计算任务需要的资源
数据源名称 当数据源是对象存储节点的时候,支持多个数据源JOIN运算,因此需要指定数据源
数据库表名 执行SQL计算的数据库表名
触发方式 设置计算的触发方式:定时启动/循环启动/手动执行

计算节点的具体用法请观看视频教程。

视频教程:


自定义计算

流式计算中,您可能遇到SQL满足不了的计算需求,这时您可以通过自定义计算完成。在这里我们提供引用Plugin和UDF帮您完成自定义计算。

Plugin管理

通过下载我们提供的代码模板,在此基础上编写您的代码(输入类、输出类、业务逻辑代码),打成Jar包。在工作流的工作界面,您可以通过 Plugin管理 上传您的Jar包到工作流,之后您就可以在流式计算中引用您自定义的Plugin来实现更为复杂的数据计算啦!

工作流 - 图6

!> 注意:目前Plugin仅支持Java语言。

Plugin模板下载链接—->>>点此跳转

Plugin具体用法请观看视频教程。

视频教程:

UDF管理

UDF(User-Defined Function):用户自定义函数。

UDF可以在SQL计算模式中使用,Workflow提供了提供了大约50多种内置UDF函数;如果这些函数不能满足您的需求,那么您也可以自行编写UDF,提交Jar包并添加到UDF函数管理,提交后即可使用。

工作流 - 图7

自定义UDF的过程与自定义Plugin类似,如下:

UDF模板下载链接—->>>点此跳转

解压后,使用Java IDE导入Pandora-UDF项目。

等待项目依赖加载完成后,可以在 src/main/java/com.pandora/ 目录下查看一个简单的示例。

这个示例中包含了一个名为 SimpleUdf的Class,在这个Class中有4个方法:

  1. 1. String parseTime(String t)
  2. Input RFC3339 格式转为 date time 时间格式
  3. @param input rfc3339 格式,形如 2017-04-05T16:41:42.651614Z
  4. @return 返回date time格式时间 形如 2017-04-05 16:41:42
  5. parseTime("2017-04-05T16:41:42.651614Z")
  6. 2. String parseTime(long t)
  7. 将时间戳转为 date time 时间格式
  8. @param input 时间戳,单位为毫秒
  9. @return 返回date time格式时间 形如 2017-04-05 16:41:42
  10. parseTime(1499324233000)
  11. 3. String parseTime(long t, String unit)
  12. Input RFC3339 格式转为 date time 时间格式
  13. @param input 时间戳,单位为毫秒
  14. @param unit 指定时间戳的单位,支持 s (秒), ms(毫秒), us(微妙), ns (纳秒)
  15. @return 返回date time格式时间 形如 2017-04-05 16:41:42
  16. parseTime(1499324233000, "ms")
  17. 4. String parseTime(long t, String unit)
  18. Input RFC3339 格式转为 date time 时间格式
  19. @param input 时间戳,单位为毫秒
  20. @param unit 指定时间精度,1毫秒等于多少该精度单位时间
  21. @return 返回date time格式时间 形如 2017-04-05 16:41:42
  22. parseTime(149932423300000000, 100000) 解析百纳秒时间戳

您可以在 src/main/java/com.pandora/ 目录下新建Class和方法,并在方法中编写UDF逻辑,代码编写完成后,需要将这个工程打成Jar包并上传至Workflow,新增到自定义函数里,然后就可以使用这个UDF了。

!> 注意:Jar包名称中不可包含-_号。

在SQL中使用UDF:

  1. SELECT
  2. parseTime(t1) t1,
  3. parseTime(t2) t2,
  4. parseTime(t3, "s") t3,
  5. parseTime(t4, 100000) t4
  6. from
  7. stream

魔法变量

魔法变量的概念类似于编程语言中的变量,即您可以定义一个变量,在数据计算中或者过滤条件中引用。目前魔法变量仅支持时间类型的值。

目前系统提供了8个内置变量,只能引用,不可修改和删除:

变量名称 类型 格式
date 时间表达式 $(date) yyyy-MM-dd
day 时间表达式 $(day) dd
hour 时间表达式 $(hour) HH
min 时间表达式 $(min) mm
mon 时间表达式 $(mon) MM
now 时间表达式 $(now) yyyy-MM-dd HH:mm:ss
sec 时间表达式 $(sec) ss
year 时间表达式 $(year) yyyy

用户也可以自行创建魔法变量,并且创建的魔法变量的值可以引用系统内置变量。

工作流 - 图9

当我们需要使用魔法变量的时候,只需要输入 $(变量名称) 即可,如:

  1. select * from stream where time = $(now)

数据导出

将数据源或计算任务中的数据导出到指定的地址。

目前我们支持将数据导出到以下地址:

  1. 1. 指定一个日志分析服务的日志仓库;
  2. 2. 指定一个时序数据库的数据仓库下的序列;
  3. 3. 指定一个HTTP服务器地址;
  4. 4. 指定一个对象存储的Bucket
  5. 5. 指定一个报表工作室的数据仓库下的数据表;

导出到日志分析填写参数

参数 必填 说明
名称 导出节点名称
仓库名称 您要进行日志分析的仓库名称,可以选择已有仓库或者创建新仓库
数据存储时限 导出的数据存储在日志仓库的时间限制
丢弃无效数据 是否忽略无效数据

导出到时序数据仓库填写参数

参数 必填 说明
名称 导出节点名称
数据库名称 您要进行时序数据分析的数据仓库名称
序列名称 数据仓库的表名,数据将会被导入到这个表当中
数据存储时限 导出的数据存储在时序数据仓库的时间限制
时间戳 数据导出的时间,默认使用当前时间
数据起始位置 从最早还是最新数据开始导出
丢弃无效数据 是否忽略无效数据

导出到HTTP地址填写参数

参数 必填 说明
名称 导出节点名称
服务器地址 ip或域名,例如:https://pipeline.qiniu.comhttps://127.0.0.1:7758
请求资源路径 具体地址,例如:/test/repos
导出类型 导出的文件格式
数据起始位置 从最早还是最新数据开始导出,默认从最新数据导出

导出到对象存储填写参数

参数 必填 说明
名称 导出节点名称
空间名称 导出的对象存储的bucket名称
文件前缀 导出的文件名称的前缀
导出类型 可以将文件导成四种格式:json、csv、text、parquet;其中json、csv和text可以选择是否将文件压缩,而parquet无需选择,默认自动压缩,压缩比大概为3-20倍
文件压缩 是否开启文件压缩功能
最大文件保存天数 数据导出在对象存储中的时限,以天为单位,超过这个时间范围的文件会被自动删除,当该字段为0或者为空时,则永久储存
文件分割策略 文件切割策略,可以按照文件大小切割:文件大小超过设置的值则进行切割;也可以按照时间间隔切割:文件导出时长超过设置的值则进行切割;也可以两者方式只要满足一种即进行切割
数据起始位置 从最早还是最新数据开始导出,默认从最新数据导出

!> 关于文件前缀,默认值为空(生成文件名会自动加上时间戳格式为yyyy-MM-dd-HH-mm-ss),支持魔法变量。

前缀用法说明:

1.前缀使用魔法变量
假如前缀的取值为kodo-parquet/date=$(year)-$(mon)-$(day)/hour=$(hour)/min=$(min)/$(sec),且生成某一文件时的北京标准时间为2017-01-12 15:30:00, 则前缀将被解析为kodo-parquet/date=2017-01-12/hour=15/min=30/00,其中的魔法变量$(year)、$(mon)、$(day)、$(hour)、$(min)、$(sec)分别对应文件生成时间2017-01-12 15:30:00的年、月、日、时、分、秒。

2.前缀使用默认值
假如生成某一文件时的北京标准时间为2017-01-12 15:30:00, 则前缀将被解析为2017-01-12-15-30-00

导出到报表服务填写参数

参数 必填 说明
名称 导出节点名称
数据库名称 导出到报表服务的数据库名称
数据表名称 导出的数据库的具体表名
数据起始位置 从最早还是最新数据开始导出,默认从最新数据导出

定时器

Workflow提供定时器功能,当您想为批量计算设置循环或定时规则时,您可以直接将定时器工具拖拽到批量计算节点上,然后在参数里填写相应的定时规则,您的批量运算任务就可以有条不紊的按照您设定的规则定时运行啦,非常方便!

工作流 - 图10

报警管理

Workflow支持报警管理,您可以填写报警人相关信息和报警指标,如果节点的运行情况满足您选择的报警指标,您就可以收到报警通知啦!

工作流 - 图11

工作流 - 图12

批量计算任务批次管理

当您的计算任务是手动执行的时候,Workflow提供任务批次管理,您可以看到您的数据运行任务执行的状态(运行中、成功、失败、就绪、已停止),对于停止或失败的任务,您可以对它进行重新执行操作。

工作流 - 图13

运行日志监控

Workflow提供节点的运行日志监控,您可以选择数据节点(导出节点、流式计算节点、批量计算节点)来查看各节点的运行日志,帮您宏观监控数据的运行情况,及时发现问题。

工作流 - 图14