Nebula Spark Connector Writer 应用示例

本文以一个示例说明如何使用 Nebula Spark Connector Writer 向 Nebula Graph 写入点和边数据。

前提条件

使用 Nebula Spark Connector Writer 前,您需要确认以下信息:

  • 您的机器上已经安装了以下软件:

    • Apache Spark™ 2.3.0 及更高版本
    • Scala
    • Java:1.8
  • 已经成功编译 Nebula Spark Connector,并已经将 nebula-spark-1.x.y.jar 复制到本地 Maven 库。详细信息参考 编译 Nebula Spark Connector

  • 待写入的点和边数据源。在本示例中所用的数据源为 JSON 文件,您可以从 nebula-java 库 中下载。

  • Nebula Graph 的 Graph 服务 IP 地址及端口号。在本示例中,对应的信息为 127.0.0.1:3699

  • 在 Nebula Graph 中创建 Schema,并获取以下信息:

    • 图空间名称和分区数量。在本示例中,对应的信息为 nb100
    • 点的信息,包括标签和 VID 映射策略(hash)。
    • 边的信息,包括起点和终点对应的标签,以及 VID 映射策略(hash)。
  • (可选)如果是批量写入,需要确认单次写入的最大数据量,默认为 2000。详见本文 batchInsert 配置项说明。

操作步骤

参考以下步骤使用 Nebula Spark Connector Writer 向 Nebula Graph 写入数据。

第 1 步

在 Maven 项目的 POM 文件中加入 nebula-spark 依赖。

  1. <dependency>
  2. <groupId>com.vesoft</groupId>
  3. <artifactId>nebula-spark</artifactId>
  4. <version>1.x.y</version>
  5. </dependency>

说明<version> 建议配置为最新发布的 Nebula Java Client 版本号。您可以在 nebula-java 仓库的 Releases 页面 查看最新的 v1.x 版本。

第 2 步

根据逐条或批量写入需求,参考以下示例在 Spark 应用程序中完成配置。

逐条写入数据

示例代码如下:

  1. // 构造点和边数据的 DataFrame,
  2. // 这里使用 nebula-java 库 v1.0 分支里 nebula-java/examples/src/main/resources 目录下的示例数据,
  3. // 示例数据在本地的存储路径为 examples/src/main/resources
  4. val vertexDF = spark.read.json("examples/src/main/resources/vertex")
  5. vertexDF.show()
  6. val edgeDF = spark.read.json("examples/src/main/resources/edge")
  7. edgeDF.show()
  8. // 写入点
  9. vertexDF.write
  10. .nebula("127.0.0.1:3699", "nb", "100")
  11. .writeVertices("player", "vertexId", "hash")
  12. // 写入边
  13. edgeDF.write
  14. .nebula("127.0.0.1:3699", "nb", "100")
  15. .wirteEdges("follow", "src_id", "dst_id")

示例代码中的配置说明如下:

  • nebula(address: String, space: String, partitionNum: String)

    • address:Nebula Graph 的 Graph 服务地址及端口,可以配置多个地址,以英文逗号分隔,如 “ip1:port,ip2:port”,端口默认为 3699
    • space:Nebula Graph 中即将写入数据的图空间名称。
    • partitionNum:在 Nebula Graph 中创建图空间时指定的 partitionNum 的值。如果未指定,这里填写 100
  • writeVertices(tag: String, vertexField: String, policy: String = "")

    • tag:点对应的 Nebula Graph 图空间中的标签名称。
    • vertexField:DataFrame 中可作为 Nebula Graph 点 VID 的列。例如,如果 DataFrame 有三列,分别为 a、b、c,其中 a 列作为点 VID 列,则该参数设置为 "a"
    • policy:如果 DataFrame 中 vertexField 列的数据类型非数值型,则需要配置 Nebula Graph 中 VID 的映射策略,即该参数设置为 "hash"。如果 vertexField 列的数据类型为整数型,则不需要配置。
  • writeEdges(edge: String, srcVertexField: String, dstVertexField: String, policy: String = "")

    • edge:边对应的 Nebula Graph 图空间中的边类型名称。
    • srcVertexFielddstVertexField:DataFrame 中可作为边起点和边终点的列。列值必须同为整数型或同为非数值型。
    • policy:如果 DataFrame 中 srcVertexField 列和 dstVertexField 列的数据类型非数值型,则需要配置 Nebula Graph 中 VID 的映射策略,即该参数设置为 "hash"。如果 srcVertexField 列和 dstVertexField 列的数据类型为整数型,则不需要配置。

批量写入数据

示例代码如下:

  1. // 构造点和边数据的 DataFrame,
  2. // 这里使用 nebula-java 库 v1.0 分支里 nebula-java/examples/src/main/resources 目录下的示例数据,
  3. // 示例数据在本地的存储路径为 examples/src/main/resources
  4. val vertexDF = spark.read.json("examples/src/main/resources/vertex")
  5. vertexDF.show()
  6. val edgeDF = spark.read.json("examples/src/main/resources/edge")
  7. edgeDF.show()
  8. // 批量写入点
  9. new NebulaBatchWriterUtils()
  10. .batchInsert("127.0.0.1:3699", "nb", 2000)
  11. .batchToNebulaVertex(vertexDF, "player", "vertexId")
  12. // 批量写入边
  13. new NebulaBatchWriterUtils()
  14. .batchInsert("127.0.0.1:3699", "nb", 2000)
  15. .batchToNebulaEdge(edgeDF, "follow", "source", "target")

示例代码中的配置说明如下:

  • batchInsert(address: String, space: String, batch: Int = 2000)

    • address:Nebula Graph 的 Graph 服务地址及端口,可以配置多个地址,以英文逗号分隔,如 “ip1:port,ip2:port”,端口默认为 3699
    • space:Nebula Graph 中即将写入数据的图空间名称。
    • batch:批量写入时一批次的最大数据量,可不配置,默认为 2000。
  • batchToNebulaVertex(data: DataFrame, tag: String, vertexField: String, policy: String = "")

    • data:待写入 Nebula Graph 的 DataFrame 数据。
    • tag:Nebula Graph 图空间中对应的标签名称。
    • vertexField:DataFrame 中可作为 Nebula Graph 点 VID 的列。例如,如果 DataFrame 有三列,分别为 a、b、c,其中a 列作为点 VID 列,则该参数设置为 "a"
    • policy:如果 DataFrame 中 vertexField 列的数据类型非数值型,则需要配置 Nebula Graph 中 VID 的映射策略,即该参数设置为 "hash"。如果 vertexField 列的数据类型为整数型,则不需要配置。
  • batchToNebulaEdge(data: DataFrame, edge: String, srcVertexField: String, dstVertexField: String, rankField: String = "", policy: String = "")

    • data:待写入 Nebula Graph 的 DataFrame 数据。
    • edge:Nebula Graph 中对应的边类型。
    • srcVertexFielddstVertexField:DataFrame 中可作为边起点和边终点的列。列值必须同为整数型或同为非数值型。
    • rankField:DataFrame 中可作为边 rank 值的列,可选配。
    • policy:可选。如果 DataFrame 中 srcVertexField 列和 dstVertexField 列的数据类型非数值型,则需要配置 Nebula Graph 中 VID 的映射策略,即该参数设置为 "hash"。如果 srcVertexField 列和 dstVertexField 列的数据类型为整数型,则不需要配置。