概念

资源容器(container):

XSpark本身是一个云计算应用,所以资源容器的概念其实就是CPU(规格 & 数量)和内存(大小)。

Zepplin:

Zepplin是一个数据可视化工具,可以将计算的结果以多种图表的样式进行展示,例如折线图、直方图图等等。

云存储:

云存储指的是七牛提供的对象存储服务,它是XSpark的数据来源。

!> XSpark 不仅仅支持云存储,也支持其他多种数据源,例如MySQL、HDFS等等。

调度节点(Master):

调度节点是用来管理和分发分布式计算任务,一般来说,当获取较大数据源或计算结果较大时,调度节点配置越高,性能越好;它使用的资源包含在资源容器当中。

计算节点(worker):

计算节点是用来执行分布式计算任务,一般来说,计算节点越多,计算速度越快;它使用的资源也包含在资源容器当中。

创建XSpark应用

操作流程:

在七牛控制台最下方的应用市场进入XSpark首页。

然后点击新建部署来创建我们第一个XSpark应用。

填写相应内容之后,点击确定创建,等待系统部署完成。

填写参数说明:

参数 必填 说明
应用名称 用来标识该应用的唯一性
应用别名 备注标识,支持中文,没有特殊含义
部署区域 计算与存储所使用的物理资源所在区域;
为了降低成本,请尽量选择离数据源较近的区域

操作演示:

XSpark - 图1

初始化配置

操作流程:

部署完成后,我们需要选择此应用的Worker规格以及数量,选择完成后点击提交按钮,等待应用初始化完成即可。

填写参数说明:

参数 必填 说明
Worker规格 不同大小的CPU内存
Worker数量 选择的Worker规格使用数量

!> Worker的概念: Spark集群的执行节点,用来执行分布式计算任务,一般来说,数量越多执行效率越高,规格与数量的大小与数量和执行效率成正比,请酌情选择。

除Worker外,还有一个Master(调度节点)的概念,调度节点是用来管理和分发分布式计算任务,一般来说,当获取较大数据源或计算结果较大时,调度节点配置越高,性能越好;它使用的资源包含在资源容器当中。

操作演示:

XSpark - 图2

开始使用XSpark

XSpark数据源

目前XSpark 支持以KODO,FUSION 作为数据源来做数据分析。各数据源以不同的URL Schema来区分。

如:

  1. KODO: qiniu://
  2. FUSION: fusion://

本文档如无特别说明则默认以KODO作为数据源来阐述使用方式。如需使用FUSION, 请跳转附件1

Spark scala 示例

操作流程:

点击开始使用,在新弹出的页面上选择XSpark Tutorial进入代码编辑窗口。

在代码编辑窗口中,操作演示所使用的示例是读取七牛对象存储的Bucket中的companies.json文件,然后输出这个文件的所有字段信息。

意思就是,在代码编辑窗口中,我们需要指定一个文件路径,然后就可以对该文件中的数据进行计算,计算的代码由用户自行编写。

操作演示:

XSpark - 图3

加载第三方依赖

操作流程:

点击代码编辑窗口之间的缝隙,可以增加一个新的代码编辑窗口。

我们再顶部打开一个新的代码编辑窗口,然后输入

  1. %spark.dep

换行后,即可添加第三方依赖,代码示例如下:

  1. z.load("第三方依赖地址")

?>我们也支持添加第三方镜像仓库来加速依赖下载,示例如下:

  1. z.addRepo("3rdRepo").url("http://maven.aliyun.com/nexus/content/groups/public/")

!>注意:如果遇到如下提示,需要停止XSpark已经在运行的任务,才能使用加载第三方依赖功能,停止的方法见下图:

  1. Must be used before SparkInterpreter (%spark) initialized
  2. Hint: put this paragraph before any Spark code and restart Zeppelin/Interpreter

XSpark - 图4

操作演示:

XSpark - 图5

Spark SQL 示例

操作流程:

我们可以在代码编辑窗口中编写Spark SQL来对数据进行分析。

首先我们需要将读取的文件注册成一张表,示例代码如下:

  1. // 首先从对象存储中读取 companies.json文件
  2. var dataPath = "qiniu://resourves/companies.json"
  3. // 然后使用json的方式来读取文件
  4. var table = sqlContext.read.json(dataPath)
  5. // 将读取的文件注册为一张表
  6. table.registerTempTable("companies")

将文件注册成表之后,我们即可编写 SQL 对数据进行分析:

  1. %sql select name, sum(number_of_employees) number from companies group by name order by number desc limit 5

!> 注意: 编写SQL时, 头部必须包含%sql

操作演示:

XSpark - 图6

定时任务

操作流程:

XSpark使用cron表达式来配置定时任务的执行频率。

点击页面上的时钟图标,来配置定时

快捷选择项中,我们支持1分钟、5分钟、1小时、3小时、6小时、12小时、1天;

也可以手动输入cron表达式自定义定时周期。

建议勾选:auto-restart interpreter on cron execution使得每次运行都是独立的运行环境。

操作演示:

XSpark - 图7

基于XSpark的Python语言支持

XSpark - 图8

基于XSpark的R语言支持

XSpark - 图9

基于XSpark的机器学习

XSpark - 图10

Spark UI

对于有一定经验的Spark开发人员,可以访问SparkUI来看当前运行的任务状态。

XSpark - 图11

XSpark容器监控

XSpark提供了容器级别的CPU,内存,磁盘监控。可以给Spark任务的调优和故障排查提供有力的支持。

XSpark - 图12

XSpark邮件告警功能

邮件告警经常配合定时任务来使用,当任务失败时,会有邮件发送具体的失败信息到指定的邮箱内。

XSpark - 图13

XSpark快捷重启Interpreter功能

重启Interpretr会初始化Spark解释器。重启Interpreter会结束掉当前正在运行的spark任务,释放资源。

XSpark - 图14

XSpark 备份/恢复代码

当你需要重新新建另外一个XSpark实例时,而又不想丢弃之前在Zeppelin写了的代码,这时候你应该尝试XSpark提供的代码备份/恢复功能了。当然除此之外,常常备份自己的代码是个好习惯。

备份代码

点击备份,它会自动备份您当前实例的所有代码。

XSpark - 图15

恢复代码

恢复时第一步需要选择要恢复的代码来自于哪一个您曾经备份过的XSpark实例, 第二步选择需要恢复的代码。

XSpark - 图16

Spark-jobserver 使用

为了能够在提供让用户自己提交代码,管理任务的目的。我们在XSpark里提供了Restful风格的Spark-jobserver。用户可以通过Spark-jobserver的api在非notebook的情况下使用XSpark集群。

首先我们需要获取到你自己的JobServer地址及使用方式。

XSpark - 图17

API使用方式:

http --auth-qiniu=~/.qiniu/your_ak_sk.conf GET $JOBSERVER_REST_URL

!> 注意:

  • ~/.qiniu/your_ak_sk.conf是访问七牛服务的ak/sk,需要创建一个your_ak_sk.conf文件,然后把内容替换成你自己的ak/sk.
  1. {
  2. "access_key": "***************************************",
  3. "secret_key": "***************************************",
  4. "auth": "qiniu/mac"
  5. }
  • $JOBSERVER_REST_URL 替换为你获取的地址

注:上面使用到的http命令是httpie的工具。httpie是一个 HTTP 的命令行客户端。其目标是让 CLI 和 web 服务之间的交互尽可能的人性化。
安装方式:
pip install --upgrade https://github.com/kirk-enterprise/httpie/tarball/master

Rest API 介绍

大体上我们提供了下面几个用于操作Spark任务的api。

1. Jars

1.1 提交Jar
  1. http --auth-qiniu=~/.qiniu/your_ak_sk.conf $JOBSERVER_REST_URL/jars/$JAR_NAME @job-server-tests/target/scala-2.10/job-server-tests_$VER.jar

其中:$JAR_NAME 替换成自行定义的jar名字即可

返回值:

  1. 200 OK
  2. {
  3. "result": "Jar uploaded",
  4. "status": "SUCCESS"
  5. }
1.2 查询Jar
  1. http --auth-qiniu=~/.qiniu/your_ak_sk.conf $JOBSERVER_REST_URL/jars

返回值:

  1. 200 OK
  2. {
  3. "test": "2017-06-07T15:18:58.581+08:00",
  4. }

2. Contexts

2.1 创建Context
  1. http --auth-qiniu=~/.qiniu/your_ak_sk.conf POST '$JOBSERVER_REST_URL/contexts/$CONTEXT_NAME?num-cpu-cores=4&memory-per-node=512m'

其中:

  • $CONTEXT_NAME 替换成你自行定义的context name
  • num-cpu-cores 配置SparkContext使用cpu核心数
  • memory-per-node 配置每个节点使用的内存大小

返回值:

  1. 200 OK
  2. {
  3. "result": "Context created",
  4. "status": "SUCCESS"
  5. }
2.2 查看Context
  1. http --auth-qiniu=~/.qiniu/your_ak_sk.conf GET $JOBSERVER_REST_URL/contexts

返回值:

  1. 200 OK
  2. [
  3. "test-context"
  4. ]
2.3 删除Context
  1. http --auth-qiniu=~/.qiniu/your_ak_sk.conf DELETE '$JOBSERVER_REST_URL/contexts/$CONTEXT_NAME'

其中:$CONTEXT_NAME为你要删除的Context名称

返回值:

  1. 200 OK
  2. {
  3. "result": "",
  4. "status": "Success"
  5. }

3. Jobs

3.1 提交Job
  1. http --auth-qiniu=~/.qiniu/your_ak_sk.conf POST '$JOBSERVER_REST_URL/jobs?appName=$APP_NAME&classPath=$CLASSNAME&sync={true|false}&context=$CONTEXT_NAME' < input.json

其中:

  • $APP_NAME 为前面提交的Jar的名字
  • $CLASSNAME 为Jar里的Job类路径
  • $CONTEXT_NAME 为前面提交的Context名称
  • sync={true|false} 控制Job的提交是否是同步的
  • input.json 为提交任务的参数信息,如:
  1. {
  2. input.string:"a b c a b see"
  3. }

返回值:

  1. 200 OK
  2. 同步:
  3. {
  4. "jobId": "58da6978-3c22-43f0-89d6-31e14268f03c",
  5. "result": ...
  6. }
  7. 异步:
  8. {
  9. "classPath": "spark.jobserver.WordCountExample",
  10. "context": "test-context",
  11. "duration": "Job not done yet",
  12. "jobId": "58da6978-3c22-43f0-89d6-31e14268f03c",
  13. "startTime": "2017-06-07T15:25:06.171+08:00",
  14. "status": "STARTED"
  15. }
3.2 获取Jobs
  1. http --auth-qiniu=~/.qiniu/your_ak_sk.conf $JOBSERVER_REST_URL/jobs

返回值:

  1. 200 OK
  2. [
  3. {
  4. "classPath": "spark.jobserver.WordCountExample",
  5. "context": "test-context",
  6. "duration": "0.049 secs",
  7. "jobId": "58da6978-3c22-43f0-89d6-31e14268f03c",
  8. "startTime": "2017-06-07T15:25:06.171+08:00",
  9. "status": "FINISHED"
  10. },
  11. ...
  12. ]
3.3 获取Job结果
  1. http --auth-qiniu=~/.qiniu/your_ak_sk.conf $JOBSERVER_REST_URL/jobs/$JOB_ID

其中:$JOB_ID是任务Id,可以从提交的任务返回的信息,及获取所有Jobs时获取。

返回值:

  1. 200 OK
  2. {
  3. "jobId": "58da6978-3c22-43f0-89d6-31e14268f03c",
  4. "result": ...
  5. }
例子:

(1). 同步提交,等待结果返回

  1. http --auth-qiniu=~/.qiniu/your_ak_sk.conf POST '$JOBSERVER_REST_URL/jobs?appName=test&classPath=spark.jobserver.WordCountExample&sync=true&context=test-context' < input.json

(2). Ad-hoc方式提交,不使用已有context

  1. http --auth-qiniu=~/.qiniu/your_ak_sk.conf POST '$JOBSERVER_REST_URL/jobs?appName=test&classPath=spark.jobserver.WordCountExample' < input.json

(3). 附带Jar包依赖

  1. http --auth-qiniu=~/.qiniu/your_ak_sk.conf POST '$JOBSERVER_REST_URL/jobs?appName=test&classPath=spark.jobserver.WordCountExample&sync=true&context=test-context' < input.json
  2. {
  3. dependent-jar-uris = ["file:///myjars/deps01.jar", "file:///myjars/deps02.jar"],
  4. input.string:" a b c a b see"
  5. }

到此为止,基本上你应该清楚了怎么来使用Spark-jobServer api了,那么接下来我们来学习下如何编写Spark-Jobserver的应用

编写Spark-JobServer App

这里我们直接使用Spark-JobServer的模板来创建一个Spark-JobServer 的app。

  1. sbt new spark-jobserver/spark-jobserver.g8

根据提示输入你的project信息

  1. organization [com.example]:
  2. name [wordcount]:
  3. version [0.1.0-SNAPSHOT]:
  4. package [com.example.wordcount]:
  5. scala_version [2.10.6]:
  6. sbt_version [0.13.12]:
  7. sjs_version [-SNAPSHOT]:
  8. Template applied in ./wordcount

这里为了演示说明我们直接使用默认的wordcount来讲解:

  1. object WordCountExample extends SparkJob {
  2. def main(args: Array[String]) {
  3. val conf = new SparkConf().setMaster("local[4]").setAppName("WordCountExample")
  4. val sc = new SparkContext(conf)
  5. val config = ConfigFactory.parseString("")
  6. val results = runJob(sc, config)
  7. println("Result is " + results)
  8. }
  9. override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
  10. Try(config.getString("input.string"))
  11. .map(x => SparkJobValid)
  12. .getOrElse(SparkJobInvalid("No input.string config param"))
  13. }
  14. override def runJob(sc: SparkContext, config: Config): Any = {
  15. sc.parallelize(config.getString("input.string").split(" ").toSeq).countByValue
  16. }
  17. }

从上面代码中我们可以看到,编写一个spark-jobserver 的app需要实现SparkJob这个trait及重写两个核心方法。

  1. object SampleJob extends SparkJob {
  2. override def runJob(sc: SparkContext, jobConfig: Config): Any = ???
  3. override def validate(sc: SparkContext, config: Config): SparkJobValidation = ???
  4. }

如果需要修改jar包依赖
cd $project 修改 build.sbt

编译打包

  1. cd $project
  2. sbt package

到此编译好的jar可以根据前面的提交jar的方式来测试你的第一个Spark-Jobserver app了

注:新版的Spark-jobserver api

  1. object SampleJob extends NewSparkJob {
  2. def runJob(sc: SparkContext, runtime: JobEnvironment, data: JobData): JobOutput = ???
  3. def validate(sc: SparkContext, runtime: JobEnvironment, config: Config):
  4. JobData Or Every[ValidationProblem] = = ???
  5. }

更多详情可以参考 Spark-JobServer

附件

1. FUSION使用说明

当前XSpark 支持以FUSION作为数据源来进行数据分析。FUSION使用首先需要使用fusion:// 最为文件的schema。

文件路径规则:

  1. fusion://域名/日期/小时/文件名

!> 文件路径中“域名”,“日期”不支持通配。支持“小时”及“文件名”通配。如:fusion://{域名}/{日期}/2[1-3]/* 表示获取21,22,23点的所有日志。

例子:

  1. val textFile=sc.textFile("fusion://if-pbl.qiniudn.com/2017-05-22/16/part-00000.gz")
  2. textFile.count()

默认只支持访问当前账号下的fusion数据源。如需访问其他账号请联系七牛客服人员。