在只消费实时发布的消息,而不需要保证持久性的应用程序中可以使用非持久化 topic。由于没有持久化消息的开销,使用非持久化 topic 可以减少消息发布的延迟。

以下说明和命令中所使用 topic 的名称结构如下:

  1. non-persistent://tenant/namespace/topic

非持久化 topic 资源

获取统计信息

获取非持久化 topic 的统计数据。

  • msgRateIn:本地发布者和副本发布者每秒发布消息速率和

  • msgThroughputIn:同上,但不是每秒发布消息数,而是每秒发布消息的字节数

  • msgRateOut:本地 consumer 和副本 consumer 每秒分发消息数量和

  • msgThroughputOut:同上,但不是每秒分发消息数,而是每秒分发消息的字节数

  • averageMsgSize:最后一次间隔内发布消息的平均字节大小

  • publishers: The list of all local publishers into the topic. There can be zero or thousands

  • averageMsgSize:此发布者在最后一个间隔内发布消息的平均字节大小

  • producerId:此 topic 内 producer 的内部标识符

  • producerName:客户端库为 producer 生成的内部标识符

  • address:连接到 producer 的 IP 地址和 source 端口

  • connectedSince:Producer 创建时或上次连接到此 producer 时的时间戳

  • subscriptions:Topic 所有本地订阅的列表

  • my-subscription:订阅名称(客户端定义)

  • type:订阅类型

  • consumers:连接到此订阅的 consumer 列表

  • consumerName:客户端库为 consumer 生成的内部标识符

  • availablePermits:Consumer 在客户端库监听队列中可以容纳的消息数量。 值小于 1 意味着客户端库队列已满,不能继续调用 receive()。 非负整数值意味着 consumer 可以随时接收消息。

  • replication:Topic 交叉副本的统计信息。

  • connected:外部复制器是否已经连接

  • inboundConnection:连接到此 broker 的远程集群发布者 broker 的 IP 和端口。

  • inboundConnectedSince:正在发布消息到远程集群的 TCP 连接。 如果没有连接到本地发布者,一分钟后连接将自动关闭。

  • msgDropRate:每次连接中,允许发布者 broker 配置的消息数量不能超过此参数值。若超过阈值,则丢弃所有阈值外的已发布消息。 若限制不可用或连接不可写入,broker 也会丢弃订阅的消息。

  1. {
  2. "msgRateIn": 4641.528542257553,
  3. "msgThroughputIn": 44663039.74947473,
  4. "msgRateOut": 0,
  5. "msgThroughputOut": 0,
  6. "averageMsgSize": 1232439.816728665,
  7. "storageSize": 135532389160,
  8. "msgDropRate" : 0.0,
  9. "publishers": [
  10. {
  11. "msgRateIn": 57.855383881403576,
  12. "msgThroughputIn": 558994.7078932219,
  13. "averageMsgSize": 613135,
  14. "producerId": 0,
  15. "producerName": null,
  16. "address": null,
  17. "connectedSince": null,
  18. "msgDropRate" : 0.0
  19. }
  20. ],
  21. "subscriptions": {
  22. "my-topic_subscription": {
  23. "msgRateOut": 0,
  24. "msgThroughputOut": 0,
  25. "msgBacklog": 116632,
  26. "type": null,
  27. "msgRateExpired": 36.98245516804671,
  28. "consumers" : [ {
  29. "msgRateOut" : 20343.506296021893,
  30. "msgThroughputOut" : 2.0979855364233278E7,
  31. "msgRateRedeliver" : 0.0,
  32. "consumerName" : "fe3c0",
  33. "availablePermits" : 950,
  34. "unackedMessages" : 0,
  35. "blockedConsumerOnUnackedMsgs" : false,
  36. "address" : "/10.73.210.249:60578",
  37. "connectedSince" : "2017-07-26 15:13:48.026-0700",
  38. "clientVersion" : "1.19-incubating-SNAPSHOT"
  39. } ],
  40. "msgDropRate" : 432.2390921571593
  41. }
  42. },
  43. "replication": {}
  44. }

pulsar-admin

可以使用 stats 命令来获取 topic 的统计信息。

  1. $ pulsar-admin non-persistent stats \
  2. non-persistent://test-tenant/ns1/tp1 \

REST API

GET /admin/v2/non-persistent/:tenant/:namespace/:topic/stats

Java

  1. String topic = "non-persistent://my-tenant/my-namespace/my-topic";
  2. admin.nonPersistentTopics().getStats(topic);

获取内部统计信息

获取 topic 的详细统计信息。

pulsar-admin

可以使用 stats-internal 命令来获取 topic 的内部统计信息。

  1. $ pulsar-admin non-persistent stats-internal \
  2. non-persistent://test-tenant/ns1/tp1 \
  3. {
  4. "entriesAddedCounter" : 48834,
  5. "numberOfEntries" : 0,
  6. "totalSize" : 0,
  7. "cursors" : {
  8. "s1" : {
  9. "waitingReadOp" : false,
  10. "pendingReadOps" : 0,
  11. "messagesConsumedCounter" : 0,
  12. "cursorLedger" : 0,
  13. "cursorLedgerLastEntry" : 0
  14. }
  15. }
  16. }

REST API

GET /admin/v2/non-persistent/:tenant/:namespace/:topic/internalStats

Java

  1. String topic = "non-persistent://my-tenant/my-namespace/my-topic";
  2. admin.nonPersistentTopics().getInternalStats(topic);

创建分区 topic

Partitioned topics in Pulsar must be explicitly created. When creating a new partitioned topic you need to provide a name for the topic as well as the desired number of partitions.

pulsar-admin

  1. $ bin/pulsar-admin non-persistent create-partitioned-topic \
  2. non-persistent://my-tenant/my-namespace/my-topic \
  3. --partitions 4

REST API

PUT /admin/v2/non-persistent/:tenant/:namespace/:topic/partitions

Java

  1. String topicName = "non-persistent://my-tenant/my-namespace/my-topic";
  2. int numPartitions = 4;
  3. admin.nonPersistentTopics().createPartitionedTopic(topicName, numPartitions);

获取元数据

Partitioned topics have metadata associated with them that you can fetch as a JSON object. The following metadata fields are currently available:

字段含义
partitionsTopic 分区的数量

pulsar-admin

  1. $ pulsar-admin non-persistent get-partitioned-topic-metadata \
  2. non-persistent://my-tenant/my-namespace/my-topic
  3. {
  4. "partitions": 4
  5. }

REST API

GET /admin/v2/non-persistent/:tenant/:namespace/:topic/partitions

Java

  1. String topicName = "non-persistent://my-tenant/my-namespace/my-topic";
  2. admin.nonPersistentTopics().getPartitionedTopicMetadata(topicName);

卸载 topic

可以卸载 topic。

pulsar-admin

可以使用 unload 命令卸载 topic。

  1. $ pulsar-admin non-persistent unload \
  2. non-persistent://test-tenant/ns1/tp1 \

REST API

PUT /admin/v2/non-persistent/:tenant/:namespace/:topic/unload

Java

  1. String topic = "non-persistent://my-tenantmy-namespace/my-topic";
  2. admin.nonPersistentTopics().unload(topic);