Nebula Spark Connector Reader 应用示例
本文以一个示例说明如何使用 Nebula Spark Connector Reader 读取 Nebula Graph 的点和边数据。
前提条件
使用 Nebula Spark Connector Reader 前,您需要确认以下信息:
您的机器上已经安装了以下软件:
- Apache Spark™ 2.3.0 及更高版本
- Scala
- Java:1.8
已经成功编译 Nebula Spark Connector Reader,并已经将
nebula-spark-1.x.y.jar
复制到本地 Maven 库。详细信息参考 编译 Nebula Spark Connector已经获取 Nebula Graph 数据库的以下信息:
- 图空间名称和分区数量(如果创建图空间时未设置分区数量,则默认使用 100)
- 标签和边类型的名称以及属性
- Meta 服务所在机器的 IP 地址及端口号
操作步骤
参考以下步骤使用 Nebula Spark Connector Reader:
在 Maven 项目的
pom.xml
文件中加入nebula-spark
依赖。<dependency>
<groupId>com.vesoft</groupId>
<artifactId>nebula-spark</artifactId>
<version>1.x.y</version>
</dependency>
说明:
<version>
建议配置为最新发布的 Nebula Java Client 版本号。您可以在 nebula-java 仓库的 Releases 页面 查看最新的 v1.x 版本。
构建
SparkSession
类。这是 Spark SQL 的编码入口。val sparkConf = new SparkConf
sparkConf
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
val sparkSession = SparkSession
.builder()
.config(sparkConf)
.master("local")
.getOrCreate()
其中,关于
.master()
的设置,参考 Spark 配置的 Master URLs。按以下说明修改配置,利用 Spark 读取 Nebula Graph 的点或者边数据,得到 DataFrame。
// 读取 Nebula Graph 的点数据
val vertexDataset: Dataset[Row] =
sparkSession.read
.nebula("127.0.0.1:45500", "spaceName", "100")
.loadVerticesToDF("tag", "*")
vertexDataset.show()
// 读取 Nebula Graph 的边数据
val edgeDataset: Dataset[Row] =
sparkSession.read
.nebula("127.0.0.1:45500", "spaceName", "100")
.loadEdgesToDF("edge", "field1,field2")
edgeDataset.show()
其中配置说明如下:
-
nebula(<address: String>, <space: String>, <partitionNum: String>)
,所有参数均为必需参数。- `<address: String>`:配置为 Nebula Graph 数据库 metad 服务所在的服务器地址及端口,如果有多个 metad 服务复本,则配置为多个地址,以英文逗号分隔,例如 `"ip1:45500,ip2:45500"`。默认端口号为 45500。
- `<space: String>`: 配置为 Nebula Graph 的图空间名称。
- `<partitionNum: String>`:设置 Spark 的分区数量。建议设置为 Nebula Graph 中创建图空间时指定的 `partitionNum`,以确保一个 Spark 分区读取 Nebula Graph 图空间中一个分区的数据。如果您在创建 Nebula Graph 图空间时未指定分区数量,则使用默认值 100。
-
loadVerticesToDF(<tag: String>, <fields: String>)
,所有参数均为必需参数。- `<tag: String>`:配置为指定 Nebula Graph 图空间中某个标签的名称。
- `<fields: String>`:配置为指定标签的属性名称,不允许为空。如果一个标签有多个属性,则以英文逗号分隔。如果指定了属性名称,表示只读取指定的属性。如果配置为 `*`,表示读取指定标签的所有属性。
-
loadEdgesToDF(<edge: String>, <fields: String>)
,所有参数均为必需参数。- `<edge: String>`:配置为指定 Nebula Graph 图空间中某个边类型的名称。
- `<fields: String>`:配置为指定边类型的属性名称,不允许为空。如果一个边类型有多个属性,则以英文逗号分隔。如果指定了属性名称,表示只读取指定的属性,如果配置为 `*` 表示读取指定边类型的所有属性。
以下为读取结果示例。
读取点数据
20/10/27 08:51:04 INFO DAGScheduler: Job 0 finished: show at Main.scala:61, took 1.873141 s
+---------+----------+---+
|_vertexId| name|age|
+---------+----------+---+
| 0| Tom55322| 19|
| 84541440|Tom4152378| 27|
| 67829760| Tom24006| 10|
| 51118080| Tom84165| 62|
| 34406400| Tom17308| 1|
| 17694720| Tom73089| 56|
| 983040| Tom82311| 95|
| 68812800| Tom61046| 93|
| 52101120| Tom52116| 45|
| 18677760| Tom4773| 18|
| 1966080| Tom25979| 20|
| 69795840| Tom92575| 9|
| 53084160| Tom48645| 29|
| 36372480| Tom20594| 86|
| 19660800| Tom27071| 32|
| 2949120| Tom630| 61|
| 70778880| Tom82319| 78|
| 37355520| Tom38207| 31|
| 20643840| Tom56158| 73|
| 3932160| Tom36933| 59|
+---------+----------+---+
only showing top 20 rows
读取边数据
20/10/27 08:56:57 INFO DAGScheduler: Job 4 finished: show at Main.scala:71, took 0.085975 s
+------+------+----------+--------+
|_srcId|_dstId|start_year|end_year|
+------+------+----------+--------+
| 101| 201| 2002| 2020|
| 102| 201| 2002| 2015|
+------+------+----------+--------+