Queue

[!TIP] This document was machine-translated by Google. If you find grammatical or semantic errors, or if a description is unclear, please submit a PR.

In day-to-day development we often have asynchronous, batch, scheduled, and delayed tasks to process. go-zero provides go-queue for this purpose, and it is the recommended way to handle such tasks. go-queue is itself built on go-zero and offers two modes:

  • dq: Depends on beanstalkd. It is distributed and persistent, supports delayed and scheduled jobs, and re-executes pending jobs after a shutdown and restart, so messages are not lost. It is very simple to use. go-queue uses redis setnx to ensure that each message is consumed only once. It is mainly used for day-to-day tasks.
  • kq: Depends on Kafka. Kafka is well known, so we will not introduce it further here; it is mainly used as a message queue.

    This section mainly covers dq. kq is used in the same way; only the underlying dependency differs. If you have not used beanstalkd before, look it up first; it is easy to use.

    etc/job.yaml : Configuration file

    ```yaml
    Name: job

    Log:
      ServiceName: job
      Level: info

    # dq depends on Beanstalks, redis, Beanstalks configuration, redis configuration
    DqConf:
      Beanstalks:
        - Endpoint: 127.0.0.1:7771
          Tube: tube1
        - Endpoint: 127.0.0.1:7772
          Tube: tube2
      Redis:
        Host: 127.0.0.1:6379
        Type: node
    ```

    internal/config/config.go: Parses the dq configuration from the corresponding etc/*.yaml file

    ```go
    /**
     * @Description Configuration file
     * @Author Mikael
     * @Email 13247629622@163.com
     * @Date 2021/1/18 12:05
     * @Version 1.0
     **/

    package config

    import (
        "github.com/tal-tech/go-queue/dq"
        "github.com/tal-tech/go-zero/core/service"
    )

    type Config struct {
        service.ServiceConf
        DqConf dq.DqConf
    }
    ```

    handler/router.go: Responsible for registering multiple jobs

    ```go
    /**
     * @Description Register job
     * @Author Mikael
     * @Email 13247629622@163.com
     * @Date 2021/1/18 12:05
     * @Version 1.0
     **/
    package handler

    import (
        "context"
        "github.com/tal-tech/go-zero/core/service"
        "job/internal/logic"
        "job/internal/svc"
    )

    func RegisterJob(serverCtx *svc.ServiceContext, group *service.ServiceGroup) {

        group.Add(logic.NewProducerLogic(context.Background(), serverCtx))
        group.Add(logic.NewConsumerLogic(context.Background(), serverCtx))

        group.Start()

    }
    ```

    ProducerLogic: One piece of the job's business logic

    ```go
    /**
     * @Description Producer task
     * @Author Mikael
     * @Email 13247629622@163.com
     * @Date 2021/1/18 12:05
     * @Version 1.0
     **/
    package logic

    import (
        "context"
        "github.com/tal-tech/go-queue/dq"
        "github.com/tal-tech/go-zero/core/logx"
        "github.com/tal-tech/go-zero/core/threading"
        "job/internal/svc"
        "strconv"
        "time"
    )

    type Producer struct {
        ctx    context.Context
        svcCtx *svc.ServiceContext
        logx.Logger
    }

    func NewProducerLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Producer {
        return &Producer{
            ctx:    ctx,
            svcCtx: svcCtx,
            Logger: logx.WithContext(ctx),
        }
    }

    func (l *Producer)Start() {

        logx.Infof("start Producer \n")
        threading.GoSafe(func() {
            producer := dq.NewProducer([]dq.Beanstalk{
                {
                    Endpoint: "localhost:7771",
                    Tube: "tube1",
                },
                {
                    Endpoint: "localhost:7772",
                    Tube: "tube2",
                },
            })
            for i := 1000; i < 1005; i++ {
                _, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second * 1)
                if err != nil {
                    logx.Error(err)
                }
            }
        })

    }

    func (l *Producer)Stop() {
        logx.Infof("stop Producer \n")
    }
    ```

Another job business logic

```go
/**
 * @Description Consumer task
 * @Author Mikael
 * @Email 13247629622@163.com
 * @Date 2021/1/18 12:05
 * @Version 1.0
 **/
package logic

import (
    "context"
    "github.com/tal-tech/go-zero/core/logx"
    "github.com/tal-tech/go-zero/core/threading"
    "job/internal/svc"
)

type Consumer struct {
    ctx context.Context
    svcCtx *svc.ServiceContext
    logx.Logger
}

func NewConsumerLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Consumer {
    return &Consumer{
        ctx: ctx,
        svcCtx: svcCtx,
        Logger: logx.WithContext(ctx),
    }
}

func (l *Consumer)Start() {
    logx.Infof("start consumer \n")
    threading.GoSafe(func() {
        l.svcCtx.Consumer.Consume(func(body []byte) {
            logx.Infof("consumer job %s \n" ,string(body))
        })
    })
}

func (l *Consumer)Stop() {
    logx.Infof("stop consumer \n")
}
```

svc/servicecontext.go

```go
/**
 * @Description Configuration
 * @Author Mikael
 * @Email 13247629622@163.com
 * @Date 2021/1/18 12:05
 * @Version 1.0
 **/
package svc

import (
    "job/internal/config"
    "github.com/tal-tech/go-queue/dq"
)

type ServiceContext struct {
    Config config.Config
    Consumer dq.Consumer
}

func NewServiceContext(c config.Config) *ServiceContext {
    return &ServiceContext{
        Config: c,
        Consumer: dq.NewConsumer(c.DqConf),
    }
}
```

main.go startup file

```go
/**
 * @Description Startup file
 * @Author Mikael
 * @Email 13247629622@163.com
 * @Date 2021/1/18 12:05
 * @Version 1.0
 **/
package main

import (
    "flag"
    "fmt"
    "github.com/tal-tech/go-zero/core/conf"
    "github.com/tal-tech/go-zero/core/logx"
    "github.com/tal-tech/go-zero/core/service"
    "job/internal/config"
    "job/internal/handler"
    "job/internal/svc"
    "os"
    "os/signal"
    "syscall"
    "time"
)

var configFile = flag.String("f", "etc/job.yaml", "the config file")

func main() {
    flag.Parse()
    var c config.Config
    conf.MustLoad(*configFile, &c)
    ctx := svc.NewServiceContext(c)
    group := service.NewServiceGroup()
    handler.RegisterJob(ctx,group)
    ch := make(chan os.Signal)
    signal.Notify(ch, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
    for {
        s := <-ch
        logx.Info("get a signal %s", s.String())
        switch s {
        case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
            fmt.Printf("stop group")
            group.Stop()
            logx.Info("job exit")
            time.Sleep(time.Second)
            return
        case syscall.SIGHUP:
        default:
            return
        }
    }
}
```