Manage topics

Pulsar 提供持久化与非持久化两种topic。 持久化topic是消息发布、消费的逻辑端点。 持久化主题地址的命名格式如下:

  1. persistent://tenant/namespace/topic

非持久主题应用在仅消费实时发布消息与不需要持久化保证的应用程序。 通过这种方式,它通过删除持久消息的开销来减少消息发布延迟。 非持久化topic地址的命名格式如下:

  1. non-persistent://tenant/namespace/topic

管理主题资源

无论是持久化还是非持久化主题,你可以通过pulsar-admin工具、REST API 、 Java 获取到主题资源。

备注
REST API 中 :schema 分为 persistent(持久化)和 non-persistent(非持久化)。 :tenant:namespace:x 是变量,在使用时候请注意使用真正的租户、命名空间,和 x 替换。
GET /admin/v2/:schema/:tenant/:namespace 为例, 若要获取 REST API 中持久化主题列表,请参考 https://pulsar.apache.org/admin/v2/persistent/my-tenant/my-namespace。 若要获取 REST API 中非持久化主题的列表,请参考 https://pulsar.apache.org/admin/v2/non-persistent/my-tenant/my-namespace

列出 topic

你可以通过以下方式获得特定命名空间下的 topic 列表。

pulsar-admin

REST API

Java

  1. $ pulsar-admin topics list \ my-tenant/my-namespace

GET /admin/v2/:schema/:tenant/:namespace

  1. String namespace = "my-tenant/my-namespace";admin.topics().getList(namespace);

授权

通过以下方式可以在客户端角色上授权,以便于在指定的主题上执行具体的操作。

pulsar-admin

REST API

Java

  1. $ pulsar-admin topics grant-permission \ --actions produce,consume --role application1 \ persistent://test-tenant/ns1/tp1 \

POST /admin/v2/:schema/:tenant/:namespace/:topic/permissions/:role

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";String role = "test-role";Set<AuthAction> actions = Sets.newHashSet(AuthAction.produce, AuthAction.consume);admin.topics().grantPermission(topic, role, actions);

获取权限

通过以下方式获取权限认证。

pulsar-admin

REST API

Java

  1. $ pulsar-admin topics permissions \ persistent://test-tenant/ns1/tp1 \{ "application1": [ "consume", "produce" ]}

GET /admin/v2/:schema/:tenant/:namespace/:topic/permissions

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";admin.topics().getPermissions(topic);

取消权限

您可以通过以下方式撤销授予客户端角色的权限。

pulsar-admin

REST API

Java

  1. $ pulsar-admin topics revoke-permission \ --role application1 \ persistent://test-tenant/ns1/tp1 \{ "application1": [ "consume", "produce" ]}

DELETE /admin/v2/:schema/:tenant/:namespace/:topic/permissions/:role

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";String role = "test-role";admin.topics().revokePermissions(topic, role);

删除 topic

您可以通过以下方式删除一个主题。 如果某个主题拥有任何活跃的订阅或者生产者,则不能对其删除。

pulsar-admin

REST API

Java

  1. $ pulsar-admin topics delete \ persistent://test-tenant/ns1/tp1 \

DELETE /admin/v2/:schema/:tenant/:namespace/:topic

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";admin.topics().delete(topic);

卸载 topic

通过以下方式卸载主题。

pulsar-admin

REST API

Java

  1. $ pulsar-admin topics unload \ persistent://test-tenant/ns1/tp1 \

PUT /admin/v2/:schema/:tenant/:namespace/:topic/unload

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";admin.topics().unload(topic);

获取统计信息

你可以检查给定的未分区主题的以下统计数据。

  • msgRateIn:所有本地和副本发布者每秒发布消息速率之和(msg/s)。

  • msgThroughputIn:所有本地和副本发布者每秒发布消息字节数之和(byte/s)。

  • msgRateOut:所有本地和副本消费者每秒调度消息率之和(msg/s)。

  • msgThroughputOut:所有本地和副本消费者每秒调度消息字节数之和(byte/s)。

  • averageMsgSize:在最近时间间隔内发布消息的平均大小(以字节为单位)。

  • storageSize: The sum of the ledgers’ storage size for this topic. 用于存储该主题消息的空间。

  • publishers: The list of all local publishers into the topic. 该列表的范围从零到千不等。

    • msgRateIn:发布者每秒发布消息的总速率(msg/s)。

    • msgThroughputIn:发布者发布消息的总吞吐量(byte/s)。

    • averageMsgSize:发布者在最近时间间隔内发布消息的平均大小(以字节为单位)。

    • producerId:该主题对应的生产者内部识别号。

    • producerName:由客户端生成的生产者内部识别名称。

    • address:连接生产者所需的 IP 地址和端口号。

    • connectedSince:生产者上次创建或者重新连接时的时间戳。

  • subscriptions:该主题下的所有本地订阅列表。

    • my-subscription:当前订阅的订阅名称。 通过客户端去定义。

      • msgRateOut:在此订阅中发送消息的总速率(msg/s)。

      • msgThroughputOut:在此订阅上发送消息的总吞吐量(byte/s)。

      • msgBacklog:在此订阅上积压的消息数量。

      • type:订阅类型。

      • msgRateExpired:由于 TTL 的原因,消息被丢弃而不是发送到此订阅中的比例。

      • lastExpireTimestamp:最后一条执行消息过期的时间戳。

      • lastConsumedFlowTimestamp:收到的最后一条流量指令的时间戳。

      • lastConsumedTimestamp:消费者所有已消费消息的最新时间戳。

      • lastAckedTimestamp:消费者所有已被 ack 消息的最新时间戳

      • consumers:连接到此订阅的消费者列表。

        • msgRateOut:发送给消费者的消息总速率(msg/s)。

        • msgThroughputOut: 发送给消费者的总吞吐量(byte/s)。

        • consumerName:由客户端生成的消费者内部标识符。

        • availablePermits:消费者在客户端库的监听队列中有空闲的消息数量。 0 意味着客户端库的队列已经满了,receive() 方法不会再接收消息。 非 0 值意味着该消费者可以接收消息。

        • unackedMessages:消费者未确认消息的数量。

        • blockedConsumerOnUnackedMsgs:验证消费者是否因达到未确认消息数的阀值而被阻塞。

        • lastConsumedTimestamp:消费者最后一次读取消息的时间戳。

        • lastAckedTimestamp:消费者最后一次确认消息的时间戳。

  • replication: This section gives the stats for cross-colo replication of this topic

    • msgRateIn:从远程集群中收到消息的总速率(msg/s)。

    • msgThroughputIn:从远程集群中收到消息的总吞吐量(byte/s)。

    • msgRateOut:发送给副本订阅的消息总速率(msg/s)。

    • msgThroughputOut:发送给副本订阅的消息总吞吐量(byte/s)。

    • msgRateExpired:过期消息的总速率(msg/s)。

    • replicationBacklog:待复制到远程集群的消息数量。

    • connected:验证外部副本连接器是否已经连接。

    • replicationDelayInSeconds:如果连接是 true 的话,最早消息的已等待被发送的时长。

    • inboundConnection:远程集群中的 broker 要连接到此 broker 的 IP 和端口。

    • inboundConnectedSince: The TCP connection being used to publish messages to the remote cluster. 如果没有连接到本地发布者,一分钟后连接将自动关闭。

    • outboundConnection:外部副本连接的地址。

    • outboundConnectedSince:建立对外连接时的时间戳。

下面是主题状态的示例:

  1. {
  2. "msgRateIn": 4641.528542257553,
  3. "msgThroughputIn": 44663039.74947473,
  4. "msgRateOut": 0,
  5. "msgThroughputOut": 0,
  6. "averageMsgSize": 1232439.816728665,
  7. "storageSize": 135532389160,
  8. "publishers": [
  9. {
  10. "msgRateIn": 57.855383881403576,
  11. "msgThroughputIn": 558994.7078932219,
  12. "averageMsgSize": 613135,
  13. "producerId": 0,
  14. "producerName": null,
  15. "address": null,
  16. "connectedSince": null
  17. }
  18. ],
  19. "subscriptions": {
  20. "my-topic_subscription": {
  21. "msgRateOut": 0,
  22. "msgThroughputOut": 0,
  23. "msgBacklog": 116632,
  24. "type": null,
  25. "msgRateExpired": 36.98245516804671,
  26. "consumers": []
  27. }
  28. },
  29. "replication": {}
  30. }

使用以下方式来获取一个主题的状态:

pulsar-admin

REST API

Java

  1. $ pulsar-admin topics stats \ persistent://test-tenant/ns1/tp1 \

GET /admin/v2/:schema/:tenant/:namespace/:topic/stats

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";admin.topics().getStats(topic);

获取内部统计信息

主题中的详细统计数据如下:

  • entriesAddedCounter:自此 broker 加载该主题以来发布的消息总量。

  • numberOfEntries:追踪消息的总数。

  • totalSize:消息的总存储大小(以字节为单位)。

  • currentLedgerEntries:当前打开写入操作的 ledger 中,写入的消息总数。

  • currentLedgerSize:当前打开写入操作的 ledger 中,写入的消息字节大小。

  • lastLedgerCreatedTimestamp:最后一个 ledger 创建的时间。

  • lastLedgerCreationFailureTimestamp: 最后一个 ledger 失败的时间。

  • waitingCursorsCount:等待新消息发布并标记为 “caught up”的游标数量。

  • pendingAddEntriesCount:完成(异步)写请求的消息数。

  • lastConfirmedEntry:最后一条成功写入消息的 ledger:entryid。 如果 entryid 为 -1,即 ledger 是开启状态,但没有写入任何的 entry 。

  • state: The state of this ledger for writing. LedgerOpened 意味着 ledger 是开启状态,可以保存已发布的消息。

  • ledgers:主题中所保存消息的有序 ledger 列表。

    • ledgerId: 此 ledger 的 ID。

    • entries:属于该 ledger 的 entry 总数。

    • size:写入该 ledger 的消息大小(以字节为单位)。

    • offloaded:该 ledger 是否已卸载。

    • metadata: 该 ledger 的元数据。

  • schemaLedgers:该主题模式下所有 ledger 的有序列表。

    • ledgerId: 此 ledger 的 ID。

    • entries:属于该 ledger 的 entry 总数。

    • size:写入该 ledger 的消息大小(以字节为单位)。

    • offloaded:该 ledger 是否已卸载。

    • metadata: 该 ledger 的元数据。

  • compactedLedger: The ledgers holding un-acked messages after topic compaction.

    • ledgerId: 此 ledger 的 ID。

    • entries:属于该 ledger 的 entry 总数。

    • size:写入该 ledger 的消息大小(以字节为单位)。

    • offloaded:该 ledger 是否已卸载。 The value is false for the compacted topic ledger.

  • cursors: The list of all cursors on this topic. Each subscription in the topic stats has a cursor.

    • markDeletePosition: All messages before the markDeletePosition are acknowledged by the subscriber.

    • readPosition: The latest position of subscriber for reading message.

    • waitingReadOp: This is true when the subscription has read the latest message published to the topic and is waiting for new messages to be published.

    • pendingReadOps: The counter for how many outstanding read requests to the BookKeepers in progress.

    • messagesConsumedCounter: The number of messages this cursor has acked since this broker loaded this topic.

    • cursorLedger: The ledger being used to persistently store the current markDeletePosition.

    • cursorLedgerLastEntry: The last entryid used to persistently store the current markDeletePosition.

    • individuallyDeletedMessages: If acknowledges are being done out of order, the ranges of messages acknowledged between the markDeletePosition and the read-position shows.

    • lastLedgerSwitchTimestamp: The last time the cursor ledger is rolled over.

    • state: The state of the cursor ledger: Open means you have a cursor ledger for saving updates of the markDeletePosition.

下面是关于一个主题的详细统计示例。

  1. {
  2. "entriesAddedCounter":0,
  3. "numberOfEntries":0,
  4. "totalSize":0,
  5. "currentLedgerEntries":0,
  6. "currentLedgerSize":0,
  7. "lastLedgerCreatedTimestamp":"2021-01-22T21:12:14.868+08:00",
  8. "lastLedgerCreationFailureTimestamp":null,
  9. "waitingCursorsCount":0,
  10. "pendingAddEntriesCount":0,
  11. "lastConfirmedEntry":"3:-1",
  12. "state":"LedgerOpened",
  13. "ledgers":[
  14. {
  15. "ledgerId":3,
  16. "entries":0,
  17. "size":0,
  18. "offloaded":false,
  19. "metadata":null
  20. }
  21. ],
  22. "cursors":{
  23. "test":{
  24. "markDeletePosition":"3:-1",
  25. "readPosition":"3:-1",
  26. "waitingReadOp":false,
  27. "pendingReadOps":0,
  28. "messagesConsumedCounter":0,
  29. "cursorLedger":4,
  30. "cursorLedgerLastEntry":1,
  31. "individuallyDeletedMessages":"[]",
  32. "lastLedgerSwitchTimestamp":"2021-01-22T21:12:14.966+08:00",
  33. "state":"Open",
  34. "numberOfEntriesSinceFirstNotAckedMessage":0,
  35. "totalNonContiguousDeletedMessagesRange":0,
  36. "properties":{
  37. }
  38. }
  39. },
  40. "schemaLedgers":[
  41. {
  42. "ledgerId":1,
  43. "entries":11,
  44. "size":10,
  45. "offloaded":false,
  46. "metadata":null
  47. }
  48. ],
  49. "compactedLedger":{
  50. "ledgerId":-1,
  51. "entries":-1,
  52. "size":-1,
  53. "offloaded":false,
  54. "metadata":null
  55. }
  56. }

可以使用以下方式来获取一个主题的内部状态。

pulsar-admin

REST API

Java

  1. $ pulsar-admin topics stats-internal \ persistent://test-tenant/ns1/tp1 \

GET /admin/v2/:schema/:tenant/:namespace/:topic/internalStats

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";admin.topics().getInternalStats(topic);

查看消息

可以使用以下方式为某一主题的特定订阅提供一些信息。

pulsar-admin

REST API

Java

  1. $ pulsar-admin topics peek-messages \ --count 10 --subscription my-subscription \ persistent://test-tenant/ns1/tp1 \Message ID: 315674752:0Properties: { "X-Pulsar-publish-time" : "2015-07-13 17:40:28.451" }msg-payload

GET /admin/v2/:schema/:tenant/:namespace/:topic/subscription/:subName/position/:messagePosition

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";String subName = "my-subscription";int numMessages = 1;admin.topics().peekMessages(topic, subName, numMessages);

Get message by ID

可以使用以下方式获取给定 ledger ID 和 entry ID 的信息。

pulsar-admin

REST API

Java

  1. $ ./bin/pulsar-admin topics get-message-by-id \ persistent://public/default/my-topic \ -l 10 -e 0

GET /admin/v2/:schema/:tenant/:namespace/:topic/ledger/:ledgerId/entry/:entryId

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";long ledgerId = 10;long entryId = 10;admin.topics().getMessageById(topic, ledgerId, entryId);

跳过消息

可以使用以下方式跳过某一主题的特定订阅的一些信息。

pulsar-admin

REST API

Java

  1. $ pulsar-admin topics skip \ --count 10 --subscription my-subscription \ persistent://test-tenant/ns1/tp1 \

POST /admin/v2/:schema/:tenant/:namespace/:topic/subscription/:subName/skip/:numMessages

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";String subName = "my-subscription";int numMessages = 1;admin.topics().skipMessages(topic, subName, numMessages);

跳过所有消息

跳过某一主题的特定订阅的所有旧消息。

pulsar-admin

REST API

Java

  1. $ pulsar-admin topics skip-all \ --subscription my-subscription \ persistent://test-tenant/ns1/tp1 \

POST /admin/v2/:schema/:tenant/:namespace/:topic/subscription/:subName/skip_all

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";String subName = "my-subscription";admin.topics().skipAllMessages(topic, subName);

重置cursor

可以将一个订阅的游标位置重新设置为 X 分钟前记录的位置。 实际上通过计算时间和X分钟之前的cursor位置,来重置回到那个位置。 你可以用下面方式重置 cursor。

pulsar-admin

REST API

Java

  1. $ pulsar-admin topics reset-cursor \ --subscription my-subscription --time 10 \ persistent://test-tenant/ns1/tp1 \

POST /admin/v2/:schema/:tenant/:namespace/:topic/subscription/:subName/resetcursor/:timestamp

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";String subName = "my-subscription";long timestamp = 2342343L;admin.topics().skipAllMessages(topic, subName, timestamp);

查询topic

可以通过以下方式找到服务于特定主题的 broker URL。

pulsar-admin

REST API

Java

  1. $ pulsar-admin topics lookup \ persistent://test-tenant/ns1/tp1 \ "pulsar://broker1.org.com:4480"

GET /lookup/v2/topic/:schema/:tenant:namespace/:topic

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";admin.lookup().lookupDestination(topic);

获取bundle

可以通过以下方式检查包含给定主题的 bundle 范围。

pulsar-admin

REST API

Java

  1. $ pulsar-admin topics bundle-range \ persistent://test-tenant/ns1/tp1 \ "0x00000000_0xffffffff"

GET /lookup/v2/topic/:topic_domain/:tenant/:namespace/:topic/bundle

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";admin.lookup().getBundleRange(topic);

获取订阅

可以通过以下方式查看某个主题的所有订阅名称。

pulsar-admin

REST API

Java

  1. $ pulsar-admin topics subscriptions \ persistent://test-tenant/ns1/tp1 \ my-subscription

GET /admin/v2/:schema/:tenant/:namespace/:topic/subscriptions

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";admin.topics().getSubscriptions(topic);

取消订阅

当订阅不再处理消息时,可以通过以下方式取消订阅。

pulsar-admin

REST API

Java

  1. $ pulsar-admin topics unsubscribe \ --subscription my-subscription \ persistent://test-tenant/ns1/tp1 \

DELETE /admin/v2/namespaces/:tenant/:namespace/:topic/subscription/:subscription

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";String subscriptionName = "my-subscription";admin.topics().deleteSubscription(topic, subscriptionName);

最后一条消息Id

可以获得一个持久化主题最后提交的消息 ID 。 2.3.0 往后版本都可用。

pulsar-admin

REST API

Java

  1. pulsar-admin topics last-message-id topic-name

Get /admin/v2/:schema/:tenant/:namespace/:topic/lastMessageId?version=2.8.0

  1. String topic = "persistent://my-tenant/my-namespace/my-topic";admin.topics().getLastMessage(topic);

管理非分区主题

可以使用 Pulsar admin API 来创建、删除和检查非分区主题的状态。

创建

必须明确创建非分区主题。 当创建一个新的非分区主题时,需要为该主题提供一个名称。

默认情况下,创建 60 秒后,主题会被视为不活跃,并自动删除,以避免生成垃圾数据。 To disable this feature, set brokerDeleteInactiveTopicsEnabled to false. 设置 brokerDeleteInactiveTopicsFrequencySeconds 为特殊值以改变检查非活动主题的频率。

关于这两个参数的更多信息,请参阅 这里

可以通过以下方式创建非分区主题。

pulsar-admin

REST API

Java

When you create non-partitioned topics with the create command, you need to specify the topic name as an argument.

  1. $ bin/pulsar-admin topics create \ persistent://my-tenant/my-namespace/my-topic

备注
当你创建一个后缀为 ‘’-partition-‘ 的非分区主题时,主题名称后跟数字值,如 ‘xyz-topic-partition-x’ ,如果存在 ‘xyz-topic-partition-y’ 的分区主题,那么非分区主题的数字值(x)必须大于分区主题的分区数(y)。 否则就无法创建未分区主题。

PUT /admin/v2/:schema/:tenant/:namespace/:topic

  1. String topicName = "persistent://my-tenant/my-namespace/my-topic";admin.topics().createNonPartitionedTopic(topicName);

删除

可以通过以下方式来删除非分区主题。

pulsar-admin

REST API

Java

  1. $ bin/pulsar-admin topics delete \ persistent://my-tenant/my-namespace/my-topic

DELETE /admin/v2/:schema/:tenant/:namespace/:topic

  1. admin.topics().delete(topic);

获取资源列表

你可以通过以下方式获得特定命名空间下的 topic 列表。

pulsar-admin

REST API

Java

  1. $ pulsar-admin topics list tenant/namespacepersistent://tenant/namespace/topic1persistent://tenant/namespace/topic2

GET /admin/v2/:schema/:tenant/:namespace

  1. admin.topics().getList(namespace);

统计信息

检查某个主题的当前统计数据。 The following is an example. 关于每个统计数据的描述,参见 get stats

  1. {
  2. "msgRateIn": 4641.528542257553,
  3. "msgThroughputIn": 44663039.74947473,
  4. "msgRateOut": 0,
  5. "msgThroughputOut": 0,
  6. "averageMsgSize": 1232439.816728665,
  7. "storageSize": 135532389160,
  8. "publishers": [
  9. {
  10. "msgRateIn": 57.855383881403576,
  11. "msgThroughputIn": 558994.7078932219,
  12. "averageMsgSize": 613135,
  13. "producerId": 0,
  14. "producerName": null,
  15. "address": null,
  16. "connectedSince": null
  17. }
  18. ],
  19. "subscriptions": {
  20. "my-topic_subscription": {
  21. "msgRateOut": 0,
  22. "msgThroughputOut": 0,
  23. "msgBacklog": 116632,
  24. "type": null,
  25. "msgRateExpired": 36.98245516804671,
  26. "consumers": []
  27. }
  28. },
  29. "replication": {}
  30. }

可以通过以下方式检查某个主题及其相关生产者和消费者的当前统计数据。

pulsar-admin

REST API

Java

  1. $ pulsar-admin topics stats \ persistent://test-tenant/namespace/topic \ --get-precise-backlog

GET /admin/v2/:schema/:tenant/:namespace/:topic/stats

  1. admin.topics().getStats(topic, false /* is precise backlog */);

管理分区主题

可以使用 Pulsar admin API 来创建、更新、删除和检查分区主题的状态。

创建

必须明确创建分区主题。 当创建一个新的分区主题时,需要为该主题提供一个名称和分区数量。

默认情况下,创建 60 秒后,主题会被视为不活跃,并自动删除,以避免生成垃圾数据。 To disable this feature, set brokerDeleteInactiveTopicsEnabled to false. 设置 brokerDeleteInactiveTopicsFrequencySeconds 为特殊值以改变检查非活动主题的频率。

关于这两个参数的更多信息,请参阅 这里

可以通过以下方式创建分区主题。

pulsar-admin

REST API

Java

When you create partitioned topics with the create-partitioned-topic command, you need to specify the topic name as an argument and the number of partitions using the -p or --partitions flag.

  1. $ bin/pulsar-admin topics create-partitioned-topic \ persistent://my-tenant/my-namespace/my-topic \ --partitions 4

备注
如果一个非分区主题的后缀是 ‘-partition-‘ 并且后面跟着一个数字值,比如 ‘xyz-topic-partition-10’,那么就不能创建名为 ‘xyz-topic’ 的分区主题,因为分区主题的分区可以覆盖现有的非分区主题。 必须删除上述的非分区主题,才能创建该分区主题。

PUT /admin/v2/:schema/:tenant/:namespace/:topic/partitions

  1. String topicName = "persistent://my-tenant/my-namespace/my-topic";int numPartitions = 4;admin.topics().createPartitionedTopic(topicName, numPartitions);

Create missed partitions

在主题的 auto-creation 是禁用状态并且有一个没有任何分区的主题时,可以使用 create-missed-partitions 命令为主题创建分区。

pulsar-admin

REST API

Java

可以用 create-missed-partitions 命令指定主题名称作为参数来创建 miss 分区。

  1. $ bin/pulsar-admin topics create-missed-partitions \ persistent://my-tenant/my-namespace/my-topic \

POST /admin/v2/:schema/:tenant/:namespace/:topic

  1. String topicName = "persistent://my-tenant/my-namespace/my-topic";admin.topics().createMissedPartitions(topicName);

获取元数据

已分区的主题与元数据相关联,可以将其看作一个 JSON 对象。 以下元数据字段是可用的。

字段说明
分区主题中的分区数量。

pulsar-admin

REST API

Java

可以通过 get-partitioned-topic-metadata 子命令检查分区主题的分区数量。

  1. $ pulsar-admin topics get-partitioned-topic-metadata \ persistent://my-tenant/my-namespace/my-topic{ "partitions": 4}

GET /admin/v2/:schema/:tenant/:namespace/:topic/partitions

  1. String topicName = "persistent://my-tenant/my-namespace/my-topic";admin.topics().getPartitionedTopicMetadata(topicName);

更新

如果 主题是非全局的,你可以更新现有已分区主题的分区数量。 然而,你只能添加分区号。 减少分区的数量就会删除对应主题,在 Pulsar 中是不支持的。

生产者和消费者可以自动找到新创建的分区。

pulsar-admin

REST API

Java

可以使用 update-partitioned-topic 命令更新分区主题。

  1. $ pulsar-admin topics update-partitioned-topic \ persistent://my-tenant/my-namespace/my-topic \ --partitions 8

POST /admin/v2/:schema/:tenant/:cluster/:namespace/:destination/partitions

  1. admin.topics().updatePartitionedTopic(topic, numPartitions);

删除

可以使用 delete-partitioned-topic 命令、REST API 或者 Java 删除分区主题。

pulsar-admin

REST API

Java

  1. $ bin/pulsar-admin topics delete-partitioned-topic \ persistent://my-tenant/my-namespace/my-topic

DELETE /admin/v2/:schema/:topic/:namespace/:destination/partitions

  1. admin.topics().delete(topic);

获取资源列表

你可以通过以下方式获得特定命名空间下的 topic 列表。

pulsar-admin

REST API

Java

  1. $ pulsar-admin topics list tenant/namespacepersistent://tenant/namespace/topic1persistent://tenant/namespace/topic2

GET /admin/v2/:schema/:tenant/:namespace

  1. admin.topics().getList(namespace);

统计信息

可以查看某个主题的当前统计数据。 The following is an example. 关于每个统计数据的描述,参见 get stats

请注意,在订阅的 JSON 对象中, chuckedMessageRate 已被废弃。 请使用 chunkedMessageRate。 两者目前都将发送到 JSON 中。

  1. {
  2. "msgRateIn" : 999.992947159793,
  3. "msgThroughputIn" : 1070918.4635439808,
  4. "msgRateOut" : 0.0,
  5. "msgThroughputOut" : 0.0,
  6. "bytesInCounter" : 270318763,
  7. "msgInCounter" : 252489,
  8. "bytesOutCounter" : 0,
  9. "msgOutCounter" : 0,
  10. "averageMsgSize" : 1070.926056966454,
  11. "msgChunkPublished" : false,
  12. "storageSize" : 270316646,
  13. "backlogSize" : 200921133,
  14. "publishers" : [ {
  15. "msgRateIn" : 999.992947159793,
  16. "msgThroughputIn" : 1070918.4635439808,
  17. "averageMsgSize" : 1070.3333333333333,
  18. "chunkedMessageRate" : 0.0,
  19. "producerId" : 0
  20. } ],
  21. "subscriptions" : {
  22. "test" : {
  23. "msgRateOut" : 0.0,
  24. "msgThroughputOut" : 0.0,
  25. "bytesOutCounter" : 0,
  26. "msgOutCounter" : 0,
  27. "msgRateRedeliver" : 0.0,
  28. "chuckedMessageRate" : 0,
  29. "chunkedMessageRate" : 0,
  30. "msgBacklog" : 144318,
  31. "msgBacklogNoDelayed" : 144318,
  32. "blockedSubscriptionOnUnackedMsgs" : false,
  33. "msgDelayed" : 0,
  34. "unackedMessages" : 0,
  35. "msgRateExpired" : 0.0,
  36. "lastExpireTimestamp" : 0,
  37. "lastConsumedFlowTimestamp" : 0,
  38. "lastConsumedTimestamp" : 0,
  39. "lastAckedTimestamp" : 0,
  40. "consumers" : [ ],
  41. "isDurable" : true,
  42. "isReplicated" : false
  43. }
  44. },
  45. "replication" : { },
  46. "metadata" : {
  47. "partitions" : 3
  48. },
  49. "partitions" : { }
  50. }

You can check the current statistics of a given partitioned topic and its connected producers and consumers in the following ways.

pulsar-admin

REST API

Java

  1. $ pulsar-admin topics partitioned-stats \ persistent://test-tenant/namespace/topic \ --per-partition

GET /admin/v2/:schema/:tenant/:namespace/:topic/partitioned-stats

  1. admin.topics().getPartitionedStats(topic, true /* per partition */, false /* is precise backlog */);

Internal stats

You can check the detailed statistics of a topic. The following is an example. For description of each stats, refer to get internal stats.

  1. {
  2. "entriesAddedCounter": 20449518,
  3. "numberOfEntries": 3233,
  4. "totalSize": 331482,
  5. "currentLedgerEntries": 3233,
  6. "currentLedgerSize": 331482,
  7. "lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
  8. "lastLedgerCreationFailureTimestamp": null,
  9. "waitingCursorsCount": 1,
  10. "pendingAddEntriesCount": 0,
  11. "lastConfirmedEntry": "324711539:3232",
  12. "state": "LedgerOpened",
  13. "ledgers": [
  14. {
  15. "ledgerId": 324711539,
  16. "entries": 0,
  17. "size": 0
  18. }
  19. ],
  20. "cursors": {
  21. "my-subscription": {
  22. "markDeletePosition": "324711539:3133",
  23. "readPosition": "324711539:3233",
  24. "waitingReadOp": true,
  25. "pendingReadOps": 0,
  26. "messagesConsumedCounter": 20449501,
  27. "cursorLedger": 324702104,
  28. "cursorLedgerLastEntry": 21,
  29. "individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
  30. "lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
  31. "state": "Open"
  32. }
  33. }
  34. }

You can get the internal stats for the partitioned topic in the following ways.

pulsar-admin

REST API

Java

  1. $ pulsar-admin topics stats-internal \ persistent://test-tenant/namespace/topic

GET /admin/v2/:schema/:tenant/:namespace/:topic/internalStats

  1. admin.topics().getInternalStats(topic);

发布到分区主题

By default, Pulsar topics are served by a single broker, which limits the maximum throughput of a topic. Partitioned topics can span multiple brokers and thus allow for higher throughput.

You can publish to partitioned topics using Pulsar client libraries. When publishing to partitioned topics, you must specify a routing mode. If you do not specify any routing mode when you create a new producer, the round robin routing mode is used.

Routing mode

You can specify the routing mode in the ProducerConfiguration object that you use to configure your producer. The routing mode determines which partition(internal topic) that each message should be published to.

The following MessageRoutingMode options are available.

发送模式说明
RoundRobinPartition如果没有提供 key,生产者将在所有分区中以轮训策略进行发布消息,以达到最大的吞吐量。 请注意轮训并不是作用于每条单独的消息,而是作用于延迟处理的批次边界,以确保批处理有效。 如果在消息上指定了 key ,分区生产者会根据 key 的 hash 值将消息分配给对应的分区。 这是默认的模式。
SinglePartition如果消息没有指定 key,生产者会随机挑选一个分区,并发布所有消息到该分区。 如果在消息上指定了 key ,分区生产者会根据 key 的 hash 值将消息分配给对应的分区。
CustomPartition使用自定义消息路由器实现来决定特定消息的分区。 你可以通过使用 Java 客户端和实现 MessageRouter 接口来创建一个自定义路由模式。

如下所示:

  1. String pulsarBrokerRootUrl = "pulsar://localhost:6650";
  2. String topic = "persistent://my-tenant/my-namespace/my-topic";
  3. PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
  4. Producer<byte[]> producer = pulsarClient.newProducer()
  5. .topic(topic)
  6. .messageRoutingMode(MessageRoutingMode.SinglePartition)
  7. .create();
  8. producer.send("Partitioned topic message".getBytes());

Custom message router

要使用自定义消息路由器,您需要提供MessageRouter 接口的实现,该接口只有一个choosePartition方法:

  1. public interface MessageRouter extends Serializable {
  2. int choosePartition(Message msg);
  3. }

下面的路由模式表示所有的消息都会被发送到分区10:

  1. public class AlwaysTenRouter implements MessageRouter {
  2. public int choosePartition(Message msg) {
  3. return 10;
  4. }
  5. }

With that implementation, you can send

  1. String pulsarBrokerRootUrl = "pulsar://localhost:6650";
  2. String topic = "persistent://my-tenant/my-cluster-my-namespace/my-topic";
  3. PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
  4. Producer<byte[]> producer = pulsarClient.newProducer()
  5. .topic(topic)
  6. .messageRouter(new AlwaysTenRouter())
  7. .create();
  8. producer.send("Partitioned topic message".getBytes());

使用 Key 时如何选择分区

If a message has a key, it supersedes the round robin routing policy. The following example illustrates how to choose the partition when using a key.

  1. // 如果消息存在key,轮询路由策略将被替换
  2. if (msg.hasKey()) {
  3. return signSafeMod(hash.makeHash(msg.getKey()), topicMetadata.numPartitions());
  4. }
  5. if (isBatchingEnabled) { // 如果开启批处理,请在 `partitionSwitchMs` 边界上选择分区。
  6. long currentMs = clock.millis();
  7. return signSafeMod(currentMs / partitionSwitchMs + startPtnIdx, topicMetadata.numPartiations());
  8. other.
  9. return signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), topicMetadata.numPartiations());
  10. }