Manage topics

Topics - 图1tip

This page only shows some frequently used operations.

  • For the latest and complete information about Pulsar admin, including commands, flags, descriptions, and more, see Pulsar admin doc.

  • For the latest and complete information about REST API, including parameters, responses, samples, and more, see REST API doc.

  • For the latest and complete information about Java admin API, including classes, methods, descriptions, and more, see Java admin API doc.

Pulsar has persistent and non-persistent topics. A persistent topic is a logical endpoint for publishing and consuming messages. The topic name structure for persistent topics is:

  1. persistent://tenant/namespace/topic

Non-persistent topics are used in applications that only consume real-time published messages and do not need persistent guarantees. In this way, it reduces message-publish latency by removing overhead of persisting messages. The topic name structure for non-persistent topics is:

  1. non-persistent://tenant/namespace/topic

Manage topic resources

Whether it is a persistent or non-persistent topic, you can obtain the topic resources through pulsar-admin tool, REST API and Java.

Topics - 图2note

In REST API, :schema stands for persistent or non-persistent. :tenant, :namespace, :x are variables, replace them with the real tenant, namespace, and x names when using them. Take GET /admin/v2/:schema/:tenant/:namespace/getList as an example, to get the list of persistent topics in REST API, use https://pulsar.apache.org/admin/v2/persistent/my-tenant/my-namespace. To get the list of non-persistent topics in REST API, use https://pulsar.apache.org/admin/v2/non-persistent/my-tenant/my-namespace.

List of topics

You can get the list of topics under a given namespace in the following ways.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics list my-tenant/my-namespace

GET /admin/v2/:schema/:tenant/:namespace/getList

  1. String namespace = "my-tenant/my-namespace";
  2. admin.topics().getList(namespace);

Grant permission

You can grant permissions on a client role to perform specific actions on a given topic in the following ways.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics grant-permission \
  2. --actions produce,consume \
  3. --role application1 \
  4. persistent://test-tenant/ns1/tp1

POST /admin/v2/:schema/:tenant/:namespace/:topic/permissions/:role/grantPermissionsOnTopic

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. String role = "test-role";
  3. Set<AuthAction> actions = Sets.newHashSet(AuthAction.produce, AuthAction.consume);
  4. admin.topics().grantPermission(topic, role, actions);

Get permission

You can fetch permission in the following ways.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics permissions persistent://test-tenant/ns1/tp1

Example output:

  1. application1 [consume, produce]

GET /admin/v2/:schema/:tenant/:namespace/:topic/permissions/getPermissionsOnTopic

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.topics().getPermissions(topic);

Revoke permission

You can revoke permissions granted on a client role in the following ways.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics revoke-permission \
  2. --role application1 \
  3. persistent://test-tenant/ns1/tp1

DELETE /admin/v2/:schema/:tenant/:namespace/:topic/permissions/:role/revokePermissionsOnTopic

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. String role = "test-role";
  3. admin.topics().revokePermissions(topic, role);

Delete topic

You can delete a topic in the following ways. You cannot delete a topic if any active subscription or producer is connected to the topic.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics delete persistent://test-tenant/ns1/tp1

DELETE /admin/v2/:schema/:tenant/:namespace/:topic/deleteTopic

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.topics().delete(topic);

Unload topic

You can unload a topic in the following ways.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics unload persistent://test-tenant/ns1/tp1

PUT /admin/v2/:schema/:tenant/:namespace/:topic/unload/unloadTopic

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.topics().unload(topic);

Get stats

For the detailed statistics of a topic, see Pulsar statistics.

The following is an example of a topic status.

  1. {
  2. "msgRateIn" : 0.0,
  3. "msgThroughputIn" : 0.0,
  4. "msgRateOut" : 0.0,
  5. "msgThroughputOut" : 0.0,
  6. "bytesInCounter" : 504,
  7. "msgInCounter" : 9,
  8. "bytesOutCounter" : 2296,
  9. "msgOutCounter" : 41,
  10. "averageMsgSize" : 0.0,
  11. "msgChunkPublished" : false,
  12. "storageSize" : 504,
  13. "backlogSize" : 0,
  14. "filteredEntriesCount" : 100,
  15. "earliestMsgPublishTimeInBacklogs": 0,
  16. "offloadedStorageSize" : 0,
  17. "publishers" : [ {
  18. "accessMode" : "Shared",
  19. "msgRateIn" : 0.0,
  20. "msgThroughputIn" : 0.0,
  21. "averageMsgSize" : 0.0,
  22. "chunkedMessageRate" : 0.0,
  23. "producerId" : 0,
  24. "metadata" : { },
  25. "address" : "/127.0.0.1:65402",
  26. "connectedSince" : "2021-06-09T17:22:55.913+08:00",
  27. "clientVersion" : "2.9.0-SNAPSHOT",
  28. "producerName" : "standalone-1-0"
  29. } ],
  30. "waitingPublishers" : 0,
  31. "subscriptions" : {
  32. "sub-demo" : {
  33. "msgRateOut" : 0.0,
  34. "msgThroughputOut" : 0.0,
  35. "bytesOutCounter" : 2296,
  36. "msgOutCounter" : 41,
  37. "msgRateRedeliver" : 0.0,
  38. "chunkedMessageRate" : 0,
  39. "msgBacklog" : 0,
  40. "backlogSize" : 0,
  41. "earliestMsgPublishTimeInBacklog": 0,
  42. "msgBacklogNoDelayed" : 0,
  43. "blockedSubscriptionOnUnackedMsgs" : false,
  44. "msgDelayed" : 0,
  45. "unackedMessages" : 0,
  46. "type" : "Exclusive",
  47. "activeConsumerName" : "20b81",
  48. "msgRateExpired" : 0.0,
  49. "totalMsgExpired" : 0,
  50. "lastExpireTimestamp" : 0,
  51. "lastConsumedFlowTimestamp" : 1623230565356,
  52. "lastConsumedTimestamp" : 1623230583946,
  53. "lastAckedTimestamp" : 1623230584033,
  54. "lastMarkDeleteAdvancedTimestamp" : 1623230584033,
  55. "filterProcessedMsgCount": 100,
  56. "filterAcceptedMsgCount": 100,
  57. "filterRejectedMsgCount": 0,
  58. "filterRescheduledMsgCount": 0,
  59. "consumers" : [ {
  60. "msgRateOut" : 0.0,
  61. "msgThroughputOut" : 0.0,
  62. "bytesOutCounter" : 2296,
  63. "msgOutCounter" : 41,
  64. "msgRateRedeliver" : 0.0,
  65. "chunkedMessageRate" : 0.0,
  66. "consumerName" : "20b81",
  67. "availablePermits" : 959,
  68. "unackedMessages" : 0,
  69. "avgMessagesPerEntry" : 314,
  70. "blockedConsumerOnUnackedMsgs" : false,
  71. "lastAckedTimestamp" : 1623230584033,
  72. "lastConsumedTimestamp" : 1623230583946,
  73. "metadata" : { },
  74. "address" : "/127.0.0.1:65172",
  75. "connectedSince" : "2021-06-09T17:22:45.353+08:00",
  76. "clientVersion" : "2.9.0-SNAPSHOT"
  77. } ],
  78. "allowOutOfOrderDelivery": false,
  79. "consumersAfterMarkDeletePosition" : { },
  80. "nonContiguousDeletedMessagesRanges" : 0,
  81. "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
  82. "durable" : true,
  83. "replicated" : false
  84. }
  85. },
  86. "replication" : { },
  87. "deduplicationStatus" : "Disabled",
  88. "nonContiguousDeletedMessagesRanges" : 0,
  89. "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
  90. "ownerBroker" : "localhost:8080"
  91. }

To get the status of a topic, you can use the following ways.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics stats persistent://test-tenant/ns1/tp1

GET /admin/v2/:schema/:tenant/:namespace/:topic/stats/getStats

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.topics().getStats(topic);

Get internal stats

For the detailed internal statistics inside a topic, see Pulsar statistics.

The following is an example of the internal statistics of a topic.

  1. {
  2. "entriesAddedCounter":0,
  3. "numberOfEntries":0,
  4. "totalSize":0,
  5. "currentLedgerEntries":0,
  6. "currentLedgerSize":0,
  7. "lastLedgerCreatedTimestamp":"2021-01-22T21:12:14.868+08:00",
  8. "lastLedgerCreationFailureTimestamp":null,
  9. "waitingCursorsCount":0,
  10. "pendingAddEntriesCount":0,
  11. "lastConfirmedEntry":"3:-1",
  12. "state":"LedgerOpened",
  13. "ledgers":[
  14. {
  15. "ledgerId":3,
  16. "entries":0,
  17. "size":0,
  18. "offloaded":false,
  19. "metadata":null
  20. }
  21. ],
  22. "cursors":{
  23. "test":{
  24. "markDeletePosition":"3:-1",
  25. "readPosition":"3:-1",
  26. "waitingReadOp":false,
  27. "pendingReadOps":0,
  28. "messagesConsumedCounter":0,
  29. "cursorLedger":4,
  30. "cursorLedgerLastEntry":1,
  31. "individuallyDeletedMessages":"[]",
  32. "lastLedgerSwitchTimestamp":"2021-01-22T21:12:14.966+08:00",
  33. "state":"Open",
  34. "numberOfEntriesSinceFirstNotAckedMessage":0,
  35. "totalNonContiguousDeletedMessagesRange":0,
  36. "properties":{
  37. }
  38. }
  39. },
  40. "schemaLedgers":[
  41. {
  42. "ledgerId":1,
  43. "entries":11,
  44. "size":10,
  45. "offloaded":false,
  46. "metadata":null
  47. }
  48. ],
  49. "compactedLedger":{
  50. "ledgerId":-1,
  51. "entries":-1,
  52. "size":-1,
  53. "offloaded":false,
  54. "metadata":null
  55. }
  56. }

To get the internal status of a topic, you can use the following ways.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics stats-internal persistent://test-tenant/ns1/tp1

GET /admin/v2/:schema/:tenant/:namespace/:topic/internalStats/getInternalStats

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.topics().getInternalStats(topic);

Peek messages

You can peek a number of messages for a specific subscription of a given topic in the following ways.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics peek-messages \
  2. --count 10 --subscription my-subscription \
  3. persistent://test-tenant/ns1/tp1

Example output:

  1. Message ID: 77:2
  2. Publish time: 1668674963028
  3. Event time: 0
  4. +-------------------------------------------------+
  5. | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
  6. +--------+-------------------------------------------------+----------------+
  7. |00000000| 68 65 6c 6c 6f 2d 31 |hello-1 |
  8. +--------+-------------------------------------------------+----------------+

GET /admin/v2/:schema/:tenant/:namespace/:topic/subscription/:subName/position/:messagePosition/peekNthMessage

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. String subName = "my-subscription";
  3. int numMessages = 1;
  4. admin.topics().peekMessages(topic, subName, numMessages);

Get message by ID

You can fetch the message with the given ledger ID and entry ID in the following ways.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics get-message-by-id \
  2. -l 10 -e 0 persistent://public/default/my-topic

GET /admin/v2/:schema/:tenant/:namespace/:topic/ledger/:ledgerId/entry/:entryId/getMessageById

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. long ledgerId = 10;
  3. long entryId = 10;
  4. admin.topics().getMessageById(topic, ledgerId, entryId);

Examine messages

You can examine a specific message on a topic by position relative to the earliest or the latest message.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics examine-messages \
  2. -i latest -m 1 persistent://public/default/my-topic

GET /admin/v2/:schema/:tenant/:namespace/:topic/examinemessage?initialPosition=:initialPosition&messagePosition=:messagePosition/examineMessage

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.topics().examineMessage(topic, "latest", 1);

Get message ID

You can get message ID published at or just after the given datetime.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics get-message-id \
  2. persistent://public/default/my-topic \
  3. -d 2021-06-28T19:01:17Z

GET /admin/v2/:schema/:tenant/:namespace/:topic/messageid/:timestamp/getMessageIdByTimestamp

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. long timestamp = System.currentTimeMillis()
  3. admin.topics().getMessageIdByTimestamp(topic, timestamp);

Skip messages

You can skip a number of messages for a specific subscription of a given topic in the following ways.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics skip \
  2. --count 10 --subscription my-subscription \
  3. persistent://test-tenant/ns1/tp1

POST /admin/v2/:schema/:tenant/:namespace/:topic/subscription/:subName/skip/:numMessages/skipMessages

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. String subName = "my-subscription";
  3. int numMessages = 1;
  4. admin.topics().skipMessages(topic, subName, numMessages);

Skip all messages

You can skip all the old messages for a specific subscription of a given topic.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics clear-backlog \
  2. --subscription my-subscription \
  3. persistent://test-tenant/ns1/tp1

POST /admin/v2/:schema/:tenant/:namespace/:topic/subscription/:subName/skip_all/skipAllMessages

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. String subName = "my-subscription";
  3. admin.topics().skipAllMessages(topic, subName);

Reset cursor

You can reset a subscription cursor position back to the position which is recorded X minutes before. It essentially calculates the time and position of the cursor at X minutes before and resets it at that position. You can reset the cursor in the following ways.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics reset-cursor \
  2. --subscription my-subscription --time 10 \
  3. persistent://test-tenant/ns1/tp1

POST /admin/v2/:schema/:tenant/:namespace/:topic/subscription/:subName/resetcursor/:timestamp/resetCursor

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. String subName = "my-subscription";
  3. long timestamp = 2342343L;
  4. admin.topics().resetCursor(topic, subName, timestamp);

Look up topic’s owner broker

You can locate the owner broker of the given topic in the following ways.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics lookup persistent://test-tenant/ns1/tp1

Example output:

  1. "pulsar://broker1.org.com:4480"

GET /lookup/v2/topic/:topic-domain/:tenant/:namespace/:topic/lookupTopicAsync

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.lookups().lookupDestination(topic);

Look up partitioned topic’s owner broker

You can locate the owner broker of the given partitioned topic in the following ways.

  • pulsar-admin
  • Java
  1. pulsar-admin topics partitioned-lookup persistent://test-tenant/ns1/my-topic

Example output:

  1. "persistent://test-tenant/ns1/my-topic-partition-0 pulsar://localhost:6650"
  2. "persistent://test-tenant/ns1/my-topic-partition-1 pulsar://localhost:6650"
  3. "persistent://test-tenant/ns1/my-topic-partition-2 pulsar://localhost:6650"
  4. "persistent://test-tenant/ns1/my-topic-partition-3 pulsar://localhost:6650"

Lookup the partitioned topics sorted by broker URL

  1. pulsar-admin topics partitioned-lookup \
  2. persistent://test-tenant/ns1/my-topic --sort-by-broker

Example output:

  1. pulsar://localhost:6650 [persistent://test-tenant/ns1/my-topic-partition-0, persistent://test-tenant/ns1/my-topic-partition-1, persistent://test-tenant/ns1/my-topic-partition-2, persistent://test-tenant/ns1/my-topic-partition-3]
  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.lookups().lookupPartitionedTopic(topic);

Get bundle

You can get the range of the bundle that the given topic belongs to in the following ways.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics bundle-range persistent://test-tenant/ns1/tp1

Example output:

  1. "0x00000000_0xffffffff"

GET /lookup/v2/topic/:topic_domain/:tenant/:namespace/:topic/bundle/getNamespaceBundle

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.lookups().getBundleRange(topic);

Get subscriptions

You can check all subscription names for a given topic in the following ways.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics subscriptions persistent://test-tenant/ns1/tp1

Example output:

  1. my-subscription

GET /admin/v2/:schema/:tenant/:namespace/:topic/subscriptions/getSubscriptions

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.topics().getSubscriptions(topic);

Last Message Id

You can get the last committed message ID for a persistent topic. It is available since 2.3.0 release.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics last-message-id topic-name

Example output:

  1. {
  2. "ledgerId" : 97,
  3. "entryId" : 9,
  4. "partitionIndex" : -1
  5. }

Get /admin/v2/:schema/:tenant/:namespace/:topic/lastMessageId/getLastMessageId

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.topics().getLastMessage(topic);

Get backlog size

You can get the backlog size of a single partition topic or a non-partitioned topic with a given message ID (in bytes).

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics get-backlog-size \
  2. -m 1:1 \
  3. persistent://test-tenant/ns1/tp1-partition-0

PUT /admin/v2/:schema/:tenant/:namespace/:topic/backlogSize/getBacklogSizeByMessageId

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. MessageId messageId = MessageId.earliest;
  3. admin.topics().getBacklogSizeByMessageId(topic, messageId);

Configure deduplication snapshot interval

Get deduplication snapshot interval

To get the topic-level deduplication snapshot interval, use one of the following methods.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics get-deduplication-snapshot-interval my-topic

GET /admin/v2/topics/:tenant/:namespace/:topic/deduplicationSnapshotInterval/getDeduplicationSnapshotInterval

  1. admin.topics().getDeduplicationSnapshotInterval(topic)

Set deduplication snapshot interval

To set the topic-level deduplication snapshot interval, use one of the following methods.

Prerequisite brokerDeduplicationEnabled must be set to true.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics set-deduplication-snapshot-interval my-topic -i 1000

POST /admin/v2/topics/:tenant/:namespace/:topic/deduplicationSnapshotInterval/setDeduplicationSnapshotInterval

  1. {
  2. "interval": 1000
  3. }
  1. admin.topics().setDeduplicationSnapshotInterval(topic, 1000)

Remove deduplication snapshot interval

To remove the topic-level deduplication snapshot interval, use one of the following methods.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics remove-deduplication-snapshot-interval my-topic

DELETE /admin/v2/topics/:tenant/:namespace/:topic/deduplicationSnapshotInterval/deleteDeduplicationSnapshotInterval

  1. admin.topics().removeDeduplicationSnapshotInterval(topic)

Configure inactive topic policies

Get inactive topic policies

To get the topic-level inactive topic policies, use one of the following methods.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics get-inactive-topic-policies my-topic

GET /admin/v2/topics/:tenant/:namespace/:topic/inactiveTopicPolicies/getInactiveTopicPolicies

  1. admin.topics().getInactiveTopicPolicies(topic)

Set inactive topic policies

To set the topic-level inactive topic policies, use one of the following methods.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics set-inactive-topic-policies my-topic

POST /admin/v2/topics/:tenant/:namespace/:topic/inactiveTopicPolicies/setInactiveTopicPolicies

  1. admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies)

Remove inactive topic policies

To remove the topic-level inactive topic policies, use one of the following methods.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics remove-inactive-topic-policies my-topic

DELETE /admin/v2/topics/:tenant/:namespace/:topic/inactiveTopicPolicies/removeInactiveTopicPolicies

  1. admin.topics().removeInactiveTopicPolicies(topic)

Configure offload policies

Get offload policies

To get the topic-level offload policies, use one of the following methods.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics get-offload-policies my-topic

GET /admin/v2/topics/:tenant/:namespace/:topic/offloadPolicies/getOffloadPolicies

  1. admin.topics().getOffloadPolicies(topic)

Set offload policies

To set the topic-level offload policies, use one of the following methods.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics set-offload-policies my-topic

POST /admin/v2/topics/:tenant/:namespace/:topic/offloadPolicies/setOffloadPolicies

  1. admin.topics().setOffloadPolicies(topic, offloadPolicies)

Remove offload policies

To remove the topic-level offload policies, use one of the following methods.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics remove-offload-policies my-topic

DELETE /admin/v2/topics/:tenant/:namespace/:topic/offloadPolicies/removeOffloadPolicies

  1. admin.topics().removeOffloadPolicies(topic)

Manage non-partitioned topics

You can use Pulsar admin API to create, delete and check the status of non-partitioned topics.

Create

Non-partitioned topics must be explicitly created. When creating a new non-partitioned topic, you need to provide a name for the topic.

By default, 60 seconds after creation, topics are considered inactive and deleted automatically to avoid generating trash data. To disable this feature, set brokerDeleteInactiveTopicsEnabled to false. To change the frequency of checking inactive topics, set brokerDeleteInactiveTopicsFrequencySeconds to a specific value.

For more information about the two parameters, see here.

You can create non-partitioned topics in the following ways.

  • pulsar-admin
  • REST API
  • Java

When you create non-partitioned topics with the create command, you need to specify the topic name as an argument.

  1. pulsar-admin topics create \
  2. persistent://my-tenant/my-namespace/my-topic

Topics - 图3note

When you create a non-partitioned topic with the suffix ‘-partition-‘ followed by numeric value like ‘xyz-topic-partition-x’ for the topic name, if a partitioned topic with same suffix ‘xyz-topic-partition-y’ exists, then the numeric value(x) for the non-partitioned topic must be larger than the number of partitions(y) of the partitioned topic. Otherwise, you cannot create such a non-partitioned topic.

PUT /admin/v2/:schema/:tenant/:namespace/:topic/createNonPartitionedTopic

  1. String topicName = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.topics().createNonPartitionedTopic(topicName);

Delete

You can delete non-partitioned topics in the following ways.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics delete \
  2. persistent://my-tenant/my-namespace/my-topic

DELETE /admin/v2/:schema/:tenant/:namespace/:topic/deleteTopic

  1. admin.topics().delete(topic);

List

You can get the list of topics under a given namespace in the following ways.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics list tenant/namespace

Example output:

  1. persistent://tenant/namespace/topic1
  2. persistent://tenant/namespace/topic2

GET /admin/v2/:schema/:tenant/:namespace/getList

  1. admin.topics().getList(namespace);

Stats

You can check the current statistics of a given topic and its connected producers and consumers in the following ways.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics stats \
  2. persistent://test-tenant/namespace/topic \
  3. --get-precise-backlog

GET /admin/v2/:schema/:tenant/:namespace/:topic/stats/getStats

  1. admin.topics().getStats(topic, false /* is precise backlog */);

The following is an example. For the description of topic stats, see Pulsar statistics.

  1. {
  2. "msgRateIn": 4641.528542257553,
  3. "msgThroughputIn": 44663039.74947473,
  4. "msgRateOut": 0,
  5. "msgThroughputOut": 0,
  6. "averageMsgSize": 1232439.816728665,
  7. "storageSize": 135532389160,
  8. "publishers": [
  9. {
  10. "msgRateIn": 57.855383881403576,
  11. "msgThroughputIn": 558994.7078932219,
  12. "averageMsgSize": 613135,
  13. "producerId": 0,
  14. "producerName": null,
  15. "address": null,
  16. "connectedSince": null
  17. }
  18. ],
  19. "subscriptions": {
  20. "my-topic_subscription": {
  21. "msgRateOut": 0,
  22. "msgThroughputOut": 0,
  23. "msgBacklog": 116632,
  24. "type": null,
  25. "msgRateExpired": 36.98245516804671,
  26. "consumers": []
  27. }
  28. },
  29. "replication": {}
  30. }

Internal stats

You can check the detailed statistics of a topic. The following is an example. For the description of each internal topic stats, see Pulsar statistics.

  1. {
  2. "entriesAddedCounter": 20449518,
  3. "numberOfEntries": 3233,
  4. "totalSize": 331482,
  5. "currentLedgerEntries": 3233,
  6. "currentLedgerSize": 331482,
  7. "lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
  8. "lastLedgerCreationFailureTimestamp": null,
  9. "waitingCursorsCount": 1,
  10. "pendingAddEntriesCount": 0,
  11. "lastConfirmedEntry": "324711539:3232",
  12. "state": "LedgerOpened",
  13. "ledgers": [
  14. {
  15. "ledgerId": 324711539,
  16. "entries": 0,
  17. "size": 0
  18. }
  19. ],
  20. "cursors": {
  21. "my-subscription": {
  22. "markDeletePosition": "324711539:3133",
  23. "readPosition": "324711539:3233",
  24. "waitingReadOp": true,
  25. "pendingReadOps": 0,
  26. "messagesConsumedCounter": 20449501,
  27. "cursorLedger": 324702104,
  28. "cursorLedgerLastEntry": 21,
  29. "individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
  30. "lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
  31. "state": "Open"
  32. }
  33. }
  34. }

You can get the internal stats for the partitioned topic in the following ways.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics stats-internal \
  2. persistent://test-tenant/namespace/topic

GET /admin/v2/:schema/:tenant/:namespace/:topic/internalStats/getInternalStats

  1. admin.topics().getInternalStats(topic);

Manage partitioned topics

You can use Pulsar admin API to create, update, delete and check the status of partitioned topics.

Create

When creating a new partitioned topic, you need to provide a name and the number of partitions for the topic.

Topics - 图4note

By default, if there are no messages 60 seconds after creation, topics are considered inactive and deleted automatically to avoid generating trash data. To disable this feature, set brokerDeleteInactiveTopicsEnabled to false. To change the frequency of checking inactive topics, set brokerDeleteInactiveTopicsFrequencySeconds to a specific value.

For more information about the two parameters, see here.

You can create partitioned topics in the following ways.

  • pulsar-admin
  • REST API
  • Java

When you create partitioned topics with the create-partitioned-topic command, you need to specify the topic name as an argument and the number of partitions using the -p or --partitions flag.

  1. pulsar-admin topics create-partitioned-topic \
  2. persistent://my-tenant/my-namespace/my-topic \
  3. --partitions 4

Topics - 图5note

If a non-partitioned topic with the suffix ‘-partition-‘ followed by a numeric value like ‘xyz-topic-partition-10’, you can not create a partitioned topic with name ‘xyz-topic’, because the partitions of the partitioned topic could override the existing non-partitioned topic. To create such partitioned topic, you have to delete that non-partitioned topic first.

PUT /admin/v2/:schema/:tenant/:namespace/:topic/partitions/createPartitionedTopic

  1. String topicName = "persistent://my-tenant/my-namespace/my-topic";
  2. int numPartitions = 4;
  3. admin.topics().createPartitionedTopic(topicName, numPartitions);

Create missed partitions

When topic auto-creation is disabled, and you have a partitioned topic without any partitions, you can use the create-missed-partitions command to create partitions for the topic.

  • pulsar-admin
  • REST API
  • Java

You can create missed partitions with the create-missed-partitions command and specify the topic name as an argument.

  1. pulsar-admin topics create-missed-partitions \
  2. persistent://my-tenant/my-namespace/my-topic

POST /admin/v2/:schema/:tenant/:namespace/:topic/createMissedPartitions

  1. String topicName = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.topics().createMissedPartitions(topicName);

Get metadata

Partitioned topics are associated with metadata, you can view it as a JSON object. The following metadata field is available.

FieldDescription
partitionsThe number of partitions into which the topic is divided.
  • pulsar-admin
  • REST API
  • Java

You can check the number of partitions in a partitioned topic with the get-partitioned-topic-metadata subcommand.

  1. pulsar-admin topics get-partitioned-topic-metadata \
  2. persistent://my-tenant/my-namespace/my-topic

Example output:

  1. {
  2. "partitions" : 4,
  3. "deleted" : false
  4. }

GET /admin/v2/:schema/:tenant/:namespace/:topic/partitions/getPartitionedMetadata

  1. String topicName = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.topics().getPartitionedTopicMetadata(topicName);

Update

You can update the number of partitions for an existing partitioned topic if the topic is non-global. However, you can only add the partition number. Decrementing the number of partitions would delete the topic, which is not supported in Pulsar.

Producers and consumers can find the newly created partitions automatically.

  • pulsar-admin
  • REST API
  • Java

You can update partitioned topics with the update-partitioned-topic command.

  1. pulsar-admin topics update-partitioned-topic \
  2. persistent://my-tenant/my-namespace/my-topic \
  3. --partitions 8

POST /admin/v2/:schema/:tenant/:cluster/:namespace/:destination/partitions/updatePartitionedTopic

  1. admin.topics().updatePartitionedTopic(topic, numPartitions);

Delete

You can delete partitioned topics with the delete-partitioned-topic command, REST API and Java.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics delete-partitioned-topic \
  2. persistent://my-tenant/my-namespace/my-topic

DELETE /admin/v2/:schema/:topic/:namespace/:destination/partitions/deletePartitionedTopic

  1. admin.topics().delete(topic);

List

You can get the list of partitioned topics under a given namespace in the following ways.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics list-partitioned-topics tenant/namespace

Example output:

  1. persistent://tenant/namespace/topic1
  2. persistent://tenant/namespace/topic2

GET /admin/v2/:schema/:tenant/:namespace/getPartitionedTopicList

  1. admin.topics().getPartitionedTopicList(namespace);

Stats

You can check the current statistics of a given partitioned topic and its connected producers and consumers in the following ways.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics partitioned-stats \
  2. persistent://test-tenant/namespace/topic \
  3. --per-partition

GET /admin/v2/:schema/:tenant/:namespace/:topic/partitioned-stats/getPartitionedStats

  1. admin.topics().getPartitionedStats(topic, true /* per partition */, false /* is precise backlog */);

The following is an example. For the description of each topic stats, see Pulsar statistics.

Note that in the subscription JSON object, chuckedMessageRate is deprecated. Please use chunkedMessageRate. Both will be sent in the JSON for now.

  1. {
  2. "msgRateIn" : 999.992947159793,
  3. "msgThroughputIn" : 1070918.4635439808,
  4. "msgRateOut" : 0.0,
  5. "msgThroughputOut" : 0.0,
  6. "bytesInCounter" : 270318763,
  7. "msgInCounter" : 252489,
  8. "bytesOutCounter" : 0,
  9. "msgOutCounter" : 0,
  10. "averageMsgSize" : 1070.926056966454,
  11. "msgChunkPublished" : false,
  12. "storageSize" : 270316646,
  13. "backlogSize" : 200921133,
  14. "publishers" : [ {
  15. "msgRateIn" : 999.992947159793,
  16. "msgThroughputIn" : 1070918.4635439808,
  17. "averageMsgSize" : 1070.3333333333333,
  18. "chunkedMessageRate" : 0.0,
  19. "producerId" : 0
  20. } ],
  21. "subscriptions" : {
  22. "test" : {
  23. "msgRateOut" : 0.0,
  24. "msgThroughputOut" : 0.0,
  25. "bytesOutCounter" : 0,
  26. "msgOutCounter" : 0,
  27. "msgRateRedeliver" : 0.0,
  28. "chuckedMessageRate" : 0,
  29. "chunkedMessageRate" : 0,
  30. "msgBacklog" : 144318,
  31. "msgBacklogNoDelayed" : 144318,
  32. "blockedSubscriptionOnUnackedMsgs" : false,
  33. "msgDelayed" : 0,
  34. "unackedMessages" : 0,
  35. "msgRateExpired" : 0.0,
  36. "lastExpireTimestamp" : 0,
  37. "lastConsumedFlowTimestamp" : 0,
  38. "lastConsumedTimestamp" : 0,
  39. "lastAckedTimestamp" : 0,
  40. "consumers" : [ ],
  41. "isDurable" : true,
  42. "isReplicated" : false
  43. }
  44. },
  45. "replication" : { },
  46. "metadata" : {
  47. "partitions" : 3
  48. },
  49. "partitions" : { }
  50. }

Internal stats

You can check the detailed statistics of a partitioned topic. The following is an example. For the description of each internal topic stats, see Pulsar statistics.

  1. {
  2. "entriesAddedCounter": 20449518,
  3. "numberOfEntries": 3233,
  4. "totalSize": 331482,
  5. "currentLedgerEntries": 3233,
  6. "currentLedgerSize": 331482,
  7. "lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
  8. "lastLedgerCreationFailureTimestamp": null,
  9. "waitingCursorsCount": 1,
  10. "pendingAddEntriesCount": 0,
  11. "lastConfirmedEntry": "324711539:3232",
  12. "state": "LedgerOpened",
  13. "ledgers": [
  14. {
  15. "ledgerId": 324711539,
  16. "entries": 0,
  17. "size": 0
  18. }
  19. ],
  20. "cursors": {
  21. "my-subscription": {
  22. "markDeletePosition": "324711539:3133",
  23. "readPosition": "324711539:3233",
  24. "waitingReadOp": true,
  25. "pendingReadOps": 0,
  26. "messagesConsumedCounter": 20449501,
  27. "cursorLedger": 324702104,
  28. "cursorLedgerLastEntry": 21,
  29. "individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
  30. "lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
  31. "state": "Open"
  32. }
  33. }
  34. }

You can get the internal stats for the partitioned topic in the following ways.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics partitioned-stats-internal \
  2. persistent://test-tenant/namespace/topic

GET /admin/v2/:schema/:tenant/:namespace/:topic/partitioned-internalStats/getPartitionedInternalStats

  1. admin.topics().getPartitionedInternalStats(topic);

Manage subscriptions

You can use Pulsar admin API to create, check, and delete subscriptions.

Create subscription

You can create a subscription for a topic using one of the following methods.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics create-subscription \
  2. --subscription my-subscription \
  3. persistent://test-tenant/ns1/tp1

PUT /admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subscription/createSubscriptions

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. String subscriptionName = "my-subscription";
  3. admin.topics().createSubscription(topic, subscriptionName, MessageId.latest);

Get subscription

You can check all subscription names for a given topic using one of the following methods.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics subscriptions persistent://test-tenant/ns1/tp1

Example output:

  1. my-subscription

GET /admin/v2/:schema/:tenant/:namespace/:topic/subscriptions/getSubscriptions

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.topics().getSubscriptions(topic);

Unsubscribe subscription

When a subscription does not process messages anymore, you can unsubscribe it using one of the following methods.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin topics unsubscribe \
  2. --subscription my-subscription \
  3. persistent://test-tenant/ns1/tp1

DELETE /admin/v2/:schema/:tenant/:namespace/:topic/subscription/:subscription/deleteSubscription

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. String subscriptionName = "my-subscription";
  3. admin.topics().deleteSubscription(topic, subscriptionName);