本文整顿自 OpenMLDB Meetup No.5 中 OpenMLDB PMC 黄威的演讲,将以 京东高潜用户购买动向预测问题 为例,示范如何应用 OpenMLDB 和 OneFlow 联结来打造一个残缺的机器学习利用。分享视频如下:
导读:
如何从历史数据中找出法则,去预测用户将来的购买需要,让最合适的商品遇见最须要的人,是大数据利用在精准营销中的关键问题,也是所有电商平台在做智能化降级时所须要的核心技术。京东作为中国最大的自营式电商,积淀了数亿的忠诚用户,积攒了海量的实在数据。本案例以京东商城实在的用户、商品和行为数据(脱敏后)为根底,通过数据挖掘的技术和机器学习的算法,构建用户购买商品的预测模型,输入高潜用户和指标商品的匹配后果,为精准营销提供高质量的目标群体,开掘数据背地潜在的意义,为电商用户提供更简略、快捷、省心的购物体验。本案例应用 OpenMLDB 进行数据挖掘,应用 OneFlow 中的 DeepFM 模型进行高性能训练推理,提供精准的商品举荐。
本案例基于 OpenMLDB 集群版进行教程演示。留神,本文档应用的是预编译好的 docker 镜像。如果心愿在本人编译和搭建的 OpenMLDB 环境下进行测试,举荐针对 OpenMLDB 优化的 Spark 发行版。
(参考章节:https://openmldb.ai/docs/zh/m…)
环境筹备和准备常识
OneFlow 工具包装置
OneFlow 工具依赖 GPU 的弱小算力,所以请确保部署机器具备 Nvidia GPU,并且保障驱动版本 >=460.X.X 驱动版本需反对 CUDA 11.0。
应用一下指令装置 OneFlow:
conda activate oneflow
python3 -m pip install --pre oneflow -f https://staging.oneflow.info/branch/support_oneembedding_serving/cu102
还须要装置以下 Python 工具包:
pip install psutil petastorm pandas sklearn
拉取和启动 OpenMLDB Docker 镜像
留神,请确保 Docker Engine 版本号 >= 18.03
拉取 OpenMLDB docker 镜像,并且运行相应容器
映射 demo 文件夹至 /root/project,这里咱们应用的门路为 demodir=/home/gtest/demo
export demodir=/home/gtest/demo
docker run -dit --name=demo --network=host -v $demodir:/root/project 4pdosc/openmldb:0.5.2 bash
docker exec -it demo bash
上述镜像预装了 OpenMLDB 的工具等,咱们须要进一步装置 Oneflow 推理所需依赖。
因为咱们将在 OpenMLDB 的服务中嵌入 OneFlow 模型推理的预处理及调用,须要装置以下的依赖。
pip install tritonclient xxhash geventhttpclient
留神,本教程以下的 OpenMLDB 局部的演示命令默认均在该曾经启动的 docker 容器内运行。OneFlow 命令默认在 1.1 装置的 OneFlow 环境下运行。
初始化环境
./init.sh
咱们在镜像内提供了 init.sh 脚本帮忙用户疾速初始化环境,包含:
配置 zookeeper
启动集群版 OpenMLDB
启动 OpenMLDB CLI 客户端
/work/openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client
留神,本教程大部分命令在 OpenMLDB CLI 下执行,为了跟一般 shell 环境做辨别,在 OpenMLDB CLI 下执行的命令均应用非凡的提示符 >
。
准备常识:集群版的非阻塞工作
集群版的局部命令是非阻塞工作,包含在线模式的 LOAD DATA,以及离线模式的 LOAD DATA,SELECT,SELECT INTO 命令。提交工作当前能够应用相干的命令如 SHOW JOBS, SHOW JOB 来查看工作进度,详情参见离线工作治理文档。
机器学习训练流程
流程概览
应用 OpenMLDB+OneFlow 进行机器学习训练可总结为以下大抵步骤。
接下来会介绍每一个步骤的具体操作细节。
应用 OpenMLDB 进行离线特色抽取
创立数据库和数据表 ✦
以下命令均在 OpenMLDB CLI 环境下执行。
CREATE DATABASE JD_db;
USE JD_db;
CREATE TABLE action(reqId string, eventTime timestamp, ingestionTime timestamp, actionValue int);
CREATE TABLE flattenRequest(reqId string, eventTime timestamp, main_id string, pair_id string, user_id string, sku_id string, time bigint, split_id int, time1 string);
CREATE TABLE bo_user(ingestionTime timestamp, user_id string, age string, sex string, user_lv_cd string, user_reg_tm bigint);
CREATE TABLE bo_action(ingestionTime timestamp, pair_id string, time bigint, model_id string, type string, cate string, br string);
CREATE TABLE bo_product(ingestionTime timestamp, sku_id string, a1 string, a2 string, a3 string, cate string, br string);
CREATE TABLE bo_comment(ingestionTime timestamp, dt bigint, sku_id string, comment_num int, has_bad_comment string, bad_comment_rate float);
也可应用 sql 脚本 (/root/project/create_tables.sql) 运行:
/work/openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client < /root/project/create_tables.sql
离线数据筹备 ✦
首先,切换到离线执行模式。接着,导入数据作为离线数据,用于离线特色计算。
以下命令均在 OpenMLDB CLI 下执行。
USE JD_db;
SET @@execute_mode=’offline’;
LOAD DATA INFILE ‘/root/project/data/JD_data/action/*.parquet’ INTO TABLE action options(format=’parquet’, header=true, mode=’append’);
LOAD DATA INFILE ‘/root/project/data/JD_data/flattenRequest_clean/*.parquet’ INTO TABLE flattenRequest options(format=’parquet’, header=true, mode=’append’);
LOAD DATA INFILE ‘/root/project/data/JD_data/bo_user/*.parquet’ INTO TABLE bo_user options(format=’parquet’, header=true, mode=’append’);
LOAD DATA INFILE ‘/root/project/data/JD_data/bo_action/*.parquet’ INTO TABLE bo_action options(format=’parquet’, header=true, mode=’append’);
LOAD DATA INFILE ‘/root/project/data/JD_data/bo_product/*.parquet’ INTO TABLE bo_product options(format=’parquet’, header=true, mode=’append’);
LOAD DATA INFILE ‘/root/project/data/JD_data/bo_comment/*.parquet’ INTO TABLE bo_comment options(format=’parquet’, header=true, mode=’append’);
或应用脚本执行:
/work/openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client < /root/project/load_data.sql
并通过以下命令疾速查问 jobs 状态:
echo "show jobs;" | /work/openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client
留神,集群版 LOAD DATA
为非阻塞工作,能够应用命令 SHOW JOBS
查看工作运行状态,请期待工作运行胜利(state
转至 FINISHED
状态),再进行下一步操作。
特色设计 ✦
通常在设计特色前,用户须要依据机器学习的指标对数据进行剖析,而后依据剖析设计和调研特色。机器学习的数据分析和特色钻研不是本文探讨的领域,咱们将不作开展。本文假设用户具备机器学习的根本理论知识,有解决机器学习问题的能力,可能了解 SQL 语法,并可能应用 SQL 语法构建特色。针对本案例,用户通过剖析和调研设计了若干特色。
请留神,在理论的机器学习特色调研过程中,科学家对特色进行重复试验,寻求模型成果最好的特色集。所以会一直的反复屡次特色设计 -> 离线特色抽取 -> 模型训练过程,并一直调整特色以达到预期成果。
离线特色抽取 ✦
用户在离线模式下,进行特色抽取,并将特色后果输入到 ’/root/project/out/1’ 目录下保留(对应映射为 $demodir/out/1),以供后续的模型训练。SELECT 命令对应了基于上述特色设计所产生的 SQL 特色计算脚本。以下命令均在 OpenMLDB CLI 下执行。
USE JD_db;
select * from(select
`reqId` as reqId_1,
`eventTime` as flattenRequest_eventTime_original_0,
`reqId` as flattenRequest_reqId_original_1,
`pair_id` as flattenRequest_pair_id_original_24,
`sku_id` as flattenRequest_sku_id_original_25,
`user_id` as flattenRequest_user_id_original_26,
distinct_count(`pair_id`) over flattenRequest_user_id_eventTime_0_10_ as flattenRequest_pair_id_window_unique_count_27,
fz_top1_ratio(`pair_id`) over flattenRequest_user_id_eventTime_0_10_ as flattenRequest_pair_id_window_top1_ratio_28,
fz_top1_ratio(`pair_id`) over flattenRequest_user_id_eventTime_0s_14d_200 as flattenRequest_pair_id_window_top1_ratio_29,
distinct_count(`pair_id`) over flattenRequest_user_id_eventTime_0s_14d_200 as flattenRequest_pair_id_window_unique_count_32,
case when !isnull(at(`pair_id`, 0)) over flattenRequest_user_id_eventTime_0_10_ then count_where(`pair_id`, `pair_id` = at(`pair_id`, 0)) over flattenRequest_user_id_eventTime_0_10_ else null end as flattenRequest_pair_id_window_count_35,
dayofweek(timestamp(`eventTime`)) as flattenRequest_eventTime_dayofweek_41,
case when 1 < dayofweek(timestamp(`eventTime`)) and dayofweek(timestamp(`eventTime`)) < 7 then 1 else 0 end as flattenRequest_eventTime_isweekday_43from
`flattenRequest`
window flattenRequest_user_id_eventTime_0_10_ as (partition by `user_id` order by `eventTime` rows between 10 preceding and 0 preceding),
flattenRequest_user_id_eventTime_0s_14d_200 as (partition by `user_id` order by `eventTime` rows_range between 14d preceding and 0s preceding MAXSIZE 200))as out0last join(select
`flattenRequest`.`reqId` as reqId_3,
`action_reqId`.`actionValue` as action_actionValue_multi_direct_2,
`bo_product_sku_id`.`a1` as bo_product_a1_multi_direct_3,
`bo_product_sku_id`.`a2` as bo_product_a2_multi_direct_4,
`bo_product_sku_id`.`a3` as bo_product_a3_multi_direct_5,
`bo_product_sku_id`.`br` as bo_product_br_multi_direct_6,
`bo_product_sku_id`.`cate` as bo_product_cate_multi_direct_7,
`bo_product_sku_id`.`ingestionTime` as bo_product_ingestionTime_multi_direct_8,
`bo_user_user_id`.`age` as bo_user_age_multi_direct_9,
`bo_user_user_id`.`ingestionTime` as bo_user_ingestionTime_multi_direct_10,
`bo_user_user_id`.`sex` as bo_user_sex_multi_direct_11,
`bo_user_user_id`.`user_lv_cd` as bo_user_user_lv_cd_multi_direct_12from
`flattenRequest`
last join `action` as `action_reqId` on `flattenRequest`.`reqId` = `action_reqId`.`reqId`
last join `bo_product` as `bo_product_sku_id` on `flattenRequest`.`sku_id` = `bo_product_sku_id`.`sku_id`
last join `bo_user` as `bo_user_user_id` on `flattenRequest`.`user_id` = `bo_user_user_id`.`user_id`)as out1on out0.reqId_1 = out1.reqId_3last join(select
`reqId` as reqId_14,
max(`bad_comment_rate`) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_bad_comment_rate_multi_max_13,
min(`bad_comment_rate`) over bo_comment_sku_id_ingestionTime_0_10_ as bo_comment_bad_comment_rate_multi_min_14,
min(`bad_comment_rate`) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_bad_comment_rate_multi_min_15,
distinct_count(`comment_num`) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_comment_num_multi_unique_count_22,
distinct_count(`has_bad_comment`) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_has_bad_comment_multi_unique_count_23,
fz_topn_frequency(`has_bad_comment`, 3) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_has_bad_comment_multi_top3frequency_30,
fz_topn_frequency(`comment_num`, 3) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_comment_num_multi_top3frequency_33from
(select `eventTime` as `ingestionTime`, bigint(0) as `dt`, `sku_id` as `sku_id`, int(0) as `comment_num`, '' as `has_bad_comment`, float(0) as `bad_comment_rate`, reqId from `flattenRequest`)
window bo_comment_sku_id_ingestionTime_0s_64d_100 as (UNION (select `ingestionTime`, `dt`, `sku_id`, `comment_num`, `has_bad_comment`, `bad_comment_rate`, '' as reqId from `bo_comment`) partition by `sku_id` order by `ingestionTime` rows_range between 64d preceding and 0s preceding MAXSIZE 100 INSTANCE_NOT_IN_WINDOW),
bo_comment_sku_id_ingestionTime_0_10_ as (UNION (select `ingestionTime`, `dt`, `sku_id`, `comment_num`, `has_bad_comment`, `bad_comment_rate`, '' as reqId from `bo_comment`) partition by `sku_id` order by `ingestionTime` rows between 10 preceding and 0 preceding INSTANCE_NOT_IN_WINDOW))as out2on out0.reqId_1 = out2.reqId_14last join(select
`reqId` as reqId_17,
fz_topn_frequency(`br`, 3) over bo_action_pair_id_ingestionTime_0s_10h_100 as bo_action_br_multi_top3frequency_16,
fz_topn_frequency(`cate`, 3) over bo_action_pair_id_ingestionTime_0s_10h_100 as bo_action_cate_multi_top3frequency_17,
fz_topn_frequency(`model_id`, 3) over bo_action_pair_id_ingestionTime_0s_7d_100 as bo_action_model_id_multi_top3frequency_18,
distinct_count(`model_id`) over bo_action_pair_id_ingestionTime_0s_14d_100 as bo_action_model_id_multi_unique_count_19,
distinct_count(`model_id`) over bo_action_pair_id_ingestionTime_0s_7d_100 as bo_action_model_id_multi_unique_count_20,
distinct_count(`type`) over bo_action_pair_id_ingestionTime_0s_14d_100 as bo_action_type_multi_unique_count_21,
fz_topn_frequency(`type`, 3) over bo_action_pair_id_ingestionTime_0s_7d_100 as bo_action_type_multi_top3frequency_40,
fz_topn_frequency(`type`, 3) over bo_action_pair_id_ingestionTime_0s_14d_100 as bo_action_type_multi_top3frequency_42from
(select `eventTime` as `ingestionTime`, `pair_id` as `pair_id`, bigint(0) as `time`, ''as `model_id`,'' as `type`, ''as `cate`,'' as `br`, reqId from `flattenRequest`)
window bo_action_pair_id_ingestionTime_0s_10h_100 as (UNION (select `ingestionTime`, `pair_id`, `time`, `model_id`, `type`, `cate`, `br`, '' as reqId from `bo_action`) partition by `pair_id` order by `ingestionTime` rows_range between 10h preceding and 0s preceding MAXSIZE 100 INSTANCE_NOT_IN_WINDOW),
bo_action_pair_id_ingestionTime_0s_7d_100 as (UNION (select `ingestionTime`, `pair_id`, `time`, `model_id`, `type`, `cate`, `br`, '' as reqId from `bo_action`) partition by `pair_id` order by `ingestionTime` rows_range between 7d preceding and 0s preceding MAXSIZE 100 INSTANCE_NOT_IN_WINDOW),
bo_action_pair_id_ingestionTime_0s_14d_100 as (UNION (select `ingestionTime`, `pair_id`, `time`, `model_id`, `type`, `cate`, `br`, ''as reqId from `bo_action`) partition by `pair_id` order by `ingestionTime` rows_range between 14d preceding and 0s preceding MAXSIZE 100 INSTANCE_NOT_IN_WINDOW))as out3on out0.reqId_1 = out3.reqId_17INTO OUTFILE'/root/project/out/1';
此处仅一个命令,能够应用阻塞式 LOAD DATA,间接运行 sql 脚本 sync_select_out.sql:
/work/openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client < /root/project/sync_select_out.sql
留神,集群版 LOAD DATA
为非阻塞工作,能够应用命令 SHOW JOBS
查看工作运行状态,请期待工作运行胜利(state
转至 FINISHED
状态),再进行下一步操作。
预处理数据集以配合 DeepFM 模型要求
留神,以下命令在 docker 外执行,应用装置了 1.1 所形容的 OneFlow 运行环境
依据 DeepFM 论文, 类别特色和间断特色都被当作稠密特色看待。
χ may include categorical fields (e.g., gender, location) and continuous fields (e.g., age). Each categorical field is represented as a vector of one-hot encoding, and each continuous field is represented as the value itself, or a vector of one-hot encoding after discretization.
进入 demo 文件夹,运行以下指令进行数据处理
cd $demodir/openmldb_process/
sh process_JD_out_full.sh $demodir/out/1
After generating parquet dataset, dataset information will also be printed. It contains the information about the number of samples and table size array, which is needed when training.
对应生成 parquet 数据集将生成在 $demodir/openmldb_process/out。数据信息将被打印如下,该信息将被输出为训练的配置文件。
train samples = 4007924
val samples = 504398
test samples = 530059
table size array:
11,42,1105,200,11,1295,1,1,5,3,23,23,7,5042381,3127923,5042381,3649642,28350,105180,7,2,5042381,5,4,4,41,2,2,8,3456,4,5,5042381,10,60,5042381,843,17,1276,101,100
启动 OneFlow 进行模型训练
留神,以下命令在装置 1.1 所形容的 OneFlow 运行环境中运行
批改对应 train-deepfm.sh 配置文件 ✦
cd $demodir/oneflow_process/
#!/bin/bash
DEVICE_NUM_PER_NODE=1
DATA_DIR=$demodir/openmldb_process/out
PERSISTENT_PATH=/$demodir/oneflow_process/persistent
MODEL_SAVE_DIR=$demodir/oneflow_process/model_out
MODEL_SERVING_PATH=$demodir/oneflow_process/model/embedding/1/model
python3 -m oneflow.distributed.launch \
--nproc_per_node $DEVICE_NUM_PER_NODE \
--nnodes 1 \--node_rank 0 \
--master_addr 127.0.0.1 \
deepfm_train_eval_JD.py \
--disable_fusedmlp \
--data_dir $DATA_DIR \
--persistent_path $PERSISTENT_PATH \
--table_size_array "11,42,1105,200,11,1295,1,1,5,3,23,23,7,5042381,3127923,5042381,3649642,28350,105180,7,2,5042381,5,4,4,41,2,2,8,3456,4,5,5042381,10,60,5042381,843,17,1276,101,100" \
--store_type 'cached_host_mem' \
--cache_memory_budget_mb 1024 \
--batch_size 10000 \
--train_batches 75000 \
--loss_print_interval 100 \
--dnn "1000,1000,1000,1000,1000" \
--net_dropout 0.2 \
--learning_rate 0.001 \
--embedding_vec_size 16 \
--num_train_samples 4007924 \
--num_val_samples 504398 \
--num_test_samples 530059 \
--model_save_dir $MODEL_SAVE_DIR \
--save_best_model \
--save_graph_for_serving \
--model_serving_path $MODEL_SERVING_PATH \
--save_model_after_each_eval
开始模型训练 ✦
bash train_deepfm.sh
生成模型将寄存在 $demodir/oneflow_process/model_out,用来 serving 的模型寄存在 $demodir/oneflow_process/model/embedding/1/model
模型上线流程
流程概览
应用 OpenMLDB+OneFlow 进行模型 serving 可总结为以下大抵步骤。
接下来会介绍每一个步骤的具体操作细节。
配置 OpenMLDB 进行在线特色抽取
特色抽取 SQL 脚本上线 ✦
假设 2.4 节中所设计的特色在上一步的模型训练中产出的模型合乎预期,那么下一步就是将该特色抽取 SQL 脚本部署到线下来,以提供在线的特色抽取。
重新启动 OpenMLDB CLI,以进行 SQL 上线部署
docker exec -it demo bash
/work/openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client
执行上线部署,以下命令在 OpenMLDB CLI 内执行。
USE JD_db;
SET @@execute_mode=’online’;
deploy demo select * from(selectreqId
as reqId_1,eventTime
as flattenRequest_eventTime_original_0,reqId
as flattenRequest_reqId_original_1,pair_id
as flattenRequest_pair_id_original_24,sku_id
as flattenRequest_sku_id_original_25,user_id
as flattenRequest_user_id_original_26,
distinct_count(pair_id
) over flattenRequest_user_id_eventTime_0_10_ as flattenRequest_pair_id_window_unique_count_27,
fz_top1_ratio(pair_id
) over flattenRequest_user_id_eventTime_0_10_ as flattenRequest_pair_id_window_top1_ratio_28,
fz_top1_ratio(pair_id
) over flattenRequest_user_id_eventTime_0s_14d_200 as flattenRequest_pair_id_window_top1_ratio_29,
distinct_count(pair_id
) over flattenRequest_user_id_eventTime_0s_14d_200 as flattenRequest_pair_id_window_unique_count_32,
case when !isnull(at(pair_id
, 0)) over flattenRequest_user_id_eventTime_0_10_ then count_where(pair_id
,pair_id
= at(pair_id
, 0)) over flattenRequest_user_id_eventTime_0_10_ else null end as flattenRequest_pair_id_window_count_35,
dayofweek(timestamp(eventTime
)) as flattenRequest_eventTime_dayofweek_41,
case when 1 < dayofweek(timestamp(eventTime
)) and dayofweek(timestamp(eventTime
)) < 7 then 1 else 0 end as flattenRequest_eventTime_isweekday_43fromflattenRequest
window flattenRequest_user_id_eventTime_0_10_ as (partition byuser_id
order byeventTime
rows between 10 preceding and 0 preceding),
flattenRequest_user_id_eventTime_0s_14d_200 as (partition byuser_id
order byeventTime
rows_range between 14d preceding and 0s preceding MAXSIZE 200))as out0last join(selectflattenRequest
.reqId
as reqId_3,action_reqId
.actionValue
as action_actionValue_multi_direct_2,bo_product_sku_id
.a1
as bo_product_a1_multi_direct_3,bo_product_sku_id
.a2
as bo_product_a2_multi_direct_4,bo_product_sku_id
.a3
as bo_product_a3_multi_direct_5,bo_product_sku_id
.br
as bo_product_br_multi_direct_6,bo_product_sku_id
.cate
as bo_product_cate_multi_direct_7,bo_product_sku_id
.ingestionTime
as bo_product_ingestionTime_multi_direct_8,bo_user_user_id
.age
as bo_user_age_multi_direct_9,bo_user_user_id
.ingestionTime
as bo_user_ingestionTime_multi_direct_10,bo_user_user_id
.sex
as bo_user_sex_multi_direct_11,bo_user_user_id
.user_lv_cd
as bo_user_user_lv_cd_multi_direct_12fromflattenRequest
last joinaction
asaction_reqId
onflattenRequest
.reqId
=action_reqId
.reqId
last joinbo_product
asbo_product_sku_id
onflattenRequest
.sku_id
=bo_product_sku_id
.sku_id
last joinbo_user
asbo_user_user_id
onflattenRequest
.user_id
=bo_user_user_id
.user_id
)as out1on out0.reqId_1 = out1.reqId_3last join(selectreqId
as reqId_14,
max(bad_comment_rate
) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_bad_comment_rate_multi_max_13,
min(bad_comment_rate
) over bo_comment_sku_id_ingestionTime_0_10_ as bo_comment_bad_comment_rate_multi_min_14,
min(bad_comment_rate
) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_bad_comment_rate_multi_min_15,
distinct_count(comment_num
) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_comment_num_multi_unique_count_22,
distinct_count(has_bad_comment
) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_has_bad_comment_multi_unique_count_23,
fz_topn_frequency(has_bad_comment
, 3) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_has_bad_comment_multi_top3frequency_30,
fz_topn_frequency(comment_num
, 3) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_comment_num_multi_top3frequency_33from
(selecteventTime
asingestionTime
, bigint(0) asdt
,sku_id
assku_id
, int(0) ascomment_num
, ” ashas_bad_comment
, float(0) asbad_comment_rate
, reqId fromflattenRequest
)
window bo_comment_sku_id_ingestionTime_0s_64d_100 as (UNION (selectingestionTime
,dt
,sku_id
,comment_num
,has_bad_comment
,bad_comment_rate
, ” as reqId frombo_comment
) partition bysku_id
order byingestionTime
rows_range between 64d preceding and 0s preceding MAXSIZE 100 INSTANCE_NOT_IN_WINDOW),
bo_comment_sku_id_ingestionTime_0_10_ as (UNION (selectingestionTime
,dt
,sku_id
,comment_num
,has_bad_comment
,bad_comment_rate
, ” as reqId frombo_comment
) partition bysku_id
order byingestionTime
rows between 10 preceding and 0 preceding INSTANCE_NOT_IN_WINDOW))as out2on out0.reqId_1 = out2.reqId_14last join(selectreqId
as reqId_17,
fz_topn_frequency(br
, 3) over bo_action_pair_id_ingestionTime_0s_10h_100 as bo_action_br_multi_top3frequency_16,
fz_topn_frequency(cate
, 3) over bo_action_pair_id_ingestionTime_0s_10h_100 as bo_action_cate_multi_top3frequency_17,
fz_topn_frequency(model_id
, 3) over bo_action_pair_id_ingestionTime_0s_7d_100 as bo_action_model_id_multi_top3frequency_18,
distinct_count(model_id
) over bo_action_pair_id_ingestionTime_0s_14d_100 as bo_action_model_id_multi_unique_count_19,
distinct_count(model_id
) over bo_action_pair_id_ingestionTime_0s_7d_100 as bo_action_model_id_multi_unique_count_20,
distinct_count(type
) over bo_action_pair_id_ingestionTime_0s_14d_100 as bo_action_type_multi_unique_count_21,
fz_topn_frequency(type
, 3) over bo_action_pair_id_ingestionTime_0s_7d_100 as bo_action_type_multi_top3frequency_40,
fz_topn_frequency(type
, 3) over bo_action_pair_id_ingestionTime_0s_14d_100 as bo_action_type_multi_top3frequency_42from
(selecteventTime
asingestionTime
,pair_id
aspair_id
, bigint(0) astime
, ” asmodel_id
, ” astype
, ” ascate
, ” asbr
, reqId fromflattenRequest
)
window bo_action_pair_id_ingestionTime_0s_10h_100 as (UNION (selectingestionTime
,pair_id
,time
,model_id
,type
,cate
,br
, ” as reqId frombo_action
) partition bypair_id
order byingestionTime
rows_range between 10h preceding and 0s preceding MAXSIZE 100 INSTANCE_NOT_IN_WINDOW),
bo_action_pair_id_ingestionTime_0s_7d_100 as (UNION (selectingestionTime
,pair_id
,time
,model_id
,type
,cate
,br
, ” as reqId frombo_action
) partition bypair_id
order byingestionTime
rows_range between 7d preceding and 0s preceding MAXSIZE 100 INSTANCE_NOT_IN_WINDOW),
bo_action_pair_id_ingestionTime_0s_14d_100 as (UNION (selectingestionTime
,pair_id
,time
,model_id
,type
,cate
,br
, ” as reqId frombo_action
) partition bypair_id
order byingestionTime
rows_range between 14d preceding and 0s preceding MAXSIZE 100 INSTANCE_NOT_IN_WINDOW))as out3on out0.reqId_1 = out3.reqId_17;
/work/openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client < /root/project/deploy.sql
可应用如下命令确认 deploy 信息
show deployment demo;
在线数据筹备 ✦
首先,请切换到在线执行模式。接着在在线模式下,导入数据作为在线数据,用于在线特色计算。以下命令均在 OpenMLDB CLI 下执行。
USE JD_db;
SET @@execute_mode=’online’;
LOAD DATA INFILE ‘/root/project/data/JD_data/action/*.parquet’ INTO TABLE action options(format=’parquet’, header=true, mode=’append’);
LOAD DATA INFILE ‘/root/project/data/JD_data/flattenRequest_remove_last/*.parquet’ INTO TABLE flattenRequest options(format=’parquet’, header=true, mode=’append’);
LOAD DATA INFILE ‘/root/project/data/JD_data/bo_user/*.parquet’ INTO TABLE bo_user options(format=’parquet’, header=true, mode=’append’);
LOAD DATA INFILE ‘/root/project/data/JD_data/bo_action/*.parquet’ INTO TABLE bo_action options(format=’parquet’, header=true, mode=’append’);
LOAD DATA INFILE ‘/root/project/data/JD_data/bo_product/*.parquet’ INTO TABLE bo_product options(format=’parquet’, header=true, mode=’append’);
LOAD DATA INFILE ‘/root/project/data/JD_data/bo_comment/*.parquet’ INTO TABLE bo_comment options(format=’parquet’, header=true, mode=’append’);
/work/openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client < /root/project/load_online_data.sql
留神,集群版 `LOAD DATA` 为非阻塞工作,能够应用命令 `SHOW JOBS` 查看工作运行状态,请期待工作运行胜利(`state` 转至 `FINISHED` 状态),再进行下一步操作。
配置 OneFlow 推理服务
Oneflow 的推理服务须要 One Embedding 的反对。该反对目前还没有合入主框架中。若须要从新编译,可参考附录 A 进行编译测试。接下来步骤默认相干反对已编译实现,并且寄存在 /home/gtest/work/oneflow_serving/ 门路中。
** 离线特色查看模型门路($demodir/oneflow_process/model)中模型文件及组织形式是否正确
✦**
3.3.1
$ tree -L 3 model/
model/
└── embedding
├── 1
│ └── model
└── config.pbtxt
确认 config.pbtxt 中的配置正确 ✦
确认 persistent 门路($demodir/oneflow_process/persistent)正确 ✦
启动推理服务
启动 OneFlow 推理服务 ✦
留神,以下命令在装置 1.1 所形容的 OneFlow 运行环境中运行
应用一下命令启动 OneFlow 推理服务:
docker run --runtime=nvidia --rm --network=host \
-v $demodir/oneflow_process/model:/models \
-v /home/gtest/work/oneflow_serving/serving/build/libtriton_oneflow.so:/backends/oneflow/libtriton_oneflow.so \
-v /home/gtest/work/oneflow_serving/oneflow/build/liboneflow_cpp/lib/:/mylib \
-v $demodir/oneflow_process/persistent:/root/demo/persistent \
registry.cn-beijing.aliyuncs.com/oneflow/triton-devel \
bash -c 'LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/mylib /opt/tritonserver/bin/tritonserver \
--model-repository=/models --backend-directory=/backends'
启动在线推理服务 demo ✦
留神,以下命令在 demo docker 中运行
OpenMLDB 的在线特色计算服务已通过 SQL 上线实现,OneFlow 推理服务也曾经启动。这个 demo 将串联两者,在收到实时申请后,拜访 OpenMLDB 进行特色抽取,再拜访 OneFlow 推理服务,进行在线推理,最初返回推理后果。
如果尚未退出 OpenMLDB CLI,请应用 quit 命令退出 OpenMLDB CLI。
在一般命令行下启动预估服务:
cd /root/project/serving/openmldb_serving
./start_predict_server.sh 0.0.0.0:9080
发送预估申请
预估申请可在 OpenMLDB 的容器外执行。容器内部拜访的具体信息可参见 IP 配置。
在一般命令行下执行内置的 predict.py 脚本。该脚本发送一行申请数据到预估服务,接管返回的预估后果,并打印进去。
python predict.py
范例输入:
----------------ins---------------
['200080_5505_2016-03-15 20:43:04' '1458045784000'
'200080_5505_2016-03-15 20:43:04' '200080_5505' '5505' '200080' '1' '1.0'
'1.0' '1' '1' '3' '1' '200080_5505_2016-03-15 20:43:04' '0' '3' '1' '1'
'214' '8' '1603438960564' '-1' '1453824000000' '2' '1'
'200080_5505_2016-03-15 20:43:04' '0.02879999950528145' '0.0' '0.0' '2'
'2' '1,,NULL' '4,0,NULL' '200080_5505_2016-03-15 20:43:04' ',NULL,NULL'
',NULL,NULL' ',NULL,NULL' '1' '1' '1' ',NULL,NULL' ',NULL,NULL']
---------------predict change of purchase -------------
[[b'0.025186:0']]
附录– OneFlow 定制代码编译
此章节介绍为 One Embedding 推理服务所定制的代码批改的编译过程。该代码会尽快合入 OneFlow 中,届时以下步骤可省略。
容器环境筹备
应用以下容器环境进行编译。该容器曾经装置编译所需的依赖等。
docker pull registry.cn-beijing.aliyuncs.com/oneflow/triton-devel:latest
启动容器并映射相干门路(此处可映射 /home/work/gtest 至 /root/project)。接下来的操作均在容器内进行。
编译 OneFlow
cd /root/project
mkdir oneflow_serving && cd oneflow_serving
git clone -b support_oneembedding_serving --single-branch https://github.com/Oneflow-Inc/oneflow --depth=1 cd oneflow
mkdir build && cd build
cmake -C ..
/cmake/caches/cn/cuda.cmake \
-DBUILD_CPP_API=ON \
-DWITH_MLIR=ON \
-G Ninja \
-DBUILD_SHARED_LIBS=ON \
-DBUILD_HWLOC=OFF \
-DCMAKE_INTERPROCEDURAL_OPTIMIZATION=OFF \
-DCMAKE_EXE_LINKER_FLAGS_INIT="-fuse-ld=lld" \
-DCMAKE_MODULE_LINKER_FLAGS_INIT="-fuse-ld=lld" \
-DCMAKE_SHARED_LINKER_FLAGS_INIT="-fuse-ld=lld" ..
ninja
编译 Serving
git clone -b support_one_embedding --single-branchhttps://github.com/Oneflow-Inc/serving.git
cd serving
mkdir build && cd build
cmake -DCMAKE_PREFIX_PATH=/path/to/liboneflow_cpp/share -DTRITON_RELATED_REPO_TAG=r21.10 \
-DTRITON_ENABLE_GPU=ON -G Ninja -DTHIRD_PARTY_MIRROR=aliyun ..
ninja
上述命令中的 /path/to/liboneflow_cpp/share 要替换成上边编译的 oneflow 的外面的门路,在{oneflow 门路}/build/liboneflow_cpp/share
测试 TritonServer
复制 backend 库文件:
mkdir /root/project/oneflow_sering/backends && cd /root/project/oneflow_sering/backends
mkdir oneflow
cp /roor/prject/oneflow_serving/serving/build/libtriton_oneflow.so oneflow/.
在命令行启动 TritonServer 测试:
LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/root/project/oneflow_serving/oneflow/build/liboneflow_cpp/lib \
/opt/tritonserver/bin/tritonserver \
--model-repository=/root/project/oneflow_process/model \
--backend-directory=/root/project/oneflow_serving/backends
在另一个命令行运行如下指令,若胜利,输入示例如下:
python $demodir/serving/client.py
0.045439958572387695
(1, 1)
[[b’0.025343:0′]]
留神:
如果呈现 libunwind.so.8 未找到须要用 -v /lib/x86_64-linux-gnu:/unwind_path 映射一下 libunwind.so.8 所在目录,而后增加到 LD_LIBRARY_PATH 外面: … bash -c ‘LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/mylib:/unwind_path …
如果呈现 libcupti.so 未找到须要用 -v /usr/local/cuda-11.7/extras/CUPTI/lib64:/cupti_path 映射一下 libcupti.so 所在目录,而后增加到 LD_LIBRARY_PATH 外面: … bash -c ‘LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/mylib:/cupti_path …, 其中具体的 cuda 的门路按理论装置的地位,能够用 ldd {oneflow 门路}/build/liboneflow.so | grep cupti 来找到
心愿本文可能帮大家疾速了解把握如何应用 OpenMLDB 和 OneFlow 联结来打造一个残缺的机器学习利用,链接特色工程到模型训练的全流程。
如果想进一步理解 OpenMLDB 或者参加社区技术交换,能够通过以下渠道取得相干信息和互动~
Github: https://github.com/4paradigm/…
官网:https://openmldb.ai/
Email: [email protected]
OpenMLDB 微信交换群: