乐趣区

关于spark:SparkGraphX编程指南

Spark 系列面试题

  • Spark 面试题(一)
  • Spark 面试题(二)
  • Spark 面试题(三)
  • Spark 面试题(四)
  • Spark 面试题(五)——数据歪斜调优
  • Spark 面试题(六)——Spark 资源调优
  • Spark 面试题(七)——Spark 程序开发调优
  • Spark 面试题(八)——Spark 的 Shuffle 配置调优

GraphX 是新的图形和图像并行计算的 Spark API。从整顿上看,GraphX 通过引入 弹性分布式属性图 (Resilient Distributed Property Graph) 继承了 Spark RDD:一个将无效信息放在顶点和边的有向多重图。为了反对图形计算,GraphX 公开了一组根本的运算(例如,subgraph,joinVertices 和 mapReduceTriplets),以及在一个优化后的 PregelAPI 的变形。此外,GraphX 包含越来越多的图算法和 builder 结构器,以简化图形剖析工作。

图并行计算的背景

从社交网络到语言建模,日益扩充的规模和图形数据的重要性已带动许多新的图像并行零碎(例如,Giraph 和 GraphLab)。通过限度可示意计算的类型以及引入新的技术来划分和分布图,这些零碎比个别的数据并行零碎在执行简单图形算法方面有大幅度地进步。

然而,这些限度在取得重大性能晋升的同时,也使其难以表白一个典型的图表剖析流程中的许多重要阶段:结构图,批改它的构造或表白计算逾越多重图的计算。此外,如何对待数据取决于咱们的指标,雷同的原始数据,可能有许多不同的表 (table) 和图表视图(graph views)。

因而,可能在同一组物理数据的表和图表视图之间切换是很有必要的,并利用各视图的属性,以不便地和无效地表白计算。然而,现有的图形剖析管道必须由图并行和数据并行零碎组成,从而导致大量的数据挪动和反复以及简单的编程模型。

该 GraphX 我的项目的指标是建设一个零碎,建设一个对立的图和数据并行计算的 API。该 GraphX API 使用户可能将数据既能够当作一个图,也能够当作汇合(即 RDDS)而不必进行数据挪动或数据复制。通过引入在图并行零碎中的最新进展,GraphX 可能优化图形操作的执行。

GraphX 替换 Spark Bagel 的 API

在 GraphX 的公布之前,Spark 的图计算是通过 Bagel 实现的,后者是 Pregel 的一个具体实现。GraphX 提供了更丰盛的图属性 API,从而加强了 Bagel。从而达到一个更加精简的 Pregel 形象,系统优化,性能晋升以及缩小内存开销。尽管咱们打算最终弃用 Bagel,咱们将持续反对 Bagel 的 API 和 Bagel 编程指南。不过,咱们激励 Bagel 用户,摸索新的 GraphXAPI,并就从 Bagel 降级中遇到的阻碍反馈给咱们。

入门

首先,你要导入 Spark 和 GraphX 到你的我的项目,如下所示:

import org.apache.spark._
import  org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import  org.apache.spark.rdd.RDD

如果你不应用 Spark shell,你还须要一个 SparkContext。要理解更多无关如何开始应用 Spark 参考 Spark 疾速入门指南。

属性图

该 属性图是一个用户定义的顶点和边的有向多重图。有向多重图是一个有向图,它可能有多个平行边共享雷同的源和目标顶点。多重图反对并行边的能力简化了有多重关系(例如,共事和敌人)的建模场景。每个顶点是 惟一 的 64 位长的标识符(VertexID)作为主键。GraphX 并没有对顶点增加任何程序的束缚。同样,每条边具备相应的源和目标顶点的标识符。

该属性表的参数由顶点(VD)和边缘(ED)的类型来决定。这些是别离与每个顶点和边相关联的对象的类型。

GraphX 优化顶点和边的类型的示意办法,当他们是一般的旧的数据类型(例如,整数,双精度等)通过将它们存储在专门的阵列减小了在内存占用量。

在某些状况下,可能心愿顶点在同一个图中有不同的属性类型。这能够通过继承来实现。例如,以用户和产品型号为二分图咱们能够做到以下几点:

class  VertexProperty()
case  class  UserProperty(val name: String)  extends  VertexProperty
case  class  ProductProperty(val name: String,  val price: Double)  extends
VertexProperty
// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null

和 RDDS 一样,属性图是不可变的,分布式的和容错的。对图中的值或构造的扭转是通过生成具备所需更改的新图来实现的。留神原始图的该次要局部(即不受影响的构造,属性和索引)被重用,从而缩小这个数据结构的老本。该图是通过启发式执行顶点分区,在不同的执行器 (executor) 中进行顶点的划分。与 RDDS 一样,在产生故障的状况下,图中的每个分区都能够重建。

逻辑上讲,属性图对应于一对类型汇合(RDDS),这个组合记录顶点和边的属性。因而,该图表类蕴含成员拜访该图的顶点和边:

class  VertexProperty()
case  class  UserProperty(val name: String)  extends  VertexProperty
case  class  ProductProperty(val name: String,  val price: Double)  extends
VertexProperty
// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null

类 VertexRDD [VD]和 EdgeRDD[ED,VD]继承和并且别离是一个优化的版本的 RDD[(VertexID,VD)]和 RDD[Edge[ED]]。这两个 VertexRDD[VD]和 EdgeRDD[ED,VD]提供各地图的计算内置附加性能,并充分利用外部优化。咱们在上一节顶点和边 RDDS 中具体探讨了 VertexRDD 和 EdgeRDD 的 API,但当初,他们能够简略地看成是 RDDS 模式的:RDD[(VertexID,VD)]和 RDD [EDGE[ED]]。

属性图的例子

假如咱们要建设一个 GraphX 我的项目各合作者的属性图。顶点属性可能会蕴含用户名和职业。咱们能够应用一组字符正文来形容代表合作者关系的边:

由此产生的图形将有类型签名:

val userGraph: Graph[(String, String), String]

有许多办法能够从原始数据文件,RDDS,甚至合成生成器来生成图,咱们会在 graph builders 更具体的探讨。可能是最通用的办法是应用 Graph ojbect。例如,上面的代码从一系列的 RDDS 的汇合中构建图:

// Assume the SparkContext has already been constructed
val sc: SparkContext
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
    sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal","postdoc")),(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
    sc.parallelize(Array( Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),Edge(2L, 5L, "colleague"),  Edge(5L, 7L, "pi")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph =  Graph(users, relationships, defaultUser)

在下面的例子中,咱们利用了 Edge 的 case 类。Edge 具备 srcId 和 dstId,它们别离对应于源和目的地顶点的标识符。此外,Edge 类具备 attr 属性,并存储的边的个性。

咱们能够通过 graph.vertices 和 graph.edges 属性,失去图到各自的顶点和边的视图。

val graph: Graph[(String, String), String]  // Constructed from above
// Count all users which are postdocs
graph.vertices.filter {case (id, (name, pos))  => pos == "postdoc" }.count
// Count all the edges where src > dst
graph.edges.filter(e  => e.srcId > e.dstId).count

须要留神的是 graph.vertices 返回 VertexRDD[(String,String)]延长 RDD[(VertexID,(String,String))],所以咱们应用 Scala 的 case 表白来解构元组。在另一方面,graph.edges 返回 EdgeRDD 蕴含 Edge[String]对象。咱们能够也应用的如下的类型的结构器:

graph.edges.filter {case  Edge(src, dst, prop)  => src > dst }.count

除了 图的顶点和边的属性,GraphX 也提供了三器重图。三器重图逻辑连接点和边的属性产生的 RDD[EdgeTriplet [VD,ED]蕴含的实例 EdgeTriplet 类。此 连贯 能够示意如下的 SQL 表达式:

SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges  AS e  LEFT  JOIN vertices  AS src, vertices  AS dst
ON e.srcId = src.Id  AND e.dstId = dst.Id

或图形形式:

该 EdgeTriplet 类继承了 Edge 并退出了类属性:srcAttr 和 dstAttr, 用于蕴含了源和指标属性。咱们能够用一个图的三元组视图渲染形容用户之间的关系字符串的汇合。

val graph: Graph[(String, String), String]  // Constructed from above
// Use the triplets view to create an RDD of facts.
val facts: RDD[String] =
    graph.triplets.map(triplet  =>
        triplet.srcAttr._1 + "is the" + triplet.attr + "of" + triplet.dstAttr._1)
facts.collect.foreach(println(_ _))

也能够参照上面的操作获取全副属性

graph.triplets.foreach(t => println(s"triplet:${t.srcId},${t.srcAttr},${t.dstId},${t.dstAttr},${t.attr}"))

Graph 操作

正如 RDDs 有这样根本的操作 map,filter,和 reduceByKey,属性图也有一系列根本的运算, 采纳用户定义的函数,并产生新的图形与变换的性质和构造。定义外围运算已优化的实现形式中定义的 Graph,并且被示意为外围操作的组合定义在 GraphOps。然而,因为 Scala 的 implicits 个性,GraphOps 中的操作会主动作为 Graph 的成员。例如,咱们能够计算各顶点的入度(定义在的 GraphOps):

val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees

将外围图操作和 GraphOps 辨别开来的起因是为了未来可能反对不同的图示意。每个图的示意必须实现外围操作并且复用 GraphOps 中很多有用的操作。

运算列表总结

以下列出了 Graph 图 和 GraphOps 中同时定义的操作. 为了简略起见, 咱们都定义为 Graph 的成员函数。请留神,某些函数签名已被简化(例如,默认参数和类型的限度被删除了), 还有一些更高级的性能已被删除,残缺的列表, 请参考 API 文档。

/** Summary of the functionality in the property graph */
class  Graph[VD, ED] {
    //  Information  about  the  Graph
    val numEdges: Long
    val numVertices: Long
    val inDegrees: VertexRDD[Int]
    val outDegrees: VertexRDD[Int]
    val degrees: VertexRDD[Int]
    //  Views  of  the  graph  as  collections
    val vertices: VertexRDD[VD]
    val edges: EdgeRDD[ED, VD]
    val triplets: RDD[EdgeTriplet[VD, ED]]
    //  Functions  for  caching  graphs
    def persist(newLevel: StorageLevel =  StorageLevel. MEMORY_ONLY): Graph[VD, ED]
    def cache(): Graph[VD, ED]
    def unpersistVertices(blocking: Boolean =  true): Graph[VD, ED]
    //  Change  the  partitioning  heuristic
    def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
    //  Transform  vertex  and  edge  attributes
    def mapVertices[VD2](map: (VertexID, VD)  =>  VD2): Graph[VD2, ED]
    def mapEdges[ED2](map: Edge[ED]  =>  ED2): Graph[VD, ED2]
    def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]])  =>  Iterator[ED2]):Graph[VD, ED2]
    def mapTriplets[ED2](map: EdgeTriplet[VD, ED]  =>  ED2): Graph[VD, ED2]
    def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]])  =>
    Iterator[ED2]): Graph[VD, ED2]
    //  Modify  the  graph  structure
    def reverse: Graph[VD, ED]
    def subgraph(epred: EdgeTriplet[VD,ED]  =>  Boolean = (x  =>  true),
            vpred: (VertexID, VD)  =>  Boolean = ((v, d)  =>  true)
        ): Graph[VD, ED]
    def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
    def groupEdges(merge: (ED, ED)  =>  ED): Graph[VD, ED]
    //  Join  RDDs  with  the  graph
    def joinVertices[U](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U)
    =>  VD): Graph[VD, ED]
    def outerJoinVertices[U, VD2](other: RDD[(VertexID, U)])
    (mapFunc: (VertexID, VD,  Option[U])  =>  VD2)
    : Graph[VD2, ED]
    //  Aggregate  information  about  adjacent  triplets
    def  collectNeighborIds(edgeDirection:  EdgeDirection): VertexRDD[Array[VertexID]]
    def  collectNeighbors(edgeDirection:  EdgeDirection):VertexRDD[Array[(VertexID, VD)]]
    def mapReduceTriplets[A: ClassTag](mapFunc: EdgeTriplet[VD, ED]  =>  Iterator[(VertexID, A)],
            reduceFunc: (A, A)  => A,
            activeSetOpt: Option[(VertexRDD[_ _], EdgeDirection)] =  None
        ): VertexRDD[A]
    //  Iterative  graph-parallel  computation
    def pregel[A](initialMsg: A, maxIterations: Int, activeDirection:  EdgeDirection)(vprog: (VertexID, VD, A)  =>  VD,
            sendMsg: EdgeTriplet[VD, ED]  =>  Iterator[(VertexID,A)],
            mergeMsg: (A, A)  => A
        ): Graph[VD, ED]
    //  Basic  graph  algorithms
    def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
    def connectedComponents(): Graph[VertexID, ED]
    def triangleCount(): Graph[Int, ED]
    def stronglyConnectedComponents(numIter: Int): Graph[VertexID, ED]
}

属性操作

和 RDD 的 map 操作相似,属性图蕴含以下内容:

class  Graph[VD, ED] {def mapVertices[VD2](map: (VertexId, VD)  =>  VD2): Graph[VD2, ED]
    def mapEdges[ED2](map: Edge[ED]  =>  ED2): Graph[VD, ED2]
    def mapTriplets[ED2](map: EdgeTriplet[VD, ED]  =>  ED2): Graph[VD, ED2]
}

每个运算产生一个新的图, 这个图的顶点和边属性通过 map 办法批改。

请留神,在所有状况下的图的机构不受影响。这是这些运算符的关键所在,它容许新失去图能够复用初始图的构造索引。上面的代码段在逻辑上是等效的,但第一个不保留构造索引,所以不会从 GraphX 系统优化中受害:

123 val newVertices = graph.vertices.map {case (id, attr) => (id, mapUdf(id,attr)) }val newGraph = Graph(newVertices, graph.edges)

相同,应用 mapVertices 保留索引:

1 val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))

这些操作常常被用来初始化图的特定计算或者去除不必要的属性。例如,给定一个将出度作为顶点的属性图(咱们之后将介绍如何构建这样的图),咱们初始化它作为 PageRank:

// Given a graph where the vertex property is the out-degree
val inputGraph: Graph[Int, String] =
    graph.outerJoinVertices(graph.outDegrees)((vid,  _ _,  degOpt)  => degOpt.getOrElse(0))
// Construct a graph where each edge contains the weight
// and each vertex is the initial PageRank
val outputGraph: Graph[Double, Double] =
    inputGraph.mapTriplets(triplet  => 1.0 / triplet.srcAttr).mapVertices((id, _ _) => 1.0)

构造操作

以后 GraphX 只反对一组简略的罕用结构化操作,咱们心愿未来减少更多的操作。以下是根本的构造运算符的列表。

class  Graph[VD, ED] {def reverse: Graph[VD, ED]   
  def subgraph(epred: EdgeTriplet[VD,ED]  =>  Boolean, vpred: (VertexId, VD)  =>  Boolean): Graph[VD, ED] 
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]  
  def groupEdges(merge: (ED, ED)  =>  ED): Graph[VD,ED]}

该 reverse 操作符返回一个新图, 新图的边的方向都反转了。这是十分实用的,例如,试图计算逆向 PageRank。因为反向操作不批改顶点或边属性或扭转的边的数目,它的实现不须要数据挪动或复制。

该子图 subgraph 将顶点和边的预测作为参数,并返回一个图,它只蕴含满足了顶点条件的顶点图(值为 true),以及满足边条件 并连贯顶点的边。subgraph 子运算符可利用于很多场景,以限度图表的顶点和边是咱们感兴趣的,或打消断开的链接。例如,在上面的代码中,咱们删除已损坏的链接:

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
        sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal","postdoc")),(5L, ("franklin", "prof")), (2L, ("istoica", "prof")),(4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
        sc.parallelize(Array( Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),Edge(2L, 5L, "colleague"),  Edge(5L, 7L, "pi"),Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph =  Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
graph.triplets.map(triplet  => triplet.srcAttr._1 + "is the" + triplet.attr + "of" + triplet.dstAttr._1).collect.foreach(println(_ _))
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr)  => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_ _))
validGraph.triplets.map(triplet  => triplet.srcAttr._1 + "is the" + triplet.attr + "of" + triplet.dstAttr._1).collect.foreach(println(_ _))

留神,在下面的例子中,仅提供了顶点条件。如果不提供顶点或边的条件,在 subgraph 操作中默认为 真。

mask 操作返回一个蕴含输出图中所有的顶点和边的图。这能够用来和 subgraph 一起应用,以限度基于属性的另一个相干图。例如,咱们用去掉顶点的图来运行联通重量,并且限度输入为非法的子图。

// Run Connected Components
val ccGraph = graph.connectedComponents()  // No longer contains missing field
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr)  => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)

该 groupEdges 操作合并在多重图中的平行边(即反复顶点对之间的边)。在许多数值计算的利用中,平行的边缘能够退出(他们的权重的会被汇总)为单条边从而升高了图形的大小。

Join 操作

在许多状况下,有必要从内部汇合(RDDS)中退出图形数据。例如,咱们可能有额定的用户属性,想要与现有的图形合并,或者咱们可能须要从一个图选取一些顶点属性到另一个图。这些工作都能够应用来 join 经操作实现。上面咱们列出的要害联接运算符:

class  Graph[VD, ED] {def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U)  => VD): Graph[VD, ED]
    def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD,  Option[U])  =>  VD2): Graph[VD2, ED]
}

该 joinVertices 运算符连贯与输出 RDD 的顶点,并返回一个新的图,新图的顶点属性是通过用户自定义的 map 性能作用在被连贯的顶点上。没有匹配的 RDD 保留其原始值。

须要留神的是,如果 RDD 顶点蕴含多于一个的值,其中只有一个将会被应用。因而,倡议在输出的 RDD 在初始为惟一的时候,应用上面的 pre-index 所失去的值以放慢后续 join。

val nonUniqueCosts: RDD[(VertexID, Double)]
val uniqueCosts: VertexRDD[Double] = 
        graph.vertices.aggregateUsingIndex(nonUnique, (a,b) => a + b)
val joinedGraph = graph.joinVertices(uniqueCosts)((id, oldCost, extraCost)  => oldCost + extraCost)

更个别 outerJoinVertices 操作相似于 joinVertices,除了将用户定义的 map 函数利用到所有的顶点,并且能够扭转顶点的属性类型。因为不是所有的顶点可能会在输出匹配值 RDD 的 mpa 函数承受一个 Optin 类型。例如,咱们能够通过

用 outDegree 初始化顶点属性来设置一个图的 PageRank。

val outDegrees: VertexRDD[Int] = graph.outDegrees
val degreeGraph = graph.outerJoinVertices(outDegrees) {(id, oldAttr, outDegOpt)  =>
        outDegOpt  match {case  Some(outDeg)  => outDeg
            case  None  => 0  // No outDegree means zero outDegree
        }
}

您可能曾经留神到,在下面的例子中采纳了多个参数列表的 curried 函数模式(例如,f(a)(b))。尽管咱们能够有同样写 f(a)(b)为 f(a,b),这将意味着该类型推断 b 不依赖于 a。其后果是,用户将须要提供类型标注给用户自定义的函数:

val joinedGraph = graph.joinVertices(uniqueCosts,  (id: VertexID, oldCost: Double, extraCost: Double)  => oldCost + extraCost) 

街坊汇集

图形计算的一个要害局部是汇集每个顶点的邻域信息。例如,咱们可能想要晓得每个用户追随者的数量或每个用户的追随者的平均年龄。许多图迭代算法(如 PageRank,最短门路,连通重量等)重复汇集街坊节点的属性,(例如,以后的 PageRank 值,到源节点的最短门路,最小可达顶点 ID)。

mapReduceTriplets

GraphX 中外围(大量优化)汇集操作是 mapReduceTriplets 操作:

class  Graph[VD, ED] {def reverse: Graph[VD, ED]
    def subgraph(epred: EdgeTriplet[VD,ED]  =>  Boolean, vpred: (VertexId, VD)  =>  Boolean): Graph[VD, ED]
    def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
    def groupEdges(merge: (ED, ED)  =>  ED): Graph[VD,ED]
}

该 mapReduceTriplets 运算符将用户定义的 map 函数作为输出,并且将 map 作用到每个 triplet,并能够失去 triplet 上所有的顶点(或者两个,或者空)的信息。为了便于优化预聚合,咱们目前仅反对发往 triplet 的源或目的地的顶点信息。用户定义的 reduce 性能将合并所有指标顶点雷同的信息。该 mapReduceTriplets 操作返回 VertexRDD [A],蕴含所有以每个顶点作为指标节点汇合音讯(类型 A)。没有收到音讯的顶点不蕴含在返回 VertexRDD。

须要留神的是 mapReduceTriplets 须要一个附加的可选 activeSet(下面没有显示, 请参见 API 文档的详细信息),这限度了 VertexRDD 地图提供的邻接边的 map 阶段:

activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None 

该 EdgeDirection 指定了哪些和顶点相邻的边蕴含在 map 阶段。如果该方向是 in,则用户定义的 mpa 函数 将仅仅作用指标顶点在与沉闷集中。如果方向是 out,则该 map 函数将仅仅作用在那些源顶点在沉闷集中的边。如果方向是 either,则 map 函数将仅在任一顶点在流动集中的边。如果方向是 both,则 map 函数将仅作用在两个顶点都在沉闷集中。沉闷汇合必须来自图的顶点中。限度计算到相邻顶点的一个子集三胞胎是增量迭代计算中十分必要,而且是 GraphX 实现 Pregel 中的要害。

在上面的例子中咱们应用 mapReduceTriplets 算子来计算高级用户追随者的平均年龄。

// Import random graph generation library
import  org.apache.spark.graphx.util.GraphGenerators
// Create a graph with "age" as the vertex property. Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
        GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices((id, _ _) => id.toDouble )
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int,Double)](
    triplet  => {  // Map Function
        if (triplet.srcAttr > triplet.dstAttr) {
            // Send message to destination vertex containing counter and age
            Iterator((triplet.dstId, (1, triplet.srcAttr)))
        }  else {
            // Don't send a message for this triplet
            Iterator.empty
        }
    },
    // Add counter and age
    (a, b)  => (a._1 + b._1, a._2 + b._2)  // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
    olderFollowers.mapValues((id, value)  => value  
        match {case (count, totalAge) => totalAge / count } )
// Display the results
avgAgeOfOlderFollowers.collect.foreach(println(_ _))

留神,当音讯(和音讯的总和)是固定尺寸的时候(例如,浮点运算和加法而不是列表和连贯)时,mapReduceTriplets 操作执行。更准确地说,后果 mapReduceTriplets 最好是每个顶点度的次线性函数。

计算度信息

一个常见的聚合工作是计算每个顶点的度:每个顶点相邻边的数目。在有向图的状况下,往往须要晓得入度,出度,以及总度。该 GraphOps 类蕴含一系列的运算来计算每个顶点的度的汇合。例如,在上面咱们计算最大的入度,出度,总度:

// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {if (a._2 > b._2) a  else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)

收集街坊

在某些状况下可能更容易通过收集相邻顶点和它们的属性来表白在每个顶点示意的计算。这能够通过应用容易地实现 collectNeighborIds 和 collectNeighbors 运算。

class GraphOps[VD, ED] {def  collectNeighborIds(edgeDirection:  EdgeDirection): VertexRDD[Array[VertexId]]
    def  collectNeighbors(edgeDirection:  EdgeDirection): VertexRDD[Array[(VertexId, VD)] ]
}

须要留神的是,这些运算计算代价十分高,因为他们蕴含反复信息,并且须要大量的通信。如果可能的话尽量间接应用 mapReduceTriplets。

缓存和清空缓存

在 Spark 中,RDDS 默认并不保留在内存中。为了防止反复计算,当他们须要屡次应用时,必须明确地应用缓存(见 Spark 编程指南)。在 GraphX 中 Graphs 行为形式雷同。当须要屡次应用图形时,肯定要首先调用 Graph.cache。

在迭代计算,为了最佳性能,也可能须要清空缓存。默认状况下,缓存的 RDDS 和图表将保留在内存中,直到内存压力迫使他们依照 LRU 程序被删除。对于迭代计算,之前的迭代的两头后果将填补缓存。尽管他们最终将被删除,内存中的不必要的数据会使垃圾收集机制变慢。一旦它们不再须要缓存,就立刻清空两头后果的缓存,这将会更加无效。这波及物化(缓存和强制)图形或 RDD 每次迭代,清空所有其余数据集,并且只应用物化数据集在将来的迭代中。然而,因为图形是由多个 RDDS 的组成的,正确地继续化他们将十分艰难。对于迭代计算,咱们举荐应用 Pregel API,它正确地 unpersists 两头后果。

Pregel 的 API

图实质上是递归的数据结构,因为顶点的性质取决于它们的街坊,这反过来又依赖于街坊的属性。其后果是许多重要的图形算法迭代从新计算每个顶点的属性,直到定点条件满足为止。一系列图像并行办法曾经被提出来表白这些迭代算法。GraphX 提供了相似与 Pregel 的操作,这是 Pregel 和 GraphLab 办法的交融。

从总体来看,Graphx 中的 Pregel 是一个批量同步并行消息传递形象 束缚到该图的拓扑构造。Pregel 运算符在一系列超步骤中,其中顶点收到从之前的步骤中流入音讯的总和,计算出顶点属性的新值,而后在下一步中将音讯发送到相邻的顶点。不同于 Pregel,而是更像 GraphLab 音讯被并行计算,并且作为 edge-triplet,该音讯的计算能够拜访的源和目的地的顶点属性。没有收到音讯的顶点在一个超级步跳过。当没有音讯是,Pregel 进行迭代,并返回最终图形。

请留神,不像更规范的 Pregel 的实现,在 GraphX 中顶点只能将音讯发送到邻近的顶点,并且信息构建是通过应用用户定义的音讯函数并行执行。这些限度使得在 GraphX 有额定的优化。

以下是类型签名 Pregel,以及一个 初始的实现(注调用 graph.cache 已被删除)中:

class  GraphOps[VD, ED] {def pregel[A](
    initialMsg: A,
    maxIter: Int =  Int. MaxValue,
    activeDir: EdgeDirection =  EdgeDirection. Out)(vprog: (VertexId, VD, A)  =>  VD,
        sendMsg: EdgeTriplet[VD, ED]  =>  Iterator[(VertexId, A)],
        mergeMsg: (A, A)  => A
    ): Graph[VD, ED] = {
        // Receive the initial message at each vertex
        var  g  =  mapVertices((vid,  vdata)  =>  vprog(vid,  vdata, initialMsg) ).cache()
        // compute the messages
        var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
        var activeMessages = messages.count()
        // Loop until no messages remain or maxIterations is achieved
        var i = 0
        while (activeMessages > 0 && i < maxIterations) {
            //  Receive  the  messages:------
            // Run the vertex program on all vertices that receive messages
            val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
            // Merge the new vertex values back into the graph
            g  =  g.outerJoinVertices(newVerts)  {(vid,  old,  newOpt)  =>  newOpt.getOrElse(old) }.cache()
            //  Send  Messages:-----
            --
            // Vertices that didn't receive a message above don't appear in newVerts and therefore don't
            // get to send messages. More precisely the map phase of mapReduceTriplets is only invoked
            // on edges in the activeDir of vertices in newVerts
            messages = g.mapReduceTriplets(sendMsg, mergeMsg,  Some((newVerts, activeDir))).cache()
            activeMessages = messages.count()
            i += 1
        }
        g
    }
}

请留神,Pregel 须要两个参数列表(即 graph.pregel(list1)(list2))。第一个参数列表中蕴含的配置参数包含初始信息,迭代的最大次数,以及发送音讯(默认出边)的方向。第二个参数列表蕴含用于用户定义的接管音讯(顶点程序 vprog),计算音讯(sendMsg),并联合信息 mergeMsg。

咱们能够应用 Pregel 运算符来表白计算,如在上面的例子中的单源最短门路。

import  org.apache.spark.graphx._
// Import random graph generation library
import  org.apache.spark.graphx.util.GraphGenerators
// A graph with edge attributes containing distances
val graph: Graph[Int, Double] =
        GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42  // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _ _)  =>  if (id == sourceId) 0.0  else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double. PositiveInfinity)((id, dist, newDist)  => math.min(dist, newDist),  // Vertex Program
    triplet  => { // Send Message
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        }  else {Iterator.empty}
    },
    (a,b)  => math.min(a,b)  // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))

Graph Builder

GraphX 提供多种从 RDD 或者硬盘中的节点和边中构建图。默认状况下,没有哪种 Graph Builder 会从新划分图的边; 相同,边会留在它们的默认分区(如原来的 HDFS 块)。Graph.groupEdges 须要的图形进行从新分区,因为它假如雷同的边将被放在同一个分区同一地位,所以你必须在调用 Graph.partitionBy 之前调用 groupEdges。

object  GraphLoader {
    def edgeListFile(
        sc: SparkContext,
        path: String,
        canonicalOrientation: Boolean =  false,
        minEdgePartitions: Int = 1
    ): Graph[Int, Int]
}

GraphLoader.edgeListFile 提供了一种从磁盘上边的列表载入图的形式。它解析了一个以下模式的邻接列表(源顶点 ID,目的地顶点 ID)对,疏忽以 #结尾的正文行:

 # This is a comment2 14 11 2

它从指定的边创立了一个图表,主动边中提到的任何顶点。所有顶点和边的属性默认为 1。canonicalOrientation 参数容许从新定向边的正方向(srcId < dstId),这是必须的 connected-component 算法。该 minEdgePartitions 参数指定边缘分区生成的最小数目; 例如,在 HDFS 文件具备多个块, 那么就有多个边的宰割.

object Graph {def apply[VD, ED](vertices: RDD[(VertexId, VD)],
            edges: RDD[Edge[ED]],
            defaultVertexAttr: VD =  null
    ): Graph[VD, ED]
    
    def fromEdges[VD, ED](edges: RDD[Edge[ED]],
            defaultValue: VD): Graph[VD, ED]
    
    def fromEdgeTuples[VD](rawEdges: RDD[(VertexId, VertexId)],
            defaultValue: VD,
            uniqueEdges: Option[PartitionStrategy] =  None
    ): Graph[VD, Int]
}

Graph.apply 容许从顶点和边的 RDDS 中创立的图。反复的顶点会任意抉择,并在边 RDD 中存在的顶点,但不是顶点 RDD 会被赋值为默认属性。

Graph.fromEdges 容许从只有边的元组 RDD 创立的图,主动生成由边中存在的顶点,并且给这些顶点赋值为缺省值。

Graph.fromEdgeTuples 容许从只有边的元组的 RDD 图中创立图,并将的边的值赋为 1,并主动创立边中所存在的顶点,并设置为缺省值。它也反对删除重边; 进行删除重边时,传入 PartitionStrategy 的 Some 作为 uniqueEdges 参数(例如,uniqueEdges=Some(PartitionStrategy.RandomVertexCut))。分区策略是必要的,因为定位在同一分区雷同的边,能力使他们可能进行反复删除。

顶点和边 RDDs

GraphX 公开了图中 RDD 顶点和边的视图。然而,因为 GraphX 将顶点和边保留在优化的数据结构,并且为这些数据结构提供额定的性能,顶点和边别离作为 VertexRDD 和 EdgeRDD 返回。在本节中,咱们回顾一些这些类型的其余有用的性能。

VertexRDDs

该 VertexRDD [A]继承 RDD [(VertexID, A)],并减少了一些额定的限度,每个 VertexID 只呈现 一次。此外,VertexRDD[A]示意一个顶点汇合,其中每个顶点与类型的属性为 A。在外部,这是通过将顶点属性中存储在一个可重复使用的哈希表。因而,如果两个 VertexRDDs 继承自雷同的基类 VertexRDD(例如,通过 filter 或 mapValues),他们能够加入在常数工夫内实现合并,而不须要从新计算 hash 值。要充分利用这个索引数据结构,VertexRDD 提供了以下附加性能:

class  VertexRDD[VD]  extends  RDD[(VertexID, VD)] {
    // Filter the vertex set but preserves the internal index
    def filter(pred: Tuple2[VertexId, VD]  =>  Boolean): VertexRDD[VD]
    // Transform the values without changing the ids (preserves the internal index)
    def mapValues[VD2](map: VD =>  VD2): VertexRDD[VD2]
    def mapValues[VD2](map: (VertexId, VD)  =>  VD2): VertexRDD[VD2]
    // Remove vertices from this set that appear in the other set
    def diff(other: VertexRDD[VD]): VertexRDD[VD]
    // Join operators that take advantage of the internal indexing to accelerate joins (substantially)
    def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2])  =>  VD3): VertexRDD[VD3]
    def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U)  => VD2): VertexRDD[VD2]
    // Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
    def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2)  =>  VD2): VertexRDD[VD2]
}

请留神,例如,如何 filter 操作符返回一个 VertexRDD。过滤器应用的是理论通过 BitSet 实现的,从而复用索引和放弃能疾速与其余 VertexRDD 实现连接功能。相似地,mapValues 操作不容许 mapha 函数扭转 VertexID,从而能够复用对立 HashMap 中的数据结构。当两个 VertexRDD 派生自同一 HashMap,并且是通过线性少买而非代价低廉的逐点查问时,无论是 leftJoin 和 innerJoin 连贯时可能辨认 VertexRDD。

该 aggregateUsingIndex 操作是一种新的无效的从 RDD[(VertexID,A)]构建新的 VertexRDD 的形式。从概念上讲,如果我在一组顶点上构建了一个 VertexRDD[B],这是一个在某些顶点 RDD[(VertexID,A)]的超集,而后我能够重用该索引既汇集,随后为 RDD[(VertexID, A)]建设索引。例如:

val setA: VertexRDD[Int] =  VertexRDD(sc.parallelize(0L until 100L).map(id  => (id, 1)))
val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id =>  List((id, 1.0), (id, 2.0)))
// There should be 200 entries in rddB
rddB.count
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ _ + _ _)
// There should be 100 entries in setB
setB.count
// Joining A and B should now be fast!
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b)  => a + b)

EdgeRDDs

该 EdgeRDD [ED,VD],它继承 RDD[Edge[ED], 以各种分区策略 PartitionStrategy 将边划分成不同的块。在每个分区中,边属性和邻接构造,别离存储,这使得更改属性值时,可能最大限度的复用。

EdgeRDD 是提供的三个额定的函数:

// Transform the edge attributes while preserving the structure
def mapValues[ED2](f: Edge[ED]  =>  ED2): EdgeRDD[ED2, VD]
// Revere the edges reusing both attributes and structure
def reverse: EdgeRDD[ED, VD]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
def innerJoin[ED2, ED3](other: EdgeRDD[ED2, VD])(f: (VertexId, VertexId,  ED, ED2) => ED3): EdgeRDD[ED3, VD]

在大多数利用中,咱们发现,在 EdgeRDD 中的操作是通过图形运算符来实现,或依附在基类定义的 RDD 类操作。

优化图的示意

对于 GraphX 中如何示意分布式图构造的详细描述,这个话题超出了本指南的范畴,一些高层次的了解可能有助于设计可扩大的算法设计以及 API 的最佳利用。GraphX 采纳顶点切的办法来散发图划分:

不通过边划分图,GraphX 沿顶点来划分图,这样能够缩小顶点之间的通信和存储开销。逻辑上,这对应于将边调配到不同的机器,并容许顶点逾越多个机器。调配边的确切办法取决于 PartitionStrategy 并有多个衡量各种试探法。用户能够通过从新分区图与不同的策略之间进行抉择 Graph.partitionBy 操作。默认分区策略是依照图的结构,应用图中初始的边。然而,用户能够不便地切换到二维 - 分区或 GraphX 中其余启发式分区办法。

一旦边被划分,并行图计算的要害挑战在于无效的将每个顶点属性和边的属性连接起来。因为在事实世界中,边的数量多于顶点的数量,咱们把顶点属性放在边中。因为不是所有的分区将蕴含所有顶点相邻的边的信息,咱们在外部保护一个路由表,这个表确定在哪里播送顶点信息,执行 triplet 和 mapReduceTriplets 的连贯操作。

图算法

GraphX 包含一组图形算法来简化剖析工作。该算法被蕴含于 org.apache.spark.graphx.lib 包中,并可间接通过 GraphOps 而被 Graph 中的办法调用。本节介绍这些算法以及如何应用它们。

PageRank

PageRank 记录了图中每个顶点的重要性,假如一条边从 u 到 v,代表从 u 传递给 v 的重要性。例如,如果一个 Twitter 用户有很多粉丝,用户排名将很高。

GraphX 自带的 PageRank 的动态和动静的实现,放在 PageRank 对象中。动态的 PageRank 运行的固定数量的迭代,而动静的 PageRank 运行,直到排名收敛(即当每个迭代和上一迭代的差值,在某个范畴之内时进行迭代)。GraphOps 容许 Graph 中的办法间接调用这些算法。

GraphX 还包含,咱们能够将 PageRank 运行在社交网络数据集中。一组用户给出 graphx/data/users.txt,以及一组用户之间的关系,给出了 graphx/data/followers.txt。咱们能够依照如下办法来计算每个用户的网页级别:

// Load the edges as a graph
val graph =  GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("graphx/data/users.txt").map { line  =>
    val fields = line.split(",")(fields(0).toLong, fields(1)
    )
}
val ranksByUsername = users.join(ranks).map {case (id, (username, rank))  => (username, rank)
}
// Print the result
println(ranksByUsername.collect().mkString("\n"))

联通重量

连贯重量算法标出了图中编号最低的顶点所联通的子集。例如,在社交网络中,连贯重量相似集群。GraphX 蕴含在 ConnectedComponents 对象的算法,并且咱们从该社交网络数据集中计算出连贯组件的 PageRank 局部,如下所示:

// Load the graph as in the PageRank example
val graph =  GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
// Find the connected components
val cc = graph.connectedComponents().vertices
// Join the connected components with the usernames
val users = sc.textFile("graphx/data/users.txt").map { line  =>
    val fields = line.split(",")(fields(0).toLong, fields(1)
    )
}
val ccByUsername = users.join(cc).map {case (id, (username, cc))  => (username, cc)
}
// Print the result
println(ccByUsername.collect().mkString("\n"))

三角计数

当顶点四周与有一个其余两个顶点有连线时,这个顶点是三角形的一部分。GraphX 在 TriangleCount 对象实现了一个三角形计数算法,这个算法计算通过各顶点的三角形数目,从而提供集群的度。咱们从 PageRank 局部计算社交网络数据集的三角形数量。留神 TriangleCount 要求边是标准的指向(srcId < dstId),并应用 Graph.partitionBy 来宰割图形。

// Load the edges in canonical order and partition the graph for triangle count
val graph =  GraphLoader.edgeListFile(sc, "graphx/data/followers.txt", true).partitionBy(PartitionStrategy. RandomVertexCut)
// Find the triangle count for each vertex
val triCounts = graph.triangleCount().vertices
// Join the triangle counts with the usernames
val users = sc.textFile("graphx/data/users.txt").map { line => 
        val fields = line.split(",")(fields(0).toLong, fields(1)
        )
    }
val triCountByUsername = users.join(triCounts).map {case (id, (username, tc)) => (username, tc)
    }
// Print the result
println(triCountByUsername.collect().mkString("\n"))
  • 示例

假如我想从一些文本文件中构建图,只思考图中重要关系和用户,在子图中运行的页面排名算法,而后终于返回与顶级用户相干的属性。咱们能够在短短的几行 GraphX 代码中实现这一性能:

// Connect to the Spark cluster
val sc =  new  SparkContext("spark://master.amplab.org", "research")
// Load my user data and parse into tuples of user id and attribute list
val users = (sc.textFile("graphx/data/users.txt").map(line  =>  
        line.split(",")).map(parts  =>  (parts.head.toLong,parts.tail) 
        )
    )
// Parse the edge data which is already in userId -> userId format
val  followerGraph  =  GraphLoader.edgeListFile(sc,"graphx/data/followers.txt")
// Attach the user attributes
val graph = followerGraph.outerJoinVertices(users) {case (uid, deg,  Some(attrList))  => attrList
        // Some users may not have attributes so we set them as empty
        case (uid, deg,  None)  =>  Array.empty[String]
}
// Restrict the graph to users with usernames and names
val subgraph = graph.subgraph(vpred = (vid, attr)  => attr.size == 2)
// Compute the PageRank
val pagerankGraph = subgraph.pageRank(0.001)
// Get the attributes of the top pagerank users
val  userInfoWithPageRank  =
        subgraph.outerJoinVertices(pagerankGraph.vertices) {case (uid, attrList,  Some(pr))  => (pr, attrList.toList)
            case (uid, attrList,  None)  => (0.0, attrList.toList)
        }
println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_ _._2._1)).mkString("\n")
)

猜你喜爱
Hadoop3 数据容错技术(纠删码)
Hadoop 数据迁徙用法详解
Flink 实时计算 topN 热榜
数仓建模分层实践
数仓建模方法论

退出移动版