短时任务Job

ego版本:ego@v0.8.5

背景

通常我们有许多程序是短时任务,执行一下就结束。这种场景通常有以下三种方式:

  • 执行某个一次性任务,例如:执行程序的安装,或者mock数据
  • 将生命周期托管给例如k8s job或者xxljob,由他们控制job的执行时间,执行二进制
  • 通过定时任务来调用某个job http接口

最简单的Job

Example

项目地址短时任务Job - 图1 (opens new window)

如果命令行参数里有 --job ,那么框架会优先执行这个 job,停止所有的 servercronjob 可以执行一个,也可以执行多个。执行一个方式 --job=jobname ,执行多个方式,用逗号分割 jobname,例如: --job=jobname1,jobname2,jobname3

  1. package main
  2. import (
  3. "errors"
  4. "fmt"
  5. "go.uber.org/zap"
  6. "github.com/gotomicro/ego"
  7. "github.com/gotomicro/ego/core/elog"
  8. "github.com/gotomicro/ego/core/etrace"
  9. "github.com/gotomicro/ego/task/ejob"
  10. )
  11. // export EGO_DEBUG=true && go run main.go --job=job1
  12. func main() {
  13. if err := ego.New().Job(
  14. ejob.Job("job1", job1),
  15. ejob.Job("job2", job2),
  16. ).Run(); err != nil {
  17. elog.Error("start up", zap.Error(err))
  18. }
  19. }
  20. func job1(ctx ejob.Context) error {
  21. fmt.Println("i am job runner, traceId: ", etrace.ExtractTraceID(ctx.Ctx))
  22. return nil
  23. }
  24. func job2(ctx ejob.Context) error {
  25. fmt.Println("i am error job runner, traceId: ", etrace.ExtractTraceID(ctx.Ctx))
  26. return errors.New("i am error")
  27. }

HTTP的Job

Example

项目地址短时任务Job - 图2 (opens new window)

注册Job后,可以通过内网的治理服务得到job的信息

  1. ~ curl http://ip:port/job/list
  2. ~ ["job"]

1 用接口调用Job服务

  1. ~ export EGO_DEBUG=true && go run main.go --config=config.toml # 启动服务
  2. ~ curl -v -XPOST -d '{"username":"ego"}' -H 'X-Ego-Job-Name:job' -H 'X-Ego-Job-RunID:xxxx' -H 'test=1' http://127.0.0.1:9003/jobs ## 调用服务

入参

  • X-Ego-Job-Name:用于查找job的名称,必传
  • X-Ego-Job-RunID:用于记录此次job的执行ID号,请保证唯一,必传
  • Data: 用户自行传入,可选

响应

  • 状态码:正确返回200,参数错误返回400
  • Header:将入参的 X-Ego-Job-Name,X-Ego-Job-RunID 的数据返回
  • 参数错误:返回400,并会返回X-Ego-Job-Err的header错误

httpjoberror.png

成功响应

httpjobsuccess.png

2 用命令行模拟HTTP调用Job服务

  1. export EGO_DEBUG=true && go run main.go --config=config.toml --job=job --job-data='{"username":"ego"}' --job-header='test=1'

入参

  • --job:任务名称
  • --job-data: 模拟HTTP data
  • --job-header: 模拟HTTP header,—job-header=’test=1;test2=2’

响应

看job的执行日志,是否有error信息

代码

  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "github.com/gotomicro/ego"
  7. "github.com/gotomicro/ego/core/elog"
  8. "github.com/gotomicro/ego/server/egovernor"
  9. "github.com/gotomicro/ego/task/ejob"
  10. "go.uber.org/zap"
  11. )
  12. // 如果是Job 命令行执行 export EGO_DEBUG=true && go run main.go --config=config.toml --job=job --job-data='{"username":"ego"}' --job-header='test=1'
  13. // 如果是Job HTTP执行 1 export EGO_DEBUG=true && go run main.go --config=config.toml
  14. // 如果是Job HTTP执行 2 curl -v -XPOST -d '{"username":"ego"}' -H 'X-Ego-Job-Name:job' -H 'X-Ego-Job-RunID:xxxx' -H 'test=1' http://127.0.0.1:9003/jobs
  15. func main() {
  16. if err := ego.New().Job(
  17. ejob.Job("job", job),
  18. ).Serve(
  19. egovernor.Load("server.governor").Build(),
  20. ).Run(); err != nil {
  21. elog.Error("start up", zap.Error(err))
  22. }
  23. }
  24. type data struct {
  25. Username string
  26. }
  27. func job(ctx ejob.Context) error {
  28. bytes, _ := ioutil.ReadAll(ctx.Request.Body)
  29. d := data{}
  30. _ = json.Unmarshal(bytes, &d)
  31. fmt.Println(d.Username)
  32. fmt.Println(ctx.Request.Header.Get("test"))
  33. ctx.Writer.Write([]byte("i am ok"))
  34. return nil
  35. }