Managing persistent topics

Persistent topics are logical endpoints for publishing and consuming messages. Producers publish messages to a topic; consumers subscribe to the topic and consume the messages published to it.

In this guide, topic names have the following structure:

  persistent://tenant/namespace/topic
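
As a minimal sketch of how this structure breaks down, the following splits a full topic name into its tenant, namespace, and topic parts. `PersistentTopicName` is a hypothetical helper written for illustration, not a class from the Pulsar client, and it assumes that everything after the second slash belongs to the topic part.

```java
import java.util.Objects;

/** Hypothetical helper (not part of the Pulsar client) that splits a persistent topic name. */
final class PersistentTopicName {
    private static final String PREFIX = "persistent://";

    final String tenant;
    final String namespace;
    final String topic;

    private PersistentTopicName(String tenant, String namespace, String topic) {
        this.tenant = tenant;
        this.namespace = namespace;
        this.topic = topic;
    }

    /** Parses a name of the form persistent://tenant/namespace/topic. */
    static PersistentTopicName parse(String fullName) {
        Objects.requireNonNull(fullName, "fullName");
        if (!fullName.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Expected a name starting with " + PREFIX + ": " + fullName);
        }
        // Assumption: split into at most three parts; the remainder is treated as the topic.
        String[] parts = fullName.substring(PREFIX.length()).split("/", 3);
        if (parts.length != 3 || parts[0].isEmpty() || parts[1].isEmpty() || parts[2].isEmpty()) {
            throw new IllegalArgumentException("Expected persistent://tenant/namespace/topic: " + fullName);
        }
        return new PersistentTopicName(parts[0], parts[1], parts[2]);
    }

    /** Returns the "tenant/namespace" form used by namespace-level commands. */
    String namespacePath() {
        return tenant + "/" + namespace;
    }

    public static void main(String[] args) {
        PersistentTopicName name = parse("persistent://my-tenant/my-namespace/my-topic");
        System.out.println(name.namespacePath() + " -> " + name.topic);
    }
}
```

The `namespacePath()` form corresponds to the `my-tenant/my-namespace` argument that namespace-level commands take.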

Persistent topic resources

List topics

Lists all persistent topics under the specified namespace.

pulsar-admin

Use the list command to get the list of topics.

  $ pulsar-admin persistent list \
    my-tenant/my-namespace

REST API

GET /admin/v2/persistent/:tenant/:namespace
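
A hedged sketch of calling this endpoint with the JDK's built-in HTTP client (Java 11 or later) is shown here. `ListTopicsRequest` and its `build` method are illustrative names, and `http://localhost:8080` is an assumed admin service address; neither comes from this guide.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper that builds the "list topics" request without sending it. */
final class ListTopicsRequest {

    /** Builds a GET request for /admin/v2/persistent/:tenant/:namespace. */
    static HttpRequest build(String adminUrl, String tenant, String namespace) {
        URI uri = URI.create(adminUrl + "/admin/v2/persistent/" + tenant + "/" + namespace);
        return HttpRequest.newBuilder(uri).GET().build();
    }

    public static void main(String[] args) {
        // Assumed address of the admin service; adjust for your cluster.
        HttpRequest request = build("http://localhost:8080", "my-tenant", "my-namespace");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

To actually send the request, pass it to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`. Clusters with authentication enabled would also need the appropriate headers, which this sketch leaves out.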

Java

  String namespace = "my-tenant/my-namespace";
  admin.persistentTopics().getList(namespace);

Grant permission

Grants a client role permission to perform specific actions on a given topic.

pulsar-admin

Use the grant-permission command to grant permissions.

  $ pulsar-admin persistent grant-permission \
    --actions produce,consume --role application1 \
    persistent://test-tenant/ns1/tp1

REST API

POST /admin/v2/persistent/:tenant/:namespace/:topic/permissions/:role

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  String role = "test-role";
  Set<AuthAction> actions = Sets.newHashSet(AuthAction.produce, AuthAction.consume);
  admin.persistentTopics().grantPermission(topic, role, actions);

Get permissions

Use the permissions command to get the permissions granted on a topic.

pulsar-admin

  $ pulsar-admin persistent permissions \
    persistent://test-tenant/ns1/tp1

  {
    "application1": [
      "consume",
      "produce"
    ]
  }

REST API

GET /admin/v2/persistent/:tenant/:namespace/:topic/permissions

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  admin.persistentTopics().getPermissions(topic);

Revoke permission

Revokes a permission that was granted to a client role.

pulsar-admin

Use the revoke-permission command to revoke a permission.

  $ pulsar-admin persistent revoke-permission \
    --role application1 \
    persistent://test-tenant/ns1/tp1

  {
    "application1": [
      "consume",
      "produce"
    ]
  }

REST API

DELETE /admin/v2/persistent/:tenant/:namespace/:topic/permissions/:role

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  String role = "test-role";
  admin.persistentTopics().revokePermissions(topic, role);

Delete topic

Deletes a topic. A topic cannot be deleted while it has any active subscriptions or connected producers.

pulsar-admin

Use the delete command to delete a topic.

  $ pulsar-admin persistent delete \
    persistent://test-tenant/ns1/tp1

REST API

DELETE /admin/v2/persistent/:tenant/:namespace/:topic

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  admin.persistentTopics().delete(topic);

Unload topic

Unloads a topic.

pulsar-admin

Use the unload command to unload a topic.

  $ pulsar-admin persistent unload \
    persistent://test-tenant/ns1/tp1

REST API

PUT /admin/v2/persistent/:tenant/:namespace/:topic/unload

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  admin.persistentTopics().unload(topic);

Get stats

Gets the current statistics of a persistent topic.

  • msgRateIn: The sum of all local and replication publishers’ publish rates in messages per second

  • msgThroughputIn: Same as above, but in bytes per second instead of messages per second

  • msgRateOut: The sum of all local and replication consumers’ dispatch rates in messages per second

  • msgThroughputOut: Same as above, but in bytes per second instead of messages per second

  • averageMsgSize: The average size in bytes of messages published within the last interval

  • storageSize: The sum of the ledgers’ storage size for this topic, that is, the space used to store the topic’s messages

  • publishers: The list of all local publishers into the topic. There can be zero or thousands

    • msgRateIn: Total rate of messages published by this publisher in messages per second

    • msgThroughputIn: Total throughput of the messages published by this publisher in bytes per second

    • averageMsgSize: Average message size in bytes from this publisher within the last interval

    • producerId: Internal identifier for this producer on this topic

    • producerName: Internal identifier for this producer, generated by the client library

    • address: IP address and source port for the connection of this producer

    • connectedSince: Timestamp this producer was created or last reconnected

  • subscriptions: The list of all local subscriptions to the topic

    • my-subscription: The name of this subscription (client defined)

      • msgRateOut: Total rate of messages delivered on this subscription (msg/s)

      • msgThroughputOut: Total throughput delivered on this subscription (bytes/s)

      • msgBacklog: Number of messages in the subscription backlog

      • type: The type of this subscription

      • msgRateExpired: The rate at which messages were discarded instead of dispatched from this subscription due to TTL

      • lastExpireTimestamp: The last message expire execution timestamp

      • lastConsumedFlowTimestamp: The last flow command received timestamp

      • lastConsumedTimestamp: The most recent of the consumers’ last-consumed timestamps

      • lastAckedTimestamp: The most recent of the consumers’ last-acked timestamps

      • consumers: The list of connected consumers for this subscription

        • msgRateOut: Total rate of messages delivered to the consumer (msg/s)

        • msgThroughputOut: Total throughput delivered to the consumer (bytes/s)

        • consumerName: Internal identifier for this consumer, generated by the client library

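        • availablePermits: The number of messages this consumer has space for in the client library’s listen queue. A value of 0 means the client library’s queue is full and receive() is no longer being called. A nonzero value means the consumer is ready to receive messages.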

        • unackedMessages: Number of unacknowledged messages for the consumer

        • blockedConsumerOnUnackedMsgs: Flag to verify if the consumer is blocked due to reaching threshold of unacked messages

        • lastConsumedTimestamp: Timestamp at which the consumer last consumed a message

        • lastAckedTimestamp: Timestamp at which the consumer last acknowledged a message

  • replication: This section gives the stats for cross-colo replication of this topic

    • msgRateIn: Total rate of messages received from the remote cluster (msg/s)

    • msgThroughputIn: Total throughput received from the remote cluster (bytes/s)

    • msgRateOut: Total rate of messages delivered to the replication-subscriber (msg/s)

    • msgThroughputOut: Total throughput delivered to the replication-subscriber (bytes/s)

    • msgRateExpired: Total rate of messages expired (msg/s)

    • replicationBacklog: Number of messages pending to be replicated to remote cluster

    • connected: Whether the outbound replicator is connected

    • replicationDelayInSeconds: How long the oldest message has been waiting to be sent through the connection, if connected is true

    • inboundConnection: The IP and port of the broker in the remote cluster’s publisher connection to this broker

    • inboundConnectedSince: The TCP connection being used to publish messages to the remote cluster. If no local publishers are connected, this connection is closed automatically after one minute.

    • outboundConnection: Address of outbound replication connection

    • outboundConnectedSince: Timestamp of establishing outbound connection

  {
    "msgRateIn": 4641.528542257553,
    "msgThroughputIn": 44663039.74947473,
    "msgRateOut": 0,
    "msgThroughputOut": 0,
    "averageMsgSize": 1232439.816728665,
    "storageSize": 135532389160,
    "publishers": [
      {
        "msgRateIn": 57.855383881403576,
        "msgThroughputIn": 558994.7078932219,
        "averageMsgSize": 613135,
        "producerId": 0,
        "producerName": null,
        "address": null,
        "connectedSince": null
      }
    ],
    "subscriptions": {
      "my-topic_subscription": {
        "msgRateOut": 0,
        "msgThroughputOut": 0,
        "msgBacklog": 116632,
        "type": null,
        "msgRateExpired": 36.98245516804671,
        "consumers": []
      }
    },
    "replication": {}
  }

pulsar-admin

Use the stats command to get the statistics of a topic.

  $ pulsar-admin persistent stats \
    persistent://test-tenant/ns1/tp1

REST API

GET /admin/v2/persistent/:tenant/:namespace/:topic/stats

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  admin.persistentTopics().getStats(topic);

Get internal stats

Gets the detailed internal statistics of a topic.

  • entriesAddedCounter: Messages published since this broker loaded this topic

  • numberOfEntries: Total number of messages being tracked

  • totalSize: Total storage size in bytes of all messages

  • currentLedgerEntries: Count of messages written to the ledger currently open for writing

  • currentLedgerSize: Size in bytes of messages written to ledger currently open for writing

  • lastLedgerCreatedTimestamp: Time when the last ledger was created

  • lastLedgerCreationFailureTimestamp: Time when the last ledger creation failed

  • waitingCursorsCount: How many cursors are “caught up” and waiting for a new message to be published

  • pendingAddEntriesCount: How many messages have (asynchronous) write requests that are still waiting to complete

  • lastConfirmedEntry: The ledgerid:entryid of the last message successfully written. If the entryid is -1, the ledger has been opened or is being opened for writing but no entries have been written to it yet.

  • state: The state of this ledger for writing. LedgerOpened means we have a ledger open for saving published messages.

  • ledgers: The ordered list of all ledgers for this topic holding its messages

    • ledgerId: Id of this ledger

    • entries: Total number of entries belonging to this ledger

    • size: Size of messages written to this ledger (in bytes)

    • offloaded: Whether this ledger is offloaded

  • cursors: The list of all cursors on this topic. There will be one for every subscription you saw in the topic stats.

    • markDeletePosition: All messages before the markDeletePosition have been acknowledged by the subscriber.

    • readPosition: The latest position of subscriber for reading message

    • waitingReadOp: This is true when the subscription has read the latest message published to the topic and is waiting on new messages to be published.

    • pendingReadOps: The counter for how many outstanding read requests to the BookKeepers we have in progress

    • messagesConsumedCounter: Number of messages this cursor has acked since this broker loaded this topic

    • cursorLedger: The ledger being used to persistently store the current markDeletePosition

    • cursorLedgerLastEntry: The last entryid used to persistently store the current markDeletePosition

    • individuallyDeletedMessages: If Acks are being done out of order, shows the ranges of messages Acked between the markDeletePosition and the read-position

    • lastLedgerSwitchTimestamp: The last time the cursor ledger was rolled over

    • state: The state of the cursor ledger: Open means we have a cursor ledger for saving updates of the markDeletePosition.

  {
    "entriesAddedCounter": 20449518,
    "numberOfEntries": 3233,
    "totalSize": 331482,
    "currentLedgerEntries": 3233,
    "currentLedgerSize": 331482,
    "lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
    "lastLedgerCreationFailureTimestamp": null,
    "waitingCursorsCount": 1,
    "pendingAddEntriesCount": 0,
    "lastConfirmedEntry": "324711539:3232",
    "state": "LedgerOpened",
    "ledgers": [
      {
        "ledgerId": 324711539,
        "entries": 0,
        "size": 0
      }
    ],
    "cursors": {
      "my-subscription": {
        "markDeletePosition": "324711539:3133",
        "readPosition": "324711539:3233",
        "waitingReadOp": true,
        "pendingReadOps": 0,
        "messagesConsumedCounter": 20449501,
        "cursorLedger": 324702104,
        "cursorLedgerLastEntry": 21,
        "individuallyDeletedMessages": "[(324711539:3134..324711539:3136], (324711539:3137..324711539:3140], ]",
        "lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
        "state": "Open"
      }
    }
  }
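
Several of these fields (lastConfirmedEntry, markDeletePosition, readPosition) are ledgerid:entryid strings. The following is a minimal sketch of reading them, using the values from the sample output above. `LedgerPosition` is a hypothetical helper, not a Pulsar class, and the entry ID difference it computes is only a rough indicator of how far apart two positions in the same ledger are.

```java
/** Hypothetical helper (not part of the Pulsar API) for "ledgerId:entryId" strings. */
final class LedgerPosition {
    final long ledgerId;
    final long entryId;

    private LedgerPosition(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    /** Parses a position such as "324711539:3232". */
    static LedgerPosition parse(String text) {
        int sep = text.indexOf(':');
        if (sep < 0) {
            throw new IllegalArgumentException("Expected ledgerId:entryId but got: " + text);
        }
        long ledgerId = Long.parseLong(text.substring(0, sep));
        long entryId = Long.parseLong(text.substring(sep + 1));
        return new LedgerPosition(ledgerId, entryId);
    }

    /** An entry ID of -1 means no entries have been written to the ledger yet. */
    boolean hasNoEntries() {
        return entryId == -1;
    }

    /** Difference in entry IDs; only meaningful when both positions are in the same ledger. */
    static long entryIdDistance(LedgerPosition from, LedgerPosition to) {
        if (from.ledgerId != to.ledgerId) {
            throw new IllegalArgumentException("Positions are in different ledgers");
        }
        return to.entryId - from.entryId;
    }

    public static void main(String[] args) {
        LedgerPosition markDelete = parse("324711539:3133");
        LedgerPosition read = parse("324711539:3233");
        System.out.println(entryIdDistance(markDelete, read)); // prints 100
    }
}
```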

pulsar-admin

Use the stats-internal command to get the internal statistics of a topic.

  $ pulsar-admin persistent stats-internal \
    persistent://test-tenant/ns1/tp1

REST API

GET /admin/v2/persistent/:tenant/:namespace/:topic/internalStats

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  admin.persistentTopics().getInternalStats(topic);

Peek messages

Peeks N messages for a specific subscription of a given topic.

pulsar-admin

  $ pulsar-admin persistent peek-messages \
    --count 10 --subscription my-subscription \
    persistent://test-tenant/ns1/tp1

  Message ID: 315674752:0
  Properties: { "X-Pulsar-publish-time" : "2015-07-13 17:40:28.451" }
  msg-payload

REST API

GET /admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subName/position/:messagePosition

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  String subName = "my-subscription";
  int numMessages = 1;
  admin.persistentTopics().peekMessages(topic, subName, numMessages);

Get message by ID

Fetches the message with the given ledger ID and entry ID.

pulsar-admin

  $ ./bin/pulsar-admin topics get-message-by-id \
    persistent://public/default/my-topic \
    -l 10 -e 0

REST API

GET /admin/v2/persistent/:tenant/:namespace/:topic/ledger/:ledgerId/entry/:entryId

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  long ledgerId = 10;
  long entryId = 10;
  admin.persistentTopics().getMessageById(topic, ledgerId, entryId);

Skip messages

Skips N messages for a specific subscription of a given topic.

pulsar-admin

  $ pulsar-admin persistent skip \
    --count 10 --subscription my-subscription \
    persistent://test-tenant/ns1/tp1

REST API

POST /admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subName/skip/:numMessages

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  String subName = "my-subscription";
  int numMessages = 1;
  admin.persistentTopics().skipMessages(topic, subName, numMessages);

Skip all messages

Skips all messages for a specific subscription of a given topic.

pulsar-admin

  $ pulsar-admin persistent skip-all \
    --subscription my-subscription \
    persistent://test-tenant/ns1/tp1

REST API

POST /admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subName/skip_all

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  String subName = "my-subscription";
  admin.persistentTopics().skipAllMessages(topic, subName);

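Reset cursor

Resets a subscription’s cursor back to the position that was recorded X minutes ago. In effect, it works out the time X minutes back and the cursor position at that time, then resets the cursor to that position.

pulsar-admin

  $ pulsar-admin persistent reset-cursor \
    --subscription my-subscription --time 10 \
    persistent://test-tenant/ns1/tp1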

REST API

POST /admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subName/resetcursor/:timestamp

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  String subName = "my-subscription";
  long timestamp = 2342343L;
  admin.persistentTopics().resetCursor(topic, subName, timestamp);
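
The `timestamp` value in the snippet above is a placeholder. As a rough sketch, a timestamp for "X minutes ago" could be derived as follows, assuming the API expects milliseconds since the Unix epoch. `ResetCursorTimestamp` and `minutesBefore` are hypothetical names used only for illustration.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper that computes a "minutes ago" timestamp for a cursor reset. */
final class ResetCursorTimestamp {

    /** Returns the instant `minutes` before `now`, in epoch milliseconds (assumed unit). */
    static long minutesBefore(Instant now, long minutes) {
        return now.minus(Duration.ofMinutes(minutes)).toEpochMilli();
    }

    public static void main(String[] args) {
        // Ten minutes back from the current time, matching "--time 10" in the CLI example.
        long timestamp = minutesBefore(Instant.now(), 10);
        System.out.println(timestamp);
    }
}
```

Taking the current time as a parameter rather than calling `Instant.now()` inside the method keeps the computation easy to check against a fixed clock.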

Look up topic

Locates the broker that is serving the given topic.

pulsar-admin

  $ pulsar-admin persistent lookup \
    persistent://test-tenant/ns1/tp1

  "pulsar://broker1.org.com:4480"

REST API

GET /lookup/v2/topic/persistent/:tenant/:namespace/:topic

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  admin.lookup().lookupDestination(topic);

Get bundle

Gives the range of the bundle that contains the given topic.

pulsar-admin

  $ pulsar-admin persistent bundle-range \
    persistent://test-tenant/ns1/tp1

  "0x00000000_0xffffffff"

REST API

GET /lookup/v2/topic/:topic_domain/:tenant/:namespace/:topic/bundle

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  admin.lookup().getBundleRange(topic);
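
The bundle range in the sample output is a pair of hexadecimal bounds joined by an underscore. A minimal sketch of reading such a string into numeric bounds follows. `BundleRange` is a hypothetical helper, not a Pulsar class, and it assumes the lower_upper format shown in the sample output.

```java
/** Hypothetical helper (not part of the Pulsar API) for "0x..._0x..." bundle ranges. */
final class BundleRange {
    final long lower;
    final long upper;

    private BundleRange(long lower, long upper) {
        this.lower = lower;
        this.upper = upper;
    }

    /** Parses a range such as "0x00000000_0xffffffff". */
    static BundleRange parse(String text) {
        String[] bounds = text.split("_");
        if (bounds.length != 2) {
            throw new IllegalArgumentException("Expected lower_upper but got: " + text);
        }
        // Long.decode understands the 0x prefix and avoids overflow for 0xffffffff.
        return new BundleRange(Long.decode(bounds[0]), Long.decode(bounds[1]));
    }

    public static void main(String[] args) {
        BundleRange range = parse("0x00000000_0xffffffff");
        System.out.println(range.lower + " .. " + range.upper); // prints 0 .. 4294967295
    }
}
```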

Get subscriptions

Shows the names of all subscriptions for a given topic.

pulsar-admin

  $ pulsar-admin persistent subscriptions \
    persistent://test-tenant/ns1/tp1

  my-subscription

REST API

GET /admin/v2/persistent/:tenant/:namespace/:topic/subscriptions

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  admin.persistentTopics().getSubscriptions(topic);

Unsubscribe

Removes a subscription that is no longer processing messages.

pulsar-admin

  $ pulsar-admin persistent unsubscribe \
    --subscription my-subscription \
    persistent://test-tenant/ns1/tp1

REST API

DELETE /admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subscription

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  String subscriptionName = "my-subscription";
  admin.persistentTopics().deleteSubscription(topic, subscriptionName);

Last message ID

Gives the ID of the last message committed to a persistent topic. This feature will be available in release 2.3.0.

pulsar-admin

  $ pulsar-admin topics last-message-id topic-name

REST API

GET /admin/v2/persistent/:tenant/:namespace/:topic/lastMessageId

Java

  String topic = "persistent://my-tenant/my-namespace/my-topic";
  admin.persistentTopics().getLastMessageId(topic);