Nebula Spark Connector Reader 应用示例

本文以一个示例说明如何使用 Nebula Spark Connector Reader 读取 Nebula Graph 的点和边数据。

前提条件

使用 Nebula Spark Connector Reader 前,您需要确认以下信息:

  • 您的机器上已经安装了以下软件:

    • Apache Spark™ 2.3.0 及更高版本
    • Scala
    • Java:1.8
  • 已经成功编译 Nebula Spark Connector Reader,并已经将 nebula-spark-1.x.y.jar 复制到本地 Maven 库。详细信息参考 编译 Nebula Spark Connector

  • 已经获取 Nebula Graph 数据库的以下信息:

    • 图空间名称和分区数量(如果创建图空间时未设置分区数量,则默认使用 100)
    • 标签和边类型的名称以及属性
    • Meta 服务所在机器的 IP 地址及端口号

操作步骤

参考以下步骤使用 Nebula Spark Connector Reader:

  1. 在 Maven 项目的 pom.xml 文件中加入 nebula-spark 依赖。

    1. <dependency>
    2. <groupId>com.vesoft</groupId>
    3. <artifactId>nebula-spark</artifactId>
    4. <version>1.x.y</version>
    5. </dependency>

说明<version> 建议配置为最新发布的 Nebula Java Client 版本号。您可以在 nebula-java 仓库的 Releases 页面 查看最新的 v1.x 版本。

  1. 构建 SparkSession 类。这是 Spark SQL 的编码入口。

    1. val sparkConf = new SparkConf
    2. sparkConf
    3. .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    4. .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
    5. val sparkSession = SparkSession
    6. .builder()
    7. .config(sparkConf)
    8. .master("local")
    9. .getOrCreate()

    其中,关于 .master() 的设置,参考 Spark 配置的 Master URLs

  2. 按以下说明修改配置,利用 Spark 读取 Nebula Graph 的点或者边数据,得到 DataFrame。

    1. // 读取 Nebula Graph 的点数据
    2. val vertexDataset: Dataset[Row] =
    3. sparkSession.read
    4. .nebula("127.0.0.1:45500", "spaceName", "100")
    5. .loadVerticesToDF("tag", "*")
    6. vertexDataset.show()
    7. // 读取 Nebula Graph 的边数据
    8. val edgeDataset: Dataset[Row] =
    9. sparkSession.read
    10. .nebula("127.0.0.1:45500", "spaceName", "100")
    11. .loadEdgesToDF("edge", "field1,field2")
    12. edgeDataset.show()

    其中配置说明如下:

    - nebula(<address: String>, <space: String>, <partitionNum: String>),所有参数均为必需参数。

    1. - `<address: String>`:配置为 Nebula Graph 数据库 metad 服务所在的服务器地址及端口,如果有多个 metad 服务复本,则配置为多个地址,以英文逗号分隔,例如 `"ip1:45500,ip2:45500"`。默认端口号为 45500
    2. - `<space: String>`: 配置为 Nebula Graph 的图空间名称。
    3. - `<partitionNum: String>`:设置 Spark 的分区数量。建议设置为 Nebula Graph 中创建图空间时指定的 `partitionNum`,以确保一个 Spark 分区读取 Nebula Graph 图空间中一个分区的数据。如果您在创建 Nebula Graph 图空间时未指定分区数量,则使用默认值 100

    - loadVerticesToDF(<tag: String>, <fields: String>),所有参数均为必需参数。

    1. - `<tag: String>`:配置为指定 Nebula Graph 图空间中某个标签的名称。
    2. - `<fields: String>`:配置为指定标签的属性名称,不允许为空。如果一个标签有多个属性,则以英文逗号分隔。如果指定了属性名称,表示只读取指定的属性。如果配置为 `*`,表示读取指定标签的所有属性。

    - loadEdgesToDF(<edge: String>, <fields: String>),所有参数均为必需参数。

    1. - `<edge: String>`:配置为指定 Nebula Graph 图空间中某个边类型的名称。
    2. - `<fields: String>`:配置为指定边类型的属性名称,不允许为空。如果一个边类型有多个属性,则以英文逗号分隔。如果指定了属性名称,表示只读取指定的属性,如果配置为 `*` 表示读取指定边类型的所有属性。

以下为读取结果示例。

  • 读取点数据

    1. 20/10/27 08:51:04 INFO DAGScheduler: Job 0 finished: show at Main.scala:61, took 1.873141 s
    2. +---------+----------+---+
    3. |_vertexId| name|age|
    4. +---------+----------+---+
    5. | 0| Tom55322| 19|
    6. | 84541440|Tom4152378| 27|
    7. | 67829760| Tom24006| 10|
    8. | 51118080| Tom84165| 62|
    9. | 34406400| Tom17308| 1|
    10. | 17694720| Tom73089| 56|
    11. | 983040| Tom82311| 95|
    12. | 68812800| Tom61046| 93|
    13. | 52101120| Tom52116| 45|
    14. | 18677760| Tom4773| 18|
    15. | 1966080| Tom25979| 20|
    16. | 69795840| Tom92575| 9|
    17. | 53084160| Tom48645| 29|
    18. | 36372480| Tom20594| 86|
    19. | 19660800| Tom27071| 32|
    20. | 2949120| Tom630| 61|
    21. | 70778880| Tom82319| 78|
    22. | 37355520| Tom38207| 31|
    23. | 20643840| Tom56158| 73|
    24. | 3932160| Tom36933| 59|
    25. +---------+----------+---+
    26. only showing top 20 rows
  • 读取边数据

    1. 20/10/27 08:56:57 INFO DAGScheduler: Job 4 finished: show at Main.scala:71, took 0.085975 s
    2. +------+------+----------+--------+
    3. |_srcId|_dstId|start_year|end_year|
    4. +------+------+----------+--------+
    5. | 101| 201| 2002| 2020|
    6. | 102| 201| 2002| 2015|
    7. +------+------+----------+--------+