Pulsar command-line tools

Pulsar 提供了一些命令行工具,可以用于管理 Pulsar 的安装、测试性能、使用命令行生产者与消费者,等等。

All Pulsar command-line tools can be run from the bin directory of your installed Pulsar package. The following tools are currently documented:

获取帮助

You can get help for any CLI tool, command, or subcommand using the --help flag, or -h for short. 下面是一个示例:

  1. $ bin/pulsar broker --help

pulsar

Pulsar 工具用于在前台启动 Pulsar 组件,比如 bookie 和 ZooKeeper。

这些进程也可以在后台启动,使用 nohup,使用 pulsar-daemon 工具(它有着与 pulsar 相同的命令接口)。

用法:

  1. $ pulsar command

Commands:

  • bookie
  • broker
  • compact-topic
  • discovery
  • configuration-store
  • initialize-cluster-metadata
  • proxy
  • standalone
  • websocket
  • zookeeper
  • zookeeper-shell

例子:

  1. $ PULSAR_BROKER_CONF=/path/to/broker.conf pulsar broker

下表列出了你可以用来配置 pulsar 工具的环境变量。

变量说明默认值
PULSAR_LOG_CONFLog4j 的配置文件conf/log4j2.yaml
PULSAR_BROKER_CONFbroker 的配置文件conf/broker.conf
PULSAR_BOOKKEEPER_CONFbookie 的配置文件conf/bookkeeper.conf
PULSAR_ZK_CONFZooKeeper 的配置文件conf/zookeeper.conf
PULSAR_CONFIGURATION_STORE_CONF配置存储的配置文件conf/global_zookeeper.conf
PULSAR_DISCOVERY_CONF服务发现的配置文件conf/discovery.conf
PULSAR_WEBSOCKET_CONFWebSocket 代理的配置文件conf/websocket.conf
PULSAR_STANDALONE_CONFstandalone 的配置文件conf/standalone.conf
PULSAR_EXTRA_OPTS传递给 jvm 的额外选项
PULSAR_EXTRA_CLASSPATHPulsar classpath 的额外路径
PULSAR_PID_DIR存储 Pulsar 服务器 PID 文件的文件夹
PULSAR_STOP_TIMEOUT如果停止 Bookie 服务器失败,在强制杀死 Bookie 服务器实例之前的等待时间。

bookie

启动 bookie 服务器

用法:

  1. $ pulsar bookie options

选项

选项说明默认值
-readOnly强制启动一台只读 bookie 服务器false
-withAutoRecovery启动带自动恢复服务的 bookie 服务器false

示例

  1. $ PULSAR_BOOKKEEPER_CONF=/path/to/bookkeeper.conf pulsar bookie \
  2. -readOnly \
  3. -withAutoRecovery

broker

启动一个 Pulsar broker

用法

  1. $ pulsar broker options

选项

选项说明默认值
-bc , —bookie-confBookKeeper 的配置文件
-rb , —run-bookie在 Pulsar broker 的同一主机上运行 BookKeeper bookiefalse
-ra , —run-bookie-autorecovery在 Pulsar broker 的同一主机上运行 BookKeeper 自动恢复守护进程。false

示例

  1. $ PULSAR_BROKER_CONF=/path/to/broker.conf pulsar broker

compact-topic

对 Pulsar 主题执行压缩(在新进程里)

用法

  1. $ pulsar compact-topic options

选项

标记说明默认值
-t , —topic你想压缩的 Pulsar 主题

示例

  1. $ pulsar compact-topic --topic topic-to-compact

discovery

运行发现服务器

用法

  1. $ pulsar discovery

示例

  1. $ PULSAR_DISCOVERY_CONF=/path/to/discovery.conf pulsar discovery

configuration-store

启动 Pulsar 配置存储

用法

  1. $ pulsar configuration-store

示例

  1. $ PULSAR_CONFIGURATION_STORE_CONF=/path/to/configuration_store.conf pulsar configuration-store

initialize-cluster-metadata

一次性集群元数据初始化

用法

  1. $ pulsar initialize-cluster-metadata options

选项

标记说明默认值
-ub , —broker-service-url新集群的 broker 服务 URL
-tb , —broker-service-url-tls新集群的 broker 服务 URL(带 TLS 加密)
-c , —cluster集群名称
-cs , —configuration-store配置存储的 quorum 连接字符串
—existing-bk-metadata-service-uri你想使用的已有 BookKeeper 集群的元数据服务 URL
-h , —help集群名称false
—initial-num-stream-storage-containersBookKeeper流存储容器数16
—initial-num-transaction-coordinators集群中分配的事务协调器的数量16
-uw , —web-service-url新集群的 web 服务 URL
-tw , —web-service-url-tls新集群的 web 服务 URL(带 TLS 加密)
-zk , —zookeeper本地 ZooKeeper 的 quorum 连接字符串
—zookeeper-session-timeout-msThe local ZooKeeper session timeout. The time unit is in millisecond(ms)30000

proxy

管理 Pulsar 代理

用法

  1. $ pulsar proxy options

选项

标记说明默认值
—configuration-store配置存储连接字符串
-zk , —zookeeper-servers本地 ZooKeeper 连接字符串

示例

  1. $ PULSAR_PROXY_CONF=/path/to/proxy.conf pulsar proxy \
  2. --zookeeper-servers zk-0,zk-1,zk2 \
  3. --configuration-store zk-0,zk-1,zk-2

standalone

运行使用本地 bookie 和本地 ZooKeeper 的 broker 服务

用法

  1. $ pulsar standalone options

选项

标记说明默认值
-a , —advertised-addressStandalone broker 的广告地址
—bookkeeper-dir本地 bookie 的基本数据目录data/standalone/bookkeeper
—bookkeeper-port本地 bookie 的基本端口3181
—no-broker只启动 ZooKeeper 和 BookKeeper 服务,而不启动 brokerfalse
—num-bookies本地 bookie 的数量1
—only-broker只启动 Pulsar broker 服务(而不启动 ZooKeeper 或 BookKeeper)
—wipe-data清除之前的 ZooKeeper/BookKeeper 数据
—zookeeper-dir本地 ZooKeeper 的数据目录data/standalone/zookeeper
—zookeeper-port本地 ZooKeeper 的端口2181

示例

  1. $ PULSAR_STANDALONE_CONF=/path/to/standalone.conf pulsar standalone

websocket

用法

  1. $ pulsar websocket

示例

  1. $ PULSAR_WEBSOCKET_CONF=/path/to/websocket.conf pulsar websocket

zookeeper

启动 ZooKeeper 集群

用法

  1. $ pulsar zookeeper

示例

  1. $ PULSAR_ZK_CONF=/path/to/zookeeper.conf pulsar zookeeper

zookeeper-shell

使用 ZooKeeper shell 来连接到一个正在运行的 ZooKeeper 集群

用法

  1. $ pulsar zookeeper-shell options

选项

标记说明默认值
-c, —confZooKeeper 的配置文件
-server配置 zk 地址,比如 127.0.0.1:2181

pulsar-client

pulsar-client 工具

用法

  1. $ pulsar-client command

Commands

  • produce
  • consume

选项

标记说明默认值
—auth-params认证参数,其格式取决于认证插件类中 configure 方法的实现,例如 “key1:val1,key2:val2” 或 “{“key1”:”val1”,”key2”:”val2”}”{“saslJaasClientSectionName”:”PulsarClient”, “serverType”:”broker”}
—auth-plugin认证插件类名称org.apache.pulsar.client.impl.auth.AuthenticationSasl
—listener-namebroker 监听器名称
—proxy-protocolProxy protocol 选择代理路由类型
—proxy-url要连接的 Proxy server URL
—urlBroker URL to which to connectpulsar://localhost:6650/
ws://localhost:8080
-v, —version获取Pulsar客户端版本
-h, —help显示帮助

produce

发送单条或多条消息给指定的 broker 和主题

用法

  1. $ pulsar-client produce topic options

选项

标记说明默认值
-f, —files将要发送的文件路径,以逗号分隔;必须指定 -m 或 -f[]
-m, —messages将要发送的消息字符串,以逗号分隔;必须指定 -m 或 -f[]
-n, —num-produce发送单条或多条消息的次数;消息/文件 生产次数 应小于 10001
-r, —rate生产消息的频率(单位是消息/秒),值为 0 表示尽可能快地生产消息0.0
-c, —chunking如果消息超过允许的最大大小,则将消息分割并分块发布。false
-s, —separator用于分割消息字符的字符串。“,”
-k, —key要添加的消息键(Message Key)key=value string, like k1=v1,k2=v2.
-p, —propertiesProperties to add. If you want to add multiple properties, use the comma as the separator, e.g. k1=v1,k2=v2.
-ekn, —encryption-key-nameThe public key name to encrypt payload.
-ekv, —encryption-key-valueThe URI of public key to encrypt payload. For example, file:///path/to/public.key or data:application/x-pem-file;base64,**.

consume

从指定的 broker 和主题消费消息

用法

  1. $ pulsar-client consume topic options

选项

标记说明默认值
—hex以十六进制格式显示二进制消息。false
-n, —num-messages要消费的消息数量,0 表示一直消费。1
-r, —rate消费消息的频率(单位是消息/秒),值为 0 表示尽可能快地消费消息0.0
—regex表明主题名称为正则表达式false
-s, —subscription-nameSubscription name
-t, —subscription-typeThe type of the subscription. Possible values: Exclusive, Shared, Failover, Key_Shared.Exclusive
-p, —subscription-positionThe position of the subscription. Possible values: Latest, Earliest.Latest
-m, —subscription-mode订阅模式。 Possible values: Durable, NonDurable.Durable
-q, —queue-size消费者接收队列的大小。0
-mc, —max_chunked_msg最大的待处理块消息。0
-ac, —auto_ack_chunk_q_full如果队列已满,则自动确认消费者的接收队列中最早的消息。false
—hide-content不要将消息打印到控制台。false
-st, —schema-typeSet the schema type. Use auto_consume to dump AVRO and other structured data types. Possible values: bytes, auto_consume.bytes
-ekv, —encryption-key-valueThe URI of public key to encrypt payload. For example, file:///path/to/public.key or data:application/x-pem-file;base64,*.
-pm, —pool-messages使用池化消息true

pulsar-daemon

基于 pulsar tool 的封装,用于在后台用 nohup 启动和停止进程,比如 ZooKeeper,bookie 和 Pulsar broker。

pulsar-daemon 有一个与 pulsar 命令相似的接口,但是为各种服务添加了启动和停止命令。 如果需要这些服务的列表,运行 pulsar-daemon 来查看帮助文档的输出或查看 pulsar 命令的文档。

用法

  1. $ pulsar-daemon command

Commands

  • start
  • stop

start

使用 nohup 在后台启动服务

用法

  1. $ pulsar-daemon start service

stop

停止一个已经使用 start 命令启动了的服务

用法

  1. $ pulsar-daemon stop service options

选项

标记说明默认值
-force如果无法正常停止则强制停止服务false

pulsar-perf

用于性能测试 Pulsar broker 的工具。

用法

  1. $ pulsar-perf command

Commands

  • consume
  • produce
  • read
  • websocket-producer
  • managed-ledger
  • monitor-brokers
  • simulation-client
  • simulation-controller
  • help

环境变量

下表列出了你可以用来配置 pulsar-perf 工具的环境变量。

变量说明默认值
PULSAR_LOG_CONFLog4j 的配置文件conf/log4j2.yaml
PULSAR_CLIENT_CONF客户端的配置文件conf/client.conf
PULSAR_EXTRA_OPTS传递给 JVM 的额外选项
PULSAR_EXTRA_CLASSPATHPulsar classpath 的额外路径

consume

Run a consumer

用法

  1. $ pulsar-perf consume options

选项

标记说明默认值
—auth-params认证参数,其格式由认证插件类中configure方法的实现决定。 例如, key1:val1,key2:val2{“key1”:”val1”,”key2”:”val2”}
—auth_plugin认证插件类名称
-ac, —auto_ack_chunk_q_full如果队列已满,则自动确认消费者的接收队列中最早的消息。false
—listener-namebroker 监听器名称
—acks-delay-millis确认分组延迟,以毫秒计100
—batch-index-ack开启或禁用批量索引确认false
-bw, —busy-wait在 Pulsar 客户端启用或禁用 Busy-Waitfalse
-v, —encryption-key-value-file包含解密有效荷载的私钥的文件
-h, —help帮助信息false
—conf-fileConfiguration file
-e, —expire_time_incomplete_chunked_messages不完整区块消息的到期时间 (毫秒)0
-c, —max-connectionsMax number of TCP connections to a single broker100
-mc, —max_chunked_msg最大的待处理块消息。0
-n, —num-consumers消费者数量(每个主题)1
-ioThreads, —num-io-threads设置用于处理 broker 连接的线程数1
-ns, —num-subscriptions订阅数量 (每个主题)1
-t, —num-topicsThe number of topics1
-pm, —pool-messages使用池化消息true
-r, —rate模拟一个慢速消息消费者(速率 消息/秒)0
-q, —receiver-queue-size接收队列的大小1000
-p, —receiver-queue-size-across-partitions分区接收队列的最大大小50000
—replicated是否应该复制订阅状态false
-u, —service-urlPulsar service URL
-i, —stats-interval-secondsStatistics interval seconds. If 0, statistics will be disabled0
-s, —subscriber-name订阅者名称前缀
-ss, —subscriptions要消费的订阅列表(比如 sub1,sub2)sub
-st, —subscription-typeSubscriber type. Possible values are Exclusive, Shared, Failover, Key_Shared.Exclusive
-sp, —subscription-positionSubscriber position. Possible values are Latest, Earliest.Latest
-time, —test-durationTest duration (in seconds). If the value is 0 or smaller than 0, it keeps consuming messages0
—trust-cert-filePath for the trusted TLS certificate file
—tls-allow-insecure允许不安全的 TLS 连接。

produce

Run a producer

用法

  1. $ pulsar-perf produce options

选项

标记说明默认值
-am, —access-modeProducer access mode. Valid values are Shared, Exclusive and WaitForExclusiveShared(共享)
-au, —admin-urlPulsar admin URL
—auth-params认证参数,其格式由认证插件类中configure方法的实现决定。 例如, key1:val1,key2:val2{“key1”:”val1”,”key2”:”val2”}
—auth_plugin认证插件类名称
—listener-namebroker 监听器名称
-b, —batch-time-windowBatch messages in a window of the specified number of milliseconds1
-bb, —batch-max-bytes每批的最大字节4194304
-bm, —batch-max-messages每批的最大消息数1000
-bw, —busy-wait在 Pulsar 客户端启用或禁用 Busy-Waitfalse
-bm, —batch-max-messages如果消息超过允许的最大大小,则将消息分割并分块发布。false
-d, —delay给消息标记延迟时间(秒)0s
-z, —compressionCompress messages’ payload. Possible values are NONE, LZ4, ZLIB, ZSTD or SNAPPY.
—conf-fileConfiguration file
-k, —encryption-key-name加密有效载荷的公钥名称
-v, —encryption-key-value-file包含用于加密有效载荷的公钥文件
-ef, —exit-on-failure发布失败后退出进程false
-fc, —format-class自定义 formatter 类名org.apache.pulsar.testclient.DefaultMessageFormatter
-fp, —format-payload格式化为来自 producer 和/或流中的消息索引作为时间戳,单位为 nanosecondsfalse
-h, —help帮助信息false
-c, —max-connectionsMax number of TCP connections to a single broker100
-o, —max-outstandingMax number of outstanding messages1000
-p, —max-outstanding-across-partitions跨分区场景下的最大未处理消息数50000
-mk, —message-key-generation-modeThe generation mode of message key. Valid options are autoIncrement, random
-ioThreads, —num-io-threads设置用于处理 broker 连接的线程数1
-m, —num-messagesNumber of messages to publish in total. If the value is 0 or smaller than 0, it keeps publishing messages.0
-n, —num-producersThe number of producers (per topic)1
-threads, —num-test-threads测试线程数1
-t, —num-topicThe number of topics1
-np, —partitionsCreate partitioned topics with the given number of partitions. Setting this value to 0 means not trying to create a topic
-f, —payload-file使用 UTF-8 编码的文本文件的有效载荷(payload),会在发布消息时随机选择一个有效载荷
-e, —payload-delimiter当使用来自文件的有效载荷( payload)时,设置每行之间的分隔符\n
-pn, —producer-name生产者名称
-r, —ratePublish rate msg/s across topics100
—send-timeout设置发送超时时间0
—separatorTopic 和 topic 编号之间的分隔符-
-u, —service-urlPulsar service URL
-s, —sizeMessage size (in bytes)1024
-i, —stats-interval-secondsStatistics interval seconds. If 0, statistics will be disabled.0
-time, —test-durationTest duration (in seconds). If the value is 0 or smaller than 0, it keeps publishing messages.0
—trust-cert-filePath for the trusted TLS certificate file
—warmup-time预热时间(秒)1
—tls-allow-insecure允许不安全的 TLS 连接。

read

运行主题 Reader

用法

  1. $ pulsar-perf read options

选项

标记说明默认值
—auth-params认证参数,其格式由认证插件类中configure方法的实现决定。 例如, key1:val1,key2:val2{“key1”:”val1”,”key2”:”val2”}
—auth_plugin认证插件类名称
—listener-namebroker 监听器名称
—conf-fileConfiguration file
-h, —help帮助信息false
-c, —max-connectionsMax number of TCP connections to a single broker100
-ioThreads, —num-io-threads设置用于处理 broker 连接的线程数1
-t, —num-topicsThe number of topics1
-r, —rate模拟一个慢速消息 reader(速率 消息/秒)0
-q, —receiver-queue-size接收队列的大小1000
-u, —service-urlPulsar service URL
-m, —start-message-idStart message id. This can be either ‘earliest’, ‘latest’ or a specific message id by using ‘lid:eid’earliest
-i, —stats-interval-secondsStatistics interval seconds. If 0, statistics will be disabled.0
-time, —test-durationTest duration (in seconds). If the value is 0 or smaller than 0, it keeps consuming messages.0
—trust-cert-filePath for the trusted TLS certificate file
—use-tls在连接上使用 TLS 加密false
—tls-allow-insecure允许不安全的 TLS 连接。

websocket-producer

启动 websocket 生产者

用法

  1. $ pulsar-perf websocket-producer options

选项

标记说明默认值
—auth-params认证参数,其格式由认证插件类中configure方法的实现决定。 例如, key1:val1,key2:val2{“key1”:”val1”,”key2”:”val2”}
—auth_plugin认证插件类名称
—conf-fileConfiguration file
-h, —help帮助信息false
-m, —num-messagesNumber of messages to publish in total. If the value is 0 or smaller than 0, it keeps publishing messages0
-t, —num-topicThe number of topics1
-f, —payload-file使用文件中的 payload 而不是空缓存中的
-u, —proxy-urlPulsar 代理 URL,比如 “ws://localhost:8080/
-r, —ratePublish rate msg/s across topics100
-s, —size消息大小(字节)1024
-time, —test-durationTest duration (in seconds). If the value is 0 or smaller than 0, it keeps publishing messages0

managed-ledger

直接写入 managed-ledgers

用法

  1. $ pulsar-perf managed-ledger options

选项

标记说明默认值
-a, —ack-quorumLedger ack quorum1
-dt, —digest-typeBookKeeper digest type. Possible Values: [CRC32, MAC, CRC32C, DUMMY]CRC32C
-e, —ensemble-sizeLedger 整体大小1
-h, —help帮助信息false
-c, —max-connections单个 bookie 的最大 TCP 连接数1
-o, —max-outstanding未处理请求的最大数量1000
-m, —num-messagesNumber of messages to publish in total. If the value is 0 or smaller than 0, it keeps publishing messages0
-t, —num-topicmanaged ledger 的数量1
-r, —rate在managed ledger中的写入速率(消息/秒)100
-s, —size消息大小(字节)1024
-time, —test-durationTest duration (in seconds). If the value is 0 or smaller than 0, it keeps publishing messages0
—threads正在写入的线程数1
-w, —write-quorumLedger write quorum1
-zk, —zookeeperServersZooKeeper 连接字符串

monitor-brokers

Continuously receive broker data and/or load reports

用法

  1. $ pulsar-perf monitor-brokers options

选项

标记说明默认值
—connect-stringA connection string for one or more ZooKeeper servers
-h, —help帮助信息false

simulation-client

Run a simulation server acting as a Pulsar client. Uses the client configuration specified in conf/client.conf.

用法

  1. $ pulsar-perf simulation-client options

选项

标记说明默认值
—port用于监听控制器的端口0
—service-urlPulsar 服务 URL
-h, —help帮助信息false

simulation-controller

Run a simulation controller to give commands to servers

用法

  1. $ pulsar-perf simulation-controller options

选项

标记说明默认值
—client-portThe port that the clients are listening on0
—clientsComma-separated list of client hostnames
—clusterThe cluster to test on
-h, —help帮助信息false

help

帮助消息

用法

  1. $ pulsar-perf help

bookkeeper

A tool for managing BookKeeper.

用法

  1. $ bookkeeper command

Commands

  • 自动恢复
  • bookie
  • localbookie
  • upgrade
  • shell

环境变量

The table below lists the environment variables that you can use to configure the bookkeeper tool.

变量说明默认值
BOOKIE_LOG_CONFLog4j 的配置文件conf/log4j2.yaml
BOOKIE_CONFBookKeeper configuration fileconf/bk_server.conf
BOOKIE_EXTRA_OPTS传递给 JVM 的额外选项
BOOKIE_EXTRA_CLASSPATHExtra paths for BookKeeper’s classpath
ENTRY_FORMATTER_CLASSThe Java class used to format entries
BOOKIE_PID_DIRFolder where the BookKeeper server PID file should be stored
BOOKIE_STOP_TIMEOUT如果停止 Bookie 服务器失败,在强制杀死 Bookie 服务器实例之前的等待时间。

auto-recovery

运行自动恢复服务

用法

  1. $ bookkeeper autorecovery options

选项

标记说明默认值
-c, —conf自动恢复配置

bookie

Starts up a BookKeeper server (aka bookie)

用法

  1. $ bookkeeper bookie options

选项

标记说明默认值
-c, —conf自动恢复配置
-readOnly强制启动一台只读 bookie 服务器false
-withAutoRecoveryStart auto-recovery service bookie serverfalse

localbookie

Runs a test ensemble of N bookies locally

用法

  1. $ bookkeeper localbookie N

upgrade

Upgrade the bookie’s filesystem

用法

  1. $ bookkeeper upgrade options

选项

标记说明默认值
-c, —conf自动恢复配置
-u, —upgradeUpgrade the bookie’s directories

shell

Run shell for admin commands. To see a full listing of those commands, run bookkeeper shell without an argument.

用法

  1. $ bookkeeper shell

示例

  1. $ bookkeeper shell bookiesanity

broker-tool

broker- tool 用于在指定的 broker 上进行操作。

用法

  1. $ broker-tool command

Commands

  • load-report
  • help

示例 获取关于某条命令的更多信息的两种方式:

  1. $ broker-tool help command
  2. $ broker-tool command --help

load-report

Collect the load report of a specific broker. The command is run on a broker, and used for troubleshooting why broker can’t collect right load report.

选项

标记说明默认值
-i, —interval收集负载报告的间隔(毫秒)
-h, —help显示帮助信息