Pregel API

  图本身是递归数据结构,顶点的属性依赖于它们邻居的属性,这些邻居的属性又依赖于自己邻居的属性。所以许多重要的图算法都是迭代的重新计算每个顶点的属性,直到满足某个确定的条件。
一系列的图并发(graph-parallel)抽象已经被提出来用来表达这些迭代算法。GraphX公开了一个类似Pregel的操作,它是广泛使用的PregelGraphLab抽象的一个融合。

  GraphX中实现的这个更高级的Pregel操作是一个约束到图拓扑的批量同步(bulk-synchronous)并行消息抽象。Pregel操作者执行一系列的超步(super steps),在这些步骤中,顶点从
之前的超步中接收进入(inbound)消息的总和,为顶点属性计算一个新的值,然后在以后的超步中发送消息到邻居顶点。不像Pregel而更像GraphLab,消息通过边triplet的一个函数被并行计算,
消息的计算既会访问源顶点特征也会访问目的顶点特征。在超步中,没有收到消息的顶点会被跳过。当没有消息遗留时,Pregel操作停止迭代并返回最终的图。

  注意,与标准的Pregel实现不同的是,GraphX中的顶点仅仅能发送信息给邻居顶点,并且可以利用用户自定义的消息函数并行地构造消息。这些限制允许对GraphX进行额外的优化。

  下面的代码是pregel的具体实现。

  1. def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
  2. (graph: Graph[VD, ED],
  3. initialMsg: A,
  4. maxIterations: Int = Int.MaxValue,
  5. activeDirection: EdgeDirection = EdgeDirection.Either)
  6. (vprog: (VertexId, VD, A) => VD,
  7. sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
  8. mergeMsg: (A, A) => A)
  9. : Graph[VD, ED] =
  10. {
  11. var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
  12. // 计算消息
  13. var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
  14. var activeMessages = messages.count()
  15. // 迭代
  16. var prevG: Graph[VD, ED] = null
  17. var i = 0
  18. while (activeMessages > 0 && i < maxIterations) {
  19. // 接收消息并更新顶点
  20. prevG = g
  21. g = g.joinVertices(messages)(vprog).cache()
  22. val oldMessages = messages
  23. // 发送新消息
  24. messages = g.mapReduceTriplets(
  25. sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
  26. activeMessages = messages.count()
  27. i += 1
  28. }
  29. g
  30. }

1 pregel计算模型

  Pregel计算模型中有三个重要的函数,分别是vertexProgramsendMessagemessageCombiner

  • vertexProgram:用户定义的顶点运行程序。它作用于每一个顶点,负责接收进来的信息,并计算新的顶点值。

  • sendMsg:发送消息

  • mergeMsg:合并消息

  我们具体分析它的实现。根据代码可以知道,这个实现是一个迭代的过程。在开始迭代之前,先完成一些初始化操作:

  1. var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
  2. // 计算消息
  3. var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
  4. var activeMessages = messages.count()

  程序首先用vprog函数处理图中所有的顶点,生成新的图。然后用生成的图调用聚合操作(mapReduceTriplets,实际的实现是我们前面章节讲到的aggregateMessagesWithActiveSet函数)获取聚合后的消息。
activeMessagesmessages这个VertexRDD中的顶点数。

  下面就开始迭代操作了。在迭代内部,分为二步。

  • 1 接收消息,并更新顶点
  1. g = g.joinVertices(messages)(vprog).cache()
  2. //joinVertices的定义
  3. def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)
  4. : Graph[VD, ED] = {
  5. val uf = (id: VertexId, data: VD, o: Option[U]) => {
  6. o match {
  7. case Some(u) => mapFunc(id, data, u)
  8. case None => data
  9. }
  10. }
  11. graph.outerJoinVertices(table)(uf)
  12. }

  这一步实际上是使用outerJoinVertices来更新顶点属性。outerJoinVertices关联操作中有详细介绍。

  • 2 发送新消息
  1. messages = g.mapReduceTriplets(
  2. sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()

  注意,在上面的代码中,mapReduceTriplets多了一个参数Some((oldMessages, activeDirection))。这个参数的作用是:它使我们在发送新的消息时,会忽略掉那些两端都没有接收到消息的边,减少计算量。

2 pregel实现最短路径

  1. import org.apache.spark.graphx._
  2. import org.apache.spark.graphx.util.GraphGenerators
  3. val graph: Graph[Long, Double] =
  4. GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
  5. val sourceId: VertexId = 42 // The ultimate source
  6. // 初始化图
  7. val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
  8. val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  9. (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
  10. triplet => { // Send Message
  11. if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
  12. Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
  13. } else {
  14. Iterator.empty
  15. }
  16. },
  17. (a,b) => math.min(a,b) // Merge Message
  18. )
  19. println(sssp.vertices.collect.mkString("\n"))

  上面的例子中,Vertex Program函数定义如下:

  1. (id, dist, newDist) => math.min(dist, newDist)

  这个函数的定义显而易见,当两个消息来的时候,取它们当中路径的最小值。同理Merge Message函数也是同样的含义。

  Send Message函数中,会首先比较triplet.srcAttr + triplet.attrtriplet.dstAttr,即比较加上边的属性后,这个值是否小于目的节点的属性,如果小于,则发送消息到目的顶点。

3 参考文献

【1】spark源码