How to use Pulsar connectors

本指南描述如何使用 Pulsar 连接器。

安装

Pulsar 自带了几个常用的数据交互系统(如数据库、消息系统等)相关的 内置连接器。 当然,你也可以创建和使用你想要用到的非内置连接器。

Note

当使用非内置连接器时,需要为连接器指定归档文件的路径。

若要设置内置连接器,请按照这里 说明设置

设置完成后,内置连接器会自动被 Pulsar broker(或 function-workers) 发现,因此不需要额外安装步骤。

配置连接器

配置以下信息:

配置连接器的默认存储位置

要配置内置连接器的默认文件夹,请在 ./conf/functions_worker.yml 文件中设置 connectorsDirectory 参数。

示例

./connectors 文件夹设为内置连接器的默认存储位置。

  1. ########################
  2. # Connectors
  3. ########################
  4. connectorsDirectory: ./connectors

配置连接器的 YAML 文件

要配置连接器,需要在创建连接器时提供一个 YAML 配置文件。

YAML 配置文件告诉 Pulsar 在哪里找到连接器,以及如何将连接器与 Pulsar 主题连接起来。

示例 1

下面是 Cassandra sink 的 YAML 配置文件:

  • 要连接哪个 Cassandra 集群

  • Cassandra 中用于收集数据的 keyspacecolumnFamily 是什么

  • 如何将 Pulsar 消息映射到 Cassandra 的表键(table key)和列(column)

  1. tenant: public
  2. namespace: default
  3. name: cassandra-test-sink
  4. ...
  5. # cassandra specific config
  6. configs:
  7. roots: "localhost:9042"
  8. keyspace: "pulsar_test_keyspace"
  9. columnFamily: "pulsar_test_table"
  10. keyname: "key"
  11. columnName: "col"

示例 2

下面是 Kafka source 的 YAML 配置文件。

  1. configs:
  2. bootstrapServers: "pulsar-kafka:9092"
  3. groupId: "test-pulsar-io"
  4. topic: "my-topic"
  5. sessionTimeoutMs: "10000"
  6. autoCommitEnabled: "false"

示例 3

下面是 PostgreSQL JDBC sink 的 YAML 配置文件。

  1. configs:
  2. userName: "postgres"
  3. password: "password"
  4. jdbcUrl: "jdbc:postgresql://localhost:5432/test_jdbc"
  5. tableName: "test_jdbc"

获取可用的连接器

在开始使用连接器之前,可以执行以下操作:

reload

如果你在连接器文件夹中添加或删除了 nar 文件,在使用连接器之前要重新载入已有内置连接器。

Source

使用 reload 子命令。

  1. $ pulsar-admin sources reload

更多信息,可参考此处

Sink

使用 reload 子命令。

  1. $ pulsar-admin sinks reload

更多信息,可参考此处

available

重新加载连接器后(可选),你可以获得可用连接器列表。

Source

使用 available-sources 子命令。

  1. $ pulsar-admin sources available-sources

Sink

使用 available-sinks 子命令。

  1. $ pulsar-admin sinks available-sinks

运行连接器

要运行连接器,你可以执行以下操作:

create

You can create a connector using Admin CLI, REST API or JAVA admin API.f

Source

创建一个源连接器。

Admin CLI

REST API

Java Admin API

使用 create 子命令。

  1. $ pulsar-admin sources create options

更多信息,可参阅此处

向如下端点发送一个 POST 请求: POST /admin/v3/sources/:tenant/:namespace/:sourceName

  • Create a source connector with a local file.

    1. void createSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException

    参数名

    配置项说明
  1. `sourceConfig` | 源配置对象

异常

  1. | 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`createSource`](https://pulsar.apache.org/api/admin/2.9.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#createSource-SourceConfig-java.lang.String-)。
  • Create a source connector using a remote file with a URL from which fun-pkg can be downloaded.

    1. void createSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException

    支持的 URL 包括 httpfile

    示例

    参数名

    参数 | 描述 |—-|—— sourceConfig | 源配置对象 pkgUrl | 可以从其中下载 pkg 的 URL

    异常

    配置项说明
    PulsarAdminException未知错误

    欲了解更多信息,请参阅 createSourceWithUrl

Sink

创建 sink 连接器。

Admin CLI

REST API

Java Admin API

使用 create 子命令。

  1. $ pulsar-admin sinks create options

更多信息,可参阅此处

向如下端点发送一个 POST 请求: POST /admin/v3/sinks/:tenant/:namespace/:sinkName

  • Create a sink connector with a local file.

    1. void createSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException

    参数名

    配置项说明
  1. `sinkConfig` | The sink configuration object**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`createSink`](https://pulsar.apache.org/api/admin/2.9.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#createSink-SinkConfig-java.lang.String-)。
  • Create a sink connector using a remote file with a URL from which fun-pkg can be downloaded.

    1. void createSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException

    支持的 URL 包括 httpfile

    示例

    参数名

    参数 | 描述 |—-|—— sinkConfig | sink 配置对象 pkgUrl | 可以从其中下载 pkg 的 URL

    异常

    配置项说明
    PulsarAdminException未知错误

    欲了解更多信息,请参阅 createSinkWithUrl

start

You can start a connector using Admin CLI or REST API.

Source

启动一个源连接器。

Admin CLI

REST API

使用 start 子命令。

  1. $ pulsar-admin sources start options

更多信息,可参阅此处

Sink

启动 sink 连接器。

Admin CLI

REST API

使用 start 子命令。

  1. $ pulsar-admin sinks start options

更多信息,可参阅此处

localrun

You can run a connector locally rather than deploying it on a Pulsar cluster using Admin CLI.

Source

本地运行一个源连接器。

Admin CLI

使用 localrun 子命令。

  1. $ pulsar-admin sources localrun options

更多信息,可参阅此处

Sink

本地运行 sink 连接器。

Admin CLI

使用 localrun 子命令。

  1. $ pulsar-admin sinks localrun options

更多信息,可参阅此处

监控连接器

要监控一个连接器,你可以执行以下操作:

get

You can get the information of a connector using Admin CLI, REST API or JAVA admin API.

Source

获取源连接器的信息。

Admin CLI

REST API

Java Admin API

使用 get 子命令。

  1. $ pulsar-admin sources get options

更多信息,可参阅此处

向如下端点发送一个 GET 请求:GET /admin/v3/sources/:tenant/:namespace/:sourceName

  1. SourceConfig getSource(String tenant, String namespace, String source) throws PulsarAdminException

示例

这是 sourceConfig。

  1. { "tenant": "tenantName", "namespace": "namespaceName", "name": "sourceName", "className": "className", "topicName": "topicName", "configs": {}, "parallelism": 1, "processingGuarantees": "ATLEAST_ONCE", "resources": { "cpu": 1.0, "ram": 1073741824, "disk": 10737418240 }}

这是一个源配置示例。

  1. { "tenant": "public", "namespace": "default", "name": "debezium-mysql-source", "className": "org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource", "topicName": "debezium-mysql-topic", "configs": { "database.user": "debezium", "database.server.id": "184054", "database.server.name": "dbserver1", "database.port": "3306", "database.hostname": "localhost", "database.password": "dbz", "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "database.whitelist": "inventory", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory", "pulsar.service.url": "pulsar://127.0.0.1:6650", "database.history.pulsar.topic": "history-topic2" }, "parallelism": 1, "processingGuarantees": "ATLEAST_ONCE", "resources": { "cpu": 1.0, "ram": 1073741824, "disk": 10737418240 }}

异常

异常名称 | 描述 |—-|—- PulsarAdminException.NotAuthorizedException | 你没有 admin 权限 PulsarAdminException.NotFoundException | 集群不存在 PulsarAdminException | 意外错误

欲了解更多信息,请参阅 getSource

Sink

获取 sink 连接器的信息。

Admin CLI

REST API

Java Admin API

使用 get 子命令。

  1. $ pulsar-admin sinks get options

更多信息,可参阅此处

向如下端点发送一个 GET 请求: GET /admin/v3/sinks/:tenant/:namespace/:sinkName

  1. SinkConfig getSink(String tenant, String namespace, String sink) throws PulsarAdminException

示例

这是一个 sinkConfig。

  1. {"tenant": "tenantName","namespace": "namespaceName","name": "sinkName","className": "className","inputSpecs": {"topicName": { "isRegexPattern": false}},"configs": {},"parallelism": 1,"processingGuarantees": "ATLEAST_ONCE","retainOrdering": false,"autoAck": true}

这是一个 sinkConfig 示例。

  1. { "tenant": "public", "namespace": "default", "name": "pulsar-postgres-jdbc-sink", "className": "org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink", "inputSpecs": { "pulsar-postgres-jdbc-sink-topic": { "isRegexPattern": false } }, "configs": { "password": "password", "jdbcUrl": "jdbc:postgresql://localhost:5432/pulsar_postgres_jdbc_sink", "userName": "postgres", "tableName": "pulsar_postgres_jdbc_sink" }, "parallelism": 1, "processingGuarantees": "ATLEAST_ONCE", "retainOrdering": false, "autoAck": true}

参数描述

名称|描述 |—-|—— tenant | 租户名称 namespace | 命名空间名称 sink | Sink 名称

欲了解更多信息,请参阅 getSink

list

You can get the list of all running connectors using Admin CLI, REST API or JAVA admin API.

Source

获取所有正在运行的源连接器列表。

Admin CLI

REST API

Java Admin API

使用 list 子命令。

  1. $ pulsar-admin sources list options

更多信息,可参阅此处

向如下端点发送一个 GET 请求:GET /admin/v3/sources/:tenant/:namespace/

  1. List<String> listSources(String tenant, String namespace) throws PulsarAdminException

响应示例

  1. ["f1", "f2", "f3"]

异常

异常名称 | 描述 |—-|—- PulsarAdminException.NotAuthorizedException | 你没有 admin 权限 PulsarAdminException | 意外错误

欲了解更多信息,见 listSource

Sink

获取所有正在运行的 sink 连接器列表。

Admin CLI

REST API

Java Admin API

使用 list 子命令。

  1. $ pulsar-admin sinks list options

更多信息,可参阅此处

向如下端点发送一个 GET 请求: GET /admin/v3/sinks/:tenant/:namespace/

  1. List<String> listSinks(String tenant, String namespace) throws PulsarAdminException

响应示例

  1. ["f1", "f2", "f3"]

异常

异常名称 | 描述 |—-|—- PulsarAdminException.NotAuthorizedException | 你没有 admin 权限 PulsarAdminException | 意外错误

欲了解更多信息,见 listSource

status

You can get the current status of a connector using Admin CLI, REST API or JAVA admin API.

Source

获取源连接器的当前状态。

Admin CLI

REST API

Java Admin API

使用 status 子命令。

  1. $ pulsar-admin sources status options

更多信息,可参阅此处

  • Get the current status of all source connectors.

    向如下端点发送一个 GET 请求:GET /admin/v3/sources/:tenant/:namespace/:sourceName/status

  • Gets the current status of a specified source connector.

    发送 GET 请求到此端点: GET /admin/v3/sources/:tenant/:namespace/:sourceName/:instanceId/status

  • Get the current status of all source connectors.

    1. SourceStatus getSourceStatus(String tenant, String namespace, String source) throws PulsarAdminException

    参数名

    参数|描述 |—-|—— tenant | 租户名称 namespace | 命名空间名称 sink | 源名称

    异常

    名称 | 描述 |—-|—— PulsarAdminException | 意外错误

    欲了解更多信息,请参阅 getSourceStatus

  • Gets the current status of a specified source connector.

    1. SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceStatus(String tenant, String namespace, String source, int id) throws PulsarAdminException

    参数名

    Parameter| Description |—-|—- tenant | Tenant name namespace | Namespace name sink | Source name id | Source instanceID

    异常

    异常名称|描述 | —- | —- PulsarAdminException |意外的错误

    欲了解更多信息,请参阅 getSourceStatus

Sink

获取 Pulsar sink 连接器的当前状态。

Admin CLI

REST API

Java Admin API

使用 status 子命令。

  1. $ pulsar-admin sinks status options

更多信息,可参阅此处

  • Get the current status of all sink connectors.

    向如下端点发送一个 GET 请求:GET /admin/v3/sinks/:tenant/:namespace/:sinkName/status

  • Gets the current status of a specified sink connector.

    发送 GET 请求到此端点: GET /admin/v3/sinks/:tenant/:namespace/:sourceName/:instanceId/status

  • Get the current status of all sink connectors.

    1. SinkStatus getSinkStatus(String tenant, String namespace, String sink) throws PulsarAdminException

    参数名

    参数|描述 |—-|—— tenant | 租户名称 namespace | 命名空间名称 sink | 源名称

    异常

    异常名称|描述 | —- | —- PulsarAdminException |意外的错误

    欲了解更多信息,请参阅 getSinkStatus

  • Gets the current status of a specified source connector.

    1. SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkStatus(String tenant, String namespace, String sink, int id) throws PulsarAdminException

    参数名

    Parameter| Description |—-|—- tenant | Tenant name namespace | Namespace name sink | Source name id | Sink instanceID

    异常

    异常名称|描述 | —- | —- PulsarAdminException |意外的错误

    欲了解更多信息,请参阅 getSinkStatusWithInstanceID

更新连接器

update

You can update a running connector using Admin CLI, REST API or JAVA admin API.

Source

更新正在运行的 Pulsar 源连接器。

Admin CLI

REST API

Java Admin API

使用 update 子命令。

  1. $ pulsar-admin sources update options

更多信息,可参阅此处

向如下端点发送一个 PUT 请求: PUT /admin/v3/sources/:tenant/:namespace/:sourceName

  • Update a running source connector with a local file.

    1. void updateSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException

    参数名

    配置项说明
    sourceConfig源配置对象

    异常

    配置项说明
    PulsarAdminException.NotAuthorizedException没有管理员权限
    PulsarAdminException.NotFoundException集群不存在
    PulsarAdminException未知错误

    欲了解更多信息,请参阅 updateSource

  • Update a source connector using a remote file with a URL from which fun-pkg can be downloaded.

    1. void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException

    支持的 URL 包括 httpfile

    示例

    参数名

    配置项说明
    sourceConfig源配置对象
    pkgUrl下载 pkg 的 URL

    异常

    配置项说明
    PulsarAdminException.NotAuthorizedException没有管理员权限
    PulsarAdminException.NotFoundException集群不存在
    PulsarAdminException未知错误

欲了解更多信息,请参阅 createSourceWithUrl

Sink

更新正在运行的 Pulsar sink 连接器。

Admin CLI

REST API

Java Admin API

使用 update 子命令。

  1. $ pulsar-admin sinks update options

更多信息,可参阅此处

向如下端点发送一个 PUT 请求:PUT /admin/v3/sinks/:tenant/:namespace/:sinkName

  • Update a running sink connector with a local file.

    1. void updateSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException

    参数名

    配置项说明
    sinkConfigsink 配置对象

    异常

    配置项说明
    PulsarAdminException.NotAuthorizedException没有管理员权限
    PulsarAdminException.NotFoundException集群不存在
    PulsarAdminException未知错误

    欲了解更多信息,请参阅 updateSink

  • Update a sink connector using a remote file with a URL from which fun-pkg can be downloaded.

    1. void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException

    支持的 URL 包括 httpfile

    示例

    参数名

    配置项说明
    sinkConfigsink 配置对象
    pkgUrl下载 pkg 的 URL

    异常

    配置项说明
    PulsarAdminException.NotAuthorizedException没有管理员权限
    PulsarAdminException.NotFoundException集群不存在
    PulsarAdminException未知错误

欲了解更多信息,请参阅 updateSinkWithUrl

停止连接器

stop

You can stop a connector using Admin CLI, REST API or JAVA admin API.

Source

停止一个源连接器。

Admin CLI

REST API

Java Admin API

使用 stop 子命令。

  1. $ pulsar-admin sources stop options

更多信息,可参阅此处

  1. `tenant` | Tenant name `namespace` | Namespace name `source` | Source name**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`stopSource`](https://pulsar.apache.org/api/admin/2.9.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#stopSource-java.lang.String-java.lang.String-java.lang.String-)。
  • Stop a specified source connector.

    1. void stopSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException

    参数名

    配置项说明
  1. `tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Source instanceID**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`stopSource`](https://pulsar.apache.org/api/admin/2.9.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#stopSource-java.lang.String-java.lang.String-java.lang.String-int-)。

Sink

停止 sink 连接器。

Admin CLI

REST API

Java Admin API

使用 stop 子命令。

  1. $ pulsar-admin sinks stop options

更多信息,可参阅此处

  1. `tenant` | Tenant name `namespace` | Namespace name `source` | Source name**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`stopSink`](https://pulsar.apache.org/api/admin/2.9.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#stopSink-java.lang.String-java.lang.String-java.lang.String-)。
  • Stop a specified sink connector.

    1. void stopSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException

    参数名

    配置项说明
  1. `tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Source instanceID**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`stopSink`](https://pulsar.apache.org/api/admin/2.9.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#stopSink-java.lang.String-java.lang.String-java.lang.String-int-)。

重新启动连接器

restart

You can restart a connector using Admin CLI, REST API or JAVA admin API.

Source

重新启动一个源连接器。

Admin CLI

REST API

Java Admin API

使用 restart 子命令。

  1. $ pulsar-admin sources restart options

更多信息,可参阅此处

  1. `tenant` | Tenant name `namespace` | Namespace name `source` | Source name**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`restartSource`](https://pulsar.apache.org/api/admin/2.9.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#restartSource-java.lang.String-java.lang.String-java.lang.String-)。
  • Restart a specified source connector.

    1. void restartSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException

    参数名

    配置项说明
  1. `tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Source instanceID**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`restartSource`](https://pulsar.apache.org/api/admin/2.9.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#restartSource-java.lang.String-java.lang.String-java.lang.String-int-)。

Sink

重新启动 sink 连接器。

Admin CLI

REST API

Java Admin API

使用 restart 子命令。

  1. $ pulsar-admin sinks restart options

更多信息,可参阅此处

  1. `tenant` | Tenant name `namespace` | Namespace name `sink` | Sink name**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`restartSink`](https://pulsar.apache.org/api/admin/2.9.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#restartSink-java.lang.String-java.lang.String-java.lang.String-)。
  • Restart a specified sink connector.

    1. void restartSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException

    参数名

    配置项说明
  1. `tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Sink instanceID**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`restartSink`](https://pulsar.apache.org/api/admin/2.9.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#restartSink-java.lang.String-java.lang.String-java.lang.String-int-)。

删除连接器

delete

You can delete a connector using Admin CLI, REST API or JAVA admin API.

Source

删除源连接器。

Admin CLI

REST API

Java Admin API

使用 delete 子命令。

  1. $ pulsar-admin sources delete options

更多信息,可参阅此处

删除 Pulsar 源连接器。

向如下端点发送一个 DELETE 请求: DELETE /admin/v3/sources/:tenant/:namespace/:sourceName ]}

删除源连接器。

  1. void deleteSource(String tenant, String namespace, String source) throws PulsarAdminException

参数名

配置项说明

tenant | 租户名称 namespace | 命名空间名称 source | 源名称

异常

配置项说明
PulsarAdminException.NotAuthorizedException没有管理员权限
PulsarAdminException.NotFoundException集群不存在
PulsarAdminException.PreconditionFailedException集群不是空的
PulsarAdminException未知错误

欲了解更多信息,请参阅 deleteSource

Sink

删除 sink 连接器。

Admin CLI

REST API

Java Admin API

使用 delete 子命令。

  1. $ pulsar-admin sinks delete options

更多信息,可参阅此处

删除 sink 连接器。

向如下端点发送一个 DELETE 请求: DELETE /admin/v3/sinks/:tenant/:namespace/:sinkName

删除 Pulsar sink 连接器。

  1. void deleteSink(String tenant, String namespace, String source) throws PulsarAdminException

参数名

配置项说明

tenant | 租户名称 namespace | 命名空间名称 sink | Sink 名称

异常

配置项说明
PulsarAdminException.NotAuthorizedException没有管理员权限
PulsarAdminException.NotFoundException集群不存在
PulsarAdminException.PreconditionFailedException集群不是空的
PulsarAdminException未知错误

欲了解更多信息,请参阅 deleteSource