Nebula Spark Connector

Nebula Spark Connector是一个Spark连接器,提供通过Spark标准形式读写Nebula Graph数据的能力。Nebula Spark Connector由Reader和Writer两部分组成。

  • Reader

    提供一个Spark SQL接口,用户可以使用该接口编程读取Nebula Graph图数据,单次读取一个点或Edge type的数据,并将读取的结果组装成Spark的DataFrame。

  • Writer

    提供一个Spark SQL接口,用户可以使用该接口编程将DataFrame格式的数据逐条或批量写入Nebula Graph。

更多使用说明请参见Nebula Spark Connector

适用场景

Nebula Spark Connector适用于以下场景:

  • 在不同的Nebula Graph集群之间迁移数据。

  • 在同一个Nebula Graph集群内不同图空间之间迁移数据。

  • Nebula Graph与其他数据源之间迁移数据。

  • 结合Nebula Algorithm进行图计算。

特性

Nebula Spark Connector 2.5.0版本特性如下:

  • 提供多种连接配置项,如超时时间、连接重试次数、执行重试次数等。

  • 提供多种数据配置项,如写入数据时设置对应列为点ID、起始点ID、目的点ID或属性。

  • Reader支持无属性读取和全属性读取。

  • Reader支持将Nebula Graph数据读取成Graphx的VertexRDD和EdgeRDD,支持非Long型点ID。

  • 统一了SparkSQL的扩展数据源,统一采用DataSourceV2进行Nebula Graph数据扩展。

  • 支持insertupdate两种写入模式。insert模式会插入(覆盖)数据,update模式仅会更新已存在的数据。

获取Nebula Spark Connector

编译打包

  1. 克隆仓库nebula-spark-utils

    1. $ git clone -b v2.5 https://github.com/vesoft-inc/nebula-spark-utils.git
  2. 进入目录nebula-spark-connector

    1. $ cd nebula-spark-utils/nebula-spark-connector
  3. 编译打包。

    1. $ mvn clean package -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true

编译完成后,在目录nebula-spark-connector/target下生成类似文件nebula-spark-connector-2.5.0-SHANPSHOT.jar

Maven远程仓库下载

下载地址

使用方法

使用Nebula Spark Connector读写Nebula Graph数据库时,只需要编写以下代码即可实现。

  1. # 从Nebula Graph读取点边数据。
  2. spark.read.nebula().loadVerticesToDF()
  3. spark.read.nebula().loadEdgesToDF()
  4. # 将dataframe数据作为点和边写入Nebula Graph中。
  5. dataframe.write.nebula().writeVertices()
  6. dataframe.write.nebula().writeEdges()

nebula()接收两个配置参数,包括连接配置和读写配置。

从Nebula Graph读取数据

  1. val config = NebulaConnectionConfig
  2. .builder()
  3. .withMetaAddress("127.0.0.1:9559")
  4. .withConenctionRetry(2)
  5. .withExecuteRetry(2)
  6. .withTimeout(6000)
  7. .build()
  8. val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
  9. .builder()
  10. .withSpace("test")
  11. .withLabel("person")
  12. .withNoColumn(false)
  13. .withReturnCols(List("birthday"))
  14. .withLimit(10)
  15. .withPartitionNum(10)
  16. .build()
  17. val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
  18. val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
  19. .builder()
  20. .withSpace("test")
  21. .withLabel("knows")
  22. .withNoColumn(false)
  23. .withReturnCols(List("degree"))
  24. .withLimit(10)
  25. .withPartitionNum(10)
  26. .build()
  27. val edge = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
  • NebulaConnectionConfig是连接Nebula Graph的配置,说明如下。

    参数是否必须说明
    withMetaAddress所有Meta服务的地址,多个地址用英文逗号(,)隔开,格式为ip1:port1,ip2:port2,…。读取数据不需要配置withGraphAddress
    withConnectionRetryNebula Java Client连接Nebula Graph的重试次数。默认值为1
    withExecuteRetryNebula Java Client执行查询语句的重试次数。默认值为1
    withTimeoutNebula Java Client请求响应的超时时间。默认值为6000,单位:毫秒(ms)。
  • ReadNebulaConfig是读取Nebula Graph数据的配置,说明如下。

    参数是否必须说明
    withSpaceNebula Graph图空间名称。
    withLabelNebula Graph图空间内的Tag或Edge type名称。
    withNoColumn是否不读取属性。默认值为false,表示读取属性。取值为true时,表示不读取属性,此时withReturnCols配置无效。
    withReturnCols配置要读取的点或边的属性集。格式为List(property1,property2,…),默认值为List(),表示读取全部属性。
    withLimit配置Nebula Java Storage Client一次从服务端读取的数据行数。默认值为1000。
    withPartitionNum配置读取Nebula Graph数据时Spark的分区数。默认值为100。该值的配置最好不超过图空间的的分片数量(partition_num)。

向Nebula Graph写入数据

  1. val config = NebulaConnectionConfig
  2. .builder()
  3. .withMetaAddress("127.0.0.1:9559")
  4. .withGraphAddress("127.0.0.1:9669")
  5. .withConenctionRetry(2)
  6. .build()
  7. val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
  8. .builder()
  9. .withSpace("test")
  10. .withTag("person")
  11. .withVidField("id")
  12. .withVidPolicy("hash")
  13. .withVidAsProp(true)
  14. .withUser("root")
  15. .withPasswd("nebula")
  16. .withBatch(1000)
  17. .build()
  18. df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
  19. val nebulaWriteEdgeConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig
  20. .builder()
  21. .withSpace("test")
  22. .withEdge("friend")
  23. .withSrcIdField("src")
  24. .withSrcPolicy(null)
  25. .withDstIdField("dst")
  26. .withDstPolicy(null)
  27. .withRankField("degree")
  28. .withSrcAsProperty(true)
  29. .withDstAsProperty(true)
  30. .withRankAsProperty(true)
  31. .withUser("root")
  32. .withPasswd("nebula")
  33. .withBatch(1000)
  34. .build()
  35. df.write.nebula(config, nebulaWriteEdgeConfig).writeEdges()

默认写入模式为insert,可以通过withWriteMode配置修改为update

  1. val config = NebulaConnectionConfig
  2. .builder()
  3. .withMetaAddress("127.0.0.1:9559")
  4. .withGraphAddress("127.0.0.1:9669")
  5. .build()
  6. val nebulaWriteVertexConfig = WriteNebulaVertexConfig
  7. .builder()
  8. .withSpace("test")
  9. .withTag("person")
  10. .withVidField("id")
  11. .withVidAsProp(true)
  12. .withBatch(1000)
  13. .withWriteMode(WriteMode.UPDATE)
  14. .build()
  15. df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
  • NebulaConnectionConfig是连接Nebula Graph的配置,说明如下。

    参数是否必须说明
    withMetaAddress所有Meta服务的地址,多个地址用英文逗号(,)隔开,格式为ip1:port1,ip2:port2,…
    withGraphAddressGraph服务的地址,多个地址用英文逗号(,)隔开,格式为ip1:port1,ip2:port2,…
    withConnectionRetryNebula Java Client连接Nebula Graph的重试次数。默认值为1
  • WriteNebulaVertexConfig是写入点的配置,说明如下。

    参数是否必须说明
    withSpaceNebula Graph图空间名称。
    withTag写入点时需要关联的Tag名称。
    withVidFieldDataFrame中作为点ID的列。
    withVidPolicy写入点ID时,采用的映射函数,Nebula Graph 2.x仅支持HASH。默认不做映射。
    withVidAsPropDataFrame中作为点ID的列是否也作为属性写入。默认值为false。如果配置为true,请确保Tag中有和VidField相同的属性名。
    withUserNebula Graph用户名。若未开启身份验证,无需配置用户名和密码。
    withPasswdNebula Graph用户名对应的密码。
    withBatch一次写入的数据行数。默认值为1000.
    withWriteMode写入模式。可选值为insertupdate。默认为insert
  • WriteNebulaEdgeConfig是写入边的配置,说明如下。

    参数是否必须说明
    withSpaceNebula Graph图空间名称。
    withEdge写入边时需要关联的Edge type名称。
    withSrcIdFieldDataFrame中作为起始点的列。
    withSrcPolicy写入起始点时,采用的映射函数,Nebula Graph 2.x仅支持HASH。默认不做映射。
    withDstIdFieldDataFrame中作为目的点的列。
    withDstPolicy写入目的点时,采用的映射函数,Nebula Graph 2.x仅支持HASH。默认不做映射。
    withRankFieldDataFrame中作为rank的列。默认不写入rank。
    withSrcAsPropertyDataFrame中作为起始点的列是否也作为属性写入。默认值为false。如果配置为true,请确保Edge type中有和SrcIdField相同的属性名。
    withDstAsPropertyDataFrame中作为目的点的列是否也作为属性写入。默认值为false。如果配置为true,请确保Edge type中有和DstIdField相同的属性名。
    withRankAsPropertyDataFrame中作为rank的列是否也作为属性写入。默认值为false。如果配置为true,请确保Edge type中有和RankField相同的属性名。
    withUserNebula Graph用户名。若未开启身份验证,无需配置用户名和密码。
    withPasswdNebula Graph用户名对应的密码。
    withBatch一次写入的数据行数。默认值为1000.
    withWriteMode写入模式。可选值为insertupdate。默认为insert