How to use Pulsar connectors

本指南描述如何使用 Pulsar 连接器。

安装

Pulsar 捆绑了几个用于移动数据进出常用系统的 内置连接器。 (可选)你可以创建和使用所需的非内置连接器。

Note

当使用非内置连接器时,需要为连接器指定归档文件的路径。

若要设置内置连接器,请按照这里 说明设置

安装后,内置连接器会自动被 Pulsar 代理(或 function-workers)发现,因此不需要额外安装步骤。

配置

配置以下信息:

配置连接器的默认存储位置

要配置内置连接器的默认文件夹,请在 中设置 <code>连接目录 参考/conf/functions_worker.yml 配置文件。

示例

设置 ./connectors 文件夹作为内置连接器的默认存储位置。

  1. ########################
  2. # Connectors
  3. ########################
  4. connectorsDirectory: ./connectors

配置一个 YAML 文件的连接器

要配置连接器,需要在创建连接器时提供一个 YAML 配置文件。

YAML 配置文件告诉 Pulsar 在哪里找到连接器,以及如何将连接器与 Pulsar 主题连接起来。

示例 1

下面是 Cassandra sink 的 YAML 配置文件

  • 哪个 Cassandra 集群可以连接

  • 什么是 keyspace and columnFamily 用于收集数据

  • 如何将 Pulsar 消息映射到 Cassandra 桌面键和列

  1. tenant: public
  2. namespace: default
  3. name: cassandra-test-sink
  4. ...
  5. # cassandra specific config
  6. configs:
  7. roots: "localhost:9042"
  8. keyspace: "pulsar_test_keyspace"
  9. columnFamily: "pulsar_test_table"
  10. keyname: "key"
  11. columnName: "col"

示例 2

下面是 Kafka 源的 YAML 配置文件。

  1. configs:
  2. bootstrapServers: "pulsar-kafka:9092"
  3. groupId: "test-pulsar-io"
  4. topic: "my-topic"
  5. sessionTimeoutMs: "10000"
  6. autoCommitEnabled: "false"

示例 3

Below is a YAML configuration file of a PostgreSQL JDBC sink.

  1. configs:
  2. userName: "postgres"
  3. password: "password"
  4. jdbcUrl: "jdbc:postgresql://localhost:5432/test_jdbc"
  5. tableName: "test_jdbc"

获取可用的连接

在开始使用连接器之前,可以执行以下操作:

重新加载

如果你在连接器文件夹中添加或删除 nar 文件,在使用之前重新载入可用的内置连接器。

Source

使用 重新加载 子命令。

  1. $ pulsar-admin 源刷新

For more information, see here.

Sink

使用 重新加载 子命令。

  1. $ pulsar-admin sinks reload

For more information, see here.

可用

重新加载连接器后(可选),你可以获得可用连接器列表。

Source

使用 可用源 子命令。

  1. $ pulsar-admin sources available-sources

Sink

使用 可用的集合 子命令。

  1. $ pulsar-admin sinks available-sinks

运行连接器

要运行连接器,你可以执行以下操作:

create

You can create a connector using Admin CLI, REST API or JAVA admin API.f

Source

创建一个源连接器。

Admin CLI

REST API

Java Admin API

使用 创建 子命令。

  1. $ pulsar-admin sources create options

For more information, see here.

Send a POST request to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sourceName

  • Create a source connector with a local file.

    1. void createSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException

    参数名

    配置项说明
  1. `sourceConfig` | 源配置对象

异常

  1. | 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`CreateSource`](https://pulsar.apache.org/api/admin/2.8.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#createSource-SourceConfig-java.lang.String-)。
  • Create a source connector using a remote file with a URL from which fun-pkg can be downloaded.

    1. void createSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException

    支持的 URL是 http文件

    示例

    参数名

    参数 | 描述 |—-|—— sourceConfig | 源配置对象 pkgUrl | 可以从其中下载pkg 的 URL

    异常

    配置项说明
    PulsarAdminException未知错误

    欲了解更多信息,请参阅 createSourceWell

Sink

创建源连接器。

Admin CLI

REST API

Java Admin API

使用 创建 子命令。

  1. $ pulsar-admin sinks create options

For more information, see here.

Send a POST request to this endpoint: POST /admin/v3/sinks/:tenant/:namespace/:sinkName

  • Create a sink connector with a local file.

    1. void createSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException

    参数名

    配置项说明
  1. `sinkConfig` | The sink configuration object**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`createSink`](https://pulsar.apache.org/api/admin/2.8.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#createSink-SinkConfig-java.lang.String-)。
  • Create a sink connector using a remote file with a URL from which fun-pkg can be downloaded.

    1. void createSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException

    支持的 URL是 http文件

    示例

    参数名

    参数 | 描述 |—-|—— sourceConfig | 源配置对象 pkgUrl | 可以从其中下载pkg 的 URL

    异常

    配置项说明
    PulsarAdminException未知错误

    欲了解更多信息,请参阅 createSinkWidurl

start

You can start a connector using Admin CLI or REST API.

Source

启动一个源连接器。

Admin CLI

REST API

使用 起始 子命令。

  1. $ pulsar-admin sources start options

For more information, see here.

Sink

启动 sink 连接器。

Admin CLI

REST API

使用 起始 子命令。

  1. $ pulsar-admin sinks start options

For more information, see here.

localrun

You can run a connector locally rather than deploying it on a Pulsar cluster using Admin CLI.

Source

本地运行一个源连接器。

Admin CLI

使用 localrun 子命令。

  1. $ pulsar-admin sources localrun options

For more information, see here.

Sink

本地运行 sink 连接器。

Admin CLI

使用 localrun 子命令。

  1. $ pulsar-admin sinks localrun options

For more information, see here.

监视连接器

要监视连接器,你可以执行以下操作:

get

You can get the information of a connector using Admin CLI, REST API or JAVA admin API.

Source

获取源连接器的信息。

Admin CLI

REST API

Java Admin API

使用 获得 子命令。

  1. $ pulsar-admin sources get options

For more information, see here.

Send a GET request to this endpoint: GET /admin/v3/sources/:tenant/:namespace/:sourceName

  1. SourceConfig getSource(String tenant, String namespace, String source) throws PulsarAdminException

示例

这是一个源配置

  1. { "tenant": "tenantName", "namespace": "namespaceName", "name": "sourceName", "className": "className", "topicName": "topicName", "configs": {}, "parallelism": 1, "processingGuarantees": "ATLEAST_ONCE", "resources": { "cpu": 1.0, "ram": 1073741824, "disk": 10737418240 }}

这是一个源配置示例。

  1. { "tenant": "public", "namespace": "default", "name": "debezium-mysql-source", "className": "org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource", "topicName": "debezium-mysql-topic", "configs": { "database.user": "debezium", "database.server.id": "184054", "database.server.name": "dbserver1", "database.port": "3306", "database.hostname": "localhost", "database.password": "dbz", "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "database.whitelist": "inventory", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory", "pulsar.service.url": "pulsar://127.0.0.1:6650", "database.history.pulsar.topic": "history-topic2" }, "parallelism": 1, "processingGuarantees": "ATLEAST_ONCE", "resources": { "cpu": 1.0, "ram": 1073741824, "disk": 10737418240 }}

异常

异常名称|描述| —- | —-PulsarAdminException.NotAuthorizedException |你没有管理员权限PulsarAdminException.NotFoundException |集群不存在PulsarAdminException |意外错误

欲了解更多信息,请参阅 getSource

Sink

获取 sink 的信息。

Admin CLI

REST API

Java Admin API

使用 获得 子命令。

  1. $ pulsar-admin sinks get options

For more information, see here.

Send a GET request to this endpoint: GET /admin/v3/sinks/:tenant/:namespace/:sinkName

  1. SinkConfig getSink(String tenant, String namespace, String sink) throws PulsarAdminException

示例

这是一个 sinkConfig。

  1. {"tenant": "tenantName","namespace": "namespaceName","name": "sinkName","className": "className","inputSpecs": {"topicName": { "isRegexPattern": false}},"configs": {},"parallelism": 1,"processingGuarantees": "ATLEAST_ONCE","retainOrdering": false,"autoAck": true}

这是一个 sinkConfig 示例。

  1. { "tenant": "public", "namespace": "default", "name": "pulsar-postgres-jdbc-sink", "className": "org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink", "inputSpecs": { "pulsar-postgres-jdbc-sink-topic": { "isRegexPattern": false } }, "configs": { "password": "password", "jdbcUrl": "jdbc:postgresql://localhost:5432/pulsar_postgres_jdbc_sink", "userName": "postgres", "tableName": "pulsar_postgres_jdbc_sink" }, "parallelism": 1, "processingGuarantees": "ATLEAST_ONCE", "retainOrdering": false, "autoAck": true}

参数描述

名称|描述 |—-|—— 租户 | 租户名称 命名空间 | 命名空间名称 sink | 唱名名称

欲了解更多信息,请参阅 getSink

list

You can get the list of all running connectors using Admin CLI, REST API or JAVA admin API.

Source

获取所有正在运行的源连接器列表。

Admin CLI

REST API

Java Admin API

使用 列表 子命令。

  1. $ pulsar-admin sources list options

For more information, see here.

Send a GET request to this endpoint: GET /admin/v3/sources/:tenant/:namespace/

  1. List<String> listSources(String tenant, String namespace) throws PulsarAdminException

响应示例

  1. ["f1", "f2", "f3"]

异常

异常名称|描述| —- | —-PulsarAdminException.NotAuthorizedException |你没有管理员权限PulsarAdminException.NotFoundException |集群不存在PulsarAdminException |意外错误

欲了解更多信息,见 listSource

Sink

获取所有正在运行的 sink 列表。

Admin CLI

REST API

Java Admin API

使用 列表 子命令。

  1. $ pulsar-admin sinks list options

For more information, see here.

Send a GET request to this endpoint: GET /admin/v3/sinks/:tenant/:namespace/

  1. List<String> listSinks(String tenant, String namespace) throws PulsarAdminException

响应示例

  1. ["f1", "f2", "f3"]

异常

异常名称|描述| —- | —-PulsarAdminException.NotAuthorizedException |你没有管理员权限PulsarAdminException.NotFoundException |集群不存在PulsarAdminException |意外错误

欲了解更多信息,见 listSource

status

You can get the current status of a connector using Admin CLI, REST API or JAVA admin API.

Source

获取源连接器的当前状态。

Admin CLI

REST API

Java Admin API

使用 状态 子命令。

  1. $ pulsar-admin sources status options

For more information, see here.

  • Get the current status of all source connectors.

    Send a GET request to this endpoint: GET /admin/v3/sources/:tenant/:namespace/:sourceName/status

  • Gets the current status of a specified source connector.

    Send a GET request to this endpoint: GET /admin/v3/sources/:tenant/:namespace/:sourceName/:instanceId/status

  • Get the current status of all source connectors.

    1. SourceStatus getSourceStatus(String tenant, String namespace, String source) throws PulsarAdminException

    参数名

    参数|描述 |—-|—— 租户 | 租户名称 命名空间 | 命名空间名称 sink | 源名称

    异常

    名称 | 描述 |—-|—— PulsarAdminException | 意外错误

    欲了解更多信息,请参阅 getSourceStatus

  • Gets the current status of a specified source connector.

    1. SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceStatus(String tenant, String namespace, String source, int id) throws PulsarAdminException

    参数名

    Parameter| Description |—-|—- tenant | Tenant name namespace | Namespace name sink | Source name id | Source instanceID

    异常

    异常名称|描述| —- | —-PulsarAdminException |意外的错误

    欲了解更多信息,请参阅 getSourceStatus

Sink

获取 Pulsar sink 连接器的当前状态。

Admin CLI

REST API

Java Admin API

使用 状态 子命令。

  1. $ pulsar-admin sinks status options

For more information, see here.

  • Get the current status of all sink connectors.

    Send a GET request to this endpoint: GET /admin/v3/sinks/:tenant/:namespace/:sinkName/status

  • Gets the current status of a specified sink connector.

    Send a GET request to this endpoint: GET /admin/v3/sinks/:tenant/:namespace/:sourceName/:instanceId/status

  • Get the current status of all sink connectors.

    1. SinkStatus getSinkStatus(String tenant, String namespace, String sink) throws PulsarAdminException

    参数名

    参数|描述 |—-|—— 租户 | 租户名称 命名空间 | 命名空间名称 sink | 源名称

    异常

    异常名称|描述| —- | —-PulsarAdminException |意外的错误

    欲了解更多信息,请参阅 getSinkStatus

  • Gets the current status of a specified source connector.

    1. SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkStatus(String tenant, String namespace, String sink, int id) throws PulsarAdminException

    参数名

    Parameter| Description |—-|—- tenant | Tenant name namespace | Namespace name sink | Source name id | Sink instanceID

    异常

    异常名称|描述| —- | —-PulsarAdminException |意外的错误

    欲了解更多信息,请参阅 getSinkStatusWidstanceID

更新连接器

update

You can update a running connector using Admin CLI, REST API or JAVA admin API.

Source

更新正在运行的 Pulsar 源连接器。

Admin CLI

REST API

Java Admin API

使用 更新 子命令。

  1. $ pulsar-admin sources update options

For more information, see here.

Send a PUT request to this endpoint: PUT /admin/v3/sources/:tenant/:namespace/:sourceName

  • Update a running source connector with a local file.

    1. void updateSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException

    参数名

    配置项说明
    sourceConfig源配置对象

    异常

    配置项说明
    PulsarAdminException.NotAuthorizedException没有管理员权限
    PulsarAdminException.NotFoundException集群不存在
    PulsarAdminException未知错误

    欲了解更多信息,请参阅 updateSource

  • Update a source connector using a remote file with a URL from which fun-pkg can be downloaded.

    1. void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException

    支持的 URL是 http文件

    示例

    参数名

    配置项说明
    sourceConfig源配置对象
    pkgUrl下载 pkg 的 URL

    异常

    配置项说明
    PulsarAdminException.NotAuthorizedException没有管理员权限
    PulsarAdminException.NotFoundException集群不存在
    PulsarAdminException未知错误

欲了解更多信息,请参阅 createSourceWell

Sink

更新正在运行的 Pulsar sink 连接器。

Admin CLI

REST API

Java Admin API

使用 更新 子命令。

  1. $ pulsar-admin sinks update options

For more information, see here.

Send a PUT request to this endpoint: PUT /admin/v3/sinks/:tenant/:namespace/:sinkName

  • Update a running sink connector with a local file.

    1. void updateSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException

    参数名

    配置项说明
    sinkConfigsink 配置对象

    异常

    配置项说明
    PulsarAdminException.NotAuthorizedException没有管理员权限
    PulsarAdminException.NotFoundException集群不存在
    PulsarAdminException未知错误

    欲了解更多信息,请参阅 updateSink

  • Update a sink connector using a remote file with a URL from which fun-pkg can be downloaded.

    1. void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException

    支持的 URL是 http文件

    示例

    参数名

    配置项说明
    sinkConfigsink 配置对象
    pkgUrl下载 pkg 的 URL

    异常

    配置项说明
    PulsarAdminException.NotAuthorizedException没有管理员权限
    PulsarAdminException.NotFoundException集群不存在
    PulsarAdminException未知错误

欲了解更多信息,请参阅 updateSinkWirl

停止连接器

stop

You can stop a connector using Admin CLI, REST API or JAVA admin API.

Source

停止一个源连接器。

Admin CLI

REST API

Java Admin API

使用 停止 子命令。

  1. $ pulsar-admin sources stop options

For more information, see here.

  1. `tenant` | Tenant name `namespace` | Namespace name `source` | Source name**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`停止源`](https://pulsar.apache.org/api/admin/2.8.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#stopSource-java.lang.String-java.lang.String-java.lang.String-)。
  • Stop a specified source connector.

    1. void stopSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException

    参数名

    配置项说明
  1. `tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Source instanceID**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`停止源`](https://pulsar.apache.org/api/admin/2.8.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#stopSource-java.lang.String-java.lang.String-java.lang.String-int-)。

Sink

停止 sink 连接器。

Admin CLI

REST API

Java Admin API

使用 停止 子命令。

  1. $ pulsar-admin sinks stop options

For more information, see here.

  1. `tenant` | Tenant name `namespace` | Namespace name `source` | Source name**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`stopSink`](https://pulsar.apache.org/api/admin/2.8.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#stopSink-java.lang.String-java.lang.String-java.lang.String-)。
  • Stop a specified sink connector.

    1. void stopSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException

    参数名

    配置项说明
  1. `tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Source instanceID**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`stopSink`](https://pulsar.apache.org/api/admin/2.8.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#stopSink-java.lang.String-java.lang.String-java.lang.String-int-)。

重新启动连接器

restart

You can restart a connector using Admin CLI, REST API or JAVA admin API.

Source

重新启动一个源连接器。

Admin CLI

REST API

Java Admin API

使用 重启 子命令。

  1. $ pulsar-admin sources restart options

For more information, see here.

  1. `tenant` | Tenant name `namespace` | Namespace name `source` | Source name**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`重启源`](https://pulsar.apache.org/api/admin/2.8.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#restartSource-java.lang.String-java.lang.String-java.lang.String-)。
  • Restart a specified source connector.

    1. void restartSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException

    参数名

    配置项说明
  1. `tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Source instanceID**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`重启源`](https://pulsar.apache.org/api/admin/2.8.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#restartSource-java.lang.String-java.lang.String-java.lang.String-int-)。

Sink

重新启动 sink 连接器。

Admin CLI

REST API

Java Admin API

使用 重启 子命令。

  1. $ pulsar-admin sinks restart options

For more information, see here.

  1. `tenant` | Tenant name `namespace` | Namespace name `sink` | Sink name**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`重新启动Sink`](https://pulsar.apache.org/api/admin/2.8.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#restartSink-java.lang.String-java.lang.String-java.lang.String-)。
  • Restart a specified sink connector.

    1. void restartSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException

    参数名

    配置项说明
  1. `tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Sink instanceID**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`重新启动Sink`](https://pulsar.apache.org/api/admin/2.8.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#restartSink-java.lang.String-java.lang.String-java.lang.String-int-)。

删除连接器

delete

You can delete a connector using Admin CLI, REST API or JAVA admin API.

Source

删除源连接器。

Admin CLI

REST API

Java Admin API

使用 删除 子命令。

  1. $ pulsar-admin sources delete options

For more information, see here.

删除 al Pulsar 源连接器。

Send a DELETE request to this endpoint: DELETE /admin/v3/sources/:tenant/:namespace/:sourceName

删除源连接器。

  1. void deleteSource(String tenant, String namespace, String source) throws PulsarAdminException

参数名

配置项说明

租户 | 租户名称 命名空间 | 命名空间名称 | 源名称

异常

配置项说明
PulsarAdminException.NotAuthorizedException没有管理员权限
PulsarAdminException.NotFoundException集群不存在
PulsarAdminException.PreconditionFailedException集群不是空的
PulsarAdminException未知错误

欲了解更多信息,请参阅 deleteSource

Sink

删除 sink 连接器。

Admin CLI

REST API

Java Admin API

使用 删除 子命令。

  1. $ pulsar-admin sinks delete options

For more information, see here.

删除 sink 连接器。

Send a DELETE request to this endpoint: DELETE /admin/v3/sinks/:tenant/:namespace/:sinkName

删除 Pulsar sink 连接器。

  1. void deleteSink(String tenant, String namespace, String source) throws PulsarAdminException

参数名

配置项说明

租户 | 租户名称 命名空间 | 命名空间名称 sink | sink 名称

异常

配置项说明
PulsarAdminException.NotAuthorizedException没有管理员权限
PulsarAdminException.NotFoundException集群不存在
PulsarAdminException.PreconditionFailedException集群不是空的
PulsarAdminException未知错误

欲了解更多信息,请参阅 deleteSource