Managing non-partitioned topics

You can use Pulsar's admin API to create and manage non-partitioned topics.

In this guide, topic names have the following structure:

  persistent://tenant/namespace/topic
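To make the three-part structure concrete, here is a minimal Java sketch that splits a topic name of this form into its tenant, namespace, and topic parts. The class `TopicNameSketch` and its `parse` method are hypothetical helpers written for this illustration; they are not part of the Pulsar client or admin API, and the sketch only accepts the `persistent://` form shown above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not a Pulsar API.
final class TopicNameSketch {
    // Matches persistent://tenant/namespace/topic with exactly three segments.
    private static final Pattern TOPIC_NAME =
            Pattern.compile("^persistent://([^/]+)/([^/]+)/([^/]+)$");

    final String tenant;
    final String namespace;
    final String topic;

    private TopicNameSketch(String tenant, String namespace, String topic) {
        this.tenant = tenant;
        this.namespace = namespace;
        this.topic = topic;
    }

    // Splits a fully qualified topic name, rejecting anything that
    // does not follow the structure used in this guide.
    static TopicNameSketch parse(String fullName) {
        Matcher m = TOPIC_NAME.matcher(fullName);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected topic name: " + fullName);
        }
        return new TopicNameSketch(m.group(1), m.group(2), m.group(3));
    }

    public static void main(String[] args) {
        TopicNameSketch t = parse("persistent://my-tenant/my-namespace/my-topic");
        // prints "my-tenant | my-namespace | my-topic"
        System.out.println(t.tenant + " | " + t.namespace + " | " + t.topic);
    }
}
```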

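Non-partitioned topics resources

Create

Non-partitioned topics in Pulsar must be created explicitly. When you create a new non-partitioned topic, you must provide a name for the topic.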

Note

By default, topics are considered inactive 60 seconds after creation and are deleted automatically to prevent trash data from accumulating.

To disable this feature, set brokerDeleteInactiveTopicsEnabled to false.

To change how often inactive topics are checked, set brokerDeleteInactiveTopicsFrequencySeconds to the desired value.

For more information about these two parameters, see the broker section of the Pulsar configuration reference.
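As a rough illustration, the two settings might look like this in the broker configuration file. The file name conf/broker.conf and the value 120 are assumptions made for this sketch; normally you would set only the line you need.

```properties
# conf/broker.conf (assumed location)

# Turn off automatic deletion of inactive topics.
brokerDeleteInactiveTopicsEnabled=false

# Or keep deletion enabled and check for inactive topics
# every 120 seconds (example value).
brokerDeleteInactiveTopicsFrequencySeconds=120
```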

pulsar-admin

You can create a non-partitioned topic using the create command, specifying the topic name as an argument. Here's an example:

  $ bin/pulsar-admin topics create \
    persistent://my-tenant/my-namespace/my-topic

Note

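A non-partitioned topic whose name ends with the suffix '-partition-' followed by a number, for example 'xyz-topic-partition-10', can be created only if a partitioned topic with the same base name ('xyz-topic' in this example) already exists and has more partitions than that number (at least 11 in this example, because partition indexes start at 0). Otherwise, creation of the topic fails.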
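The naming rule in the note above can be sketched as a small check. This is only a literal reading of the note, not Pulsar's actual validation code: the class `PartitionSuffixRuleSketch`, the method `isCreationAllowed`, and the map of existing partitioned topics are all hypothetical, and names without the suffix are simply treated as outside the rule.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of the naming rule; not Pulsar's implementation.
final class PartitionSuffixRuleSketch {
    // Captures the base name and the number after "-partition-".
    private static final Pattern PARTITION_SUFFIX =
            Pattern.compile("^(.+)-partition-(\\d+)$");

    // partitionedTopics maps a partitioned topic's name to its partition count.
    static boolean isCreationAllowed(String topicName, Map<String, Integer> partitionedTopics) {
        Matcher m = PARTITION_SUFFIX.matcher(topicName);
        if (!m.matches()) {
            return true; // the note only constrains names carrying the suffix
        }
        int index;
        try {
            index = Integer.parseInt(m.group(2));
        } catch (NumberFormatException e) {
            return false; // number too large to be a partition index
        }
        Integer partitions = partitionedTopics.get(m.group(1));
        // Allowed only if the partitioned topic exists and has more
        // partitions than the number in the name (indexes start at 0).
        return partitions != null && partitions > index;
    }

    public static void main(String[] args) {
        Map<String, Integer> existing = Map.of("xyz-topic", 11);
        System.out.println(isCreationAllowed("xyz-topic-partition-10", existing)); // true
        System.out.println(isCreationAllowed("xyz-topic-partition-11", existing)); // false
    }
}
```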

REST API

PUT /admin/v2/persistent/:tenant/:namespace/:topic
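For readers calling the REST endpoint directly, the following sketch builds (but does not send) the corresponding request with the JDK's built-in java.net.http API. The base URL http://localhost:8080 is an assumption about where the broker's web service listens, and the class and method names are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper for illustration; only JDK classes are used.
final class CreateTopicRequestSketch {
    // Assumption: address of the broker's web service.
    static final String ADMIN_BASE_URL = "http://localhost:8080";

    // Builds PUT /admin/v2/persistent/:tenant/:namespace/:topic with an empty body.
    static HttpRequest buildCreateRequest(String tenant, String namespace, String topic) {
        URI uri = URI.create(ADMIN_BASE_URL + "/admin/v2/persistent/"
                + tenant + "/" + namespace + "/" + topic);
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildCreateRequest("my-tenant", "my-namespace", "my-topic");
        // prints "PUT http://localhost:8080/admin/v2/persistent/my-tenant/my-namespace/my-topic"
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending the request with java.net.http.HttpClient is left out so that the sketch runs without a broker.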

Java

  // admin is an existing PulsarAdmin instance
  String topicName = "persistent://my-tenant/my-namespace/my-topic";
  admin.topics().createNonPartitionedTopic(topicName);

Delete

pulsar-admin

To delete a non-partitioned topic, use the delete command and specify the topic by name:

  $ bin/pulsar-admin topics delete \
    persistent://my-tenant/my-namespace/my-topic

REST API

DELETE /admin/v2/persistent/:tenant/:namespace/:topic

Java

  String topicName = "persistent://my-tenant/my-namespace/my-topic";
  admin.topics().delete(topicName);

List

Lists the topics under a given namespace.

pulsar-admin

  $ pulsar-admin topics list tenant/namespace
  persistent://tenant/namespace/topic1
  persistent://tenant/namespace/topic2

REST API

GET /admin/v2/persistent/:tenant/:namespace

Java

  admin.topics().getList(namespace);

Stats

It shows the current statistics of a given topic.

The following stats are available:

| Stat | Description |
|---|---|
| msgRateIn | The sum of all local and replication publishers' publish rates, in messages per second |
| msgThroughputIn | Same as msgRateIn, but in bytes per second instead of messages per second |
| msgRateOut | The sum of all local and replication consumers' dispatch rates, in messages per second |
| msgThroughputOut | Same as msgRateOut, but in bytes per second instead of messages per second |
| averageMsgSize | Average message size, in bytes, from this publisher within the last interval |
| storageSize | The sum of the ledgers' storage size for this topic |
| publishers | The list of all local publishers into the topic. There can be anywhere from zero to thousands. |
| producerId | Internal identifier for this producer on this topic |
| producerName | Internal identifier for this producer, generated by the client library |
| address | IP address and source port for the connection of this producer |
| connectedSince | Timestamp at which this producer was created or last reconnected |
| subscriptions | The list of all local subscriptions to the topic |
| my-subscription | The name of this subscription (client defined) |
| msgBacklog | The count of messages in backlog for this subscription |
| msgBacklogNoDelayed | The count of messages in backlog for this subscription, excluding delayed messages |
| type | The type of this subscription |
| msgRateExpired | The rate at which messages were discarded instead of dispatched from this subscription due to TTL |
| consumers | The list of connected consumers for this subscription |
| consumerName | Internal identifier for this consumer, generated by the client library |
| availablePermits | The number of messages this consumer has space for in the client library's listen queue. A value of 0 means the client library's queue is full and receive() isn't being called. A nonzero value means the consumer is ready to receive messages. |
| replication | This section gives the stats for cross-colo replication of this topic |
| replicationBacklog | The outbound replication backlog, in messages |
| connected | Whether the outbound replicator is connected |
| replicationDelayInSeconds | How long the oldest message has been waiting to be sent through the connection, if connected is true |
| inboundConnection | The IP and port of the broker in the remote cluster's publisher connection to this broker |
| inboundConnectedSince | The TCP connection being used to publish messages to the remote cluster. If no local publishers are connected, this connection is closed automatically after one minute. |

pulsar-admin

The stats for the topic and its connected producers and consumers can be fetched by using the stats command, specifying the topic by name:

  $ pulsar-admin topics stats \
    persistent://test-tenant/namespace/topic \
    --get-precise-backlog

REST API

GET /admin/v2/persistent/:tenant/:namespace/:topic/stats

Java

  admin.topics().getStats(persistentTopic, false /* is precise backlog */);