广度优先遍历

  // Breadth-first traversal from a single root vertex using the GraphX Pregel API.
  // After the run, each vertex attribute holds its hop distance from `root`, or
  // Double.PositiveInfinity if the vertex was never reached.
  // Assumes a SparkContext named `sc` is already in scope (e.g. spark-shell) and
  // that the GraphX types (GraphLoader, VertexId, EdgeTriplet) are imported.
  val graph = GraphLoader.edgeListFile(sc, "graphx/data/test_graph.txt")
  val root: VertexId = 1

  // The root starts at distance 0; every other vertex starts unmarked (infinity).
  val initialGraph = graph.mapVertices { (id, _) =>
    if (id == root) 0.0 else Double.PositiveInfinity
  }

  // Vertex program: keep the smaller of the current distance and the incoming one.
  val vprog = { (id: VertexId, attr: Double, msg: Double) => math.min(attr, msg) }

  // Message generation: an edge is traversed in either direction, so it forwards
  // "distance + 1" from its marked endpoint to its unmarked endpoint.
  // Nothing is sent when both endpoints are marked, or when neither is.
  // The original sent an Infinity-valued message in the "neither" case, which
  // could never lower a distance but kept the exchange going on unreached edges.
  val sendMessage = { (triplet: EdgeTriplet[Double, Int]) =>
    val isSrcMarked = triplet.srcAttr != Double.PositiveInfinity
    val isDstMarked = triplet.dstAttr != Double.PositiveInfinity
    if (isSrcMarked && !isDstMarked) {
      Iterator((triplet.dstId, triplet.srcAttr + 1))
    } else if (isDstMarked && !isSrcMarked) {
      Iterator((triplet.srcId, triplet.dstAttr + 1))
    } else {
      Iterator.empty
    }
  }

  // Merge messages addressed to the same vertex: the shortest distance wins.
  val reduceMessage = { (a: Double, b: Double) => math.min(a, b) }

  // The initial message is infinity so that vprog leaves the seeded distances intact.
  // NOTE(review): the run is capped at 20 supersteps, so vertices further from
  // the root than the cap allows presumably stay at infinity — confirm the cap
  // suits the input graph.
  val bfs = initialGraph.pregel(Double.PositiveInfinity, 20)(vprog, sendMessage, reduceMessage)

  // Print one "(vertexId,distance)" pair per line.
  println(bfs.vertices.collect.mkString("\n"))