How to develop Pulsar connectors

本教程解释了如何开发 Pulsar connector,以在 Pulsar 和其他系统之间移动数据。

Pulsar connector 是特殊的 Pulsar Functions,因此创建 Pulsar connector 类似于创建 Pulsar function。

Pulsar connector 可分为两类:

类型Description示例

{@inject: github:

Source:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java}|Import data from another system to Pulsar.|RabbitMQ source connector imports the messages of a RabbitMQ queue to a Pulsar topic. Sink | 从 Pulsar 导出数据到另一个系统。|Kinesis sink connector 从 Pulsar topic 导出消息到 Kinesis 流。

Develop

你可以开发 Pulsar source connector 和 sink connector。

Source

Developing a source connector is to implement the Source interface, which means you need to implement the open method and the read method.

  1. Implement the open method.

    1. /**
    2. * Open connector with configuration
    3. *
    4. * @param config initialization config
    5. * @param sourceContext
    6. * @throws Exception IO type exceptions when opening a connector
    7. */
    8. void open(final Map<String, Object> config, SourceContext sourceContext) throws Exception;

    Source connector 初始化会调用此方法。

    在此方法中,可通过传入的 config 参数检索所有 connector 的特定设置,并初始化所有必须的资源。

    例如,Kafka connector 可以在 open 方法中创建 Kafka 客户端。

    Besides, Pulsar runtime also provides a SourceContext for the connector to access runtime resources for tasks like collecting metrics. 执行此方法可以保存 SourceContext 供后续使用。

  2. Implement the read method.

    1. /**
    2. * Reads the next message from source.
    3. * If source does not have any new messages, this call should block.
    4. * @return next message from source. The return result should never be null
    5. * @throws Exception
    6. */
    7. Record<T> read() throws Exception;

    如果没有要返回的内容,则应停止该方法的执行而不是返回 null

    The returned Record should encapsulate the following information, which is needed by Pulsar IO runtime.

    • Record should provide the following variables:

      变量RequiredDescription
  1. `TopicName`|No|Pulsar topic name from which the record is originated from. `Key`|No| 随机用密钥标记消息。<br/><br/> 更多详细信息,参阅 [Routing modes](/docs/zh-CN/2.6.3/concepts-messaging#routing-modes).| `Value`|Yes| 实际记录数据。 `EventTime`|No| Source 中记录的事件时间。 `PartitionId`|No| 如果记录来自已分区的 source,则返回 `PartitionId`。 <br/><br/>`PartitionId` 被 Pulsar IO 运行时间用作唯一标识符的一部分,删除重复的消息并实现 exactly-once 处理保证。 `RecordSequence`|No| 如果记录来自于序列 source,则返回 `RecordSequence`。<br/><br/>`RecordSequence` 被 Pulsar IO 运行时间用作唯一标识符的一部分,删除重复的消息并实现 exactly-once 处理保证。 `Properties` |No| 如果记录带有用户自定义的属性,则返回相关属性。 `DestinationTopic`|No| 应该写入消息的 topic。 `Message`|No| 包含用户发送数据的类。<br/><br/> 更多详细信息,参阅 [Message.java](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java)。|
  2. * {@inject: github:`Record`:/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java} should provide the following methods:
  3. Method|Description |---|--- `ack` |Acknowledge that the record is fully processed. `fail`|Indicate that the record fails to be processed.

Tip

For more information about how to create a source connector, see KafkaSource .

Sink

Developing a sink connector is similar to developing a source connector, that is, you need to implement the Sink interface, which means implementing the open method and the write method.

  1. Implement the open method.

    1. /**
    2. * Open connector with configuration
    3. *
    4. * @param config initialization config
    5. * @param sinkContext
    6. * @throws Exception IO type exceptions when opening a connector
    7. */
    8. void open(final Map<String, Object> config, SinkContext sinkContext) throws Exception;
  2. Implement the write method.

    1. /**
    2. * Write a message to Sink
    3. * @param record record to write to sink
    4. * @throws Exception
    5. */
    6. void write(Record<T> record) throws Exception;

    During the implementation, you can decide how to write the Value and the Key to the actual source, and leverage all the provided information such as PartitionId and RecordSequence to achieve different processing guarantees.

    还需要 ack 记录(消息发送成功)或发送失败记录(消息发送失败)。

测试

测试 connector 会有一定的难度,因为 Pulsar IO connector 与两个可能很难模拟的系统交互,即 Pulsar 和与 connector 连接的系统。

建议在模拟外部服务时编写特殊测试来测试 connector 功能,如下所示。

单元测试

可以为 connector 创建单元测试。

集成测试

只要有足够的单元测试,就可以添加单独的集成测试来验证端到端的功能。

Pulsar uses testcontainers for all integration tests.

Tip

For more information about how to create integration tests for Pulsar connectors, see IntegrationTests .

在开发并测试 connector 后,需要先将其打包,然后才能提交到 Pulsar Functions 集群。

有两种方法可以使用 Pulsar Functions 的运行时间,即 NARuber JAR

Note

如果打算将 connector 打包并分发给其他人使用,则需要 为自己的代码添加许可,并进行版权保护。 请记得添加许可和版权到 代码用到的所有库和你的发行版中。

如果使用 NAR 方法,则 NAR 插件会在生成的 NAR 包中自动创建一个 DEPENDENCIES 文件,包含 connector 所有库的许可和版权。

NAR

NAR stands for NiFi Archive, which is a custom packaging mechanism used by Apache NiFi, to provide a bit of Java ClassLoader isolation.

Tip

For more information about how NAR works, see here.

Pulsar uses the same mechanism for packaging all built-in connectors.

安装 Pulsar connector 最简单的方法是使用 nifi-nar-maven-plugin 创建一个 NAR 包。

Include this nifi-nar-maven-plugin in your maven project for your connector as below.

  1. <plugins>
  2. <plugin>
  3. <groupId>org.apache.nifi</groupId>
  4. <artifactId>nifi-nar-maven-plugin</artifactId>
  5. <version>1.2.0</version>
  6. </plugin>
  7. </plugins>

You must also create a resources/META-INF/services/pulsar-io.yaml file with the following contents:

  1. name: connector name
  2. description: connector description
  3. sourceClass: fully qualified class name (only if source connector)
  4. sinkClass: fully qualified class name (only if sink connector)

If you are using the Gradle NiFi plugin you might need to create a directive to ensure your pulsar-io.yaml is copied into the NAR file correctly.

Tip

For more information about an how to use NAR for Pulsar connectors, see TwitterFirehose .

Uber JAR

An alternative approach is to create an uber JAR that contains all of the connector’s JAR files and other resource files. No directory internal structure is necessary.

可以使用 maven-shade-plugin 来创建 uber JAR。如下所示:

  1. <plugin>
  2. <groupId>org.apache.maven.plugins</groupId>
  3. <artifactId>maven-shade-plugin</artifactId>
  4. <version>3.1.1</version>
  5. <executions>
  6. <execution>
  7. <phase>package</phase>
  8. <goals>
  9. <goal>shade</goal>
  10. </goals>
  11. <configuration>
  12. <filters>
  13. <filter>
  14. <artifact>*:*</artifact>
  15. </filter>
  16. </filters>
  17. </configuration>
  18. </execution>
  19. </executions>
  20. </plugin>