本文作者: Jamie,观远数据 in-house Spark 之父,ABI 畛域史诗级工程师。
年初咱们接到了一个客户反馈,示意服务器 cpu 占用异样,于是咱们近程连贯到服务器下面排查,发现是 Spark driver 占用了大部分 cpu。对于 cpu 占用问题,用 jstack 能很快定位到 jvm 的执行逻辑。剖析 jstack 后果发现,大部分占用 cpu 的线程都在执行一个叫做 transpose window 的优化规定,而且都和这个逻辑里的一段办法无关:
private def compatiblePartitions(ps1 : Seq[Expression], ps2: Seq[Expression]): Boolean = {ps1.length < ps2.length && ps2.take(ps1.length).permutations.exists(ps1.zip(_).forall {case (l, r) => l.semanticEquals(r)
})
}
这段逻辑看上去并不简单,留神到这个办法里有一个 permutations
函数调用,这个函数会返回一个数组的所有排列。对于一个有 n 个不同元素的数组,他的排列数是 n!,也就是说这个算法工夫复杂度会到 O(n!),那么咱们遇到的问题很有可能和这个有关系了。然而咱们还是须要找到是什么 sql 语句触发了这个问题。
从监控上来看,driver 的 cpu 是呈阶梯状回升的,那这些回升的工夫点应该就是有问题工作提交的工夫点,再联合 call stack 外面的逻辑是在做 window 相干的优化,咱们重点去找这些工夫点蕴含 window function 相干的工作。很快咱们就定位到了一个 ETL,从监控上看,每运行一次这个 ETL,Spark driver 就会多占用一个 cpu,并且长时间不开释。
原始的 ETL 逻辑比较复杂,咱们把他简化之后发现只和两个带窗口函数的计算字段有关系,特点就是 partition by 应用的字段比拟多,为了 debug 不便,咱们用 spark-shell 复现了一下:
val df = spark.range(10).selectExpr("id AS a1", "id AS a2", "id AS a3", "id AS a4", "id AS a5", "id AS a6", "id AS a7", "id AS a8", "id AS a9", "id AS a10", "id AS a11", "id AS a12", "id AS a13", "id AS a14", "id AS a15", "id AS a16")
df.selectExpr("sum(`a16`) OVER(PARTITION BY `a1`,`a2`,`a3`,`a4`,`a5`,`a6`,`a7`,`a8`,`a9`,`a10`,`a11`,`a12`,`a13`,`a14`,`a15`) as p1",
"sum(`a16`) OVER(PARTITION BY `a14`,`a2`,`a3`,`a4`,`a5`,`a6`,`a7`,`a8`,`a9`,`a10`,`a11`,`a12`,`a13`,`a1`) as p2"
).explain
在 3.0 版本以上的 spark-shell 外面运行下面的代码就会发现 spark-shell 卡住了,而卡住的中央正是 compatiblePartitions
办法。
也就是说如果加了多个带有窗口函数的计算字段,而 partition by 的字段过多的话,很容易触发这个问题,比方下面这个例子,大家能够算算 14 个元素的数组全排列有多少种组合。那么这个问题咱们有没有方法优化呢。
个别遇到 Spark 相干的问题,咱们能够先去 JIRA 下面搜寻一下有没有人提过相似的问题,因为 Spark 的应用十分宽泛,个别遇到的问题很可能之前曾经有人发现甚至修复了。然而咱们用各种关键字搜寻之后也找不到相干的问题,看来这个问题只能靠咱们本人来解决了。
首先须要看一下相干逻辑是什么时候引入的,引入的起因是什么。查看提交历史能够发现,是为了解决这个问题:https://issues.apache.org/jir…,之所以想要 transpose window,是想要缩小一次 shuffle 操作。而 compatiblePartitions
外面的逻辑,就是在判断是否须要 transpose 两个 window。
从现有代码的逻辑反推这个 compatible 的定义,应该是 window1 的 partition 字段是 window2 的 partition 字段前缀的一种排列。举几个例子就比较清楚了,比方 window2 是 partition by(‘a’, ‘b’, ‘c’, ‘d’),那么 window1 能够是 partition by(‘a’), partition by(‘a’, ‘b’), partition by(‘b’, ‘a’), partition by(‘c’, ‘a’, ‘b’) 等等,然而不能是 partition by(‘b’), partition by(‘a’, ‘c’) 等。
然而这个逻辑其实并不是很正当,一个是算排列代价太高,另一个是有些能够 transpose 的 case 却没有做,比方下面的 partition by(‘b’), partition by(‘a’, ‘c’) 等。另外思考一些反复字段的 case,比方 partition by(‘a’, ‘a’),这种原来的算法也是不行的,所以这个 compatible 能够定义成 window1 的 partition by 外面所有的字段在 window2 外面都能找到,那么咱们就能够做 transpose 来缩小 shuffle,用代码来示意就是:
private def compatiblePartitions(ps1 : Seq[Expression], ps2: Seq[Expression]): Boolean = {
ps1.length < ps2.length && ps1.forall { expr1 =>
ps2.exists(expr1.semanticEquals)
}
}
这样咱们既防止了简单的 permutation,也让这个优化的适用范围减少了。\
通过一系列测试发现改变是有成果的,于是咱们向社区提交了 issue 和相干的 PR,感兴趣的同学能够查看具体内容:
- Issue: SPARK-38034
- PR: 35334
尽管一开始沉没在 PR 的陆地之中,不过时隔半年又被国际友人捞了进去,最终也顺利被社区驳回,这样后续咱们只须要降级到官网的版本就能够解决这个问题了。观远数据的 Spark 贡献者名单中也又多了一位新同学,后续咱们会继续关注实际中遇到的 Spark/Delta 等相干问题,为开源我的项目倒退添砖加瓦。