延时队列

概述

关于延时任务,在很多场景也会被使用到,比如订单 20 分钟后未支付自动关闭归还库存等。

go-queue 除了提供了 kafka 消息队列 kq 之外,也实现了延时队列 dq。目前 go-queue 的延时队列底层是使用的 beanstalkd

Config

  1. type (
  2. Beanstalk struct {
  3. Endpoint string
  4. Tube string
  5. }
  6. DqConf struct {
  7. Beanstalks []Beanstalk
  8. Redis redis.RedisConf
  9. }
  10. )
  • Beanstalks: 多个 Beanstalk 节点配置

  • Redis:redis 配置,主要在这里面使用 Setnx 去重

go-zero 中使用 dq 的 pusher

项目中首先要拉取 go-queue 的依赖

  1. $ go get github.com/zeromicro/go-queue@latest

在 etc/xxx.yaml 配置文件中添加当前的 dq 配置信息

  1. Name: dq
  2. Host: 0.0.0.0
  3. Port: 8888
  4. ......
  5. DqConf:
  6. Beanstalks:
  7. - Endpoint: 127.0.0.1:7771
  8. Tube: tube1
  9. - Endpoint: 127.0.0.1:7772
  10. Tube: tube2

在 internal/config 下的 config.go 中定义 go 映射的配置

  1. type Config struct {
  2. ......
  3. DqConf struct {
  4. Brokers []string
  5. Topic string
  6. }
  7. }

在 svc/serviceContext.go 中初始化 pusher 的 dq client

  1. type ServiceContext struct {
  2. Config config.Config
  3. .....
  4. DqPusherClient dq.Producer
  5. }
  6. func NewServiceContext(c config.Config) *ServiceContext {
  7. return &ServiceContext{
  8. Config: c,
  9. .....
  10. DqPusherClient: dq.NewProducer(c.DqConf.Beanstalks),
  11. }
  12. }

在 logic 中写业务逻辑使用 go-queue 的 dq client 发送消息到 beanstalk

  1. .......
  2. func (l *PusherLogic) Pusher() error {
  3. msg := "data"
  4. // 1、5s后执行
  5. deplayResp, err := l.svcCtx.DqPusherClient.Delay([]byte(msg), time.Second*5)
  6. if err != nil {
  7. logx.Errorf("error from DqPusherClient Delay err : %v", err)
  8. }
  9. logx.Infof("resp : %s", deplayResp) // fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id)
  10. // 2、在某个指定时间执行
  11. atResp, err := l.svcCtx.DqPusherClient.At([]byte(msg), time.Now())
  12. if err != nil {
  13. logx.Errorf("error from DqPusherClient Delay err : %v", err)
  14. }
  15. logx.Infof("resp : %s", atResp) // fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id)
  16. return nil
  17. }

go-zero 中使用 dq 消费者 consumer

项目中首先要拉取 go-queue 的依赖

  1. $ go get github.com/zeromicro/go-queue@latest

在 etc/xxx.yaml 配置文件中添加当前的 kafka 配置信息

  1. Name: dq
  2. Host: 0.0.0.0
  3. Port: 8889
  4. .....
  5. #dq
  6. DqConf:
  7. Beanstalks:
  8. - Endpoint: 127.0.0.1:7771
  9. Tube: tube1
  10. - Endpoint: 127.0.0.1:7772
  11. Tube: tube2
  12. Redis:
  13. Host: 127.0.0.1:6379
  14. Type: node

在 internal/config 下的 config.go 中定义 go 映射的配置

  1. package config
  2. import (
  3. "github.com/zeromicro/go-queue/dq"
  4. "github.com/zeromicro/go-zero/rest"
  5. )
  6. type Config struct {
  7. rest.RestConf
  8. .......
  9. DqConf dq.DqConf
  10. }

在 svc/serviceContext.go 中初始化 consumer 的 dq client

  1. type ServiceContext struct {
  2. Config config.Config
  3. .....
  4. DqConsumer dq.Consumer
  5. }
  6. func NewServiceContext(c config.Config) *ServiceContext {
  7. return &ServiceContext{
  8. Config: c,
  9. .....
  10. DqConsumer: dq.NewConsumer(c.DqConf),
  11. }
  12. }

logic 中消费延时消息

  1. func (l *PusherLogic) Consumer() error {
  2. l.svcCtx.DqConsumer.Consume(func(body []byte) {
  3. logx.Infof("consumer job %s \n", string(body))
  4. })
  5. }

写在最后,本身 beanstalk 不依赖 redis 的,但是 go-queue 为我们想的更周到防止短时间内重复消费,便使用了 redis 的 Setnx 帮我们在短时间内过滤掉消费过的消息

参考文献

  1. 《beanstalkd 介绍及安装》