自定义 sink (NebulaSink)

Nebula Flink Connector 支持以 DataStream.addSink 的方式将 Flink 数据流写入 Nebula Graph 数据库。

说明:Nebula Flink Connector 使用 Flink 1.11-SNAPSHOT 开发,这个版本已经不再支持使用 writeUsingOutputFormat 方式定义输出端的接口,源码如下。所以,在使用自定义 NebulaSink 时,请您务必使用 DataStream.addSink 方式。

  1. /** @deprecated */
  2. @Deprecated
  3. @PublicEvolving
  4. public DataStreamSink<T> writeUsingOutputFormat(OutputFormat<T> format) {
  5. return this.addSink(new OutputFormatSinkFunction(format));
  6. }

Nebula Flink Connector 中实现了自定义的 NebulaSinkFunction,开发者通过调用 dataSource.addSink 方法并将 NebulaSinkFunction 对象作为参数传入即可实现将 Flink 数据流写入 Nebula Graph 数据库中。

NebulaSink 的实现类图如下所示。

Nebula Flink Connector 的 sink 实现类图

最重要的两个类是 NebulaSinkFunction NebulaBatchOutputFormat

NebulaSinkFunction

NebulaSinkFunction 继承自 AbstractRichFunction 并实现了以下方法:

  • open:调用 NebulaBatchOutputFormatopen 方法以准备资源。
  • close:调用 NebulaBatchOutputFormatclose 方法以释放资源。
  • invoke:是 NebulaSink 中的核心方法,调用 NebulaBatchOutputFormat 中的 write 方法写入数据。
  • flush:调用 NebulaBatchOutputFormatflush 方法提交数据。

NebulaBatchOutputFormat

NebulaBatchOutputFormat 继承自 AbstractNebulaOutPutFormat,而后者继承自 RichOutputFormat,主要实现了以下方法:

  • open:准备 Nebula Graph 数据库的 Graph 服务的连接,并初始化数据写入执行器 nebulaBatchExecutor
  • close:提交最后批次的数据,等待最后提交的回调结果并关闭服务连接等资源。
  • writeRecord:核心方法,将数据写入 bufferedRow 中,并在达到配置的批量写入上限时提交写入。NebulaSink 的写入操作是异步的,所以需要执行回调来获取执行结果。
  • flush:当 bufferedRow 存在数据时,将数据提交到 Nebula Graph 中。

AbstractNebulaOutputFormat 中调用了 NebulaBatchExecutor,用于数据的批量管理和批量提交,并通过定义回调函数接收批量提交的结果,代码如下:

  1. /**
  2. * write one record to buffer
  3. */
  4. @Override
  5. public final synchronized void writeRecord(T row) throws IOException {
  6. nebulaBatchExecutor.addToBatch(row);
  7. if (numPendingRow.incrementAndGet() >= executionOptions.getBatch()) {
  8. commit();
  9. }
  10. }
  11. /**
  12. * put record into buffer
  13. *
  14. * @param record represent vertex or edge
  15. */
  16. void addToBatch(T record) {
  17. boolean isVertex = executionOptions.getDataType().isVertex();
  18. NebulaOutputFormatConverter converter;
  19. if (isVertex) {
  20. converter = new NebulaRowVertexOutputFormatConverter((VertexExecutionOptions) executionOptions);
  21. } else {
  22. converter = new NebulaRowEdgeOutputFormatConverter((EdgeExecutionOptions) executionOptions);
  23. }
  24. String value = converter.createValue(record, executionOptions.getPolicy());
  25. if (value == null) {
  26. return;
  27. }
  28. nebulaBufferedRow.putRow(value);
  29. }
  30. /**
  31. * commit batch insert statements
  32. */
  33. private synchronized void commit() throws IOException {
  34. graphClient.switchSpace(executionOptions.getGraphSpace());
  35. future = nebulaBatchExecutor.executeBatch(graphClient);
  36. // clear waiting rows
  37. numPendingRow.compareAndSet(executionOptions.getBatch(),0);
  38. }
  39. /**
  40. * execute the insert statement
  41. *
  42. * @param client Asynchronous graph client
  43. */
  44. ListenableFuture executeBatch(AsyncGraphClientImpl client) {
  45. String propNames = String.join(NebulaConstant.COMMA, executionOptions.getFields());
  46. String values = String.join(NebulaConstant.COMMA, nebulaBufferedRow.getRows());
  47. // construct insert statement
  48. String exec = String.format(NebulaConstant.BATCH_INSERT_TEMPLATE, executionOptions.getDataType(), executionOptions.getLabel(), propNames, values);
  49. // execute insert statement
  50. ListenableFuture<Optional<Integer>> execResult = client.execute(exec);
  51. // define callback function
  52. Futures.addCallback(execResult, new FutureCallback<Optional<Integer>>() {
  53. @Override
  54. public void onSuccess(Optional<Integer> integerOptional) {
  55. if (integerOptional.isPresent()) {
  56. if (integerOptional.get() == ErrorCode.SUCCEEDED) {
  57. LOG.info("batch insert Succeed");
  58. } else {
  59. LOG.error(String.format("batch insert Error: %d",
  60. integerOptional.get()));
  61. }
  62. } else {
  63. LOG.error("batch insert Error");
  64. }
  65. }
  66. @Override
  67. public void onFailure(Throwable throwable) {
  68. LOG.error("batch insert Error");
  69. }
  70. });
  71. nebulaBufferedRow.clean();
  72. return execResult;
  73. }

由于 NebulaSink 的写入是批量、异步的,所以在最后业务结束关闭(close)资源之前需要将缓存中的批量数据提交且等待写入操作的完成,以防在写入提交之前提前关闭 Nebula Graph 的客户端,代码如下:

  1. /**
  2. * commit the batch write operator before release connection
  3. */
  4. @Override
  5. public final synchronized void close() throws IOException {
  6. if(numPendingRow.get() > 0){
  7. commit();
  8. }
  9. while(!future.isDone()){
  10. try {
  11. Thread.sleep(100);
  12. } catch (InterruptedException e) {
  13. LOG.error("sleep interrupted, ", e);
  14. }
  15. }
  16. super.close();
  17. }

应用实践

Flink 将处理完成的数据 sink 到 Nebula Graph 数据库时,需要将 Flink 数据流进行 map 转换成 NebulaSink 可接收的数据格式。自定义 NebulaSink 的使用方式是通过 addSink 的形式,

您可以按以下步骤使用 Nebula Flink Connector 的 NebulaSink 向 Nebula Graph 写入数据:

  1. 将 Flink 数据转换成 NebulaSink 可以接受的数据格式。
  2. NebulaSinkFunction 作为参数传给 addSink 方法来实现 Flink 数据流的写入。

在构造的 NebulaSinkFunction 中分别对客户端参数和执行参数作了如下配置:

  • NebulaClientOptions 需要配置:
    • Nebula Graph 图数据库 Graph 服务的 IP 地址及端口号。如果有多个地址,使用英文逗号分隔。
    • Nebula Graph 图数据库的账号及其密码。
  • VertexExecutionOptions 需要配置:
    • 需要写入点数据的 Nebula Graph 图数据库中的图空间名称。
    • 需要写入的标签(点类型)名称。
    • 需要写入的标签属性。
    • 需要写入的点 VID 所在 Flink 数据流 Row 中的索引。
    • 单次写入 Nebula Graph 的数据量限值,默认为 2000。
  • EdgeExecutionOptions 需要配置:
    • 需要写入边数据的 Nebula Graph 图数据库中的图空间名称。
    • 需要写入的边类型。
    • 需要写入的边类型属性。
    • 需要写入的边起点 VID(src_Id)所在 Flink 数据流 Row 中的索引。
    • 需要写入的边终点 VID(dst_Id)所在 Flink 数据流 Row 中的索引。
    • 需要写入的边 rank 所在 Flink 数据流 Row 中的索引。如果不配置,则写入边数据时不带 rank 信息。
    • 单次写入的数据量限值,默认值为 2000。

假设需要写入点数据的 Nebula Graph 图数据库信息如下:

  • Graph 服务为本地单副本部署,使用默认端口
  • 图空间名称:flinkSink
  • 标签:player
  • 标签属性:nameage

以下为自定义 NebulaSink 的代码示例。

  1. // 构造 Nebula Graph 的 Graph 服务客户端连接需要的参数
  2. NebulaClientOptions nebulaClientOptions = new NebulaClientOptions
  3. .NebulaClientOptionsBuilder()
  4. .setAddress("127.0.0.1:3699")
  5. .build();
  6. NebulaConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions);
  7. // 构造 Nebula Graph 写入点数据的操作参数
  8. List<String> cols = Arrays.asList("name", "age")
  9. ExecutionOptions sinkExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
  10. .setGraphSpace("flinkSink")
  11. .setTag(tag)
  12. .setFields(cols)
  13. .setIdIndex(0)
  14. .setBatch(2)
  15. .builder();
  16. // 将点数据写入 Nebula Graph
  17. dataSource.addSink(nebulaSinkFunction);

NebulaSink 示例程序

您可以参考 GitHub 上的示例程序 testSourceSink 编写您自己的 Flink 应用程序。

以 testSourceSink 为例:该程序以 Nebula Graph 的图空间 flinkSource 作为 source,通过 Flink 读取进行 map 类型转换后的数据,再写入 Nebula Graph 另一个图空间 flinkSink,即 Nebula Graph 一个图空间 flinkSource 的数据流入另一个图空间 flinkSink 中。