# -*- encoding=utf-8 -*-
import os
import time
import pickle
import numpy as np
import xgboost
import sklearn.metrics as metrics
from ray import tune
from ray.tune.suggest.bohb import TuneBOHB
from ray.tune.schedulers import HyperBandForBOHB
def get_auc_ks(scores, labels):
    """Compute the AUC and KS statistics of ``scores`` against ``labels``.

    :param scores: list-like, model scores.
    :param labels: list-like labels, or an ``xgboost.DMatrix`` whose
        ``get_label()`` values are used as the labels.
    :return: ``(auc, ks)`` when ``labels`` is list-like; the list
        ``[('my_auc', auc), ('KS', ks)]`` when ``labels`` was given as a
        ``DMatrix``.
    """
    from_dmatrix = isinstance(labels, xgboost.DMatrix)
    if from_dmatrix:
        labels = labels.get_label()

    false_pos, true_pos, _ = metrics.roc_curve(labels, scores, pos_label=1)
    auc_value = metrics.auc(false_pos, true_pos)
    # KS is the largest absolute gap between the TPR and FPR curves.
    ks_value = np.max(np.abs(true_pos - false_pos))

    if not from_dmatrix:
        return auc_value, ks_value
    return [('my_auc', auc_value), ('KS', ks_value)]
def metric_ks(pred, dtrain):
    """Evaluation metric that returns the KS statistic.

    :param pred: predicted scores.
    :param dtrain: object exposing ``get_label()``; its labels are the
        ground truth the scores are compared against.
    :return: tuple ``('ks', ks)`` where ``ks`` is ``max(|tpr - fpr|)`` over
        the ROC curve.
    """
    truth = dtrain.get_label()
    false_pos, true_pos, _ = metrics.roc_curve(truth, pred, pos_label=1)
    ks_value = np.max(np.abs(true_pos - false_pos))
    return 'ks', ks_value
def custom_metric(pred, dtrain):
labels = dtrain.get_label()scores = predfpr, tpr, thresholds = metrics.roc_curve(labels, scores, pos_label=1)auc = metrics.auc(fpr, tpr)ks = np.max(np.abs(tpr - fpr))return [('auc', auc), ('KS', ks)]
def objective_function(config, checkpoint_dir=None, path=None):
    """Objective evaluated by Ray Tune for one sampled configuration.

    Runs 5-fold ``xgboost.cv`` on the training data, collects per-fold
    AUC/KS on train, validation and OOT data through ``record_evaluation``,
    pickles the CV result into a Tune checkpoint directory and reports the
    mean validation AUC of the last boosting round as ``valid_auc``.

    :param config: sampled hyper-parameters. ``max_depth`` is cast to int;
        ``n_estimators`` is removed, cast to int and used as
        ``num_boost_round``. The caller's dict is not modified (a shallow
        copy is used).
    :param checkpoint_dir: accepted as part of the signature; not read by
        this function.
    :param path: ``(train_path, oot_path)`` pair of data file paths.
    :return: whatever ``tune.report(...)`` returns.
    :raises TypeError: if ``path`` is ``None``.
    """
    if path is None:
        # Fail with a clear message instead of an opaque unpacking error.
        raise TypeError("path must be a (train_path, oot_path) pair, got None")
    train_path, oot_path = path

    train_mat = xgboost.DMatrix(train_path)
    param = config.copy()
    param["max_depth"] = int(param["max_depth"])
    n_estimators = int(param.pop("n_estimators"))

    result = {}
    cv_results = xgboost.cv(
        param,
        dtrain=train_mat,
        num_boost_round=n_estimators,
        nfold=5,
        metrics='logloss',
        feval=custom_metric,
        maximize=True,
        callbacks=[record_evaluation(result, oot_path)],
    )

    detail = result["detail_metrics"]

    def last_round(group):
        # Each entry is the list of per-fold values of the last round.
        return detail[group]["auc"][-1], detail[group]["KS"][-1]

    test_score = last_round("my_oot")
    valid_score = last_round("my_valid")
    train_score = last_round("my_train")

    # Mean validation AUC across folds is the value Tune optimises.
    fold_aucs = valid_score[0]
    monitor_metric = sum(fold_aucs) / len(fold_aucs)

    with tune.checkpoint_dir(step=1) as ckpt_dir:
        cv_result_file = os.path.join(ckpt_dir, "cv_result")
        with open(cv_result_file, 'wb') as f:
            pickle.dump(cv_results, f)

    return tune.report(
        valid_auc=monitor_metric,
        test_score=test_score,
        valid_score=valid_score,
        train_score=train_score,
        done=True,
    )
def record_evaluation(eval_result, oot_path):
    """Build an ``xgboost.cv`` callback that records metrics every round.

    Besides the entries of ``env.evaluation_result_list``, the callback
    stores per-fold ``custom_metric`` values for train, validation and OOT
    data under ``eval_result['detail_metrics']``.

    :param eval_result: dict that receives the results; it is cleared
        when this function is called.
    :param oot_path: OOT data file path, loaded once into a ``DMatrix``.
    :return: the callback function.
    :raises TypeError: if ``eval_result`` is not a dict.
    """
    if not isinstance(eval_result, dict):
        raise TypeError('eval_result has to be a dictionary')
    eval_result.clear()
    oot_mat = xgboost.DMatrix(oot_path)
    groups = ("my_train", "my_valid", "my_oot")

    def init(env):
        """Create the empty result containers on the first invocation."""
        for item in env.evaluation_result_list:
            name = item[0]
            dash = name.index('-')
            data_name, metric_name = name[:dash], name[dash + 1:]
            eval_result.setdefault(data_name, {}).setdefault(metric_name, [])
        eval_result.setdefault('detail_metrics', {g: {} for g in groups})

    def callback(env):
        """Append this round's metrics to ``eval_result``."""
        if not eval_result:
            init(env)

        for item in env.evaluation_result_list:
            name, value = item[0], item[1]
            dash = name.index('-')
            eval_result[name[:dash]][name[dash + 1:]].append(value)

        # Per-fold metric values of this round, grouped by data set.
        per_fold = {g: {} for g in groups}
        for cvpack in env.cvfolds:
            booster = cvpack.bst
            pred_train = booster.predict(cvpack.dtrain)
            pred_valid = booster.predict(cvpack.dtest)
            pred_oot = booster.predict(oot_mat)
            fold_metrics = {
                "my_train": dict(custom_metric(pred_train, cvpack.dtrain)),
                "my_valid": dict(custom_metric(pred_valid, cvpack.dtest)),
                "my_oot": dict(custom_metric(pred_oot, oot_mat)),
            }
            for metric_name in fold_metrics["my_oot"]:
                for group in groups:
                    per_fold[group].setdefault(metric_name, []).append(
                        fold_metrics[group][metric_name]
                    )

        # One list of per-fold values is appended per round and metric.
        for group, by_metric in per_fold.items():
            for metric_name, fold_values in by_metric.items():
                eval_result["detail_metrics"][group].setdefault(
                    metric_name, []
                ).append(fold_values)

    return callback
def hyperopt(param_space, trainpath, testpath, num_eval, name, obj_funcs, log_path='~/ray_results'):
    """Search hyper-parameters with BOHB, then fit and score a final model.

    Runs ``tune.run`` with a ``TuneBOHB`` searcher and a
    ``HyperBandForBOHB`` scheduler maximising ``valid_auc``, trains an
    xgboost model with the best configuration and prints its AUC/KS on the
    train and test data together with the elapsed time.

    :param param_space: Tune search space passed as ``config``.
    :param trainpath: training data file path.
    :param testpath: test (OOT) data file path.
    :param num_eval: number of samples to evaluate; also used as the
        scheduler's ``max_t``.
    :param name: experiment name passed to ``tune.run``.
    :param obj_funcs: objective function; it receives
        ``path=(trainpath, testpath)`` through ``tune.with_parameters``.
    :param log_path: directory passed to ``tune.run`` as ``local_dir``.
    :return: None.
    """
    started_at = time.time()

    searcher = TuneBOHB(max_concurrent=2)
    scheduler = HyperBandForBOHB(time_attr="training_iteration", max_t=num_eval)
    trainable = tune.with_parameters(obj_funcs, path=(trainpath, testpath))
    analysis = tune.run(
        trainable,
        config=param_space,
        num_samples=num_eval,
        local_dir=log_path,
        metric='valid_auc',
        mode='max',
        search_alg=searcher,
        scheduler=scheduler,
        resources_per_trial={"cpu": 5},
        name=name,
    )

    best_params = analysis.get_best_config(metric="valid_auc", mode="max")
    best_params["max_depth"] = int(best_params["max_depth"])
    # n_estimators is the boosting-round count, not a booster parameter.
    boost_rounds = int(best_params.pop("n_estimators"))

    train_mat = xgboost.DMatrix(trainpath)
    test_mat = xgboost.DMatrix(testpath)
    model = xgboost.train(best_params, train_mat, boost_rounds)
    pred_test = model.predict(test_mat)
    pred_train = model.predict(train_mat)

    print("-----Results-----")
    print("Best model & parameters: {}".format(best_params))
    print("Train Score: {}".format(get_auc_ks(pred_train, train_mat.get_label())))
    print("Test Score: {}".format(get_auc_ks(pred_test, test_mat.get_label())))
    print("Time elapsed: {}".format(time.time() - started_at))
    print("Parameter combinations evaluated: {}".format(num_eval))
# Script entry point: tune on ./train.buffer and score on ./oot.buffer.
if __name__ == "__main__":
    trainfile_path = "./train.buffer"
    testfile_path = "./oot.buffer"
    name = 'ppdnew_V2'
    # NOTE(review): not referenced anywhere in the visible code.
    control_overfitting = False

    # Search space. max_depth and n_estimators are sampled as floats and
    # cast to int by objective_function / hyperopt before use.
    param = {
        'booster': "gbtree",
        'eta': tune.uniform(0.01, 1),
        'seed': 1,
        'max_depth': tune.uniform(3, 5),
        'n_estimators': tune.uniform(50, 500),
        'min_child_weight': tune.uniform(1, 300),
        'colsample_bytree': tune.uniform(0.6, 1.0),
        'subsample': tune.uniform(0.5, 1),
        'lambda': tune.uniform(0.0, 100),
        'alpha': tune.uniform(0.0, 100),
        'scale_pos_weight': tune.uniform(1, 5),
        'n_jobs': 5,
    }

    print("begin tuning")
    hyperopt(param, trainfile_path, testfile_path, 100, name, obj_funcs=objective_function)