乐趣区

关于机器学习:技术实践干货-初探大规模-GBDT-训练

本文作者: 字节,观远数据首席科学家。主导多个 AI 我的项目在世界 500 强的利用落地,屡次斩获智能批发方向 Hackathon 冠军。曾就任于微策略,阿里云,领有十多年的行业教训。

本文是此前评估在 Spark 上做大规模 GBDT 训练时写的一篇入门级教程与框架评估。目前市面上仿佛没有多少应用 Spark 来跑 GBDT 的分享,故分享进去看看是否有做过相似场景的同学能够一道交换。

背景

在服务一些客户做商业问题的机器学习建模时,咱们会碰到不少领有十分大量数据且对模型 pipeline 运行有肯定要求的状况。相比间接的单机 Python 建模,这类我的项目有一些难点:

1. 数据量大。 因为预测粒度较细,导致历史数据量十分微小。一些场景的 pilot 我的项目中曾经达到近千万级别的训练数据量,后续拓展到整个业务线,数据量会超过十亿甚至百亿行级别。

2. 整体流程运行工夫有肯定要求。 个别模型所依赖的上游数据会在中午开始通过一系列 ETL 工作从业务零碎导入到 Hive 数仓中,大概在凌晨 3 点后,各类预测所需数据会准备就绪。接下来运行整个取数,荡涤,特色,训练,预测,业务零碎对接全流程,须要在早上 8 点前实现并下发到业务零碎中,整体运行工夫必须管制在 5 小时以内。

3. 对监控运维等方面的高要求。 海量数据细粒度的预测,笼罩十分多业务人员的日常工作需要,因此也会受到更多的扫视与挑战。如何确保模型预测输入的稳定性,在业务反馈问题后又如何疾速定位排查,遇到数据不可用,服务器 down 机等突发状况,有什么样的备选计划确保整体流程的稳固运行,都是须要思考的问题。

从前两点来看,咱们之前习惯的单机 Pandas + lgb/xgb 建模思路曾经难以实用(除非搞台神威·太湖之光之类的机器),所以咱们须要引入目前大数据界的当红炸子鸡 — Spark 来帮助实现此类我的项目。

部署 Spark

要玩 Spark,第一步是部署。如果是本机测试运行,个别跑一个 pip install pyspark 就能把一个 local 节点跑起来了,十分的不便。如果想部署一个绝对残缺一点的 standalone 集群,能够参考以下步骤:

  1. 到 Spark 官方网站[1] 下载 Spark。过后最新的稳固版本是 2.4.5,下载 pre-built for Apache Hadoop 2.7 就好。
  2. 解压缩,做一些简略的配置文件配置。在解压开的 spark 目录下,进入到 conf 目录里,会看到一系列配置的 template。把须要自定义的配置 copy 一份,例如:cp spark-env.sh.template spark-env.sh,而后进行编辑。我改的一些配置具体如下:
spark-env.sh
# 配置 master 和 worker

SPARK_MASTER_HOST=0.0.0.0
SPARK_DAEMON_MEMORY=4g
SPARK_WORKER_CORES=6
SPARK_WORKER_MEMORY=36g
slaves
# 指定 slaves 机器的列表,这里就选了本机

localhost
spark-defaults.conf
# 这个文件很多教程都会让你改,说是 spark-submit 命令会默认从这里读取相干配置
# 但要留神咱们写的 PySpark 程序很多时候并不是通过 spark-submit 命令提交的,所以这里改了可能没用

spark.driver.memory 4g
  1. 启动集群。间接运行 sbin/start-all.sh  即可。或者也能够别离起 master 和 slave,运行./sbin/start-master.sh  和 ./sbin/start-slave.sh spark://127.0.0.1:7077 -c 6 -m 36G 即可。
  2. 进行集群。命令与下面十分相似,sbin/stop-all.sh,或者别离停 slave 和 master 都行。

这样就算部署完了!其中 spark master 会有一个监听 8080 端口的 web-ui,worker 会监听 8081,前面提交 application 就会有监听 4040 端口的治理界面,功能强大,用户友好度强。

跑第一个 PySpark 程序

间接上代码:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master('spark://127.0.0.1:7077')
         .appName('zijie')
         .getOrCreate())
df = spark.read.parquet('data/the_only_data_i_ever_wanted.parquet')
df.show()

咱们的大数据平台就跑起来了。

Spark 与 Pandas 的一些不同之处

在网上看一些 Spark 相干的介绍应该很快会有一些意识。有几个比拟显著的区别点我大抵列一下:

  1. Spark 里对 DataFrame 的操作大多是 lazy 的,也就是所谓的 transformation,只有多数的 action,例如 take, count, collect 等会实在进行计算返回后果。而 pandas 只有做了操作就会立即执行。
  2. Pandas 里对性能方面的关注次要是这个操作能不能利用底层的计算库做 vectorize,而在 Spark 里须要关注的点就太多了,可能比拟次要的是看怎么尽量减少 shuffle 这类宽依赖吧。当然还有什么数据歪斜等相干高级话题。
  3. 用 Spark 来做算法相干的利用时,要十分留神整体的计算逻辑(数据 lineage),对须要重复用到的数据集,肯定要记得 cache/persist/checkpoint 才行(这条不知是否过期了)。

从实际操作来看,在 PySpark 中其实有很多操作长得跟 Pandas 十分相似,比方咱们罕用的 df[df['date'] > '2020-01-01'] 之类的写法。当然区别也有不少,所以起初 Databricks 罗唆推出了一个 Koalas  的库来反对更平滑的切换。

Spark 特色工程

这里次要记录几个在我的项目过程中写的感觉比拟好玩的,并比照 pandas 的版本不便大家了解。

日期填充

pandas version:

# 对每家店每个 SKU 历史无销售状况进行填零解决
def fill_dates(df):
    new_df = []
    for store_id in df.store_id.unique():
        for sku in df.query('store_id == @store_id').sku.unique():
            tmp = pd.DataFrame()
            cond = (df.store_id == store_id) & (df.sku == sku)
            min_date = df.loc[cond, 'date'].min()
            max_date = df.loc[cond, 'date'].max()
            dates_in_between = daterange(min_date, max_date)
            tmp['date'] = dates_in_between
            tmp['sku'] = sku
            tmp['store_id'] = store_id
            new_df.append(tmp)
    new_df = pd.concat(new_df)
    new_df = new_df.merge(df, on=['date', 'sku', 'store_id'], how='left').fillna(0)
    return new_df

能够看到整体逻辑就是取所有 store, sku 的组合,而后找到每个组合最小最大的售卖日期,把两头的日期都填上。

PS: 这段代码应该效率不高,后续咱们又迭代了几个版本。

Spark version:

from pyspark.sql import functions as F

def fill_dates_spark(df):
    tmp = df.groupby(['store_id', 'sku']).agg(F.min('date').cast('date').alias('min_date'),
                                              F.max('date').cast('date').alias('max_date'))
    tmp = tmp.withColumn('date', F.explode(F.sequence('min_date', 'max_date'))).select(['date', 'store_id', 'sku'])
    new_df = tmp.join(df, ['date', 'store_id', 'sku'], 'left').fillna(0, subset=['y'])
    return new_df

用了 sequence+explode 操作,代码简洁很多。其中 sequence 会主动生成从 start 到 end 的序列(工夫,数字都反对),explode 操作间接把一行“炸开”成多行,省去了 join 操作,性能也更好。

Lag 特色

这个是咱们最罕用的一种特色了,在 pandas 里次要就是做循环 join:

def shift_daily_data(df, delay, shift_by='date', shift_value='y'):
    groupby_df = [x for x in df.columns if (x != shift_by) and (x != shift_value)]
    shift_df = df.copy()
    shift_df[shift_by] = shift_df[shift_by].apply(lambda x: x + relativedelta(days=delay))
    shift_df = shift_df.rename(columns={shift_value: '%s_%s_day_lag_%d' % ('_'.join(groupby_df), shift_value, delay)})
    return shift_df

def add_daily_shifts(df, days, categories, shift_by='date', shift_value='y'):
    merge_df = df.copy()
    for base_categories in categories:
        feat_cols = base_categories + [shift_by]
        base_df = df.groupby(feat_cols, as_index=False).agg({shift_value: sum})
        for i in days:
            delay_df = shift_daily_data(base_df, i, shift_by, shift_value)
            merge_df = pd.merge(left=merge_df, right=delay_df, how='left', on=feat_cols, sort=False).reset_index(drop=True).fillna(0)
            gc.collect()
    return merge_df

# 依照不同维度生成 lag 自回归时序特色
def add_lag_features(all_data_df, fcst_type):
    lag_days = list(range(1, 11)) + [14, 21, 28, 29, 30, 31]
    lag_days = [x for x in lag_days if x >= fcst_type]
    groupby_cats = [['sku'], ['store_id'], ['sku', 'store_id']]
    all_data_df = add_daily_shifts(all_data_df, lag_days, groupby_cats)
    return all_data_df

在迁徙到 Spark 时第一版我也采纳了相似的写法,不过发现性能比拟差,而且随着 lag 数的增多,join 次数也增多了,数据血缘关系会拉得十分长。

第二版咱们采纳了 window function 的写法:

from pyspark.sql import functions as F
from pyspark.sql import Window

def add_date_index(df, date_col, start_day='2016-01-01'):
    df = df.withColumn(f'{date_col}_index', F.datediff(date_col, F.lit(start_day)))
    return df

def add_shifts_by_window(df, days, group_by, order_by='date_index', shift_value='y'):
    # 取 lag 操作,其实就是要取一个工夫点往前一个工夫窗口中的值
    # 而后这个窗口要思考工夫程序,咱们就加上 orderBy,须要分门店分 sku,咱们就加上 partitionBy
    w = Window.orderBy(order_by).partitionBy(*group_by)
    new_col_prefix = f'{"_".join(group_by)}_{shift_value}_day_lag'
    # 再用 lag 函数取之前的值即可
    new_cols = [F.coalesce(F.lag(shift_value, i).over(w), F.lit(0)).alias(f'{new_col_prefix}_{i}') for i in days]
    df = df.select('*', *new_cols)
    return df

# 接下来次要就是调用了

def add_daily_shifts_by_categories(df, days, categories, shift_by='date', shift_value='y'):
    df = add_date_index(df, shift_by)
    shift_by = f'{shift_by}_index'
    cat_cols = ['store_id', 'sku']
    merge_df = add_shifts_by_window(df, max(days), cat_cols, shift_by, shift_value)
    for base_categories in categories:
        if len(base_categories) < len(cat_cols):
            # 先聚合,再增加 lag 特色
            feat_cols = base_categories + [shift_by]
            base_df = df.groupby(feat_cols).agg(F.sum(shift_value).alias(shift_value))
            join_df = add_shifts_by_window(base_df, max(days), base_categories, shift_by, shift_value)
            join_df = join_df.drop(shift_value)
            merge_df = merge_df.join(join_df, feat_cols, 'left').fillna(0)
    return merge_df

用这个办法的前提是,先要把日期填充做了,否则 window 中的数值可能是不间断的。过后也思考过不做填充可不可以?比方用 F.create_map 的办法创立出工夫点与值的 map:df = df.withColumn(‘m’, F.create_map(‘date_index’, ‘y’)),而后用相似的 collect_list  手法获取 window 中的多个 map,合并 map,而后按 lag 程序取 key,取不到的就填 0 即可。其中合并 map 须要用 udf,大抵如下:combineMap = udf(lambda maps: dict(ChainMap(*maps)), MapType(IntegerType(), DoubleType()))。

在试验中发现,这个 udf 应用过程中会报错,说 pandas udf 目前不反对在 window function 中应用,须要用 Spark 3.0 才行。所以临时用了以上的计划。实测下来发现,用上了 window function,建 lag 特色的工夫从 20 多分钟降到了 200 秒左右,而且不论建多少个 lag,工夫根本都是一样的,可扩展性很强!
从这个例子中也能够看到,window function 联合 Spark SQL 中带的各种办法十分弱小灵便。而到了 PySpark 这里,还有更加神奇的 pandas udf,光看官网示例[2] 就感觉操作性很强,感兴趣的同学能够到文末参考资料中点击查看。

类别编码

这个我的项目中咱们用的是相似 frequency encoding 的手法,Pandas 代码如下:

def y_rank_transform(df, col_name, orderby, ascending=True):
    sorted_df = df.groupby(col_name).agg({orderby: np.sum}).reset_index().sort_values(orderby, ascending=ascending)
    rank_map = {v: i for i, v in enumerate(sorted_df[col_name].values)}
    df[col_name] = df[col_name].map(rank_map)
    return df, rank_map

def convert_category_feats(full_df, category_features, orderby):
    # 依据 orderby 值的大小对 category_features 进行排序编码
    rank_maps = {}
    for c in category_features:
        if c in full_df:
            full_df, rank_map = y_rank_transform(full_df, c, orderby)
            rank_maps = rank_map
            gc.collect()
    return full_df, rank_maps

还是比拟好了解的。而后 Spark 里能够间接用 pyspark.ml.feature 里自带的一些实现来帮忙咱们做相似的事件:

from pyspark.ml.feature import StringIndexer
from pyspark.sql import functions as F

def convert_category_feats(full_df):
    cat_cols = get_category_cols()
    cat_cols = [x for x in cat_cols if x in full_df.columns]
    # 依据 orderby 值的大小对 category_features 进行排序编码
    for c in cat_cols:
        if c in full_df.columns:
            target_col = f'{c}_index'
            indexer = StringIndexer(inputCol=c, outputCol=target_col)
            model = indexer.fit(full_df)
            full_df = model.transform(full_df).withColumn(target_col, F.col(target_col).cast('int'))
    return full_df

所以有时候也能够没事浏览下规范库里的货色,说不定你想要的性能都曾经有现成实现的。

Spark 模型训练‍

构建完特色,就到了模型训练环节!特色构建之类,总体来说还是尽在把握的感觉,但十亿级数据量的训练,就感觉有点心里发虚了。这部分一开始的工作次要由学弟负责。学弟通过一番调研,最终锁定了一个名为 mmlspark 的库:

初识 mmlspark

之前咱们在不少场景用了 lgb,而这个 mmlspark 同是微软出品的框架,感觉应该稳了!

mmlspark 的装置问题

要尝试这个库,第一步必定就是装置了!这个库的装置比拟奇怪,没有提供 pypi/conda 安装包,官网上给出的用法是这样的:

import pyspark
spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
            .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1") \
            .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven") \
            .getOrCreate()
import mmlspark

但真正跑起来的时候,碰到了一系列网络问题,两头尝试了良久的更换 maven/ivy2 源等,都没有很好的解决。最初咱们在 GCP 的服务器上跑了一下代码,顺利下好了所有的依赖,而后把 ~/.ivy2/cache  下的所有文件打包回来到本地缓存文件夹解压开。Work like a charm!

mmlspark 的 early_stopping 问题

代码跑起来后,没过多久,学弟就碰到了第二个艰难:

接口变了

上官网文档[3] 瞄了一眼,感觉不妙。个别文档写成这样的库,大概率应用的人很少。不过看反对的参数,比起 Spark ML 的 GBDT 还是丰盛很多的,基本上应该是继承了原生 lgb 的接口。在这里咱们的次要指标是通过 early_stopping 来做最根本的调参,这样能够保障模型运行时有比拟牢靠的体现。然而文档里基本没有提这个 early_stopping 应该怎么用,该怎么办呢?

遇到这种状况,个别就只能找 1)有没有他人的代码用了这个性能,2)源码里是怎么实现这个性能的。具体到这个问题,我就间接抉择在 github 的 mmlspark repo 里搜 earlyStoppingRound  这个参数:

代码搜寻是个好办法

而后通过调用门路做几层追踪,就会看到相干的实现:

early stopping 相干逻辑

所以要用 early_stopping,须要几个条件:

  1. 设置 validationIndicatorCol  参数
  2. 在训练数据中加一列上述参数指定的 column,以 true /false  来指定训练集和验证集
  3. 设置好 earlyStoppingRound  参数,须要大于 0
  4. 接下来就能够 fit model 了

通过相似的伎俩,咱们解决了一系列因为文档和示例不足导致的应用艰难,包含传入类别变量等。

mmlspark 训练数据要求

之前用原生 lgb 时,训练数据处理根本比较简单,间接用 lgb.dataset  从 pandas/numpy 数据集进行构建即可。不过 mmlspark 就很不一样了,居然要求传入 2 个 columns,一个叫 featuresCol,另一个叫 labelCol。总不至于只反对 1 个特色吧?
转念一想,lightgbm 的 Python API 分了 native 和 sklearn 两套,那 mmlspark 这个 API 应该同理是为了合乎 Spark ml 的规范。顺着这个思路,果然发现 Spark ml 都是这个套路,而后 Spark ml 库里也自带了一个类,叫 pyspark.ml.feature.VectorAssembler,间接用上就能把须要的 feature columns 转换成 Vector 类型的单个 column 了。代码相似:

def vectorize(df, feat_cols):
    assembler = VectorAssembler(inputCols=feat_cols, outputCol='features')
    df = assembler.transform(df)
    return df

mmlspark 训练卡死的问题 1

终于所有代码就绪,开始跑训练了!没想到刚开始就呈现了问题,训练启动后就始终没反馈,看 Spark 的工作也齐全没有进度,十分诡异。这个问题的排查绕了一些弯路,看了不少 mmlspark 的源码,尝试 callstack 的收集,strace 等,都没有现实的后果。最初还是从日志中发现了问题。

先截取一个酷炫的图,给大家看下在哪里看日志:

查看 Spark 工作日志

具体日志:

失常日志

下面这个是失常的日志,过后出错时发现 Spark executor 在启动 lgb 时报了一堆的谬误:

NetworkInit failed with exception on local port…Retrying NetworkInit with local port…

而后能够用相似的办法,在 git repo 里搜这些错误信息,看到底是从哪里报进去的:

搜寻到的谬误来源代码

再接着往上排查几层,看了下 lgb 分布式训练的一些文档阐明,就大抵明确问题出在哪了。总体的 mmlspark 训练过程其实就是把散布在各个机器上的数据转化为 lgb.dataset 模式,而后再各自起原生的 lightgbm 来训练。多节点训练时各个节点须要通过网络端口来进行同步,因而须要在启动时设定好大家各自的端口。而且 mmlspark 里用的是 mapPartitions 办法来做具体的训练,我是在单机上跑(当然只有一台机器上有多个 parition 都会有这个问题),所以就呈现多个 partition 启动 lightgbm 时监听的端口抵触问题。要解决的话也比较简单,只须要 repartition 数据到每台服务器启动一个 lgb task 即可。

此外对于相似此类 MPI 的计算 load,官网还提供了一个新的 barrier execution mode 来解决一系列相干问题:

Barrier Mode

如何获取 best_iteration?

刚解决完训练卡死的问题,立即又来了下一个问题。后面刚提到咱们用 early_stopping 来寻找一个适合的树的数量参数,不过在 mmlspark 中用完 early_stopping 后,发现没有办法能够获取到这个 best_iteration 到底是多少?

搜了半天文档和代码,都没发现暗藏性能,只好在 git 上提了一个 issue[4],那会儿没人理,直到一年后终于反对了。如果咱们要自力更生,怎么解决?

  1. mmlspark 的库里还是有不少办法,展示了如何调用原生 API,例如:
def getFeatureImportances(self, importance_type="split"):
    """Get the feature importances as a list.  The importance_type can be"split"or"gain"."""
    return list(self._java_obj.getFeatureImportances(importance_type))

这么简略吗?当然不是,还须要在 Scala 里转一层:

/**
* Calls into LightGBM to retrieve the feature importances.
* @param importanceType Can be "split" or "gain"
* @return The feature importance values as an array.
*/
def getFeatureImportances(importanceType: String): Array[Double] = {val importanceTypeNum = if (importanceType.toLowerCase.trim == "gain") 1 else 0
    if (boosterPtr == null) {LightGBMUtils.initializeNativeLibrary()
        boosterPtr = getBoosterPtrFromModelString(model)
    }
    val numFeaturesOut = lightgbmlib.new_intp()
    LightGBMUtils.validate(lightgbmlib.LGBM_BoosterGetNumFeature(boosterPtr, numFeaturesOut),
        "Booster NumFeature")
    val numFeatures = lightgbmlib.intp_value(numFeaturesOut)
    val featureImportances = lightgbmlib.new_doubleArray(numFeatures)
    LightGBMUtils.validate(lightgbmlib.LGBM_BoosterFeatureImportance(boosterPtr, -1, importanceTypeNum, featureImportances),
        "Booster FeatureImportance")
    (0 until numFeatures).map(lightgbmlib.doubleArray_getitem(featureImportances, _)).toArray
}

所以要实现比方原生的 get_current_iteration  办法,也得依照下面这个流程走一遍。

  1. 改代码还是太麻烦了,还须要思考后续怎么继续保护跟主线不一样的代码(当然也能够间接成为我的项目 contributor)。所以咱们还是另辟蹊径,通过已有的办法来绕过。

首先看到 mmlspark 实现了 saveNativeModel  办法,一看这个名字,应该会把模型存成 lgb native model。看了下代码应该没问题,就尝试存了一个。

接下来拿出咱们的原生 lightgbm,来 load 这个存好的模型。因为是 Spark 存模型,还须要思考分布式文件系统等问题,不过 mmlspark 也比拟暴力,间接用了 coalesce(1) 加 write text 的办法来存模型,所以最终必定就是一个文件啦!

读到原生模型后,取 current iteration 就大海捞针了!最初实现代码如下:

def get_native_lgb_model(file_path):
    txt_files = list(Path(file_path).glob('*.txt'))
    if len(txt_files) != 1:
        raise Exception('Aww...cannot read model file!')
    native_model = lgb.Booster(model_file=txt_files[0].as_posix())
    return native_model

def get_best_iteration(model, path_prefix='/share'):
    file_path = f'{path_prefix}/lgb_model'
    model.saveNativeModel(file_path)
    native_model = get_native_lgb_model(file_path)
    best_iteration = int(native_model.current_iteration() * 1.02)
    return best_iteration

性能优化

接下来通过了一阵惊涛骇浪的开发日子,咱们逐步实现了一些参数搜寻缓存,主动回测,与数据开发平台实现对接等性能,并逐步把训练数据量晋升到了一亿行。在这个阶段咱们次要的指标是评估当数据量增长时,整体的性能变动,机器资源占用变动如何,进而产出对机器资源需要的布局计划来。如果所需的机器数量过多,就须要做一系列的优化管制整体老本。

后面有提到咱们的整个数据获取,荡涤,构建特色,模型训练预测,业务零碎对接产出,必须在 5 个小时以内实现,这其中的每一个阶段要花的工夫都要做好优化工作,确保没有显著的瓶颈点。

首先,优化的前提是监控,在本地集群和开发平台,咱们都设计了相应的日志,用于抓取 pipeline 中每个阶段所须要破费的工夫。另外开发平台部署了 Prometheus 和 Grafana,本地集群咱们装备了 dstat, jstat, top 等脚本,次要用于监控 Spark, Python 相干过程的 cpu,内存应用状况,为整体的 capacity planning 做筹备。

接着在监控的根底上,咱们对各个 stage 做了相应的优化:

1. 数据获取。

  • 对于大数据量的表,咱们采取了增量同步数据的形式。
  • 对所有 Hive 表的查问,都尽量走 partition key。例如销量表,如果不走 partition key 的查问,哪怕获取近 3 天数据,跟全量拉取的速度并没有多少区别。获取 partition key 的信息,能够通过 describe <table_name>  语句进行查问。

2. 数据荡涤。 这块目前速度都比拟快。如果荡涤逻辑没有前后依赖,能够适当并发进行。

3. 特色构建。 在模型训练完之后,咱们能够获取到各个特色的重要度指标。咱们整体的预测流程中,大概会构建 45 个不同的模型,基于这 45 个模型返回的特色重要度信息,咱们制订了一个简略的特色筛选策略:

  • 记录每个模型中 feature importance top 10 的特色
  • 记录每个模型中 feature importance 为 0 的特色
  • 统计 b 中呈现特色的频次,在频次最高的特色中,排除在 a 中呈现过的特色,造成移除特色列表
  • 在特色构建阶段移除这些特色的生成操作
  • 后续特色的增加会思考一个准入规范,同时思考运行工夫损耗和精度晋升量
  • 也能够思考将一些不依赖最新数据的固定特色事后计算生成好,节约运行窗口的工夫

4. 模型训练。 以往的模型训练参数调优都次要以优化准确度为指标。在这个我的项目中,咱们还须要思考训练工夫的问题。在 lgb 模型中,有一些参数与训练工夫会有比拟大的相关性,例如:

  • learningRate : 越大训练所需的轮次越少
  • numLeaves : 越小则每棵树越简略,但理论可能须要的数量越多
  • maxBin : 越小则训练速度越快,但会损失精度
  • baggingFraction 和 baggingFreq : 训练采样率,采样之后训练数据少了,天然速度就快了,能够管制每多少轮从新采样一次
  • featureFraction : 特色采样率,原理相似上一条。留神有些状况下这个参数设置为 1 成果才比拟现实,一个典型的例子是 one hot encoding 后的数据,采样后可能导致类别信息的缺失

咱们采取了一个比较简单的做法来做训练速度的优化,在原先随机搜寻的根底上,除了记录模型的精度指标,咱们还会一并记录训练所破费的工夫。最初在做参数抉择时,能够灵便抉择能够承受的工夫消耗,在训练工夫小于这个要求的前提下,选取成果最优的参数。通过这一步优化,整体训练工夫缩短了一半左右,而且训练精度并没有降落。

  1. 上游零碎数据对接。这个目前改用 Spark 计算生成后,整体速度也十分快,根本在 1 分钟内实现,临时没有优化的必要。
  2. 整体内存应用的优化。Spark 读入数据时为了不失落精度,默认会用 bigint,double  等类型来存储数据,但在咱们这个利用场景中,int,float  类型就曾经足够。因而能够做一些类型转换,节约内存占用和保留文件的大小。

通过一系列的优化工作,基本上能够达到应用 5 台 16c/64g 机器实现十亿级模型训练预测按时产出的需要。

mmlspark 训练卡死的问题 2

当训练数据裁减到一亿规模时,咱们的 mmlspark 又呈现了一个奇怪的卡顿问题。在训练过程中,这一亿数据并不是进入一个对立的大模型来训练,而是会依据策略引擎的规定,散发到不同的模型做训练。后面有提到模型的数量大概有 40+ 个,这其中有些模型分到的数据量会比拟大,因此自身训练工夫就比较慢。但随着训练流程的逐渐进行,这个训练工夫变得越来长,直至 task 失去响应。所以咱们又启动了新一轮的排查流程。

系统资源查看

遇到卡顿,首先察看系统资源状况,例如 cpu, 内存,磁盘 io/ 空间,网络等。但运行过程中发现没有一个资源吃紧的状况,其中特地奇怪的是 cpu 使用率在 100%(机器是 8 核,正好用满一个 core),没有施展所有的性能。
察看模型失常训练时的状况,Spark 启动的 lgb 会根本把所有 cpu 资源打满,因而狐疑是在进入训练之前的某些环节无奈并行计算导致的问题。

JVM 排查工具染指

为了更好的追踪 jvm 外部状况,请出了 visualvm。这是我多年前工作时用的主力排查工具。为了用上这个工具,须要对 Spark 的配置做一些批改:

  1. 配置 ${SPARK_HOME}/conf/metrics.properties  文件,加上 jmx 相干的一些 sink
  2. 在启动工作时加上 jmx 相干的配置 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=22990,留神很多文章都说要加在 spark-defaults.conf  里,然而咱们间接运行 Python 程序并不会调用 spark-submit  命令。所以这些参数须要在程序内的 spark session 中指定:
metrics_conf = f"{spark_home}/conf/metrics.properties"
jmx_conf = "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false" \
           "-Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=22990"
spark = (SparkSession.builder
         .master('spark://127.0.0.1:7077')
         .appName('zijie')
         .config("spark.executor.memory", "36g")
         .config("spark.driver.memory", "6g")
         .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
         .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1")
         .config("spark.metrics.conf", metrics_conf)
         .config("spark.executor.extraJavaOptions", jmx_conf)
         .getOrCreate())

配置好之后,启动利用,就能在 visualvm  里增加 jmx 连贯做监控了。咱们获取到了卡顿时候的 cpu 应用状况截图如下:

单 CPU 开销

能够看出模型训练阶段,cpu 使用率都是在 80% 高低稳定,但模型训练的两头,总有一些只占用了 1 个 cpu 资源的时间段。而且这些 cpu 资源应用是黄色的失常工作线程,而不是垃圾回收。

接下来一个比拟天然的思路就是在这些 cpu 应用低谷去获取 thread dump,看零碎到底在忙什么。用  jstack 或者 visualvm  等工具都能够获取到。一个典型的 thread dump 如下所示:(截取了后面 10% 的内容)

2020-02-20 16:46:33
Full thread dump OpenJDK 64-Bit Server VM (25.242-b08 mixed mode):

"Barrier task timer for barrier() calls." - Thread t@2867
   java.lang.Thread.State: WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <51997c78> (a java.util.TaskQueue)
        at java.lang.Object.wait(Object.java:502)
        at java.util.TimerThread.mainLoop(Timer.java:526)
        at java.util.TimerThread.run(Timer.java:505)

   Locked ownable synchronizers:
        - None

"JMX server connection timeout 2854" - Thread t@2854
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <3be7c557> (a [I)
        at com.sun.jmx.remote.internal.ServerCommunicatorAdmin$Timeout.run(ServerCommunicatorAdmin.java:168)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"RMI TCP Connection(6)-10.0.50.59" - Thread t@2853
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:171)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
        - locked <3e35bfb4> (a java.io.BufferedInputStream)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:555)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:834)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:688)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler$$Lambda$37/13510931.run(Unknown Source)
        at java.security.AccessController.doPrivileged(Native Method)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:687)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - locked <37829806> (a java.util.concurrent.ThreadPoolExecutor$Worker)

想起我当年作为新人来看 thread dump 时,情绪是如许的冲动!这里有个 WAITING,这里有个 locked,是不是找到问题了!但起初发现,其实都不是问题。如果你一开始看 thread dump 没有脉络,十分失常,一方面能够去搜寻一些这些 thread state 代表什么含意,另一方面也能够在程序失常运行时跑个 thread dump 看看,会发现其实也有很多 WAITING 和 lock。

在这个具体的问题里,出问题的 thread 次要是以下这个:

"Executor task launch worker for task 8327" - Thread t@2767
   java.lang.Thread.State: RUNNABLE
        at java.io.FileInputStream.readBytes(Native Method)
        at java.io.FileInputStream.read(FileInputStream.java:255)
        at org.apache.spark.network.util.LimitedInputStream.read(LimitedInputStream.java:99)
        at net.jpountz.lz4.LZ4BlockInputStream.readFully(LZ4BlockInputStream.java:269)
        at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:245)
        at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157)
        at org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:591)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
        - locked <34545dd4> (a java.io.BufferedInputStream)
        at java.io.DataInputStream.read(DataInputStream.java:149)
        at org.spark_project.guava.io.ByteStreams.read(ByteStreams.java:899)
        at org.spark_project.guava.io.ByteStreams.readFully(ByteStreams.java:733)
        at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127)
        at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
        at scala.collection.Iterator$$anon$12.next(Iterator.scala:445)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
        at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at scala.collection.AbstractIterator.to(Iterator.scala:1334)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
        at com.microsoft.ml.spark.lightgbm.TrainUtils$.translate(TrainUtils.scala:229)
        at com.microsoft.ml.spark.lightgbm.TrainUtils$.trainLightGBM(TrainUtils.scala:385)
        at com.microsoft.ml.spark.lightgbm.LightGBMBase$$anonfun$6.apply(LightGBMBase.scala:145)
        at org.apache.spark.rdd.RDDBarrier$$anonfun$mapPartitions$1$$anonfun$apply$1.apply(RDDBarrier.scala:51)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - locked <3ddc0126> (a java.util.concurrent.ThreadPoolExecutor$Worker)

调用栈外面有很多 spark, mmlspark 的关键字,一看就是“本人人”。看到最上层,这个调用次要是在做文件 IO,那么问题来了,为什么这里 IO 不能并行利用多 CPU 呢?一个比拟可疑的点是 lz4 那块的调用。搜寻一番发现果然 lz4 压缩算法不是 splittable 的,这导致了在解决压缩文件时,必须把所有数据放在一起来运行(比方 MR 里就是 single mapper 了)。另外这里也跟咱们后面用了 1 个 partition 无关,Spark 在做 shuffle, broadcast 时都会用到 lz4 压缩[5],而后在解压缩阶段只有一个 partition 参加来做,就天然呈现了单 CPU 被打满的景象。

流程优化

仔细的同学可能会发现,下面咱们在找 mmlspark 中怎么做 early_stopping 的验证集时贴了一段 Spark 代码,其中有一个诡异的 broadcast 调用。这个 broadcast 被用来把 validation data 发送到各个数据分区去做验证集。咱们之前代码中选取了 30 天的数据来做 validation set,这就导致有些 broadcast 的数据会十分微小。Spark 的 broadcast 数据默认也会触发 compression,进一步加剧了这个问题。所以咱们有几个改良点:

  1. 思考在数据量较大的状况下,减小验证集的大小,以晋升整体性能。
  2. 参数搜寻的操作齐全能够从训练主流程中剥离,不占用线上运行工夫。
  3. 参数优化能够不必 early_stopping 模式,改用随机搜寻或贝叶斯优化等办法。

参数调优

除了流程优化,还能够借鉴一些参数优化的教训(玄学调参处处有)。这里次要参考了几篇文章:

  • Facebook 对于 Spark 性能调优的分享[6] 
  • Intel 对于压缩算法的分享[7]

通过一系列调整和试验,最终确定了一组设置:

spark.io.compression.lz4.blockSize="512k"
spark.serializer="org.apache.spark.serializer.KryoSerializer"
spark.kryoserializer.buffer.max="512m"
spark.shuffle.file.buffer="1m"

在解决卡顿问题的根底上,进一步把整体训练工夫从之前的 55 分钟缩短到了 25 分钟左右。改完参数后看到的 CPU 应用曲线就失常多了:

CPU 的充分利用

其它框架评估
看后面提到了这么多 mmlspark 的问题,咱们还始终保持应用,感觉肯定是真爱了!其实在整个应用过程中也调研过一些其它的框架和计划。

Spark ML
Spark 本人就带了机器学习相干的库,其中就有 GBDT 的实现。在我的项目推动过程中,咱们也尝试了 Spark ML 中的 GBDT 模型来进行训练。须要留神的是,应该应用 pyspark.ml.regression.GBTRegressor  这个类,而不是之前的 pyspark.mllib.tree.GradientBoostedTrees。实现起来还是十分顺利的,但实测下来发现性能十分的差,感觉用 Spark 来构建整个迭代式的算法流程,整体的效率不高。所以这个计划看起来不可行。

Native Lightgbm
Lightgbm 库本人也带了分布式训练的计划,具体能够参考官网文档[8]。

从反对的性能上和官网提供的性能报告上感觉成果十分优良。例如能够依据数据与特色的大小,抉择 feature/data/voting 三种不同的并行计划。官网给的例子里,15 亿行数据,60+ 特色,在多机上做 data parallel 训练,整体性能能够达到线性扩大的成果。

但有一个问题,lightgbm 自身并没有带数据散发的能力。官网上的例子能够看出用户须要自行做数据,配置,可执行文件的转换和散发,而后自行在多节点上启动训练任务。其它几点都还好说,能够用 pssh,pscp  之类的命令。但数据散发和转换就是一个比拟大的问题了。如果认真往下想,就会发现整体实现思路可能跟 mmlspark 目前的实现十分相似了。

所以总体看下来,如果要自行集成 native lightgbm 做分布式训练,可能会须要写一个相似 mmlspark 的库,工作量大,也没有太大必要。

Xgboost/Catboost
顺带考查了 lgb 的两个老竞争对手,看看他们的分布式计划如何。Catboost 齐全没有对分布式的反对,率先出局。Xgboost 里提到如果用 Spark 做数据处理,倡议应用 Xgboost4j-Spark。粗略看了下还挺不错的,起码文档比 mmlspark 好多了!不过美中不足的是这个库叫 4j,所以只有 Java/Scala 接口,木有 Python 反对,集成起来会有一些难度。

Dask
从 mmlspark 的思路登程,天然会想到其实也能够联合别的并行计算框架,例如 Dask。Xgboost, lightgbm 都有相干的库,用 Dask 来反对分布式训练。这个计划看起来有几个问题:

  1. 须要保护 Spark,Dask 两套框架,要么就把原先数据处理的逻辑再迁徙一次到 Dask。但大数据量的解决框架,总体来说 Spark 还是成熟不少,包含用户数量,工具链反对,可运维性等等。
  2. 数据散发依然是一个问题,如何把 Spark 中的分布式数据集转化为 Dask 能够读取到的模式,还须要尽量避免数据的替换,不好解决。
  3. 跟数据开发平台的联合会有点艰难,而且两者性能点上会有些重合。

不过几个 Dask 库里的实现形式还是值得一看的,提供了一些并发框架集成 Python 算法包的思路。

Angel
腾讯一个较为出名分布式机器学习库[9],基于 parameter server 架构实现了一系列算法,反对分布式大规模的训练。大抵看了一下这个库,有几个 concern:1. 同样短少 Python 接口,在 18 年的某个版本还有 PyAngel,起初就删掉了。2. 整个库的使用量,活跃度,都有点存疑,比方自 19 年 12 月以来根本没有 commit,很多 issue 无人回复。3. 部署相干的额定开销比拟多,相比 mmlspark 要简单不少。

所以论断还是不偏向应用,或者前面能够理解下 Angel 的实现形式,看看有没有借鉴意义。

TensorFlow
TF 外面也有 GBDT[10],可能很多人都不晓得,这个我还没试过,另外真的要用的话还得考查下 TensorFlow on Spark。当然益处是说不定还能试一些网络模型看看成果如何。

自行工作治理
在 Spark 实现特色构建后,就能够通过不同的策略,把须要分模型训练的数据别离存储到分布式文件系统中,而后利用一些多机工作治理的框架(例如 Ray,咱们的数据开发平台等),在不同的节点上别离取对应的数据进行训练。这个计划的益处是灵活性十分高,不再局限于 Spark 平台能反对的算法,能够跑任意咱们相熟的算法模块。但毛病就是工作治理,高可用,failover,可运维性等等方面都会有些 concern。

另外一个问题就是数据交换的额定开销。咱们在我的项目中也尝试了一下在单机做 lgb 训练,也就是在 Spark 特色构建完之后,通过 toPandas  调用把数据集转化为 pandas dataframe,而后再调用原生 Python lightgbm 库来做模型训练与预测。这个操作相比写入文件系统还少了磁盘 IO 的开销,然而整体测试下来用原生 lgb 训练整体工夫须要 41 分钟,而间接用 mmlspark 用雷同的配置只须要 27 分钟。假如咱们有 10 亿行数据,70 个特色,那么每次训练的数据量达到了 500GB 左右,这部分的开销还是十分可观的。H2O 有个产品叫 Sparkling Water,就实现了 internal/external 两种 backend,其中也提到了内外部解决的优劣和实用场景等。

总结来看,目前还是 mmlspark 计划更加适合。后续咱们也会继续关注相似框架,并比拟评估大规模的 GBDT 模型与深度学习模型的体现差别。

参考资料

[1] Spark 官方网站: https://spark.apache.org/downloads.html

[2] 官网示例: https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

[3] 官网文档: https://mmlspark.blob.core.windows.net/docs/1.0.0-rc1/pyspark/mmlspark.lightgbm.html

[4] issue: https://github.com/Azure/mmlspark/issues/775

[5] lz4 压缩: https://spark.apache.org/docs…

[6] Facebook 对于 Spark 性能调优的分享: https://www.slideshare.net/databricks/tuning-apache-spark-for-largescale-workloads-gaoxiang-liu-and-sital-kedia

[7] Intel 对于压缩算法的分享: https://www.slideshare.net/databricks/best-practice-of-compressiondecompression-codes-in-apache-spark-with-sophia-sun-and-qi-xie

[8] 官网文档: https://lightgbm.readthedocs.io/en/latest/Parallel-Learning-Guide.html

[9] 分布式机器学习库: https://github.com/Angel-ML/sona

[10] TF 外面也有 GBDT: https://www.tensorflow.org/api_docs/python/tf/estimator/BoostedTreesRegressor

退出移动版