How to use Pulsar connectors

This guide describes how to use Pulsar connectors.

Install a connector

Pulsar bundles several builtin connectors used to move data in and out of commonly used systems (such as database and messaging system). Optionally, you can create and use your desired non-builtin connectors.

note

When using a non-builtin connector, you need to specify the path of an archive file for the connector.

To set up a builtin connector, follow the instructions here.

After the setup, the builtin connector is automatically discovered by Pulsar brokers (or function-workers), so no additional installation steps are required.

Configure a connector

You can configure the following information:

Configure a default storage location for a connector

To configure a default folder for builtin connectors, set the connectorsDirectory parameter in the ./conf/functions_worker.yml configuration file.

Example

Set the ./connectors folder as the default storage location for builtin connectors.

  1. ########################
  2. # Connectors
  3. ########################
  4. connectorsDirectory: ./connectors

Configure a connector with a YAML file

To configure a connector, you need to provide a YAML configuration file when creating a connector.

The YAML configuration file tells Pulsar where to locate connectors and how to connect connectors with Pulsar topics.

Example 1

Below is a YAML configuration file of a Cassandra sink, which tells Pulsar:

  • Which Cassandra cluster to connect
  • What is the keyspace and columnFamily to be used in Cassandra for collecting data
  • How to map Pulsar messages into Cassandra table key and columns
  1. tenant: public
  2. namespace: default
  3. name: cassandra-test-sink
  4. ...
  5. # cassandra specific config
  6. configs:
  7. roots: "localhost:9042"
  8. keyspace: "pulsar_test_keyspace"
  9. columnFamily: "pulsar_test_table"
  10. keyname: "key"
  11. columnName: "col"

Example 2

Below is a YAML configuration file of a Kafka source.

  1. configs:
  2. bootstrapServers: "pulsar-kafka:9092"
  3. groupId: "test-pulsar-io"
  4. topic: "my-topic"
  5. sessionTimeoutMs: "10000"
  6. autoCommitEnabled: "false"

Example 3

Below is a YAML configuration file of a PostgreSQL JDBC sink.

  1. configs:
  2. userName: "postgres"
  3. password: "password"
  4. jdbcUrl: "jdbc:postgresql://localhost:5432/test_jdbc"
  5. tableName: "test_jdbc"

Get available connectors

Before starting using connectors, you can perform the following operations:

reload

If you add or delete a nar file in a connector folder, reload the available builtin connector before using it.

Source

Use the reload subcommand.

  1. $ pulsar-admin sources reload

For the latest and complete information, see Pulsar admin docs.

Sink

Use the reload subcommand.

  1. $ pulsar-admin sinks reload

For the latest and complete information, see Pulsar admin docs.

available

After reloading connectors (optional), you can get a list of available connectors.

Source

Use the available-sources subcommand.

  1. $ pulsar-admin sources available-sources

Sink

Use the available-sinks subcommand.

  1. $ pulsar-admin sinks available-sinks

Run a connector

To run a connector, you can perform the following operations:

create

You can create a connector using Admin CLI, REST API or JAVA admin API.f

Source

Create a source connector.

  • Admin CLI
  • REST API
  • Java Admin API

Use the create subcommand.

  1. $ pulsar-admin sources create options

For the latest and complete information, see Pulsar admin docs.

Send a POST request to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sourceName

  • Create a source connector with a local file.

    1. void createSource(SourceConfig sourceConfig,
    2. String fileName)
    3. throws PulsarAdminException
  1. **Parameter**
  2. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>sourceConfig</code></td><td>The source configuration object</td></tr></tbody></table>
  3. **Exception**
  4. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException</code></td><td>Unexpected error</td></tr></tbody></table>
  5. For more information, see [createSource](https://pulsar.apache.orghttps://pulsar.apache.org/api/admin/org/apache/pulsar/client/admin/Source.html#createSource-SourceConfig-java.lang.String-).
  • Create a source connector using a remote file with a URL from which fun-pkg can be downloaded.

    1. void createSourceWithUrl(SourceConfig sourceConfig,
    2. String pkgUrl)
    3. throws PulsarAdminException
  1. Supported URLs are `http` and `file`.
  2. **Example**
  3. - HTTP: [http://www.repo.com/fileName.jar](http://www.repo.com/fileName.jar)
  4. - File: file:///dir/fileName.jar
  5. **Parameter**
  6. <table><thead><tr><th>Parameter</th><th>Description</th></tr></thead><tbody><tr><td><code>sourceConfig</code></td><td>The source configuration object</td></tr><tr><td><code>pkgUrl</code></td><td>URL from which pkg can be downloaded</td></tr></tbody></table>
  7. **Exception**
  8. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException</code></td><td>Unexpected error</td></tr></tbody></table>
  9. For more information, see [createSourceWithUrl](https://pulsar.apache.orghttps://pulsar.apache.org/api/admin/org/apache/pulsar/client/admin/Source.html#createSourceWithUrl-SourceConfig-java.lang.String-).

Sink

Create a sink connector.

  • Admin CLI
  • REST API
  • Java Admin API

Use the create subcommand.

  1. $ pulsar-admin sinks create options

For the latest and complete information, see Pulsar admin docs.

Send a POST request to this endpoint: POST /admin/v3/sinks/:tenant/:namespace/:sinkName

  • Create a sink connector with a local file.

    1. void createSink(SinkConfig sinkConfig,
    2. String fileName)
    3. throws PulsarAdminException
  1. **Parameter**
  2. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>sinkConfig</code></td><td>The sink configuration object</td></tr></tbody></table>
  3. **Exception**
  4. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException</code></td><td>Unexpected error</td></tr></tbody></table>
  5. For more information, see [createSink](https://pulsar.apache.orghttps://pulsar.apache.org/api/admin/org/apache/pulsar/client/admin/Sink.html#createSink-SinkConfig-java.lang.String-).
  • Create a sink connector using a remote file with a URL from which fun-pkg can be downloaded.

    1. void createSinkWithUrl(SinkConfig sinkConfig,
    2. String pkgUrl)
    3. throws PulsarAdminException
  1. Supported URLs are `http` and `file`.
  2. **Example**
  3. - HTTP: [http://www.repo.com/fileName.jar](http://www.repo.com/fileName.jar)
  4. - File: file:///dir/fileName.jar
  5. **Parameter**
  6. <table><thead><tr><th>Parameter</th><th>Description</th></tr></thead><tbody><tr><td><code>sinkConfig</code></td><td>The sink configuration object</td></tr><tr><td><code>pkgUrl</code></td><td>URL from which pkg can be downloaded</td></tr></tbody></table>
  7. **Exception**
  8. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException</code></td><td>Unexpected error</td></tr></tbody></table>
  9. For more information, see [createSinkWithUrl](https://pulsar.apache.orghttps://pulsar.apache.org/api/admin/org/apache/pulsar/client/admin/Sink.html#createSinkWithUrl-SinkConfig-java.lang.String-).

start

You can start a connector using Admin CLI or REST API.

Source

Start a source connector.

  • Admin CLI
  • REST API

Use the start subcommand.

  1. $ pulsar-admin sources start options

For the latest and complete information, see Pulsar admin docs.

Sink

Start a sink connector.

  • Admin CLI
  • REST API

Use the start subcommand.

  1. $ pulsar-admin sinks start options

For the latest and complete information, see Pulsar admin docs.

localrun

You can run a connector locally rather than deploying it on a Pulsar cluster using Admin CLI.

Source

Run a source connector locally.

  • Admin CLI

Use the localrun subcommand.

  1. $ pulsar-admin sources localrun options

For the latest and complete information, see Pulsar admin docs.

Sink

Run a sink connector locally.

  • Admin CLI

Use the localrun subcommand.

  1. $ pulsar-admin sinks localrun options

For the latest and complete information, see Pulsar admin docs.

Monitor a connector

To monitor a connector, you can perform the following operations:

get

You can get the information of a connector using Admin CLI, REST API or JAVA admin API.

Source

Get the information of a source connector.

  • Admin CLI
  • REST API
  • Java Admin API

Use the get subcommand.

  1. $ pulsar-admin sources get options

For the latest and complete information, see Pulsar admin docs.

Send a GET request to this endpoint: GET /admin/v3/sources/:tenant/:namespace/:sourceName

  1. SourceConfig getSource(String tenant,
  2. String namespace,
  3. String source)
  4. throws PulsarAdminException

Example

This is a sourceConfig.

  1. {
  2. "tenant": "tenantName",
  3. "namespace": "namespaceName",
  4. "name": "sourceName",
  5. "className": "className",
  6. "topicName": "topicName",
  7. "configs": {},
  8. "parallelism": 1,
  9. "processingGuarantees": "ATLEAST_ONCE",
  10. "resources": {
  11. "cpu": 1.0,
  12. "ram": 1073741824,
  13. "disk": 10737418240
  14. }
  15. }

This is a sourceConfig example.

  1. {
  2. "tenant": "public",
  3. "namespace": "default",
  4. "name": "debezium-mysql-source",
  5. "className": "org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource",
  6. "topicName": "debezium-mysql-topic",
  7. "configs": {
  8. "database.user": "debezium",
  9. "database.server.id": "184054",
  10. "database.server.name": "dbserver1",
  11. "database.port": "3306",
  12. "database.hostname": "localhost",
  13. "database.password": "dbz",
  14. "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650",
  15. "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  16. "database.whitelist": "inventory",
  17. "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  18. "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory",
  19. "pulsar.service.url": "pulsar://127.0.0.1:6650",
  20. "database.history.pulsar.topic": "history-topic2"
  21. },
  22. "parallelism": 1,
  23. "processingGuarantees": "ATLEAST_ONCE",
  24. "resources": {
  25. "cpu": 1.0,
  26. "ram": 1073741824,
  27. "disk": 10737418240
  28. }
  29. }

Exception

Exception nameDescription
PulsarAdminException.NotAuthorizedExceptionYou don’t have the admin permission
PulsarAdminException.NotFoundExceptionCluster doesn’t exist
PulsarAdminExceptionUnexpected error

For more information, see getSource.

Sink

Get the information of a sink connector.

  • Admin CLI
  • REST API
  • Java Admin API

Use the get subcommand.

  1. $ pulsar-admin sinks get options

For the latest and complete information, see Pulsar admin docs.

Send a GET request to this endpoint: GET /admin/v3/sinks/:tenant/:namespace/:sinkName

  1. SinkConfig getSink(String tenant,
  2. String namespace,
  3. String sink)
  4. throws PulsarAdminException

Example

This is a sinkConfig.

  1. {
  2. "tenant": "tenantName",
  3. "namespace": "namespaceName",
  4. "name": "sinkName",
  5. "className": "className",
  6. "inputSpecs": {
  7. "topicName": {
  8. "isRegexPattern": false
  9. }
  10. },
  11. "configs": {},
  12. "parallelism": 1,
  13. "processingGuarantees": "ATLEAST_ONCE",
  14. "retainOrdering": false,
  15. "autoAck": true
  16. }

This is a sinkConfig example.

  1. {
  2. "tenant": "public",
  3. "namespace": "default",
  4. "name": "pulsar-postgres-jdbc-sink",
  5. "className": "org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink",
  6. "inputSpecs": {
  7. "pulsar-postgres-jdbc-sink-topic": {
  8. "isRegexPattern": false
  9. }
  10. },
  11. "configs": {
  12. "password": "password",
  13. "jdbcUrl": "jdbc:postgresql://localhost:5432/pulsar_postgres_jdbc_sink",
  14. "userName": "postgres",
  15. "tableName": "pulsar_postgres_jdbc_sink"
  16. },
  17. "parallelism": 1,
  18. "processingGuarantees": "ATLEAST_ONCE",
  19. "retainOrdering": false,
  20. "autoAck": true
  21. }

Parameter description

NameDescription
tenantTenant name
namespaceNamespace name
sinkSink name

For more information, see getSink.

list

You can get the list of all running connectors using Admin CLI, REST API or JAVA admin API.

Source

Get the list of all running source connectors.

  • Admin CLI
  • REST API
  • Java Admin API

Use the list subcommand.

  1. $ pulsar-admin sources list options

For the latest and complete information, see Pulsar admin docs.

Send a GET request to this endpoint: GET /admin/v3/sources/:tenant/:namespace/

  1. List<String> listSources(String tenant,
  2. String namespace)
  3. throws PulsarAdminException

Response example

  1. ["f1", "f2", "f3"]

Exception

Exception nameDescription
PulsarAdminException.NotAuthorizedExceptionYou don’t have the admin permission
PulsarAdminExceptionUnexpected error

For more information, see listSource.

Sink

Get the list of all running sink connectors.

  • Admin CLI
  • REST API
  • Java Admin API

Use the list subcommand.

  1. $ pulsar-admin sinks list options

For the latest and complete information, see Pulsar admin docs.

Send a GET request to this endpoint: GET /admin/v3/sinks/:tenant/:namespace/

  1. List<String> listSinks(String tenant,
  2. String namespace)
  3. throws PulsarAdminException

Response example

  1. ["f1", "f2", "f3"]

Exception

Exception nameDescription
PulsarAdminException.NotAuthorizedExceptionYou don’t have the admin permission
PulsarAdminExceptionUnexpected error

For more information, see listSource.

status

You can get the current status of a connector using Admin CLI, REST API or JAVA admin API.

Source

Get the current status of a source connector.

  • Admin CLI
  • REST API
  • Java Admin API

Use the status subcommand.

  1. $ pulsar-admin sources status options

For the latest and complete information, see Pulsar admin docs.

  1. **Parameter**
  2. <table><thead><tr><th>Parameter</th><th>Description</th></tr></thead><tbody><tr><td><code>tenant</code></td><td>Tenant name</td></tr><tr><td><code>namespace</code></td><td>Namespace name</td></tr><tr><td><code>sink</code></td><td>Source name</td></tr></tbody></table>
  3. **Exception**
  4. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException</code></td><td>Unexpected error</td></tr></tbody></table>
  5. For more information, see [getSourceStatus](https://pulsar.apache.orghttps://pulsar.apache.org/api/admin/org/apache/pulsar/client/admin/Source.html#getSource-java.lang.String-java.lang.String-java.lang.String-).
  • Gets the current status of a specified source connector.

    1. SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceStatus(String tenant,
    2. String namespace,
    3. String source,
    4. int id)
    5. throws PulsarAdminException
  1. **Parameter**
  2. <table><thead><tr><th>Parameter</th><th>Description</th></tr></thead><tbody><tr><td><code>tenant</code></td><td>Tenant name</td></tr><tr><td><code>namespace</code></td><td>Namespace name</td></tr><tr><td><code>sink</code></td><td>Source name</td></tr><tr><td><code>id</code></td><td>Source instanceID</td></tr></tbody></table>
  3. **Exception**
  4. <table><thead><tr><th>Exception name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException</code></td><td>Unexpected error</td></tr></tbody></table>
  5. For more information, see [getSourceStatus](https://pulsar.apache.orghttps://pulsar.apache.org/api/admin/org/apache/pulsar/client/admin/Source.html#getSourceStatus-java.lang.String-java.lang.String-java.lang.String-int-).

Sink

Get the current status of a Pulsar sink connector.

  • Admin CLI
  • REST API
  • Java Admin API

Use the status subcommand.

  1. $ pulsar-admin sinks status options

For the latest and complete information, see Pulsar admin docs.

  1. **Parameter**
  2. <table><thead><tr><th>Parameter</th><th>Description</th></tr></thead><tbody><tr><td><code>tenant</code></td><td>Tenant name</td></tr><tr><td><code>namespace</code></td><td>Namespace name</td></tr><tr><td><code>sink</code></td><td>Source name</td></tr></tbody></table>
  3. **Exception**
  4. <table><thead><tr><th>Exception name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException</code></td><td>Unexpected error</td></tr></tbody></table>
  5. For more information, see [getSinkStatus](https://pulsar.apache.orghttps://pulsar.apache.org/api/admin/org/apache/pulsar/client/admin/Sink.html#getSinkStatus-java.lang.String-java.lang.String-java.lang.String-).
  • Gets the current status of a specified source connector.

    1. SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkStatus(String tenant,
    2. String namespace,
    3. String sink,
    4. int id)
    5. throws PulsarAdminException
  1. **Parameter**
  2. <table><thead><tr><th>Parameter</th><th>Description</th></tr></thead><tbody><tr><td><code>tenant</code></td><td>Tenant name</td></tr><tr><td><code>namespace</code></td><td>Namespace name</td></tr><tr><td><code>sink</code></td><td>Source name</td></tr><tr><td><code>id</code></td><td>Sink instanceID</td></tr></tbody></table>
  3. **Exception**
  4. <table><thead><tr><th>Exception name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException</code></td><td>Unexpected error</td></tr></tbody></table>
  5. For more information, see [getSinkStatusWithInstanceID](https://pulsar.apache.orghttps://pulsar.apache.org/api/admin/org/apache/pulsar/client/admin/Sink.html#getSinkStatus-java.lang.String-java.lang.String-java.lang.String-int-).

Update a connector

update

You can update a running connector using Admin CLI, REST API or JAVA admin API.

Source

Update a running Pulsar source connector.

  • Admin CLI
  • REST API
  • Java Admin API

Use the update subcommand.

  1. $ pulsar-admin sources update options

For the latest and complete information, see Pulsar admin docs.

Send a PUT request to this endpoint: PUT /admin/v3/sources/:tenant/:namespace/:sourceName

  • Update a running source connector with a local file.

    1. void updateSource(SourceConfig sourceConfig,
    2. String fileName)
    3. throws PulsarAdminException
  1. **Parameter**
  2. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>sourceConfig</code></td><td>The source configuration object</td></tr></tbody></table>
  3. **Exception**
  4. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException.NotAuthorizedException</code></td><td>You don't have the admin permission</td></tr><tr><td><code>PulsarAdminException.NotFoundException</code></td><td>Cluster doesn't exist</td></tr><tr><td><code>PulsarAdminException</code></td><td>Unexpected error</td></tr></tbody></table>
  5. For more information, see [updateSource](https://pulsar.apache.orghttps://pulsar.apache.org/api/admin/org/apache/pulsar/client/admin/Source.html#updateSource-SourceConfig-java.lang.String-).
  • Update a source connector using a remote file with a URL from which fun-pkg can be downloaded.

    1. void updateSourceWithUrl(SourceConfig sourceConfig,
    2. String pkgUrl)
    3. throws PulsarAdminException
  1. Supported URLs are `http` and `file`.
  2. **Example**
  3. - HTTP: [http://www.repo.com/fileName.jar](http://www.repo.com/fileName.jar)
  4. - File: file:///dir/fileName.jar
  5. **Parameter**
  6. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>sourceConfig</code></td><td>The source configuration object</td></tr><tr><td><code>pkgUrl</code></td><td>URL from which pkg can be downloaded</td></tr></tbody></table>
  7. **Exception**
  8. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException.NotAuthorizedException</code></td><td>You don't have the admin permission</td></tr><tr><td><code>PulsarAdminException.NotFoundException</code></td><td>Cluster doesn't exist</td></tr><tr><td><code>PulsarAdminException</code></td><td>Unexpected error</td></tr></tbody></table>

For more information, see createSourceWithUrl.

Sink

Update a running Pulsar sink connector.

  • Admin CLI
  • REST API
  • Java Admin API

Use the update subcommand.

  1. $ pulsar-admin sinks update options

For the latest and complete information, see Pulsar admin docs.

Send a PUT request to this endpoint: PUT /admin/v3/sinks/:tenant/:namespace/:sinkName

  • Update a running sink connector with a local file.

    1. void updateSink(SinkConfig sinkConfig,
    2. String fileName)
    3. throws PulsarAdminException
  1. **Parameter**
  2. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>sinkConfig</code></td><td>The sink configuration object</td></tr></tbody></table>
  3. **Exception**
  4. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException.NotAuthorizedException</code></td><td>You don't have the admin permission</td></tr><tr><td><code>PulsarAdminException.NotFoundException</code></td><td>Cluster doesn't exist</td></tr><tr><td><code>PulsarAdminException</code></td><td>Unexpected error</td></tr></tbody></table>
  5. For more information, see [updateSink](https://pulsar.apache.orghttps://pulsar.apache.org/api/admin/org/apache/pulsar/client/admin/Sink.html#updateSink-SinkConfig-java.lang.String-).
  • Update a sink connector using a remote file with a URL from which fun-pkg can be downloaded.

    1. void updateSinkWithUrl(SinkConfig sinkConfig,
    2. String pkgUrl)
    3. throws PulsarAdminException
  1. Supported URLs are `http` and `file`.
  2. **Example**
  3. - HTTP: [http://www.repo.com/fileName.jar](http://www.repo.com/fileName.jar)
  4. - File: file:///dir/fileName.jar
  5. **Parameter**
  6. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>sinkConfig</code></td><td>The sink configuration object</td></tr><tr><td><code>pkgUrl</code></td><td>URL from which pkg can be downloaded</td></tr></tbody></table>
  7. **Exception**
  8. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException.NotAuthorizedException</code></td><td>You don't have the admin permission</td></tr><tr><td><code>PulsarAdminException.NotFoundException</code></td><td>Cluster doesn't exist</td></tr><tr><td><code>PulsarAdminException</code></td><td>Unexpected error</td></tr></tbody></table>

For more information, see updateSinkWithUrl.

Stop a connector

stop

You can stop a connector using Admin CLI, REST API or JAVA admin API.

Source

Stop a source connector.

  • Admin CLI
  • REST API
  • Java Admin API

Use the stop subcommand.

  1. $ pulsar-admin sources stop options

For the latest and complete information, see Pulsar admin docs.

  1. **Parameter**
  2. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>tenant</code></td><td>Tenant name</td></tr><tr><td><code>namespace</code></td><td>Namespace name</td></tr><tr><td><code>source</code></td><td>Source name</td></tr></tbody></table>
  3. **Exception**
  4. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException</code></td><td>Unexpected error</td></tr></tbody></table>
  5. For more information, see [stopSource](https://pulsar.apache.orghttps://pulsar.apache.org/api/admin/org/apache/pulsar/client/admin/Source.html#stopSource-java.lang.String-java.lang.String-java.lang.String-).
  • Stop a specified source connector.

    1. void stopSource(String tenant,
    2. String namespace,
    3. String source,
    4. int instanceId)
    5. throws PulsarAdminException
  1. **Parameter**
  2. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>tenant</code></td><td>Tenant name</td></tr><tr><td><code>namespace</code></td><td>Namespace name</td></tr><tr><td><code>source</code></td><td>Source name</td></tr><tr><td><code>instanceId</code></td><td>Source instanceID</td></tr></tbody></table>
  3. **Exception**
  4. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException</code></td><td>Unexpected error</td></tr></tbody></table>
  5. For more information, see [stopSource](https://pulsar.apache.orghttps://pulsar.apache.org/api/admin/org/apache/pulsar/client/admin/Source.html#stopSource-java.lang.String-java.lang.String-java.lang.String-int-).

Sink

Stop a sink connector.

  • Admin CLI
  • REST API
  • Java Admin API

Use the stop subcommand.

  1. $ pulsar-admin sinks stop options

For the latest and complete information, see Pulsar admin docs.

  1. **Parameter**
  2. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>tenant</code></td><td>Tenant name</td></tr><tr><td><code>namespace</code></td><td>Namespace name</td></tr><tr><td><code>source</code></td><td>Source name</td></tr></tbody></table>
  3. **Exception**
  4. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException</code></td><td>Unexpected error</td></tr></tbody></table>
  5. For more information, see [stopSink](https://pulsar.apache.orghttps://pulsar.apache.org/api/admin/org/apache/pulsar/client/admin/Sink.html#stopSink-java.lang.String-java.lang.String-java.lang.String-).
  • Stop a specified sink connector.

    1. void stopSink(String tenant,
    2. String namespace,
    3. String sink,
    4. int instanceId)
    5. throws PulsarAdminException
  1. **Parameter**
  2. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>tenant</code></td><td>Tenant name</td></tr><tr><td><code>namespace</code></td><td>Namespace name</td></tr><tr><td><code>source</code></td><td>Source name</td></tr><tr><td><code>instanceId</code></td><td>Source instanceID</td></tr></tbody></table>
  3. **Exception**
  4. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException</code></td><td>Unexpected error</td></tr></tbody></table>
  5. For more information, see [stopSink](https://pulsar.apache.orghttps://pulsar.apache.org/api/admin/org/apache/pulsar/client/admin/Sink.html#stopSink-java.lang.String-java.lang.String-java.lang.String-int-).

Restart a connector

restart

You can restart a connector using Admin CLI, REST API or JAVA admin API.

Source

Restart a source connector.

  • Admin CLI
  • REST API
  • Java Admin API

Use the restart subcommand.

  1. $ pulsar-admin sources restart options

For the latest and complete information, see Pulsar admin docs.

  1. **Parameter**
  2. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>tenant</code></td><td>Tenant name</td></tr><tr><td><code>namespace</code></td><td>Namespace name</td></tr><tr><td><code>source</code></td><td>Source name</td></tr></tbody></table>
  3. **Exception**
  4. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException</code></td><td>Unexpected error</td></tr></tbody></table>
  5. For more information, see [restartSource](https://pulsar.apache.orghttps://pulsar.apache.org/api/admin/org/apache/pulsar/client/admin/Source.html#restartSource-java.lang.String-java.lang.String-java.lang.String-).
  • Restart a specified source connector.

    1. void restartSource(String tenant,
    2. String namespace,
    3. String source,
    4. int instanceId)
    5. throws PulsarAdminException
  1. **Parameter**
  2. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>tenant</code></td><td>Tenant name</td></tr><tr><td><code>namespace</code></td><td>Namespace name</td></tr><tr><td><code>source</code></td><td>Source name</td></tr><tr><td><code>instanceId</code></td><td>Source instanceID</td></tr></tbody></table>
  3. **Exception**
  4. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException</code></td><td>Unexpected error</td></tr></tbody></table>
  5. For more information, see [restartSource](https://pulsar.apache.orghttps://pulsar.apache.org/api/admin/org/apache/pulsar/client/admin/Source.html#restartSource-java.lang.String-java.lang.String-java.lang.String-int-).

Sink

Restart a sink connector.

  • Admin CLI
  • REST API
  • Java Admin API

Use the restart subcommand.

  1. $ pulsar-admin sinks restart options

For the latest and complete information, see Pulsar admin docs.

  1. **Parameter**
  2. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>tenant</code></td><td>Tenant name</td></tr><tr><td><code>namespace</code></td><td>Namespace name</td></tr><tr><td><code>sink</code></td><td>Sink name</td></tr></tbody></table>
  3. **Exception**
  4. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException</code></td><td>Unexpected error</td></tr></tbody></table>
  5. For more information, see [restartSink](https://pulsar.apache.orghttps://pulsar.apache.org/api/admin/org/apache/pulsar/client/admin/Sink.html#restartSink-java.lang.String-java.lang.String-java.lang.String-).
  • Restart a specified sink connector.

    1. void restartSink(String tenant,
    2. String namespace,
    3. String sink,
    4. int instanceId)
    5. throws PulsarAdminException
  1. **Parameter**
  2. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>tenant</code></td><td>Tenant name</td></tr><tr><td><code>namespace</code></td><td>Namespace name</td></tr><tr><td><code>source</code></td><td>Source name</td></tr><tr><td><code>instanceId</code></td><td>Sink instanceID</td></tr></tbody></table>
  3. **Exception**
  4. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException</code></td><td>Unexpected error</td></tr></tbody></table>
  5. For more information, see [restartSink](https://pulsar.apache.orghttps://pulsar.apache.org/api/admin/org/apache/pulsar/client/admin/Sink.html#restartSink-java.lang.String-java.lang.String-java.lang.String-int-).

Delete a connector

delete

You can delete a connector using Admin CLI, REST API or JAVA admin API.

Source

Delete a source connector.

  • Admin CLI
  • REST API
  • Java Admin API

Use the delete subcommand.

  1. $ pulsar-admin sources delete options

For the latest and complete information, see Pulsar admin docs.

Delete al Pulsar source connector.

Send a DELETE request to this endpoint: DELETE /admin/v3/sources/:tenant/:namespace/:sourceName

Delete a source connector.

  1. void deleteSource(String tenant,
  2. String namespace,
  3. String source)
  4. throws PulsarAdminException

Parameter

NameDescription
tenantTenant name
namespaceNamespace name
sourceSource name

Exception

NameDescription
PulsarAdminException.NotAuthorizedExceptionYou don’t have the admin permission
PulsarAdminException.NotFoundExceptionCluster doesn’t exist
PulsarAdminException.PreconditionFailedExceptionCluster is not empty
PulsarAdminExceptionUnexpected error

For more information, see deleteSource.

Sink

Delete a sink connector.

  • Admin CLI
  • REST API
  • Java Admin API

Use the delete subcommand.

  1. $ pulsar-admin sinks delete options

For the latest and complete information, see Pulsar admin docs.

Delete a sink connector.

Send a DELETE request to this endpoint: DELETE /admin/v3/sinks/:tenant/:namespace/:sinkName

Delete a Pulsar sink connector.

  1. void deleteSink(String tenant,
  2. String namespace,
  3. String source)
  4. throws PulsarAdminException

Parameter

NameDescription
tenantTenant name
namespaceNamespace name
sinkSink name

Exception

NameDescription
PulsarAdminException.NotAuthorizedExceptionYou don’t have the admin permission
PulsarAdminException.NotFoundExceptionCluster doesn’t exist
PulsarAdminException.PreconditionFailedExceptionCluster is not empty
PulsarAdminExceptionUnexpected error

For more information, see deleteSource.