Pulsar Go client

提示:目前,CGo 客户端将被废弃,如果你想要了解更多关于 CGo 客户端的信息, 请参阅 CGo 客户端文档

您可以使用 Pulsar Go 客户端 来创建使用 Go 语言的 Pulsar 生产者(producer)消费者(consumer)readers

API docs available as well
For standard API docs, consult the Godoc.

安装

安装 go 工具包

You can install the pulsar library locally using go get.

  1. $ go get -u "github.com/apache/pulsar-client-go/pulsar"

Once installed locally, you can import it into your project:

  1. import "github.com/apache/pulsar-client-go/pulsar"

连接 URL

To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.

Pulsar protocol URLs are assigned to specific clusters, use the pulsar scheme and have a default port of 6650. Here’s an example for localhost:

  1. pulsar://localhost:6650

如果你有多个 broker,你可以使用下面的方法设置 URl:

  1. pulsar://localhost:6550,localhost:6651,localhost:6652

A URL for a production Pulsar cluster may look something like this:

  1. pulsar://pulsar.us-west.example.com:6650

If you’re using TLS authentication, the URL will look like something like this:

  1. pulsar+ssl://pulsar.us-west.example.com:6651

创建客户端

In order to interact with Pulsar, you’ll first need a Client object. You can create a client object using the NewClient function, passing in a ClientOptions object (more on configuration below). 下面是一个示例:

  1. import (
  2. "log"
  3. "time"
  4. "github.com/apache/pulsar-client-go/pulsar"
  5. )
  6. func main() {
  7. client, err := pulsar.NewClient(pulsar.ClientOptions{
  8. URL: "pulsar://localhost:6650",
  9. OperationTimeout: 30 * time.Second,
  10. ConnectionTimeout: 30 * time.Second,
  11. })
  12. if err != nil {
  13. log.Fatalf("Could not instantiate Pulsar client: %v", err)
  14. }
  15. defer client.Close()
  16. }

如果你有多个 broker,你可以按照下面方式初始化 Pulsar Client:

  1. import (
  2. "log"
  3. "time"
  4. "github.com/apache/pulsar-client-go/pulsar"
  5. )
  6. func main() {
  7. client, err := pulsar.NewClient(pulsar.ClientOptions{
  8. URL: "pulsar://localhost:6650,localhost:6651,localhost:6652",
  9. OperationTimeout: 30 * time.Second,
  10. ConnectionTimeout: 30 * time.Second,
  11. })
  12. if err != nil {
  13. log.Fatalf("Could not instantiate Pulsar client: %v", err)
  14. }
  15. defer client.Close()
  16. }

The following configurable parameters are available for Pulsar clients:

名称 | 描述 | 默认 | :———— | :————— |:————— | | URL | 为 Pulsar 服务配置服务 URL

如果你有多个 broker,可以为单个 Client 设置多个 Pulsar 集群地址。

This parameter is required. |无 | | ConnectionTimeout | 建立TCP连接超时 | 30s | | OperationTimeout | 设置操作超时。 Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the operation will be marked as failed| 30s| | Authentication | Configure the authentication provider. 示例: Authentication:NewAuthenticationTLS("my-cert.pem", "my-key.pem") | 无验证 | | TLSTrustCertsFilePath | 设置可信 TLS 证书文件的路径 | | | TLSAllowInsecureConnection | 配置 Pulsar 客户端是否接受来自 broker 的不受信任的 TLS 证书| false | | ListenerName | 配置 VPC 用户连接 Pulsar broker 的网络模型 | | | MaxConnectionsPerBroker | 单个 broker 链接池中保存的大链接数 | 1 | | CustomMetricsLabels | 向所有从此客户端实例上报的指标添加自定义标签 | | | Logger | 配置客户端使用的logger | logrus.StandardLogger |

Producers

Pulsar producers publish messages to Pulsar topics. You can configure Go producers using a ProducerOptions object. 下面是一个示例:

  1. producer, err := client.CreateProducer(pulsar.ProducerOptions{
  2. Topic: "my-topic",
  3. })
  4. if err != nil {
  5. log.Fatal(err)
  6. }
  7. _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
  8. Payload: []byte("hello"),
  9. })
  10. defer producer.Close()
  11. if err != nil {
  12. fmt.Println("Failed to publish message", err)
  13. }
  14. fmt.Println("Published message")

Producer operations

Pulsar Go producers have the following methods available:

Method说明Return type
Topic()Fetches the producer’s topicstring
Name()Fetches the producer’s namestring
Send(context.Context, ProducerMessage)Publishes a message to the producer’s topic. This call will block until the message is successfully acknowledged by the Pulsar broker, or an error will be thrown if the timeout set using the SendTimeout in the producer’s configuration is exceeded.(MessageID, error)
SendAsync(context.Context, ProducerMessage, func(MessageID, *ProducerMessage, error))Send a message, this call will be blocking until is successfully acknowledged by the Pulsar broker.
LastSequenceID()Get the last sequence id that was published by this producer. his represent either the automatically assigned or custom sequence id (set on the ProducerMessage) that was published and acknowledged by the broker.int64
Flush()Flush all the messages buffered in the client and wait until all messages have been successfully persisted.error
Close()Closes the producer and releases all resources allocated to it. If Close() is called then no more messages will be accepted from the publisher. This method will block until all pending publish requests have been persisted by Pulsar. If an error is thrown, no pending writes will be retried.

生产者示例

如何在生产者中使用消息路由器

  1. client, err := NewClient(pulsar.ClientOptions{
  2. URL: serviceURL,
  3. })
  4. if err != nil {
  5. log.Fatal(err)
  6. }
  7. defer client.Close()
  8. // Only subscribe on the specific partition
  9. consumer, err := client.Subscribe(pulsar.ConsumerOptions{
  10. Topic: "my-partitioned-topic-partition-2",
  11. SubscriptionName: "my-sub",
  12. })
  13. if err != nil {
  14. log.Fatal(err)
  15. }
  16. defer consumer.Close()
  17. producer, err := client.CreateProducer(pulsar.ProducerOptions{
  18. Topic: "my-partitioned-topic",
  19. MessageRouter: func(msg *ProducerMessage, tm TopicMetadata) int {
  20. fmt.Println("Routing message ", msg, " -- Partitions: ", tm.NumPartitions())
  21. return 2
  22. },
  23. })
  24. if err != nil {
  25. log.Fatal(err)
  26. }
  27. defer producer.Close()

生产者如何使用 schema 接口

  1. type testJSON struct {
  2. ID int `json:"id"`
  3. Name string `json:"name"`
  4. }
  1. var (
  2. exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
  3. "\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
  4. )
  1. client, err := NewClient(pulsar.ClientOptions{
  2. URL: "pulsar://localhost:6650",
  3. })
  4. if err != nil {
  5. log.Fatal(err)
  6. }
  7. defer client.Close()
  8. properties := make(map[string]string)
  9. properties["pulsar"] = "hello"
  10. jsonSchemaWithProperties := NewJSONSchema(exampleSchemaDef, properties)
  11. producer, err := client.CreateProducer(ProducerOptions{
  12. Topic: "jsonTopic",
  13. Schema: jsonSchemaWithProperties,
  14. })
  15. assert.Nil(t, err)
  16. _, err = producer.Send(context.Background(), &ProducerMessage{
  17. Value: &testJSON{
  18. ID: 100,
  19. Name: "pulsar",
  20. },
  21. })
  22. if err != nil {
  23. log.Fatal(err)
  24. }
  25. producer.Close()

如何在生产者中使用相应的延迟

  1. client, err := NewClient(pulsar.ClientOptions{
  2. URL: "pulsar://localhost:6650",
  3. })
  4. if err != nil {
  5. log.Fatal(err)
  6. }
  7. defer client.Close()
  8. topicName := newTopicName()
  9. producer, err := client.CreateProducer(pulsar.ProducerOptions{
  10. Topic: topicName,
  11. DisableBatching: true,
  12. })
  13. if err != nil {
  14. log.Fatal(err)
  15. }
  16. defer producer.Close()
  17. consumer, err := client.Subscribe(pulsar.ConsumerOptions{
  18. Topic: topicName,
  19. SubscriptionName: "subName",
  20. Type: Shared,
  21. })
  22. if err != nil {
  23. log.Fatal(err)
  24. }
  25. defer consumer.Close()
  26. ID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
  27. Payload: []byte(fmt.Sprintf("test")),
  28. DeliverAfter: 3 * time.Second,
  29. })
  30. if err != nil {
  31. log.Fatal(err)
  32. }
  33. fmt.Println(ID)
  34. ctx, canc := context.WithTimeout(context.Background(), 1*time.Second)
  35. msg, err := consumer.Receive(ctx)
  36. if err != nil {
  37. log.Fatal(err)
  38. }
  39. fmt.Println(msg.Payload())
  40. canc()
  41. ctx, canc = context.WithTimeout(context.Background(), 5*time.Second)
  42. msg, err = consumer.Receive(ctx)
  43. if err != nil {
  44. log.Fatal(err)
  45. }
  46. fmt.Println(msg.Payload())
  47. canc()

Producer 配置

Name | Description | Default | :———— | :————— |:————— | | Topic | Topic specify the topic this consumer will subscribe to. This argument is required when constructing the reader. | | | Name | Name specify a name for the producer. If not assigned, the system will generate a globally unique name which can be access with Producer.ProducerName(). | | | Properties | Properties 将附加一组应用程序定义的属性。这个属性将会在主题统计中可见 | | | SendTimeout | sendTimeout 设置未被服务器确认的消息的超时时间 | 30s | | DisableBlockIfQueueFull | DisableBlockIfQueueFull 控制如果生产者的消息队列已满,则 Send 和 SendAsync 是否阻塞 | false | | MaxPendingMessages | MaxPendingMessages 设置队列的最大大小,该队列包含待处理的消息以接收来自代理的确认。 | | | HashingScheme | HashingScheme change the HashingScheme used to chose the partition on where to publish a particular message. | JavaStringHash | | CompressionType | CompressionType set the compression type for the producer. | 不压缩 | | 压缩等级 | 定义所需的压缩级别 | 选项有:Default 、Faster 和 Better | Default | | MessageRouter | MessageRouter 通过 MessageRouter 的实现设置了自定义消息路由策略 | | | DisableBatching | DisableBatching 控制是否为生产者启用消息的自动批处理 | false | | BatchingMaxPublishDelay | BatchingMaxPublishDelay set the time period within which the messages sent will be batched | 1ms | | BatchingMaxMessages | BatchingMaxMessages set the maximum number of messages permitted in a batch. | 1000 | | BatchingMaxSize | BatchingMaxSize 设置批处理中允许的最大字节数 | 128KB | | Schema | Schema 通过传递 Schema 的实现来设置自定义模式类型 | bytes[] | | Interceptors | 拦截器链 这些拦截器在 ProducerInterceptor 接口中定义的某些点被调用。 | None | | MaxReconnectToBroker | MaxReconnectToBroker 设置最大重链次数 | ultimate | | BatcherBuilderType | BatcherBuilderType 设置批处理生成器类型。 这用于在启用批处理时创建批处理容器。 选项有:DefaultBatchBuilder 和 KeyBasedBatchBuilder | DefaultBatchBuilder |

消费者

Pulsar consumers subscribe to one or more Pulsar topics and listen for incoming messages produced on that topic/those topics. You can configure Go consumers using a ConsumerOptions object. Here’s a basic example that uses channels:

  1. consumer, err := client.Subscribe(pulsar.ConsumerOptions{
  2. Topic: "topic-1",
  3. SubscriptionName: "my-sub",
  4. Type: pulsar.Shared,
  5. })
  6. if err != nil {
  7. log.Fatal(err)
  8. }
  9. defer consumer.Close()
  10. for i := 0; i < 10; i++ {
  11. msg, err := consumer.Receive(context.Background())
  12. if err != nil {
  13. log.Fatal(err)
  14. }
  15. fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
  16. msg.ID(), string(msg.Payload()))
  17. consumer.Ack(msg)
  18. }
  19. if err := consumer.Unsubscribe(); err != nil {
  20. log.Fatal(err)
  21. }

消费者操作

Pulsar Go consumers have the following methods available:

Method说明Return type
Subscription()Returns the consumer’s subscription namestring
Unsubcribe()Unsubscribes the consumer from the assigned topic. Throws an error if the unsubscribe operation is somehow unsuccessful.error
Receive(context.Context)Receives a single message from the topic. This method blocks until a message is available.(Message, error)
Chan()Chan 返回一个传递消息的 channel<-chan ConsumerMessage
Ack(Message)Acknowledges a message to the Pulsar broker
AckID(MessageID)Acknowledges a message to the Pulsar broker by message ID
ReconsumeLater(msg Message, delay time.Duration)ReconsumeLater 标记消息以在自定义延迟后重新发送
Nack(Message)Acknowledge the failure to process a single message.
NackID(MessageID)Acknowledge the failure to process a single message.
Seek(msgID MessageID)Reset the subscription associated with this consumer to a specific message id. The message id can either be a specific message or represent the first or last messages in the topic.error
SeekByTime(time time.Time)Reset the subscription associated with this consumer to a specific message publish time.error
Close()Closes the consumer, disabling its ability to receive messages from the broker
Name()Name 返回消费者名称string

Receive example

如何使用正则表达(式) 的消费者

  1. client, err := pulsar.NewClient(pulsar.ClientOptions{
  2. URL: "pulsar://localhost:6650",
  3. })
  4. defer client.Close()
  5. p, err := client.CreateProducer(pulsar.ProducerOptions{
  6. Topic: topicInRegex,
  7. DisableBatching: true,
  8. })
  9. if err != nil {
  10. log.Fatal(err)
  11. }
  12. defer p.Close()
  13. topicsPattern := fmt.Sprintf("persistent://%s/foo.*", namespace)
  14. opts := pulsar.ConsumerOptions{
  15. TopicsPattern: topicsPattern,
  16. SubscriptionName: "regex-sub",
  17. }
  18. consumer, err := client.Subscribe(opts)
  19. if err != nil {
  20. log.Fatal(err)
  21. }
  22. defer consumer.Close()

如何使用多topic 的Consumer

  1. func newTopicName() string {
  2. return fmt.Sprintf("my-topic-%v", time.Now().Nanosecond())
  3. }
  4. topic1 := "topic-1"
  5. topic2 := "topic-2"
  6. client, err := NewClient(pulsar.ClientOptions{
  7. URL: "pulsar://localhost:6650",
  8. })
  9. if err != nil {
  10. log.Fatal(err)
  11. }
  12. topics := []string{topic1, topic2}
  13. consumer, err := client.Subscribe(pulsar.ConsumerOptions{
  14. Topics: topics,
  15. SubscriptionName: "multi-topic-sub",
  16. })
  17. if err != nil {
  18. log.Fatal(err)
  19. }
  20. defer consumer.Close()

如何使用消费监听器

  1. import (
  2. "fmt"
  3. "log"
  4. "github.com/apache/pulsar-client-go/pulsar"
  5. )
  6. func main() {
  7. client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
  8. if err != nil {
  9. log.Fatal(err)
  10. }
  11. defer client.Close()
  12. channel := make(chan pulsar.ConsumerMessage, 100)
  13. options := pulsar.ConsumerOptions{
  14. Topic: "topic-1",
  15. SubscriptionName: "my-subscription",
  16. Type: pulsar.Shared,
  17. }
  18. options.MessageChannel = channel
  19. consumer, err := client.Subscribe(options)
  20. if err != nil {
  21. log.Fatal(err)
  22. }
  23. defer consumer.Close()
  24. // Receive messages from channel. The channel returns a struct which contains message and the consumer from where
  25. // the message was received. It's not necessary here since we have 1 single consumer, but the channel could be
  26. // shared across multiple consumers as well
  27. for cm := range channel {
  28. msg := cm.Message
  29. fmt.Printf("Received message msgId: %v -- content: '%s'\n",
  30. msg.ID(), string(msg.Payload()))
  31. consumer.Ack(msg)
  32. }
  33. }

如何使用消费者接收超时器

  1. client, err := NewClient(pulsar.ClientOptions{
  2. URL: "pulsar://localhost:6650",
  3. })
  4. if err != nil {
  5. log.Fatal(err)
  6. }
  7. defer client.Close()
  8. topic := "test-topic-with-no-messages"
  9. ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
  10. defer cancel()
  11. // create consumer
  12. consumer, err := client.Subscribe(pulsar.ConsumerOptions{
  13. Topic: topic,
  14. SubscriptionName: "my-sub1",
  15. Type: Shared,
  16. })
  17. if err != nil {
  18. log.Fatal(err)
  19. }
  20. defer consumer.Close()
  21. msg, err := consumer.Receive(ctx)
  22. fmt.Println(msg.Payload())
  23. if err != nil {
  24. log.Fatal(err)
  25. }

如何在消费者中使用schema

  1. type testJSON struct {
  2. ID int `json:"id"`
  3. Name string `json:"name"`
  4. }
  1. var (
  2. exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
  3. "\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
  4. )
  1. client, err := NewClient(pulsar.ClientOptions{
  2. URL: "pulsar://localhost:6650",
  3. })
  4. if err != nil {
  5. log.Fatal(err)
  6. }
  7. defer client.Close()
  8. var s testJSON
  9. consumerJS := NewJSONSchema(exampleSchemaDef, nil)
  10. consumer, err := client.Subscribe(ConsumerOptions{
  11. Topic: "jsonTopic",
  12. SubscriptionName: "sub-1",
  13. Schema: consumerJS,
  14. SubscriptionInitialPosition: SubscriptionPositionEarliest,
  15. })
  16. assert.Nil(t, err)
  17. msg, err := consumer.Receive(context.Background())
  18. assert.Nil(t, err)
  19. err = msg.GetSchemaValue(&s)
  20. if err != nil {
  21. log.Fatal(err)
  22. }
  23. defer consumer.Close()

Consumer configuration

Name | Description | Default | :———— | :————— |:————— | | Topic | Topic specify the topic this consumer will subscribe to. This argument is required when constructing the reader. | | | Topics | Specify a list of topics this consumer will subscribe on. Either a topic, a list of topics or a topics pattern are required when subscribing| | | TopicsPattern | Specify a regular expression to subscribe to multiple topics under the same namespace. Either a topic, a list of topics or a topics pattern are required when subscribing | | | AutoDiscoveryPeriod | Specify the interval in which to poll for new partitions or new topics if using a TopicsPattern. | | | SubscriptionName | Specify the subscription name for this consumer. This argument is required when subscribing | | | Name | Set the consumer name | | | Properties | Properties attach a set of application defined properties to the producer This properties will be visible in the topic stats | | | Type | Select the subscription type to be used when subscribing to the topic. | Exclusive | | SubscriptionInitialPosition | InitialPosition at which the cursor will be set when subscribe | Latest | | DLQ | Configuration for Dead Letter Queue consumer policy. | no DLQ | | MessageChannel | Sets a MessageChannel for the consumer. When a message is received, it will be pushed to the channel for consumption | | | ReceiverQueueSize | Sets the size of the consumer receive queue. | 1000 | | NackRedeliveryDelay| 重新传递未能处理的消息的延迟时间 | 1 min | | ReadCompact | 如果启用,消费者将从压缩的主题中读取消息,而不是读取主题的完整消息积压 | false | | ReplicateSubscriptionState | 将订阅标记为已复制以使其跨集群保持同步 | false | | KeySharedPolicy | Key Shared 消费者策略的配置。 | | | RetryEnable | 自动重试开关,打开会将消息发送到默认填写的 DLQPolicy 主题 | false | | Interceptors | 拦截器链。 这些拦截器在 ConsumerInterceptor 接口中定义的某些点被调用。 | | | MaxReconnectToBroker | MaxReconnectToBroker 设置了重链 broker 的最大重试次数。 | ultimate | | Schema | Schema 通过传递 Schema 的实现来设置自定义模式类型 | bytes[] |

Reader

Pulsar readers process messages from Pulsar topics. Readers are different from consumers because with readers you need to explicitly specify which message in the stream you want to begin with (consumers, on the other hand, automatically begin with the most recent unacked message). You can configure Go readers using a ReaderOptions object. 下面是一个示例:

  1. reader, err := client.CreateReader(pulsar.ReaderOptions{
  2. Topic: "topic-1",
  3. StartMessageID: pulsar.EarliestMessageID(),
  4. })
  5. if err != nil {
  6. log.Fatal(err)
  7. }
  8. defer reader.Close()

Reader operations

Pulsar Go readers have the following methods available:

Method说明Return type
Topic()Returns the reader’s topicstring
Next(context.Context)Receives the next message on the topic (analogous to the Receive method for consumers). This method blocks until a message is available.(Message, error)
HasNext()Check if there is any message available to read from the current position(bool, error)
Close()Closes the reader, disabling its ability to receive messages from the brokererror
Seek(MessageID)将与此 reader 关联的订阅重置为特定的消息IDerror
SeekByTime(time time.Time)将与此 reader 关联的订阅重置为特定的消息投递时间error

Reader 示例

如何使用阅读器读取“下一个”消息

Here’s an example usage of a Go reader that uses the Next() method to process incoming messages:

  1. import (
  2. "context"
  3. "fmt"
  4. "log"
  5. "github.com/apache/pulsar-client-go/pulsar"
  6. )
  7. func main() {
  8. client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
  9. if err != nil {
  10. log.Fatal(err)
  11. }
  12. defer client.Close()
  13. reader, err := client.CreateReader(pulsar.ReaderOptions{
  14. Topic: "topic-1",
  15. StartMessageID: pulsar.EarliestMessageID(),
  16. })
  17. if err != nil {
  18. log.Fatal(err)
  19. }
  20. defer reader.Close()
  21. for reader.HasNext() {
  22. msg, err := reader.Next(context.Background())
  23. if err != nil {
  24. log.Fatal(err)
  25. }
  26. fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
  27. msg.ID(), string(msg.Payload()))
  28. }
  29. }

In the example above, the reader begins reading from the earliest available message (specified by pulsar.EarliestMessage). The reader can also begin reading from the latest message (pulsar.LatestMessage) or some other message ID specified by bytes using the DeserializeMessageID function, which takes a byte array and returns a MessageID object. 下面是一个示例:

  1. lastSavedId := // Read last saved message id from external store as byte[]
  2. reader, err := client.CreateReader(pulsar.ReaderOptions{
  3. Topic: "my-golang-topic",
  4. StartMessageID: pulsar.DeserializeMessageID(lastSavedId),
  5. })

如何使用阅读器读取特定消息

  1. client, err := NewClient(pulsar.ClientOptions{
  2. URL: lookupURL,
  3. })
  4. if err != nil {
  5. log.Fatal(err)
  6. }
  7. defer client.Close()
  8. topic := "topic-1"
  9. ctx := context.Background()
  10. // create producer
  11. producer, err := client.CreateProducer(pulsar.ProducerOptions{
  12. Topic: topic,
  13. DisableBatching: true,
  14. })
  15. if err != nil {
  16. log.Fatal(err)
  17. }
  18. defer producer.Close()
  19. // send 10 messages
  20. msgIDs := [10]MessageID{}
  21. for i := 0; i < 10; i++ {
  22. msgID, err := producer.Send(ctx, &pulsar.ProducerMessage{
  23. Payload: []byte(fmt.Sprintf("hello-%d", i)),
  24. })
  25. assert.NoError(t, err)
  26. assert.NotNil(t, msgID)
  27. msgIDs[i] = msgID
  28. }
  29. // create reader on 5th message (not included)
  30. reader, err := client.CreateReader(pulsar.ReaderOptions{
  31. Topic: topic,
  32. StartMessageID: msgIDs[4],
  33. })
  34. if err != nil {
  35. log.Fatal(err)
  36. }
  37. defer reader.Close()
  38. // receive the remaining 5 messages
  39. for i := 5; i < 10; i++ {
  40. msg, err := reader.Next(context.Background())
  41. if err != nil {
  42. log.Fatal(err)
  43. }
  44. // create reader on 5th message (included)
  45. readerInclusive, err := client.CreateReader(pulsar.ReaderOptions{
  46. Topic: topic,
  47. StartMessageID: msgIDs[4],
  48. StartMessageIDInclusive: true,
  49. })
  50. if err != nil {
  51. log.Fatal(err)
  52. }
  53. defer readerInclusive.Close()

Reader configuration

Name | Description | Default | :———— | :————— |:————— | | Topic | Topic specify the topic this consumer will subscribe to. This argument is required when constructing the reader. | | | Name | Name set the reader name. | | | Properties | Attach a set of application defined properties to the reader. This properties will be visible in the topic stats | | | StartMessageID | StartMessageID initial reader positioning is done by specifying a message id. | | | StartMessageIDInclusive | If true, the reader will start at the StartMessageID, included. Default is false and the reader will start from the “next” message | false | | MessageChannel | MessageChannel sets a MessageChannel for the consumer When a message is received, it will be pushed to the channel for consumption| | | ReceiverQueueSize | ReceiverQueueSize sets the size of the consumer receive queue. | 1000 | | SubscriptionRolePrefix| SubscriptionRolePrefix set the subscription role prefix. | “reader” | | ReadCompacted | If enabled, the reader will read messages from the compacted topic rather than reading the full message backlog of the topic. ReadCompacted can only be enabled when reading from a persistent topic. | false|

消息

The Pulsar Go client provides a ProducerMessage interface that you can use to construct messages to producer on Pulsar topics. Here’s an example message:

  1. msg := pulsar.ProducerMessage{
  2. Payload: []byte("Here is some message data"),
  3. Key: "message-key",
  4. Properties: map[string]string{
  5. "foo": "bar",
  6. },
  7. EventTime: time.Now(),
  8. ReplicationClusters: []string{"cluster1", "cluster3"},
  9. }
  10. if _, err := producer.send(msg); err != nil {
  11. log.Fatalf("Could not publish message due to: %v", err)
  12. }

The following methods parameters are available for ProducerMessage objects:

参数名说明
PayloadThe actual data payload of the message
ValueValue and payload is mutually exclusive, Value interface{} for schema message.
KeyThe optional key associated with the message (particularly useful for things like topic compaction)
OrderingKeyOrderingKey 设置消息的排序键。
PropertiesA key-value map (both keys and values must be strings) for any application-specific metadata attached to the message
EventTimeThe timestamp associated with the message
ReplicationClustersThe clusters to which this message will be replicated. Pulsar brokers handle message replication automatically; you should only change this setting if you want to override the broker default.
SequenceIDSet the sequence id to assign to the current message
DeliverAfter仅在特定的相对延迟后才请求传递消息
DeliverAt仅在特定的绝对时间戳或(在特定的绝对时间戳)之后传递消息

TLS 加密和身份验证

In order to use TLS encryption, you’ll need to configure your client to do so:

  • 使用 pulsar+ssl URL 类型
  • 设置 TLSTrustCertCertsFilePath 到你的客户端和 Pulsar broker 使用的 TLS 证书路径
  • 配置 认证 选项

下面是一个示例:

  1. opts := pulsar.ClientOptions{
  2. URL: "pulsar+ssl://my-cluster.com:6651",
  3. TLSTrustCertsFilePath: "/path/to/certs/my-cert.csr",
  4. Authentication: NewAuthenticationTLS("my-cert.pem", "my-key.pem"),
  5. }

OAuth2 身份验证

To use OAuth2 authentication, you’ll need to configure your client to perform the following operations. This example shows how to configure OAuth2 authentication.

  1. oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
  2. "type": "client_credentials",
  3. "issuerUrl": "https://dev-kt-aa9ne.us.auth0.com",
  4. "audience": "https://dev-kt-aa9ne.us.auth0.com/api/v2/",
  5. "privateKey": "/path/to/privateKey",
  6. "clientId": "0Xx...Yyxeny",
  7. })
  8. client, err := pulsar.NewClient(pulsar.ClientOptions{
  9. URL: "pulsar://my-cluster:6650",
  10. Authentication: oauth,
  11. })