关于数据库:OpenMLDB-OneFlow-手把手教你快速链接特征工程到模型训练

本文整顿自 OpenMLDB Meetup No.5 中 OpenMLDB PMC 黄威的演讲,将以 京东高潜用户购买动向预测问题 为例,示范如何应用 OpenMLDB 和 OneFlow 联结来打造一个残缺的机器学习利用。分享视频如下:

导读:

如何从历史数据中找出法则,去预测用户将来的购买需要,让最合适的商品遇见最须要的人,是大数据利用在精准营销中的关键问题,也是所有电商平台在做智能化降级时所须要的核心技术。京东作为中国最大的自营式电商,积淀了数亿的忠诚用户,积攒了海量的实在数据。本案例以京东商城实在的用户、商品和行为数据(脱敏后)为根底,通过数据挖掘的技术和机器学习的算法,构建用户购买商品的预测模型,输入高潜用户和指标商品的匹配后果,为精准营销提供高质量的目标群体,开掘数据背地潜在的意义,为电商用户提供更简略、快捷、省心的购物体验。本案例应用 OpenMLDB 进行数据挖掘,应用 OneFlow 中的 DeepFM 模型进行高性能训练推理,提供精准的商品举荐。

本案例基于 OpenMLDB 集群版进行教程演示。留神,本文档应用的是预编译好的 docker 镜像。如果心愿在本人编译和搭建的 OpenMLDB 环境下进行测试,举荐针对 OpenMLDB 优化的 Spark 发行版。

(参考章节:https://openmldb.ai/docs/zh/m… )

环境筹备和准备常识

OneFlow 工具包装置

OneFlow 工具依赖 GPU 的弱小算力,所以请确保部署机器具备 Nvidia GPU,并且保障驱动版本 >=460.X.X 驱动版本需反对 CUDA 11.0。
应用一下指令装置 OneFlow:

conda activate oneflow
python3 -m pip install --pre oneflow -f https://staging.oneflow.info/branch/support_oneembedding_serving/cu102

还须要装置以下 Python 工具包:

pip install psutil petastorm pandas sklearn

拉取和启动 OpenMLDB Docker 镜像

留神,请确保 Docker Engine 版本号 >= 18.03

拉取 OpenMLDB docker 镜像,并且运行相应容器

映射demo文件夹至/root/project,这里咱们应用的门路为demodir=/home/gtest/demo

export demodir=/home/gtest/demo
docker run -dit --name=demo --network=host -v $demodir:/root/project 4pdosc/openmldb:0.5.2 bash
docker exec -it demo bash

上述镜像预装了OpenMLDB的工具等,咱们须要进一步装置Oneflow推理所需依赖。

因为咱们将在OpenMLDB的服务中嵌入OneFlow模型推理的预处理及调用,须要装置以下的依赖。

pip install tritonclient xxhash geventhttpclient

留神,本教程以下的 OpenMLDB 局部的演示命令默认均在该曾经启动的 docker 容器内运行。OneFlow 命令默认在 1.1 装置的 OneFlow 环境下运行。

初始化环境

./init.sh

咱们在镜像内提供了init.sh脚本帮忙用户疾速初始化环境,包含:

配置 zookeeper

启动集群版 OpenMLDB

启动 OpenMLDB CLI 客户端

/work/openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client

留神,本教程大部分命令在 OpenMLDB CLI 下执行,为了跟一般 shell 环境做辨别,在 OpenMLDB CLI 下执行的命令均应用非凡的提示符 >

准备常识:集群版的非阻塞工作

集群版的局部命令是非阻塞工作,包含在线模式的 LOAD DATA,以及离线模式的 LOAD DATA ,SELECT,SELECT INTO 命令。提交工作当前能够应用相干的命令如 SHOW JOBS, SHOW JOB 来查看工作进度,详情参见离线工作治理文档。

机器学习训练流程

流程概览

应用OpenMLDB+OneFlow进行机器学习训练可总结为以下大抵步骤。

接下来会介绍每一个步骤的具体操作细节。

应用OpenMLDB进行离线特色抽取

创立数据库和数据表 ✦

以下命令均在 OpenMLDB CLI 环境下执行。

CREATE DATABASE JD_db;
USE JD_db;
CREATE TABLE action(reqId string, eventTime timestamp, ingestionTime timestamp, actionValue int);
CREATE TABLE flattenRequest(reqId string, eventTime timestamp, main_id string, pair_id string, user_id string, sku_id string, time bigint, split_id int, time1 string);
CREATE TABLE bo_user(ingestionTime timestamp, user_id string, age string, sex string, user_lv_cd string, user_reg_tm bigint);
CREATE TABLE bo_action(ingestionTime timestamp, pair_id string, time bigint, model_id string, type string, cate string, br string);
CREATE TABLE bo_product(ingestionTime timestamp, sku_id string, a1 string, a2 string, a3 string, cate string, br string);
CREATE TABLE bo_comment(ingestionTime timestamp, dt bigint, sku_id string, comment_num int, has_bad_comment string, bad_comment_rate float);

也可应用 sql 脚本(/root/project/create_tables.sql)运行:

/work/openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client < /root/project/create_tables.sql

离线数据筹备 ✦

首先,切换到离线执行模式。接着,导入数据作为离线数据,用于离线特色计算。

以下命令均在 OpenMLDB CLI 下执行。

USE JD_db;
SET @@execute_mode=’offline’;
LOAD DATA INFILE ‘/root/project/data/JD_data/action/*.parquet’ INTO TABLE action options(format=’parquet’, header=true, mode=’append’);
LOAD DATA INFILE ‘/root/project/data/JD_data/flattenRequest_clean/*.parquet’ INTO TABLE flattenRequest options(format=’parquet’, header=true, mode=’append’);
LOAD DATA INFILE ‘/root/project/data/JD_data/bo_user/*.parquet’ INTO TABLE bo_user options(format=’parquet’, header=true, mode=’append’);
LOAD DATA INFILE ‘/root/project/data/JD_data/bo_action/*.parquet’ INTO TABLE bo_action options(format=’parquet’, header=true, mode=’append’);
LOAD DATA INFILE ‘/root/project/data/JD_data/bo_product/*.parquet’ INTO TABLE bo_product options(format=’parquet’, header=true, mode=’append’);
LOAD DATA INFILE ‘/root/project/data/JD_data/bo_comment/*.parquet’ INTO TABLE bo_comment options(format=’parquet’, header=true, mode=’append’);

或应用脚本执行:

/work/openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client < /root/project/load_data.sql

并通过以下命令疾速查问 jobs 状态:

echo "show jobs;" | /work/openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client

留神,集群版 LOAD DATA 为非阻塞工作,能够应用命令 SHOW JOBS 查看工作运行状态,请期待工作运行胜利( state 转至 FINISHED 状态),再进行下一步操作 。

特色设计 ✦

通常在设计特色前,用户须要依据机器学习的指标对数据进行剖析,而后依据剖析设计和调研特色。机器学习的数据分析和特色钻研不是本文探讨的领域,咱们将不作开展。本文假设用户具备机器学习的根本理论知识,有解决机器学习问题的能力,可能了解 SQL 语法,并可能应用 SQL 语法构建特色。针对本案例,用户通过剖析和调研设计了若干特色。
请留神,在理论的机器学习特色调研过程中,科学家对特色进行重复试验,寻求模型成果最好的特色集。所以会一直的反复屡次特色设计->离线特色抽取->模型训练过程,并一直调整特色以达到预期成果。

离线特色抽取 ✦

用户在离线模式下,进行特色抽取,并将特色后果输入到’/root/project/out/1’目录下保留(对应映射为$demodir/out/1),以供后续的模型训练。 SELECT 命令对应了基于上述特色设计所产生的 SQL 特色计算脚本。以下命令均在 OpenMLDB CLI 下执行。

USE JD_db;
select * from(select
    `reqId` as reqId_1,
    `eventTime` as flattenRequest_eventTime_original_0,
    `reqId` as flattenRequest_reqId_original_1,
    `pair_id` as flattenRequest_pair_id_original_24,
    `sku_id` as flattenRequest_sku_id_original_25,
    `user_id` as flattenRequest_user_id_original_26,
    distinct_count(`pair_id`) over flattenRequest_user_id_eventTime_0_10_ as flattenRequest_pair_id_window_unique_count_27,
    fz_top1_ratio(`pair_id`) over flattenRequest_user_id_eventTime_0_10_ as flattenRequest_pair_id_window_top1_ratio_28,
    fz_top1_ratio(`pair_id`) over flattenRequest_user_id_eventTime_0s_14d_200 as flattenRequest_pair_id_window_top1_ratio_29,
    distinct_count(`pair_id`) over flattenRequest_user_id_eventTime_0s_14d_200 as flattenRequest_pair_id_window_unique_count_32,
    case when !isnull(at(`pair_id`, 0)) over flattenRequest_user_id_eventTime_0_10_ then count_where(`pair_id`, `pair_id` = at(`pair_id`, 0)) over flattenRequest_user_id_eventTime_0_10_ else null end as flattenRequest_pair_id_window_count_35,
    dayofweek(timestamp(`eventTime`)) as flattenRequest_eventTime_dayofweek_41,
    case when 1 < dayofweek(timestamp(`eventTime`)) and dayofweek(timestamp(`eventTime`)) < 7 then 1 else 0 end as flattenRequest_eventTime_isweekday_43from
    `flattenRequest`
    window flattenRequest_user_id_eventTime_0_10_ as (partition by `user_id` order by `eventTime` rows between 10 preceding and 0 preceding),
    flattenRequest_user_id_eventTime_0s_14d_200 as (partition by `user_id` order by `eventTime` rows_range between 14d preceding and 0s preceding MAXSIZE 200))as out0last join(select
    `flattenRequest`.`reqId` as reqId_3,
    `action_reqId`.`actionValue` as action_actionValue_multi_direct_2,
    `bo_product_sku_id`.`a1` as bo_product_a1_multi_direct_3,
    `bo_product_sku_id`.`a2` as bo_product_a2_multi_direct_4,
    `bo_product_sku_id`.`a3` as bo_product_a3_multi_direct_5,
    `bo_product_sku_id`.`br` as bo_product_br_multi_direct_6,
    `bo_product_sku_id`.`cate` as bo_product_cate_multi_direct_7,
    `bo_product_sku_id`.`ingestionTime` as bo_product_ingestionTime_multi_direct_8,
    `bo_user_user_id`.`age` as bo_user_age_multi_direct_9,
    `bo_user_user_id`.`ingestionTime` as bo_user_ingestionTime_multi_direct_10,
    `bo_user_user_id`.`sex` as bo_user_sex_multi_direct_11,
    `bo_user_user_id`.`user_lv_cd` as bo_user_user_lv_cd_multi_direct_12from
    `flattenRequest`
    last join `action` as `action_reqId` on `flattenRequest`.`reqId` = `action_reqId`.`reqId`
    last join `bo_product` as `bo_product_sku_id` on `flattenRequest`.`sku_id` = `bo_product_sku_id`.`sku_id`
    last join `bo_user` as `bo_user_user_id` on `flattenRequest`.`user_id` = `bo_user_user_id`.`user_id`)as out1on out0.reqId_1 = out1.reqId_3last join(select
    `reqId` as reqId_14,
    max(`bad_comment_rate`) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_bad_comment_rate_multi_max_13,
    min(`bad_comment_rate`) over bo_comment_sku_id_ingestionTime_0_10_ as bo_comment_bad_comment_rate_multi_min_14,
    min(`bad_comment_rate`) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_bad_comment_rate_multi_min_15,
    distinct_count(`comment_num`) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_comment_num_multi_unique_count_22,
    distinct_count(`has_bad_comment`) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_has_bad_comment_multi_unique_count_23,
    fz_topn_frequency(`has_bad_comment`, 3) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_has_bad_comment_multi_top3frequency_30,
    fz_topn_frequency(`comment_num`, 3) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_comment_num_multi_top3frequency_33from
    (select `eventTime` as `ingestionTime`, bigint(0) as `dt`, `sku_id` as `sku_id`, int(0) as `comment_num`, '' as `has_bad_comment`, float(0) as `bad_comment_rate`, reqId from `flattenRequest`)
    window bo_comment_sku_id_ingestionTime_0s_64d_100 as (UNION (select `ingestionTime`, `dt`, `sku_id`, `comment_num`, `has_bad_comment`, `bad_comment_rate`, '' as reqId from `bo_comment`) partition by `sku_id` order by `ingestionTime` rows_range between 64d preceding and 0s preceding MAXSIZE 100 INSTANCE_NOT_IN_WINDOW),
    bo_comment_sku_id_ingestionTime_0_10_ as (UNION (select `ingestionTime`, `dt`, `sku_id`, `comment_num`, `has_bad_comment`, `bad_comment_rate`, '' as reqId from `bo_comment`) partition by `sku_id` order by `ingestionTime` rows between 10 preceding and 0 preceding INSTANCE_NOT_IN_WINDOW))as out2on out0.reqId_1 = out2.reqId_14last join(select
    `reqId` as reqId_17,
    fz_topn_frequency(`br`, 3) over bo_action_pair_id_ingestionTime_0s_10h_100 as bo_action_br_multi_top3frequency_16,
    fz_topn_frequency(`cate`, 3) over bo_action_pair_id_ingestionTime_0s_10h_100 as bo_action_cate_multi_top3frequency_17,
    fz_topn_frequency(`model_id`, 3) over bo_action_pair_id_ingestionTime_0s_7d_100 as bo_action_model_id_multi_top3frequency_18,
    distinct_count(`model_id`) over bo_action_pair_id_ingestionTime_0s_14d_100 as bo_action_model_id_multi_unique_count_19,
    distinct_count(`model_id`) over bo_action_pair_id_ingestionTime_0s_7d_100 as bo_action_model_id_multi_unique_count_20,
    distinct_count(`type`) over bo_action_pair_id_ingestionTime_0s_14d_100 as bo_action_type_multi_unique_count_21,
    fz_topn_frequency(`type`, 3) over bo_action_pair_id_ingestionTime_0s_7d_100 as bo_action_type_multi_top3frequency_40,
    fz_topn_frequency(`type`, 3) over bo_action_pair_id_ingestionTime_0s_14d_100 as bo_action_type_multi_top3frequency_42from
    (select `eventTime` as `ingestionTime`, `pair_id` as `pair_id`, bigint(0) as `time`, '' as `model_id`, '' as `type`, '' as `cate`, '' as `br`, reqId from `flattenRequest`)
    window bo_action_pair_id_ingestionTime_0s_10h_100 as (UNION (select `ingestionTime`, `pair_id`, `time`, `model_id`, `type`, `cate`, `br`, '' as reqId from `bo_action`) partition by `pair_id` order by `ingestionTime` rows_range between 10h preceding and 0s preceding MAXSIZE 100 INSTANCE_NOT_IN_WINDOW),
    bo_action_pair_id_ingestionTime_0s_7d_100 as (UNION (select `ingestionTime`, `pair_id`, `time`, `model_id`, `type`, `cate`, `br`, '' as reqId from `bo_action`) partition by `pair_id` order by `ingestionTime` rows_range between 7d preceding and 0s preceding MAXSIZE 100 INSTANCE_NOT_IN_WINDOW),
    bo_action_pair_id_ingestionTime_0s_14d_100 as (UNION (select `ingestionTime`, `pair_id`, `time`, `model_id`, `type`, `cate`, `br`, '' as reqId from `bo_action`) partition by `pair_id` order by `ingestionTime` rows_range between 14d preceding and 0s preceding MAXSIZE 100 INSTANCE_NOT_IN_WINDOW))as out3on out0.reqId_1 = out3.reqId_17INTO OUTFILE '/root/project/out/1';

此处仅一个命令,能够应用阻塞式LOAD DATA,间接运行sql脚本sync_select_out.sql:

/work/openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client < /root/project/sync_select_out.sql

留神,集群版 LOAD DATA 为非阻塞工作,能够应用命令 SHOW JOBS 查看工作运行状态,请期待工作运行胜利( state 转至 FINISHED 状态),再进行下一步操作 。

预处理数据集以配合 DeepFM 模型要求

留神,以下命令在 docker 外执行,应用装置了 1.1 所形容的 OneFlow 运行环境
依据 DeepFM 论文, 类别特色和间断特色都被当作稠密特色看待。

χ may include categorical fields (e.g., gender, location) and continuous fields (e.g., age). Each categorical field is represented as a vector of one-hot encoding, and each continuous field is represented as the value itself, or a vector of one-hot encoding after discretization.

进入demo文件夹,运行以下指令进行数据处理

cd $demodir/openmldb_process/
sh process_JD_out_full.sh $demodir/out/1
After generating parquet dataset, dataset information will also be printed. It contains the information about the number of samples and table size array, which is needed when training.

对应生成parquet数据集将生成在 $demodir/openmldb_process/out。数据信息将被打印如下,该信息将被输出为训练的配置文件。

train samples = 4007924
val samples = 504398
test samples = 530059
table size array:
11,42,1105,200,11,1295,1,1,5,3,23,23,7,5042381,3127923,5042381,3649642,28350,105180,7,2,5042381,5,4,4,41,2,2,8,3456,4,5,5042381,10,60,5042381,843,17,1276,101,100

启动 OneFlow 进行模型训练

留神,以下命令在装置 1.1 所形容的 OneFlow 运行环境中运行

批改对应 train-deepfm.sh 配置文件 ✦

cd $demodir/oneflow_process/
#!/bin/bash
DEVICE_NUM_PER_NODE=1
DATA_DIR=$demodir/openmldb_process/out
PERSISTENT_PATH=/$demodir/oneflow_process/persistent
MODEL_SAVE_DIR=$demodir/oneflow_process/model_out
MODEL_SERVING_PATH=$demodir/oneflow_process/model/embedding/1/model

python3 -m oneflow.distributed.launch \
--nproc_per_node $DEVICE_NUM_PER_NODE \
--nnodes 1 \--node_rank 0 \
--master_addr 127.0.0.1 \
deepfm_train_eval_JD.py \
--disable_fusedmlp \
--data_dir $DATA_DIR \
--persistent_path $PERSISTENT_PATH \
--table_size_array "11,42,1105,200,11,1295,1,1,5,3,23,23,7,5042381,3127923,5042381,3649642,28350,105180,7,2,5042381,5,4,4,41,2,2,8,3456,4,5,5042381,10,60,5042381,843,17,1276,101,100" \
--store_type 'cached_host_mem' \
--cache_memory_budget_mb 1024 \
--batch_size 10000 \
--train_batches 75000 \
--loss_print_interval 100 \
--dnn "1000,1000,1000,1000,1000" \
--net_dropout 0.2 \
--learning_rate 0.001 \
--embedding_vec_size 16 \
--num_train_samples 4007924 \
--num_val_samples 504398 \
--num_test_samples 530059 \
--model_save_dir $MODEL_SAVE_DIR \
--save_best_model \
--save_graph_for_serving \
--model_serving_path $MODEL_SERVING_PATH \
--save_model_after_each_eval

开始模型训练 ✦

bash train_deepfm.sh
生成模型将寄存在$demodir/oneflow_process/model_out,用来 serving 的模型寄存在$demodir/oneflow_process/model/embedding/1/model

模型上线流程

流程概览

应用 OpenMLDB+OneFlow 进行模型 serving 可总结为以下大抵步骤。
接下来会介绍每一个步骤的具体操作细节。

配置 OpenMLDB 进行在线特色抽取

特色抽取 SQL 脚本上线 ✦

假设 2.4 节中所设计的特色在上一步的模型训练中产出的模型合乎预期,那么下一步就是将该特色抽取SQL脚本部署到线下来,以提供在线的特色抽取。

重新启动 OpenMLDB CLI,以进行 SQL 上线部署

docker exec -it demo bash
/work/openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client

执行上线部署,以下命令在 OpenMLDB CLI 内执行。

USE JD_db;
SET @@execute_mode=’online’;
deploy demo select * from(select
reqId as reqId_1,
eventTime as flattenRequest_eventTime_original_0,
reqId as flattenRequest_reqId_original_1,
pair_id as flattenRequest_pair_id_original_24,
sku_id as flattenRequest_sku_id_original_25,
user_id as flattenRequest_user_id_original_26,
distinct_count(pair_id) over flattenRequest_user_id_eventTime_0_10_ as flattenRequest_pair_id_window_unique_count_27,
fz_top1_ratio(pair_id) over flattenRequest_user_id_eventTime_0_10_ as flattenRequest_pair_id_window_top1_ratio_28,
fz_top1_ratio(pair_id) over flattenRequest_user_id_eventTime_0s_14d_200 as flattenRequest_pair_id_window_top1_ratio_29,
distinct_count(pair_id) over flattenRequest_user_id_eventTime_0s_14d_200 as flattenRequest_pair_id_window_unique_count_32,
case when !isnull(at(pair_id, 0)) over flattenRequest_user_id_eventTime_0_10_ then count_where(pair_id, pair_id = at(pair_id, 0)) over flattenRequest_user_id_eventTime_0_10_ else null end as flattenRequest_pair_id_window_count_35,
dayofweek(timestamp(eventTime)) as flattenRequest_eventTime_dayofweek_41,
case when 1 < dayofweek(timestamp(eventTime)) and dayofweek(timestamp(eventTime)) < 7 then 1 else 0 end as flattenRequest_eventTime_isweekday_43from
flattenRequest
window flattenRequest_user_id_eventTime_0_10_ as (partition by user_id order by eventTime rows between 10 preceding and 0 preceding),
flattenRequest_user_id_eventTime_0s_14d_200 as (partition by user_id order by eventTime rows_range between 14d preceding and 0s preceding MAXSIZE 200))as out0last join(select
flattenRequest.reqId as reqId_3,
action_reqId.actionValue as action_actionValue_multi_direct_2,
bo_product_sku_id.a1 as bo_product_a1_multi_direct_3,
bo_product_sku_id.a2 as bo_product_a2_multi_direct_4,
bo_product_sku_id.a3 as bo_product_a3_multi_direct_5,
bo_product_sku_id.br as bo_product_br_multi_direct_6,
bo_product_sku_id.cate as bo_product_cate_multi_direct_7,
bo_product_sku_id.ingestionTime as bo_product_ingestionTime_multi_direct_8,
bo_user_user_id.age as bo_user_age_multi_direct_9,
bo_user_user_id.ingestionTime as bo_user_ingestionTime_multi_direct_10,
bo_user_user_id.sex as bo_user_sex_multi_direct_11,
bo_user_user_id.user_lv_cd as bo_user_user_lv_cd_multi_direct_12from
flattenRequest
last join action as action_reqId on flattenRequest.reqId = action_reqId.reqId
last join bo_product as bo_product_sku_id on flattenRequest.sku_id = bo_product_sku_id.sku_id
last join bo_user as bo_user_user_id on flattenRequest.user_id = bo_user_user_id.user_id)as out1on out0.reqId_1 = out1.reqId_3last join(select
reqId as reqId_14,
max(bad_comment_rate) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_bad_comment_rate_multi_max_13,
min(bad_comment_rate) over bo_comment_sku_id_ingestionTime_0_10_ as bo_comment_bad_comment_rate_multi_min_14,
min(bad_comment_rate) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_bad_comment_rate_multi_min_15,
distinct_count(comment_num) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_comment_num_multi_unique_count_22,
distinct_count(has_bad_comment) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_has_bad_comment_multi_unique_count_23,
fz_topn_frequency(has_bad_comment, 3) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_has_bad_comment_multi_top3frequency_30,
fz_topn_frequency(comment_num, 3) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_comment_num_multi_top3frequency_33from
(select eventTime as ingestionTime, bigint(0) as dt, sku_id as sku_id, int(0) as comment_num, ” as has_bad_comment, float(0) as bad_comment_rate, reqId from flattenRequest)
window bo_comment_sku_id_ingestionTime_0s_64d_100 as (UNION (select ingestionTime, dt, sku_id, comment_num, has_bad_comment, bad_comment_rate, ” as reqId from bo_comment) partition by sku_id order by ingestionTime rows_range between 64d preceding and 0s preceding MAXSIZE 100 INSTANCE_NOT_IN_WINDOW),
bo_comment_sku_id_ingestionTime_0_10_ as (UNION (select ingestionTime, dt, sku_id, comment_num, has_bad_comment, bad_comment_rate, ” as reqId from bo_comment) partition by sku_id order by ingestionTime rows between 10 preceding and 0 preceding INSTANCE_NOT_IN_WINDOW))as out2on out0.reqId_1 = out2.reqId_14last join(select
reqId as reqId_17,
fz_topn_frequency(br, 3) over bo_action_pair_id_ingestionTime_0s_10h_100 as bo_action_br_multi_top3frequency_16,
fz_topn_frequency(cate, 3) over bo_action_pair_id_ingestionTime_0s_10h_100 as bo_action_cate_multi_top3frequency_17,
fz_topn_frequency(model_id, 3) over bo_action_pair_id_ingestionTime_0s_7d_100 as bo_action_model_id_multi_top3frequency_18,
distinct_count(model_id) over bo_action_pair_id_ingestionTime_0s_14d_100 as bo_action_model_id_multi_unique_count_19,
distinct_count(model_id) over bo_action_pair_id_ingestionTime_0s_7d_100 as bo_action_model_id_multi_unique_count_20,
distinct_count(type) over bo_action_pair_id_ingestionTime_0s_14d_100 as bo_action_type_multi_unique_count_21,
fz_topn_frequency(type, 3) over bo_action_pair_id_ingestionTime_0s_7d_100 as bo_action_type_multi_top3frequency_40,
fz_topn_frequency(type, 3) over bo_action_pair_id_ingestionTime_0s_14d_100 as bo_action_type_multi_top3frequency_42from
(select eventTime as ingestionTime, pair_id as pair_id, bigint(0) as time, ” as model_id, ” as type, ” as cate, ” as br, reqId from flattenRequest)
window bo_action_pair_id_ingestionTime_0s_10h_100 as (UNION (select ingestionTime, pair_id, time, model_id, type, cate, br, ” as reqId from bo_action) partition by pair_id order by ingestionTime rows_range between 10h preceding and 0s preceding MAXSIZE 100 INSTANCE_NOT_IN_WINDOW),
bo_action_pair_id_ingestionTime_0s_7d_100 as (UNION (select ingestionTime, pair_id, time, model_id, type, cate, br, ” as reqId from bo_action) partition by pair_id order by ingestionTime rows_range between 7d preceding and 0s preceding MAXSIZE 100 INSTANCE_NOT_IN_WINDOW),
bo_action_pair_id_ingestionTime_0s_14d_100 as (UNION (select ingestionTime, pair_id, time, model_id, type, cate, br, ” as reqId from bo_action) partition by pair_id order by ingestionTime rows_range between 14d preceding and 0s preceding MAXSIZE 100 INSTANCE_NOT_IN_WINDOW))as out3on out0.reqId_1 = out3.reqId_17;

/work/openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client < /root/project/deploy.sql

可应用如下命令确认 deploy 信息

show deployment demo;

在线数据筹备 ✦

首先,请切换到在线执行模式。接着在在线模式下,导入数据作为在线数据,用于在线特色计算。以下命令均在 OpenMLDB CLI 下执行。

USE JD_db;
SET @@execute_mode=’online’;
LOAD DATA INFILE ‘/root/project/data/JD_data/action/*.parquet’ INTO TABLE action options(format=’parquet’, header=true, mode=’append’);
LOAD DATA INFILE ‘/root/project/data/JD_data/flattenRequest_remove_last/*.parquet’ INTO TABLE flattenRequest options(format=’parquet’, header=true, mode=’append’);
LOAD DATA INFILE ‘/root/project/data/JD_data/bo_user/*.parquet’ INTO TABLE bo_user options(format=’parquet’, header=true, mode=’append’);
LOAD DATA INFILE ‘/root/project/data/JD_data/bo_action/*.parquet’ INTO TABLE bo_action options(format=’parquet’, header=true, mode=’append’);
LOAD DATA INFILE ‘/root/project/data/JD_data/bo_product/*.parquet’ INTO TABLE bo_product options(format=’parquet’, header=true, mode=’append’);
LOAD DATA INFILE ‘/root/project/data/JD_data/bo_comment/*.parquet’ INTO TABLE bo_comment options(format=’parquet’, header=true, mode=’append’);

/work/openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client < /root/project/load_online_data.sql
留神,集群版 `LOAD  DATA` 为非阻塞工作,能够应用命令 `SHOW  JOBS` 查看工作运行状态,请期待工作运行胜利( `state` 转至 `FINISHED` 状态),再进行下一步操作 。

配置 OneFlow 推理服务

Oneflow 的推理服务须要 One Embedding 的反对。该反对目前还没有合入主框架中。若须要从新编译,可参考附录 A 进行编译测试。接下来步骤默认相干反对已编译实现,并且寄存在/home/gtest/work/oneflow_serving/门路中。

**离线特色查看模型门路($demodir/oneflow_process/model)中模型文件及组织形式是否正确
✦**

3.3.1

$ tree  -L 3 model/
model/
└── embedding
    ├── 1
    │   └── model
    └── config.pbtxt

确认 config.pbtxt 中的配置正确 ✦

确认 persistent 门路($demodir/oneflow_process/persistent)正确 ✦

启动推理服务

启动 OneFlow 推理服务 ✦

留神,以下命令在装置 1.1 所形容的 OneFlow 运行环境中运行

应用一下命令启动 OneFlow 推理服务:

docker run --runtime=nvidia --rm --network=host \
  -v $demodir/oneflow_process/model:/models \
  -v /home/gtest/work/oneflow_serving/serving/build/libtriton_oneflow.so:/backends/oneflow/libtriton_oneflow.so \
  -v /home/gtest/work/oneflow_serving/oneflow/build/liboneflow_cpp/lib/:/mylib \
  -v $demodir/oneflow_process/persistent:/root/demo/persistent \
  registry.cn-beijing.aliyuncs.com/oneflow/triton-devel \
  bash -c 'LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/mylib /opt/tritonserver/bin/tritonserver \
  --model-repository=/models --backend-directory=/backends'

启动在线推理服务 demo ✦

留神,以下命令在 demo docker 中运行

OpenMLDB 的在线特色计算服务已通过 SQL 上线实现,OneFlow 推理服务也曾经启动。这个 demo 将串联两者,在收到实时申请后,拜访 OpenMLDB 进行特色抽取,再拜访 OneFlow 推理服务,进行在线推理,最初返回推理后果。

如果尚未退出 OpenMLDB CLI,请应用 quit 命令退出 OpenMLDB CLI。

在一般命令行下启动预估服务:

cd /root/project/serving/openmldb_serving
./start_predict_server.sh 0.0.0.0:9080

发送预估申请

预估申请可在 OpenMLDB 的容器外执行。容器内部拜访的具体信息可参见IP 配置。
在一般命令行下执行内置的 predict.py 脚本。该脚本发送一行申请数据到预估服务,接管返回的预估后果,并打印进去。

python predict.py

范例输入:

----------------ins---------------
['200080_5505_2016-03-15 20:43:04' '1458045784000'
'200080_5505_2016-03-15 20:43:04' '200080_5505' '5505' '200080' '1' '1.0'
'1.0' '1' '1' '3' '1' '200080_5505_2016-03-15 20:43:04' '0' '3' '1' '1'
'214' '8' '1603438960564' '-1' '1453824000000' '2' '1'
'200080_5505_2016-03-15 20:43:04' '0.02879999950528145' '0.0' '0.0' '2'
'2' '1,,NULL' '4,0,NULL' '200080_5505_2016-03-15 20:43:04' ',NULL,NULL'
',NULL,NULL' ',NULL,NULL' '1' '1' '1' ',NULL,NULL' ',NULL,NULL']
---------------predict change of purchase -------------
[[b'0.025186:0']]

附录– OneFlow定制代码编译

此章节介绍为 One Embedding 推理服务所定制的代码批改的编译过程。该代码会尽快合入 OneFlow 中,届时以下步骤可省略。

容器环境筹备

应用以下容器环境进行编译。该容器曾经装置编译所需的依赖等。

docker pull registry.cn-beijing.aliyuncs.com/oneflow/triton-devel:latest

启动容器并映射相干门路(此处可映射/home/work/gtest至/root/project)。接下来的操作均在容器内进行。

编译 OneFlow

cd /root/project
mkdir oneflow_serving && cd oneflow_serving
git clone -b support_oneembedding_serving --single-branch https://github.com/Oneflow-Inc/oneflow --depth=1 cd oneflow
mkdir build && cd build
cmake -C ..
/cmake/caches/cn/cuda.cmake \
-DBUILD_CPP_API=ON \
-DWITH_MLIR=ON \
-G Ninja \
-DBUILD_SHARED_LIBS=ON \
-DBUILD_HWLOC=OFF \
-DCMAKE_INTERPROCEDURAL_OPTIMIZATION=OFF \
-DCMAKE_EXE_LINKER_FLAGS_INIT="-fuse-ld=lld" \
-DCMAKE_MODULE_LINKER_FLAGS_INIT="-fuse-ld=lld" \
-DCMAKE_SHARED_LINKER_FLAGS_INIT="-fuse-ld=lld" ..
ninja

编译 Serving

git clone -b support_one_embedding --single-branchhttps://github.com/Oneflow-Inc/serving.git
cd serving
mkdir build && cd build
cmake -DCMAKE_PREFIX_PATH=/path/to/liboneflow_cpp/share -DTRITON_RELATED_REPO_TAG=r21.10 \
  -DTRITON_ENABLE_GPU=ON -G Ninja -DTHIRD_PARTY_MIRROR=aliyun ..
ninja

上述命令中的/path/to/liboneflow_cpp/share要替换成上边编译的oneflow的外面的门路,在{oneflow门路}/build/liboneflow_cpp/share

测试 TritonServer

复制 backend 库文件:

mkdir /root/project/oneflow_sering/backends && cd /root/project/oneflow_sering/backends
mkdir oneflow
cp /roor/prject/oneflow_serving/serving/build/libtriton_oneflow.so oneflow/.

在命令行启动 TritonServer 测试:

LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/root/project/oneflow_serving/oneflow/build/liboneflow_cpp/lib \
/opt/tritonserver/bin/tritonserver \
--model-repository=/root/project/oneflow_process/model \
--backend-directory=/root/project/oneflow_serving/backends

在另一个命令行运行如下指令,若胜利,输入示例如下:

python $demodir/serving/client.py

0.045439958572387695
(1, 1)
[[b’0.025343:0′]]

留神:

如果呈现libunwind.so.8未找到须要用-v /lib/x86_64-linux-gnu:/unwind_path 映射一下libunwind.so.8所在目录,而后增加到LD_LIBRARY_PATH外面: … bash -c ‘LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/mylib:/unwind_path …

如果呈现libcupti.so未找到须要用-v /usr/local/cuda-11.7/extras/CUPTI/lib64:/cupti_path 映射一下libcupti.so所在目录,而后增加到LD_LIBRARY_PATH外面: … bash -c ‘LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/mylib:/cupti_path …, 其中具体的cuda的门路按理论装置的地位,能够用ldd {oneflow门路}/build/liboneflow.so | grep cupti来找到

心愿本文可能帮大家疾速了解把握如何应用 OpenMLDB 和 OneFlow 联结来打造一个残缺的机器学习利用,链接特色工程到模型训练的全流程。

如果想进一步理解 OpenMLDB 或者参加社区技术交换,能够通过以下渠道取得相干信息和互动~

Github: https://github.com/4paradigm/…

官网:https://openmldb.ai/

Email: [email protected]

OpenMLDB 微信交换群:

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理