关于etl:复杂-ETL-下-JobEngine-阻塞问题排查实录

1次阅读

共计 6648 个字符,预计需要花费 17 分钟才能阅读完成。

本文作者:zsh,效劳于观远计算引擎团队,在一直进步数据系统服务稳定性方面做着一点渺小的工作。

问题背景

在咱们将 Spark 版本从 3.0.1 降级至 3.2.1 之后,发现某个 ETL 无奈失常运行了,而在降级之前运行都是失常的。

通过查问查问运维日志,发现工作几次失败的工夫节点,Spark application 的 driver 日志中偶现 OutOfMemory 的报错。

问题剖析

对于此类 OOM 问题,个别分为两个方向,一个是通过拿取 OOM 过后的 dump 文件联合代码进行剖析占用内存较多的对象是什么,另一个就是联合内存监控定位呈现问题时的工作状况,对具体的 ETL 工作进行剖析并尝试复现。

排查过程

问题确认

首先在排除其余工作烦扰的状况下独自运行出问题的 ETL,发现内存的确有显著的回升并导致服务不可用。

联合之前呈现 OOM 时的内存监控,内存迅速上涨的体现统一。

由此根本能够确定就是那个 ETL 导致的问题,下一步就是确认工作自身和降级前是否做了批改。通过查询数据库拿到工作运行的具体历史脚本并进行比拟,发现 ETL 自身根本没有改变。因而咱们第一工夫回滚了 Spark 版本,再次尝试运行这个 ETL,果然能跑进去了。

问题揣测

  1. 是否是 Spark.scheduler.listenerbus.eventqueue.capacity 调大导致?

在本次降级中,咱们同时调大了这个参数,用于防止高并发时的 eventqueue 解决的事件过多导致事件被抛弃从而影响一些指标的统计。会不会是因为这个参数调大了,导致这个 Listener 须要解决的事件变多了,进而加大了内存上的压力呢?我又去查问了历史的日志,并没有因为事件过多呈现 dropped event 相干的日志,因而处理事件的并发之前也没达到下限,临时排除这个影响。

  1. 查看 Spark 3.2 代码逻辑上的变动。

用 Jprofile 关上 dump 文件,发现其中 SQLAppStatusListener 占用了大量内存。最大的对象是 stageMetric,寄存的是 Stage 相干的 metric 信息,寄存在 concurrentHashMap 里。

查看代码发现了一处比拟狐疑的改变:

SPARK-33016 Potential SQLMetrics missed which might cause WEB UI display issue while AQE is on

提交者说:

So decided to make a trade off of keeping more duplicate SQLMetrics without deleting them when AQE with newPlan updated.

会不会是寄存了额定的反复的 SQLMetrics 导致内存应用上涨呢?那就再尝试复现一下。

问题复现

如果是 ETL 逻辑有问题,那么咱们不须要数据应该也能复现这个问题。为了便于本地 debug,我导出了有问题的 ETL 的构造,结构了一份空数据,运行了一下问题没有复现,甚至在 Spark 3.2 上体现更好。

那么想来问题和数据也是无关的。在此之前,我在 Spark ui 上察看过出问题环境的 etl 运行,有几个比拟显著的特点:

  1. job 很多,而且有着依赖关系,也就是一些 job 是在前序 job 运行完之后才会提交。
  2. 最初运行的 job 有着十分多的 task,大略有上万的量级。
  3. etl 自身有着比拟多的 join 和 union 计算。

通过查询数据库中记录的 Spark 工作 metric 的统计,印证了我的察看。这个工作在 Spark 3.0.1 版本运行时会生成几百个 stage,上万个 task。在 Spark 3.2 中没运行胜利,因而没有精确的统计数据。

一开始我先轻易造了一些数据,通过减少数据量的形式试图复现问题,但始终无奈复现出异样的状况。回过头来思考,如何让 Spark 工作的体现和出问题的环境相似,次要从 stage 和 task 的数量上动手。stage 的数量和 shuffle 无关,要制作 shuffle 就须要用到宽依赖转换操作,例如在这个 etl 中就有很多的 groupBy。而 task 数量和 partition 数量无关,也就是说我须要制作更多的 partition。因而我针对 groupBy 的 key 将制作了很多不反复的数据,在限度 driver 内存的状况下一直减少数据量从而制作更多的 task,终于复现频繁 GC 的邻近 OOM 状态了。此时 dump 进去的堆信息也和真实情况相似,SQLAppStatusListener 占用了堆内存的大头。

且在 webui 上能看到比拟多的 task:

回归上文提到的代码改变,这里的改变是在 AQE 扭转执行打算的时候,本来是用新打算的 metric 代替老打算,当初变成了叠加。而在我敞开 AQE 后,driver 内存被占满的状况没有再呈现。那这可能阐明问题出在累加的 metrics?尝试批改代码将 ++= 改回 =,然而通过验证问题仍旧存在!

二次排查

首先须要弄清楚 stageMetrics 为什么占用了那么大的内存,为了搞清楚内存应用的上的差异,我决定应用控制变量法比拟新老版本的差别。于是我做了三件事:

  1. 正文掉清理 StageMetrics 的代码。
  2. 再运行一次 ETL 后将内存 dump 进去。
  3. 关上 eventLog 便于数据统计。

dump 进去的 SQLAppStatusListener 的大小大概是 800M(新版本)vs 200M(老版本)的程度。

对于这个对象的大小区别又有以下几个猜想:

  1. Stage 数量 [排除]
private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]()

SQLAppStatusListener 中占大头的是 stageMetrics。stageMetrics 存储的是一个 stageId 和 LiveStageMetrics 的映射,也就是 stageMetrics 的 size 是由 stage 数量决定的。通过统计的 metric 信息看,问题 ETL,在新版本和老版本的 stage 数量差距不是特地大,老版本 270 个 stage,新版本 201 个 stage,比拟起来还是老版本多,阐明这里的问题不是 stage 数量的问题。

在存储的 LiveStageMetric 外面,有几种状况占用内存较多:

  1. taskMetrics[排除]

无论是新版本还是老版本,在 stageMetrics map 里占用最多内存的几条数据占大多数内存的是 taskMetrics:

taskMetrics 存储的是【累加器 id】和【长度为工作数的列表】的映射,如何确认外面有多少条数据呢?这个数据等于 Accumulator 的数量 * task 的数量:

accumUpdates
      .filter {acc => acc.update.isDefined && accumIdsToMetricType.contains(acc.id) }
      .foreach { acc =>
        // In a live application, accumulators have Long values, but when reading from event
        // logs, they have String values. For now, assume all accumulators are Long and convert
        // accordingly.
        val value = acc.update.get match {
          case s: String => s.toLong
          case l: Long => l
          case o => throw new IllegalArgumentException(s"Unexpected: $o")
        }

        val metricValues = taskMetrics.computeIfAbsent(acc.id, _ => new Array(numTasks))
        metricValues(taskIdx) = value

        if (SQLMetrics.metricNeedsMax(accumIdsToMetricType(acc.id))) {
          val maxMetricsTaskId = metricsIdToMaxTaskValue.computeIfAbsent(acc.id, _ => Array(value,
            taskId))

          if (value > maxMetricsTaskId.head) {maxMetricsTaskId(0) = value
            maxMetricsTaskId(1) = taskId
          }
        }
      }
    if (finished) {completedIndices += taskIdx}
  }

在 SparkListenerStageSubmitted 事件中有记录工作数:

依据 JobGroupId 筛选所有的 SparkListenerStageSubmitted 事件,而后统计其中的工作数,发现老版本的 Spark 产生的工作数更多,每个 stage 大多有 128 个工作,新版本则大多是 1 个,最初几个 stage 有上千个工作。


右边是老版本,左边是新版本

然而 Accumulator 的数量不好统计,因为 eventLog 中没有打进去。于是我批改了代码,将这个数据塞进 eventLog

打进去发现新老版本的 Accums Count 数数量差异也不大,而且老版本的数量还更多些,因而这一因素也被排除。

 统计了个几个 task 数量比拟多的 stage 中的 task 关联的 accums

Spark 3.0.1
Number of Tasks:3200 Number of Accums:4038
Number of Tasks:128  Number of Accums:5632

Spark 3.2.1
Number of Tasks:1152 Number of Accums:1023
Number of Tasks:1152 Number of Accums:1009
Number of Tasks:1664 Number of Accums:1031
Number of Tasks:2432 Number of Accums:1016
  1. accumIdsToMetricType[可疑]

在 Spark 3.2 中,还存在着大量 内存大多数被 accumIdsToMetricType 占用的 LiveStageMetric 对象,大小是 2-5M 左右,而 Spark 3.0.1 则大多是几百 K。

依据代码,可能使得 metrics 减少的事件只有两个,SparkListenerSQLAdaptiveExecutionUpdate 和 SparkListenerSQLAdaptiveSQLMetricUpdates。在 eventLog 中可能看到 SparkListenerSQLAdaptiveExecutionUpdate 触发的次数的确不是很多,然而每一次增加的量却不少。

应用 Spark 3.0.1 查看发现后果有很大区别:

区别在于:

  • 新版本 SparkListenerSQLAdaptiveExecutionUpdate 触发次数变多;
  • 新版本每个 SparkListenerSQLAdaptiveExecutionUpdate 蕴含的 metrics 变多。

联合 Spark 的 change log 能够推断出一点:Spark 新版本优化了 AQE 的实现,对于局部场景可能更加智能地动静生成新的执行打算,同时也减少了一些指标的统计。局部新生成的执行打算蕴含多层嵌套的树节点,因而也带来了大量的 Metrics 指标加大了内存的压力。

然而这两点咱们都无奈轻易改变,再看看还有什么不同。

依据内存 dump 和 event log 的统计,这个 case 新版本生成的 StageMetrics 的数量相差了靠近两倍,从 stageId 上也能看出新版本生成的最大 ID 序号(从 0 开始)也是两倍的关系,那么关注点就在不间断的这部分 ID 下面。在 eventLog 中对 stage Id 进行搜寻,发现有些 stage 在 SparkListenerJobStart 事件中存在,但没有触发 SparkListenerStageSubmitted 事件,想必这就是 skipped stage。那 skipped stage 对 StageMetric 的影响是什么呢?

StageMetric 初始化是在解决 SparkListenerJobStart 事件时,更新是在解决 onStageSubmitted 事件时:

    // Record the accumulator IDs and metric types for the stages of this job, so that the code
    // that keeps track of the metrics knows which accumulators to look at.
    val accumIdsAndType = exec.metrics.map {m => (m.accumulatorId, m.metricType) }.toMap
    if (accumIdsAndType.nonEmpty) {
      event.stageInfos.foreach { stage =>
        stageMetrics.put(stage.stageId, new LiveStageMetrics(stage.stageId, 0,
          stage.numTasks, accumIdsAndType))
      }
    }
override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {if (!isSQLStage(event.stageInfo.stageId)) {return}

  // Reset the metrics tracking object for the new attempt.
  Option(stageMetrics.get(event.stageInfo.stageId)).foreach { stage =>
    if (stage.attemptId != event.stageInfo.attemptNumber) {
      stageMetrics.put(event.stageInfo.stageId,
        new LiveStageMetrics(event.stageInfo.stageId, event.stageInfo.attemptNumber,
          stage.numTasks, stage.accumIdsToMetricType))
    }
  }
}

也就是这里的差值会造成 StageMetric 中存储了许多 skipped 的 stage,从 ui 上看 skipped stages 大概是 completed stage 的两倍多。

如果咱们在 JobEnd 的时候把 skipped 的 stage 删除掉呢?

  override def onJobEnd(event: SparkListenerJobEnd): Unit = {liveExecutions.values().asScala.foreach { exec =>
      if (exec.jobs.contains(event.jobId)) {
        val result = event.jobResult match {
          case JobSucceeded => JobExecutionStatus.SUCCEEDED
          case _ => JobExecutionStatus.FAILED
        }
        exec.jobs = exec.jobs + (event.jobId -> result)
        exec.endEvents.incrementAndGet()
        update(exec)
        // 在 JobEnd 的时候筛选出 pendingStage,从 stageMetrics 中移除
        val finishedStages = Option.apply(pendingStages.remove(event.jobId))
        if (finishedStages.isDefined) {for (stageId <- finishedStages.get -- activeStages) {stageMetrics.remove(stageId)
          }
        }
      }
    }
  }

从后果上看,批改后 etl 可能顺利排查进去,批改前 JobEngine 处于频繁 fullgc 的状态。

从监控看,批改后老年代没有做 fullgc,批改前则比拟频繁:


批改前


批改后

排查论断

尽管定位到问题和 AQE 相干的优化无关,然而为了个别 CASE 放弃 AQE 带来的性能晋升必然也是不可取的。而通过缩小 skipped stage 相干数据的存储,尽管没能解决这个问题的 root cause,但也能肯定水平上可能减小内存的压力,属于是另辟蹊径的一种做法。在接下来的工夫里,咱们也将持续钻研 AQE 相干的代码逻辑,针对每一个 corner case 刨根问底,继续为企业级服务的稳定性保驾护航。

正文完
 0