Queue

[!TIP] This document is machine-translated by Google. If you find grammatical and semantic errors, and the document description is not clear, please PR

In the development of daily tasks, we will have many asynchronous, batch, timing, and delayed tasks to be processed. There is go-queue in go-zero. It is recommended to use go-queue for processing. Go-queue itself is also developed based on go-zero. There are two modes

  • dq : Depends on beanstalkd, distributed, can be stored, delayed, timing settings, shutdown and restart can be re-executed, messages will not be lost, very simple to use, redis setnx is used in go-queue to ensure that each message is only consumed once, usage scenarios Mainly used for daily tasks.
  • kq: Depends on Kafka, so I won’t introduce more about it, the famous Kafka, the usage scenario is mainly to do message queue

    We mainly talk about dq. The use of kq is also the same, but it depends on the bottom layer. If you haven’t used beanstalkd, you can google it first. It’s still very easy to use.

etc/job.yaml : Configuration file

  1. Name: job
  2. Log:
  3. ServiceName: job
  4. Level: info
  5. # dq depends on Beanstalks, redis, Beanstalks configuration, redis configuration
  6. DqConf:
  7. Beanstalks:
  8. - Endpoint: 127.0.0.1:7771
  9. Tube: tube1
  10. - Endpoint: 127.0.0.1:7772
  11. Tube: tube2
  12. Redis:
  13. Host: 127.0.0.1:6379
  14. Type: node

Internal/config/config.go: Parse dq corresponding etc/*.yaml configuration

  1. /**
  2. * @Description Configuration file
  3. * @Author Mikael
  4. * @Email 13247629622@163.com
  5. * @Date 2021/1/18 12:05
  6. * @Version 1.0
  7. **/
  8. package config
  9. import (
  10. "github.com/tal-tech/go-queue/dq"
  11. "github.com/tal-tech/go-zero/core/service"
  12. )
  13. type Config struct {
  14. service.ServiceConf
  15. DqConf dq.DqConf
  16. }

Handler/router.go : Responsible for multi-task registration

  1. /**
  2. * @Description Register job
  3. * @Author Mikael
  4. * @Email 13247629622@163.com
  5. * @Date 2021/1/18 12:05
  6. * @Version 1.0
  7. **/
  8. package handler
  9. import (
  10. "context"
  11. "github.com/tal-tech/go-zero/core/service"
  12. "job/internal/logic"
  13. "job/internal/svc"
  14. )
  15. func RegisterJob(serverCtx *svc.ServiceContext,group *service.ServiceGroup) {
  16. group.Add(logic.NewProducerLogic(context.Background(),serverCtx))
  17. group.Add(logic.NewConsumerLogic(context.Background(),serverCtx))
  18. group.Start()
  19. }

ProducerLogic: One of the job business logic

  1. /**
  2. * @Description Producer task
  3. * @Author Mikael
  4. * @Email 13247629622@163.com
  5. * @Date 2021/1/18 12:05
  6. * @Version 1.0
  7. **/
  8. package logic
  9. import (
  10. "context"
  11. "github.com/tal-tech/go-queue/dq"
  12. "github.com/tal-tech/go-zero/core/logx"
  13. "github.com/tal-tech/go-zero/core/threading"
  14. "job/internal/svc"
  15. "strconv"
  16. "time"
  17. )
  18. type Producer struct {
  19. ctx context.Context
  20. svcCtx *svc.ServiceContext
  21. logx.Logger
  22. }
  23. func NewProducerLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Producer {
  24. return &Producer{
  25. ctx: ctx,
  26. svcCtx: svcCtx,
  27. Logger: logx.WithContext(ctx),
  28. }
  29. }
  30. func (l *Producer)Start() {
  31. logx.Infof("start Producer \n")
  32. threading.GoSafe(func() {
  33. producer := dq.NewProducer([]dq.Beanstalk{
  34. {
  35. Endpoint: "localhost:7771",
  36. Tube: "tube1",
  37. },
  38. {
  39. Endpoint: "localhost:7772",
  40. Tube: "tube2",
  41. },
  42. })
  43. for i := 1000; i < 1005; i++ {
  44. _, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second * 1)
  45. if err != nil {
  46. logx.Error(err)
  47. }
  48. }
  49. })
  50. }
  51. func (l *Producer)Stop() {
  52. logx.Infof("stop Producer \n")
  53. }

Another job business logic

  1. /**
  2. * @Description Consumer task
  3. * @Author Mikael
  4. * @Email 13247629622@163.com
  5. * @Date 2021/1/18 12:05
  6. * @Version 1.0
  7. **/
  8. package logic
  9. import (
  10. "context"
  11. "github.com/tal-tech/go-zero/core/logx"
  12. "github.com/tal-tech/go-zero/core/threading"
  13. "job/internal/svc"
  14. )
  15. type Consumer struct {
  16. ctx context.Context
  17. svcCtx *svc.ServiceContext
  18. logx.Logger
  19. }
  20. func NewConsumerLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Consumer {
  21. return &Consumer{
  22. ctx: ctx,
  23. svcCtx: svcCtx,
  24. Logger: logx.WithContext(ctx),
  25. }
  26. }
  27. func (l *Consumer)Start() {
  28. logx.Infof("start consumer \n")
  29. threading.GoSafe(func() {
  30. l.svcCtx.Consumer.Consume(func(body []byte) {
  31. logx.Infof("consumer job %s \n" ,string(body))
  32. })
  33. })
  34. }
  35. func (l *Consumer)Stop() {
  36. logx.Infof("stop consumer \n")
  37. }

svc/servicecontext.go

  1. /**
  2. * @Description Configuration
  3. * @Author Mikael
  4. * @Email 13247629622@163.com
  5. * @Date 2021/1/18 12:05
  6. * @Version 1.0
  7. **/
  8. package svc
  9. import (
  10. "job/internal/config"
  11. "github.com/tal-tech/go-queue/dq"
  12. )
  13. type ServiceContext struct {
  14. Config config.Config
  15. Consumer dq.Consumer
  16. }
  17. func NewServiceContext(c config.Config) *ServiceContext {
  18. return &ServiceContext{
  19. Config: c,
  20. Consumer: dq.NewConsumer(c.DqConf),
  21. }
  22. }

main.go startup file

  1. /**
  2. * @Description Startup file
  3. * @Author Mikael
  4. * @Email 13247629622@163.com
  5. * @Date 2021/1/18 12:05
  6. * @Version 1.0
  7. **/
  8. package main
  9. import (
  10. "flag"
  11. "fmt"
  12. "github.com/tal-tech/go-zero/core/conf"
  13. "github.com/tal-tech/go-zero/core/logx"
  14. "github.com/tal-tech/go-zero/core/service"
  15. "job/internal/config"
  16. "job/internal/handler"
  17. "job/internal/svc"
  18. "os"
  19. "os/signal"
  20. "syscall"
  21. "time"
  22. )
  23. var configFile = flag.String("f", "etc/job.yaml", "the config file")
  24. func main() {
  25. flag.Parse()
  26. var c config.Config
  27. conf.MustLoad(*configFile, &c)
  28. ctx := svc.NewServiceContext(c)
  29. group := service.NewServiceGroup()
  30. handler.RegisterJob(ctx,group)
  31. ch := make(chan os.Signal)
  32. signal.Notify(ch, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
  33. for {
  34. s := <-ch
  35. logx.Info("get a signal %s", s.String())
  36. switch s {
  37. case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
  38. fmt.Printf("stop group")
  39. group.Stop()
  40. logx.Info("job exit")
  41. time.Sleep(time.Second)
  42. return
  43. case syscall.SIGHUP:
  44. default:
  45. return
  46. }
  47. }
  48. }