ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
* Pregel是Google提出的用于大规模分布式图计算框架。可用于: * 图遍历(BFS)。 * 单源最短路径(SSSP)。 * PageRank计算。 * Pregel的计算由一系列迭代组成,称为supersteps。 * Pregel迭代过程。 (1)每个顶点从上一个superstep接收入站消息。 (2)计算顶点新的属性值。 (3)在下一个superstep中向相邻的顶点发送消息。 (4)当没有剩余消息时,迭代结束。 求下图中顶点的最小值。 ![](https://img.kancloud.cn/e7/ac/e7ac0c8448e4d4f91255760e59f47868_890x260.png) Pregel的计算过程如下: ![](https://img.kancloud.cn/0c/16/0c16b3d6b8591b6fed8e85abc13f3f88_1303x1091.png) Graphx提供的Pregel API如下: ```scala class GraphOps[VD, ED](graph : org.apache.spark.graphx.Graph[VD, ED]){ def pregel[A]( initialMsg : A, // 在superstep 0之前发送至顶点的初始消息 maxIterations : Int, // 将要执行的最大迭代次数 activeDirection : EdgeDirection // 发送消息方向(默认是出边方向:EdgeDirection.Out) ) ( vprog : Function3, // 用户定义函数,用于顶点接收消息 sendMsg : Function1, // 用户定义的函数,用于确定下一个迭代发送的消息及发往何处 mergeMsg : Function2[A, A, A] // 用户定义的函数,在vprog前,合并到达顶点的多个消息 ){ } } ``` Pregel求下图的最小值。 ![](https://img.kancloud.cn/e7/ac/e7ac0c8448e4d4f91255760e59f47868_890x260.png) ```scala import org.apache.spark.SparkContext import org.apache.spark.graphx.{Edge, EdgeDirection, EdgeTriplet, Graph, VertexId} import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession object Pregel { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder() .master("local[4]") .appName(this.getClass.getName) .getOrCreate() val sc: SparkContext = spark.sparkContext import spark.implicits._ // 1. 构建顶点的RDD val verts: RDD[(Long, (Int, Int))] = sc.parallelize(Array( (1L, (7, -1)), (2L, (3, -1)), (3L, (2, -1)), (4L, (6, -1)) )) // 2. 构建边的RDD val edges: RDD[Edge[Boolean]] = sc.parallelize(Array( Edge(1L, 2L, true), Edge(1L, 4L, true), Edge(2L, 4L, true), Edge(3L, 1L, true), Edge(3L, 4L, true) )) // 3. 构建图 val graph: Graph[(Int, Int), Boolean] = Graph(verts, edges) graph.triplets.foreach(println) // ((1,(7,-1)),(4,(6,-1)),true) // ((3,(2,-1)),(1,(7,-1)),true) // ((1,(7,-1)),(2,(3,-1)),true) // ((2,(3,-1)),(4,(6,-1)),true) // ((3,(2,-1)),(4,(6,-1)),true) // initialMsg 在superstep 0之前发送至顶点的初始消息 // maxIterations 将要执行的最大迭代次数 // activeDirection 发送消息方向(默认是出边方向:EdgeDirection.Out) // vprog 用户定义函数,用于顶点接收消息 // sendMsg 用户定义的函数,用于确定下一个迭代发送的消息及发往何处 // mergeMsg 用户定义的函数,在vprog前,合并到达顶点的多个消息 val initialMsg: Int = 9999 val maxIterations: Int = Int.MaxValue // vertexId: 当前顶点的Id // value: 当前顶点的attr // message: 当前顶点要接收的消息 def vprog(vertexId: VertexId, value: (Int, Int), message: Int) = { if (message == initialMsg) value else (message min value._1, value._1) } def sendMsg(triple: EdgeTriplet[(Int, Int), Boolean]) = { val sourceVertex: (Int, Int) = triple.srcAttr if (sourceVertex._1 == sourceVertex._2) Iterator.empty else Iterator((triple.dstId, sourceVertex._1)) } def mergeMsg(msg1: Int, msg2: Int): Int = msg1 min msg2 graph.pregel(initialMsg, maxIterations, EdgeDirection.Out)(vprog, sendMsg, mergeMsg) .vertices.foreach(println) // (1,(2,7)) // (2,(2,3)) // (3,(2,-1)) // (4,(2,2)) } } ```