单源最短路径

  1. import scala.reflect.ClassTag
  2. import org.apache.spark.graphx._
  /**
   * Computes shortest paths to the given set of landmark vertices, returning a graph where each
   * vertex attribute is a map containing the shortest-path distance to each reachable landmark.
   *
   * Distances are hop counts: every traversed edge adds exactly 1, and edge attributes are
   * ignored.
   */
  object ShortestPaths {
    /** Stores a map from the vertex id of a landmark to the distance to that landmark. */
    type SPMap = Map[VertexId, Int]

    /** Builds an [[SPMap]] from the given (landmark id, distance) pairs. */
    private def makeMap(x: (VertexId, Int)*): SPMap = Map(x: _*)

    /** Returns a copy of `spmap` in which every distance is one hop larger. */
    private def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) }

    /**
     * Merges two distance maps, keeping the smaller distance for landmarks present in both.
     *
     * The result contains the union of both key sets. Folding `spmap2` into `spmap1` avoids
     * materialising an intermediate key set and pair set on every call, and returns `spmap1`
     * itself when `spmap2` contributes nothing new.
     */
    private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap =
      spmap2.foldLeft(spmap1) { case (merged, (landmark, dist)) =>
        merged.get(landmark) match {
          // The distance already recorded is at least as good; keep the map as it is.
          case Some(known) if known <= dist => merged
          // Landmark is new, or the incoming distance is strictly shorter.
          case _ => merged.updated(landmark, dist)
        }
      }

    /**
     * Computes shortest paths to the given set of landmark vertices.
     *
     * @tparam ED the edge attribute type (not used in the computation)
     *
     * @param graph the graph for which to compute the shortest paths
     * @param landmarks the list of landmark vertex ids. Shortest paths will be computed to each
     * landmark.
     *
     * @return a graph where each vertex attribute is a map containing the shortest-path distance to
     * each reachable landmark vertex.
     */
    def run[VD, ED: ClassTag](graph: Graph[VD, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED] = {
      // Converted once up front so the per-vertex membership test below is a set lookup instead
      // of a linear scan over the landmark sequence.
      val landmarkSet: Set[VertexId] = landmarks.toSet

      // Each landmark starts out knowing only itself (distance 0); every other vertex starts
      // with an empty map.
      val spGraph = graph.mapVertices { (vid, _) =>
        if (landmarkSet.contains(vid)) makeMap(vid -> 0) else makeMap()
      }

      val initialMessage = makeMap()

      // A vertex's new state is its current map merged with the incoming message.
      def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
        addMaps(attr, msg)
      }

      // Offers the destination's distances, one hop further, to the edge's source. A message is
      // emitted only when merging it would actually change the source's map.
      def sendMessage(edge: EdgeTriplet[SPMap, _]): Iterator[(VertexId, SPMap)] = {
        val newAttr = incrementMap(edge.dstAttr)
        if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
        else Iterator.empty
      }

      Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
    }
  }