• go-zero 分布式定时任务

日常任务开发中,我们会有很多异步、批量、定时、延迟任务要处理,go-zero中有go-queue,推荐使用go-queue去处理,go-queue本身也是基于go-zero开发的,其本身是有两种模式

  • dq : 依赖于beanstalkd,分布式,可存储,延迟、定时设置,关机重启可以重新执行,消息不会丢失,使用非常简单,go-queue中使用了redis setnx保证了每条消息只被消费一次,使用场景主要是用来做日常任务使用
  • kq:依赖于kafka,这个就不多介绍啦,大名鼎鼎的kafka,使用场景主要是做消息队列

    我们主要说一下dq,kq使用也一样的,只是依赖底层不同,如果没使用过beanstalkd,没接触过beanstalkd的可以先google一下,使用起来还是挺容易的。

etc/job.yaml : 配置文件

  1. Name: job
  2. Log:
  3. ServiceName: job
  4. Level: info
  5. #dq依赖Beanstalks、redis ,Beanstalks配置、redis配置
  6. DqConf:
  7. Beanstalks:
  8. - Endpoint: 127.0.0.1:7771
  9. Tube: tube1
  10. - Endpoint: 127.0.0.1:7772
  11. Tube: tube2
  12. Redis:
  13. Host: 127.0.0.1:6379
  14. Type: node

Internal/config/config.go :解析dq对应etc/*.yaml配置

  1. /**
  2. * @Description 配置文件
  3. * @Author Mikael
  4. * @Email 13247629622@163.com
  5. * @Date 2021/1/18 12:05
  6. * @Version 1.0
  7. **/
  8. package config
  9. import (
  10. "github.com/tal-tech/go-queue/dq"
  11. "github.com/tal-tech/go-zero/core/service"
  12. )
  13. type Config struct {
  14. service.ServiceConf
  15. DqConf dq.DqConf
  16. }

Handler/router.go : 负责注册多任务

  1. /**
  2. * @Description 注册job
  3. * @Author Mikael
  4. * @Email 13247629622@163.com
  5. * @Date 2021/1/18 12:05
  6. * @Version 1.0
  7. **/
  8. package handler
  9. import (
  10. "context"
  11. "github.com/tal-tech/go-zero/core/service"
  12. "job/internal/logic"
  13. "job/internal/svc"
  14. )
  15. func RegisterJob(serverCtx *svc.ServiceContext,group *service.ServiceGroup) {
  16. group.Add(logic.NewProducerLogic(context.Background(),serverCtx))
  17. group.Add(logic.NewConsumerLogic(context.Background(),serverCtx))
  18. group.Start()
  19. }

ProducerLogic: 其中一个job业务逻辑

  1. /**
  2. * @Description 生产者任务
  3. * @Author Mikael
  4. * @Email 13247629622@163.com
  5. * @Date 2021/1/18 12:05
  6. * @Version 1.0
  7. **/
  8. package logic
  9. import (
  10. "context"
  11. "github.com/tal-tech/go-queue/dq"
  12. "github.com/tal-tech/go-zero/core/logx"
  13. "github.com/tal-tech/go-zero/core/threading"
  14. "job/internal/svc"
  15. "strconv"
  16. "time"
  17. )
  18. type Producer struct {
  19. ctx context.Context
  20. svcCtx *svc.ServiceContext
  21. logx.Logger
  22. }
  23. func NewProducerLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Producer {
  24. return &Producer{
  25. ctx: ctx,
  26. svcCtx: svcCtx,
  27. Logger: logx.WithContext(ctx),
  28. }
  29. }
  30. func (l *Producer)Start() {
  31. logx.Infof("start Producer \n")
  32. threading.GoSafe(func() {
  33. producer := dq.NewProducer([]dq.Beanstalk{
  34. {
  35. Endpoint: "localhost:7771",
  36. Tube: "tube1",
  37. },
  38. {
  39. Endpoint: "localhost:7772",
  40. Tube: "tube2",
  41. },
  42. })
  43. for i := 1000; i < 1005; i++ {
  44. _, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second * 1)
  45. if err != nil {
  46. logx.Error(err)
  47. }
  48. }
  49. })
  50. }
  51. func (l *Producer)Stop() {
  52. logx.Infof("stop Producer \n")
  53. }

另外一个Job业务逻辑

  1. /**
  2. * @Description 消费者任务
  3. * @Author Mikael
  4. * @Email 13247629622@163.com
  5. * @Date 2021/1/18 12:05
  6. * @Version 1.0
  7. **/
  8. package logic
  9. import (
  10. "context"
  11. "github.com/tal-tech/go-zero/core/logx"
  12. "github.com/tal-tech/go-zero/core/threading"
  13. "job/internal/svc"
  14. )
  15. type Consumer struct {
  16. ctx context.Context
  17. svcCtx *svc.ServiceContext
  18. logx.Logger
  19. }
  20. func NewConsumerLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Consumer {
  21. return &Consumer{
  22. ctx: ctx,
  23. svcCtx: svcCtx,
  24. Logger: logx.WithContext(ctx),
  25. }
  26. }
  27. func (l *Consumer)Start() {
  28. logx.Infof("start consumer \n")
  29. threading.GoSafe(func() {
  30. l.svcCtx.Consumer.Consume(func(body []byte) {
  31. logx.Infof("consumer job %s \n" ,string(body))
  32. })
  33. })
  34. }
  35. func (l *Consumer)Stop() {
  36. logx.Infof("stop consumer \n")
  37. }

svc/servicecontext.go

  1. /**
  2. * @Description 配置
  3. * @Author Mikael
  4. * @Email 13247629622@163.com
  5. * @Date 2021/1/18 12:05
  6. * @Version 1.0
  7. **/
  8. package svc
  9. import (
  10. "job/internal/config"
  11. "github.com/tal-tech/go-queue/dq"
  12. )
  13. type ServiceContext struct {
  14. Config config.Config
  15. Consumer dq.Consumer
  16. }
  17. func NewServiceContext(c config.Config) *ServiceContext {
  18. return &ServiceContext{
  19. Config: c,
  20. Consumer: dq.NewConsumer(c.DqConf),
  21. }
  22. }

main.go启动文件

  1. /**
  2. * @Description 启动文件
  3. * @Author Mikael
  4. * @Email 13247629622@163.com
  5. * @Date 2021/1/18 12:05
  6. * @Version 1.0
  7. **/
  8. package main
  9. import (
  10. "flag"
  11. "fmt"
  12. "github.com/tal-tech/go-zero/core/conf"
  13. "github.com/tal-tech/go-zero/core/logx"
  14. "github.com/tal-tech/go-zero/core/service"
  15. "job/internal/config"
  16. "job/internal/handler"
  17. "job/internal/svc"
  18. "os"
  19. "os/signal"
  20. "syscall"
  21. "time"
  22. )
  23. var configFile = flag.String("f", "etc/job.yaml", "the config file")
  24. func main() {
  25. flag.Parse()
  26. //配置
  27. var c config.Config
  28. conf.MustLoad(*configFile, &c)
  29. ctx := svc.NewServiceContext(c)
  30. //注册job
  31. group := service.NewServiceGroup()
  32. handler.RegisterJob(ctx,group)
  33. //捕捉信号
  34. ch := make(chan os.Signal)
  35. signal.Notify(ch, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
  36. for {
  37. s := <-ch
  38. logx.Info("get a signal %s", s.String())
  39. switch s {
  40. case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
  41. fmt.Printf("stop group")
  42. group.Stop()
  43. logx.Info("job exit")
  44. time.Sleep(time.Second)
  45. return
  46. case syscall.SIGHUP:
  47. default:
  48. return
  49. }
  50. }
  51. }

常见问题:

为什么使用dp,需要使用redis

  • 因为beanstalk是单点服务,无法保证高可用。dp可以使用多个单点beanstalk服务,互相备份 & 保证高可用。使用redis解决重复消费问题。