定时任务 Cron

1 Example

Example定时任务 Cron - 图1 (opens new window)

ego 版本:ego@v0.4.1

2 定时任务配置

  1. type Config struct {
  2. // Required. 触发时间
  3. // 默认最小单位为分钟.比如:
  4. // "* * * * * *" 代表每分钟执行
  5. // 如果 EnableSeconds = true. 那么最小单位为秒. 示例:
  6. // "*/3 * * * * * *" 代表每三秒钟执行一次
  7. Spec string
  8. WaitLockTime time.Duration // 抢锁等待时间,默认 4s
  9. LockTTL time.Duration // 租期,默认 16s
  10. RefreshGap time.Duration // 锁刷新间隔时间, 默认 4s
  11. WaitUnlockTime time.Duration // 解锁等待时间,默认 1s
  12. // 任务时间重叠策略, 可选项:
  13. // skip,queue,concurrent
  14. // 如果上一个任务执行较慢,到达了新任务执行时间,那么新任务选择跳过,排队,并发执行的策略,新任务默认选择skip策略
  15. DelayExecType string
  16. // 是否分布式任务,默认否
  17. // 如果设置为 true. 那么需要设置 ecron.WithLock Option
  18. // 框架会使用分布式锁保证同时只有一个节点在执行当前分布式任务
  19. EnableDistributedTask bool
  20. EnableImmediatelyRun bool // 是否立刻执行,默认否
  21. EnableSeconds bool // 是否使用秒作解析器,默认否
  22. }

3 常规定时任务

3.1 用户配置

  1. [cron.test]
  2. enableDistributedTask = false # 是否分布式任务,默认否,如果存在分布式任务,会只执行该定时人物
  3. enableImmediatelyRun = false # 是否立刻执行,默认否
  4. enableSeconds = false # 是否使用秒作解析器,默认否
  5. spec = "*/5 * * * * *" # 执行时间
  6. delayExecType = "skip" # skip,queue,concurrent,如果上一个任务执行较慢,到达了新任务执行时间,那么新任务选择跳过,排队,并发执行的策略,新任务默认选择skip策略

3.2 用户代码

配置创建一个 的配置项,其中内容按照上文 HTTP 的配置进行填写。以上这个示例里这个配置 key 是cron.test

代码中创建一个 cron 服务, ecron.Load(“”).Build() ,代码中的 key 和配置中的 key 。创建完 cron 后, 将他添加到 ego new 出来应用的 Schedule 方法中。

  1. package main
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/gotomicro/ego"
  7. "github.com/gotomicro/ego/core/elog"
  8. "github.com/gotomicro/ego/core/etrace"
  9. "github.com/gotomicro/ego/task/ecron"
  10. )
  11. // export EGO_DEBUG=true && go run main.go --config=config.toml
  12. func main() {
  13. err := ego.New().Cron(cronJob1(), cronJob2()).Run()
  14. if err != nil {
  15. elog.Panic("startup engine", elog.FieldErr(err))
  16. }
  17. }
  18. // 异常任务
  19. func cronJob1() ecron.Ecron {
  20. job := func(ctx context.Context) error {
  21. elog.Info("info job1", elog.FieldTid(etrace.ExtractTraceID(ctx)))
  22. elog.Warn("warn job1", elog.FieldTid(etrace.ExtractTraceID(ctx)))
  23. fmt.Println("run job1", elog.FieldTid(etrace.ExtractTraceID(ctx)))
  24. return errors.New("exec job1 error")
  25. }
  26. cron := ecron.Load("cron.test").Build(ecron.WithJob(job))
  27. return cron
  28. }
  29. // 正常任务
  30. func cronJob2() ecron.Ecron {
  31. job := func(ctx context.Context) error {
  32. elog.Info("info job2", elog.FieldTid(etrace.ExtractTraceID(ctx)))
  33. elog.Warn("warn job2", elog.FieldTid(etrace.ExtractTraceID(ctx)))
  34. fmt.Println("run job2", elog.FieldTid(etrace.ExtractTraceID(ctx)))
  35. return nil
  36. }
  37. cron := ecron.Load("cron.test").Build(ecron.WithJob(job))
  38. return cron
  39. }

4 分布式定时任务

4.1 用户配置

  1. [cron.test]
  2. enableDistributedTask = true # 是否分布式任务,默认否,如果存在分布式任务,会只执行该定时人物
  3. enableImmediatelyRun = false # 是否立刻执行,默认否
  4. delayExecType = "skip" # skip,queue,concurrent,如果上一个任务执行较慢,到达了新任务执行时间,那么新任务选择跳过,排队,并发执行的策略,新任务默认选择skip策略
  5. enableSeconds = true # 启用秒单位
  6. spec = "*/3 * * * * *"
  7. [redis.test]
  8. debug = true # ego增加redis debug,打开后可以看到,配置名、地址、耗时、请求数据、响应数据
  9. addr = "127.0.0.1:6379"
  10. enableAccessInterceptor = true
  11. enableAccessInterceptorReq = true
  12. enableAccessInterceptorRes = true

4.2 用户代码

配置创建一个 的配置项,其中内容按照上文HTTP的配置进行填写。以上这个示例里这个配置key是cron.test

代码中创建一个 cron 服务, ecron.Load(“”).Build() ,代码中的 key 和配置中的 key 。创建完 cron 后, 将他添加到 ego new 出来应用的 Schedule 方法中。

  1. package main
  2. import (
  3. "context"
  4. "log"
  5. "github.com/gotomicro/ego"
  6. "github.com/gotomicro/ego/core/elog"
  7. "github.com/gotomicro/ego/task/ecron"
  8. "github.com/gotomicro/ego-component/eredis"
  9. "github.com/gotomicro/ego-component/eredis/ecronlock"
  10. )
  11. var (
  12. redis *eredis.Component
  13. locker *ecronlock.Component
  14. )
  15. // export EGO_DEBUG=true && go run main.go --config=config.toml
  16. func main() {
  17. err := ego.New().Invoker(initRedis).Cron(cronJob()).Run()
  18. if err != nil {
  19. elog.Panic("startup", elog.FieldErr(err))
  20. }
  21. }
  22. func initRedis() error {
  23. redis = eredis.Load("redis.test").Build()
  24. // 构造分布式任务锁,目前已实现redis版本. 如果希望自定义,可以实现 ecron.Lock 接口
  25. locker = ecronlock.DefaultContainer().Build(ecronlock.WithClient(redis))
  26. return nil
  27. }
  28. func cronJob() ecron.Ecron {
  29. cron := ecron.Load("cron.test").Build(
  30. // 设置分布式锁
  31. ecron.WithLock(locker.NewLock("ego-component:cronjob:syncXxx")),
  32. ecron.WithJob(helloWorld),
  33. )
  34. return cron
  35. }
  36. func helloWorld(ctx context.Context) error {
  37. log.Println("cron job running")
  38. return nil
  39. }