How to develop Pulsar connectors

本教程解释了如何开发 Pulsar connector,以在 Pulsar 和其他系统之间移动数据。

Pulsar connector 是特殊的 Pulsar Functions,因此创建 Pulsar connector 类似于创建 Pulsar function。

Pulsar connector 可分为两类:

类型说明示例

{@inject: github:

Source:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java}|Import data from another system to Pulsar.|RabbitMQ source connector imports the messages of a RabbitMQ queue to a Pulsar topic. Sink |将数据从 Pulsar 导出到其他系统。|Kinesis sink connector 将 Pulsar 主题消息导出到 Kinesis 流。

开发

你可以开发 Pulsar source connector 和 sink connector。

Source

要开发一个源连接器,需要实现 Source 接口,也就是说,需要实现 open 和 {@inject: github:read:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java 这两个方法。

  1. 实现 open 方法。

    1. /**
    2. * Open connector with configuration
    3. *
    4. * @param config initialization config
    5. * @param sourceContext
    6. * @throws Exception IO type exceptions when opening a connector
    7. */
    8. void open(final Map<String, Object> config, SourceContext sourceContext) throws Exception;

    Source connector 初始化会调用此方法。

    在此方法中,可通过传入的 config 参数检索所有 connector 的特定设置,并初始化所有必须的资源。

    例如,Kafka connector 可以在 open 方法中创建 Kafka 客户端。

    Besides, Pulsar runtime also provides a SourceContext for the connector to access runtime resources for tasks like collecting metrics. 执行此方法可以保存 SourceContext 供后续使用。

  2. 实现 read 方法。

    1. /**
    2. * Reads the next message from source.
    3. * If source does not have any new messages, this call should block.
    4. * @return next message from source. The return result should never be null
    5. * @throws Exception
    6. */
    7. Record<T> read() throws Exception;

    如果没有要返回的内容,则应停止该方法的执行而不是返回 null

    返回值 Record 需要封装以下 Pulsar IO 运行时所需要的信息。

    • Record 需要包含以下变量:

      变量Required说明
  1. `TopicName`|No|Pulsar topic name from which the record is originated from. `Key`|No| 随机用密钥标记消息。<br/><br/> 更多详细信息,参阅 [Routing modes](/docs/zh-CN/concepts-messaging#routing-modes).| `Value`|Yes| 实际记录数据。 `EventTime`|No| Source 中记录的事件时间。 `PartitionId`|No| 如果记录来自已分区的 source,则返回 `PartitionId`。 <br/><br/>`PartitionId` 被 Pulsar IO 运行时间用作唯一标识符的一部分,删除重复的消息并实现 exactly-once 处理保证。 `RecordSequence`|No| 如果记录来自于序列 source,则返回 `RecordSequence`。<br/><br/>`RecordSequence` 被 Pulsar IO 运行时间用作唯一标识符的一部分,删除重复的消息并实现 exactly-once 处理保证。 `Properties` |No| 如果记录带有用户自定义的属性,则返回相关属性。 `DestinationTopic`|No| 应该写入消息的 topic。 `Message`|No| 包含用户发送数据的类。<br/><br/> 更多详细信息,参阅 [Message.java](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java)。|
  2. * {@inject: github:`Record`:/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java} should provide the following methods:
  3. Method|Description |---|--- `ack` |Acknowledge that the record is fully processed. `fail`|Indicate that the record fails to be processed.

处理 Schema 信息

Pulsar IO 能够自动处理 Schema 信息,同时提供了强大的基于 Java 通用 API 的类型。 如果你知道你正在处理的 Schema 类型,你就可以在 Sink 申明中为此类型直接指定对应的 Java 原生类。

  1. public class MySource implements Source<String> {
  2. public Record<String> read() {}
  3. }

如果你想实现一个能够兼容任意 Schema 的 Source,你可以使用 byte[] (of ByteBuffer) ,代码实现为 Schema.AUTO_PRODUCE_BYTES()。

  1. public class MySource implements Source<byte[]> {
  2. public Record<byte[]> read() {
  3. Schema wantedSchema = ....
  4. Record<byte[]> myRecord = new MyRecordImplementation();
  5. ....
  6. }
  7. class MyRecordImplementation implements Record<byte[]> {
  8. public byte[] getValue() {
  9. return ....encoded byte[]...that represents the value
  10. }
  11. public Schema<byte[]> getSchema() {
  12. return Schema.AUTO_PRODUCE_BYTES(wantedSchema);
  13. }
  14. }
  15. }

To handle the KeyValue type properly, follow the guidelines for your record implementation:

  • It must implement Record interface and implement getKeySchema,getValueSchema, and getKeyValueEncodingType
  • It must return a KeyValue object as Record.getValue()
  • It may return null in Record.getSchema()

When Pulsar IO runtime encounters a KVRecord, it brings the following changes automatically:

  • Set properly the KeyValueSchema
  • Encode the Message Key and the Message Value according to the KeyValueEncoding (SEPARATED or INLINE)

提示

For more information about how to create a source connector, see KafkaSource .

Sink

Developing a sink connector is similar to developing a source connector, that is, you need to implement the Sink interface, which means implementing the open method and the write method.

  1. 实现 open 方法。

    1. /**
    2. * Open connector with configuration
    3. *
    4. * @param config initialization config
    5. * @param sinkContext
    6. * @throws Exception IO type exceptions when opening a connector
    7. */
    8. void open(final Map<String, Object> config, SinkContext sinkContext) throws Exception;
  2. 实现 write 方法。

    1. /**
    2. * Write a message to Sink
    3. * @param record record to write to sink
    4. * @throws Exception
    5. */
    6. void write(Record<T> record) throws Exception;

    During the implementation, you can decide how to write the Value and the Key to the actual source, and leverage all the provided information such as PartitionId and RecordSequence to achieve different processing guarantees.

    还需要 ack 记录(消息发送成功)或发送失败记录(消息发送失败)。

处理 Schema 信息

Pulsar IO 能够自动处理 Schema 信息,同时提供了强大的基于 Java 通用 API 的类型。 如果你知道你正在消费的数据的 Schema 类型,你就可以在 Sink 申明中为此类型直接指定对应的 Java 原生类。

  1. public class MySink implements Sink<String> {
  2. public void write(Record<String> record) {}
  3. }

如果你要实现一个可以兼容任意 Schema 的 Sink 连接器,你可以使用特殊的 GenericObject 接口。

  1. public class MySink implements Sink<GenericObject> {
  2. public void write(Record<GenericObject> record) {
  3. Schema schema = record.getSchema();
  4. GenericObject genericObject = record.getValue();
  5. if (genericObject != null) {
  6. SchemaType type = genericObject.getSchemaType();
  7. Object nativeObject = genericObject.getNativeObject();
  8. ...
  9. }
  10. ....
  11. }
  12. }

当记录类型为 AVRO、JSON 或 Protobuf 时(schemaType=AVRO,JSON,PROTOBUF_NATIVE),你可以将 genericObject 类型的变量转换成 GenericRecord 类型变量,并使用 getFields()getField() API。 你可以通过 genericObject.getNativeObject() 方法访问原生 AVRO 记录。

当记录类型为 KeyValue 时,你可以通过以下代码访问 Schema 的 Key 和 Value。

  1. public class MySink implements Sink<GenericObject> {
  2. public void write(Record<GenericObject> record) {
  3. Schema schema = record.getSchema();
  4. GenericObject genericObject = record.getValue();
  5. SchemaType type = genericObject.getSchemaType();
  6. Object nativeObject = genericObject.getNativeObject();
  7. if (type == SchemaType.KEY_VALUE) {
  8. KeyValue keyValue = (KeyValue) nativeObject;
  9. Object key = keyValue.getKey();
  10. Object value = keyValue.getValue();
  11. KeyValueSchema keyValueSchema = (KeyValueSchema) schema;
  12. Schema keySchema = keyValueSchema.getKeySchema();
  13. Schema valueSchema = keyValueSchema.getValueSchema();
  14. }
  15. ....
  16. }
  17. }

测试

测试 connector 会有一定的难度,因为 Pulsar IO connector 与两个可能很难模拟的系统交互,即 Pulsar 和与 connector 连接的系统。

建议在模拟外部服务时编写特殊测试来测试 connector 功能,如下所示。

单元测试

可以为 connector 创建单元测试。

集成测试

只要有足够的单元测试,就可以添加单独的集成测试来验证端到端的功能。

Pulsar uses testcontainers for all integration tests.

提示

For more information about how to create integration tests for Pulsar connectors, see IntegrationTests .

在开发并测试 connector 后,需要先将其打包,然后才能提交到 Pulsar Functions 集群。

有两种方法可以使用 Pulsar Functions 的运行时间,即 NARuber JAR

Note

如果打算将 connector 打包并分发给其他人使用,则需要 为自己的代码添加许可,并进行版权保护。 请记得添加许可和版权到 代码用到的所有库和你的发行版中。

如果使用 NAR 方法,则 NAR 插件会在生成的 NAR 包中自动创建一个 DEPENDENCIES 文件,包含 connector 所有库的许可和版权。

NAR

NAR stands for NiFi Archive, which is a custom packaging mechanism used by Apache NiFi, to provide a bit of Java ClassLoader isolation.

提示

For more information about how NAR works, see here.

Pulsar uses the same mechanism for packaging all built-in connectors.

安装 Pulsar connector 最简单的方法是使用 nifi-nar-maven-plugin 创建一个 NAR 包。

Include this nifi-nar-maven-plugin in your maven project for your connector as below.

  1. <plugins>
  2. <plugin>
  3. <groupId>org.apache.nifi</groupId>
  4. <artifactId>nifi-nar-maven-plugin</artifactId>
  5. <version>1.2.0</version>
  6. </plugin>
  7. </plugins>

You must also create a resources/META-INF/services/pulsar-io.yaml file with the following contents:

  1. name: connector name
  2. description: connector description
  3. sourceClass: fully qualified class name (only if source connector)
  4. sinkClass: fully qualified class name (only if sink connector)

If you are using the Gradle NiFi plugin you might need to create a directive to ensure your pulsar-io.yaml is copied into the NAR file correctly.

提示

For more information about an how to use NAR for Pulsar connectors, see TwitterFirehose .

Uber JAR

An alternative approach is to create an uber JAR that contains all of the connector’s JAR files and other resource files. No directory internal structure is necessary.

可以使用 maven-shade-plugin 来创建 uber JAR。如下所示:

  1. <plugin>
  2. <groupId>org.apache.maven.plugins</groupId>
  3. <artifactId>maven-shade-plugin</artifactId>
  4. <version>3.1.1</version>
  5. <executions>
  6. <execution>
  7. <phase>package</phase>
  8. <goals>
  9. <goal>shade</goal>
  10. </goals>
  11. <configuration>
  12. <filters>
  13. <filter>
  14. <artifact>*:*</artifact>
  15. </filter>
  16. </filters>
  17. </configuration>
  18. </execution>
  19. </executions>
  20. </plugin>

Monitor

Pulsar 连接器可以让你轻松地将数据接入和输出 Pulsar。 必须确保运行的连接器在任何时候都是正常运行的。 你可以通过以下方法监控已部署的 Pulsar 连接器:

  • 检查由 Pulsar 提供的 Metrics。

    Pulsar connectors expose the metrics that can be collected and used for monitoring the health of Java connectors. 你可以通过关注 监控 指南来检查这些指标。

  • 设置并检查你的自定义 Metrics。

    In addition to the metrics provided by Pulsar, Pulsar allows you to customize metrics for Java connectors. Function workers 会自动将这些 Metrics 数据收集并发送到 Prometheus,你可以在 Grafana 中查看这些指标。

下面是一个为 Java 连接器编写自定义 Metrics 的示例。

Java

  1. public class TestMetricSink implements Sink<String> { @Override public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception { sinkContext.recordMetric("foo", 1); } @Override public void write(Record<String> record) throws Exception { } @Override public void close() throws Exception { } }