自定义 source (NebulaSource)

Nebula Flink Connector 支持以 addSource 或者 createInput 方式将 Nebula Graph 图数据库注册为 Flink 的数据源(source)。其中,通过 addSource 读取 source 数据得到的是 Flink 的 DataStreamSource,表示 DataStream 的起点,而通过 createInput 读取 source 数据得到的是 Flink 的 DataSourceDataSource 会作为进一步转换的数据集。DataSource 可以通过 withParameters 封装配置参数进行其他操作。

NebulaSource 的实现类图如下所示。

Nebula Flink Connector 的 source 实现类图

addSource

addSource 方式通过 NebulaSourceFunction 类实现,该类继承自 RichSourceFunction 并实现了以下方法:

  • open:准备 Nebula Graph 的连接信息,并获取 Nebula Graph 图数据库 Meta 服务和 Storage 服务的连接。
  • close:在数据读取完成后释放资源,并断开与 Nebula Graph 图数据库服务的连接。
  • run:开始读取数据,并将数据填充到 sourceContext
  • cancel:取消 Flink 作业时调用这个方法以关闭资源。

createInput

createInput 方式通过 NebulaInputFormat 类实现,该类继承自 RichInputFormat 并实现了以下方法:

  • openInputFormat:准备 inputFormat 以获取连接。
  • closeInputFormat:数据读取完成后释放资源。断开与 Nebula Graph 图数据库服务的连接。
  • open:开始 inputFormat 的数据读取,将读取的数据转换为 Flink 的数据格式,构造迭代器。
  • close:在数据读取完成后打印读取日志。
  • reachedEnd:判断是否读取完成。
  • nextRecord:通过迭代器获取下一条数据.

应用实践

您可以按以下步骤使用 Nebula Flink Connector 读取 Nebula Graph 的图数据:

  1. 构造 NebulaSourceFunctionNebulaOutputFormat
  2. 通过 Flink 的 addSource 或者 createInput 方式将 Nebula Graph 注册为数据源。

在构造的 NebulaSourceFunctionNebulaOutputFormat 中,对客户端参数和执行参数作如下配置:

  • NebulaClientOptions 需要配置:
    • Nebula Graph 图数据库 Meta 服务的 IP 地址及端口号。如果有多个服务,使用逗号分隔,例如 “ip1:port1,ip2:port2"
    • Nebula Graph 图数据库的账号及其密码。
  • VertexExecutionOptions 需要配置:
    • 需要读取点数据的 Nebula Graph 图数据库中的图空间名称。
    • 需要读取的标签(点类型)名称。一次只能一个标签。
    • 要读取的标签属性。
    • 是否读取指定标签的所有属性,默认为 false。如果配置为 true 则标签属性的配置无效。
    • 单次读取的数据量限值,默认为 2000 个点数据。
  • EdgeExecutionOptions 需要配置:
    • 需要读取边数据的 Nebula Graph 图数据库中的图空间名称。
    • 需要读取的边类型。一次只能一个边类型。
    • 需要读取的边类型属性。
    • 是否读取指定边类型的所有属性,默认为 false。如果配置为 true 则边类型属性的配置无效。
    • 单次读取的数据量限值,默认值为 2000 个边数据。

假设需要读取点数据的 Nebula Graph 图数据库信息如下:

  • Meta 服务为本地单副本部署,使用默认端口
  • 图空间名称:flinkSource
  • 标签:player
  • 标签属性:nameage
  • 单次最多读取 100 个点数据

以下为自定义 NebulaSource 的代码示例。

  1. // 构造 Nebula Graph 客户端连接需要的参数
  2. NebulaClientOptions nebulaClientOptions = new NebulaClientOptions
  3. .NebulaClientOptionsBuilder()
  4. .setAddress("127.0.0.1:45500")
  5. .build();
  6. // 创建 connectionProvider
  7. NebulaConnectionProvider metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions);
  8. // 构造读取 Nebula Graph 数据需要的参数
  9. List<String> cols = Arrays.asList("name", "age");
  10. VertexExecutionOptions sourceExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
  11. .setGraphSpace("flinkSource")
  12. .setTag(tag)
  13. .setFields(cols)
  14. .setLimit(100)
  15. .builder();
  16. // 构造 NebulaInputFormat
  17. NebulaInputFormat inputFormat = new NebulaInputFormat(metaConnectionProvider)
  18. .setExecutionOptions(sourceExecutionOptions);
  19. // 方式 1 使用 createInput 方式将 Nebula Graph 注册为数据源
  20. DataSource<Row> dataSource1 = ExecutionEnvironment.getExecutionEnvironment()
  21. .createInput(inputFormat);
  22. // 方式 2 使用 addSource 方式将 Nebula Graph 注册为数据源
  23. NebulaSourceFunction sourceFunction = new NebulaSourceFunction(metaConnectionProvider)
  24. .setExecutionOptions(sourceExecutionOptions);
  25. DataStreamSource<Row> dataSource2 = StreamExecutionEnvironment.getExecutionEnvironment()
  26. .addSource(sourceFunction);

示例程序

您可以参考 GitHub 上的示例程序 testNebulaSource 编写您自己的 Flink 应用程序。

以 testNebulaSource 为例:该程序以 Nebula Graph 图数据库为 source,以 Print 为 sink,从 Nebula Graph 图数据库中读取 59,671,064 条点数据后再打印。将该程序打包提交到 Flink 集群执行,结果如下图所示。

Flink Dashboard 上显示的 testNebulaSource 执行结果

由上图可知,source 发送数据 59,671,064 条,sink 接收数据 59,671,064 条。