How to connect Pulsar to a database

This tutorial provides a hands-on look at how you can move data out of Pulsar without writing a single line of code.

Working through the steps in this guide gives you a deeper understanding of the Pulsar I/O concepts.

At the end of this tutorial, you are able to connect Pulsar to a database.

Tip

  • These instructions assume you are running Pulsar in standalone mode. However, all the commands used in this tutorial can be used in a multi-node Pulsar cluster without any changes.

  • All the instructions are assumed to be run from the root directory of a Pulsar binary distribution.

Install Pulsar and built-in connectors

Before connecting Pulsar to a database, you need to install Pulsar and the desired built-in connectors.

For more information about how to install a standalone Pulsar and built-in connectors, see here.

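Start Pulsar standalone

  1. Start Pulsar locally.

    bin/pulsar standalone

    This command starts all the components of the Pulsar service in order.

    You can use the following commands to check the Pulsar service endpoints and make sure the Pulsar service is up and running correctly.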

  2. Check the Pulsar binary protocol port.

    telnet localhost 6650
  3. Check the Pulsar Functions cluster.

    curl -s http://localhost:8080/admin/v2/worker/cluster

    Example output

    [{"workerId":"c-standalone-fw-localhost-6750","workerHostname":"localhost","port":6750}]
  4. Make sure the public tenant and the default namespace exist.

    curl -s http://localhost:8080/admin/v2/namespaces/public

    Example output

    ["public/default","public/functions"]
  5. All the built-in connectors should be listed as available.

    curl -s http://localhost:8080/admin/v2/functions/connectors

    Example output

    [{"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaBytesSink"},{"name":"kinesis","description":"Kinesis sink connector","sinkClass":"org.apache.pulsar.io.kinesis.KinesisSink"},{"name":"rabbitmq","description":"RabbitMQ source connector","sourceClass":"org.apache.pulsar.io.rabbitmq.RabbitMQSource"},{"name":"twitter","description":"Ingest data from Twitter firehose","sourceClass":"org.apache.pulsar.io.twitter.TwitterFireHose"}]

    If an error occurs when starting the Pulsar service, you may see an exception in the terminal running bin/pulsar standalone, or you can view the logs in the logs directory under the Pulsar directory.
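If you want to script the last check rather than read the JSON by eye, a small POSIX shell helper can search the connectors listing for a given name. This is a minimal sketch: connector_available is a hypothetical helper, and it assumes the endpoint returns JSON containing "name":"<connector>" pairs, as in the example output. It is a plain text match, not a JSON parser.

```shell
# connector_available NAME
# Reads the JSON returned by /admin/v2/functions/connectors on stdin and
# succeeds (exit status 0) if a connector called NAME is listed.
# Assumption: the output contains "name":"<connector>" pairs, as in the
# example output; optional spaces around the colon are tolerated.
connector_available() {
    grep -q "\"name\" *: *\"$1\""
}

# Usage (needs standalone Pulsar listening on localhost:8080):
#   curl -s http://localhost:8080/admin/v2/functions/connectors \
#     | connector_available cassandra && echo "cassandra connector is available"
```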

Connect Pulsar to Cassandra

This section demonstrates how to connect Pulsar to Cassandra.

Tip

  • Make sure you have Docker installed. If you do not have it, see Install Docker.

  • The Cassandra sink connector reads messages from Pulsar topics and writes the messages into Cassandra tables. For more information, see Cassandra sink connector.

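Set up a Cassandra cluster

This example uses the cassandra Docker image to start a single-node Cassandra cluster in Docker.

  1. Start a Cassandra cluster.

    docker run -d --rm --name=cassandra -p 9042:9042 cassandra

    Note

    Make sure the Cassandra cluster is running before moving to the next step.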

  2. Make sure the Docker process is running.

    docker ps
  3. Check the Cassandra logs to make sure the Cassandra process is running as expected.

    docker logs cassandra
  4. Check the status of the Cassandra cluster.

    docker exec cassandra nodetool status

    Example output

    Datacenter: datacenter1
    =======================
    Status=Up/Down
    |/ State=Normal/Leaving/Joining/Moving
    --  Address     Load        Tokens  Owns (effective)  Host ID                               Rack
    UN  172.17.0.2  103.67 KiB  256     100.0%            af0e4b2f-84e0-4f0b-bb14-bd5f9070ff26  rack1
  5. Use cqlsh to connect to the Cassandra cluster.

    $ docker exec -ti cassandra cqlsh localhost
    Connected to Test Cluster at localhost:9042.
    [cqlsh 5.0.1 | Cassandra 3.11.2 | CQL spec 3.4.4 | Native protocol v4]
    Use HELP for help.
    cqlsh>
  6. Create a keyspace pulsar_test_keyspace.

    cqlsh> CREATE KEYSPACE pulsar_test_keyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};
  7. Create a table pulsar_test_table.

    cqlsh> USE pulsar_test_keyspace;
    cqlsh:pulsar_test_keyspace> CREATE TABLE pulsar_test_table (key text PRIMARY KEY, col text);
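Steps 4 to 7 can also be run non-interactively, which is handy in scripts. The sketch below assumes the container name cassandra from step 1; cassandra_is_up and wait_for_cassandra are hypothetical helper names, and the default retry budget (30 attempts, 5 seconds apart) is an arbitrary assumption. It treats a node line starting with UN (Up/Normal) in the nodetool status output as ready. Even then, the CQL port may need a few more seconds, so retry cqlsh if the first call fails.

```shell
# cassandra_is_up: reads `nodetool status` output on stdin and succeeds if
# at least one node line starts with "UN" (Status=Up, State=Normal).
cassandra_is_up() {
    grep -q '^UN '
}

# wait_for_cassandra CONTAINER [ATTEMPTS] [DELAY]
# Polls nodetool inside the container until a node is Up/Normal.
# The defaults (30 attempts, 5 seconds apart) are assumptions, not tuned values.
wait_for_cassandra() {
    container=$1
    attempts=${2:-30}
    delay=${3:-5}
    while [ "$attempts" -gt 0 ]; do
        if docker exec "$container" nodetool status 2>/dev/null | cassandra_is_up; then
            return 0
        fi
        attempts=$((attempts - 1))
        [ "$attempts" -gt 0 ] && sleep "$delay"
    done
    return 1
}

# Usage (requires Docker and the "cassandra" container from step 1):
#   wait_for_cassandra cassandra || exit 1
#   docker exec cassandra cqlsh -e "CREATE KEYSPACE IF NOT EXISTS pulsar_test_keyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};" localhost
#   docker exec cassandra cqlsh -e "CREATE TABLE IF NOT EXISTS pulsar_test_keyspace.pulsar_test_table (key text PRIMARY KEY, col text);" localhost
```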

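Configure a Cassandra sink

Now that you have a Cassandra cluster running locally, in this section you configure a Cassandra sink connector.

To run a Cassandra sink connector, you need to prepare a configuration file containing the information that the Pulsar connector runtime needs to know.

This includes, for example, how the Pulsar connector finds the Cassandra cluster and which keyspace and table the Pulsar connector writes messages to.

You can create the configuration file in either of the following formats. The create command in the next section reads the YAML version from examples/cassandra-sink.yml.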

  • JSON

    {
      "roots": "localhost:9042",
      "keyspace": "pulsar_test_keyspace",
      "columnFamily": "pulsar_test_table",
      "keyname": "key",
      "columnName": "col"
    }
  • YAML

    configs:
      roots: "localhost:9042"
      keyspace: "pulsar_test_keyspace"
      columnFamily: "pulsar_test_table"
      keyname: "key"
      columnName: "col"

For more information, see Cassandra sink connector.
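The sinks create command in the next section expects the YAML configuration at examples/cassandra-sink.yml. One way to write it is a shell here-document; this sketch writes the YAML variant above verbatim, assuming you run it from the root of the Pulsar distribution. The connector settings must stay nested under configs:, which is the key the YAML variant uses for connector-specific settings.

```shell
# Write the YAML sink configuration to the path used by `pulsar-admin sinks create`.
# Assumption: run from the root directory of the Pulsar binary distribution.
mkdir -p examples
cat > examples/cassandra-sink.yml <<'EOF'
configs:
  roots: "localhost:9042"
  keyspace: "pulsar_test_keyspace"
  columnFamily: "pulsar_test_table"
  keyname: "key"
  columnName: "col"
EOF
```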

Create a Cassandra sink

You can use the Connector Admin CLI to create sink connectors and perform other operations on them.

Run the following command to create a Cassandra sink connector with sink type cassandra and the config file examples/cassandra-sink.yml created previously.

Note

The sink-type parameter of the currently built-in connectors is determined by the setting of the name parameter specified in the pulsar-io.yaml file.

  bin/pulsar-admin sinks create \
      --tenant public \
      --namespace default \
      --name cassandra-test-sink \
      --sink-type cassandra \
      --sink-config-file examples/cassandra-sink.yml \
      --inputs test_cassandra

Once the command is executed, Pulsar creates the sink connector cassandra-test-sink.

This sink connector runs as a Pulsar Function and writes the messages produced in the topic test_cassandra to the Cassandra table pulsar_test_table.

Inspect a Cassandra sink

You can use the Connector Admin CLI to monitor a connector and perform other operations on it.

  • Get the information of the Cassandra sink.

    bin/pulsar-admin sinks get \
        --tenant public \
        --namespace default \
        --name cassandra-test-sink

    Example output

    {
      "tenant": "public",
      "namespace": "default",
      "name": "cassandra-test-sink",
      "className": "org.apache.pulsar.io.cassandra.CassandraStringSink",
      "inputSpecs": {
        "test_cassandra": {
          "isRegexPattern": false
        }
      },
      "configs": {
        "roots": "localhost:9042",
        "keyspace": "pulsar_test_keyspace",
        "columnFamily": "pulsar_test_table",
        "keyname": "key",
        "columnName": "col"
      },
      "parallelism": 1,
      "processingGuarantees": "ATLEAST_ONCE",
      "retainOrdering": false,
      "autoAck": true,
      "archive": "builtin://cassandra"
    }
  • Check the status of the Cassandra sink.

    bin/pulsar-admin sinks status \
        --tenant public \
        --namespace default \
        --name cassandra-test-sink

    Example output

    {
      "numInstances" : 1,
      "numRunning" : 1,
      "instances" : [ {
        "instanceId" : 0,
        "status" : {
          "running" : true,
          "error" : "",
          "numRestarts" : 0,
          "numReadFromPulsar" : 0,
          "numSystemExceptions" : 0,
          "latestSystemExceptions" : [ ],
          "numSinkExceptions" : 0,
          "latestSinkExceptions" : [ ],
          "numWrittenToSink" : 0,
          "lastReceivedTime" : 0,
          "workerId" : "c-standalone-fw-localhost-8080"
        }
      } ]
    }
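When scripting the setup, you may want to wait until the sink is actually running before producing messages. This sketch (sink_all_running is a hypothetical helper) reads the status JSON shown above and succeeds only when numRunning equals numInstances and is non-zero. It assumes the pretty-printed layout with one counter per line and uses awk text matching rather than a JSON parser, so it could break if the output format changes.

```shell
# sink_all_running: reads the JSON printed by `pulsar-admin sinks status`
# on stdin and succeeds only if every instance is running, that is, if
# numRunning equals numInstances and numInstances is greater than zero.
# Assumption: each counter sits on its own line, as in the example output.
sink_all_running() {
    awk '
        /"numInstances"/ { v = $0; gsub(/[^0-9]/, "", v); total = v + 0 }
        /"numRunning"/   { v = $0; gsub(/[^0-9]/, "", v); running = v + 0 }
        END { exit !(total > 0 && running == total) }
    '
}

# Usage (poll until the sink created above is up; add a timeout in real scripts):
#   until bin/pulsar-admin sinks status --tenant public --namespace default \
#       --name cassandra-test-sink | sink_all_running; do sleep 2; done
```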

Verify a Cassandra sink

  1. Produce some messages to test_cassandra, the input topic of the Cassandra sink.

    for i in {0..9}; do bin/pulsar-client produce -m "key-$i" -n 1 test_cassandra; done
  2. Inspect the status of the Cassandra sink cassandra-test-sink.

    bin/pulsar-admin sinks status \
        --tenant public \
        --namespace default \
        --name cassandra-test-sink

    You can see that 10 messages are processed by the Cassandra sink cassandra-test-sink.

    Example output

    {
      "numInstances" : 1,
      "numRunning" : 1,
      "instances" : [ {
        "instanceId" : 0,
        "status" : {
          "running" : true,
          "error" : "",
          "numRestarts" : 0,
          "numReadFromPulsar" : 10,
          "numSystemExceptions" : 0,
          "latestSystemExceptions" : [ ],
          "numSinkExceptions" : 0,
          "latestSinkExceptions" : [ ],
          "numWrittenToSink" : 10,
          "lastReceivedTime" : 1551685489136,
          "workerId" : "c-standalone-fw-localhost-8080"
        }
      } ]
    }
  3. Use cqlsh to connect to the Cassandra cluster.

    docker exec -ti cassandra cqlsh localhost
  4. Check the data of the Cassandra table pulsar_test_table.

    cqlsh> use pulsar_test_keyspace;
    cqlsh:pulsar_test_keyspace> select * from pulsar_test_table;

     key   | col
    -------+-------
     key-5 | key-5
     key-0 | key-0
     key-9 | key-9
     key-2 | key-2
     key-1 | key-1
     key-3 | key-3
     key-6 | key-6
     key-7 | key-7
     key-4 | key-4
     key-8 | key-8
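To check the message count programmatically instead of reading numWrittenToSink by eye, you can sum that counter across all instances. The sketch below is an assumption-based helper (sink_written_total is hypothetical) that relies on the pretty-printed status layout shown above. Summing matters once parallelism is greater than 1, because each instance reports its own counter.

```shell
# sink_written_total: reads `pulsar-admin sinks status` JSON on stdin and
# prints the sum of numWrittenToSink over all instances.
# Assumption: one "numWrittenToSink" : N pair per line, as in the example output.
sink_written_total() {
    awk '
        /"numWrittenToSink"/ { v = $0; gsub(/[^0-9]/, "", v); sum += v }
        END { print sum + 0 }
    '
}

# Usage (hypothetical check that all 10 test messages reached Cassandra):
#   written=$(bin/pulsar-admin sinks status --tenant public \
#       --namespace default --name cassandra-test-sink | sink_written_total)
#   [ "$written" -ge 10 ] && echo "all messages written"
#   docker exec cassandra cqlsh -e \
#       "SELECT COUNT(*) FROM pulsar_test_keyspace.pulsar_test_table;" localhost
```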

Delete a Cassandra sink

You can use the Connector Admin CLI to delete a connector and perform other operations on it.

  bin/pulsar-admin sinks delete \
      --tenant public \
      --namespace default \
      --name cassandra-test-sink

Connect Pulsar to PostgreSQL

This section demonstrates how to connect Pulsar to PostgreSQL.

Tip

  • Make sure you have Docker installed. If you do not have it, see Install Docker.

  • The JDBC sink connector pulls messages from Pulsar topics and persists the messages to ClickHouse, MariaDB, PostgreSQL, or SQLite. For more information, see JDBC sink connector.

Set up a PostgreSQL cluster

This example uses the PostgreSQL 12 Docker image to start a single-node PostgreSQL cluster in Docker.

  1. Pull the PostgreSQL 12 image from Docker.

    $ docker pull postgres:12
  2. Start PostgreSQL.

    $ docker run -d -it --rm \
        --name pulsar-postgres \
        -p 5432:5432 \
        -e POSTGRES_PASSWORD=password \
        -e POSTGRES_USER=postgres \
        postgres:12

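    Tip

    | Flag   | Description                                                       | Example                                                          |
    |--------|-------------------------------------------------------------------|------------------------------------------------------------------|
    | -d     | Start a container in detached mode.                               | /                                                                |
    | -it    | Keep STDIN open even if not attached, and allocate a terminal.    | /                                                                |
    | --rm   | Automatically remove the container when it exits.                 | /                                                                |
    | --name | Assign a name to the container.                                   | This example specifies pulsar-postgres for the container.        |
    | -p     | Publish the port of the container to the host.                    | This example publishes port 5432 of the container to the host.   |
    | -e     | Set environment variables.                                        | This example sets the password for the user to password and the name for the user to postgres. |

    Tip

    For more information about Docker commands, see Docker CLI.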

  3. Check whether PostgreSQL has started successfully.

    $ docker logs -f pulsar-postgres

    PostgreSQL has started successfully if the following messages appear.

    2020-05-11 20:09:24.492 UTC [1] LOG: starting PostgreSQL 12.2 (Debian 12.2-2.pgdg100+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 8.3.0-6) 8.3.0, 64-bit
    2020-05-11 20:09:24.492 UTC [1] LOG: listening on IPv4 address "0.0.0.0", port 5432
    2020-05-11 20:09:24.492 UTC [1] LOG: listening on IPv6 address "::", port 5432
    2020-05-11 20:09:24.499 UTC [1] LOG: listening on Unix socket "/var/run/postgresql/.s.PGSQL.5432"
    2020-05-11 20:09:24.523 UTC [55] LOG: database system was shut down at 2020-05-11 20:09:24 UTC
    2020-05-11 20:09:24.533 UTC [1] LOG: database system is ready to accept connections
  4. 访问 PostgreSQL。

    $ docker exec -it pulsar-postgres /bin/bash
  5. Create a PostgreSQL table pulsar_postgres_jdbc_sink.

    $ psql -U postgres postgres
    postgres=# create table if not exists pulsar_postgres_jdbc_sink
    (
        id serial PRIMARY KEY,
        name VARCHAR(255) NOT NULL
    );
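Instead of watching the log, you can poll PostgreSQL until it accepts connections and then create the table without an interactive psql session. This sketch defines a hypothetical generic retry helper; pg_isready and psql ship in the official postgres image. Passing -h localhost makes pg_isready check the TCP listener rather than the Unix socket (the JDBC sink also connects over TCP). The retry budget of 30 attempts, 2 seconds apart, is an arbitrary assumption.

```shell
# retry ATTEMPTS DELAY COMMAND [ARGS...]
# Runs COMMAND up to ATTEMPTS times, sleeping DELAY seconds between tries,
# and returns 0 as soon as it succeeds (1 if every attempt fails).
retry() {
    attempts=$1
    delay=$2
    shift 2
    while [ "$attempts" -gt 0 ]; do
        "$@" && return 0
        attempts=$((attempts - 1))
        [ "$attempts" -gt 0 ] && sleep "$delay"
    done
    return 1
}

# Usage (requires the pulsar-postgres container from step 2):
#   retry 30 2 docker exec pulsar-postgres pg_isready -h localhost -U postgres &&
#   docker exec pulsar-postgres psql -U postgres -d postgres -c \
#     "CREATE TABLE IF NOT EXISTS pulsar_postgres_jdbc_sink (id serial PRIMARY KEY, name VARCHAR(255) NOT NULL);"
```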

Configure a JDBC sink

Now that you have PostgreSQL running locally, in this section you configure a JDBC sink connector.

  1. Add a configuration file.

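    To run a JDBC sink connector, you need to prepare a YAML configuration file containing the information that the Pulsar connector runtime needs to know.

    This includes, for example, how the Pulsar connector finds the PostgreSQL cluster, what the JDBC URL is, and which table the Pulsar connector writes messages to.

    Create a pulsar-postgres-jdbc-sink.yaml file, copy the following contents to this file, and place the file in the pulsar/connectors folder.

    configs:
      userName: "postgres"
      password: "password"
      jdbcUrl: "jdbc:postgresql://localhost:5432/pulsar_postgres_jdbc_sink"
      tableName: "pulsar_postgres_jdbc_sink"

    Note that the path at the end of jdbcUrl is the database name. The table in the previous step was created in the default postgres database, so either set jdbcUrl to jdbc:postgresql://localhost:5432/postgres or create a pulsar_postgres_jdbc_sink database that contains the table.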
  2. Create a schema.

    Create an avro-schema file, copy the following contents to this file, and place the file in the pulsar/connectors folder.

    {
      "type": "AVRO",
      "schema": "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}",
      "properties": {}
    }

    Tip

    For more information about Avro, see Apache Avro.

  3. Upload a schema to a topic.

    This example uploads the avro-schema schema to the pulsar-postgres-jdbc-sink-topic topic.

    $ bin/pulsar-admin schemas upload pulsar-postgres-jdbc-sink-topic -f ./connectors/avro-schema
  4. Check whether the schema has been uploaded successfully.

    $ bin/pulsar-admin schemas get pulsar-postgres-jdbc-sink-topic

    The schema has been uploaded successfully if the following message appears.

    {"name":"pulsar-postgres-jdbc-sink-topic","schema":"{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}","type":"AVRO","properties":{}}
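Steps 1 and 2 can be scripted with here-documents. This sketch writes both files with the exact contents shown above, assuming it is run from the Pulsar root directory. The quoted 'EOF' delimiter stops the shell from interpreting the backslashes in the embedded Avro schema string.

```shell
# Create the files used in steps 1 and 2 under connectors/.
# Assumption: run from the Pulsar root directory (connectors/ usually exists already).
mkdir -p connectors

# Step 1: JDBC sink configuration; connector settings are nested under configs:.
# The path part of jdbcUrl is the database name and must name a database
# that contains the pulsar_postgres_jdbc_sink table.
cat > connectors/pulsar-postgres-jdbc-sink.yaml <<'EOF'
configs:
  userName: "postgres"
  password: "password"
  jdbcUrl: "jdbc:postgresql://localhost:5432/pulsar_postgres_jdbc_sink"
  tableName: "pulsar_postgres_jdbc_sink"
EOF

# Step 2: Avro schema definition for `pulsar-admin schemas upload`.
cat > connectors/avro-schema <<'EOF'
{
  "type": "AVRO",
  "schema": "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}",
  "properties": {}
}
EOF
```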

Create a JDBC sink

You can use the Connector Admin CLI to create sink connectors and perform other operations on them.

This example creates a sink connector and specifies the desired information.

  $ bin/pulsar-admin sinks create \
      --archive ./connectors/pulsar-io-jdbc-postgres-2.8.0.nar \
      --inputs pulsar-postgres-jdbc-sink-topic \
      --name pulsar-postgres-jdbc-sink \
      --sink-config-file ./connectors/pulsar-postgres-jdbc-sink.yaml \
      --parallelism 1

Once the command is executed, Pulsar creates a sink connector pulsar-postgres-jdbc-sink.

This sink connector runs as a Pulsar Function and writes the messages produced in the topic pulsar-postgres-jdbc-sink-topic to the PostgreSQL table pulsar_postgres_jdbc_sink.

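Tip

| Flag               | Description                                                                                  | Example                           |
|--------------------|----------------------------------------------------------------------------------------------|-----------------------------------|
| --archive          | The path to the archive file for the sink.                                                   | pulsar-io-jdbc-postgres-2.8.0.nar |
| --inputs           | The input topic(s) of the sink. Multiple topics can be specified as a comma-separated list. |                                   |
| --name             | The name of the sink.                                                                        | pulsar-postgres-jdbc-sink         |
| --sink-config-file | The path to a YAML config file specifying the configuration of the sink.                    | pulsar-postgres-jdbc-sink.yaml    |
| --parallelism      | The parallelism factor of the sink, that is, the number of sink instances to run.           | 1                                 |

Tip

For more information about pulsar-admin sinks create options, see here.

The sink has been created successfully if the following message appears.

  "Created successfully"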

Inspect a JDBC sink

You can use the Connector Admin CLI to monitor a connector and perform other operations on it.

  • List all running JDBC sinks.

    $ bin/pulsar-admin sinks list \
        --tenant public \
        --namespace default

    Tip

    For more information about pulsar-admin sinks list options, see here.

    The result shows that only the pulsar-postgres-jdbc-sink sink is running.

    [
      "pulsar-postgres-jdbc-sink"
    ]
  • Get the information of the JDBC sink.

    $ bin/pulsar-admin sinks get \
        --tenant public \
        --namespace default \
        --name pulsar-postgres-jdbc-sink

    Tip

    For more information about pulsar-admin sinks get options, see here.

    The result shows the information of the sink connector, including the tenant, namespace, topic, and so on.

    {
      "tenant": "public",
      "namespace": "default",
      "name": "pulsar-postgres-jdbc-sink",
      "className": "org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink",
      "inputSpecs": {
        "pulsar-postgres-jdbc-sink-topic": {
          "isRegexPattern": false
        }
      },
      "configs": {
        "password": "password",
        "jdbcUrl": "jdbc:postgresql://localhost:5432/pulsar_postgres_jdbc_sink",
        "userName": "postgres",
        "tableName": "pulsar_postgres_jdbc_sink"
      },
      "parallelism": 1,
      "processingGuarantees": "ATLEAST_ONCE",
      "retainOrdering": false,
      "autoAck": true
    }
  • Get the status of the JDBC sink.

    $ bin/pulsar-admin sinks status \
        --tenant public \
        --namespace default \
        --name pulsar-postgres-jdbc-sink

    Tip

    For more information about pulsar-admin sinks status options, see here.

    The result shows the current status of the sink connector, including the number of instances, running status, worker ID, and so on.

    {
      "numInstances" : 1,
      "numRunning" : 1,
      "instances" : [ {
        "instanceId" : 0,
        "status" : {
          "running" : true,
          "error" : "",
          "numRestarts" : 0,
          "numReadFromPulsar" : 0,
          "numSystemExceptions" : 0,
          "latestSystemExceptions" : [ ],
          "numSinkExceptions" : 0,
          "latestSinkExceptions" : [ ],
          "numWrittenToSink" : 0,
          "lastReceivedTime" : 0,
          "workerId" : "c-standalone-fw-192.168.2.52-8080"
        }
      } ]
    }

Stop a JDBC sink

You can use the Connector Admin CLI to stop a connector and perform other operations on it.

  $ bin/pulsar-admin sinks stop \
      --tenant public \
      --namespace default \
      --name pulsar-postgres-jdbc-sink

Tip

For more information about pulsar-admin sinks stop options, see here.

The sink instance has been stopped successfully if the following message appears.

  "Stopped successfully"

Restart a JDBC sink

You can use the Connector Admin CLI to restart a connector and perform other operations on it.

  $ bin/pulsar-admin sinks restart \
      --tenant public \
      --namespace default \
      --name pulsar-postgres-jdbc-sink

Tip

For more information about pulsar-admin sinks restart options, see here.

The sink instance has been started successfully if the following message appears.

  "Started successfully"

Tip

  • Optionally, you can run a standalone sink connector using pulsar-admin sinks localrun options.

    Note that pulsar-admin sinks localrun options runs a sink connector locally, while pulsar-admin sinks start options starts a sink connector in a cluster.

  • For more information about pulsar-admin sinks localrun options, see here.

Update a JDBC sink

You can use the Connector Admin CLI to update a connector and perform other operations on it.

This example updates the parallelism of the pulsar-postgres-jdbc-sink sink connector to 2.

  $ bin/pulsar-admin sinks update \
      --name pulsar-postgres-jdbc-sink \
      --parallelism 2

Tip

For more information about pulsar-admin sinks update options, see here.

The sink connector has been updated successfully if the following message appears.

  "Updated successfully"

This example double-checks the information.

  $ bin/pulsar-admin sinks get \
      --tenant public \
      --namespace default \
      --name pulsar-postgres-jdbc-sink

The result shows that the parallelism is 2.

  {
    "tenant": "public",
    "namespace": "default",
    "name": "pulsar-postgres-jdbc-sink",
    "className": "org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink",
    "inputSpecs": {
      "pulsar-postgres-jdbc-sink-topic": {
        "isRegexPattern": false
      }
    },
    "configs": {
      "password": "password",
      "jdbcUrl": "jdbc:postgresql://localhost:5432/pulsar_postgres_jdbc_sink",
      "userName": "postgres",
      "tableName": "pulsar_postgres_jdbc_sink"
    },
    "parallelism": 2,
    "processingGuarantees": "ATLEAST_ONCE",
    "retainOrdering": false,
    "autoAck": true
  }
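To confirm the update in a script rather than by eye, you can extract the top-level parallelism field from the sinks get output. sink_parallelism is a hypothetical helper; it assumes the field appears once, on its own line, as in the output above, and uses sed text matching instead of a JSON parser.

```shell
# sink_parallelism: reads the JSON printed by `pulsar-admin sinks get` on
# stdin and prints the value of its "parallelism" field.
# Assumption: the field appears once, on its own line, as in the example output.
sink_parallelism() {
    sed -n 's/^[[:space:]]*"parallelism"[[:space:]]*:[[:space:]]*\([0-9][0-9]*\).*/\1/p'
}

# Usage (hypothetical check after the update above):
#   p=$(bin/pulsar-admin sinks get --tenant public --namespace default \
#       --name pulsar-postgres-jdbc-sink | sink_parallelism)
#   [ "$p" = "2" ] && echo "parallelism updated"
```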

Delete a JDBC sink

You can use the Connector Admin CLI to delete a connector and perform other operations on it.

This example deletes the pulsar-postgres-jdbc-sink sink connector.

  $ bin/pulsar-admin sinks delete \
      --tenant public \
      --namespace default \
      --name pulsar-postgres-jdbc-sink

Tip

For more information about pulsar-admin sinks delete options, see here.

The sink connector has been deleted successfully if the following message appears.

  "Deleted successfully"

This example double-checks the status of the sink connector.

  $ bin/pulsar-admin sinks get \
      --tenant public \
      --namespace default \
      --name pulsar-postgres-jdbc-sink

The result shows that the sink connector does not exist.

  HTTP 404 Not Found
  Reason: Sink pulsar-postgres-jdbc-sink doesn't exist
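The same double-check can be done against sinks list, whose output is a JSON array of sink names. This sketch (sink_listed is a hypothetical helper) succeeds if the quoted name appears in that array, so after a successful delete it should fail. It is a fixed-string match and assumes the list output contains only sink names.

```shell
# sink_listed NAME: reads the JSON array printed by `pulsar-admin sinks list`
# on stdin and succeeds if "NAME" (with quotes) appears in it.
sink_listed() {
    grep -qF "\"$1\""
}

# Usage (after the delete above, the sink should no longer be listed):
#   if bin/pulsar-admin sinks list --tenant public --namespace default \
#       | sink_listed pulsar-postgres-jdbc-sink; then
#     echo "sink still present"
#   else
#     echo "sink deleted"
#   fi
```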