开发指南

0. 环境要求

a. Java

请使用JDK1.7及其以上版本。详情参见

b. Zookeeper

请使用Zookeeper 3.4.6及其以上版本。详情参见

c. Maven

请使用Maven 3.0.4及其以上版本。详情参见

1. 作业开发

Elastic-Job-Lite和Elastic-Job-Cloud提供统一作业接口,开发者仅需对业务作业进行一次开发,之后可根据不同的配置以及部署至不同的Lite或Cloud环境。

Elastic-Job提供Simple、Dataflow和Script 3种作业类型。 方法参数shardingContext包含作业配置、片和运行时信息。可通过getShardingTotalCount(), getShardingItem()等方法分别获取分片总数,运行在本作业服务器的分片序列号等。

a. Simple类型作业

意为简单实现,未经任何封装的类型。需实现SimpleJob接口。该接口仅提供单一方法用于覆盖,此方法将定时执行。与Quartz原生接口相似,但提供了弹性扩缩容和分片等功能。

  1. public class MyElasticJob implements SimpleJob {
  2. @Override
  3. public void execute(ShardingContext context) {
  4. switch (context.getShardingItem()) {
  5. case 0:
  6. // do something by sharding item 0
  7. break;
  8. case 1:
  9. // do something by sharding item 1
  10. break;
  11. case 2:
  12. // do something by sharding item 2
  13. break;
  14. // case n: ...
  15. }
  16. }
  17. }

b. Dataflow类型作业

Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。

  1. public class MyElasticJob implements DataflowJob<Foo> {
  2. @Override
  3. public List<Foo> fetchData(ShardingContext context) {
  4. switch (context.getShardingItem()) {
  5. case 0:
  6. List<Foo> data = // get data from database by sharding item 0
  7. return data;
  8. case 1:
  9. List<Foo> data = // get data from database by sharding item 1
  10. return data;
  11. case 2:
  12. List<Foo> data = // get data from database by sharding item 2
  13. return data;
  14. // case n: ...
  15. }
  16. }
  17. @Override
  18. public void processData(ShardingContext shardingContext, List<Foo> data) {
  19. // process data
  20. // ...
  21. }
  22. }

流式处理

可通过DataflowJobConfiguration配置是否流式处理。

流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。

如果采用流式作业处理方式,建议processData处理数据后更新其状态,避免fetchData再次抓取到,从而使得作业永不停止。 流式数据处理参照TbSchedule设计,适用于不间歇的数据处理。

c. Script类型作业

Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本。只需通过控制台或代码配置scriptCommandLine即可,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架会自动追加最后一个参数为作业运行时信息。

  1. #!/bin/bash
  2. echo sharding execution context is $*

作业运行时输出

sharding execution context is {“jobName”:“scriptElasticDemoJob”,“shardingTotalCount”:10,“jobParameter”:“”,“shardingItem”:0,“shardingParameter”:“A”}

2. 作业配置

Elastic-Job配置分为3个层级,分别是Core, Type和Root。每个层级使用相似于装饰者模式的方式装配。

Core对应JobCoreConfiguration,用于提供作业核心配置信息,如:作业名称、分片总数、CRON表达式等。

Type对应JobTypeConfiguration,有3个子类分别对应SIMPLE, DATAFLOW和SCRIPT类型作业,提供3种作业需要的不同配置,如:DATAFLOW类型是否流式处理或SCRIPT类型的命令行等。

Root对应JobRootConfiguration,有2个子类分别对应Lite和Cloud部署类型,提供不同部署类型所需的配置,如:Lite类型的是否需要覆盖本地配置或Cloud占用CPU或Memory数量等。

a. 使用Java代码配置

通用作业配置

  1. // 定义作业核心配置
  2. JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build();
  3. // 定义SIMPLE类型配置
  4. SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, SimpleDemoJob.class.getCanonicalName());
  5. // 定义Lite作业根配置
  6. JobRootConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
  7. // 定义作业核心配置
  8. JobCoreConfiguration dataflowCoreConfig = JobCoreConfiguration.newBuilder("demoDataflowJob", "0/30 * * * * ?", 10).build();
  9. // 定义DATAFLOW类型配置
  10. DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(dataflowCoreConfig, DataflowDemoJob.class.getCanonicalName(), true);
  11. // 定义Lite作业根配置
  12. JobRootConfiguration dataflowJobRootConfig = LiteJobConfiguration.newBuilder(dataflowJobConfig).build();
  13. // 定义作业核心配置配置
  14. JobCoreConfiguration scriptCoreConfig = JobCoreConfiguration.newBuilder("demoScriptJob", "0/45 * * * * ?", 10).build();
  15. // 定义SCRIPT类型配置
  16. ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(scriptCoreConfig, "test.sh");
  17. // 定义Lite作业根配置
  18. JobRootConfiguration scriptJobRootConfig = LiteJobConfiguration.newBuilder(scriptCoreConfig).build();

b. Spring命名空间配置

与Spring容器配合使用作业,可将作业Bean配置为Spring Bean,并在作业中通过依赖注入使用Spring容器管理的数据源等对象。可用placeholder占位符从属性文件中取值。Lite可考虑使用Spring命名空间方式简化配置。

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
  5. xmlns:job="http://www.dangdang.com/schema/ddframe/job"
  6. xsi:schemaLocation="http://www.springframework.org/schema/beans
  7. http://www.springframework.org/schema/beans/spring-beans.xsd
  8. http://www.dangdang.com/schema/ddframe/reg
  9. http://www.dangdang.com/schema/ddframe/reg/reg.xsd
  10. http://www.dangdang.com/schema/ddframe/job
  11. http://www.dangdang.com/schema/ddframe/job/job.xsd
  12. ">
  13. <!--配置作业注册中心 -->
  14. <reg:zookeeper id="regCenter" server-lists="yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />
  15. <!-- 配置简单作业-->
  16. <job:simple id="simpleElasticJob" class="xxx.MySimpleElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
  17. <bean id="yourRefJobBeanId" class="xxx.MySimpleRefElasticJob">
  18. <property name="fooService" ref="xxx.FooService"/>
  19. </bean>
  20. <!-- 配置关联Bean作业-->
  21. <job:simple id="simpleRefElasticJob" job-ref="yourRefJobBeanId" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
  22. <!-- 配置数据流作业-->
  23. <job:dataflow id="throughputDataflow" class="xxx.MyThroughputDataflowElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
  24. <!-- 配置脚本作业-->
  25. <job:script id="scriptElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" script-command-line="/your/file/path/demo.sh" />
  26. <!-- 配置带监听的简单作业-->
  27. <job:simple id="listenerElasticJob" class="xxx.MySimpleListenerElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C">
  28. <job:listener class="xx.MySimpleJobListener"/>
  29. <job:distributed-listener class="xx.MyOnceSimpleJobListener" started-timeout-milliseconds="1000" completed-timeout-milliseconds="2000" />
  30. </job:simple>
  31. <!-- 配置带作业数据库事件追踪的简单作业-->
  32. <job:simple id="eventTraceElasticJob" class="xxx.MySimpleListenerElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" event-trace-rdb-data-source="yourDataSource">
  33. </job:simple>
  34. </beans>

配置项详细说明请参见配置手册

3. 作业启动

a. Java启动方式

  1. public class JobDemo {
  2. public static void main(String[] args) {
  3. new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
  4. }
  5. private static CoordinatorRegistryCenter createRegistryCenter() {
  6. CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("zk_host:2181", "elastic-job-demo"));
  7. regCenter.init();
  8. return regCenter;
  9. }
  10. private static LiteJobConfiguration createJobConfiguration() {
  11. // 创建作业配置
  12. ...
  13. }
  14. }

b. Spring启动方式

将配置Spring命名空间的xml通过Spring启动,作业将自动加载。