关于scala:SparkCore实现离线IDMapping

35次阅读

共计 7562 个字符,预计需要花费 19 分钟才能阅读完成。

最近在开发一个 ID Mapping 业务零碎——辨认数据上报中社交账号的关联关系,找到零碎中哪些社交账号属于事实世界中的同一个人。简略来讲,如果同一条上报数据中呈现了两个社交账号(比方一个手机号和一个 QQ 号),就认为这两个社交账号在事实世界属于同一个人。那么,如何计算这个关联关系呢?

一开始咱们解决这个问题的思路很间接:事实世界的每个人在零碎中用惟一的 UUID 标识,每次社交账号(Account)上报,在 Redis 中记录一个 UUID->Account 的正向索引,同时记录一个 Account->UUID 的反向索引。每个 UUID 下可能有多个社交账号,每个社交账号只会归属于一个 UUID。每次收到 2 个社交账号相干的上报时,先通过 Account->UUID 的反向索引查到这两个社交账号对应的 UUID,如果两个账号别离属于两个不同的 UUID,就把这两个 UUID 合并为一个新的 UUID,同时原来归属于这两个 UUID 的所有社交账号都归属于新的 UUID;如果查到一个 UUID,那么把这两个账号归属于这个 UUID;如果未查到 UUID,则新生成一个 UUID,最初把新的正向索引和反向索引再写回 Redis 即可。

当然理论的 ID Mapping 可能有更简单的关联关系,并不是两个社交账号在同一条上报数据中呈现这么简略的逻辑,这就不在本文探讨的范畴内了。

这种形式乍一看也没什么问题,但认真想来却疏忽了一个很根本的问题:这种形式只能一直的将账号关联在一起,而不能解除关联。举例来说,给定 A -B、B- C 两组社交账号的关联关系,A、B、C 应该全副关联在一个 UUID 下,此时若 B 点被删除,或 B - C 关联关系解除,零碎无奈将 A 和 C 解除关联。而这种解除关联的场景在业务零碎中也是很常见的,比方在企业的客户管理系统中,往往会记录一个客户的一些社交账号,有时企业会删除客户的一些社交账号,甚至删除某一个客户。

其实这个业务形象成一个数据结构的问题,就是典型的不相交集问题。每一个社交账号看做一个点,两个社交账号的关联关系看做两点之间的无向边,下面的问题就变成了在一个图里划分不相交集的问题。既然很难解决动态变化的图,就每天批量计算一下某一时刻所有账号的关联关系吧。本文就是要介绍一下如何用 Spark 的 RDD API 实现动态图不相交集的计算。

一、问题定义

前言形容了问题的背景,这里再明确定义下本文要解决的问题。

算法输出是一张离线 Hive 表,每行有两个字段 Pi、Pj,示意无向图中节点 Pi 和 Pj 之间存在一条边。输入同样失去一张 Hive 表,每行也是 Pi、Pj 两个字段,Pj 均示意不相交集的根节点,即所有 Pj 雷同的行中 Pi 的汇合加上 Pj 就形成了与图中其余点不相关联的一个独立汇合。如图一所示,对于左侧的输出,计算结果将失去右侧的输入,能够看出 P1、P2、P3、P6、P8、P9 形成一个独立汇合,P4、P5、P7 形成另一个独立汇合。

图一 算法输出与输入

二、计算过程

1. 使每行数据中 Pi > Pj

为了保障迭代过程最终能够收敛,无妨将图中所有边都当做有向边解决,方向都是节点 ID 较大的节点指向节点 ID 较小的节点,这样最终计算失去的不相交集必是以汇合中 ID 最小的点为根,即所有节点都指向所在汇合中 ID 最小的点。因而,无妨将原始数据中的每一行当做由 Pi 指向 Pj 的有向边,若 Pj>Pi,则替换 Pi 和 Pj。如图二所示,这一步批改了第三行和第五行数据。

图二 使 Pi > Pj

edge_rdd = edge_rdd.map(row => if (row._1 > row._2) (row._1, row._2) else (row._2. row._1))

2. 保障 Pi 不反复

第一步解决完之后,原始数据中还会存在一个问题:多条边相交于一个 ID 较大的节点,这会导致 ID 较大的节点成为潜在的根节点。解决这个问题须要将部分相交于 ID 较大节点的边转化为相交于 ID 最小的节点。比方存在 P6->P1 和 P6->P3 两条边,这两条边交于 P6,P1、P3、P6 组成一个独立汇合。须要将这个关联关系转换为 P6->P1 和 P3->P1 两条边,即以 P1 为根节点。前一步的解决曾经保证数据中每一行都满足 Pi>Pj,因而多条边交于 ID 较大的节点等价于多行的 Pi 雷同。所以只须要在保障原有关联关系的条件下将表处理为 Pi 不反复即可。

这一步保障了 Pi 中的节点不会作为根节点,所有有向边都由叶子节点指向潜在的根节点。

图三 保障 Pi 不反复

在算法实现上,首先将 Pi 雷同的行聚合在一起,而后输入这个汇合中每个点指向汇合中 ID 最小点的有向边。

while(edge_rdd.keys.count() != edge_rdd.keys.distinct.count()) {edge_rdd = edge_rdd.groupbykey()
        .flatMap(row => {
            val vertex_list = row._2.toSeq.sorted.distinct
            val result = new ArrayBuffer[(String, String)]()
            result.append(row => (row._1, vertex_list.head))
            vertex_list.tail.foreach(vertex => result.append((vertex, vertex_list.head)))
            result
        })
}

从图三种能够看出,每次迭代之后,可能会产生新的 Pi 雷同的数据,因而须要用迭代的形式,屡次执行以上逻辑,迭代的终止条件就是 Pi 中的点不反复。

3. 将每一行中的 Pj 替换为汇合中最小的节点 ID

最初一步就是算法的外围,通过自关联,将所有叶子节点关联到根节点上。算法原理很好了解:若同时存在 Pz->Py 的有向边和 Py->Px 的有向边,就将 Pz->Py 替换为 Pz->Px。

如图四所示,第一次迭代由 P8->P2 和 P2->P1 两条边失去 P8->P1,由 P9->P8 和 P8->P2 两条边失去 P9->P2;第二次迭代由 P9->P2 和 P2->P1 两条失去 P9->P1。

如果咱们定义 P2 与根节点 P1 的间隔是 1,P8 和根节点 P1 的间隔是 2,P9 和根节点 P1 的间隔是 3,顺次类推。该算法每 n 次迭代能够将间隔为 2n 之内的节点全副关联到根节点上。

图四 将 Pj 替换为根节点

for (i <- 1 to iterateNum) {val inverse_edge = edge_rdd.map(row => (row._2, row._1))
    val edge_rdd = inverse_edge.leftOuterJoin(edge_rdd).map(row => (row._2._1, if (row._2._2.isDefined) row._2._2.get else row._1))
}

通过以上 3 个步骤的解决,能够看到原始数据集被划分成了 2 个不相交集,根节点别离为 P1 和 P4。

三、执行优化

问题也并不是这样顺利的就解决了,将上述逻辑转化为工程代码时还遇到了一些其余问题,上面也分享下遇到的问题,以及采取的优化计划。

1. 揣测执行

因为数据量比拟大,计算时会应用较大的并发来执行,对于同一个 job 经常出现大部分 task 都执行很快,只是在等 2 - 3 个 task 的执行,而察看发现并没有显著的数据歪斜。查阅相干材料理解到这种状况可能是因为各个 executor 上的运行环境不同(有可能同时运行了其余工作、或者是硬件起因),导致计算工夫差别较大,这个问题能够通过揣测执行解决。

spark.speculation=true
spark.speculation.interval=100
spark.speculation.multiplier=1.5

2. checkpoint

因为该算法是一个迭代算法,执行流程较长,有时会呈现某个 executor 失落,使 RDD 中的一部分失落,导致整个工作须要从新计算,甚至失败。查阅相干材料后,最终通过 checkpoint 的形式解决了这个问题。checkpoint 算子将 RDD 写入到 HDFS 某个目录下,因而也一个 Action,所以个别会先执行 cache 再执行 checkpoint。

edge_rdd = edge_rdd.groupbykey().flatMap(row => {...}).cache
edge_rdd.checkpoint()

3. RDD cache 开释

代码运行过程中还发现工作会占用很多内存,远比预期大的多,通过查看 Spark 工作的 Storage 页,发现其实是迭代的形式导致了“内存透露”。

在迭代的过程中,算法对每一次迭代失去的 edge_rdd 进行了 cache,而事实上每次计算出新的 edge_rdd 后,前一次迭代的 cache 就没用了。但如果没有手动开释的话,这些 RDD 的 cache 在工作终止之前都不会被开释掉,会始终占用着集群内存,导致“内存透露”。从图五中能够看出,每次迭代都会生成一个 RDD,并 cache 在内存中,如果迭代次数比拟多,这部分内存节约对集群资源的占用就很可观了。甚至如果新的 RDD 没有内存能够 cache,会导致 RDD 的反复计算,这样会重大影响工作执行的工夫。

图五 没有手动开释 RDD,导致“内存透露”

这个问题能够通过在每次计算生成新的 RDD 时手动 unpersist 上一个 RDD 来解决,在内存有效时立即开释掉这部分内存。

val tmp = edge_rdd.groupbykey().flatMap(row => {...}).cache
tmp.checkpoint()
edge_rdd.unpersist()
edge_rdd = tmp

四、执行性能

用上述算法计算业务收集到的社交账号关联关系,数据量在 5000 万条左右,第二步计算须要 7 次迭代收敛,第三步计算须要 3 - 4 次迭代收敛。程序运行应用 16 核 64G 内存的分布式 Spark 运行环境,迭代过程中 partition 个数为 64,整体运行工夫在 20 分钟左右,根本达到了业务应用的要求。

五、附加残缺实现代码

package com.yang.spark.idmapping

import com.yang.spark.utils.SparkUtils


/**
  * vertices edges
  */
object IncrementIDMapping {def main(args: Array[String]): Unit = {

    // 最大迭代次数
    val step3MaxIterateNum = 100

    val spark = SparkUtils.initSession(isLocal = false, this.getClass.getSimpleName)

    spark.sqlContext.setConf("spark.sql.adaptive.maxNumPostShufflePartitions", "1000")

    /*val inputRDD: RDD[(String, String)] = spark.sparkContext.makeRDD(Seq(("100", "101"),
      ("100", "105"),
      ("101", "102"),
      ("102", "103"),
      ("104", "105"),
      ("105", "106"),
      ("106", "103"),
      ("107", "108"),
      ("108", "109"),
      ("110", "107"),
      ("111", "112"),
      ("113", "114"),
      ("114", "112"),
      ("115", "116"),
      ("117", "118"),
      ("117", "116")
    ))*/

    val inputRDD = spark
      .sql(
        s"""
           |select id, ccid as id2
           |from hdp_jinrong_tech_dw.dw_wb_ajk_idmapping_output_idrevtable
           |where dt = '20200618'
           |union all
           |select id1, id2
           |from hdp_jinrong_tech_ods.temp_ajk_phone_idmapping_input_data_20200619
        """.stripMargin
      )
      .rdd

    /**
      * 1. 使每行数据中 (id1, id2) 满足 id1 > id2
      * 为了保障迭代过程最终能够收敛,无妨将图中所有边都当做有向边解决,方向都是节点 ID 较大的节点指向节点 ID 较小的节点,* 这样最终计算失去的不相交集必是以汇合中 ID 最小的点为根,即所有节点都指向所在汇合中 ID 最小的点。* 因而,无妨将原始数据中的每一行当做由 id1 指向 id2 的有向边,若 id2 > id1,则替换 id1 和 id2。*/
    var edgeRDD = inputRDD.repartition(1000)
      .map(row => (row.getAs[String]("id1"), row.getAs[String]("id2")))
      .map {case (id1, id2) => if (id1 > id2) (id1, id2) else (id2, id1) }
    /*println("=================step2: edgeRDD=====================")
    edgeRDD.take(20).foreach(println)*/

    /**
      * 2. 保障 id1 不反复
      * 第一步解决完之后,原始数据中还会存在一个问题:多条边相交于一个 ID 较大的节点,* 这会导致 ID 较大的节点成为潜在的根节点。解决这个问题须要将部分相交于 ID 较大节点的边转化为相交于 ID 最小的节点。* 比方存在 P6->P1 和 P6->P3 两条边,这两条边交于 P6,P1、P3、P6 组成一个独立汇合。* 须要将这个关联关系转换为 P6->P1 和 P3->P1 两条边,即以 P1 为根节点。* 前一步的解决曾经保证数据中每一行都满足 id1 > id2,因而多条边交于 ID 较大的节点等价于多行的 id1 雷同。* 所以只须要在保障原有关联关系的条件下将表处理为 id1 不反复即可。* 这一步保障了 id1 中的节点不会作为根节点,所有有向边都由叶子节点指向潜在的根节点。*/
    var step2IterateNum = 0
    while (edgeRDD.keys.count() != edgeRDD.keys.distinct().count()) {
      step2IterateNum = step2IterateNum + 1
      println(s"================step2: iterateNum: $step2IterateNum======================")
      edgeRDD = edgeRDD.groupByKey(1000)
        .flatMap {case (id, ids) =>
            val vertexList = ids.toSeq.sorted
            val head = vertexList.head
            val result = new scala.collection.mutable.HashSet[(String, String)]
            result.add((id, head))
            vertexList.tail.foreach(vertex => result.add((vertex, head)))
            result
        }
    }

    /*println("================step2: edgeRDD======================")
    edgeRDD.take(20).foreach(println)*/

    /**
      * 3. 将每一行中的 id2 替换为汇合中最小的节点 ID
      * 第三步就是算法的外围,通过自关联,将所有叶子节点关联到根节点上。* 算法原理很好了解:若同时存在 id3 -> id2 的有向边和 id2 -> id1 的有向边,就将 id3 -> id2 替换为 id3 -> id1。* 第一次迭代由 P8->P2 和 P2->P1 两条边失去 P8->P1,由 P9->P8 和 P8->P2 两条边失去 P9->P2;* 第二次迭代由 P9->P2 和 P2->P1 两条失去 P9->P1。* 如果咱们定义 P2 与根节点 P1 的间隔是 1,P8 和根节点 P1 的间隔是 2,P9 和根节点 P1 的间隔是 3,顺次类推。* 该算法每 n 次迭代能够将间隔为 2^^n 之内的节点全副关联到根节点上。*/
    import scala.util.control.Breaks._
    breakable {for (step3IterateNum <- 1 to step3MaxIterateNum) {println(s"================step3: iterateNum: $step3IterateNum======================")
        val reverseEdgeRDD = edgeRDD.map {case (id1, id2) => (id2, id1) }
        /*println("================step3: reverseEdgeRDD======================")
        reverseEdgeRDD.take(20).foreach(println)*/
        val joinRDD = reverseEdgeRDD.leftOuterJoin(edgeRDD, 1000)
        /*println("================step3: joinRDD======================")
        joinRDD.take(20).foreach(println)*/
        val innerCount = joinRDD.filter{case (k, (_, v2)) => v2.isDefined && k != v2.get}.count()
        if(0 != innerCount) {edgeRDD = joinRDD.map { case (k, (v1, v2)) => (v1, if (v2.isDefined) v2.get else k) }
        } else {break}
      }
    }

    /*println("================step3: edgeRDD======================")
    edgeRDD.take(20).foreach(println)*/

    import spark.implicits._
    val resultDF = edgeRDD.toDF("id1", "id2")
    resultDF.createOrReplaceTempView("idmapping_output_temp_view")

    spark.sql(
      s"""
         |insert overwrite table hdp_jinrong_tech_dw.dw_wb_ajk_idmapping_output_idrevtable
         |partition (dt = '20200619')
         |select *
         |from idmapping_output_temp_view
       """.stripMargin
    )

  }

}

正文完
 0