消息队列

go-queue 之 kq(kafka)

消息队列对于大型微服务系统是必不可少的,主要是用来解决削峰、降低服务之间的耦合度以及异步能力。

go-queue 在 segmentio/kafka-go 这个包基础上,使用 go-zero 进行了上层统一封装,让开发人员更容易上手,将更多时间聚焦在开发业务上。https://github.com/zeromicro/go-queue

1.1 Config

  1. type KqConf struct {
  2. service.ServiceConf
  3. Brokers []string
  4. Group string
  5. Topic string
  6. Offset string `json:",options=first|last,default=last"`
  7. Conns int `json:",default=1"`
  8. Consumers int `json:",default=8"`
  9. Processors int `json:",default=8"`
  10. MinBytes int `json:",default=10240"` // 10K
  11. MaxBytes int `json:",default=10485760"` // 10M
  12. Username string `json:",optional"`
  13. Password string `json:",optional"`
  14. }
  • Brokers: kafka 的多个 Broker 节点

  • Group:消费者组

  • Topic:订阅的 Topic 主题

  • Offset:如果新的 topic kafka 没有对应的 offset 信息,或者当前的 offset 无效了(历史数据被删除),那么需要指定从头(first)消费还是从尾(last)部消费

  • Conns: 一个 kafka queue 对应可对应多个 consumer,Conns 对应 kafka queue 数量,可以同时初始化多个 kafka queue,默认只启动一个

  • Consumers : go-queue 内部是起多个 goroutine 从 kafka 中获取信息写入进程内的 channel,这个参数是控制此处的 goroutine 数量(⚠️ 并不是真正消费时的并发 goroutine 数量)

  • Processors: 当 Consumers 中的多个 goroutine 将 kafka 消息拉取到进程内部的 channel 后,我们要真正消费消息写入我们自己逻辑,go-queue 内部通过此参数控制当前消费的并发 goroutine 数量

  • MinBytes: fetch 一次返回的最小字节数,如果不够这个字节数就等待.

  • MaxBytes: fetch 一次返回的最大字节数,如果第一条消息的大小超过了这个限制仍然会继续拉取保证 consumer 的正常运行.因此并不是一个绝对的配置,消息的大小还需要受到 broker 的message.max.bytes限制,以及 topic 的max.message.bytes的限制

  • Username: kafka 的账号

  • Password:kafka 的密码

1.2 go-zero 中使用 go-queue 生产者 pusher

项目中首先要拉取 go-queue 的依赖

  1. $ go get github.com/zeromicro/go-queue@latest

在 etc/xxx.yaml 配置文件中添加当前的 kafka 配置信息

  1. Name: mq
  2. Host: 0.0.0.0
  3. Port: 8888
  4. ......
  5. KqPusherConf:
  6. Brokers:
  7. - 127.0.0.1:9092
  8. Topic: payment-success

在 internal/config 下的 config.go 中定义 go 映射的配置

  1. type Config struct {
  2. ......
  3. KqPusherConf struct {
  4. Brokers []string
  5. Topic string
  6. }
  7. }

在 svc/serviceContext.go 中初始化 pusher 的 kq client

  1. type ServiceContext struct {
  2. Config config.Config
  3. .....
  4. KqPusherClient *kq.Pusher
  5. }
  6. func NewServiceContext(c config.Config) *ServiceContext {
  7. return &ServiceContext{
  8. Config: c,
  9. .....
  10. KqPusherClient: kq.NewPusher(c.KqPusherConf.Brokers, c.KqPusherConf.Topic),
  11. }
  12. }

在 logic 中写业务逻辑使用 go-queue 的 kq client 发送消息到 kafka

  1. .......
  2. func (l *PusherLogic) Pusher() error {
  3. //......业务逻辑....
  4. data := "zhangSan"
  5. if err := l.svcCtx.KqPusherClient.Push(data); err != nil {
  6. logx.Errorf("KqPusherClient Push Error , err :%v", err)
  7. }
  8. return nil
  9. }

另外,我们在 svc/serviceContext.go 中初始化 pusher 的 kq client 时候,我们可以传递一些可选参数,kq.NewPusher 第三个参数是 options,就是支持传递的可选参数

  • chunkSize : 由于效率问题,kq client 是批量提交,批量消息体达到此大小才会提交给 kafka。
  • flushInterval:间隔多久提交一次。即使未达到 chunkSize 但是达到了这个间隔时间也会向 kafka 提交

1.3 go-zero 中使用 go-queue 消费者 consumer

项目中首先要拉取 go-queue 的依赖

  1. $ go get github.com/zeromicro/go-queue@latest

在 etc/xxx.yaml 配置文件中添加当前的 kafka 配置信息

  1. Name: mq
  2. Host: 0.0.0.0
  3. Port: 8888
  4. #kq
  5. KqConsumerConf:
  6. Name: kqConsumer
  7. Brokers:
  8. - 127.0.1:9092
  9. Group: kqConsumer
  10. Topic: payment-success
  11. Offset: first
  12. Consumers: 8
  13. Processors: 8

在 internal/config 下的 config.go 中定义 go 映射的配置

  1. package config
  2. import (
  3. "github.com/zeromicro/go-queue/kq"
  4. "github.com/zeromicro/go-zero/rest"
  5. )
  6. type Config struct {
  7. rest.RestConf
  8. .......
  9. KqConsumerConf kq.KqConf
  10. }

在 internal 下新建一个 mqs 文件夹

在 mqs 文件夹下新建一个 paymentSuccess 消费者 paymentSuccess.go

  1. package mqs
  2. import (
  3. "context"
  4. "github.com/zeromicro/go-zero/core/logx"
  5. "zerodocgo/internal/svc"
  6. )
  7. type PaymentSuccess struct {
  8. ctx context.Context
  9. svcCtx *svc.ServiceContext
  10. }
  11. func NewPaymentSuccess(ctx context.Context, svcCtx *svc.ServiceContext) *PaymentSuccess {
  12. return &PaymentSuccess{
  13. ctx: ctx,
  14. svcCtx: svcCtx,
  15. }
  16. }
  17. func (l *PaymentSuccess) Consume(key, val string) error {
  18. logx.Infof("PaymentSuccess key :%s , val :%s", key, val)
  19. return nil
  20. }

在 mqs 文件夹下新建一个文件 mqs.go 用来监听多个消费者,mqs.go 代码如下

  1. package mqs
  2. import (
  3. "context"
  4. "zerodocgo/internal/config"
  5. "zerodocgo/internal/svc"
  6. "github.com/zeromicro/go-queue/kq"
  7. "github.com/zeromicro/go-zero/core/service"
  8. )
  9. func Consumers(c config.Config, ctx context.Context, svcContext *svc.ServiceContext) []service.Service {
  10. return []service.Service{
  11. //Listening for changes in consumption flow status
  12. kq.MustNewQueue(c.KqConsumerConf, NewPaymentSuccess(ctx, svcContext)),
  13. //.....
  14. }
  15. }

在 main.go 中启动 consumers 等待消费

  1. package main
  2. import (
  3. "context"
  4. "flag"
  5. "github.com/zeromicro/go-zero/core/service"
  6. "zerodocgo/internal/mqs"
  7. "zerodocgo/internal/svc"
  8. "github.com/zeromicro/go-zero/core/conf"
  9. "github.com/zeromicro/go-zero/rest"
  10. "zerodocgo/internal/config"
  11. )
  12. var configFile = flag.String("f", "etc/mq.yaml", "the config file")
  13. func main() {
  14. flag.Parse()
  15. var c config.Config
  16. conf.MustLoad(*configFile, &c)
  17. server := rest.MustNewServer(c.RestConf)
  18. defer server.Stop()
  19. svcCtx := svc.NewServiceContext(c)
  20. ctx := context.Background()
  21. serviceGroup := service.NewServiceGroup()
  22. defer serviceGroup.Stop()
  23. for _, mq := range mqs.Consumers(c,ctx, svcCtx) {
  24. serviceGroup.Add(mq)
  25. }
  26. serviceGroup.Start()
  27. }

当然,consumer 中在 mqs.go 中 kq.MustNewQueue 初始化时候点个参数也是可选参数

  • commitInterval : 提交给 kafka broker 间隔时间,默认是 1s
  • queueCapacity:kafka 内部队列长度
  • maxWait:从 kafka 批量获取数据时,等待新数据到来的最大时间。
  • metrics:上报消费每个消息消费时间,默认会内部初始化,一般也不需要指定