Connector Admin CLI

The pulsar-admin tool helps you manage Pulsar connectors.

sources

An interface for managing Pulsar IO sources (ingress data into Pulsar).

  1. $ pulsar-admin sources subcommands

Subcommands are:

  • create

  • update

  • delete

  • get

  • status

  • list

  • stop

  • start

  • restart

  • localrun

  • available-sources

  • 重新加载

create

Submit a Pulsar IO source connector to run in a Pulsar cluster.

用法

  1. $ pulsar-admin sources create options

选项

标记说明
-a, —archiveThe path to the NAR archive for the source.
It also supports url-path (http/https/file [file protocol assumes that file already exists on worker host]) from which worker can download the package.
—batch-source-configBatchSource configuration key/values pairs provided as a JSON string, e.g., { “discoveryTriggererClassName” : “org.apache.pulsar.io.batchdiscovery.CronTriggerer”, “discoveryTriggererConfig”: {“cron”: “/5 *”} }
—classnameThe source’s class name if archive is file-url-path (file://).
—cpuThe CPU (in cores) that needs to be allocated per source instance (applicable only to Docker runtime).
—deserialization-classnameThe SerDe classname for the source.
—destination-topic-nameThe Pulsar topic to which data is sent.
—diskThe disk (in bytes) that needs to be allocated per source instance (applicable only to Docker runtime).
—nameThe source’s name.
—namespaceThe source’s namespace.
—parallelismThe source’s parallelism factor, that is, the number of source instances to run.
—processing-guaranteesThe processing guarantees (also named as delivery semantics) applied to the source. A source connector receives messages from external system and writes messages to a Pulsar topic. The —processing-guarantees is used to ensure the processing guarantees for writing messages to the Pulsar topic.
The available values are ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE.
—ramThe RAM (in bytes) that needs to be allocated per source instance (applicable only to the process and Docker runtimes).
-st, —schema-typeThe schema type.
Either a builtin schema (for example, AVRO and JSON) or custom schema class name to be used to encode messages emitted from source.
—source-configSource config key/values.
—source-config-fileThe path to a YAML config file specifying the source’s configuration.
-t, —source-typeThe source’s connector provider.
—tenantThe source’s tenant.
—producer-configThe custom producer configuration (as a JSON string).

update

Update a already submitted Pulsar IO source connector.

用法

  1. $ pulsar-admin sources update options

选项

标记说明
-a, —archiveThe path to the NAR archive for the source.
It also supports url-path (http/https/file [file protocol assumes that file already exists on worker host]) from which worker can download the package.
—batch-source-configBatchSource configuration key/values pairs provided as a JSON string, e.g., { “discoveryTriggererClassName” : “org.apache.pulsar.io.batchdiscovery.CronTriggerer”, “discoveryTriggererConfig”: {“cron”: “/5 *”} }
—classnameThe source’s class name if archive is file-url-path (file://).
—cpuThe CPU (in cores) that needs to be allocated per source instance (applicable only to Docker runtime).
—deserialization-classnameThe SerDe classname for the source.
—destination-topic-nameThe Pulsar topic to which data is sent.
—diskThe disk (in bytes) that needs to be allocated per source instance (applicable only to Docker runtime).
—nameThe source’s name.
—namespaceThe source’s namespace.
—parallelismThe source’s parallelism factor, that is, the number of source instances to run.
—processing-guaranteesThe processing guarantees (also named as delivery semantics) applied to the source. A source connector receives messages from external system and writes messages to a Pulsar topic. The —processing-guarantees is used to ensure the processing guarantees for writing messages to the Pulsar topic.
The available values are ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE.
—ramThe RAM (in bytes) that needs to be allocated per source instance (applicable only to the process and Docker runtimes).
-st, —schema-typeThe schema type.
Either a builtin schema (for example, AVRO and JSON) or custom schema class name to be used to encode messages emitted from source.
—source-configSource config key/values.
—source-config-fileThe path to a YAML config file specifying the source’s configuration.
-t, —source-typeThe source’s connector provider. The source-type parameter of the currently built-in connectors is determined by the setting of the name parameter specified in the pulsar-io.yaml file.
—tenantThe source’s tenant.
—update-auth-data是否更新认证数据。
默认值:false。

delete

Delete a Pulsar IO source connector.

用法

  1. $ pulsar-admin sources delete options

选项

标记说明
—nameThe source’s name.
—namespaceThe source’s namespace.
—tenantThe source’s tenant.

get

Get the information about a Pulsar IO source connector.

用法

  1. $ pulsar-admin sources get options

选项

标记说明
—nameThe source’s name.
—namespaceThe source’s namespace.
—tenantThe source’s tenant.

status

Check the current status of a Pulsar Source.

用法

  1. $ pulsar-admin sources status options

选项

标记说明
—instance-idThe source ID.
If instance-id is not provided, Pulasr gets status of all instances.
—nameThe source’s name.
—namespaceThe source’s namespace.
—tenantThe source’s tenant.

list

List all running Pulsar IO source connectors.

用法

  1. $ pulsar-admin sources list options

选项

标记说明
—namespaceThe source’s namespace.
—tenantThe source’s tenant.

stop

Stop a source instance.

用法

  1. $ pulsar-admin sources stop options

选项

标记说明
—instance-idThe source instanceID.
If instance-id is not provided, Pulsar stops all instances.
—nameThe source’s name.
—namespaceThe source’s namespace.
—tenantThe source’s tenant.

start

Start a source instance.

用法

  1. $ pulsar-admin sources start options

选项

标记说明
—instance-idThe source instanceID.
If instance-id is not provided, Pulsar starts all instances.
—nameThe source’s name.
—namespaceThe source’s namespace.
—tenantThe source’s tenant.

restart

Restart a source instance.

用法

  1. $ pulsar-admin sources restart options

选项

标记说明
—instance-idThe source instanceID.
If instance-id is not provided, Pulsar restarts all instances.
—nameThe source’s name.
—namespaceThe source’s namespace.
—tenantThe source’s tenant.

localrun

Run a Pulsar IO source connector locally rather than deploying it to the Pulsar cluster.

用法

  1. $ pulsar-admin sources localrun options

选项

标记说明
-a, —archiveThe path to the NAR archive for the Source.
It also supports url-path (http/https/file [file protocol assumes that file already exists on worker host]) from which worker can download the package.
—broker-service-urlPulsar broker 的 URL。
—classnameThe source’s class name if archive is file-url-path (file://).
—client-auth-params客户端身份验证参数。
—client-auth-plugin客户端身份验证插件,function 进程需要通过此插件连接到 broker。
—cpuThe CPU (in cores) that needs to be allocated per source instance (applicable only to the Docker runtime).
—deserialization-classnameThe SerDe classname for the source.
—destination-topic-nameThe Pulsar topic to which data is sent.
—diskThe disk (in bytes) that needs to be allocated per source instance (applicable only to the Docker runtime).
—hostname-verification-enabled启用主机名验证。
默认值:false
—nameThe source’s name.
—namespaceThe source’s namespace.
—parallelismThe source’s parallelism factor, that is, the number of source instances to run).
—processing-guaranteesThe processing guarantees (also named as delivery semantics) applied to the source. A source connector receives messages from external system and writes messages to a Pulsar topic. The —processing-guarantees is used to ensure the processing guarantees for writing messages to the Pulsar topic.
The available values are ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE.
—ramThe RAM (in bytes) that needs to be allocated per source instance (applicable only to the Docker runtime).
-st, —schema-typeThe schema type.
Either a builtin schema (for example, AVRO and JSON) or custom schema class name to be used to encode messages emitted from source.
—source-configSource config key/values.
—source-config-fileThe path to a YAML config file specifying the source’s configuration.
—source-typeThe source’s connector provider.
—tenantThe source’s tenant.
—tls-allow-insecure允许不安全的 tls 连接。
默认值:false
—tls-trust-cert-pathThe tls trust cert file path.
—use-tls使用 tls 连接。
默认值:false
—producer-configThe custom producer configuration (as a JSON string).

available-sources

Get the list of Pulsar IO connector sources supported by Pulsar cluster.

用法

  1. $ pulsar-admin sources available-sources

重新加载

Reload the available built-in connectors.

用法

  1. $ pulsar-admin 源刷新

sinks

An interface for managing Pulsar IO sinks (egress data from Pulsar).

  1. $ pulsar-admin sinks subcommands

Subcommands are:

  • create

  • update

  • delete

  • get

  • status

  • list

  • stop

  • start

  • restart

  • localrun

  • available-sinks

  • 重新加载

create

Submit a Pulsar IO sink connector to run in a Pulsar cluster.

用法

  1. $ pulsar-admin sinks create options

选项

标记说明
-a, —archiveThe path to the archive file for the sink.
It also supports url-path (http/https/file [file protocol assumes that file already exists on worker host]) from which worker can download the package.
—auto-ackWhether or not the framework will automatically acknowledge messages.
—classnameThe sink’s class name if archive is file-url-path (file://).
—cpuThe CPU (in cores) that needs to be allocated per sink instance (applicable only to Docker runtime).
—custom-schema-inputsThe map of input topics to schema types or class names (as a JSON string).
—custom-serde-inputs输入 topic 到 SerDe 类名称的映射(以 JSON 字符串的形式)。
—diskThe disk (in bytes) that needs to be allocated per sink instance (applicable only to Docker runtime).
-i, —inputsThe sink’s input topic or topics (multiple topics can be specified as a comma-separated list).
—nameThe sink’s name.
—namespaceThe sink’s namespace.
—parallelismThe sink’s parallelism factor, that is, the number of sink instances to run.
—processing-guaranteesThe processing guarantees (also known as delivery semantics) applied to the sink. The —processing-guarantees implementation in Pulsar also relies on sink implementation.
The available values are ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE.
—ramThe RAM (in bytes) that needs to be allocated per sink instance (applicable only to the process and Docker runtimes).
—retain-orderingSink consumes and sinks messages in order.
—sink-configsink config key/values.
—sink-config-fileThe path to a YAML config file specifying the sink’s configuration.
-t, —sink-typeThe sink’s connector provider. The sink-type parameter of the currently built-in connectors is determined by the setting of the name parameter specified in the pulsar-io.yaml file.
—subs-namePulsar source subscription name if user wants a specific subscription-name for input-topic consumer.
—tenantThe sink’s tenant.
—timeout-ms消息超时(以毫秒为单位)。
—topics-patternTopicsPattern to consume from list of topics under a namespace that match the pattern.
—input and —topics-Pattern are mutually exclusive.
Add SerDe class name for a pattern in —customSerdeInputs (supported for java fun only).

update

Update a Pulsar IO sink connector.

用法

  1. $ pulsar-admin sinks update options

选项

标记说明
-a, —archiveThe path to the archive file for the sink.
It also supports url-path (http/https/file [file protocol assumes that file already exists on worker host]) from which worker can download the package.
—auto-ackWhether or not the framework will automatically acknowledge messages.
—classnameThe sink’s class name if archive is file-url-path (file://).
—cpuThe CPU (in cores) that needs to be allocated per sink instance (applicable only to Docker runtime).
—custom-schema-inputsThe map of input topics to schema types or class names (as a JSON string).
—custom-serde-inputs输入 topic 到 SerDe 类名称的映射(以 JSON 字符串的形式)。
—diskThe disk (in bytes) that needs to be allocated per sink instance (applicable only to Docker runtime).
-i, —inputsThe sink’s input topic or topics (multiple topics can be specified as a comma-separated list).
—nameThe sink’s name.
—namespaceThe sink’s namespace.
—parallelismThe sink’s parallelism factor, that is, the number of sink instances to run.
—processing-guaranteesThe processing guarantees (also known as delivery semantics) applied to the sink. The —processing-guarantees implementation in Pulsar also relies on sink implementation.
The available values are ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE.
—ramThe RAM (in bytes) that needs to be allocated per sink instance (applicable only to the process and Docker runtimes).
—retain-orderingSink consumes and sinks messages in order.
—sink-configsink config key/values.
—sink-config-fileThe path to a YAML config file specifying the sink’s configuration.
-t, —sink-typeThe sink’s connector provider.
—subs-namePulsar source subscription name if user wants a specific subscription-name for input-topic consumer.
—tenantThe sink’s tenant.
—timeout-ms消息超时(以毫秒为单位)。
—topics-patternTopicsPattern to consume from list of topics under a namespace that match the pattern.
—input and —topics-Pattern are mutually exclusive.
Add SerDe class name for a pattern in —customSerdeInputs (supported for java fun only).
—update-auth-data是否更新认证数据。
默认值:false。

delete

Delete a Pulsar IO sink connector.

用法

  1. $ pulsar-admin sinks delete options

选项

标记说明
—nameThe sink’s name.
—namespaceThe sink’s namespace.
—tenantThe sink’s tenant.

get

Get the information about a Pulsar IO sink connector.

用法

  1. $ pulsar-admin sinks get options

选项

标记说明
—nameThe sink’s name.
—namespaceThe sink’s namespace.
—tenantThe sink’s tenant.

status

Check the current status of a Pulsar sink.

用法

  1. $ pulsar-admin sinks status options

选项

标记说明
—instance-idThe sink ID.
If instance-id is not provided, Pulasr gets status of all instances.
—nameThe sink’s name.
—namespaceThe sink’s namespace.
—tenantThe sink’s tenant.

list

List all running Pulsar IO sink connectors.

用法

  1. $ pulsar-admin sinks list options

选项

标记说明
—namespaceThe sink’s namespace.
—tenantThe sink’s tenant.

stop

Stop a sink instance.

用法

  1. $ pulsar-admin sinks stop options

选项

标记说明
—instance-idThe sink instanceID.
If instance-id is not provided, Pulsar stops all instances.
—nameThe sink’s name.
—namespaceThe sink’s namespace.
—tenantThe sink’s tenant.

start

Start a sink instance.

用法

  1. $ pulsar-admin sinks start options

选项

标记说明
—instance-idThe sink instanceID.
If instance-id is not provided, Pulsar starts all instances.
—nameThe sink’s name.
—namespaceThe sink’s namespace.
—tenantThe sink’s tenant.

restart

Restart a sink instance.

用法

  1. $ pulsar-admin sinks restart options

选项

标记说明
—instance-idThe sink instanceID.
If instance-id is not provided, Pulsar restarts all instances.
—nameThe sink’s name.
—namespaceThe sink’s namespace.
—tenantThe sink’s tenant.

localrun

Run a Pulsar IO sink connector locally rather than deploying it to the Pulsar cluster.

用法

  1. $ pulsar-admin sinks localrun options

选项

标记说明
-a, —archiveThe path to the archive file for the sink.
It also supports url-path (http/https/file [file protocol assumes that file already exists on worker host]) from which worker can download the package.
—auto-ackWhether or not the framework will automatically acknowledge messages.
—broker-service-urlPulsar broker 的 URL。
—classnameThe sink’s class name if archive is file-url-path (file://).
—client-auth-params客户端身份验证参数。
—client-auth-plugin客户端身份验证插件,function 进程需要通过此插件连接到 broker。
—cpuThe CPU (in cores) that needs to be allocated per sink instance (applicable only to the Docker runtime).
—custom-schema-inputsThe map of input topics to Schema types or class names (as a JSON string).
—max-redeliver-countMaximum number of times that a message is redelivered before being sent to the dead letter queue.
—dead-letter-topicName of the dead letter topic where the failing messages are sent.
—custom-serde-inputs输入 topic 到 SerDe 类名称的映射(以 JSON 字符串的形式)。
—diskThe disk (in bytes) that needs to be allocated per sink instance (applicable only to the Docker runtime).
—hostname-verification-enabled启用主机名验证。
默认值:false
-i, —inputsThe sink’s input topic or topics (multiple topics can be specified as a comma-separated list).
—nameThe sink’s name.
—namespaceThe sink’s namespace.
—parallelismThe sink’s parallelism factor, that is, the number of sink instances to run).
—processing-guaranteesThe processing guarantees (also known as delivery semantics) applied to the sink. The —processing-guarantees implementation in Pulsar also relies on sink implementation.
The available values are ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE.
—ramThe RAM (in bytes) that needs to be allocated per sink instance (applicable only to the Docker runtime).
—retain-orderingSink consumes and sinks messages in order.
—sink-configsink config key/values.
—sink-config-fileThe path to a YAML config file specifying the sink’s configuration.
—sink-typeThe sink’s connector provider.
—subs-namePulsar source subscription name if user wants a specific subscription-name for input-topic consumer.
—tenantThe sink’s tenant.
—timeout-ms消息超时(以毫秒为单位)。
—negative-ack-redelivery-delay-msThe negatively-acknowledged message redelivery delay in milliseconds.
—tls-allow-insecure允许不安全的 tls 连接。
默认值:false
—tls-trust-cert-pathThe tls trust cert file path.
—topics-patternTopicsPattern to consume from list of topics under a namespace that match the pattern.
—input and —topics-Pattern are mutually exclusive.
Add SerDe class name for a pattern in —customSerdeInputs (supported for java fun only).
—use-tls使用 tls 连接。
默认值:false

available-sinks

Get the list of Pulsar IO connector sinks supported by Pulsar cluster.

用法

  1. $ pulsar-admin sinks available-sinks

重新加载

Reload the available built-in connectors.

用法

  1. $ pulsar-admin sinks reload