Golang SDK

与 C++/Python SDK 类似,Golang SDK 也分为 ProducerConsumer 两部分,下面对其进行介绍。

Producer

首先 import 必要的 package

  1. import (
  2. "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client"
  3. "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/config"
  4. "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log"
  5. "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/util"
  6. )

之后设置 Producer 的配置,下面例子中访问本地 Master,订阅 topic 为 demo_0

  1. cfg, err := config.ParseAddress("127.0.0.1:8715?topic=demo_0")

如果有多个 topic 需要进行访问,也可以直接对 config 的 topic 进行修改

  1. cfg.Producer.Topics = []string{"demo", "demo_0", "demo_1"}

配置完成后,新建 Producer 的实例,在这个过程中,SDK 会向 TubeMQ Master 申请注册,并发送心跳拿到 topic 的元数据

  1. p, err := client.NewProducer(cfg)

之后构造消息并发送即可

  1. demoData := "hello_tubemq"
  2. msg := client.Message{
  3. Topic: cfg.Producer.Topics[topicIndex], // 可以从订阅的 topic 列表中选择
  4. Data: []byte(demoData),
  5. DataLen: int32(len(demoData)),
  6. }
  7. success, errCode, errMsg := p.SendMessage(&msg) // 向 tubemq 发送 message,返回是否成功,错误码以及错误信息

Consumer

ConsumerProducer 的大致相同,除了在设置 config 时,有消费 group 的概念

  1. cfg, err := config.ParseAddress("127.0.0.1:8715?topic=demo_0&group=test_group")

之后参考 Producer 的用法进行消费即可

  1. c, err := client.NewConsumer(cfg) // 新建 Consumer 实例
  2. cr, err := c.GetMessage() // 获取消息
  3. _, err = c.Confirm(cr.ConfirmContext, true) // 获取后向 tubemq 进行 confirm

Example

上述文档内容为示例,省去了一些细节,完整可以运行的例子请参考 Golang SDK Example