Message queue

Overview

A message queue lets applications communicate asynchronously, which improves the availability and scalability of the system. In go-zero, message queues are handled by go-queue.

Task Targets

  • Learn about the basic usage of go-queue
  • Learn how to use message queues in go-zero

Producer

Creating a go-queue producer is simple: a Pusher object needs only the Kafka broker addresses and a topic.

    NewPusher(addrs []string, topic string, opts ...PushOption)
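The trailing opts ...PushOption parameter has the shape of Go's functional-options pattern. The sketch below is a generic, standard-library-only illustration of how such variadic options are typically applied on top of defaults. The type pusherOptions, the option names, and the default values are all hypothetical and are not taken from go-queue.

```go
package main

import (
	"fmt"
	"time"
)

// pusherOptions and the option names below are hypothetical stand-ins,
// not go-queue's real types or defaults.
type pusherOptions struct {
	chunkSize     int
	flushInterval time.Duration
}

// pushOption mutates the options; it mirrors the shape of a variadic
// "opts ...PushOption" parameter.
type pushOption func(*pusherOptions)

func withChunkSize(n int) pushOption {
	return func(o *pusherOptions) { o.chunkSize = n }
}

func withFlushInterval(d time.Duration) pushOption {
	return func(o *pusherOptions) { o.flushInterval = d }
}

// buildOptions starts from (assumed) defaults and applies each option in order.
func buildOptions(opts ...pushOption) pusherOptions {
	o := pusherOptions{chunkSize: 1, flushInterval: time.Second}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", buildOptions())
	fmt.Printf("%+v\n", buildOptions(withChunkSize(100), withFlushInterval(5*time.Second)))
}
```

Callers who pass no options get the defaults, which is why the constructor can be called with only the addresses and the topic.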

Code Example

    pusher := kq.NewPusher([]string{
        "127.0.0.1:19092",
        "127.0.0.1:19093",
        "127.0.0.1:19094",
    }, "test")
    if err := pusher.Push("foo"); err != nil {
        log.Fatal(err)
    }
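Since the message is pushed as a string, structured data has to be serialized first. The following is a minimal sketch, assuming JSON as the wire format. The OrderEvent type and the encodeEvent helper are hypothetical and exist only for illustration. The sketch uses the standard library alone, so it runs without a Kafka broker.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// OrderEvent is a hypothetical payload type used only for illustration.
type OrderEvent struct {
	ID     int64  `json:"id"`
	Status string `json:"status"`
}

// encodeEvent turns an event into the string form that would be handed to Push.
func encodeEvent(e OrderEvent) (string, error) {
	b, err := json.Marshal(e)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	msg, err := encodeEvent(OrderEvent{ID: 42, Status: "paid"})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	// With a real pusher, this string would take the place of "foo" above.
	fmt.Println(msg)
}
```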

Consumer

Configure Introduction

The kq configuration structure is defined as follows:

    type KqConf struct {
        service.ServiceConf
        Brokers     []string
        Group       string
        Topic       string
        Offset      string `json:",options=first|last,default=last"`
        Conns       int    `json:",default=1"`
        Consumers   int    `json:",default=8"`
        Processors  int    `json:",default=8"`
        MinBytes    int    `json:",default=10240"`    // 10K
        MaxBytes    int    `json:",default=10485760"` // 10M
        Username    string `json:",optional"`
        Password    string `json:",optional"`
        ForceCommit bool   `json:",default=true"`
    }

For service.ServiceConf, see Basic Service Configuration.

    Params       DataType  Required?  Default value  Note
    Brokers      []string  YES        -              Kafka cluster addresses
    Group        string    YES        -              Consumer group
    Topic        string    YES        -              Message topic
    Offset       string    NO         last           Position from which the consumer starts consuming; allowed values are first and last
    Conns        int       NO         1              Number of connections
    Consumers    int       NO         8              Number of consumers
    Processors   int       NO         8              Number of message processors
    MinBytes     int       NO         10240          Minimum number of bytes (10K)
    MaxBytes     int       NO         10485760       Maximum number of bytes (10M)
    Username     string    NO         -              Username
    Password     string    NO         -              Password
    ForceCommit  bool      NO         true           Whether to force commit
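To make the required fields, defaults, and allowed values concrete, here is a small standard-library sketch that applies the same rules by hand to a few of the documented fields. The consumerConf type and the normalize function are hypothetical. In a real service, go-zero's conf loader derives this behavior from the struct tags, so no such code is needed.

```go
package main

import (
	"errors"
	"fmt"
)

// consumerConf is a local, simplified mirror of a few documented fields.
// It is not kq.KqConf and exists only for illustration.
type consumerConf struct {
	Brokers   []string
	Group     string
	Topic     string
	Offset    string
	Consumers int
}

// normalize fills the documented defaults and checks the documented constraints.
func normalize(c consumerConf) (consumerConf, error) {
	if len(c.Brokers) == 0 || c.Group == "" || c.Topic == "" {
		return c, errors.New("missing required field: Brokers, Group, or Topic")
	}
	if c.Offset == "" {
		c.Offset = "last" // documented default
	}
	if c.Offset != "first" && c.Offset != "last" {
		return c, fmt.Errorf("invalid Offset %q: want first or last", c.Offset)
	}
	if c.Consumers == 0 {
		c.Consumers = 8 // documented default
	}
	return c, nil
}

func main() {
	c, err := normalize(consumerConf{
		Brokers: []string{"127.0.0.1:19092"},
		Group:   "foo",
		Topic:   "test",
	})
	fmt.Println(c.Offset, c.Consumers, err)
}
```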

Code Example

config.yaml

    Name: kq
    Brokers:
      - 127.0.0.1:19092
      - 127.0.0.1:19093
      - 127.0.0.1:19094
    Group: foo
    Topic: test
    Offset: first
    Consumers: 1

main.go

    package main

    import (
        "fmt"

        "github.com/zeromicro/go-queue/kq"
        "github.com/zeromicro/go-zero/core/conf"
    )

    func main() {
        // Load the consumer configuration from config.yaml.
        var c kq.KqConf
        conf.MustLoad("config.yaml", &c)

        // Create the queue with a handler that prints each message value.
        q := kq.MustNewQueue(c, kq.WithHandle(func(k, v string) error {
            fmt.Printf("=> %s\n", v)
            return nil
        }))
        defer q.Stop()

        // Start consuming.
        q.Start()
    }
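A handler usually does more than print the value. The sketch below shows a handler with the same (k, v string) error shape as the one in main.go, decoding a JSON message and returning an error when the payload is malformed. The OrderEvent type and the JSON format are assumptions made for illustration. The sketch calls the handler directly, so it runs without a broker.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// OrderEvent is a hypothetical payload type, assumed to be JSON-encoded by the producer.
type OrderEvent struct {
	ID     int64  `json:"id"`
	Status string `json:"status"`
}

// handle has the same (k, v string) error shape as the handler in main.go.
func handle(k, v string) error {
	var e OrderEvent
	if err := json.Unmarshal([]byte(v), &e); err != nil {
		return fmt.Errorf("decode message with key %q: %w", k, err)
	}
	fmt.Printf("order %d is %s\n", e.ID, e.Status)
	return nil
}

func main() {
	// Call the handler directly with a valid and an invalid message.
	fmt.Println(handle("k1", `{"id":42,"status":"paid"}`))
	fmt.Println(handle("k2", "not json"))
}
```

Keeping the handler as a named function like this makes it easy to unit-test separately from the queue.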