Pulsar Functions command line tool

下表为 Pulsar Functions 命令行工具。 可用于了解 Pulsar Functions 的模式、命令、参数等。

localrun

本地运行 Pulsar Functions ,而不是将其部署到 Pulsar 集群。

配置项说明默认值
auto-ack框架是否自动 ack 消息。true
broker-service-urlPulsar broker 的 URL。
classnamePulsar Function 的类名称。
client-auth-params客户端身份验证参数。
client-auth-plugin客户端身份验证插件,function 进程需要通过此插件连接到 broker。
CPU核心 CPU 需要给每个 function 分配实例(仅适用于在 docker 上运行时)。
custom-schema-inputs输入 topic 到 Schema 类名称的映射(以 JSON 字符串的形式)。
custom-serde-inputs输入 topic 到 SerDe 类名称的映射(以 JSON 字符串的形式)。
dead-letter-topic发送有未成功处理消息的 topic。 Python 函数不支持这个参数。
disk每个 function 实例需要分配的磁盘(仅适用于在 docker 上运行时)。
fqfn此 function 的完全限定名称(FQFN)。
function-config-file指定 Pulsar Function 配置的 YAML 配置文件的路径。
goFunction 的主可执行 Go 二进制文件路径(如果 function 是用 Go 编写的)。 支持 URL 路径 [http/https/file (文件协议假定文件已在 worker 主机上)/function (包管理服务的软件包 URL)],worker 可从中下载包。
hostname-verification-enabled启用主机名称验证。false
inputsPulsar Function 中一个或多个输入 topic(可以将多个 topic 指定为以逗号分隔的列表)。
jarPath to the jar file for the function (if the function is written in Java). 支持 URL 路径 [http/https/file (文件协议假定文件已在 worker 主机上)/function (包管理服务的软件包 URL)],worker 可从中下载包。
instance-id-offset从此偏移量启动 instanceId。0
log-topic用于存储 Pulsar Function 生成的日志的 topic。
max-message-retries尝试处理一条消息的最大次数。
namePulsar Function 的名称。
命名空间Pulsar Function 的命名空间。
outputPulsar Function 的输出 topic(如果没有指定,则没有输出)。
output-serde-classname此 function 输出消息使用的 SerDe 类。
parallelismPulsar Function 的并行度因子(即运行 function 实例的数量)。
processing-guarantees应用于 function 的处理保证(传递语义)。 可用值:[ATLEAST_ONCE、ATMOST_ONCE、EFFECTIVELY_ONCE]。ATLEAST_ONCE
pyFunction 的主可执行 Python 文件/Python Wheel 文件路径(如果 function 是用 Python 编写的)。 支持 URL 路径 [http/https/file (文件协议假定文件已在 worker 主机上)/function (包管理服务的软件包 URL)],worker 可从中下载包。
ram需要为每个 function 实例分配的 ram 字节数(仅适用于进程/在 docker 上运行时)。
retain-orderingFunction 按顺序消费并处理消息。
schema-typeFunction 用于输出消息的内置 schema 类型或自定义 schema 名称。
sliding-interval-count窗口滑动后的消息数量。
sliding-interval-duration-ms窗口滑动后的持续时间。
subs-name如果用户请求输入 topic 中 consumer 的特定订阅名称,则返回 Pulsar source 订阅名称。
租户Pulsar Function 的租户。
timeout-ms消息超时(以毫秒为单位)。
tls-allow-insecure允许不安全的 tls 连接。false
tls-trust-cert-pathtls 信任证书文件路径。
topics-patternThe topic pattern to consume from list of topics under a namespace that match the pattern. [—input] and [—topic-pattern] are mutually exclusive. 在 —custom-serde-inputs 中添加模式的 SerDe 类名称(仅适用于 Java Function)。
use-tls使用 tls 连接。false
user-config用户定义的配置键/值。
window-length-count每个窗口的消息数量,
window-length-duration-ms窗口的时间长度,以毫秒为单位。

create

在集群模式下创建和部署 Pulsar Function。

配置项说明默认值
auto-ack框架是否自动 ack 消息。true
classnamePulsar Function 的类名称。
CPU核心 CPU 需要给每个 function 分配实例(仅适用于在 docker 上运行时)。
custom-runtime-options用于自定义运行时间编码选项的字符串,查看配置运行时间的详细信息
custom-schema-inputs输入 topic 到 Schema 类名称的映射(以 JSON 字符串的形式)。
custom-serde-inputs输入 topic 到 SerDe 类名称的映射(以 JSON 字符串的形式)。
dead-letter-topic发送有未成功处理消息的 topic。 Python 函数不支持这个参数。
disk每个 function 实例需要分配的磁盘(仅适用于在 docker 上运行时)。
fqfn此 function 的完全限定名称(FQFN)。
function-config-file指定 Pulsar Function 配置的 YAML 配置文件的路径。
goFunction 的主可执行 Go 二进制文件路径(如果 function 是用 Go 编写的)。 支持 URL 路径 [http/https/file (文件协议假定文件已在 worker 主机上)/function (包管理服务的软件包 URL)],worker 可从中下载包。
inputsPulsar Function 中一个或多个输入 topic(可以将多个 topic 指定为以逗号分隔的列表)。
jarPath to the jar file for the function (if the function is written in Java). 支持 URL 路径 [http/https/file (文件协议假定文件已在 worker 主机上)/function (包管理服务的软件包 URL)],worker 可从中下载包。
log-topicThe topic to which the logs of a Pulsar Function are produced.
max-message-retries尝试处理一条消息的最大次数。
namePulsar Function 的名称。
命名空间Pulsar Function 的命名空间。
outputPulsar Function 的输出 topic(如果没有指定,则没有输出)。
output-serde-classname此 function 输出消息使用的 SerDe 类。
parallelismThe parallelism factor of a Pulsar Function (i.e. the number of function instances to run).
processing-guarantees应用于 function 的处理保证(传递语义)。 可用值:[ATLEAST_ONCE、ATMOST_ONCE、EFFECTIVELY_ONCE]。ATLEAST_ONCE
pyFunction 的主可执行 Python 文件/Python Wheel 文件路径(如果 function 是用 Python 编写的)。 支持 URL 路径 [http/https/file (文件协议假定文件已在 worker 主机上)/function (包管理服务的软件包 URL)],worker 可从中下载包。
ram需要为每个 function 实例分配的 ram 字节数(仅适用于进程/在 docker 上运行时)。
retain-orderingFunction 按顺序消费并处理消息。
schema-typeFunction 用于输出消息的内置 schema 类型或自定义 schema 名称。
sliding-interval-count窗口滑动后的消息数量。
sliding-interval-duration-ms窗口滑动后的持续时间。
subs-name如果用户请求输入 topic 中 consumer 的特定订阅名称,则返回 Pulsar source 订阅名称。
租户Pulsar Function 的租户。
timeout-ms消息超时(以毫秒为单位)。
topics-patternThe topic pattern to consume from list of topics under a namespace that match the pattern. [—input] and [—topic-pattern] are mutually exclusive. 在 —custom-serde-inputs 中添加模式的 SerDe 类名称(仅适用于 Java Function)。
user-config用户定义的配置键/值。
window-length-count每个窗口的消息数量,
window-length-duration-ms窗口的时间长度,以毫秒为单位。

delete

删除运行在 Pulsar 集群上的 Pulsar Function。

配置项说明默认值
fqfn此 function 的完全限定名称(FQFN)。
namePulsar Function 的名称。
命名空间Pulsar Function 的命名空间。
租户Pulsar Function 的租户。

update

更新已部署到 Pulsar 集群的 Pulsar Function。

配置项说明默认值
auto-ack框架是否自动 ack 消息。true
classnamePulsar Function 的类名称。
CPU核心 CPU 需要给每个 function 分配实例(仅适用于在 docker 上运行时)。
custom-runtime-options用于自定义运行时间编码选项的字符串,查看配置运行时间的详细信息
custom-schema-inputs输入 topic 到 Schema 类名称的映射(以 JSON 字符串的形式)。
custom-serde-inputs输入 topic 到 SerDe 类名称的映射(以 JSON 字符串的形式)。
dead-letter-topic发送有未成功处理消息的 topic。 Python 函数不支持这个参数。
disk每个 function 实例需要分配的磁盘(仅适用于在 docker 上运行时)。
fqfn此 function 的完全限定名称(FQFN)。
function-config-file指定 Pulsar Function 配置的 YAML 配置文件的路径。
goFunction 的主可执行 Go 二进制文件路径(如果 function 是用 Go 编写的)。 支持 URL 路径 [http/https/file (文件协议假定文件已在 worker 主机上)/function (包管理服务的软件包 URL)],worker 可从中下载包。
inputsPulsar Function 中一个或多个输入 topic(可以将多个 topic 指定为以逗号分隔的列表)。
jarPath to the jar file for the function (if the function is written in Java). 支持 URL 路径 [http/https/file (文件协议假定文件已在 worker 主机上)/function (包管理服务的软件包 URL)],worker 可从中下载包。
log-topicThe topic to which the logs of a Pulsar Function are produced.
max-message-retries尝试处理一条消息的最大次数。
namePulsar Function 的名称。
命名空间Pulsar Function 的命名空间。
outputPulsar Function 的输出 topic(如果没有指定,则没有输出)。
output-serde-classname此 function 输出消息使用的 SerDe 类。
parallelismThe parallelism factor of a Pulsar Function (i.e. the number of function instances to run).
processing-guarantees应用于 function 的处理保证(传递语义)。 可用值:[ATLEAST_ONCE、ATMOST_ONCE、EFFECTIVELY_ONCE]。ATLEAST_ONCE
pyFunction 的主可执行 Python 文件/Python Wheel 文件路径(如果 function 是用 Python 编写的)。 支持 URL 路径 [http/https/file (文件协议假定文件已在 worker 主机上)/function (包管理服务的软件包 URL)],worker 可从中下载包。
ram需要为每个 function 实例分配的 ram 字节数(仅适用于进程/在 docker 上运行时)。
retain-orderingFunction 按顺序消费并处理消息。
schema-typeFunction 用于输出消息的内置 schema 类型或自定义 schema 名称。
sliding-interval-count窗口滑动后的消息数量。
sliding-interval-duration-ms窗口滑动后的持续时间。
subs-name如果用户请求输入 topic 中 consumer 的特定订阅名称,则返回 Pulsar source 订阅名称。
租户Pulsar Function 的租户。
timeout-ms消息超时(以毫秒为单位)。
topics-patternThe topic pattern to consume from list of topics under a namespace that match the pattern. [—input] and [—topic-pattern] are mutually exclusive. 在 —custom-serde-inputs 中添加模式的 SerDe 类名称(仅适用于 Java Function)。
update-auth-dataWhether or not to update the auth data.false
user-config用户定义的配置键/值。
window-length-count每个窗口的消息数量,
window-length-duration-ms窗口的时间长度,以毫秒为单位。

get

获取关于 Pulsar Function 的信息。

配置项说明默认值
fqfn此 function 的完全限定名称(FQFN)。
namePulsar Function 的名称。
命名空间Pulsar Function 的命名空间。
租户Pulsar Function 的租户。

restart

重启 function 实例。

配置项说明默认值
fqfn此 function 的完全限定名称(FQFN)。
instance-idFunction instanceId (如果没有提供 instance-id,则重启所有实例。
namePulsar Function 的名称。
命名空间Pulsar Function 的命名空间。
租户Pulsar Function 的租户。

stop

停止 function 实例。

配置项说明默认值
fqfn此 function 的完全限定名称(FQFN)。
instance-idFunction instanceId (如果没有提供 instance-id,则重启所有实例。
namePulsar Function 的名称。
命名空间Pulsar Function 的命名空间。
租户Pulsar Function 的租户。

start

启动已停止的 function 实例。

配置项说明默认值
fqfn此 function 的完全限定名称(FQFN)。
instance-idFunction instanceId (如果没有提供 instance-id,则重启所有实例。
namePulsar Function 的名称。
命名空间Pulsar Function 的命名空间。
租户Pulsar Function 的租户。