Delay Queue

Overview

Delayed tasks come up in many scenarios. A typical example is an order that is closed automatically, with its stock returned, if it has not been paid within 20 minutes.

In addition to the Kafka message queue kq, go-queue provides a delay queue, dq. Under the hood, dq is built on beanstalkd.

Config

    type (
        Beanstalk struct {
            Endpoint string
            Tube     string
        }

        DqConf struct {
            Beanstalks []Beanstalk
            Redis      redis.RedisConf
        }
    )

  • Beanstalks: configuration for one or more beanstalkd nodes

  • Redis: Redis configuration; dq mainly uses Setnx here

Using the dq pusher in go-zero

First, add the go-queue dependency to the project

    $ go get github.com/zeromicro/go-queue@latest

Add the dq configuration to the etc/xxx.yaml configuration file

    Name: dq
    Host: 0.0.0.0
    Port: 8888
    ......
    DqConf:
      Beanstalks:
        - Endpoint: 127.0.0.1:7771
          Tube: tube1
        - Endpoint: 127.0.0.1:7772
          Tube: tube2

Define the matching Go configuration in config.go under internal/config

    type Config struct {
        ......
        DqConf struct {
            Beanstalks []dq.Beanstalk
        }
    }

Initialize the dq pusher client in svc/serviceContext.go

    type ServiceContext struct {
        Config config.Config
        .....
        DqPusherClient dq.Producer
    }

    func NewServiceContext(c config.Config) *ServiceContext {
        return &ServiceContext{
            Config: c,
            .....
            DqPusherClient: dq.NewProducer(c.DqConf.Beanstalks),
        }
    }

Write the business logic in logic, using the go-queue dq client to send messages to beanstalkd

    .......
    func (l *PusherLogic) Pusher() error {
        msg := "data"

        // 1. Run after a 5-second delay.
        delayResp, err := l.svcCtx.DqPusherClient.Delay([]byte(msg), time.Second*5)
        if err != nil {
            logx.Errorf("error from DqPusherClient Delay err : %v", err)
        }
        logx.Infof("resp : %s", delayResp) // fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id)

        // 2. Run at a specific time; here, 10 seconds from now.
        atResp, err := l.svcCtx.DqPusherClient.At([]byte(msg), time.Now().Add(10*time.Second))
        if err != nil {
            logx.Errorf("error from DqPusherClient At err : %v", err)
        }
        logx.Infof("resp : %s", atResp) // fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id)

        return nil
    }

Using the dq consumer in go-zero

First, add the go-queue dependency to the project

    $ go get github.com/zeromicro/go-queue@latest

Add the dq configuration to the etc/xxx.yaml configuration file

    Name: dq
    Host: 0.0.0.0
    Port: 8889
    .....
    #dq
    DqConf:
      Beanstalks:
        - Endpoint: 127.0.0.1:7771
          Tube: tube1
        - Endpoint: 127.0.0.1:7772
          Tube: tube2
      Redis:
        Host: 127.0.0.1:6379
        Type: node

Define the matching Go configuration in config.go under internal/config

    package config

    import (
        "github.com/zeromicro/go-queue/dq"
        "github.com/zeromicro/go-zero/rest"
    )

    type Config struct {
        rest.RestConf
        .......
        DqConf dq.DqConf
    }

Initialize the dq consumer client in svc/serviceContext.go

    type ServiceContext struct {
        Config config.Config
        .....
        DqConsumer dq.Consumer
    }

    func NewServiceContext(c config.Config) *ServiceContext {
        return &ServiceContext{
            Config: c,
            .....
            DqConsumer: dq.NewConsumer(c.DqConf),
        }
    }

Consume delayed messages in logic

    func (l *PusherLogic) Consumer() error {
        // The callback receives the raw body of each delayed job.
        l.svcCtx.DqConsumer.Consume(func(body []byte) {
            logx.Infof("consumer job %s \n", string(body))
        })

        return nil
    }
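The consume callback only receives raw bytes, so real jobs usually need to be decoded before they can be acted on. The sketch below shows one way to do that with a JSON payload. The `closeOrderJob` type, its fields, and the `decodeJob` and `handle` helpers are hypothetical; only the `func(body []byte)` callback shape comes from the example above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// closeOrderJob is a hypothetical payload for the "close unpaid order" task.
type closeOrderJob struct {
	OrderID string `json:"order_id"`
	Reason  string `json:"reason"`
}

// decodeJob parses a job body and rejects payloads without an order ID.
func decodeJob(body []byte) (closeOrderJob, error) {
	var job closeOrderJob
	if err := json.Unmarshal(body, &job); err != nil {
		return closeOrderJob{}, fmt.Errorf("decode job: %w", err)
	}
	if job.OrderID == "" {
		return closeOrderJob{}, fmt.Errorf("decode job: missing order_id")
	}
	return job, nil
}

// handle has the same shape as the consume callback: func(body []byte).
func handle(body []byte) {
	job, err := decodeJob(body)
	if err != nil {
		fmt.Println("skip job:", err)
		return
	}
	fmt.Printf("closing order %s (%s)\n", job.OrderID, job.Reason)
}

func main() {
	handle([]byte(`{"order_id":"1001","reason":"unpaid"}`))
	handle([]byte(`not json`))
}
```

Keeping the decoding in its own function means the callback stays small and malformed jobs can be logged and skipped rather than crashing the consumer.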

A final note: beanstalkd itself does not depend on Redis. go-queue adds Redis to prevent the same message from being consumed repeatedly within a short period, using Redis Setnx to filter out messages that have already been consumed recently.
