Message queue

kq (Kafka) in go-queue

Message queues are essential in large microservice systems, mainly to absorb traffic peaks, decouple services, and enable asynchronous processing.

go-queue builds a unified layer on top of the segmentio/kafka-go package and integrates with go-zero, so developers can get started quickly and spend more of their time on business logic. https://github.com/zeromicro/go-queue

1.1 Config

    type KqConf struct {
        service.ServiceConf
        Brokers    []string
        Group      string
        Topic      string
        Offset     string `json:",options=first|last,default=last"`
        Conns      int    `json:",default=1"`
        Consumers  int    `json:",default=8"`
        Processors int    `json:",default=8"`
        MinBytes   int    `json:",default=10240"`    // 10K
        MaxBytes   int    `json:",default=10485760"` // 10M
        Username   string `json:",optional"`
        Password   string `json:",optional"`
    }
  • Brokers: the addresses of the kafka brokers

  • Group: the consumer group

  • Topic: the subscribed topic

  • Offset: if the topic has no committed offset for this group, or the current offset is invalid (for example, the historical data has been deleted), this decides whether to start consuming from the beginning (first) or from the end (last)

  • Conns: a single kq queue can correspond to multiple kafka connections; Conns is the number of underlying kafka queues to initialize, 1 by default

  • Consumers: go-queue internally uses a channel; Consumers controls the number of goroutines that pull messages from kafka and write them into that channel (⚠️ not the number of goroutines doing the actual consumption)

  • Processors: the number of goroutines that take the messages pulled into the channel by the Consumers goroutines and run our own consume logic; this is the real consumption concurrency in go-queue (a full configuration sketch follows this list)

  • MinBytes: the minimum number of bytes returned in a single fetch; if less data is available, the fetch waits until enough accumulates or the wait timeout expires

  • MaxBytes: the maximum number of bytes returned in a single fetch. If the first message exceeds this limit, it is still returned so the consumer can keep running, so this is not an absolute cap; the message size must also fit within the broker's message.max.bytes and the topic's max.message.bytes limits

  • Username: kafka username

  • Password: kafka password
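To make the fields above concrete, here is a minimal sketch of a fully populated kq.KqConf in Go; the broker address, group, and topic are placeholder values, and the numbers simply restate the defaults described above.

    package main

    import (
        "github.com/zeromicro/go-queue/kq"
        "github.com/zeromicro/go-zero/core/service"
    )

    func main() {
        // Placeholder values; in a real service this struct is usually
        // filled from the YAML configuration rather than a literal.
        c := kq.KqConf{
            ServiceConf: service.ServiceConf{Name: "payment-consumer"},
            Brokers:     []string{"127.0.0.1:9092"}, // kafka broker addresses
            Group:       "payment-group",            // consumer group
            Topic:       "payment-success",          // subscribed topic
            Offset:      "first",                    // consume from the beginning when no offset exists
            Conns:       1,                          // number of underlying kafka queues
            Consumers:   8,                          // goroutines pulling from kafka into the channel
            Processors:  8,                          // goroutines running the real consume logic
            MinBytes:    10240,                      // at least 10K per fetch
            MaxBytes:    10485760,                   // at most 10M per fetch
        }
        _ = c // normally passed to kq.MustNewQueue together with a consume handler
    }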

1.2 Use the go-queue producer (Pusher) in go-zero

First, pull the go-queue dependency into the project

    $ go get github.com/zeromicro/go-queue@latest

Add the kafka configuration to the etc/xxx.yaml configuration file

    Name: mq
    Host: 0.0.0.0
    Port: 8888
    ......
    KqPusherConf:
      Brokers:
        - 127.0.0.1:9092
      Topic: payment-success

Define the Go configuration mapping in config.go under internal/config

    type Config struct {
        ......
        KqPusherConf struct {
            Brokers []string
            Topic   string
        }
    }

Initialize a kq client in svc/serviceContext.go

    type ServiceContext struct {
        Config config.Config
        .....
        KqPusherClient *kq.Pusher
    }

    func NewServiceContext(c config.Config) *ServiceContext {
        return &ServiceContext{
            Config: c,
            .....
            KqPusherClient: kq.NewPusher(c.KqPusherConf.Brokers, c.KqPusherConf.Topic),
        }
    }

Write the business logic in logic, using the go-queue kq client to send messages to kafka

    .......

    func (l *PusherLogic) Pusher() error {
        //...... business logic ......
        data := "zhangSan"
        if err := l.svcCtx.KqPusherClient.Push(data); err != nil {
            logx.Errorf("KqPusherClient Push Error , err :%v", err)
        }
        return nil
    }
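In practice the payload is usually structured data rather than a plain string. A minimal sketch of JSON-encoding a struct before pushing, assuming the same PusherLogic as above plus the standard encoding/json package; the PaymentSuccessEvent type and its fields are made up for illustration:

    // PaymentSuccessEvent is a hypothetical payload shape used only for this example.
    type PaymentSuccessEvent struct {
        OrderID string `json:"orderId"`
        Amount  int64  `json:"amount"`
    }

    func (l *PusherLogic) PushPaymentSuccess(orderID string, amount int64) error {
        event := PaymentSuccessEvent{OrderID: orderID, Amount: amount}
        body, err := json.Marshal(event) // serialize before handing the payload to kq
        if err != nil {
            return err
        }
        if err := l.svcCtx.KqPusherClient.Push(string(body)); err != nil {
            logx.Errorf("KqPusherClient Push Error , err :%v", err)
            return err
        }
        return nil
    }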

Also, when we initialize the kq client in svc/serviceContext.go, we can pass some optional parameters; the third argument of kq.NewPusher is a variadic list of options:

  • chunkSize: for efficiency, the kq pusher batches messages; a batch is submitted to kafka once it reaches this size
  • flushInterval: how often the batch is flushed; at this interval the batch is submitted to kafka even if chunkSize has not been reached (see the sketch after this list)
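A sketch of passing these options when constructing the pusher in svc/serviceContext.go; the option constructors kq.WithChunkSize and kq.WithFlushInterval are the names exposed by the kq package at the time of writing (the time import is also needed), so verify them against your go-queue version:

    func NewServiceContext(c config.Config) *ServiceContext {
        return &ServiceContext{
            Config: c,
            KqPusherClient: kq.NewPusher(
                c.KqPusherConf.Brokers,
                c.KqPusherConf.Topic,
                kq.WithChunkSize(1024),            // flush once the buffered batch reaches this size
                kq.WithFlushInterval(time.Second), // flush at least once per second even if the batch is smaller
            ),
        }
    }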

1.3 Use the go-queue consumer in go-zero

First, pull the go-queue dependency into the project

    $ go get github.com/zeromicro/go-queue@latest

Add the kafka configuration to the etc/xxx.yaml configuration file

    Name: mq
    Host: 0.0.0.0
    Port: 8888

    #kq
    KqConsumerConf:
      Name: kqConsumer
      Brokers:
        - 127.0.0.1:9092
      Group: kqConsumer
      Topic: payment-success
      Offset: first
      Consumers: 8
      Processors: 8

Define the Go configuration mapping in config.go under internal/config

    package config

    import (
        "github.com/zeromicro/go-queue/kq"
        "github.com/zeromicro/go-zero/rest"
    )

    type Config struct {
        rest.RestConf
        .......
        KqConsumerConf kq.KqConf
    }

Create a mqs folder under internal

Create a new paymentSuccess consumer, paymentSuccess.go, under the mqs folder

    package mqs

    import (
        "context"

        "github.com/zeromicro/go-zero/core/logx"
        "zerodocgo/internal/svc"
    )

    type PaymentSuccess struct {
        ctx    context.Context
        svcCtx *svc.ServiceContext
    }

    func NewPaymentSuccess(ctx context.Context, svcCtx *svc.ServiceContext) *PaymentSuccess {
        return &PaymentSuccess{
            ctx:    ctx,
            svcCtx: svcCtx,
        }
    }

    func (l *PaymentSuccess) Consume(key, val string) error {
        logx.Infof("PaymentSuccess key :%s , val :%s", key, val)
        return nil
    }
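If the producer pushes JSON (as in the pusher sketch earlier), the consumer typically decodes it inside Consume. A minimal sketch, assuming the same hypothetical PaymentSuccessEvent shape and the standard encoding/json package:

    // PaymentSuccessEvent mirrors the hypothetical payload pushed by the producer.
    type PaymentSuccessEvent struct {
        OrderID string `json:"orderId"`
        Amount  int64  `json:"amount"`
    }

    func (l *PaymentSuccess) Consume(key, val string) error {
        var event PaymentSuccessEvent
        if err := json.Unmarshal([]byte(val), &event); err != nil {
            logx.Errorf("PaymentSuccess invalid payload %q : %v", val, err)
            return err
        }
        logx.Infof("PaymentSuccess order %s , amount %d", event.OrderID, event.Amount)
        return nil
    }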

Create a new file, mqs.go, under the mqs folder to register and listen to multiple consumers

    package mqs

    import (
        "context"

        "zerodocgo/internal/config"
        "zerodocgo/internal/svc"

        "github.com/zeromicro/go-queue/kq"
        "github.com/zeromicro/go-zero/core/service"
    )

    func Consumers(c config.Config, ctx context.Context, svcContext *svc.ServiceContext) []service.Service {
        return []service.Service{
            // listen for payment flow status changes
            kq.MustNewQueue(c.KqConsumerConf, NewPaymentSuccess(ctx, svcContext)),
            //.....
        }
    }

Start the consumers and wait for messages in main.go

    package main

    import (
        "context"
        "flag"

        "github.com/zeromicro/go-zero/core/conf"
        "github.com/zeromicro/go-zero/core/service"
        "github.com/zeromicro/go-zero/rest"

        "zerodocgo/internal/config"
        "zerodocgo/internal/mqs"
        "zerodocgo/internal/svc"
    )

    var configFile = flag.String("f", "etc/mq.yaml", "the config file")

    func main() {
        flag.Parse()

        var c config.Config
        conf.MustLoad(*configFile, &c)

        server := rest.MustNewServer(c.RestConf)
        defer server.Stop()

        svcCtx := svc.NewServiceContext(c)
        ctx := context.Background()

        serviceGroup := service.NewServiceGroup()
        defer serviceGroup.Stop()
        for _, mq := range mqs.Consumers(c, ctx, svcCtx) {
            serviceGroup.Add(mq)
        }
        serviceGroup.Start()
    }

Of course, optional parameters can also be passed when initializing kq.MustNewQueue in mqs.go

  • commitInterval: the interval at which offsets are committed to the kafka broker, 1s by default
  • queueCapacity: the length of kq's internal queue
  • maxWait: the maximum time to wait for new data when fetching batches from kafka
  • metrics: reports the processing time per message; it is initialized internally by default and usually does not need to be specified (see the sketch after this list)
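A sketch of passing these options to kq.MustNewQueue in mqs.go; the option constructors kq.WithCommitInterval, kq.WithQueueCapacity and kq.WithMaxWait are the names exposed by the kq package at the time of writing (the time import is also needed), so verify them against your go-queue version:

    kq.MustNewQueue(
        c.KqConsumerConf,
        NewPaymentSuccess(ctx, svcContext),
        kq.WithCommitInterval(time.Second), // commit offsets to the broker every second (the default)
        kq.WithQueueCapacity(1000),         // length of kq's internal queue
        kq.WithMaxWait(time.Second),        // wait at most one second for a full batch from kafka
    ),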