持久化有助于访问 topic,因为 topic 是发布和消费消息的逻辑端点。 Producer 发布消息到 topic,consumer 订阅 topic,并消费该 topic 上的消息。

以下说明和命令中所使用 topic 的名称结构如下:

  1. persistent://tenant/namespace/topic

持久 topic 资源

列出 topic

以列表的形式列出指定命名空间下所有持久 topic。

pulsar-admin

可以使用 list 命令获取 topic 列表。

  1. $ pulsar-admin persistent list \
  2. my-tenant/my-namespace

REST API

GET /admin/v2/persistent/:tenant/:namespace

Java

  1. String namespace = "my-tenant/my-namespace";
  2. admin.persistentTopics().getList(namespace);

授权

授权给客户端用户,允许其在指定的 topic 上执行某些操作。

pulsar-admin

可以使用 grant-permission 命令授权。

  1. $ pulsar-admin persistent grant-permission \
  2. --actions produce,consume --role application1 \
  3. persistent://test-tenant/ns1/tp1 \

REST API

POST /admin/v2/persistent/:tenant/:namespace/:topic/permissions/:role

Java

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. String role = "test-role";
  3. Set<AuthAction> actions = Sets.newHashSet(AuthAction.produce, AuthAction.consume);
  4. admin.persistentTopics().grantPermission(topic, role, actions);

获取权限

可以使用 permissions 命令获取权限。

pulsar-admin

  1. $ pulsar-admin persistent permissions \
  2. persistent://test-tenant/ns1/tp1 \
  3. {
  4. "application1": [
  5. "consume",
  6. "produce"
  7. ]
  8. }

REST API

GET /admin/v2/persistent/:tenant/:namespace/:topic/permissions

Java

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.persistentTopics().getPermissions(topic);

取消权限

取消已经授予客户端用户的权限。

pulsar-admin

可以使用 revoke-permission 命令取消权限。

  1. $ pulsar-admin persistent revoke-permission \
  2. --role application1 \
  3. persistent://test-tenant/ns1/tp1 \
  4. {
  5. "application1": [
  6. "consume",
  7. "produce"
  8. ]
  9. }

REST API

DELETE /admin/v2/persistent/:tenant/:namespace/:topic/permissions/:role

Java

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. String role = "test-role";
  3. admin.persistentTopics().revokePermissions(topic, role);

删除 topic

It deletes a topic. The topic cannot be deleted if there’s any active subscription or producers connected to it.

pulsar-admin

可以使用 delete 命令删除 topic。

  1. $ pulsar-admin persistent delete \
  2. persistent://test-tenant/ns1/tp1 \

REST API

DELETE /admin/v2/persistent/:tenant/:namespace/:topic

Java

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.persistentTopics().delete(topic);

卸载 topic

可以卸载 topic。

pulsar-admin

可以使用 unload 命令卸载 topic。

  1. $ pulsar-admin persistent unload \
  2. persistent://test-tenant/ns1/tp1 \

REST API

PUT /admin/v2/persistent/:tenant/:namespace/:topic/unload

Java

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.persistentTopics().unload(topic);

获取统计信息

获取非持久化 topic 的统计数据。

  • msgRateIn:本地发布者和副本发布者每秒发布消息速率和

  • msgThroughputIn:同上,但不是每秒发布消息数,而是每秒发布消息的字节数

  • msgRateOut:本地 consumer 和副本 consumer 每秒分发消息数量和

  • msgThroughputOut:同上,但不是每秒分发消息数,而是每秒分发消息的字节数

  • averageMsgSize:最后一次间隔内发布消息的平均字节大小

  • storageSize: The sum of the ledgers’ storage size for this topic. See

  • publishers: The list of all local publishers into the topic. There can be zero or thousands

  • averageMsgSize:此发布者在最后一个间隔内发布消息的平均字节大小

  • producerId:此 topic 内 producer 的内部标识符

  • producerName:客户端库为 producer 生成的内部标识符

  • address:连接到 producer 的 IP 地址和 source 端口

  • connectedSince:Producer 创建时或上次连接到此 producer 时的时间戳

  • subscriptions:Topic 所有本地订阅的列表

  • my-subscription:订阅名称(客户端定义)

  • msgBacklog: 本订阅在backlog中的消息数量

  • type:订阅类型

  • msgRateExpired: 由于TTL,此订阅下没有被发送而是被丢弃的比例。

  • consumers:连接到此订阅的 consumer 列表

  • consumerName:客户端库为 consumer 生成的内部标识符

  • availablePermits:Consumer 在客户端库监听队列中可以容纳的消息数量。 值为0意味着客户端类库的队列已经满了,receive()不再被调用。 非零值意味着 consumer 可以接收消息。

  • replication:Topic 交叉副本的统计信息。

  • replicationBacklog: 消息对外复制的backlog

  • connected:外部复制器是否已经连接

  • replicationDelayInSeconds:如果已连接,最老的消息已经等待被发送的时长。

  • inboundConnection:连接到此 broker 的远程集群发布者 broker 的 IP 和端口。

  • inboundConnectedSince:正在发布消息到远程集群的 TCP 连接。 如果没有连接到本地发布者,一分钟后连接将自动关闭。

  1. {
  2. "msgRateIn": 4641.528542257553,
  3. "msgThroughputIn": 44663039.74947473,
  4. "msgRateOut": 0,
  5. "msgThroughputOut": 0,
  6. "averageMsgSize": 1232439.816728665,
  7. "storageSize": 135532389160,
  8. "publishers": [
  9. {
  10. "msgRateIn": 57.855383881403576,
  11. "msgThroughputIn": 558994.7078932219,
  12. "averageMsgSize": 613135,
  13. "producerId": 0,
  14. "producerName": null,
  15. "address": null,
  16. "connectedSince": null
  17. }
  18. ],
  19. "subscriptions": {
  20. "my-topic_subscription": {
  21. "msgRateOut": 0,
  22. "msgThroughputOut": 0,
  23. "msgBacklog": 116632,
  24. "type": null,
  25. "msgRateExpired": 36.98245516804671,
  26. "consumers": []
  27. }
  28. },
  29. "replication": {}
  30. }

pulsar-admin

可以使用 stats 命令获取 topic 的统计信息。

  1. $ pulsar-admin persistent stats \
  2. persistent://test-tenant/ns1/tp1 \

REST API

GET /admin/v2/persistent/:tenant/:namespace/:topic/stats

Java

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.persistentTopics().getStats(topic);

获取内部统计信息

获取 topic 的详细统计信息。

  • entriesAddedCounter:加载自此 broker 起发布的消息到 topic

  • numberOfEntries:追踪消息总数

  • totalSize:消息总存储量(以 byte 为单位)

  • currentLedgerEntries:当前开放写入操作的 ledger 中,写入消息总量

  • currentLedgerSize:当前开放写入操作的 ledger 中,写入消息总量(以 byte 为单位)

  • lastLedgerCreatedTimestamp:最后一个 ledger 创建的时间

  • lastLedgerCreationFailureTimestamp: 最后一次 ledger 创建失败的时间

  • waitingCursorsCount:与消息发布进度同步,等待新消息发布的游标数量

  • pendingAddEntriesCount:有等待完成的(异步)写请求的消息数量

  • lastConfirmedEntry:最后一条成功写入消息的 ledgerid:entryid。 如果 entryid 为 -1,则 ledger 已经允许写入或正在开放写入权限,但还没有写入 entry。

  • state: The state of this ledger for writing. LedgerOpened means we have a ledger open for saving published messages.

  • ledgers:topic 中所保存消息的有序 ledger 列表。

  • cursors: The list of all cursors on this topic. There will be one for every subscription you saw in the topic stats.

  • markDeletePosition:ack的位置:订阅者确认收到的最后一条消息

  • readPosition:订阅者最后读取消息的位置

  • waitingReadOp:当订阅者已经读取了发布到topic的最新消息,并且等待新消息发布时,值为true。

  • pendingReadOps:已经发往BookKeeper,进行中的读请求数量计数器

  • messagesConsumedCounter:此broker加载本topic以来,此cursor确认的消息数量。

  • cursorLedger:被用来持久化存储当前markDeletePosition的ledger

  • cursorLedgerLastEntry:用来持久化存储当前markDeletePosition的最后一个entryid

  • individuallyDeletedMessages:在确认不符合顺序的情况下,显示了markDeletePosition和读位置间的消息确认范围

  • lastLedgerSwitchTimestamp: cursor ledger最后一次滚动的时间

  • state:cursor ledger的状态:Open代表有一个可用的cursor ledger来保存markDeletePosition的更新。

  1. {
  2. "entriesAddedCounter": 20449518,
  3. "numberOfEntries": 3233,
  4. "totalSize": 331482,
  5. "currentLedgerEntries": 3233,
  6. "currentLedgerSize": 331482,
  7. "lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
  8. "lastLedgerCreationFailureTimestamp": null,
  9. "waitingCursorsCount": 1,
  10. "pendingAddEntriesCount": 0,
  11. "lastConfirmedEntry": "324711539:3232",
  12. "state": "LedgerOpened",
  13. "ledgers": [
  14. {
  15. "ledgerId": 324711539,
  16. "entries": 0,
  17. "size": 0
  18. }
  19. ],
  20. "cursors": {
  21. "my-subscription": {
  22. "markDeletePosition": "324711539:3133",
  23. "readPosition": "324711539:3233",
  24. "waitingReadOp": true,
  25. "pendingReadOps": 0,
  26. "messagesConsumedCounter": 20449501,
  27. "cursorLedger": 324702104,
  28. "cursorLedgerLastEntry": 21,
  29. "individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
  30. "lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
  31. "state": "Open"
  32. }
  33. }
  34. }

pulsar-admin

可以使用 stats-internal 命令获取 topic 中数据的统计信息。

  1. $ pulsar-admin persistent stats-internal \
  2. persistent://test-tenant/ns1/tp1 \

REST API

GET /admin/v2/persistent/:tenant/:namespace/:topic/internalStats

Java

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.persistentTopics().getInternalStats(topic);

查看消息

查看指定 topic 中某个订阅的 N 条消息。

pulsar-admin

  1. $ pulsar-admin persistent peek-messages \
  2. --count 10 --subscription my-subscription \
  3. persistent://test-tenant/ns1/tp1 \
  4. Message ID: 315674752:0
  5. Properties: { "X-Pulsar-publish-time" : "2015-07-13 17:40:28.451" }
  6. msg-payload

REST API

GET /admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subName/position/:messagePosition

Java

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. String subName = "my-subscription";
  3. int numMessages = 1;
  4. admin.persistentTopics().peekMessages(topic, subName, numMessages);

跳过消息

某个订阅跳过指定topic的N条消息。

pulsar-admin

  1. $ pulsar-admin persistent skip \
  2. --count 10 --subscription my-subscription \
  3. persistent://test-tenant/ns1/tp1 \

REST API

POST /admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subName/skip/:numMessages

Java

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. String subName = "my-subscription";
  3. int numMessages = 1;
  4. admin.persistentTopics().skipMessages(topic, subName, numMessages);

跳过所有消息

某个订阅跳过指定topic的所有消息

pulsar-admin

  1. $ pulsar-admin persistent skip-all \
  2. --subscription my-subscription \
  3. persistent://test-tenant/ns1/tp1 \

REST API

POST /admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subName/skip_all

更多信息

Java

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. String subName = "my-subscription";
  3. admin.persistentTopics().skipAllMessages(topic, subName);

重置cursor

重置订阅的cursor位置回到X分钟之前被记录的位置。 实际上通过计算时间和X分钟之前的cursor位置,来重置回到那个位置。

pulsar-admin

  1. $ pulsar-admin persistent reset-cursor \
  2. --subscription my-subscription --time 10 \
  3. persistent://test-tenant/ns1/tp1 \

REST API

POST /admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subName/resetcursor/:timestamp

Java

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. String subName = "my-subscription";
  3. long timestamp = 2342343L;
  4. admin.persistentTopics().skipAllMessages(topic, subName, timestamp);

查询topic

定位正在服务于指定topic的broker

pulsar-admin

  1. $ pulsar-admin persistent lookup \
  2. persistent://test-tenant/ns1/tp1 \
  3. "pulsar://broker1.org.com:4480"

REST API

GET /lookup/v2/topic/persistent/:tenant:namespace/:topic

Java

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.lookup().lookupDestination(topic);

获取bundle

给出包含指定topic的bundle范围。

pulsar-admin

  1. $ pulsar-admin persistent bundle-range \
  2. persistent://test-tenant/ns1/tp1 \
  3. "0x00000000_0xffffffff"

REST API

GET /lookup/v2/topic/:topic_domain/:tenant/:namespace/:topic/bundle

Java

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.lookup().getBundleRange(topic);

获取订阅

给出了指定topic的所有订阅的名称。

pulsar-admin

  1. $ pulsar-admin persistent subscriptions \
  2. persistent://test-tenant/ns1/tp1 \
  3. my-subscription

REST API

GET /admin/v2/persistent/:tenant/:namespace/:topic/subscriptions

Java

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.persistentTopics().getSubscriptions(topic);

取消订阅

当不再处理更多消息时,可以取消订阅

pulsar-admin

  1. $ pulsar-admin persistent unsubscribe \
  2. --subscription my-subscription \
  3. persistent://test-tenant/ns1/tp1 \

REST API

DELETE /admin/v2/namespaces/:tenant/:namespace/:topic/subscription/:subscription

Java

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. String subscriptionName = "my-subscription";
  3. admin.persistentTopics().deleteSubscription(topic, subscriptionName);

最后一条消息Id

给出提交到持久topic的最后一条消息ID,将在2.3.0中提供此特性。

  1. pulsar-admin topics last-message-id topic-name

REST API

{% endpoint Get /admin/v2/persistent/:tenant/:namespace/:topic/lastMessageId %}

Java

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";
  2. admin.persistentTopics().getLastMessage(topic);