结构操作

  当前的GraphX仅仅支持一组简单的常用结构性操作。下面是基本的结构性操作列表。

  1. class Graph[VD, ED] {
  2. def reverse: Graph[VD, ED]
  3. def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
  4. vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  5. def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  6. def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
  7. }

  下面分别介绍这四种函数的原理。

1 reverse

  reverse操作返回一个新的图,这个图的边的方向都是反转的。例如,这个操作可以用来计算反转的PageRank。因为反转操作没有修改顶点或者边的属性或者改变边的数量,所以我们可以
在不移动或者复制数据的情况下有效地实现它。

  1. override def reverse: Graph[VD, ED] = {
  2. new GraphImpl(vertices.reverseRoutingTables(), replicatedVertexView.reverse())
  3. }
  4. def reverse(): ReplicatedVertexView[VD, ED] = {
  5. val newEdges = edges.mapEdgePartitions((pid, part) => part.reverse)
  6. new ReplicatedVertexView(newEdges, hasDstId, hasSrcId)
  7. }
  8. //EdgePartition中的reverse
  9. def reverse: EdgePartition[ED, VD] = {
  10. val builder = new ExistingEdgePartitionBuilder[ED, VD](
  11. global2local, local2global, vertexAttrs, activeSet, size)
  12. var i = 0
  13. while (i < size) {
  14. val localSrcId = localSrcIds(i)
  15. val localDstId = localDstIds(i)
  16. val srcId = local2global(localSrcId)
  17. val dstId = local2global(localDstId)
  18. val attr = data(i)
  19. //将源顶点和目标顶点换位置
  20. builder.add(dstId, srcId, localDstId, localSrcId, attr)
  21. i += 1
  22. }
  23. builder.toEdgePartition
  24. }

2 subgraph

  subgraph操作利用顶点和边的判断式(predicates),返回的图仅仅包含满足顶点判断式的顶点、满足边判断式的边以及满足顶点判断式的triplesubgraph操作可以用于很多场景,如获取
感兴趣的顶点和边组成的图或者获取清除断开连接后的图。

  1. override def subgraph(
  2. epred: EdgeTriplet[VD, ED] => Boolean = x => true,
  3. vpred: (VertexId, VD) => Boolean = (a, b) => true): Graph[VD, ED] = {
  4. vertices.cache()
  5. // 过滤vertices, 重用partitioner和索引
  6. val newVerts = vertices.mapVertexPartitions(_.filter(vpred))
  7. // 过滤 triplets
  8. replicatedVertexView.upgrade(vertices, true, true)
  9. val newEdges = replicatedVertexView.edges.filter(epred, vpred)
  10. new GraphImpl(newVerts, replicatedVertexView.withEdges(newEdges))
  11. }

  该代码显示,subgraph方法的实现分两步:先过滤VertexRDD,然后再过滤EdgeRDD。如上,过滤VertexRDD比较简单,我们重点看过滤EdgeRDD的过程。

  1. def filter(
  2. epred: EdgeTriplet[VD, ED] => Boolean,
  3. vpred: (VertexId, VD) => Boolean): EdgeRDDImpl[ED, VD] = {
  4. mapEdgePartitions((pid, part) => part.filter(epred, vpred))
  5. }
  6. //EdgePartition中的filter方法
  7. def filter(
  8. epred: EdgeTriplet[VD, ED] => Boolean,
  9. vpred: (VertexId, VD) => Boolean): EdgePartition[ED, VD] = {
  10. val builder = new ExistingEdgePartitionBuilder[ED, VD](
  11. global2local, local2global, vertexAttrs, activeSet)
  12. var i = 0
  13. while (i < size) {
  14. // The user sees the EdgeTriplet, so we can't reuse it and must create one per edge.
  15. val localSrcId = localSrcIds(i)
  16. val localDstId = localDstIds(i)
  17. val et = new EdgeTriplet[VD, ED]
  18. et.srcId = local2global(localSrcId)
  19. et.dstId = local2global(localDstId)
  20. et.srcAttr = vertexAttrs(localSrcId)
  21. et.dstAttr = vertexAttrs(localDstId)
  22. et.attr = data(i)
  23. if (vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et)) {
  24. builder.add(et.srcId, et.dstId, localSrcId, localDstId, et.attr)
  25. }
  26. i += 1
  27. }
  28. builder.toEdgePartition
  29. }

  因为用户可以看到EdgeTriplet的信息,所以我们不能重用EdgeTriplet,需要重新创建一个,然后在用epred函数处理。这里localSrcIds,localDstIds,local2global等前文均有介绍,在此不再赘述。

3 mask

  mask操作构造一个子图,这个子图包含输入图中包含的顶点和边。它的实现很简单,顶点和边均做inner join操作即可。这个操作可以和subgraph操作相结合,基于另外一个相关图的特征去约束一个图。

  1. override def mask[VD2: ClassTag, ED2: ClassTag] (
  2. other: Graph[VD2, ED2]): Graph[VD, ED] = {
  3. val newVerts = vertices.innerJoin(other.vertices) { (vid, v, w) => v }
  4. val newEdges = replicatedVertexView.edges.innerJoin(other.edges) { (src, dst, v, w) => v }
  5. new GraphImpl(newVerts, replicatedVertexView.withEdges(newEdges))
  6. }

4 groupEdges

  groupEdges操作合并多重图中的并行边(如顶点对之间重复的边)。在大量的应用程序中,并行的边可以合并(它们的权重合并)为一条边从而降低图的大小。

  1. override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = {
  2. val newEdges = replicatedVertexView.edges.mapEdgePartitions(
  3. (pid, part) => part.groupEdges(merge))
  4. new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
  5. }
  6. def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED, VD] = {
  7. val builder = new ExistingEdgePartitionBuilder[ED, VD](
  8. global2local, local2global, vertexAttrs, activeSet)
  9. var currSrcId: VertexId = null.asInstanceOf[VertexId]
  10. var currDstId: VertexId = null.asInstanceOf[VertexId]
  11. var currLocalSrcId = -1
  12. var currLocalDstId = -1
  13. var currAttr: ED = null.asInstanceOf[ED]
  14. // 迭代处理所有的边
  15. var i = 0
  16. while (i < size) {
  17. //如果源顶点和目的顶点都相同
  18. if (i > 0 && currSrcId == srcIds(i) && currDstId == dstIds(i)) {
  19. // 合并属性
  20. currAttr = merge(currAttr, data(i))
  21. } else {
  22. // This edge starts a new run of edges
  23. if (i > 0) {
  24. // 添加到builder中
  25. builder.add(currSrcId, currDstId, currLocalSrcId, currLocalDstId, currAttr)
  26. }
  27. // Then start accumulating for a new run
  28. currSrcId = srcIds(i)
  29. currDstId = dstIds(i)
  30. currLocalSrcId = localSrcIds(i)
  31. currLocalDstId = localDstIds(i)
  32. currAttr = data(i)
  33. }
  34. i += 1
  35. }
  36. if (size > 0) {
  37. builder.add(currSrcId, currDstId, currLocalSrcId, currLocalDstId, currAttr)
  38. }
  39. builder.toEdgePartition
  40. }

  在图构建那章我们说明过,存储的边按照源顶点id排过序,所以上面的代码可以通过一次迭代完成对所有相同边的处理。

5 应用举例

  1. // Create an RDD for the vertices
  2. val users: RDD[(VertexId, (String, String))] =
  3. sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
  4. (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
  5. (4L, ("peter", "student"))))
  6. // Create an RDD for edges
  7. val relationships: RDD[Edge[String]] =
  8. sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
  9. Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
  10. Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
  11. // Define a default user in case there are relationship with missing user
  12. val defaultUser = ("John Doe", "Missing")
  13. // Build the initial Graph
  14. val graph = Graph(users, relationships, defaultUser)
  15. // Notice that there is a user 0 (for which we have no information) connected to users
  16. // 4 (peter) and 5 (franklin).
  17. graph.triplets.map(
  18. triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
  19. ).collect.foreach(println(_))
  20. // Remove missing vertices as well as the edges to connected to them
  21. val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
  22. // The valid subgraph will disconnect users 4 and 5 by removing user 0
  23. validGraph.vertices.collect.foreach(println(_))
  24. validGraph.triplets.map(
  25. triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
  26. ).collect.foreach(println(_))
  27. / Run Connected Components
  28. val ccGraph = graph.connectedComponents() // No longer contains missing field
  29. // Remove missing vertices as well as the edges to connected to them
  30. val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
  31. // Restrict the answer to the valid subgraph
  32. val validCCGraph = ccGraph.mask(validGraph)

6 参考文献

【1】spark源码