Sort 插件

总览

InLong Sort 是一个基于 Apache Flink SQL 的 ETL 服务。Flink SQL 强大的表达能力带来的高可扩展性、灵活性,基本上 Flink SQL 支持的语意,InLong Sort 都支持。 当 Flink SQL 内置的函数不满足需求时,还可通过 UDF 来扩展。这对于曾经使用过 SQL 尤其是 Flink SQL 的开发者非常友好。

本文介绍如何在 InLong Sort 中扩展一个新的 source(在 InLong 中抽象为 Extract Node)或一个新的 sink(在InLong中抽象为 Load Node )。 InLong Sort 架构的 UML 对象关系图如下:

sort_uml

其中各个组件的概念为:

名称描述
Group数据流组,包含多个数据流,一个 Group 代表一个数据接入
Stream数据流,一个数据流有具体的流向
GroupInfoSort 中对数据流向的封装,一个 GroupInfo 可包含多个 DataFlowInfo
StreamInfoSort 中数据流向的抽象,包含该数据流的各种来源、转换、去向等
Node数据同步中数据源、数据转换、数据去向的抽象
ExtractNode数据同步的来源端抽象
TransformNode数据同步的转换过程抽象
LoadNode数据同步的去向端抽象
NodeRelationShip数据同步中各个节点关系抽象
FieldRelationShip数据同步中上下游节点字段间关系的抽象
FieldInfo节点字段
MetaFieldInfo节点 Meta 字段
Function转换函数的抽象
FunctionParam函数的入参抽象
ConstantParam常量参数

扩展 Extract Node 或 Load Node 需要做的工作是:

  • 继承 Node 类(例如 MyExtractNode),构建具体的 extract 或 load 使用逻辑;
  • 在具体的 Node 类(例如 MyExtractNode)中,指定对应 Flink connector;
  • 在具体的 ETL 实现逻辑中使用具体的 Node 类(例如 MyExtractNode)。

其中第二步中可以使用已有的 Flink Connector,或者用户自己扩展,如何扩展 Flink Connector 请参考 Flink 官方文档DataStream Connectors.

扩展 Extract Node

扩展一个 ExtractNode 分为三个步骤:

第一步:继承 ExtractNode 类,类的位置在:

  1. inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java

在实现的 ExtractNode 中指定 connector;

  1. // 继承 ExtractNode 类,实现具体的类,例如 MongoExtractNode
  2. @EqualsAndHashCode(callSuper = true)
  3. @JsonTypeName("MongoExtract")
  4. @Data
  5. public class MongoExtractNode extends ExtractNode implements Serializable {
  6. @JsonInclude(Include.NON_NULL)
  7. @JsonProperty("primaryKey")
  8. private String primaryKey;
  9. ...
  10. @JsonCreator
  11. public MongoExtractNode(@JsonProperty("id") String id, ...) { ... }
  12. @Override
  13. public Map<String String> tableOptions() {
  14. Map<String String> options = super.tableOptions();
  15. // 配置指定的 connector,这里指定的是 mongodb-cdc
  16. options.put("connector", "mongodb-cdc");
  17. ...
  18. return options;
  19. }
  20. }

第二步:在 ExtractNode 和 Node 中的 JsonSubTypes 添加该 Extract

  1. // 在 ExtractNode 和 Node 的 JsonSubTypes 中添加字段
  2. ...
  3. @JsonSubTypes({
  4. @JsonSubTypes.Type(value = MongoExtractNode.class name = "mongoExtract")
  5. })
  6. ...
  7. public abstract class ExtractNode implements Node{...}
  8. ...
  9. @JsonSubTypes({
  10. @JsonSubTypes.Type(value = MongoExtractNode.class name = "mongoExtract")
  11. })
  12. public interface Node {...}

第三步:扩展 Sort Connector,查看此(inlong-sort/sort-connectors/mongodb-cdc)目录下是否已经存在对应的 connector。如果没有,则需要参考 Flink 官方文档 DataStream Connectors 来扩展, 调用已有的 Flink-connector(例如inlong-sort/sort-connectors/mongodb-cdc)或自行实现相关的 connector 均可。

扩展 Load Node

扩展一个 LoadNode 分为三个步骤:

第一步:继承 LoadNode 类,类的位置在:

  1. inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java

在实现的LoadNode 中指定 connector;

  1. // 继承 LoadNode 类,实现具体的类,例如 KafkaLoadNode
  2. @EqualsAndHashCode(callSuper = true)
  3. @JsonTypeName("kafkaLoad")
  4. @Data
  5. @NoArgsConstructor
  6. public class KafkaLoadNode extends LoadNode implements Serializable {
  7. @Nonnull
  8. @JsonProperty("topic")
  9. private String topic;
  10. ...
  11. @JsonCreator
  12. public KafkaLoadNode(@Nonnull @JsonProperty("topic") String topic, ...) {...}
  13. // 根据不同的条件配置使用不同的 connector
  14. @Override
  15. public Map<String String> tableOptions() {
  16. ...
  17. if (format instanceof JsonFormat || format instanceof AvroFormat || format instanceof CsvFormat) {
  18. if (StringUtils.isEmpty(this.primaryKey)) {
  19. // kafka connector
  20. options.put("connector", "kafka");
  21. options.putAll(format.generateOptions(false));
  22. } else {
  23. // upsert-kafka connector
  24. options.put("connector", "upsert-kafka");
  25. options.putAll(format.generateOptions(true));
  26. }
  27. } else if (format instanceof CanalJsonFormat || format instanceof DebeziumJsonFormat) {
  28. // kafka-inlong connector
  29. options.put("connector", "kafka-inlong");
  30. options.putAll(format.generateOptions(false));
  31. } else {
  32. throw new IllegalArgumentException("kafka load Node format is IllegalArgument");
  33. }
  34. return options;
  35. }
  36. }

第二步:在 LoadNode 和 Node 中的 JsonSubTypes 添加该 Load

  1. // 在 LoadNode 和 Node 的 JsonSubTypes 中添加字段
  2. ...
  3. @JsonSubTypes({
  4. @JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad")
  5. })
  6. ...
  7. public abstract class LoadNode implements Node{...}
  8. ...
  9. @JsonSubTypes({
  10. @JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad")
  11. })
  12. public interface Node {...}

第三步:扩展 Sort Connector,Kafka 的 sort connector 在 inlong-sort/sort-connectors/kafka 目录下。

集成到 Entrance

将 Extract 和 Load 集成到 InLong Sort 主流程中,需要构建总览小节中提到的语意:Group、Stream、Node 等。 InLong Sort 的入口类在:

  1. inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java

Extract 和 Load 如何集成至 InLong Sort,可参考下面的 UT,首先构建对应的 ExtractNode、LoadNode,再构建 NodeRelation、StreamInfo、GroupInfo,最后通过 FlinkSqlParser 执行。

  1. public class MongoExtractToKafkaLoad extends AbstractTestBase {
  2. // 构建 MongoExtractNode
  3. private MongoExtractNode buildMongoNode() {
  4. List<FieldInfo> fields = Arrays.asList(new FieldInfo("name", new StringFormatInfo()), ...);
  5. return new MongoExtractNode(..., fields, ...);
  6. }
  7. // 构建 KafkaLoadNode
  8. private KafkaLoadNode buildAllMigrateKafkaNode() {
  9. List<FieldInfo> fields = Arrays.asList(new FieldInfo("name", new StringFormatInfo()), ...);
  10. List<FieldRelation> relations = Arrays.asList(new FieldRelation(new FieldInfo("name", new StringFormatInfo()), ...), ...);
  11. CsvFormat csvFormat = new CsvFormat();
  12. return new KafkaLoadNode(..., fields, relations, csvFormat ...);
  13. }
  14. // 构建 NodeRelation
  15. private NodeRelation buildNodeRelation(List<Node> inputs List<Node> outputs) {
  16. List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
  17. List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
  18. return new NodeRelation(inputIds, outputIds);
  19. }
  20. // 测试主流程 MongoDB to Kafka
  21. @Test
  22. public void testMongoDbToKafka() throws Exception {
  23. EnvironmentSettings settings = EnvironmentSettings. ... .build();
  24. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  25. ...
  26. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
  27. Node inputNode = buildMongoNode();
  28. Node outputNode = buildAllMigrateKafkaNode();
  29. StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode), ...);
  30. GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
  31. FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
  32. ParseResult result = parser.parse();
  33. Assert.assertTrue(result.tryExecute());
  34. }
  35. }