gkafka

gkafka模块实现了对kafka消息队列系统的客户端功能封装,支持分组消费指定起始位置等特性,并提供简便易用的API接口。

使用方式:

  1. import "gitee.com/johng/gf/g/database/gkafka"

方法列表:godoc.org/github.com/johng-cn/gf/g/database/gkafka

  1. type Client
  2. func NewClient(config *Config) *Client
  3. func (client *Client) AsyncSend(message *Message) error
  4. func (client *Client) Close()
  5. func (client *Client) MarkOffset(topic string, partition int, offset int, metadata ...string) error
  6. func (client *Client) Receive() (*Message, error)
  7. func (client *Client) SyncSend(message *Message) error
  8. func (client *Client) Topics() ([]string, error)
  9. type Config
  10. func NewConfig() *Config
  11. type Message
  12. func (msg *Message) MarkOffset()

使用示例

生产者

  1. package main
  2. import (
  3. "gitee.com/johng/gf/g/database/gkafka"
  4. "fmt"
  5. "gitee.com/johng/gf/g/os/gtime"
  6. "time"
  7. )
  8. // 创建kafka生产客户端
  9. func newKafkaClientProducer(topic string) *gkafka.Client {
  10. kafkaConfig := gkafka.NewConfig()
  11. kafkaConfig.Servers = "localhost:9092"
  12. kafkaConfig.AutoMarkOffset = false
  13. kafkaConfig.Topics = topic
  14. return gkafka.NewClient(kafkaConfig)
  15. }
  16. func main () {
  17. client := newKafkaClientProducer("test")
  18. defer client.Close()
  19. for {
  20. if err := client.SyncSend(&gkafka.Message{Value: []byte(gtime.Now().String())}); err != nil {
  21. fmt.Println(err)
  22. }
  23. time.Sleep(time.Second)
  24. }
  25. }

消费者

  1. package main
  2. import (
  3. "fmt"
  4. "gitee.com/johng/gf/g/database/gkafka"
  5. )
  6. // 创建kafka消费客户端
  7. func newKafkaClientConsumer(topic, group string) *gkafka.Client {
  8. kafkaConfig := gkafka.NewConfig()
  9. kafkaConfig.Servers = "localhost:9092"
  10. kafkaConfig.AutoMarkOffset = false
  11. kafkaConfig.Topics = topic
  12. kafkaConfig.GroupId = group
  13. return gkafka.NewClient(kafkaConfig)
  14. }
  15. func main () {
  16. group := "test-group"
  17. topic := "test"
  18. client := newKafkaClientConsumer(topic, group)
  19. defer client.Close()
  20. // 标记开始读取的offset位置
  21. client.MarkOffset(topic, 0, 6)
  22. for {
  23. if msg, err := client.Receive(); err != nil {
  24. fmt.Println(err)
  25. break
  26. } else {
  27. fmt.Println(msg.Partition, msg.Offset, string(msg.Value))
  28. msg.MarkOffset()
  29. }
  30. }
  31. }