• go-zero 分布式定时任务

    日常任务开发中,我们会有很多异步、批量、定时、延迟任务要处理,go-zero中有go-queue,推荐使用go-queue去处理,go-queue本身也是基于go-zero开发的,其本身是有两种模式

    • dq : 依赖于beanstalkd,分布式,可存储,延迟、定时设置,关机重启可以重新执行,消息不会丢失,使用非常简单,go-queue中使用了redis setnx保证了每条消息只被消费一次,使用场景主要是用来做日常任务使用
    • kq:依赖于kafka,这个就不多介绍啦,大名鼎鼎的kafka,使用场景主要是做消息队列

    我们主要说一下dq,kq使用也一样的,只是依赖底层不同,如果没使用过beanstalkd,没接触过beanstalkd的可以先google一下,使用起来还是挺容易的。

    etc/job.yaml : 配置文件

    1. Name: job
    2. Log:
    3. ServiceName: job
    4. Level: info
    5. #dq依赖Beanstalks、redis ,Beanstalks配置、redis配置
    6. DqConf:
    7. Beanstalks:
    8. - Endpoint: 127.0.0.1:7771
    9. Tube: tube1
    10. - Endpoint: 127.0.0.1:7772
    11. Tube: tube2
    12. Redis:
    13. Host: 127.0.0.1:6379
    14. Type: node

    Internal/config/config.go :解析dq对应etc/*.yaml配置

    1. /**
    2. * @Description 配置文件
    3. * @Author Mikael
    4. * @Email 13247629622@163.com
    5. * @Date 2021/1/18 12:05
    6. * @Version 1.0
    7. **/
    8. package config
    9. import (
    10. "github.com/tal-tech/go-queue/dq"
    11. "github.com/tal-tech/go-zero/core/service"
    12. )
    13. type Config struct {
    14. service.ServiceConf
    15. DqConf dq.DqConf
    16. }

    Handler/router.go : 负责注册多任务

    1. /**
    2. * @Description 注册job
    3. * @Author Mikael
    4. * @Email 13247629622@163.com
    5. * @Date 2021/1/18 12:05
    6. * @Version 1.0
    7. **/
    8. package handler
    9. import (
    10. "context"
    11. "github.com/tal-tech/go-zero/core/service"
    12. "job/internal/logic"
    13. "job/internal/svc"
    14. )
    15. func RegisterJob(serverCtx *svc.ServiceContext,group *service.ServiceGroup) {
    16. group.Add(logic.NewProducerLogic(context.Background(),serverCtx))
    17. group.Add(logic.NewConsumerLogic(context.Background(),serverCtx))
    18. group.Start()
    19. }

    ProducerLogic: 其中一个job业务逻辑

    ```go /**

    • @Description 生产者任务
    • @Author Mikael
    • @Email 13247629622@163.com
    • @Date 2021/1/18 12:05
    • @Version 1.0 **/ package logic

    import (

    1. "context"
    2. "github.com/tal-tech/go-queue/dq"
    3. "github.com/tal-tech/go-zero/core/logx"
    4. "github.com/tal-tech/go-zero/core/threading"
    5. "job/internal/svc"
    6. "strconv"
    7. "time"

    )

type Producer struct { ctx context.Context svcCtx *svc.ServiceContext logx.Logger }

func NewProducerLogic(ctx context.Context, svcCtx svc.ServiceContext) Producer { return &Producer{ ctx: ctx, svcCtx: svcCtx, Logger: logx.WithContext(ctx), } }

func (l *Producer)Start() {

  1. logx.Infof("start Producer \n")
  2. threading.GoSafe(func() {
  3. producer := dq.NewProducer([]dq.Beanstalk{
  4. {
  5. Endpoint: "localhost:7771",
  6. Tube: "tube1",
  7. },
  8. {
  9. Endpoint: "localhost:7772",
  10. Tube: "tube2",
  11. },
  12. })
  13. for i := 1000; i < 1005; i++ {
  14. _, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second * 1)
  15. if err != nil {
  16. logx.Error(err)
  17. }
  18. }
  19. })

}

func (l *Producer)Stop() { logx.Infof(“stop Producer \n”) }

  1. 另外一个Job业务逻辑
  2. ```go
  3. /**
  4. * @Description 消费者任务
  5. * @Author Mikael
  6. * @Email 13247629622@163.com
  7. * @Date 2021/1/18 12:05
  8. * @Version 1.0
  9. **/
  10. package logic
  11. import (
  12. "context"
  13. "github.com/tal-tech/go-zero/core/logx"
  14. "github.com/tal-tech/go-zero/core/threading"
  15. "job/internal/svc"
  16. )
  17. type Consumer struct {
  18. ctx context.Context
  19. svcCtx *svc.ServiceContext
  20. logx.Logger
  21. }
  22. func NewConsumerLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Consumer {
  23. return &Consumer{
  24. ctx: ctx,
  25. svcCtx: svcCtx,
  26. Logger: logx.WithContext(ctx),
  27. }
  28. }
  29. func (l *Consumer)Start() {
  30. logx.Infof("start consumer \n")
  31. threading.GoSafe(func() {
  32. l.svcCtx.Consumer.Consume(func(body []byte) {
  33. logx.Infof("consumer job %s \n" ,string(body))
  34. })
  35. })
  36. }
  37. func (l *Consumer)Stop() {
  38. logx.Infof("stop consumer \n")
  39. }

svc/servicecontext.go

  1. /**
  2. * @Description 配置
  3. * @Author Mikael
  4. * @Email 13247629622@163.com
  5. * @Date 2021/1/18 12:05
  6. * @Version 1.0
  7. **/
  8. package svc
  9. import (
  10. "job/internal/config"
  11. "github.com/tal-tech/go-queue/dq"
  12. )
  13. type ServiceContext struct {
  14. Config config.Config
  15. Consumer dq.Consumer
  16. }
  17. func NewServiceContext(c config.Config) *ServiceContext {
  18. return &ServiceContext{
  19. Config: c,
  20. Consumer: dq.NewConsumer(c.DqConf),
  21. }
  22. }

main.go启动文件

  1. /**
  2. * @Description 启动文件
  3. * @Author Mikael
  4. * @Email 13247629622@163.com
  5. * @Date 2021/1/18 12:05
  6. * @Version 1.0
  7. **/
  8. package main
  9. import (
  10. "flag"
  11. "fmt"
  12. "github.com/tal-tech/go-zero/core/conf"
  13. "github.com/tal-tech/go-zero/core/logx"
  14. "github.com/tal-tech/go-zero/core/service"
  15. "job/internal/config"
  16. "job/internal/handler"
  17. "job/internal/svc"
  18. "os"
  19. "os/signal"
  20. "syscall"
  21. "time"
  22. )
  23. var configFile = flag.String("f", "etc/job.yaml", "the config file")
  24. func main() {
  25. flag.Parse()
  26. //配置
  27. var c config.Config
  28. conf.MustLoad(*configFile, &c)
  29. ctx := svc.NewServiceContext(c)
  30. //注册job
  31. group := service.NewServiceGroup()
  32. handler.RegisterJob(ctx,group)
  33. //捕捉信号
  34. ch := make(chan os.Signal)
  35. signal.Notify(ch, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
  36. for {
  37. s := <-ch
  38. logx.Info("get a signal %s", s.String())
  39. switch s {
  40. case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
  41. fmt.Printf("stop group")
  42. group.Stop()
  43. logx.Info("job exit")
  44. time.Sleep(time.Second)
  45. return
  46. case syscall.SIGHUP:
  47. default:
  48. return
  49. }
  50. }
  51. }

常见问题:

为什么使用dp,需要使用redis

  • 因为beanstalk是单点服务,无法保证高可用。dp可以使用多个单点beanstalk服务,互相备份 & 保证高可用。使用redis解决重复消费问题。