Managing partitioned topics

You can use Pulsar's admin API to create and manage partitioned topics.

In all of the instructions and commands below, the topic name structure is:

  1. persistent://tenant/namespace/topic

Partitioned topics resources

Create

Partitioned topics in Pulsar must be explicitly created. When creating a new partitioned topic youneed to provide a name for the topic as well as the desired number of partitions.

pulsar-admin

You can create partitioned topics using the create-partitioned-topiccommand and specifying the topic name as an argument and the number of partitions using the -p or —partitions flag.Here's an example:

  1. $ bin/pulsar-admin topics create-partitioned-topic \
  2. persistent://my-tenant/my-namespace/my-topic \
  3. --partitions 4

REST API

PUT/admin/v2/persistent/:tenant/:namespace/:topic/partitions

Java

  1. String topicName = "persistent://my-tenant/my-namespace/my-topic";
  2. int numPartitions = 4;
  3. admin.persistentTopics().createPartitionedTopic(topicName, numPartitions);

Nonpartitioned topics resources

Create

Nonpartitioned topics in Pulsar must be explicitly created if allowAutoTopicCreation or createIfMissing is disabled.When creating a non-partitioned topic, you need to provide a topic name.

pulsar-admin

You can create non-partitioned topics using the createcommand and specifying the topic name as an argument. This is an example command:

  1. $ bin/pulsar-admin topics create persistent://my-tenant/my-namespace/my-topic

REST API

PUTadmin/v2/persistent/:tenant/:namespace/:topic

Java

  1. String topicName = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.topics().createNonPartitionedTopic(topicName);

Get metadata

Partitioned topics have metadata associated with them that you can fetch as a JSON object.The following metadata fields are currently available:

FieldMeaning
partitionsThe number of partitions into which the topic is divided

pulsar-admin

You can see the number of partitions in a partitioned topic using theget-partitioned-topic-metadatasubcommand. Here's an example:

  1. $ pulsar-admin topics get-partitioned-topic-metadata \
  2. persistent://my-tenant/my-namespace/my-topic
  3. {
  4. "partitions": 4
  5. }

REST API

GET/admin/v2/persistent/:tenant/:namespace/:topic/partitions

Java

  1. String topicName = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.persistentTopics().getPartitionedTopicMetadata(topicName);

Update

You can update the number of partitions on an existing partitioned topicif the topic is non-global. To update, the new number of partitions must be greaterthan the existing number.

Decrementing the number of partitions would deleting the topic, which is not supported in Pulsar.

Already created partitioned producers and consumers will automatically find the newly created partitions.

pulsar-admin

Partitioned topics can be updated using theupdate-partitioned-topic command.

  1. $ pulsar-admin topics update-partitioned-topic \
  2. persistent://my-tenant/my-namespace/my-topic \
  3. --partitions 8

REST API

POST/admin/v2/persistent/:tenant/:cluster/:namespace/:destination/partitions

Java

  1. admin.persistentTopics().updatePartitionedTopic(persistentTopic, numPartitions);

Delete

pulsar-admin

Partitioned topics can be deleted using thedelete-partitioned-topic command, specifying the topic by name:

  1. $ bin/pulsar-admin topics delete-partitioned-topic \
  2. persistent://my-tenant/my-namespace/my-topic

REST API

DELETE/admin/v2/persistent/:topic/:namespace/:destination/partitions

Java

  1. admin.persistentTopics().delete(persistentTopic);

List

It provides a list of persistent topics existing under a given namespace.

pulsar-admin

  1. $ pulsar-admin topics list tenant/namespace
  2. persistent://tenant/namespace/topic1
  3. persistent://tenant/namespace/topic2

REST API

GET/admin/v2/persistent/:tenant/:namespace

Java

  1. admin.persistentTopics().getList(namespace);

Stats

It shows current statistics of a given partitioned topic. Here's an example payload:

  1. {
  2. "msgRateIn": 4641.528542257553,
  3. "msgThroughputIn": 44663039.74947473,
  4. "msgRateOut": 0,
  5. "msgThroughputOut": 0,
  6. "averageMsgSize": 1232439.816728665,
  7. "storageSize": 135532389160,
  8. "publishers": [
  9. {
  10. "msgRateIn": 57.855383881403576,
  11. "msgThroughputIn": 558994.7078932219,
  12. "averageMsgSize": 613135,
  13. "producerId": 0,
  14. "producerName": null,
  15. "address": null,
  16. "connectedSince": null
  17. }
  18. ],
  19. "subscriptions": {
  20. "my-topic_subscription": {
  21. "msgRateOut": 0,
  22. "msgThroughputOut": 0,
  23. "msgBacklog": 116632,
  24. "type": null,
  25. "msgRateExpired": 36.98245516804671,
  26. "consumers": []
  27. }
  28. },
  29. "replication": {}
  30. }

The following stats are available:

StatDescription
msgRateInThe sum of all local and replication publishers’ publish rates in messages per second
msgThroughputInSame as msgRateIn but in bytes per second instead of messages per second
msgRateOutThe sum of all local and replication consumers’ dispatch rates in messages per second
msgThroughputOutSame as msgRateOut but in bytes per second instead of messages per second
averageMsgSizeAverage message size, in bytes, from this publisher within the last interval
storageSizeThe sum of the ledgers’ storage size for this topic
publishersThe list of all local publishers into the topic. There can be anywhere from zero to thousands.
producerIdInternal identifier for this producer on this topic
producerNameInternal identifier for this producer, generated by the client library
addressIP address and source port for the connection of this producer
connectedSinceTimestamp this producer was created or last reconnected
subscriptionsThe list of all local subscriptions to the topic
my-subscriptionThe name of this subscription (client defined)
msgBacklogThe count of messages in backlog for this subscription
typeThis subscription type
msgRateExpiredThe rate at which messages were discarded instead of dispatched from this subscription due to TTL
consumersThe list of connected consumers for this subscription
consumerNameInternal identifier for this consumer, generated by the client library
availablePermitsThe number of messages this consumer has space for in the client library’s listen queue. A value of 0 means the client library’s queue is full and receive() isn’t being called. A nonzero value means this consumer is ready to be dispatched messages.
replicationThis section gives the stats for cross-colo replication of this topic
replicationBacklogThe outbound replication backlog in messages
connectedWhether the outbound replicator is connected
replicationDelayInSecondsHow long the oldest message has been waiting to be sent through the connection, if connected is true
inboundConnectionThe IP and port of the broker in the remote cluster’s publisher connection to this broker
inboundConnectedSinceThe TCP connection being used to publish messages to the remote cluster. If there are no local publishers connected, this connection is automatically closed after a minute.

pulsar-admin

The stats for the partitioned topic and its connected producers and consumers can be fetched by using thepartitioned-stats command, specifying the topic by name:

  1. $ pulsar-admin topics partitioned-stats \
  2. persistent://test-tenant/namespace/topic \
  3. --per-partition

REST API

GET/admin/v2/persistent/:tenant/:namespace/:topic/partitioned-stats

Java

  1. admin.persistentTopics().getStats(persistentTopic);

Internal stats

It shows detailed statistics of a topic.

StatDescription
entriesAddedCounterMessages published since this broker loaded this topic
numberOfEntriesTotal number of messages being tracked
totalSizeTotal storage size in bytes of all messages
currentLedgerEntriesCount of messages written to the ledger currently open for writing
currentLedgerSizeSize in bytes of messages written to ledger currently open for writing
lastLedgerCreatedTimestampTime when last ledger was created
lastLedgerCreationFailureTimestamptime when last ledger was failed
waitingCursorsCountHow many cursors are caught up and waiting for a new message to be published
pendingAddEntriesCountHow many messages have (asynchronous) write requests we are waiting on completion
lastConfirmedEntryThe ledgerid:entryid of the last message successfully written. If the entryid is -1, then the ledger has been opened or is currently being opened but has no entries written yet.
stateThe state of the cursor ledger. Open means we have a cursor ledger for saving updates of the markDeletePosition.
ledgersThe ordered list of all ledgers for this topic holding its messages
cursorsThe list of all cursors on this topic. There will be one for every subscription you saw in the topic stats.
markDeletePositionThe ack position: the last message the subscriber acknowledged receiving
readPositionThe latest position of subscriber for reading message
waitingReadOpThis is true when the subscription has read the latest message published to the topic and is waiting on new messages to be published.
pendingReadOpsThe counter for how many outstanding read requests to the BookKeepers we have in progress
messagesConsumedCounterNumber of messages this cursor has acked since this broker loaded this topic
cursorLedgerThe ledger being used to persistently store the current markDeletePosition
cursorLedgerLastEntryThe last entryid used to persistently store the current markDeletePosition
individuallyDeletedMessagesIf Acks are being done out of order, shows the ranges of messages Acked between the markDeletePosition and the read-position
lastLedgerSwitchTimestampThe last time the cursor ledger was rolled over
  1. {
  2. "entriesAddedCounter": 20449518,
  3. "numberOfEntries": 3233,
  4. "totalSize": 331482,
  5. "currentLedgerEntries": 3233,
  6. "currentLedgerSize": 331482,
  7. "lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
  8. "lastLedgerCreationFailureTimestamp": null,
  9. "waitingCursorsCount": 1,
  10. "pendingAddEntriesCount": 0,
  11. "lastConfirmedEntry": "324711539:3232",
  12. "state": "LedgerOpened",
  13. "ledgers": [
  14. {
  15. "ledgerId": 324711539,
  16. "entries": 0,
  17. "size": 0
  18. }
  19. ],
  20. "cursors": {
  21. "my-subscription": {
  22. "markDeletePosition": "324711539:3133",
  23. "readPosition": "324711539:3233",
  24. "waitingReadOp": true,
  25. "pendingReadOps": 0,
  26. "messagesConsumedCounter": 20449501,
  27. "cursorLedger": 324702104,
  28. "cursorLedgerLastEntry": 21,
  29. "individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
  30. "lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
  31. "state": "Open"
  32. }
  33. }
  34. }

pulsar-admin

The internal stats for the partitioned topic can be fetched by using thestats-internal command, specifying the topic by name:

  1. $ pulsar-admin topics stats-internal \
  2. persistent://test-tenant/namespace/topic

REST API

GET/admin/v2/persistent/:tenant/:namespace/:topic/internalStats

Java

  1. admin.persistentTopics().getInternalStats(persistentTopic);