Get started

This guide walks you through the quickest way to get started with the following methods to manage topics.

  • pulsar-admin
  • REST API
  • Java

pulsar-admin CLI: it’s a command-line tool and is available in the bin folder of your Pulsar installation.

REST API: HTTP calls, which are made against the admin APIs provided by brokers. In addition, both the Java admin API and pulsar-admin CLI use the REST API.

Java admin API: it’s a programmable interface written in Java.

Check the detailed steps below.

  • pulsar-admin
  • REST API
  • Java

This tutorial guides you through every step of using pulsar-admin CLI to manage topics. It includes the following steps:

  1. Set the service URL.

  2. Create a partitioned topic.

  3. Update the number of a partition.

  4. Produce messages to the topic.

  5. Check the stats of the topic.

  6. Delete the topic.

Prerequisites

  • Install and start Pulsar standalone. This tutorial runs Pulsar 2.11 as an example.

Steps

  1. Set the service URLs to point to the broker service in client.conf.

    1. webServiceUrl=http://localhost:8080/
    2. brokerServiceUrl=pulsar://localhost:6650/
  2. Create a persistent topic named test-topic-1 with 6 partitions.

    Input

    1. bin/pulsar-admin topics create-partitioned-topic \
    2. persistent://public/default/test-topic-1 \
    3. --partitions 6

    Output

    There is no output. You can check the status of the topic in Step 5.

  3. Update the number of the partition to 8.

    Input

    1. bin/pulsar-admin topics update-partitioned-topic \
    2. persistent://public/default/test-topic-1 \
    3. --partitions 8

    Output

    There is no output. You can check the number of partitions in Step 5.

  4. Produce some messages to the partitioned topic test-topic-1.

    Input

    1. bin/pulsar-perf produce -u pulsar://localhost:6650 -r 1000 -i 1000 persistent://public/default/test-topic-1

    Output

    1. 2023-03-07T15:33:56,832+0800 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Starting Pulsar perf producer with config: {
    2. "confFile" : "/Users/yu/apache-pulsar-2.11.0/conf/client.conf",
    3. "serviceURL" : "pulsar://localhost:6650",
    4. "authPluginClassName" : "",
    5. "authParams" : "",
    6. "tlsTrustCertsFilePath" : "",
    7. "tlsAllowInsecureConnection" : false,
    8. "tlsHostnameVerificationEnable" : false,
    9. "maxConnections" : 1,
    10. "statsIntervalSeconds" : 1000,
    11. "ioThreads" : 1,
    12. "enableBusyWait" : false,
    13. "listenerName" : null,
    14. "listenerThreads" : 1,
    15. "maxLookupRequest" : 50000,
    16. "topics" : [ "persistent://public/default/test-topic-1" ],
    17. "numTestThreads" : 1,
    18. "msgRate" : 1000,
    19. "msgSize" : 1024,
    20. "numTopics" : 1,
    21. "numProducers" : 1,
    22. "separator" : "-",
    23. "sendTimeout" : 0,
    24. "producerName" : null,
    25. "adminURL" : "http://localhost:8080/",
    26. ...
    27. 2023-03-07T15:35:03,769+0800 [Thread-0] INFO org.apache.pulsar.testclient.PerformanceProducer - Aggregated latency stats --- Latency: mean: 8.931 ms - med: 3.775 - 95pct: 32.144 - 99pct: 98.432 - 99.9pct: 216.088 - 99.99pct: 304.807 - 99.999pct: 349.391 - Max: 351.235
  5. Check the internal stats of the partitioned topic test-topic-1.

    Input

    1. bin/pulsar-admin topics partitioned-stats-internal \
    2. persistent://public/default/test-topic-1

    Output

    Below is a part of the output. For detailed explanations of topic stats, see Pulsar statistics.

    1. {
    2. "metadata" : {
    3. "partitions" : 8
    4. },
    5. "partitions" : {
    6. "persistent://public/default/test-topic-1-partition-1" : {
    7. "entriesAddedCounter" : 4213,
    8. "numberOfEntries" : 4213,
    9. "totalSize" : 8817693,
    10. "currentLedgerEntries" : 4212,
    11. "currentLedgerSize" : 8806289,
    12. "lastLedgerCreatedTimestamp" : "2023-03-07T15:33:59.367+08:00",
    13. "waitingCursorsCount" : 0,
    14. "pendingAddEntriesCount" : 0,
    15. "lastConfirmedEntry" : "65:4211",
    16. "state" : "LedgerOpened",
    17. "ledgers" : [ {
    18. "ledgerId" : 49,
    19. "entries" : 1,
    20. "size" : 11404,
    21. "offloaded" : false,
    22. "underReplicated" : false
    23. }, {
    24. "ledgerId" : 65,
    25. "entries" : 0,
    26. "size" : 0,
    27. "offloaded" : false,
    28. "underReplicated" : false
    29. } ],
    30. "cursors" : {
    31. "test-subscriptio-1" : {
    32. "markDeletePosition" : "49:-1",
    33. "readPosition" : "49:0",
    34. "waitingReadOp" : false,
    35. "pendingReadOps" : 0,
    36. "messagesConsumedCounter" : 0,
    37. "cursorLedger" : -1,
    38. "cursorLedgerLastEntry" : -1,
    39. "individuallyDeletedMessages" : "[]",
    40. "lastLedgerSwitchTimestamp" : "2023-03-06T16:41:32.801+08:00",
    41. "state" : "NoLedger",
    42. "numberOfEntriesSinceFirstNotAckedMessage" : 1,
    43. "totalNonContiguousDeletedMessagesRange" : 0,
    44. "subscriptionHavePendingRead" : false,
    45. "subscriptionHavePendingReplayRead" : false,
    46. "properties" : { }
    47. },
    48. "test-subscription-1" : {
    49. "markDeletePosition" : "49:-1",
    50. "readPosition" : "49:0",
    51. "waitingReadOp" : false,
    52. "pendingReadOps" : 0,
    53. "messagesConsumedCounter" : 0,
    54. "cursorLedger" : -1,
    55. "cursorLedgerLastEntry" : -1,
    56. "individuallyDeletedMessages" : "[]",
    57. "lastLedgerSwitchTimestamp" : "2023-03-06T16:41:32.801+08:00",
    58. "state" : "NoLedger",
    59. "numberOfEntriesSinceFirstNotAckedMessage" : 1,
    60. "totalNonContiguousDeletedMessagesRange" : 0,
    61. "subscriptionHavePendingRead" : false,
    62. "subscriptionHavePendingReplayRead" : false,
    63. "properties" : { }
    64. }
    65. },
    66. "schemaLedgers" : [ ],
    67. "compactedLedger" : {
    68. "ledgerId" : -1,
    69. "entries" : -1,
    70. "size" : -1,
    71. "offloaded" : false,
    72. "underReplicated" : false
    73. }
    74. },
    75. ...
  6. Delete the topic test-topic-1.

    Input

    1. bin/pulsar-admin topics delete-partitioned-topic persistent://public/default/test-topic-1

    Output

    There is no output. You can verify whether the test-topic-1 exists or not using the following command.

    Input

    List topics in public/default namespace.

    1. bin/pulsar-admin topics list public/default

This tutorial guides you through every step of using REST API to manage topics. It includes the following steps:

  1. Create a partitioned topic

  2. Update the number of a partition.

  3. Produce messages to the topic.

  4. Check the stats of the topic.

  5. Delete the topic.

Prerequisites

  • Install and start Pulsar standalone. This tutorial runs Pulsar 2.11 as an example.

Steps

  1. Create a persistent topic named test-topic-2 with 4 partitions.

    Input

    1. curl -X PUT http://localhost:8080/admin/v2/persistent/public/default/test-topic-2/partitions -H 'Content-Type: application/json' -d "4"

    Output

    There is no output. You can check the topic in Step 4.

  2. Update the number of the partition to 5.

    Input

    1. curl -X POST http://localhost:8080/admin/v2/persistent/public/default/test-topic-2/partitions -H 'Content-Type: application/json' -d "5"

    Output

    There is no output. You can check the status of the topic in Step 4.

  3. Produce some messages to the partitioned topic test-topic-2.

    Input

    1. bin/pulsar-perf produce -u pulsar://localhost:6650 -r 1000 -i 1000 persistent://public/default/test-topic-2

    Output

    1. 2023-03-08T15:47:06,268+0800 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Starting Pulsar perf producer with config: {
    2. "confFile" : "/Users/yu/apache-pulsar-2.11.0/conf/client.conf",
    3. "serviceURL" : "pulsar://localhost:6650",
    4. "authPluginClassName" : "",
    5. "authParams" : "",
    6. "tlsTrustCertsFilePath" : "",
    7. "tlsAllowInsecureConnection" : false,
    8. "tlsHostnameVerificationEnable" : false,
    9. "maxConnections" : 1,
    10. "statsIntervalSeconds" : 1000,
    11. "ioThreads" : 1,
    12. "enableBusyWait" : false,
    13. "listenerName" : null,
    14. "listenerThreads" : 1,
    15. "maxLookupRequest" : 50000,
    16. "topics" : [ "persistent://public/default/test-topic-2" ],
    17. "numTestThreads" : 1,
    18. "msgRate" : 1000,
    19. "msgSize" : 1024,
    20. "numTopics" : 1,
    21. "numProducers" : 1,
    22. "separator" : "-",
    23. "sendTimeout" : 0,
    24. "producerName" : null,
    25. "adminURL" : "http://localhost:8080/",
    26. "deprecatedAuthPluginClassName" : null,
    27. "maxOutstanding" : 0,
    28. "maxPendingMessagesAcrossPartitions" : 0,
    29. "partitions" : null,
    30. "numMessages" : 0,
    31. "compression" : "NONE",
    32. "payloadFilename" : null,
    33. "payloadDelimiter" : "\\n",
    34. "batchTimeMillis" : 1.0,
    35. "batchMaxMessages" : 1000,
    36. "batchMaxBytes" : 4194304,
    37. "testTime" : 0,
    38. "warmupTimeSeconds" : 1.0,
    39. "encKeyName" : null,
    40. "encKeyFile" : null,
    41. "delay" : 0,
    42. "exitOnFailure" : false,
    43. "messageKeyGenerationMode" : null,
    44. "producerAccessMode" : "Shared",
    45. "formatPayload" : false,
    46. "formatterClass" : "org.apache.pulsar.testclient.DefaultMessageFormatter",
    47. "transactionTimeout" : 10,
    48. "numMessagesPerTransaction" : 50,
    49. "isEnableTransaction" : false,
    50. "isAbortTransaction" : false,
    51. "histogramFile" : null
    52. }
    53. ...
    54. 2023-03-08T15:53:28,178+0800 [Thread-0] INFO org.apache.pulsar.testclient.PerformanceProducer - Aggregated latency stats --- Latency: mean: 4.481 ms - med: 2.918 - 95pct: 10.710 - 99pct: 38.928 - 99.9pct: 112.689 - 99.99pct: 154.241 - 99.999pct: 193.249 - Max: 241.717
  4. Check the internal stats of the topic test-topic-2.

    Input

    1. curl -X GET http://localhost:8080/admin/v2/persistent/public/default/test-topic-2/partitioned-internalStats

    Output

    For detailed explanations of topic stats, see Pulsar statistics.

    1. {"metadata":{"partitions":5},"partitions":{"persistent://public/default/test-topic-2-partition-3":{"entriesAddedCounter":47087,"numberOfEntries":47087,"totalSize":80406959,"currentLedgerEntries":47087,"currentLedgerSize":80406959,"lastLedgerCreatedTimestamp":"2023-03-08T15:47:07.273+08:00","waitingCursorsCount":0,"pendingAddEntriesCount":0,"lastConfirmedEntry":"117:47086","state":"LedgerOpened","ledgers":[{"ledgerId":117,"entries":0,"size":0,"offloaded":false,"underReplicated":false}],"cursors":{},"schemaLedgers":[],"compactedLedger":{"ledgerId":-1,"entries":-1,"size":-1,"offloaded":false,"underReplicated":false}},"persistent://public/default/test-topic-2-partition-2":{"entriesAddedCounter":46995,"numberOfEntries":46995,"totalSize":80445417,"currentLedgerEntries":46995,"currentLedgerSize":80445417,"lastLedgerCreatedTimestamp":"2023-03-08T15:47:07.43+08:00","waitingCursorsCount":0,"pendingAddEntriesCount":0,"lastConfirmedEntry":"118:46994","state":"LedgerOpened","ledgers":[{"ledgerId":118,"entries":0,"size":0,"offloaded":false,"underReplicated":false}],...
  5. Delete the topic test-topic-2.

    Input

    1. curl -X DELETE http://localhost:8080/admin/v2/persistent/public/default/test-topic-2/partitions

    Output

    There is no output. You can verify whether the test-topic-2 exists or not using the following command.

    Input

    List topics in public/default namespace.

    1. curl -X GET http://localhost:8080/admin/v2/persistent/public/default

This tutorial guides you through every step of using Java admin API to manage topics. It includes the following steps:

  1. Initiate a Pulsar Java client.

  2. Create a partitioned topic

  3. Update the number of a partition.

  4. Produce messages to the topic.

  5. Check the stats of the topic.

  6. Delete the topic.

Prerequisites

  • Prepare a Java project and add the following dependency to your POM file.

    1. <dependency>
    2. <groupId>org.apache.pulsar</groupId>
    3. <artifactId>pulsar-client-admin</artifactId>
    4. <version>2.11.0</version>
    5. </dependency>

Steps

  1. Initiate a Pulsar Java client in your Java project.

    Input

    1. String url = "http://localhost:8080";
    2. PulsarAdmin admin = PulsarAdmin.builder()
    3. .serviceHttpUrl(url)
    4. .build();
  2. Create a partitioned topic test-topic-1 with 4 partitions.

    Input

    1. admin.topics().createPartitionedTopic("persistent://public/default/test-topic-1", 4);
  3. Update the number of the partition to 5.

    Input

    1. admin.topics().updatePartitionedTopic("test-topic-1", 5);
  4. Produce some messages to the topic test-topic-1.

    Input

    1. PulsarClient client = PulsarClient.builder()
    2. .serviceUrl("pulsar://localhost:6650")
    3. .build();
    4. Producer<String> producer = client.newProducer(Schema.STRING)
    5. .topic("test-topic-1")
    6. .blockIfQueueFull(true)
    7. .create();
    8. for (int i = 0; i < 100; ++i) {
    9. producer.newMessage().value("test").send();
    10. }
    11. producer.close();
    12. client.close();
  5. Check the stats of the topic test-topic-1.

    Input

    1. PartitionedTopicStats stats = admin.topics().getPartitionedStats("persistent://public/default/test-topic-1",false);
    2. System.out.println(stats.getMsgInCounter());

    Output

    1. 100
  6. Delete the topic test-topic-1.

    Input

    1. admin.topics().deletePartitionedTopic("test-topic-1");