Connector Admin CLI

The pulsar-admin tool helps you manage Pulsar connectors.

sources

An interface for managing Pulsar IO sources (ingress data into Pulsar).

  $ pulsar-admin sources subcommands

Subcommands are:

  • create

  • update

  • delete

  • get

  • status

  • list

  • stop

  • start

  • restart

  • localrun

  • available-sources

  • reload

create

Submit a Pulsar IO source connector to run in a Pulsar cluster.

Usage

  $ pulsar-admin sources create options

Options

Flag | Description
-a, --archive | The path to the NAR archive for the source. It also supports a URL path (http/https/file; the file protocol assumes that the file already exists on the worker host) from which the worker can download the package.
--classname | The source’s class name if the archive is a file URL path (file://).
--cpu | The CPU (in cores) that needs to be allocated per source instance (applicable only to the Docker runtime).
--deserialization-classname | The SerDe class name for the source.
--destination-topic-name | The Pulsar topic to which data is sent.
--disk | The disk (in bytes) that needs to be allocated per source instance (applicable only to the Docker runtime).
--name | The source’s name.
--namespace | The source’s namespace.
--parallelism | The source’s parallelism factor, that is, the number of source instances to run.
--processing-guarantees | The processing guarantees (also known as delivery semantics) applied to the source. A source connector receives messages from an external system and writes them to a Pulsar topic; --processing-guarantees sets the guarantee for writing those messages to the Pulsar topic. Available values are ATLEAST_ONCE, ATMOST_ONCE, and EFFECTIVELY_ONCE.
--ram | The RAM (in bytes) that needs to be allocated per source instance (applicable only to the process and Docker runtimes).
-st, --schema-type | The schema type. Either a built-in schema (for example, AVRO or JSON) or a custom schema class name used to encode messages emitted from the source.
--source-config | Source config key/values.
--source-config-file | The path to a YAML config file specifying the source’s configuration.
-t, --source-type | The source connector’s provider.
--tenant | The source’s tenant.
--producer-config | The custom producer configuration (as a JSON string).
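
As a rough illustration of how the flags above fit together, the following sketch assembles a create command. The tenant, namespace, connector name, topic, and file paths are hypothetical placeholders, not values from this reference. The run wrapper only prints the command, so the sketch is safe to try without a cluster.

```shell
# Dry-run wrapper: prints the command instead of executing it.
# Replace the body with "$@" to run against a real cluster.
run() { echo "$@"; }

# Hypothetical values for illustration only.
TENANT=public
NAMESPACE=default
NAME=my-source

create_cmd=$(run pulsar-admin sources create \
  --tenant "$TENANT" \
  --namespace "$NAMESPACE" \
  --name "$NAME" \
  --archive /path/to/my-connector.nar \
  --destination-topic-name my-topic \
  --source-config-file /path/to/my-source-config.yaml \
  --parallelism 2 \
  --processing-guarantees ATLEAST_ONCE)
echo "$create_cmd"
```

Keeping connector settings in the YAML file and passing only deployment settings on the command line is one possible split; --source-config can carry the same settings inline.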

update

Update an already submitted Pulsar IO source connector.

Usage

  $ pulsar-admin sources update options

Options

Flag | Description
-a, --archive | The path to the NAR archive for the source. It also supports a URL path (http/https/file; the file protocol assumes that the file already exists on the worker host) from which the worker can download the package.
--classname | The source’s class name if the archive is a file URL path (file://).
--cpu | The CPU (in cores) that needs to be allocated per source instance (applicable only to the Docker runtime).
--deserialization-classname | The SerDe class name for the source.
--destination-topic-name | The Pulsar topic to which data is sent.
--disk | The disk (in bytes) that needs to be allocated per source instance (applicable only to the Docker runtime).
--name | The source’s name.
--namespace | The source’s namespace.
--parallelism | The source’s parallelism factor, that is, the number of source instances to run.
--processing-guarantees | The processing guarantees (also known as delivery semantics) applied to the source. A source connector receives messages from an external system and writes them to a Pulsar topic; --processing-guarantees sets the guarantee for writing those messages to the Pulsar topic. Available values are ATLEAST_ONCE, ATMOST_ONCE, and EFFECTIVELY_ONCE.
--ram | The RAM (in bytes) that needs to be allocated per source instance (applicable only to the process and Docker runtimes).
-st, --schema-type | The schema type. Either a built-in schema (for example, AVRO or JSON) or a custom schema class name used to encode messages emitted from the source.
--source-config | Source config key/values.
--source-config-file | The path to a YAML config file specifying the source’s configuration.
-t, --source-type | The source connector’s provider. For a built-in connector, the source-type value is determined by the name parameter specified in the pulsar-io.yaml file.
--tenant | The source’s tenant.
--update-auth-data | Whether or not to update the authentication data. Default value: false.
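
A minimal sketch of an update that changes only the parallelism of an existing source. It assumes that settings you omit keep their current values, which this reference does not state, so verify that against your Pulsar version. The tenant, namespace, and name are hypothetical placeholders.

```shell
# Dry-run wrapper: prints the command instead of executing it.
# Replace the body with "$@" to run against a real cluster.
run() { echo "$@"; }

# Hypothetical source identity; the new instance count is also illustrative.
update_cmd=$(run pulsar-admin sources update \
  --tenant public \
  --namespace default \
  --name my-source \
  --parallelism 4)
echo "$update_cmd"
```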

delete

Delete a Pulsar IO source connector.

Usage

  $ pulsar-admin sources delete options

Options

Flag | Description
--name | The source’s name.
--namespace | The source’s namespace.
--tenant | The source’s tenant.

get

Get the information about a Pulsar IO source connector.

Usage

  $ pulsar-admin sources get options

Options

Flag | Description
--name | The source’s name.
--namespace | The source’s namespace.
--tenant | The source’s tenant.

status

Check the current status of a Pulsar source.

Usage

  $ pulsar-admin sources status options

Options

Flag | Description
--instance-id | The source instance ID. If --instance-id is not provided, Pulsar gets the status of all instances.
--name | The source’s name.
--namespace | The source’s namespace.
--tenant | The source’s tenant.
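
The effect of --instance-id can be sketched as two invocations: one without the flag, which covers every instance, and one that targets a single instance. The source identity and the instance ID 0 are hypothetical placeholders.

```shell
# Dry-run wrapper: prints the command instead of executing it.
# Replace the body with "$@" to run against a real cluster.
run() { echo "$@"; }

# Status of all instances: --instance-id is omitted.
all_cmd=$(run pulsar-admin sources status \
  --tenant public --namespace default --name my-source)

# Status of one instance (0 is an assumed ID for illustration).
one_cmd=$(run pulsar-admin sources status \
  --tenant public --namespace default --name my-source \
  --instance-id 0)

echo "$all_cmd"
echo "$one_cmd"
```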

list

List all running Pulsar IO source connectors.

Usage

  $ pulsar-admin sources list options

Options

Flag | Description
--namespace | The source’s namespace.
--tenant | The source’s tenant.
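
One way to combine list and get, shown as a sketch: list the sources in a namespace, then fetch the details of one of them by name. The tenant, namespace, and source name are hypothetical placeholders.

```shell
# Dry-run wrapper: prints the command instead of executing it.
# Replace the body with "$@" to run against a real cluster.
run() { echo "$@"; }

# Step 1: list the sources in one tenant and namespace.
list_cmd=$(run pulsar-admin sources list \
  --tenant public --namespace default)

# Step 2: inspect one source by name (my-source is illustrative).
get_cmd=$(run pulsar-admin sources get \
  --tenant public --namespace default --name my-source)

echo "$list_cmd"
echo "$get_cmd"
```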

stop

Stop a source instance.

Usage

  $ pulsar-admin sources stop options

Options

Flag | Description
--instance-id | The source instance ID. If --instance-id is not provided, Pulsar stops all instances.
--name | The source’s name.
--namespace | The source’s namespace.
--tenant | The source’s tenant.

start

Start a source instance.

Usage

  $ pulsar-admin sources start options

Options

Flag | Description
--instance-id | The source instance ID. If --instance-id is not provided, Pulsar starts all instances.
--name | The source’s name.
--namespace | The source’s namespace.
--tenant | The source’s tenant.

restart

Restart a source instance.

Usage

  $ pulsar-admin sources restart options

Options

Flag | Description
--instance-id | The source instance ID. If --instance-id is not provided, Pulsar restarts all instances.
--name | The source’s name.
--namespace | The source’s namespace.
--tenant | The source’s tenant.
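
Because stop, start, and restart take the same flags, a small helper can cover all three. The sketch below is one possible shape; the helper name source_ctl, the source identity, and the instance ID 0 are all hypothetical.

```shell
# Dry-run wrapper: prints the command instead of executing it.
# Replace the body with "$@" to run against a real cluster.
run() { echo "$@"; }

# Helper: apply a lifecycle action (stop, start, or restart) to one source.
# Extra arguments, such as --instance-id, are passed through unchanged.
source_ctl() {
  action=$1
  shift
  run pulsar-admin sources "$action" \
    --tenant public --namespace default --name my-source "$@"
}

stop_one=$(source_ctl stop --instance-id 0)     # stop a single instance
start_one=$(source_ctl start --instance-id 0)   # start it again
restart_all=$(source_ctl restart)               # no ID: all instances

echo "$stop_one"
echo "$start_one"
echo "$restart_all"
```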

localrun

Run a Pulsar IO source connector locally rather than deploying it to the Pulsar cluster.

Usage

  $ pulsar-admin sources localrun options

Options

Flag | Description
-a, --archive | The path to the NAR archive for the source. It also supports a URL path (http/https/file; the file protocol assumes that the file already exists on the worker host) from which the worker can download the package.
--broker-service-url | The URL of the Pulsar broker.
--classname | The source’s class name if the archive is a file URL path (file://).
--client-auth-params | The client authentication parameters.
--client-auth-plugin | The client authentication plugin that the function process uses to connect to the broker.
--cpu | The CPU (in cores) that needs to be allocated per source instance (applicable only to the Docker runtime).
--deserialization-classname | The SerDe class name for the source.
--destination-topic-name | The Pulsar topic to which data is sent.
--disk | The disk (in bytes) that needs to be allocated per source instance (applicable only to the Docker runtime).
--hostname-verification-enabled | Enable hostname verification. Default value: false.
--name | The source’s name.
--namespace | The source’s namespace.
--parallelism | The source’s parallelism factor, that is, the number of source instances to run.
--processing-guarantees | The processing guarantees (also known as delivery semantics) applied to the source. A source connector receives messages from an external system and writes them to a Pulsar topic; --processing-guarantees sets the guarantee for writing those messages to the Pulsar topic. Available values are ATLEAST_ONCE, ATMOST_ONCE, and EFFECTIVELY_ONCE.
--ram | The RAM (in bytes) that needs to be allocated per source instance (applicable only to the Docker runtime).
-st, --schema-type | The schema type. Either a built-in schema (for example, AVRO or JSON) or a custom schema class name used to encode messages emitted from the source.
--source-config | Source config key/values.
--source-config-file | The path to a YAML config file specifying the source’s configuration.
--source-type | The source connector’s provider.
--tenant | The source’s tenant.
--tls-allow-insecure | Allow insecure TLS connections. Default value: false.
--tls-trust-cert-path | The TLS trust certificate file path.
--use-tls | Use a TLS connection. Default value: false.
--producer-config | The custom producer configuration (as a JSON string).
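
A local run needs to know which broker to connect to, which is what --broker-service-url provides. The sketch below assumes a broker listening at pulsar://localhost:6650; that address, the source identity, and the file paths are assumptions for illustration, not values from this reference.

```shell
# Dry-run wrapper: prints the command instead of executing it.
# Replace the body with "$@" to run the connector locally.
run() { echo "$@"; }

# Assumed broker address; adjust to match your environment.
BROKER_URL=pulsar://localhost:6650

localrun_cmd=$(run pulsar-admin sources localrun \
  --broker-service-url "$BROKER_URL" \
  --tenant public \
  --namespace default \
  --name my-source \
  --archive /path/to/my-connector.nar \
  --destination-topic-name my-topic \
  --source-config-file /path/to/my-source-config.yaml)
echo "$localrun_cmd"
```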

available-sources

Get the list of Pulsar IO connector sources supported by the Pulsar cluster.

Usage

  $ pulsar-admin sources available-sources

reload

Reload the available built-in connectors.

Usage

  $ pulsar-admin sources reload

sinks

An interface for managing Pulsar IO sinks (egress data from Pulsar).

  $ pulsar-admin sinks subcommands

Subcommands are:

  • create

  • update

  • delete

  • get

  • status

  • list

  • stop

  • start

  • restart

  • localrun

  • available-sinks

  • reload

create

Submit a Pulsar IO sink connector to run in a Pulsar cluster.

Usage

  $ pulsar-admin sinks create options

Options

Flag | Description
-a, --archive | The path to the archive file for the sink. It also supports a URL path (http/https/file; the file protocol assumes that the file already exists on the worker host) from which the worker can download the package.
--auto-ack | Whether or not the framework automatically acknowledges messages.
--classname | The sink’s class name if the archive is a file URL path (file://).
--cpu | The CPU (in cores) that needs to be allocated per sink instance (applicable only to the Docker runtime).
--custom-schema-inputs | The map of input topics to schema types or class names (as a JSON string).
--custom-serde-inputs | The map of input topics to SerDe class names (as a JSON string).
--disk | The disk (in bytes) that needs to be allocated per sink instance (applicable only to the Docker runtime).
-i, --inputs | The sink’s input topic or topics (multiple topics can be specified as a comma-separated list).
--name | The sink’s name.
--namespace | The sink’s namespace.
--parallelism | The sink’s parallelism factor, that is, the number of sink instances to run.
--processing-guarantees | The processing guarantees (also known as delivery semantics) applied to the sink. How --processing-guarantees behaves in Pulsar also depends on the sink implementation. Available values are ATLEAST_ONCE, ATMOST_ONCE, and EFFECTIVELY_ONCE.
--ram | The RAM (in bytes) that needs to be allocated per sink instance (applicable only to the process and Docker runtimes).
--retain-ordering | The sink consumes and sinks messages in order.
--sink-config | Sink config key/values.
--sink-config-file | The path to a YAML config file specifying the sink’s configuration.
-t, --sink-type | The sink connector’s provider. For a built-in connector, the sink-type value is determined by the name parameter specified in the pulsar-io.yaml file.
--subs-name | The Pulsar source subscription name, if you want a specific subscription name for the input-topic consumer.
--tenant | The sink’s tenant.
--timeout-ms | The message timeout (in milliseconds).
--topics-pattern | The topic pattern used to consume from the topics in a namespace that match the pattern. --inputs and --topics-pattern are mutually exclusive. Add the SerDe class name for a pattern in --custom-serde-inputs (supported for Java functions only).
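
Since --inputs and --topics-pattern are mutually exclusive, a sink is created with exactly one of them. The sketch below shows both alternatives through a hypothetical helper, sink_create, that accepts the topic flag and its value. The topic names, the pattern (assumed here to be a regular expression over full topic names), the sink identity, and the file paths are illustrative.

```shell
# Dry-run wrapper: prints the command instead of executing it.
# Replace the body with "$@" to run against a real cluster.
run() { echo "$@"; }

# Helper: $1 is the topic flag (--inputs or --topics-pattern), $2 its value.
sink_create() {
  run pulsar-admin sinks create \
    --tenant public --namespace default --name my-sink \
    --archive /path/to/my-sink.nar \
    --sink-config-file /path/to/my-sink-config.yaml \
    "$1" "$2"
}

# Alternative A: an explicit, comma-separated list of input topics.
by_list=$(sink_create --inputs "orders,payments")

# Alternative B: every topic that matches a pattern (assumed regex syntax).
by_pattern=$(sink_create --topics-pattern "persistent://public/default/orders-.*")

echo "$by_list"
echo "$by_pattern"
```

The two commands are alternatives for the same sink, not steps to run in sequence.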

update

Update a Pulsar IO sink connector.

Usage

  $ pulsar-admin sinks update options

Options

Flag | Description
-a, --archive | The path to the archive file for the sink. It also supports a URL path (http/https/file; the file protocol assumes that the file already exists on the worker host) from which the worker can download the package.
--auto-ack | Whether or not the framework automatically acknowledges messages.
--classname | The sink’s class name if the archive is a file URL path (file://).
--cpu | The CPU (in cores) that needs to be allocated per sink instance (applicable only to the Docker runtime).
--custom-schema-inputs | The map of input topics to schema types or class names (as a JSON string).
--custom-serde-inputs | The map of input topics to SerDe class names (as a JSON string).
--disk | The disk (in bytes) that needs to be allocated per sink instance (applicable only to the Docker runtime).
-i, --inputs | The sink’s input topic or topics (multiple topics can be specified as a comma-separated list).
--name | The sink’s name.
--namespace | The sink’s namespace.
--parallelism | The sink’s parallelism factor, that is, the number of sink instances to run.
--processing-guarantees | The processing guarantees (also known as delivery semantics) applied to the sink. How --processing-guarantees behaves in Pulsar also depends on the sink implementation. Available values are ATLEAST_ONCE, ATMOST_ONCE, and EFFECTIVELY_ONCE.
--ram | The RAM (in bytes) that needs to be allocated per sink instance (applicable only to the process and Docker runtimes).
--retain-ordering | The sink consumes and sinks messages in order.
--sink-config | Sink config key/values.
--sink-config-file | The path to a YAML config file specifying the sink’s configuration.
-t, --sink-type | The sink connector’s provider.
--subs-name | The Pulsar source subscription name, if you want a specific subscription name for the input-topic consumer.
--tenant | The sink’s tenant.
--timeout-ms | The message timeout (in milliseconds).
--topics-pattern | The topic pattern used to consume from the topics in a namespace that match the pattern. --inputs and --topics-pattern are mutually exclusive. Add the SerDe class name for a pattern in --custom-serde-inputs (supported for Java functions only).
--update-auth-data | Whether or not to update the authentication data. Default value: false.

delete

Delete a Pulsar IO sink connector.

Usage

  $ pulsar-admin sinks delete options

Options

Flag | Description
--name | The sink’s name.
--namespace | The sink’s namespace.
--tenant | The sink’s tenant.

get

Get the information about a Pulsar IO sink connector.

Usage

  $ pulsar-admin sinks get options

Options

Flag | Description
--name | The sink’s name.
--namespace | The sink’s namespace.
--tenant | The sink’s tenant.

status

Check the current status of a Pulsar sink.

Usage

  $ pulsar-admin sinks status options

Options

Flag | Description
--instance-id | The sink instance ID. If --instance-id is not provided, Pulsar gets the status of all instances.
--name | The sink’s name.
--namespace | The sink’s namespace.
--tenant | The sink’s tenant.

list

List all running Pulsar IO sink connectors.

Usage

  $ pulsar-admin sinks list options

Options

Flag | Description
--namespace | The sink’s namespace.
--tenant | The sink’s tenant.

stop

Stop a sink instance.

Usage

  $ pulsar-admin sinks stop options

Options

Flag | Description
--instance-id | The sink instance ID. If --instance-id is not provided, Pulsar stops all instances.
--name | The sink’s name.
--namespace | The sink’s namespace.
--tenant | The sink’s tenant.

start

Start a sink instance.

Usage

  $ pulsar-admin sinks start options

Options

Flag | Description
--instance-id | The sink instance ID. If --instance-id is not provided, Pulsar starts all instances.
--name | The sink’s name.
--namespace | The sink’s namespace.
--tenant | The sink’s tenant.

restart

Restart a sink instance.

Usage

  $ pulsar-admin sinks restart options

Options

Flag | Description
--instance-id | The sink instance ID. If --instance-id is not provided, Pulsar restarts all instances.
--name | The sink’s name.
--namespace | The sink’s namespace.
--tenant | The sink’s tenant.
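
Restarting without --instance-id affects every instance at once. If you would rather restart instances one at a time, a loop over instance IDs is one option. The sketch below assumes that instance IDs are numbered from 0 up to the parallelism minus one, which this reference does not confirm. The sink identity and the instance count are hypothetical.

```shell
# Dry-run wrapper: prints the command instead of executing it.
# Replace the body with "$@" to run against a real cluster.
run() { echo "$@"; }

# Hypothetical: the sink runs with three instances, assumed IDs 0, 1, 2.
PARALLELISM=3

i=0
while [ "$i" -lt "$PARALLELISM" ]; do
  # Restart a single instance, identified by its ID.
  line=$(run pulsar-admin sinks restart \
    --tenant public --namespace default --name my-sink \
    --instance-id "$i")
  echo "$line"
  i=$((i + 1))
done
```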

localrun

Run a Pulsar IO sink connector locally rather than deploying it to the Pulsar cluster.

Usage

  $ pulsar-admin sinks localrun options

Options

Flag | Description
-a, --archive | The path to the archive file for the sink. It also supports a URL path (http/https/file; the file protocol assumes that the file already exists on the worker host) from which the worker can download the package.
--auto-ack | Whether or not the framework automatically acknowledges messages.
--broker-service-url | The URL of the Pulsar broker.
--classname | The sink’s class name if the archive is a file URL path (file://).
--client-auth-params | The client authentication parameters.
--client-auth-plugin | The client authentication plugin that the function process uses to connect to the broker.
--cpu | The CPU (in cores) that needs to be allocated per sink instance (applicable only to the Docker runtime).
--custom-schema-inputs | The map of input topics to schema types or class names (as a JSON string).
--max-redeliver-count | The maximum number of times a message is redelivered before it is sent to the dead letter queue.
--dead-letter-topic | The name of the dead letter topic to which failed messages are sent.
--custom-serde-inputs | The map of input topics to SerDe class names (as a JSON string).
--disk | The disk (in bytes) that needs to be allocated per sink instance (applicable only to the Docker runtime).
--hostname-verification-enabled | Enable hostname verification. Default value: false.
-i, --inputs | The sink’s input topic or topics (multiple topics can be specified as a comma-separated list).
--name | The sink’s name.
--namespace | The sink’s namespace.
--parallelism | The sink’s parallelism factor, that is, the number of sink instances to run.
--processing-guarantees | The processing guarantees (also known as delivery semantics) applied to the sink. How --processing-guarantees behaves in Pulsar also depends on the sink implementation. Available values are ATLEAST_ONCE, ATMOST_ONCE, and EFFECTIVELY_ONCE.
--ram | The RAM (in bytes) that needs to be allocated per sink instance (applicable only to the Docker runtime).
--retain-ordering | The sink consumes and sinks messages in order.
--sink-config | Sink config key/values.
--sink-config-file | The path to a YAML config file specifying the sink’s configuration.
--sink-type | The sink connector’s provider.
--subs-name | The Pulsar source subscription name, if you want a specific subscription name for the input-topic consumer.
--tenant | The sink’s tenant.
--timeout-ms | The message timeout (in milliseconds).
--negative-ack-redelivery-delay-ms | The delay (in milliseconds) before a negatively acknowledged message is redelivered.
--tls-allow-insecure | Allow insecure TLS connections. Default value: false.
--tls-trust-cert-path | The TLS trust certificate file path.
--topics-pattern | The topic pattern used to consume from the topics in a namespace that match the pattern. --inputs and --topics-pattern are mutually exclusive. Add the SerDe class name for a pattern in --custom-serde-inputs (supported for Java functions only).
--use-tls | Use a TLS connection. Default value: false.
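
The redelivery flags above can be combined so that messages the sink repeatedly fails to process end up on a dead letter topic. The following sketch shows one such combination for a local run. The broker address, topic names, retry count, delay, sink identity, and file paths are all assumptions chosen for illustration.

```shell
# Dry-run wrapper: prints the command instead of executing it.
# Replace the body with "$@" to run the connector locally.
run() { echo "$@"; }

# Assumed broker address; adjust to match your environment.
BROKER_URL=pulsar://localhost:6650

dlq_cmd=$(run pulsar-admin sinks localrun \
  --broker-service-url "$BROKER_URL" \
  --tenant public \
  --namespace default \
  --name my-sink \
  --archive /path/to/my-sink.nar \
  --inputs orders \
  --sink-config-file /path/to/my-sink-config.yaml \
  --max-redeliver-count 3 \
  --dead-letter-topic orders-dlq \
  --negative-ack-redelivery-delay-ms 1000)
echo "$dlq_cmd"
```

With these illustrative values, a message would be redelivered at most three times, about one second after each negative acknowledgment, before being sent to orders-dlq.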

available-sinks

Get the list of Pulsar IO connector sinks supported by the Pulsar cluster.

Usage

  $ pulsar-admin sinks available-sinks

reload

Reload the available built-in connectors.

Usage

  $ pulsar-admin sinks reload