作业监听器

可通过配置多个任务监听器,在任务执行前和执行后执行监听的方法。监听器分为每台作业节点均执行和分布式场景中仅单一节点执行2种。

1. 每台作业节点均执行的监听

若作业处理作业服务器的文件,处理完成后删除文件,可考虑使用每个节点均执行清理任务。此类型任务实现简单,且无需考虑全局分布式任务是否完成,请尽量使用此类型监听器。

步骤:

  • 定义监听器
  1. public class MyElasticJobListener implements ElasticJobListener {
  2. @Override
  3. public void beforeJobExecuted(ShardingContexts shardingContexts) {
  4. // do something ...
  5. }
  6. @Override
  7. public void afterJobExecuted(ShardingContexts shardingContexts) {
  8. // do something ...
  9. }
  10. }
  • 将监听器作为参数传入JobScheduler
  1. public class JobMain {
  2. public static void main(String[] args) {
  3. new JobScheduler(createRegistryCenter(), createJobConfiguration(), new MyElasticJobListener()).init();
  4. }
  5. private static CoordinatorRegistryCenter createRegistryCenter() {
  6. CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("zk_host:2181", "elastic-job-demo"));
  7. regCenter.init();
  8. return regCenter;
  9. }
  10. private static LiteJobConfiguration createJobConfiguration() {
  11. // 创建作业配置
  12. ...
  13. }
  14. }

2. 分布式场景中仅单一节点执行的监听

若作业处理数据库数据,处理完成后只需一个节点完成数据清理任务即可。此类型任务处理复杂,需同步分布式环境下作业的状态同步,提供了超时设置来避免作业不同步导致的死锁,请谨慎使用。

步骤:

  • 定义监听器
  1. public class TestDistributeOnceElasticJobListener extends AbstractDistributeOnceElasticJobListener {
  2. public TestDistributeOnceElasticJobListener(long startTimeoutMills, long completeTimeoutMills) {
  3. super(startTimeoutMills, completeTimeoutMills);
  4. }
  5. @Override
  6. public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
  7. // do something ...
  8. }
  9. @Override
  10. public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
  11. // do something ...
  12. }
  13. }
  • 将监听器作为参数传入JobScheduler
  1. public class JobMain {
  2. public static void main(String[] args) {
  3. long startTimeoutMills = 5000L;
  4. long completeTimeoutMills = 10000L;
  5. new JobScheduler(createRegistryCenter(), createJobConfiguration(), new MyDistributeOnceElasticJobListener(startTimeoutMills, completeTimeoutMills)).init();
  6. }
  7. private static CoordinatorRegistryCenter createRegistryCenter() {
  8. CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("zk_host:2181", "elastic-job-demo"));
  9. regCenter.init();
  10. return regCenter;
  11. }
  12. private static LiteJobConfiguration createJobConfiguration() {
  13. // 创建作业配置
  14. ...
  15. }
  16. }