消息队列

概述

消息队列是一种应用程序间通信的方式,它可以实现异步通信,提高系统的可用性和可扩展性。在 go-zero 中,我们使用了 go-queue

任务目标

  • 了解 go-queue 的基本使用
  • 了解如何在 go-zero 中使用消息队列

生产者

go-queue 生产者很简单,只需要 kafka 地址,topic 即可创建一个 Pusher 对象。

  1. NewPusher(addrs []string, topic string, opts ...PushOption)

代码示例

  1. pusher := kq.NewPusher([]string{
  2. "127.0.0.1:19092",
  3. "127.0.0.1:19093",
  4. "127.0.0.1:19094",
  5. }, "test")
  6. if err:=pusher.Push("foo");err!=nil{
  7. log.Fatal(err)
  8. }

消费者

配置介绍

kq 的配置结构体声明如下:

  1. type KqConf struct {
  2. service.ServiceConf
  3. Brokers []string
  4. Group string
  5. Topic string
  6. Offset string `json:",options=first|last,default=last"`
  7. Conns int `json:",default=1"`
  8. Consumers int `json:",default=8"`
  9. Processors int `json:",default=8"`
  10. MinBytes int `json:",default=10240"` // 10K
  11. MaxBytes int `json:",default=10485760"` // 10M
  12. Username string `json:",optional"`
  13. Password string `json:",optional"`
  14. ForceCommit bool `json:",default=true"`
  15. }

service.ServiceConf 请参考 基础服务配置

消息队列 - 图1参数消息队列 - 图2类型消息队列 - 图3是否必填消息队列 - 图4默认值消息队列 - 图5说明
Brokers[]stringkafka 集群地址
Groupstring消费者组
Topicstring消息主题
Offsetstringlast消费者消费的起始位置,可选值为 first 和 last
Connsint1连接数
Consumersint8消费者数
Processorsint8消息处理器数
MinBytesint10240消息最小字节数
MaxBytesint10485760消息最大字节数
Usernamestring用户名
Passwordstring密码
ForceCommitbooltrue是否强制提交

代码示例

  • config.yaml
  • main.go
  1. Name: kq
  2. Brokers:
  3. - 127.0.0.1:19092
  4. - 127.0.0.1:19093
  5. - 127.0.0.1:19094
  6. Group: foo
  7. Topic: test
  8. Offset: first
  9. Consumers: 1
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/zeromicro/go-queue/kq"
  5. "github.com/zeromicro/go-zero/core/conf"
  6. )
  7. func main() {
  8. var c kq.KqConf
  9. conf.MustLoad("config.yaml", &c)
  10. q := kq.MustNewQueue(c, kq.WithHandle(func(k, v string) error {
  11. fmt.Printf("=> %s\n", v)
  12. return nil
  13. }))
  14. defer q.Stop()
  15. q.Start()
  16. }