介绍
在第一局部,曾经介绍协同举荐,并利用它实现了一个简略的举荐零碎,在第二局部,咱们简要的剖析了咱们领有的数据,包含每个字段的散布,常见的统计信息等,为接下来的多路召回提供了很好的指引。
回忆一下baseline的思路,咱们首先计算了item的之间的类似度,而后基于用户的正反馈item列表,找到与列表中每一个item类似度最高的topn个item,组成一个列表,最初间接依照类似度得分进行排序,失去最初的举荐后果。
在理论的举荐场景中,通常会有两个阶段,第一个阶段是召回阶段,第二个阶段是排序阶段。第一个阶段召回那些类似度较高的N个item列表,它更关注的是召回率,较为粗略;而排序阶段会应用更为简单的模型进行监督学习(转化为分类的工作),失去分类概率,也就是置信度,最初依照置信度进行排序,取top K个item作为最初的举荐列表。
baseline中其实只蕴含了召回
这个阶段,尽管但就这个工作而言,它曾经够了。本节介绍的是多路召回,什么是多路召回呢,比方在一个举荐场景中,咱们能够抉择ItemCF或者UserCF以及基于热点的召回策略等等,因为咱们召回层的目标是为了尽可能的确保召回,所以基于单个的策略必定是成果不如多个策略的,这里就引出了多路召回的概念,也就是多个策略并行的进行召回。上面这个图示意了多路召回的一个例子。
在多路召回中,每个策略之间息息相关,能够应用多种不同的策略来获取用户排序的候选商品汇合,而具体应用哪些召回策略其实是与业务强相干
的 ,针对不同的工作就会有对于该业务实在场景下须要思考的召回规定。例如新闻举荐,召回规定能够是“热门视频”、“导演召回”、“演员召回”、“最近上映“、”风行趋势“、”类型召回“等等。
导入相干库
import pandas as pd import numpy as npfrom tqdm import tqdm from collections import defaultdict import os, math, warnings, math, picklefrom tqdm import tqdmimport faissimport collectionsimport randomfrom sklearn.preprocessing import MinMaxScalerfrom sklearn.preprocessing import LabelEncoderfrom datetime import datetimefrom deepctr.feature_column import SparseFeat, VarLenSparseFeatfrom sklearn.preprocessing import LabelEncoderfrom tensorflow.python.keras import backend as Kfrom tensorflow.python.keras.models import Modelfrom tensorflow.python.keras.preprocessing.sequence import pad_sequencesfrom deepmatch.models import *from deepmatch.utils import sampledsoftmaxlosswarnings.filterwarnings('ignore')
data_path = './data_raw/'save_path = './temp_results/'# 做召回评估的一个标记, 如果不进行评估就是间接应用全量数据进行召回metric_recall = False
读取数据
在个别的举荐零碎较量中读取数据局部次要分为三种模式, 不同的模式对应的不同的数据集:
- Debug模式: 这个的目标是帮忙咱们基于数据先搭建一个繁难的baseline并跑通, 保障写的baseline代码没有什么问题。 因为举荐较量的数据往往十分微小, 如果一上来间接采纳全副的数据进行剖析,搭建baseline框架, 往往会带来工夫和设施上的损耗, 所以这时候咱们往往须要从海量数据的训练集中随机抽取一部分样本来进行调试(train_click_log_sample), 先跑通一个baseline。
- 线下验证模式: 这个的目标是帮忙咱们在线下基于已有的训练集数据, 来抉择好适合的模型和一些超参数。 所以咱们这一块只须要加载整个训练集(train_click_log), 而后把整个训练集再分成训练集和验证集。 训练集是模型的训练数据, 验证集局部帮忙咱们调整模型的参数和其余的一些超参数。
- 线上模式: 咱们用debug模式搭建起一个举荐零碎较量的baseline, 用线下验证模式抉择好了模型和一些超参数, 这一部分就是真正的对于给定的测试集进行预测, 提交到线上, 所以这一块应用的训练数据集是全量的数据集(train_click_log+test_click_log)
上面就别离对这三种不同的数据读取模式先建设不同的代导入函数, 不便前面针对不同的模式下导入数据。
# debug模式: 从训练集中划出一部分数据来调试代码def get_all_click_sample(data_path, sample_nums=10000): """ 训练集中采样一部分数据调试 data_path: 原数据的存储门路 sample_nums: 采样数目(这里因为机器的内存限度,能够采样用户做) """ all_click = pd.read_csv(data_path + 'train_click_log.csv') all_user_ids = all_click.user_id.unique() sample_user_ids = np.random.choice(all_user_ids, size=sample_nums, replace=False) all_click = all_click[all_click['user_id'].isin(sample_user_ids)] all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp'])) return all_click# 读取点击数据,这里分成线上和线下,如果是为了获取线上提交后果应该讲测试集中的点击数据合并到总的数据中# 如果是为了线下验证模型的有效性或者特色的有效性,能够只应用训练集def get_all_click_df(data_path='./data', offline=True): if offline: all_click = pd.read_csv(data_path + '/train_click_log.csv') else: trn_click = pd.read_csv(data_path + '/train_click_log.csv') tst_click = pd.read_csv(data_path + '/testA_click_log.csv') all_click = trn_click.append(tst_click) all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp'])) return all_click
# 读取文章的根本属性def get_item_info_df(data_path): item_info_df = pd.read_csv(data_path + 'articles.csv') # 为了不便与训练集中的click_article_id拼接,须要把article_id批改成click_article_id item_info_df = item_info_df.rename(columns={'article_id': 'click_article_id'}) return item_info_df
# 读取文章的Embedding数据def get_item_emb_dict(data_path): item_emb_df = pd.read_csv(data_path + '/articles_emb.csv') item_emb_cols = [x for x in item_emb_df.columns if 'emb' in x] item_emb_np = np.ascontiguousarray(item_emb_df[item_emb_cols]) # 进行归一化 item_emb_np = item_emb_np / np.linalg.norm(item_emb_np, axis=1, keepdims=True) item_emb_dict = dict(zip(item_emb_df['article_id'], item_emb_np)) pickle.dump(item_emb_dict, open(save_path + 'item_content_emb.pkl', 'wb')) return item_emb_dict
# min-max 归一化函数max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x))
# 采样数据# all_click_df = get_all_click_sample(data_path)# 全量训练集all_click_df = get_all_click_df(offline=False)# 对工夫戳进行归一化,用于在关联规定的时候计算权重all_click_df['click_timestamp'] = all_click_df[['click_timestamp']].apply(max_min_scaler)item_info_df = get_item_info_df(data_path)item_emb_dict = get_item_emb_dict(data_path)
工具函数
获取用户-文章-工夫函数
这个在基于关联规定的用户协同过滤的时候会用到
# 依据点击工夫获取用户的点击文章序列 {user1: [(item1, time1), (item2, time2)..]...}def get_user_item_time(click_df): click_df = click_df.sort_values('click_timestamp') def make_item_time_pair(df): return list(zip(df['click_article_id'], df['click_timestamp'])) user_item_time_df = click_df.groupby('user_id')['click_article_id', 'click_timestamp'].apply(lambda x: make_item_time_pair(x))\ .reset_index().rename(columns={0: 'item_time_list'}) user_item_time_dict = dict(zip(user_item_time_df['user_id'], user_item_time_df['item_time_list'])) return user_item_time_dict
获取文章-用户-工夫函数
这个在基于关联规定的文章协同过滤的时候会用到
# 依据工夫获取商品被点击的用户序列 {item1: [(user1, time1), (user2, time2)...]...}# 这里的工夫是用户点击以后商品的工夫,如同没有间接的关系。def get_item_user_time_dict(click_df): def make_user_time_pair(df): return list(zip(df['user_id'], df['click_timestamp'])) click_df = click_df.sort_values('click_timestamp') item_user_time_df = click_df.groupby('click_article_id')['user_id', 'click_timestamp'].apply(lambda x: make_user_time_pair(x))\ .reset_index().rename(columns={0: 'user_time_list'}) item_user_time_dict = dict(zip(item_user_time_df['click_article_id'], item_user_time_df['user_time_list'])) return item_user_time_dict
获取历史和最初一次点击
这个在评估召回后果, 特色工程和制作标签转成监督学习测试集的时候回用到
# 获取以后数据的历史点击和最初一次点击def get_hist_and_last_click(all_click): all_click = all_click.sort_values(by=['user_id', 'click_timestamp']) click_last_df = all_click.groupby('user_id').tail(1) # 如果用户只有一个点击,hist为空了,会导致训练的时候这个用户不可见,此时默认泄露一下 def hist_func(user_df): if len(user_df) == 1: return user_df else: return user_df[:-1] click_hist_df = all_click.groupby('user_id').apply(hist_func).reset_index(drop=True) return click_hist_df, click_last_df
获取文章属性特色
# 获取文章id对应的根本属性,保留成字典的模式,不便前面召回阶段,冷启动阶段间接应用def get_item_info_dict(item_info_df): max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x)) item_info_df['created_at_ts'] = item_info_df[['created_at_ts']].apply(max_min_scaler) item_type_dict = dict(zip(item_info_df['click_article_id'], item_info_df['category_id'])) item_words_dict = dict(zip(item_info_df['click_article_id'], item_info_df['words_count'])) item_created_time_dict = dict(zip(item_info_df['click_article_id'], item_info_df['created_at_ts'])) return item_type_dict, item_words_dict, item_created_time_dict
获取用户历史点击的文章信息
def get_user_hist_item_info_dict(all_click): # 获取user_id对应的用户历史点击文章类型的汇合字典 user_hist_item_typs = all_click.groupby('user_id')['category_id'].agg(set).reset_index() user_hist_item_typs_dict = dict(zip(user_hist_item_typs['user_id'], user_hist_item_typs['category_id'])) # 获取user_id对应的用户点击文章的汇合 user_hist_item_ids_dict = all_click.groupby('user_id')['click_article_id'].agg(set).reset_index() user_hist_item_ids_dict = dict(zip(user_hist_item_ids_dict['user_id'], user_hist_item_ids_dict['click_article_id'])) # 获取user_id对应的用户历史点击的文章的均匀字数字典 user_hist_item_words = all_click.groupby('user_id')['words_count'].agg('mean').reset_index() user_hist_item_words_dict = dict(zip(user_hist_item_words['user_id'], user_hist_item_words['words_count'])) # 获取user_id对应的用户最初一次点击的文章的创立工夫 all_click_ = all_click.sort_values('click_timestamp') user_last_item_created_time = all_click_.groupby('user_id')['created_at_ts'].apply(lambda x: x.iloc[-1]).reset_index() max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x)) user_last_item_created_time['created_at_ts'] = user_last_item_created_time[['created_at_ts']].apply(max_min_scaler) user_last_item_created_time_dict = dict(zip(user_last_item_created_time['user_id'], \ user_last_item_created_time['created_at_ts'])) return user_hist_item_typs_dict, user_hist_item_ids_dict, user_hist_item_words_dict, user_last_item_created_time_dict
获取点击次数最多的Top-k个文章
获取近期点击最多的文章
def get_item_topk_click(click_df, k): topk_click = click_df['click_article_id'].value_counts().index[:k] return topk_click
定义多路召回字典
获取文章的属性信息,保留成字典的模式不便查问
item_type_dict, item_words_dict, item_created_time_dict = get_item_info_dict(item_info_df)# 定义一个多路召回的字典,将各路召回的后果都保留在这个字典当中user_multi_recall_dict = {'itemcf_sim_itemcf_recall': {}, 'embedding_sim_item_recall': {}, 'youtubednn_recall': {}, 'youtubednn_usercf_recall': {}, 'cold_start_recall': {}} # 提取最初一次点击作为召回评估,如果不须要做召回评估间接应用全量的训练集进行召回(线下验证模型)# 如果不是召回评估,间接应用全量数据进行召回,不必将最初一次提取进去trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
召回成果评估
做完了召回有时候也须要对以后的召回办法或者参数进行调整以达到更好的召回成果,因为召回的后果决定了最终排序的下限,上面也会提供一个召回评估的办法
# 顺次评估召回的前10, 20, 30, 40, 50个文章中的击中率def metrics_recall(user_recall_items_dict, trn_last_click_df, topk=5): last_click_item_dict = dict(zip(trn_last_click_df['user_id'], trn_last_click_df['click_article_id'])) user_num = len(user_recall_items_dict) for k in range(10, topk+1, 10): hit_num = 0 for user, item_list in user_recall_items_dict.items(): # 获取前k个召回的后果 tmp_recall_items = [x[0] for x in user_recall_items_dict[user][:k]] if last_click_item_dict[user] in set(tmp_recall_items): hit_num += 1 hit_rate = round(hit_num * 1.0 / user_num, 5) print(' topk: ', k, ' : ', 'hit_num: ', hit_num, 'hit_rate: ', hit_rate, 'user_num : ', user_num)
计算相似性矩阵
这一部分次要是通过协同过滤以及向量检索失去相似性矩阵,相似性矩阵次要分为user2user和item2item,上面顺次获取基于itemCF的item2item的相似性矩阵。
itemCF i2i_sim
借鉴KDD2020的去偏商品举荐,在计算item2item相似性矩阵时,应用关联规定,使得计算的文章的相似性还思考到了:
- 用户点击的工夫权重
- 用户点击的程序权重
- 文章创立的工夫权重
def itemcf_sim(df, item_created_time_dict): """ 文章与文章之间的相似性矩阵计算 :param df: 数据表 :item_created_time_dict: 文章创立工夫的字典 return : 文章与文章的相似性矩阵 思路: 基于物品的协同过滤(具体请参考上一期举荐零碎根底的组队学习) + 关联规定 """ user_item_time_dict = get_user_item_time(df) # 计算物品类似度 i2i_sim = {} item_cnt = defaultdict(int) for user, item_time_list in tqdm(user_item_time_dict.items()): # 在基于商品的协同过滤优化的时候能够思考工夫因素 for loc1, (i, i_click_time) in enumerate(item_time_list): item_cnt[i] += 1 i2i_sim.setdefault(i, {}) for loc2, (j, j_click_time) in enumerate(item_time_list): if(i == j): continue # 思考文章的正向程序点击和反向程序点击 loc_alpha = 1.0 if loc2 > loc1 else 0.7 # 地位信息权重,其中的参数能够调节 loc_weight = loc_alpha * (0.9 ** (np.abs(loc2 - loc1) - 1)) # 点击工夫权重,其中的参数能够调节 click_time_weight = np.exp(0.7 ** np.abs(i_click_time - j_click_time)) # 两篇文章创立工夫的权重,其中的参数能够调节 created_time_weight = np.exp(0.8 ** np.abs(item_created_time_dict[i] - item_created_time_dict[j])) i2i_sim[i].setdefault(j, 0) # 思考多种因素的权重计算最终的文章之间的类似度 i2i_sim[i][j] += loc_weight * click_time_weight * created_time_weight / math.log(len(item_time_list) + 1) i2i_sim_ = i2i_sim.copy() for i, related_items in i2i_sim.items(): for j, wij in related_items.items(): i2i_sim_[i][j] = wij / math.sqrt(item_cnt[i] * item_cnt[j]) # 将失去的相似性矩阵保留到本地 pickle.dump(i2i_sim_, open(save_path + 'itemcf_i2i_sim.pkl', 'wb')) return i2i_sim_i2i_sim = itemcf_sim(all_click_df, item_created_time_dict)
100%|██████████| 250000/250000 [14:20<00:00, 290.38it/s]
userCF u2u_sim
在计算用户之间的类似度的时候,也能够应用一些简略的关联规定,比方用户活跃度权重,这里将用户的点击次数作为用户活跃度的指标
def get_user_activate_degree_dict(all_click_df): all_click_df_ = all_click_df.groupby('user_id')['click_article_id'].count().reset_index() # 用户活跃度归一化 mm = MinMaxScaler() all_click_df_['click_article_id'] = mm.fit_transform(all_click_df_[['click_article_id']]) user_activate_degree_dict = dict(zip(all_click_df_['user_id'], all_click_df_['click_article_id'])) return user_activate_degree_dict
def usercf_sim(all_click_df, user_activate_degree_dict): """ 用户相似性矩阵计算 :param all_click_df: 数据表 :param user_activate_degree_dict: 用户活跃度的字典 return 用户相似性矩阵 思路: 基于用户的协同过滤(具体请参考上一期举荐零碎根底的组队学习) + 关联规定 """ item_user_time_dict = get_item_user_time_dict(all_click_df) u2u_sim = {} user_cnt = defaultdict(int) for item, user_time_list in tqdm(item_user_time_dict.items()): for u, click_time in user_time_list: user_cnt[u] += 1 u2u_sim.setdefault(u, {}) for v, click_time in user_time_list: u2u_sim[u].setdefault(v, 0) if u == v: continue # 用户均匀活跃度作为活跃度的权重,这里的式子也能够改善 activate_weight = 100 * 0.5 * (user_activate_degree_dict[u] + user_activate_degree_dict[v]) u2u_sim[u][v] += activate_weight / math.log(len(user_time_list) + 1) u2u_sim_ = u2u_sim.copy() for u, related_users in u2u_sim.items(): for v, wij in related_users.items(): u2u_sim_[u][v] = wij / math.sqrt(user_cnt[u] * user_cnt[v]) # 将失去的相似性矩阵保留到本地 pickle.dump(u2u_sim_, open(save_path + 'usercf_u2u_sim.pkl', 'wb')) return u2u_sim_
item_type_dict, item_words_dict, item_created_time_dict = get_item_info_dict(item_info_df)
# 因为usercf计算时候太消耗内存了,这里就不间接运行了# 如果是采样的话,是能够运行的user_activate_degree_dict = get_user_activate_degree_dict(all_click_df)u2u_sim = usercf_sim(all_click_df, user_activate_degree_dict)
item embedding sim
应用Embedding计算item之间的类似度是为了后续冷启动的时候能够获取未呈现在点击数据中的文章,前面有对冷启动专门的介绍,这里简略的说一下faiss。
aiss是Facebook的AI团队开源的一套用于做聚类或者相似性搜寻的软件库,底层是用C++实现。Faiss因为超级优越的性能,被广泛应用于举荐相干的业务当中.
faiss工具包个别应用在举荐零碎中的向量召回局部。在做向量召回的时候要么是u2u,u2i或者i2i,这里的u和i指的是user和item.咱们晓得在理论的场景中user和item的数量都是海量的,咱们最容易想到的基于向量类似度的召回就是应用两层循环遍历user列表或者item列表计算两个向量的类似度,然而这样做在面对海量数据是不切实际的,faiss就是用来减速计算某个查问向量最类似的topk个索引向量。
faiss查问的原理:
faiss应用了PCA和PQ(Product quantization乘积量化)两种技术进行向量压缩和编码,当然还应用了其余的技术进行优化,然而PCA和PQ是其中最外围局部。
- PCA降维算法细节参考上面这个链接进行学习
主成分剖析(PCA)原理总结 4 - PQ编码的细节上面这个链接进行学习
实例了解product quantization算法 4
faiss应用
faiss官网教程 7
# 向量检索类似度计算# topk指的是每个item, faiss搜寻后返回最类似的topk个itemdef embdding_sim(click_df, item_emb_df, save_path, topk): """ 基于内容的文章embedding相似性矩阵计算 :param click_df: 数据表 :param item_emb_df: 文章的embedding :param save_path: 保留门路 :patam topk: 找最类似的topk篇 return 文章相似性矩阵 思路: 对于每一篇文章, 基于embedding的相似性返回topk个与其最类似的文章, 只不过因为文章数量太多,这里用了faiss进行减速 """ # 文章索引与文章id的字典映射 item_idx_2_rawid_dict = dict(zip(item_emb_df.index, item_emb_df['article_id'])) item_emb_cols = [x for x in item_emb_df.columns if 'emb' in x] item_emb_np = np.ascontiguousarray(item_emb_df[item_emb_cols].values, dtype=np.float32) # 向量进行单位化 item_emb_np = item_emb_np / np.linalg.norm(item_emb_np, axis=1, keepdims=True) # 建设faiss索引 item_index = faiss.IndexFlatIP(item_emb_np.shape[1]) item_index.add(item_emb_np) # 类似度查问,给每个索引地位上的向量返回topk个item以及类似度 sim, idx = item_index.search(item_emb_np, topk) # 返回的是列表 # 将向量检索的后果保留成原始id的对应关系 item_sim_dict = collections.defaultdict(dict) for target_idx, sim_value_list, rele_idx_list in tqdm(zip(range(len(item_emb_np)), sim, idx)): target_raw_id = item_idx_2_rawid_dict[target_idx] # 从1开始是为了去掉商品自身, 所以最终取得的类似商品只有topk-1 for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]): rele_raw_id = item_idx_2_rawid_dict[rele_idx] item_sim_dict[target_raw_id][rele_raw_id] = item_sim_dict.get(target_raw_id, {}).get(rele_raw_id, 0) + sim_value # 保留i2i类似度矩阵 pickle.dump(item_sim_dict, open(save_path + 'emb_i2i_sim.pkl', 'wb')) return item_sim_dict
item_emb_df = pd.read_csv(data_path + '/articles_emb.csv')emb_i2i_sim = embdding_sim(all_click_df, item_emb_df, save_path, topk=10) # topk能够自行设置
召回
这个就是咱们开篇提到的那个问题, 面的36万篇文章, 20多万用户的举荐, 咱们又有哪些策略来缩减问题的规模? 咱们就能够再召回阶段筛选出用户对于点击文章的候选汇合, 从而升高问题的规模。召回罕用的策略:
- Youtube DNN 召回
基于文章的召回
- 文章的协同过滤
- 基于文章embedding的召回
基于用户的召回
- 用户的协同过滤
- 用户embedding
下面的各种召回形式一部分在基于用户曾经看得文章的根底下来召回与这些文章类似的一些文章, 而这个相似性的计算形式不同, 就失去了不同的召回形式, 比方文章的协同过滤, 文章内容的embedding等。还有一部分是依据用户的相似性进行举荐,对于某用户举荐与其类似的其余用户看过的文章,比方用户的协同过滤和用户embedding。 还有一种思路是相似矩阵合成的思路,先计算出用户和文章的embedding之后,就能够间接算用户和文章的类似度, 依据这个类似度进行举荐, 比方YouTube DNN。 咱们上面具体来看一下每一个召回办法:
YoutubeDNN召回
(这一步是间接获取用户召回的候选文章列表)
Youtubednn召回架构
对于YoutubeDNN原理和利用举荐看王喆的两篇博客:
# 获取双塔召回时的训练验证数据# negsample指的是通过滑窗构建样本的时候,负样本的数量def gen_data_set(data, negsample=0): data.sort_values("click_timestamp", inplace=True) item_ids = data['click_article_id'].unique() train_set = [] test_set = [] for reviewerID, hist in tqdm(data.groupby('user_id')): pos_list = hist['click_article_id'].tolist() if negsample > 0: candidate_set = list(set(item_ids) - set(pos_list)) # 用户没看过的文章外面抉择负样本 neg_list = np.random.choice(candidate_set,size=len(pos_list)*negsample,replace=True) # 对于每个正样本,抉择n个负样本 # 长度只有一个的时候,须要把这条数据也放到训练集中,不然的话最终学到的embedding就会有缺失 if len(pos_list) == 1: train_set.append((reviewerID, [pos_list[0]], pos_list[0],1,len(pos_list))) test_set.append((reviewerID, [pos_list[0]], pos_list[0],1,len(pos_list))) # 滑窗结构正负样本 for i in range(1, len(pos_list)): hist = pos_list[:i] if i != len(pos_list) - 1: train_set.append((reviewerID, hist[::-1], pos_list[i], 1, len(hist[::-1]))) # 正样本 [user_id, his_item, pos_item, label, len(his_item)] for negi in range(negsample): train_set.append((reviewerID, hist[::-1], neg_list[i*negsample+negi], 0,len(hist[::-1]))) # 负样本 [user_id, his_item, neg_item, label, len(his_item)] else: # 将最长的那一个序列长度作为测试数据 test_set.append((reviewerID, hist[::-1], pos_list[i],1,len(hist[::-1]))) random.shuffle(train_set) random.shuffle(test_set) return train_set, test_set# 将输出的数据进行padding,使得序列特色的长度都统一def gen_model_input(train_set,user_profile,seq_max_len): train_uid = np.array([line[0] for line in train_set]) train_seq = [line[1] for line in train_set] train_iid = np.array([line[2] for line in train_set]) train_label = np.array([line[3] for line in train_set]) train_hist_len = np.array([line[4] for line in train_set]) train_seq_pad = pad_sequences(train_seq, maxlen=seq_max_len, padding='post', truncating='post', value=0) train_model_input = {"user_id": train_uid, "click_article_id": train_iid, "hist_article_id": train_seq_pad, "hist_len": train_hist_len} return train_model_input, train_label
def youtubednn_u2i_dict(data, topk=20): sparse_features = ["click_article_id", "user_id"] SEQ_LEN = 30 # 用户点击序列的长度,短的填充,长的截断 user_profile_ = data[["user_id"]].drop_duplicates('user_id') item_profile_ = data[["click_article_id"]].drop_duplicates('click_article_id') # 类别编码 features = ["click_article_id", "user_id"] feature_max_idx = {} for feature in features: lbe = LabelEncoder() data[feature] = lbe.fit_transform(data[feature]) feature_max_idx[feature] = data[feature].max() + 1 # 提取user和item的画像,这里具体抉择哪些特色还须要进一步的剖析和思考 user_profile = data[["user_id"]].drop_duplicates('user_id') item_profile = data[["click_article_id"]].drop_duplicates('click_article_id') user_index_2_rawid = dict(zip(user_profile['user_id'], user_profile_['user_id'])) item_index_2_rawid = dict(zip(item_profile['click_article_id'], item_profile_['click_article_id'])) # 划分训练和测试集 # 因为深度学习须要的数据量通常都是十分大的,所以为了保障召回的成果,往往会通过滑窗的模式裁减训练样本 train_set, test_set = gen_data_set(data, 0) # 整顿输出数据,具体的操作能够看下面的函数 train_model_input, train_label = gen_model_input(train_set, user_profile, SEQ_LEN) test_model_input, test_label = gen_model_input(test_set, user_profile, SEQ_LEN) # 确定Embedding的维度 embedding_dim = 16 # 将数据整顿成模型能够间接输出的模式 user_feature_columns = [SparseFeat('user_id', feature_max_idx['user_id'], embedding_dim), VarLenSparseFeat(SparseFeat('hist_article_id', feature_max_idx['click_article_id'], embedding_dim, embedding_name="click_article_id"), SEQ_LEN, 'mean', 'hist_len'),] item_feature_columns = [SparseFeat('click_article_id', feature_max_idx['click_article_id'], embedding_dim)] # 模型的定义 # num_sampled: 负采样时的样本数量 model = YoutubeDNN(user_feature_columns, item_feature_columns, num_sampled=5, user_dnn_hidden_units=(64, embedding_dim)) # 模型编译 model.compile(optimizer="adam", loss=sampledsoftmaxloss) # 模型训练,这里能够定义验证集的比例,如果设置为0的话就是全量数据间接进行训练 history = model.fit(train_model_input, train_label, batch_size=256, epochs=1, verbose=1, validation_split=0.0) # 训练完模型之后,提取训练的Embedding,包含user端和item端 test_user_model_input = test_model_input all_item_model_input = {"click_article_id": item_profile['click_article_id'].values} user_embedding_model = Model(inputs=model.user_input, outputs=model.user_embedding) item_embedding_model = Model(inputs=model.item_input, outputs=model.item_embedding) # 保留以后的item_embedding 和 user_embedding 排序的时候可能可能用到,然而须要留神保留的时候须要和原始的id对应 user_embs = user_embedding_model.predict(test_user_model_input, batch_size=2 ** 12) item_embs = item_embedding_model.predict(all_item_model_input, batch_size=2 ** 12) # embedding保留之前归一化一下 user_embs = user_embs / np.linalg.norm(user_embs, axis=1, keepdims=True) item_embs = item_embs / np.linalg.norm(item_embs, axis=1, keepdims=True) # 将Embedding转换成字典的模式不便查问 raw_user_id_emb_dict = {user_index_2_rawid[k]: v for k, v in zip(user_profile['user_id'], user_embs)} raw_item_id_emb_dict = {item_index_2_rawid[k]: v for k, v in zip(item_profile['click_article_id'], item_embs)} # 将Embedding保留到本地 pickle.dump(raw_user_id_emb_dict, open(save_path + 'user_youtube_emb.pkl', 'wb')) pickle.dump(raw_item_id_emb_dict, open(save_path + 'item_youtube_emb.pkl', 'wb')) # faiss紧邻搜寻,通过user_embedding 搜寻与其相似性最高的topk个item index = faiss.IndexFlatIP(embedding_dim) # 下面曾经进行了归一化,这里能够不进行归一化了# faiss.normalize_L2(user_embs)# faiss.normalize_L2(item_embs) index.add(item_embs) # 将item向量构建索引 sim, idx = index.search(np.ascontiguousarray(user_embs), topk) # 通过user去查问最类似的topk个item user_recall_items_dict = collections.defaultdict(dict) for target_idx, sim_value_list, rele_idx_list in tqdm(zip(test_user_model_input['user_id'], sim, idx)): target_raw_id = user_index_2_rawid[target_idx] # 从1开始是为了去掉商品自身, 所以最终取得的类似商品只有topk-1 for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]): rele_raw_id = item_index_2_rawid[rele_idx] user_recall_items_dict[target_raw_id][rele_raw_id] = user_recall_items_dict.get(target_raw_id, {}) .get(rele_raw_id, 0) + sim_value user_recall_items_dict = {k: sorted(v.items(), key=lambda x: x[1], reverse=True) for k, v in user_recall_items_dict.items()} # 将召回的后果进行排序 # 保留召回的后果 # 这里是间接通过向量的形式失去了召回后果,相比于下面的召回办法,下面的只是失去了i2i及u2u的相似性矩阵,还须要进行协同过滤召回能力失去召回后果 # 能够间接对这个召回后果进行评估,为了不便能够对立写一个评估函数对所有的召回后果进行评估 pickle.dump(user_recall_items_dict, open(save_path + 'youtube_u2i_dict.pkl', 'wb')) return user_recall_items_dict
# 因为这里须要做召回评估,所以讲训练集中的最初一次点击都提取了进去if not metric_recall: user_multi_recall_dict['youtubednn_recall'] = youtubednn_u2i_dict(all_click_df, topk=20)else: trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df) user_multi_recall_dict['youtubednn_recall'] = youtubednn_u2i_dict(trn_hist_click_df, topk=20) # 召回成果评估 metrics_recall(user_multi_recall_dict['youtubednn_recall'], trn_last_click_df, topk=20)
itemCF recall
下面曾经通过协同过滤,Embedding检索的形式失去了文章的类似度矩阵,上面应用协同过滤的思维,给用户召回与其历史文章类似的文章。
这里在召回的时候,也是用了关联规定的形式:
- 思考类似文章与历史点击文章程序的权重(细节看代码)
- 思考文章创立工夫的权重,也就是思考类似文章与历史点击文章创立时间差的权重
- 思考文章内容类似度权重(应用Embedding计算类似文章类似度,然而这里须要留神,在Embedding的时候并没有计算所有商品两两之间的类似度,所以类似的文章与历史点击文章不存在类似度,须要做非凡解决)
# 基于商品的召回i2idef item_based_recommend(user_id, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim): """ 基于文章协同过滤的召回 :param user_id: 用户id :param user_item_time_dict: 字典, 依据点击工夫获取用户的点击文章序列 {user1: [(item1, time1), (item2, time2)..]...} :param i2i_sim: 字典,文章相似性矩阵 :param sim_item_topk: 整数, 抉择与以后文章最类似的前k篇文章 :param recall_item_num: 整数, 最初的召回文章数量 :param item_topk_click: 列表,点击次数最多的文章列表,用户召回补全 :param emb_i2i_sim: 字典基于内容embedding算的文章相似矩阵 return: 召回的文章列表 [(item1, score1), (item2, score2)...] """ # 获取用户历史交互的文章 user_hist_items = user_item_time_dict[user_id] user_hist_items_ = {user_id for user_id, _ in user_hist_items } item_rank = {} for loc, (i, click_time) in enumerate(user_hist_items): for j, wij in sorted(i2i_sim[i].items(), key=lambda x: x[1], reverse=True)[:sim_item_topk]: if j in user_hist_items_: continue # 文章创立时间差权重 created_time_weight = np.exp(0.8 ** np.abs(item_created_time_dict[i] - item_created_time_dict[j])) # 类似文章和历史点击文章序列中历史文章所在的地位权重 loc_weight = (0.9 ** (len(user_hist_items) - loc)) content_weight = 1.0 if emb_i2i_sim.get(i, {}).get(j, None) is not None: content_weight += emb_i2i_sim[i][j] if emb_i2i_sim.get(j, {}).get(i, None) is not None: content_weight += emb_i2i_sim[j][i] item_rank.setdefault(j, 0) item_rank[j] += created_time_weight * loc_weight * content_weight * wij # 有余10个,用热门商品补全 if len(item_rank) < recall_item_num: for i, item in enumerate(item_topk_click): if item in item_rank.items(): # 填充的item应该不在原来的列表中 continue item_rank[item] = - i - 100 # 轻易给个正数就行 if len(item_rank) == recall_item_num: break item_rank = sorted(item_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num] return item_rank
itemCF sim召回
# 先进行itemcf召回, 为了召回评估,所以提取最初一次点击if metric_recall: trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)else: trn_hist_click_df = all_click_dfuser_recall_items_dict = collections.defaultdict(dict)user_item_time_dict = get_user_item_time(trn_hist_click_df)i2i_sim = pickle.load(open(save_path + 'itemcf_i2i_sim.pkl', 'rb'))emb_i2i_sim = pickle.load(open(save_path + 'emb_i2i_sim.pkl', 'rb'))sim_item_topk = 20recall_item_num = 10item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)for user in tqdm(trn_hist_click_df['user_id'].unique()): user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, \ i2i_sim, sim_item_topk, recall_item_num, \ item_topk_click, item_created_time_dict, emb_i2i_sim)user_multi_recall_dict['itemcf_sim_itemcf_recall'] = user_recall_items_dictpickle.dump(user_multi_recall_dict['itemcf_sim_itemcf_recall'], open(save_path + 'itemcf_recall_dict.pkl', 'wb'))if metric_recall: # 召回成果评估 metrics_recall(user_multi_recall_dict['itemcf_sim_itemcf_recall'], trn_last_click_df, topk=recall_item_num)
embedding sim 召回
# 这里是为了召回评估,所以提取最初一次点击if metric_recall: trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)else: trn_hist_click_df = all_click_dfuser_recall_items_dict = collections.defaultdict(dict)user_item_time_dict = get_user_item_time(trn_hist_click_df)i2i_sim = pickle.load(open(save_path + 'emb_i2i_sim.pkl','rb'))sim_item_topk = 20recall_item_num = 10item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)for user in tqdm(trn_hist_click_df['user_id'].unique()): user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim) user_multi_recall_dict['embedding_sim_item_recall'] = user_recall_items_dictpickle.dump(user_multi_recall_dict['embedding_sim_item_recall'], open(save_path + 'embedding_sim_item_recall.pkl', 'wb'))if metric_recall: # 召回成果评估 metrics_recall(user_multi_recall_dict['embedding_sim_item_recall'], trn_last_click_df, topk=recall_item_num)
userCF召回
基于用户协同过滤,核心思想是给用户举荐与其类似的用户历史点击文章,因为这里波及到了类似用户的历史文章,这里依然能够加上一些关联规定来给用户可能点击的文章进行加权,这里应用的关联规定次要是思考类似用户的历史点击文章与被举荐用户历史点击商品的关系权重,而这里的关系就能够间接借鉴基于物品的协同过滤类似的做法,只不过这里是对被举荐物品关系的一个累加的过程,上面是应用的一些关系权重,及相干的代码:
- 计算被举荐用户历史点击文章与类似用户历史点击文章的类似度,文章创立时间差,绝对地位的总和,作为各自的权重
# 基于用户的召回 u2u2idef user_based_recommend(user_id, user_item_time_dict, u2u_sim, sim_user_topk, recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim): """ 基于文章协同过滤的召回 :param user_id: 用户id :param user_item_time_dict: 字典, 依据点击工夫获取用户的点击文章序列 {user1: [(item1, time1), (item2, time2)..]...} :param u2u_sim: 字典,文章相似性矩阵 :param sim_user_topk: 整数, 抉择与以后用户最类似的前k个用户 :param recall_item_num: 整数, 最初的召回文章数量 :param item_topk_click: 列表,点击次数最多的文章列表,用户召回补全 :param item_created_time_dict: 文章创立工夫列表 :param emb_i2i_sim: 字典基于内容embedding算的文章相似矩阵 return: 召回的文章列表 [(item1, score1), (item2, score2)...] """ # 历史交互 user_item_time_list = user_item_time_dict[user_id] # {item1: time1, item2: time2...} user_hist_items = set([i for i, t in user_item_time_list]) # 存在一个用户与某篇文章的屡次交互, 这里得去重 items_rank = {} for sim_u, wuv in sorted(u2u_sim[user_id].items(), key=lambda x: x[1], reverse=True)[:sim_user_topk]: for i, click_time in user_item_time_dict[sim_u]: if i in user_hist_items: continue items_rank.setdefault(i, 0) loc_weight = 1.0 content_weight = 1.0 created_time_weight = 1.0 # 以后文章与该用户看的历史文章进行一个权重交互 for loc, (j, click_time) in enumerate(user_item_time_list): # 点击时的绝对地位权重 loc_weight += 0.9 ** (len(user_item_time_list) - loc) # 内容相似性权重 if emb_i2i_sim.get(i, {}).get(j, None) is not None: content_weight += emb_i2i_sim[i][j] if emb_i2i_sim.get(j, {}).get(i, None) is not None: content_weight += emb_i2i_sim[j][i] # 创立时间差权重 created_time_weight += np.exp(0.8 * np.abs(item_created_time_dict[i] - item_created_time_dict[j])) items_rank[i] += loc_weight * content_weight * created_time_weight * wuv # 热度补全 if len(items_rank) < recall_item_num: for i, item in enumerate(item_topk_click): if item in items_rank.items(): # 填充的item应该不在原来的列表中 continue items_rank[item] = - i - 100 # 轻易给个复数就行 if len(items_rank) == recall_item_num: break items_rank = sorted(items_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num] return items_rank
userCF sim召回
# 这里是为了召回评估,所以提取最初一次点击# 因为usercf中计算user之间的类似度的过程太费内存了,全量数据这里就没有跑,跑了一个采样之后的数据if metric_recall: trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)else: trn_hist_click_df = all_click_df user_recall_items_dict = collections.defaultdict(dict)user_item_time_dict = get_user_item_time(trn_hist_click_df)u2u_sim = pickle.load(open(save_path + 'usercf_u2u_sim.pkl', 'rb'))sim_user_topk = 20recall_item_num = 10item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)for user in tqdm(trn_hist_click_df['user_id'].unique()): user_recall_items_dict[user] = user_based_recommend(user, user_item_time_dict, u2u_sim, sim_user_topk, \ recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim) pickle.dump(user_recall_items_dict, open(save_path + 'usercf_u2u2i_recall.pkl', 'wb'))if metric_recall: # 召回成果评估 metrics_recall(user_recall_items_dict, trn_last_click_df, topk=recall_item_num)
user embedding sim召回
尽管没有间接跑usercf的计算用户之间的类似度,为了验证上述基于用户的协同过滤的代码,上面应用了YoutubeDNN过程中产生的user embedding来进行向量检索每个user最类似的topk个user,在应用这里失去的u2u的相似性矩阵,应用usercf进行召回,具体代码如下
# 应用Embedding的形式获取u2u的相似性矩阵# topk指的是每个user, faiss搜寻后返回最类似的topk个userdef u2u_embdding_sim(click_df, user_emb_dict, save_path, topk): user_list = [] user_emb_list = [] for user_id, user_emb in user_emb_dict.items(): user_list.append(user_id) user_emb_list.append(user_emb) user_index_2_rawid_dict = {k: v for k, v in zip(range(len(user_list)), user_list)} user_emb_np = np.array(user_emb_list, dtype=np.float32) # 建设faiss索引 user_index = faiss.IndexFlatIP(user_emb_np.shape[1]) user_index.add(user_emb_np) # 类似度查问,给每个索引地位上的向量返回topk个item以及类似度 sim, idx = user_index.search(user_emb_np, topk) # 返回的是列表 # 将向量检索的后果保留成原始id的对应关系 user_sim_dict = collections.defaultdict(dict) for target_idx, sim_value_list, rele_idx_list in tqdm(zip(range(len(user_emb_np)), sim, idx)): target_raw_id = user_index_2_rawid_dict[target_idx] # 从1开始是为了去掉商品自身, 所以最终取得的类似商品只有topk-1 for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]): rele_raw_id = user_index_2_rawid_dict[rele_idx] user_sim_dict[target_raw_id][rele_raw_id] = user_sim_dict.get(target_raw_id, {}).get(rele_raw_id, 0) + sim_value # 保留i2i类似度矩阵 pickle.dump(user_sim_dict, open(save_path + 'youtube_u2u_sim.pkl', 'wb')) return user_sim_dict
# 读取YoutubeDNN过程中产生的user embedding, 而后应用faiss计算用户之间的类似度# 这里须要留神,这里失去的user embedding其实并不是很好,因为YoutubeDNN中应用的是用户点击序列来训练的user embedding,# 如果序列广泛都比拟短的话,其实成果并不是很好user_emb_dict = pickle.load(open(save_path + 'user_youtube_emb.pkl', 'rb'))u2u_sim = u2u_embdding_sim(all_click_df, user_emb_dict, save_path, topk=10)
通过YoutubeDNN失去的user_embedding
# 应用召回评估函数验证以后召回形式的成果if metric_recall: trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)else: trn_hist_click_df = all_click_dfuser_recall_items_dict = collections.defaultdict(dict)user_item_time_dict = get_user_item_time(trn_hist_click_df)u2u_sim = pickle.load(open(save_path + 'youtube_u2u_sim.pkl', 'rb'))sim_user_topk = 20recall_item_num = 10item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)for user in tqdm(trn_hist_click_df['user_id'].unique()): user_recall_items_dict[user] = user_based_recommend(user, user_item_time_dict, u2u_sim, sim_user_topk, \ recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim) user_multi_recall_dict['youtubednn_usercf_recall'] = user_recall_items_dictpickle.dump(user_multi_recall_dict['youtubednn_usercf_recall'], open(save_path + 'youtubednn_usercf_recall.pkl', 'wb'))if metric_recall: # 召回成果评估 metrics_recall(user_multi_recall_dict['youtubednn_usercf_recall'], trn_last_click_df, topk=recall_item_num)
冷启动问题
冷启动问题能够分成三类:文章冷启动,用户冷启动,零碎冷启动。
- 文章冷启动:对于一个平台零碎新退出的文章,该文章没有任何的交互记录,如何举荐给用户的问题。(对于咱们场景能够认为是,日志数据中没有呈现过的文章都能够认为是冷启动的文章)
- 用户冷启动:对于一个平台零碎新来的用户,该用户还没有文章的交互信息,如何给该用户进行举荐。(对于咱们场景就是,测试集中的用户是否在测试集对应的log数据中呈现过,如果没有呈现过,那么能够认为该用户是冷启动用户。然而有时候并没有这么严格,咱们也能够本人设定某些指标来判断哪些用户是冷启动用户,比方通过应用时长,点击率,留存率等等)
- 零碎冷启动:就是对于一个平台刚上线,还没有任何的相干历史数据,此时就是零碎冷启动,其实也就是后面两种的一个综合。
以后场景下冷启动问题的剖析:
对以后的数据进行剖析会发现,日志中所有呈现过的点击文章只有3w多个,而整个文章库中却有30多万,那么测试集中的用户最初一次点击是否会点击没有呈现在日志中的文章呢?如果存在这种状况,阐明用户点击的文章之前没有任何的交互信息,这也就是咱们所说的文章冷启动。通过数据分析还能够发现,测试集用户只有一次点击的数据占得比例还不少,其实仅仅通过用户的一次点击就给用户举荐文章应用模型的形式也是比拟难的,这里其实也能够思考用户冷启动的问题,然而这里只给出物品冷启动的一些解决方案及代码,对于用户冷启动的话提一些可行性的做法。
文章冷启动(没有冷启动的摸索问题)
其实咱们这里不是为了做文章的冷启动而做冷启动,而是猜想用户可能会点击一些没有在log数据中呈现的文章,咱们要做的就是如何从将近27万的文章中抉择一些文章作为用户冷启动的文章,这里其实也能够看成是一种召回策略,咱们这里就采纳简略的比拟好了解的基于规定的召回策略来获取用户可能点击的未呈现在log数据中的文章。
当初的问题变成了:如何给每个用户思考从27万个商品中获取一小部分商品?随机选一些可能是一种计划。上面给出一些参考的计划。- 首先基于Embedding召回一部分与用户历史类似的文章
- 从基于Embedding召回的文章中通过一些规定过滤掉一些文章,使得留下的文章用户更可能点击。咱们这里的规定,能够是,留下那些与用户历史点击文章主题雷同的文章,或者字数相差不大的文章。并且留下的文章尽量是与测试集用户最初一次点击工夫更靠近的文章,或者是当天的文章也行。
- 用户冷启动
这里对测试集中的用户点击数据进行剖析会发现,测试集中有百分之20的用户只有一次点击,那么这些点击特地少的用户的召回是不是能够独自做一些策略上的补充呢?或者是在排序后间接基于规定加上一些文章呢?这些都能够去尝试,这里没有提供具体的做法。
留神:
这里看似和基于embedding计算的item之间类似度而后做itemcf是统一的,然而当初咱们的目标不一样,咱们这里的目标是找到类似的向量,并且还没有呈现在log日志中的商品,再加上一些其余的冷启动的策略,这里须要找回的数量会偏多一点,不然被筛选完之后可能都没有文章了
# 先进行itemcf召回,这里不须要做召回评估,这里只是一种策略trn_hist_click_df = all_click_dfuser_recall_items_dict = collections.defaultdict(dict)user_item_time_dict = get_user_item_time(trn_hist_click_df)i2i_sim = pickle.load(open(save_path + 'emb_i2i_sim.pkl','rb'))sim_item_topk = 150recall_item_num = 100 # 略微召回多一点文章,便于后续的规定筛选item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)for user in tqdm(trn_hist_click_df['user_id'].unique()): user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_click,item_created_time_dict, emb_i2i_sim)pickle.dump(user_recall_items_dict, open(save_path + 'cold_start_items_raw_dict.pkl', 'wb'))
# 基于规定进行文章过滤# 保留文章主题与用户历史浏览主题类似的文章# 保留文章字数与用户历史浏览文章字数相差不大的文章# 保留最初一次点击当天的文章# 依照类似度返回最终的后果def get_click_article_ids_set(all_click_df): return set(all_click_df.click_article_id.values)def cold_start_items(user_recall_items_dict, user_hist_item_typs_dict, user_hist_item_words_dict, \ user_last_item_created_time_dict, item_type_dict, item_words_dict, item_created_time_dict, click_article_ids_set, recall_item_num): """ 冷启动的状况下召回一些文章 :param user_recall_items_dict: 基于内容embedding相似性召回来的很多文章, 字典, {user1: [item1, item2, ..], } :param user_hist_item_typs_dict: 字典, 用户点击的文章的主题映射 :param user_hist_item_words_dict: 字典, 用户点击的历史文章的字数映射 :param user_last_item_created_time_idct: 字典,用户点击的历史文章创立工夫映射 :param item_tpye_idct: 字典,文章主题映射 :param item_words_dict: 字典,文章字数映射 :param item_created_time_dict: 字典, 文章创立工夫映射 :param click_article_ids_set: 汇合,用户点击过得文章, 也就是日志外面呈现过的文章 :param recall_item_num: 召回文章的数量, 这个指的是没有呈现在日志外面的文章数量 """ cold_start_user_items_dict = {} for user, item_list in tqdm(user_recall_items_dict.items()): cold_start_user_items_dict.setdefault(user, []) for item, score in item_list: # 获取历史文章信息 hist_item_type_set = user_hist_item_typs_dict[user] hist_mean_words = user_hist_item_words_dict[user] hist_last_item_created_time = user_last_item_created_time_dict[user] hist_last_item_created_time = datetime.fromtimestamp(hist_last_item_created_time) # 获取以后召回文章的信息 curr_item_type = item_type_dict[item] curr_item_words = item_words_dict[item] curr_item_created_time = item_created_time_dict[item] curr_item_created_time = datetime.fromtimestamp(curr_item_created_time) # 首先,文章不能呈现在用户的历史点击中, 而后依据文章主题,文章单词数,文章创立工夫进行筛选 if curr_item_type not in hist_item_type_set or \ item in click_article_ids_set or \ abs(curr_item_words - hist_mean_words) > 200 or \ abs((curr_item_created_time - hist_last_item_created_time).days) > 90: continue cold_start_user_items_dict[user].append((item, score)) # {user1: [(item1, score1), (item2, score2)..]...} # 须要管制一下冷启动召回的数量 cold_start_user_items_dict = {k: sorted(v, key=lambda x:x[1], reverse=True)[:recall_item_num] \ for k, v in cold_start_user_items_dict.items()} pickle.dump(cold_start_user_items_dict, open(save_path + 'cold_start_user_items_dict.pkl', 'wb')) return cold_start_user_items_dict
all_click_df_ = all_click_df.copy()all_click_df_ = all_click_df_.merge(item_info_df, how='left', on='click_article_id')user_hist_item_typs_dict, user_hist_item_ids_dict, user_hist_item_words_dict, user_last_item_created_time_dict = get_user_hist_item_info_dict(all_click_df_)click_article_ids_set = get_click_article_ids_set(all_click_df)# 须要留神的是# 这里应用了很多规定来筛选冷启动的文章,所以后面再召回的阶段就应该尽可能的多召回一些文章,否则很容易被删掉cold_start_user_items_dict = cold_start_items(user_recall_items_dict, user_hist_item_typs_dict, user_hist_item_words_dict, user_last_item_created_time_dict, item_type_dict, item_words_dict, item_created_time_dict, click_article_ids_set, recall_item_num)user_multi_recall_dict['cold_start_recall'] = cold_start_user_items_dict
多路召回合并
多路召回合并就是将后面所有的召回策略失去的用户文章列表合并起来,上面是对后面所有召回后果的汇总
- 基于itemcf计算的item之间的类似度sim进行的召回
- 基于embedding搜寻失去的item之间的类似度进行的召回
- YoutubeDNN召回
- YoutubeDNN失去的user之间的类似度进行的召回
- 基于冷启动策略的召回
留神:
在做召回评估的时候就会发现有些召回的成果不错有些召回的成果很差,所以对每一路召回的后果,咱们能够认为的定义一些权重,来做最终的类似度交融
def combine_recall_results(user_multi_recall_dict, weight_dict=None, topk=25): final_recall_items_dict = {} # 对每一种召回后果依照用户进行归一化,不便前面多种召回后果,雷同用户的物品之间权重相加 def norm_user_recall_items_sim(sorted_item_list): # 如果冷启动中没有文章或者只有一篇文章,间接返回,呈现这种状况的起因可能是冷启动召回的文章数量太少了, # 基于规定筛选之后就没有文章了, 这里还能够做一些其余的策略性的筛选 if len(sorted_item_list) < 2: return sorted_item_list min_sim = sorted_item_list[-1][1] max_sim = sorted_item_list[0][1] norm_sorted_item_list = [] for item, score in sorted_item_list: if max_sim > 0: norm_score = 1.0 * (score - min_sim) / (max_sim - min_sim) if max_sim > min_sim else 1.0 else: norm_score = 0.0 norm_sorted_item_list.append((item, norm_score)) return norm_sorted_item_list print('多路召回合并...') for method, user_recall_items in tqdm(user_multi_recall_dict.items()): print(method + '...') # 在计算最终召回后果的时候,也能够为每一种召回后果设置一个权重 if weight_dict == None: recall_method_weight = 1 else: recall_method_weight = weight_dict[method] for user_id, sorted_item_list in user_recall_items.items(): # 进行归一化 user_recall_items[user_id] = norm_user_recall_items_sim(sorted_item_list) for user_id, sorted_item_list in user_recall_items.items(): # print('user_id') final_recall_items_dict.setdefault(user_id, {}) for item, score in sorted_item_list: final_recall_items_dict[user_id].setdefault(item, 0) final_recall_items_dict[user_id][item] += recall_method_weight * score final_recall_items_dict_rank = {} # 多路召回时也能够管制最终的召回数量 for user, recall_item_dict in final_recall_items_dict.items(): final_recall_items_dict_rank[user] = sorted(recall_item_dict.items(), key=lambda x: x[1], reverse=True)[:topk] # 将多路召回后的最终后果字典保留到本地 pickle.dump(final_recall_items_dict, open(os.path.join(save_path, 'final_recall_items_dict.pkl'),'wb')) return final_recall_items_dict_rank
# 这里间接对多路召回的权重给了一个雷同的值,其实能够依据后面召回的状况来调整参数的值weight_dict = {'itemcf_sim_itemcf_recall': 1.0, 'embedding_sim_item_recall': 1.0, 'youtubednn_recall': 1.0, 'youtubednn_usercf_recall': 1.0, 'cold_start_recall': 1.0} # 最终合并之后每个用户召回150个商品进行排序final_recall_items_dict_rank = combine_recall_results(user_multi_recall_dict, weight_dict, topk=150)
总结
上述实现了如下召回策略:
- 基于关联规定的itemcf
- 基于关联规定的usercf
- youtubednn召回
- 冷启动召回
对于上述实现的召回策略其实都不是最优的后果,咱们只是做了个简略的尝试,其中还有很多中央能够优化,包含曾经实现的这些召回策略的参数或者新加一些,批改一些关联规定都能够。当然还能够尝试更多的召回策略,比方对新闻进行热度召回等等。
Reference
- 重读Youtube深度学习举荐零碎论文,字字珠玑,惊为神文
- YouTube深度学习举荐零碎的十大工程问题
- YouTubeDNN原理
- Word2Vec知乎众赞文章 — word2vec放到排序中的w2v的介绍局部