在只消费实时发布的消息,而不需要保证持久性的应用程序中可以使用非持久化 topic。由于没有持久化消息的开销,使用非持久化 topic 可以减少消息发布的延迟。

以下说明和命令中所使用 topic 的名称结构如下:

  1. non-persistent://tenant/namespace/topic

非持久化 topic 资源

获取统计信息

获取非持久化 topic 的统计数据。

  • msgRateIn: The sum of all local and replication publishers’ publish rates in messages per second

  • msgThroughputIn: Same as above, but in bytes per second instead of messages per second

  • msgRateOut: The sum of all local and replication consumers’ dispatch rates in messages per second

  • msgThroughputOut: Same as above, but in bytes per second instead of messages per second

  • averageMsgSize: The average size in bytes of messages published within the last interval

  • publishers: The list of all local publishers into the topic. There can be zero or thousands

  • averageMsgSize: Average message size in bytes from this publisher within the last interval

  • producerId: Internal identifier for this producer on this topic

  • producerName: Internal identifier for this producer, generated by the client library

  • address: IP address and source port for the connection of this producer

  • connectedSince: Timestamp this producer was created or last reconnected

  • subscriptions: The list of all local subscriptions to the topic

  • my-subscription: The name of this subscription (client defined)

  • type: This subscription type

  • consumers: The list of connected consumers for this subscription

  • consumerName: Internal identifier for this consumer, generated by the client library

  • availablePermits: The number of messages this consumer has space for in the client library’s listen queue. 值小于 1 意味着客户端库队列已满,不能继续调用 receive()。 非负整数值意味着 consumer 可以随时接收消息。

  • replication: This section gives the stats for cross-colo replication of this topic

  • connected: Whether the outbound replicator is connected

  • inboundConnection: The IP and port of the broker in the remote cluster’s publisher connection to this broker

  • inboundConnectedSince: The TCP connection being used to publish messages to the remote cluster. 如果没有连接到本地发布者,一分钟后连接将自动关闭。

  • msgDropRate: for publisher: publish: broker only allows configured number of in flight per connection, and drops all other published messages above the threshold. 若限制不可用或连接不可写入,broker 也会丢弃订阅的消息。

  1. {
  2. "msgRateIn": 4641.528542257553,
  3. "msgThroughputIn": 44663039.74947473,
  4. "msgRateOut": 0,
  5. "msgThroughputOut": 0,
  6. "averageMsgSize": 1232439.816728665,
  7. "storageSize": 135532389160,
  8. "msgDropRate" : 0.0,
  9. "publishers": [
  10. {
  11. "msgRateIn": 57.855383881403576,
  12. "msgThroughputIn": 558994.7078932219,
  13. "averageMsgSize": 613135,
  14. "producerId": 0,
  15. "producerName": null,
  16. "address": null,
  17. "connectedSince": null,
  18. "msgDropRate" : 0.0
  19. }
  20. ],
  21. "subscriptions": {
  22. "my-topic_subscription": {
  23. "msgRateOut": 0,
  24. "msgThroughputOut": 0,
  25. "msgBacklog": 116632,
  26. "type": null,
  27. "msgRateExpired": 36.98245516804671,
  28. "consumers" : [ {
  29. "msgRateOut" : 20343.506296021893,
  30. "msgThroughputOut" : 2.0979855364233278E7,
  31. "msgRateRedeliver" : 0.0,
  32. "consumerName" : "fe3c0",
  33. "availablePermits" : 950,
  34. "unackedMessages" : 0,
  35. "blockedConsumerOnUnackedMsgs" : false,
  36. "address" : "/10.73.210.249:60578",
  37. "connectedSince" : "2017-07-26 15:13:48.026-0700",
  38. "clientVersion" : "1.19-incubating-SNAPSHOT"
  39. } ],
  40. "msgDropRate" : 432.2390921571593
  41. }
  42. },
  43. "replication": {}
  44. }

pulsar-admin

可以使用 stats 命令来获取 topic 的统计信息。

  1. $ pulsar-admin non-persistent stats \
  2. non-persistent://test-tenant/ns1/tp1 \

REST API

GET /admin/v2/non-persistent/:tenant/:namespace/:topic/stats

Java

  1. String topic = "non-persistent://my-tenant/my-namespace/my-topic";
  2. admin.nonPersistentTopics().getStats(topic);

获取内部统计信息

获取 topic 的详细统计信息。

pulsar-admin

可以使用 stats-internal 命令来获取 topic 的内部统计信息。

  1. $ pulsar-admin non-persistent stats-internal \
  2. non-persistent://test-tenant/ns1/tp1 \
  3. {
  4. "entriesAddedCounter" : 48834,
  5. "numberOfEntries" : 0,
  6. "totalSize" : 0,
  7. "cursors" : {
  8. "s1" : {
  9. "waitingReadOp" : false,
  10. "pendingReadOps" : 0,
  11. "messagesConsumedCounter" : 0,
  12. "cursorLedger" : 0,
  13. "cursorLedgerLastEntry" : 0
  14. }
  15. }
  16. }

REST API

GET /admin/v2/non-persistent/:tenant/:namespace/:topic/internalStats

Java

  1. String topic = "non-persistent://my-tenant/my-namespace/my-topic";
  2. admin.nonPersistentTopics().getInternalStats(topic);

创建分区 topic

Partitioned topics in Pulsar must be explicitly created. When creating a new partitioned topic you need to provide a name for the topic as well as the desired number of partitions.

Note

By default, after 60 seconds of creation, topics are considered inactive and deleted automatically to prevent from generating trash data.

To disable this feature, set brokerDeleteInactiveTopicsEnabled to false.

To change the frequency of checking inactive topics, set brokerDeleteInactiveTopicsFrequencySeconds to your desired value.

For more information about these two parameters, see here.

pulsar-admin

  1. $ bin/pulsar-admin non-persistent create-partitioned-topic \
  2. non-persistent://my-tenant/my-namespace/my-topic \
  3. --partitions 4

REST API

PUT /admin/v2/non-persistent/:tenant/:namespace/:topic/partitions

Java

  1. String topicName = "non-persistent://my-tenant/my-namespace/my-topic";
  2. int numPartitions = 4;
  3. admin.nonPersistentTopics().createPartitionedTopic(topicName, numPartitions);

获取元数据

Partitioned topics have metadata associated with them that you can fetch as a JSON object. The following metadata fields are currently available:

字段含义
partitionsTopic 分区的数量

pulsar-admin

  1. $ pulsar-admin non-persistent get-partitioned-topic-metadata \
  2. non-persistent://my-tenant/my-namespace/my-topic
  3. {
  4. "partitions": 4
  5. }

REST API

GET /admin/v2/non-persistent/:tenant/:namespace/:topic/partitions

Java

  1. String topicName = "non-persistent://my-tenant/my-namespace/my-topic";
  2. admin.nonPersistentTopics().getPartitionedTopicMetadata(topicName);

卸载 topic

可以卸载 topic。

pulsar-admin

可以使用 unload 命令卸载 topic。

  1. $ pulsar-admin non-persistent unload \
  2. non-persistent://test-tenant/ns1/tp1 \

REST API

PUT /admin/v2/non-persistent/:tenant/:namespace/:topic/unload

Java

  1. String topic = "non-persistent://my-tenantmy-namespace/my-topic";
  2. admin.nonPersistentTopics().unload(topic);