本指南描述如何使用 Pulsar 连接器。

安装

Pulsar 捆绑了几个用于移动数据进出常用系统的 内置连接器。 (可选)你可以创建和使用所需的非内置连接器。

Note

当使用非内置连接器时,需要为连接器指定归档文件的路径。

若要设置内置连接器,请按照这里 说明设置

安装后,内置连接器会自动被 Pulsar 代理(或 function-workers)发现,因此不需要额外安装步骤。

配置

配置以下信息:

配置连接器的默认存储位置

要配置内置连接器的默认文件夹,请在 中设置 <code>连接目录 参考/conf/functions_worker.yml 配置文件。

示例

设置 ./connectors 文件夹作为内置连接器的默认存储位置。

  1. ########################
  2. # Connectors
  3. ########################
  4. connectorsDirectory: ./connectors

配置一个 YAML 文件的连接器

要配置连接器,需要在创建连接器时提供一个 YAML 配置文件。

YAML 配置文件告诉 Pulsar 在哪里找到连接器,以及如何将连接器与 Pulsar 主题连接起来。

示例 1

下面是 Cassandra sink 的 YAML 配置文件

  • 哪个 Cassandra 集群可以连接

  • 什么是 keyspace and columnFamily 用于收集数据

  • 如何将 Pulsar 消息映射到 Cassandra 桌面键和列

  1. tenant: public
  2. namespace: default
  3. name: cassandra-test-sink
  4. ...
  5. # cassandra specific config
  6. configs:
  7. roots: "localhost:9042"
  8. keyspace: "pulsar_test_keyspace"
  9. columnFamily: "pulsar_test_table"
  10. keyname: "key"
  11. columnName: "col"

示例 2

下面是 Kafka 源的 YAML 配置文件。

  1. configs:
  2. bootstrapServers: "pulsar-kafka:9092"
  3. groupId: "test-pulsar-io"
  4. topic: "my-topic"
  5. sessionTimeoutMs: "10000"
  6. autoCommitEnabled: "false"

示例 3

Below is a YAML configuration file of a PostgreSQL JDBC sink.

  1. configs:
  2. userName: "postgres"
  3. password: "password"
  4. jdbcUrl: "jdbc:postgresql://localhost:5432/test_jdbc"
  5. tableName: "test_jdbc"

获取可用的连接

在开始使用连接器之前,可以执行以下操作:

重新加载

如果你在连接器文件夹中添加或删除 nar 文件,在使用之前重新载入可用的内置连接器。

Source

使用 重新加载 子命令。

  1. $ pulsar-admin 源刷新

For more information, see here.

Sink

使用 重新加载 子命令。

  1. $ pulsar-admin sinks reload

For more information, see here.

可用

重新加载连接器后(可选),你可以获得可用连接器列表。

Source

使用 可用源 子命令。

  1. $ pulsar-admin sources available-sources

Sink

使用 可用的集合 子命令。

  1. $ pulsar-admin sinks available-sinks

运行连接器

要运行连接器,你可以执行以下操作:

create

You can create a connector using Admin CLI, REST API or JAVA admin API.f

Source

创建一个源连接器。

Admin CLI

REST API

Java Admin API

使用 创建 子命令。

  1. $ pulsar-admin sources create options

For more information, see here.

发送 POST 请求到此端点: POST /admin/v3/sources/:tenant/:namespace/:sourceName

  • Create a source connector with a local file.

    1. void createSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException

    Parameter

    NameDescription
  1. `sourceConfig` | 源配置对象

异常

  1. | Name | Description || ---------------------- | ----------- || `PulsarAdminException` | 未知错误 |For more information, see [`createSource`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#createSource-SourceConfig-java.lang.String-).
  • Create a source connector using a remote file with a URL from which fun-pkg can be downloaded.

    1. void createSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException

    支持的 URL是 http文件

    示例

  1. **Parameter**
  2. 参数 | 描述 |---|---- `sourceConfig` | 源配置对象 `pkgUrl` | 可以从其中下载pkg URL
  3. **异常**
  4. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException</code></td><td>未知错误</td></tr></tbody></table>
  5. For more information, see [`createSourceWithUrl`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#createSourceWithUrl-SourceConfig-java.lang.String-).

Sink

创建源连接器。

Admin CLI

REST API

Java Admin API

使用 创建 子命令。

  1. $ pulsar-admin sinks create options

For more information, see here.

发送 POST 请求到此端点:POST /admin/v3/sinks/:tenant/:namespace/:sinkName

  • Create a sink connector with a local file.

    1. void createSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException

    Parameter

    NameDescription
  1. `sinkConfig` | The sink configuration object**异常**| Name | Description || ---------------------- | ----------- || `PulsarAdminException` | 未知错误 |For more information, see [`createSink`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#createSink-SinkConfig-java.lang.String-).
  • Create a sink connector using a remote file with a URL from which fun-pkg can be downloaded.

    1. void createSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException

    支持的 URL是 http文件

    示例

  1. **Parameter**
  2. 参数 | 描述 |---|---- `sourceConfig` | 源配置对象 `pkgUrl` | 可以从其中下载pkg URL
  3. **异常**
  4. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException</code></td><td>未知错误</td></tr></tbody></table>
  5. For more information, see [`createSinkWithUrl`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#createSinkWithUrl-SinkConfig-java.lang.String-).

start

You can start a connector using Admin CLI or REST API.

Source

启动一个源连接器。

Admin CLI

REST API

使用 起始 子命令。

  1. $ pulsar-admin sources start options

For more information, see here.

Sink

启动 sink 连接器。

Admin CLI

REST API

使用 起始 子命令。

  1. $ pulsar-admin sinks start options

For more information, see here.

localrun

You can run a connector locally rather than deploying it on a Pulsar cluster using Admin CLI.

Source

本地运行一个源连接器。

Admin CLI

使用 localrun 子命令。

  1. $ pulsar-admin sources localrun options

For more information, see here.

Sink

本地运行 sink 连接器。

Admin CLI

使用 localrun 子命令。

  1. $ pulsar-admin sinks localrun options

For more information, see here.

监视连接器

要监视连接器,你可以执行以下操作:

get

You can get the information of a connector using Admin CLI, REST API or JAVA admin API.

Source

获取源连接器的信息。

Admin CLI

REST API

Java Admin API

使用 获得 子命令。

  1. $ pulsar-admin sources get options

For more information, see here.

发送 GET 请求到此端点: GET /admin/v3/sources/:tenant/:namespace/:sourceName

  1. SourceConfig getSource(String tenant, String namespace, String source) throws PulsarAdminException

示例

这是一个源配置

  1. { "tenant": "tenantName", "namespace": "namespaceName", "name": "sourceName", "className": "className", "topicName": "topicName", "configs": {}, "parallelism": 1, "processingGuarantees": "ATLEAST_ONCE", "resources": { "cpu": 1.0, "ram": 1073741824, "disk": 10737418240 }}

这是一个源配置示例。

  1. { "tenant": "public", "namespace": "default", "name": "debezium-mysql-source", "className": "org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource", "topicName": "debezium-mysql-topic", "configs": { "database.user": "debezium", "database.server.id": "184054", "database.server.name": "dbserver1", "database.port": "3306", "database.hostname": "localhost", "database.password": "dbz", "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "database.whitelist": "inventory", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory", "pulsar.service.url": "pulsar://127.0.0.1:6650", "database.history.pulsar.topic": "history-topic2" }, "parallelism": 1, "processingGuarantees": "ATLEAST_ONCE", "resources": { "cpu": 1.0, "ram": 1073741824, "disk": 10737418240 }}

异常

Exception name | Description |—-|—- PulsarAdminException.NotAuthorizedException | You don’t have the admin permission PulsarAdminException.NotFoundException | Cluster doesn’t exist PulsarAdminException | Unexpected error

For more information, see getSource.

Sink

获取 sink 的信息。

Admin CLI

REST API

Java Admin API

使用 获得 子命令。

  1. $ pulsar-admin sinks get options

For more information, see here.

发送 GET 请求到此端点: GET /admin/v3/sinks/:tenant/:namespace/:sinkName

  1. SinkConfig getSink(String tenant, String namespace, String sink) throws PulsarAdminException

示例

这是一个 sinkConfig。

  1. {"tenant": "tenantName","namespace": "namespaceName","name": "sinkName","className": "className","inputSpecs": {"topicName": { "isRegexPattern": false}},"configs": {},"parallelism": 1,"processingGuarantees": "ATLEAST_ONCE","retainOrdering": false,"autoAck": true}

这是一个 sinkConfig 示例。

  1. { "tenant": "public", "namespace": "default", "name": "pulsar-postgres-jdbc-sink", "className": "org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink", "inputSpecs": { "pulsar-postgres-jdbc-sink-topic": { "isRegexPattern": false } }, "configs": { "password": "password", "jdbcUrl": "jdbc:postgresql://localhost:5432/pulsar_postgres_jdbc_sink", "userName": "postgres", "tableName": "pulsar_postgres_jdbc_sink" }, "parallelism": 1, "processingGuarantees": "ATLEAST_ONCE", "retainOrdering": false, "autoAck": true}

参数描述

名称|描述 |—-|—— 租户 | 租户名称 命名空间 | 命名空间名称 sink | 唱名名称

For more information, see getSink.

list

You can get the list of all running connectors using Admin CLI, REST API or JAVA admin API.

Source

获取所有正在运行的源连接器列表。

Admin CLI

REST API

Java Admin API

使用 列表 子命令。

  1. $ pulsar-admin sources list options

For more information, see here.

发送 GET 请求到这个端点:GET /admin/v3/sources/:tenant/:namespace/

  1. List<String> listSources(String tenant, String namespace) throws PulsarAdminException

响应示例

  1. ["f1", "f2", "f3"]

异常

Exception name | Description |—-|—- PulsarAdminException.NotAuthorizedException | You don’t have the admin permission PulsarAdminException | Unexpected error

For more information, see listSource.

Sink

获取所有正在运行的 sink 列表。

Admin CLI

REST API

Java Admin API

使用 列表 子命令。

  1. $ pulsar-admin sinks list options

For more information, see here.

发送 GET 请求到此端点:GET /admin/v3/sinks/:tenant/:namespace/

  1. List<String> listSinks(String tenant, String namespace) throws PulsarAdminException

响应示例

  1. ["f1", "f2", "f3"]

异常

Exception name | Description |—-|—- PulsarAdminException.NotAuthorizedException | You don’t have the admin permission PulsarAdminException | Unexpected error

For more information, see listSource.

status

You can get the current status of a connector using Admin CLI, REST API or JAVA admin API.

Source

获取源连接器的当前状态。

Admin CLI

REST API

Java Admin API

使用 状态 子命令。

  1. $ pulsar-admin sources status options

For more information, see here.

  • Get the current status of all source connectors.

    1. SourceStatus getSourceStatus(String tenant, String namespace, String source) throws PulsarAdminException

    Parameter

    参数|描述 |—-|—— 租户 | 租户名称 命名空间 | 命名空间名称 sink | 源名称

    异常

    名称 | 描述 |—-|—— PulsarAdminException | 意外错误

    For more information, see getSourceStatus.

  • Gets the current status of a specified source connector.

    1. SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceStatus(String tenant, String namespace, String source, int id) throws PulsarAdminException

    Parameter

    Parameter| Description |—-|—- tenant | Tenant name namespace | Namespace name sink | Source name id | Source instanceID

    异常

    异常名称|描述| —- | —-PulsarAdminException |意外的错误

    For more information, see getSourceStatus.

Sink

获取 Pulsar sink 连接器的当前状态。

Admin CLI

REST API

Java Admin API

使用 状态 子命令。

  1. $ pulsar-admin sinks status options

For more information, see here.

  • Get the current status of all sink connectors.

    1. SinkStatus getSinkStatus(String tenant, String namespace, String sink) throws PulsarAdminException

    Parameter

    参数|描述 |—-|—— 租户 | 租户名称 命名空间 | 命名空间名称 sink | 源名称

    异常

    异常名称|描述| —- | —-PulsarAdminException |意外的错误

    For more information, see getSinkStatus.

  • Gets the current status of a specified source connector.

    1. SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkStatus(String tenant, String namespace, String sink, int id) throws PulsarAdminException

    Parameter

    Parameter| Description |—-|—- tenant | Tenant name namespace | Namespace name sink | Source name id | Sink instanceID

    异常

    异常名称|描述| —- | —-PulsarAdminException |意外的错误

    For more information, see getSinkStatusWithInstanceID.

更新连接器

update

You can update a running connector using Admin CLI, REST API or JAVA admin API.

Source

更新正在运行的 Pulsar 源连接器。

Admin CLI

REST API

Java Admin API

使用 更新 子命令。

  1. $ pulsar-admin sources update options

For more information, see here.

发送 PUT 请求到此端点: PUT /admin/v3/sources:tenant/:namespace/:sourceName

  • Update a running source connector with a local file.

    1. void updateSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException

    Parameter

    NameDescription
    sourceConfig源配置对象

    异常

    NameDescription
    PulsarAdminException.NotAuthorizedException没有管理员权限
    PulsarAdminException.NotFoundException集群不存在
    PulsarAdminException未知错误

    For more information, see updateSource.

  • Update a source connector using a remote file with a URL from which fun-pkg can be downloaded.

    1. void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException

    支持的 URL是 http文件

    示例

  1. **Parameter**
  2. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>sourceConfig</code></td><td>源配置对象</td></tr><tr><td><code>pkgUrl</code></td><td>下载 pkg URL</td></tr></tbody></table>
  3. **异常**
  4. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException.NotAuthorizedException</code></td><td>没有管理员权限</td></tr><tr><td><code>PulsarAdminException.NotFoundException</code></td><td>集群不存在</td></tr><tr><td><code>PulsarAdminException</code></td><td>未知错误</td></tr></tbody></table>

For more information, see createSourceWithUrl.

Sink

更新正在运行的 Pulsar sink 连接器。

Admin CLI

REST API

Java Admin API

使用 更新 子命令。

  1. $ pulsar-admin sinks update options

For more information, see here.

发送 PUT 请求到此端点: PUT /admin/v3/sinks/:tenant/:namespace/:sinkName

  • Update a running sink connector with a local file.

    1. void updateSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException

    Parameter

    NameDescription
    sinkConfigsink 配置对象

    异常

    NameDescription
    PulsarAdminException.NotAuthorizedException没有管理员权限
    PulsarAdminException.NotFoundException集群不存在
    PulsarAdminException未知错误

    For more information, see updateSink.

  • Update a sink connector using a remote file with a URL from which fun-pkg can be downloaded.

    1. void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException

    支持的 URL是 http文件

    示例

  1. **Parameter**
  2. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>sinkConfig</code></td><td>sink 配置对象</td></tr><tr><td><code>pkgUrl</code></td><td>下载 pkg URL</td></tr></tbody></table>
  3. **异常**
  4. <table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException.NotAuthorizedException</code></td><td>没有管理员权限</td></tr><tr><td><code>PulsarAdminException.NotFoundException</code></td><td>集群不存在</td></tr><tr><td><code>PulsarAdminException</code></td><td>未知错误</td></tr></tbody></table>

For more information, see updateSinkWithUrl.

停止连接器

stop

You can stop a connector using Admin CLI, REST API or JAVA admin API.

Source

停止一个源连接器。

Admin CLI

REST API

Java Admin API

使用 停止 子命令。

  1. $ pulsar-admin sources stop options

For more information, see here.

  • Stop all source connectors.

    1. void stopSource(String tenant, String namespace, String source) throws PulsarAdminException

    Parameter

    NameDescription
  1. `tenant` | Tenant name `namespace` | Namespace name `source` | Source name**异常**| Name | Description || ---------------------- | ----------- || `PulsarAdminException` | 未知错误 |For more information, see [`stopSource`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#stopSource-java.lang.String-java.lang.String-java.lang.String-).
  • Stop a specified source connector.

    1. void stopSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException

    Parameter

    NameDescription
  1. `tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Source instanceID**异常**| Name | Description || ---------------------- | ----------- || `PulsarAdminException` | 未知错误 |For more information, see [`stopSource`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#stopSource-java.lang.String-java.lang.String-java.lang.String-int-).

Sink

停止 sink 连接器。

Admin CLI

REST API

Java Admin API

使用 停止 子命令。

  1. $ pulsar-admin sinks stop options

For more information, see here.

  • Stop all sink connectors.

    1. void stopSink(String tenant, String namespace, String sink) throws PulsarAdminException

    Parameter

    NameDescription
  1. `tenant` | Tenant name `namespace` | Namespace name `source` | Source name**异常**| Name | Description || ---------------------- | ----------- || `PulsarAdminException` | 未知错误 |For more information, see [`stopSink`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#stopSink-java.lang.String-java.lang.String-java.lang.String-).
  • Stop a specified sink connector.

    1. void stopSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException

    Parameter

    NameDescription
  1. `tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Source instanceID**异常**| Name | Description || ---------------------- | ----------- || `PulsarAdminException` | 未知错误 |For more information, see [`stopSink`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#stopSink-java.lang.String-java.lang.String-java.lang.String-int-).

重新启动连接器

restart

You can restart a connector using Admin CLI, REST API or JAVA admin API.

Source

重新启动一个源连接器。

Admin CLI

REST API

Java Admin API

使用 重启 子命令。

  1. $ pulsar-admin sources restart options

For more information, see here.

  • Restart all source connectors.

    1. void restartSource(String tenant, String namespace, String source) throws PulsarAdminException

    Parameter

    NameDescription
  1. `tenant` | Tenant name `namespace` | Namespace name `source` | Source name**异常**| Name | Description || ---------------------- | ----------- || `PulsarAdminException` | 未知错误 |For more information, see [`restartSource`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#restartSource-java.lang.String-java.lang.String-java.lang.String-).
  • Restart a specified source connector.

    1. void restartSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException

    Parameter

    NameDescription
  1. `tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Source instanceID**异常**| Name | Description || ---------------------- | ----------- || `PulsarAdminException` | 未知错误 |For more information, see [`restartSource`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#restartSource-java.lang.String-java.lang.String-java.lang.String-int-).

Sink

重新启动 sink 连接器。

Admin CLI

REST API

Java Admin API

使用 重启 子命令。

  1. $ pulsar-admin sinks restart options

For more information, see here.

  • 重启所有 Pulsar sink 连接器。

    1. void restartSink(String tenant, String namespace, String sink) throws PulsarAdminException

    Parameter

    NameDescription
  1. `tenant` | Tenant name `namespace` | Namespace name `sink` | Sink name**异常**| Name | Description || ---------------------- | ----------- || `PulsarAdminException` | 未知错误 |For more information, see [`restartSink`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#restartSink-java.lang.String-java.lang.String-java.lang.String-).
  • Restart a specified sink connector.

    1. void restartSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException

    Parameter

    NameDescription
  1. `tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Sink instanceID**异常**| Name | Description || ---------------------- | ----------- || `PulsarAdminException` | 未知错误 |For more information, see [`restartSink`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#restartSink-java.lang.String-java.lang.String-java.lang.String-int-).

删除连接器

delete

You can delete a connector using Admin CLI, REST API or JAVA admin API.

Source

删除源连接器。

Admin CLI

REST API

Java Admin API

使用 删除 子命令。

  1. $ pulsar-admin sources delete options

For more information, see here.

删除 al Pulsar 源连接器。

发送 DELETE 请求到此端点: DELETE /admin/v3/sources/:tenant/:namespace/:sourceName

删除源连接器。

  1. void deleteSource(String tenant, String namespace, String source) throws PulsarAdminException

Parameter

NameDescription

租户 | 租户名称 命名空间 | 命名空间名称 | 源名称

异常

NameDescription
PulsarAdminException.NotAuthorizedException没有管理员权限
PulsarAdminException.NotFoundException集群不存在
PulsarAdminException.PreconditionFailedException集群不是空的
PulsarAdminException未知错误

For more information, see deleteSource.

Sink

删除 sink 连接器。

Admin CLI

REST API

Java Admin API

使用 删除 子命令。

  1. $ pulsar-admin sinks delete options

For more information, see here.

删除 sink 连接器。

发送 DELETE 请求到此端点: DELETE /admin/v3/sinks/:tenant/:namespace/:sinkname

删除 Pulsar sink 连接器。

  1. void deleteSink(String tenant, String namespace, String source) throws PulsarAdminException

Parameter

NameDescription

租户 | 租户名称 命名空间 | 命名空间名称 sink | sink 名称

异常

NameDescription
PulsarAdminException.NotAuthorizedException没有管理员权限
PulsarAdminException.NotFoundException集群不存在
PulsarAdminException.PreconditionFailedException集群不是空的
PulsarAdminException未知错误

For more information, see deleteSource.