共计 36733 个字符,预计需要花费 92 分钟才能阅读完成。
介绍
在第一局部,曾经介绍协同举荐,并利用它实现了一个简略的举荐零碎,在第二局部,咱们简要的剖析了咱们领有的数据,包含每个字段的散布,常见的统计信息等,为接下来的多路召回提供了很好的指引。
回忆一下 baseline 的思路,咱们首先计算了 item 的之间的类似度,而后基于用户的正反馈 item 列表,找到与列表中每一个 item 类似度最高的 topn 个 item,组成一个列表,最初间接依照类似度得分进行排序,失去最初的举荐后果。
在理论的举荐场景中,通常会有两个阶段,第一个阶段是召回阶段,第二个阶段是排序阶段。第一个阶段召回那些类似度较高的 N 个 item 列表,它更关注的是召回率,较为粗略;而排序阶段会应用更为简单的模型进行监督学习(转化为分类的工作),失去分类概率,也就是置信度,最初依照置信度进行排序,取 top K 个 item 作为最初的举荐列表。
baseline 中其实只蕴含了 召回
这个阶段,尽管但就这个工作而言,它曾经够了。本节介绍的是多路召回,什么是多路召回呢,比方在一个举荐场景中,咱们能够抉择 ItemCF 或者 UserCF 以及基于热点的召回策略等等,因为咱们召回层的目标是为了尽可能的确保召回,所以基于单个的策略必定是成果不如多个策略的,这里就引出了多路召回的概念,也就是多个策略并行的进行召回。上面这个图示意了多路召回的一个例子。
在多路召回中,每个策略之间息息相关,能够应用多种不同的策略来获取用户排序的候选商品汇合,而具体应用哪些召回策略其实是与 业务强相干
的,针对不同的工作就会有对于该业务实在场景下须要思考的召回规定。例如新闻举荐,召回规定能够是“热门视频”、“导演召回”、“演员召回”、“最近上映“、”风行趋势“、”类型召回“等等。
导入相干库
import pandas as pd
import numpy as np
from tqdm import tqdm
from collections import defaultdict
import os, math, warnings, math, pickle
from tqdm import tqdm
import faiss
import collections
import random
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import LabelEncoder
from datetime import datetime
from deepctr.feature_column import SparseFeat, VarLenSparseFeat
from sklearn.preprocessing import LabelEncoder
from tensorflow.python.keras import backend as K
from tensorflow.python.keras.models import Model
from tensorflow.python.keras.preprocessing.sequence import pad_sequences
from deepmatch.models import *
from deepmatch.utils import sampledsoftmaxloss
warnings.filterwarnings('ignore')
data_path = './data_raw/'
save_path = './temp_results/'
# 做召回评估的一个标记, 如果不进行评估就是间接应用全量数据进行召回
metric_recall = False
读取数据
在个别的举荐零碎较量中读取数据局部次要分为三种模式,不同的模式对应的不同的数据集:
- Debug 模式:这个的目标是帮忙咱们基于数据先搭建一个繁难的 baseline 并跑通,保障写的 baseline 代码没有什么问题。因为举荐较量的数据往往十分微小,如果一上来间接采纳全副的数据进行剖析,搭建 baseline 框架,往往会带来工夫和设施上的损耗,所以这时候咱们往往须要从海量数据的训练集中随机抽取一部分样本来进行调试(train_click_log_sample),先跑通一个 baseline。
- 线下验证模式:这个的目标是帮忙咱们在线下基于已有的训练集数据,来抉择好适合的模型和一些超参数。所以咱们这一块只须要加载整个训练集(train_click_log),而后把整个训练集再分成训练集和验证集。训练集是模型的训练数据,验证集局部帮忙咱们调整模型的参数和其余的一些超参数。
- 线上模式:咱们用 debug 模式搭建起一个举荐零碎较量的 baseline,用线下验证模式抉择好了模型和一些超参数,这一部分就是真正的对于给定的测试集进行预测,提交到线上,所以这一块应用的训练数据集是全量的数据集(train_click_log+test_click_log)
上面就别离对这三种不同的数据读取模式先建设不同的代导入函数,不便前面针对不同的模式下导入数据。
# debug 模式:从训练集中划出一部分数据来调试代码
def get_all_click_sample(data_path, sample_nums=10000):
"""
训练集中采样一部分数据调试
data_path: 原数据的存储门路
sample_nums: 采样数目(这里因为机器的内存限度,能够采样用户做)"""all_click = pd.read_csv(data_path +'train_click_log.csv')
all_user_ids = all_click.user_id.unique()
sample_user_ids = np.random.choice(all_user_ids, size=sample_nums, replace=False)
all_click = all_click[all_click['user_id'].isin(sample_user_ids)]
all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))
return all_click
# 读取点击数据,这里分成线上和线下,如果是为了获取线上提交后果应该讲测试集中的点击数据合并到总的数据中
# 如果是为了线下验证模型的有效性或者特色的有效性,能够只应用训练集
def get_all_click_df(data_path='./data', offline=True):
if offline:
all_click = pd.read_csv(data_path + '/train_click_log.csv')
else:
trn_click = pd.read_csv(data_path + '/train_click_log.csv')
tst_click = pd.read_csv(data_path + '/testA_click_log.csv')
all_click = trn_click.append(tst_click)
all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))
return all_click
# 读取文章的根本属性
def get_item_info_df(data_path):
item_info_df = pd.read_csv(data_path + 'articles.csv')
# 为了不便与训练集中的 click_article_id 拼接,须要把 article_id 批改成 click_article_id
item_info_df = item_info_df.rename(columns={'article_id': 'click_article_id'})
return item_info_df
# 读取文章的 Embedding 数据
def get_item_emb_dict(data_path):
item_emb_df = pd.read_csv(data_path + '/articles_emb.csv')
item_emb_cols = [x for x in item_emb_df.columns if 'emb' in x]
item_emb_np = np.ascontiguousarray(item_emb_df[item_emb_cols])
# 进行归一化
item_emb_np = item_emb_np / np.linalg.norm(item_emb_np, axis=1, keepdims=True)
item_emb_dict = dict(zip(item_emb_df['article_id'], item_emb_np))
pickle.dump(item_emb_dict, open(save_path + 'item_content_emb.pkl', 'wb'))
return item_emb_dict
# min-max 归一化函数
max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x))
# 采样数据
# all_click_df = get_all_click_sample(data_path)
# 全量训练集
all_click_df = get_all_click_df(offline=False)
# 对工夫戳进行归一化, 用于在关联规定的时候计算权重
all_click_df['click_timestamp'] = all_click_df[['click_timestamp']].apply(max_min_scaler)
item_info_df = get_item_info_df(data_path)
item_emb_dict = get_item_emb_dict(data_path)
工具函数
获取用户 - 文章 - 工夫函数
这个在基于关联规定的用户协同过滤的时候会用到
# 依据点击工夫获取用户的点击文章序列 {user1: [(item1, time1), (item2, time2)..]...}
def get_user_item_time(click_df):
click_df = click_df.sort_values('click_timestamp')
def make_item_time_pair(df):
return list(zip(df['click_article_id'], df['click_timestamp']))
user_item_time_df = click_df.groupby('user_id')['click_article_id', 'click_timestamp'].apply(lambda x: make_item_time_pair(x))\
.reset_index().rename(columns={0: 'item_time_list'})
user_item_time_dict = dict(zip(user_item_time_df['user_id'], user_item_time_df['item_time_list']))
return user_item_time_dict
获取文章 - 用户 - 工夫函数
这个在基于关联规定的文章协同过滤的时候会用到
# 依据工夫获取商品被点击的用户序列 {item1: [(user1, time1), (user2, time2)...]...}
# 这里的工夫是用户点击以后商品的工夫,如同没有间接的关系。def get_item_user_time_dict(click_df):
def make_user_time_pair(df):
return list(zip(df['user_id'], df['click_timestamp']))
click_df = click_df.sort_values('click_timestamp')
item_user_time_df = click_df.groupby('click_article_id')['user_id', 'click_timestamp'].apply(lambda x: make_user_time_pair(x))\
.reset_index().rename(columns={0: 'user_time_list'})
item_user_time_dict = dict(zip(item_user_time_df['click_article_id'], item_user_time_df['user_time_list']))
return item_user_time_dict
获取历史和最初一次点击
这个在评估召回后果,特色工程和制作标签转成监督学习测试集的时候回用到
# 获取以后数据的历史点击和最初一次点击
def get_hist_and_last_click(all_click):
all_click = all_click.sort_values(by=['user_id', 'click_timestamp'])
click_last_df = all_click.groupby('user_id').tail(1)
# 如果用户只有一个点击,hist 为空了,会导致训练的时候这个用户不可见,此时默认泄露一下
def hist_func(user_df):
if len(user_df) == 1:
return user_df
else:
return user_df[:-1]
click_hist_df = all_click.groupby('user_id').apply(hist_func).reset_index(drop=True)
return click_hist_df, click_last_df
获取文章属性特色
# 获取文章 id 对应的根本属性,保留成字典的模式,不便前面召回阶段,冷启动阶段间接应用
def get_item_info_dict(item_info_df):
max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x))
item_info_df['created_at_ts'] = item_info_df[['created_at_ts']].apply(max_min_scaler)
item_type_dict = dict(zip(item_info_df['click_article_id'], item_info_df['category_id']))
item_words_dict = dict(zip(item_info_df['click_article_id'], item_info_df['words_count']))
item_created_time_dict = dict(zip(item_info_df['click_article_id'], item_info_df['created_at_ts']))
return item_type_dict, item_words_dict, item_created_time_dict
获取用户历史点击的文章信息
def get_user_hist_item_info_dict(all_click):
# 获取 user_id 对应的用户历史点击文章类型的汇合字典
user_hist_item_typs = all_click.groupby('user_id')['category_id'].agg(set).reset_index()
user_hist_item_typs_dict = dict(zip(user_hist_item_typs['user_id'], user_hist_item_typs['category_id']))
# 获取 user_id 对应的用户点击文章的汇合
user_hist_item_ids_dict = all_click.groupby('user_id')['click_article_id'].agg(set).reset_index()
user_hist_item_ids_dict = dict(zip(user_hist_item_ids_dict['user_id'], user_hist_item_ids_dict['click_article_id']))
# 获取 user_id 对应的用户历史点击的文章的均匀字数字典
user_hist_item_words = all_click.groupby('user_id')['words_count'].agg('mean').reset_index()
user_hist_item_words_dict = dict(zip(user_hist_item_words['user_id'], user_hist_item_words['words_count']))
# 获取 user_id 对应的用户最初一次点击的文章的创立工夫
all_click_ = all_click.sort_values('click_timestamp')
user_last_item_created_time = all_click_.groupby('user_id')['created_at_ts'].apply(lambda x: x.iloc[-1]).reset_index()
max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x))
user_last_item_created_time['created_at_ts'] = user_last_item_created_time[['created_at_ts']].apply(max_min_scaler)
user_last_item_created_time_dict = dict(zip(user_last_item_created_time['user_id'], \
user_last_item_created_time['created_at_ts']))
return user_hist_item_typs_dict, user_hist_item_ids_dict, user_hist_item_words_dict, user_last_item_created_time_dict
获取点击次数最多的 Top- k 个文章
获取近期点击最多的文章
def get_item_topk_click(click_df, k):
topk_click = click_df['click_article_id'].value_counts().index[:k]
return topk_click
定义多路召回字典
获取文章的属性信息,保留成字典的模式不便查问
item_type_dict, item_words_dict, item_created_time_dict = get_item_info_dict(item_info_df)
# 定义一个多路召回的字典,将各路召回的后果都保留在这个字典当中
user_multi_recall_dict = {'itemcf_sim_itemcf_recall': {},
'embedding_sim_item_recall': {},
'youtubednn_recall': {},
'youtubednn_usercf_recall': {},
'cold_start_recall': {}}
# 提取最初一次点击作为召回评估,如果不须要做召回评估间接应用全量的训练集进行召回(线下验证模型)
# 如果不是召回评估,间接应用全量数据进行召回,不必将最初一次提取进去
trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
召回成果评估
做完了召回有时候也须要对以后的召回办法或者参数进行调整以达到更好的召回成果,因为召回的后果决定了最终排序的下限,上面也会提供一个召回评估的办法
# 顺次评估召回的前 10, 20, 30, 40, 50 个文章中的击中率
def metrics_recall(user_recall_items_dict, trn_last_click_df, topk=5):
last_click_item_dict = dict(zip(trn_last_click_df['user_id'], trn_last_click_df['click_article_id']))
user_num = len(user_recall_items_dict)
for k in range(10, topk+1, 10):
hit_num = 0
for user, item_list in user_recall_items_dict.items():
# 获取前 k 个召回的后果
tmp_recall_items = [x[0] for x in user_recall_items_dict[user][:k]]
if last_click_item_dict[user] in set(tmp_recall_items):
hit_num += 1
hit_rate = round(hit_num * 1.0 / user_num, 5)
print('topk:', k, ':', 'hit_num:', hit_num, 'hit_rate:', hit_rate, 'user_num :', user_num)
计算相似性矩阵
这一部分次要是通过协同过滤以及向量检索失去相似性矩阵,相似性矩阵次要分为 user2user 和 item2item,上面顺次获取基于 itemCF 的 item2item 的相似性矩阵。
itemCF i2i_sim
借鉴 KDD2020 的去偏商品举荐,在计算 item2item 相似性矩阵时,应用关联规定,使得计算的文章的相似性还思考到了:
- 用户点击的工夫权重
- 用户点击的程序权重
- 文章创立的工夫权重
def itemcf_sim(df, item_created_time_dict):
"""
文章与文章之间的相似性矩阵计算
:param df: 数据表
:item_created_time_dict: 文章创立工夫的字典
return : 文章与文章的相似性矩阵
思路: 基于物品的协同过滤(具体请参考上一期举荐零碎根底的组队学习) + 关联规定
"""
user_item_time_dict = get_user_item_time(df)
# 计算物品类似度
i2i_sim = {}
item_cnt = defaultdict(int)
for user, item_time_list in tqdm(user_item_time_dict.items()):
# 在基于商品的协同过滤优化的时候能够思考工夫因素
for loc1, (i, i_click_time) in enumerate(item_time_list):
item_cnt[i] += 1
i2i_sim.setdefault(i, {})
for loc2, (j, j_click_time) in enumerate(item_time_list):
if(i == j):
continue
# 思考文章的正向程序点击和反向程序点击
loc_alpha = 1.0 if loc2 > loc1 else 0.7
# 地位信息权重,其中的参数能够调节
loc_weight = loc_alpha * (0.9 ** (np.abs(loc2 - loc1) - 1))
# 点击工夫权重,其中的参数能够调节
click_time_weight = np.exp(0.7 ** np.abs(i_click_time - j_click_time))
# 两篇文章创立工夫的权重,其中的参数能够调节
created_time_weight = np.exp(0.8 ** np.abs(item_created_time_dict[i] - item_created_time_dict[j]))
i2i_sim[i].setdefault(j, 0)
# 思考多种因素的权重计算最终的文章之间的类似度
i2i_sim[i][j] += loc_weight * click_time_weight * created_time_weight / math.log(len(item_time_list) + 1)
i2i_sim_ = i2i_sim.copy()
for i, related_items in i2i_sim.items():
for j, wij in related_items.items():
i2i_sim_[i][j] = wij / math.sqrt(item_cnt[i] * item_cnt[j])
# 将失去的相似性矩阵保留到本地
pickle.dump(i2i_sim_, open(save_path + 'itemcf_i2i_sim.pkl', 'wb'))
return i2i_sim_
i2i_sim = itemcf_sim(all_click_df, item_created_time_dict)
100%|██████████| 250000/250000 [14:20<00:00, 290.38it/s]
userCF u2u_sim
在计算用户之间的类似度的时候,也能够应用一些简略的关联规定,比方用户活跃度权重,这里将用户的点击次数作为用户活跃度的指标
def get_user_activate_degree_dict(all_click_df):
all_click_df_ = all_click_df.groupby('user_id')['click_article_id'].count().reset_index()
# 用户活跃度归一化
mm = MinMaxScaler()
all_click_df_['click_article_id'] = mm.fit_transform(all_click_df_[['click_article_id']])
user_activate_degree_dict = dict(zip(all_click_df_['user_id'], all_click_df_['click_article_id']))
return user_activate_degree_dict
def usercf_sim(all_click_df, user_activate_degree_dict):
"""
用户相似性矩阵计算
:param all_click_df: 数据表
:param user_activate_degree_dict: 用户活跃度的字典
return 用户相似性矩阵
思路: 基于用户的协同过滤(具体请参考上一期举荐零碎根底的组队学习) + 关联规定
"""
item_user_time_dict = get_item_user_time_dict(all_click_df)
u2u_sim = {}
user_cnt = defaultdict(int)
for item, user_time_list in tqdm(item_user_time_dict.items()):
for u, click_time in user_time_list:
user_cnt[u] += 1
u2u_sim.setdefault(u, {})
for v, click_time in user_time_list:
u2u_sim[u].setdefault(v, 0)
if u == v:
continue
# 用户均匀活跃度作为活跃度的权重,这里的式子也能够改善
activate_weight = 100 * 0.5 * (user_activate_degree_dict[u] + user_activate_degree_dict[v])
u2u_sim[u][v] += activate_weight / math.log(len(user_time_list) + 1)
u2u_sim_ = u2u_sim.copy()
for u, related_users in u2u_sim.items():
for v, wij in related_users.items():
u2u_sim_[u][v] = wij / math.sqrt(user_cnt[u] * user_cnt[v])
# 将失去的相似性矩阵保留到本地
pickle.dump(u2u_sim_, open(save_path + 'usercf_u2u_sim.pkl', 'wb'))
return u2u_sim_
item_type_dict, item_words_dict, item_created_time_dict = get_item_info_dict(item_info_df)
# 因为 usercf 计算时候太消耗内存了,这里就不间接运行了
# 如果是采样的话,是能够运行的
user_activate_degree_dict = get_user_activate_degree_dict(all_click_df)
u2u_sim = usercf_sim(all_click_df, user_activate_degree_dict)
item embedding sim
应用 Embedding 计算 item 之间的类似度是为了后续冷启动的时候能够获取未呈现在点击数据中的文章,前面有对冷启动专门的介绍,这里简略的说一下 faiss。
aiss 是 Facebook 的 AI 团队开源的一套用于做聚类或者相似性搜寻的软件库,底层是用 C ++ 实现。Faiss 因为超级优越的性能,被广泛应用于举荐相干的业务当中.
faiss 工具包个别应用在举荐零碎中的向量召回局部。在做向量召回的时候要么是 u2u,u2i 或者 i2i,这里的 u 和 i 指的是 user 和 item. 咱们晓得在理论的场景中 user 和 item 的数量都是海量的,咱们最容易想到的基于向量类似度的召回就是应用两层循环遍历 user 列表或者 item 列表计算两个向量的类似度,然而这样做在面对海量数据是不切实际的,faiss 就是用来减速计算某个查问向量最类似的 topk 个索引向量。
faiss 查问的原理:
faiss 应用了 PCA 和 PQ(Product quantization 乘积量化)两种技术进行向量压缩和编码,当然还应用了其余的技术进行优化,然而 PCA 和 PQ 是其中最外围局部。
- PCA 降维算法细节参考上面这个链接进行学习
主成分剖析(PCA)原理总结 4 - PQ 编码的细节上面这个链接进行学习
实例了解 product quantization 算法 4
faiss 应用
faiss 官网教程 7
# 向量检索类似度计算
# topk 指的是每个 item, faiss 搜寻后返回最类似的 topk 个 item
def embdding_sim(click_df, item_emb_df, save_path, topk):
"""
基于内容的文章 embedding 相似性矩阵计算
:param click_df: 数据表
:param item_emb_df: 文章的 embedding
:param save_path: 保留门路
:patam topk: 找最类似的 topk 篇
return 文章相似性矩阵
思路: 对于每一篇文章,基于 embedding 的相似性返回 topk 个与其最类似的文章,只不过因为文章数量太多,这里用了 faiss 进行减速
"""
# 文章索引与文章 id 的字典映射
item_idx_2_rawid_dict = dict(zip(item_emb_df.index, item_emb_df['article_id']))
item_emb_cols = [x for x in item_emb_df.columns if 'emb' in x]
item_emb_np = np.ascontiguousarray(item_emb_df[item_emb_cols].values, dtype=np.float32)
# 向量进行单位化
item_emb_np = item_emb_np / np.linalg.norm(item_emb_np, axis=1, keepdims=True)
# 建设 faiss 索引
item_index = faiss.IndexFlatIP(item_emb_np.shape[1])
item_index.add(item_emb_np)
# 类似度查问,给每个索引地位上的向量返回 topk 个 item 以及类似度
sim, idx = item_index.search(item_emb_np, topk) # 返回的是列表
# 将向量检索的后果保留成原始 id 的对应关系
item_sim_dict = collections.defaultdict(dict)
for target_idx, sim_value_list, rele_idx_list in tqdm(zip(range(len(item_emb_np)), sim, idx)):
target_raw_id = item_idx_2_rawid_dict[target_idx]
# 从 1 开始是为了去掉商品自身, 所以最终取得的类似商品只有 topk-1
for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]):
rele_raw_id = item_idx_2_rawid_dict[rele_idx]
item_sim_dict[target_raw_id][rele_raw_id] = item_sim_dict.get(target_raw_id, {}).get(rele_raw_id, 0) + sim_value
# 保留 i2i 类似度矩阵
pickle.dump(item_sim_dict, open(save_path + 'emb_i2i_sim.pkl', 'wb'))
return item_sim_dict
item_emb_df = pd.read_csv(data_path + '/articles_emb.csv')
emb_i2i_sim = embdding_sim(all_click_df, item_emb_df, save_path, topk=10) # topk 能够自行设置
召回
这个就是咱们开篇提到的那个问题,面的 36 万篇文章,20 多万用户的举荐,咱们又有哪些策略来缩减问题的规模?咱们就能够再召回阶段筛选出用户对于点击文章的候选汇合,从而升高问题的规模。召回罕用的策略:
- Youtube DNN 召回
-
基于文章的召回
- 文章的协同过滤
- 基于文章 embedding 的召回
-
基于用户的召回
- 用户的协同过滤
- 用户 embedding
下面的各种召回形式一部分在基于用户曾经看得文章的根底下来召回与这些文章类似的一些文章,而这个相似性的计算形式不同,就失去了不同的召回形式,比方文章的协同过滤,文章内容的 embedding 等。还有一部分是依据用户的相似性进行举荐,对于某用户举荐与其类似的其余用户看过的文章,比方用户的协同过滤和用户 embedding。还有一种思路是相似矩阵合成的思路,先计算出用户和文章的 embedding 之后,就能够间接算用户和文章的类似度,依据这个类似度进行举荐,比方 YouTube DNN。咱们上面具体来看一下每一个召回办法:
YoutubeDNN 召回
(这一步是间接获取用户召回的候选文章列表)
Youtubednn 召回架构
对于 YoutubeDNN 原理和利用举荐看王喆的两篇博客:
# 获取双塔召回时的训练验证数据
# negsample 指的是通过滑窗构建样本的时候,负样本的数量
def gen_data_set(data, negsample=0):
data.sort_values("click_timestamp", inplace=True)
item_ids = data['click_article_id'].unique()
train_set = []
test_set = []
for reviewerID, hist in tqdm(data.groupby('user_id')):
pos_list = hist['click_article_id'].tolist()
if negsample > 0:
candidate_set = list(set(item_ids) - set(pos_list)) # 用户没看过的文章外面抉择负样本
neg_list = np.random.choice(candidate_set,size=len(pos_list)*negsample,replace=True) # 对于每个正样本,抉择 n 个负样本
# 长度只有一个的时候,须要把这条数据也放到训练集中,不然的话最终学到的 embedding 就会有缺失
if len(pos_list) == 1:
train_set.append((reviewerID, [pos_list[0]], pos_list[0],1,len(pos_list)))
test_set.append((reviewerID, [pos_list[0]], pos_list[0],1,len(pos_list)))
# 滑窗结构正负样本
for i in range(1, len(pos_list)):
hist = pos_list[:i]
if i != len(pos_list) - 1:
train_set.append((reviewerID, hist[::-1], pos_list[i], 1, len(hist[::-1]))) # 正样本 [user_id, his_item, pos_item, label, len(his_item)]
for negi in range(negsample):
train_set.append((reviewerID, hist[::-1], neg_list[i*negsample+negi], 0,len(hist[::-1]))) # 负样本 [user_id, his_item, neg_item, label, len(his_item)]
else:
# 将最长的那一个序列长度作为测试数据
test_set.append((reviewerID, hist[::-1], pos_list[i],1,len(hist[::-1])))
random.shuffle(train_set)
random.shuffle(test_set)
return train_set, test_set
# 将输出的数据进行 padding,使得序列特色的长度都统一
def gen_model_input(train_set,user_profile,seq_max_len):
train_uid = np.array([line[0] for line in train_set])
train_seq = [line[1] for line in train_set]
train_iid = np.array([line[2] for line in train_set])
train_label = np.array([line[3] for line in train_set])
train_hist_len = np.array([line[4] for line in train_set])
train_seq_pad = pad_sequences(train_seq, maxlen=seq_max_len, padding='post', truncating='post', value=0)
train_model_input = {"user_id": train_uid, "click_article_id": train_iid, "hist_article_id": train_seq_pad,
"hist_len": train_hist_len}
return train_model_input, train_label
def youtubednn_u2i_dict(data, topk=20):
sparse_features = ["click_article_id", "user_id"]
SEQ_LEN = 30 # 用户点击序列的长度,短的填充,长的截断
user_profile_ = data[["user_id"]].drop_duplicates('user_id')
item_profile_ = data[["click_article_id"]].drop_duplicates('click_article_id')
# 类别编码
features = ["click_article_id", "user_id"]
feature_max_idx = {}
for feature in features:
lbe = LabelEncoder()
data[feature] = lbe.fit_transform(data[feature])
feature_max_idx[feature] = data[feature].max() + 1
# 提取 user 和 item 的画像,这里具体抉择哪些特色还须要进一步的剖析和思考
user_profile = data[["user_id"]].drop_duplicates('user_id')
item_profile = data[["click_article_id"]].drop_duplicates('click_article_id')
user_index_2_rawid = dict(zip(user_profile['user_id'], user_profile_['user_id']))
item_index_2_rawid = dict(zip(item_profile['click_article_id'], item_profile_['click_article_id']))
# 划分训练和测试集
# 因为深度学习须要的数据量通常都是十分大的,所以为了保障召回的成果,往往会通过滑窗的模式裁减训练样本
train_set, test_set = gen_data_set(data, 0)
# 整顿输出数据,具体的操作能够看下面的函数
train_model_input, train_label = gen_model_input(train_set, user_profile, SEQ_LEN)
test_model_input, test_label = gen_model_input(test_set, user_profile, SEQ_LEN)
# 确定 Embedding 的维度
embedding_dim = 16
# 将数据整顿成模型能够间接输出的模式
user_feature_columns = [SparseFeat('user_id', feature_max_idx['user_id'], embedding_dim),
VarLenSparseFeat(SparseFeat('hist_article_id', feature_max_idx['click_article_id'], embedding_dim,
embedding_name="click_article_id"), SEQ_LEN, 'mean', 'hist_len'),]
item_feature_columns = [SparseFeat('click_article_id', feature_max_idx['click_article_id'], embedding_dim)]
# 模型的定义
# num_sampled: 负采样时的样本数量
model = YoutubeDNN(user_feature_columns, item_feature_columns, num_sampled=5, user_dnn_hidden_units=(64, embedding_dim))
# 模型编译
model.compile(optimizer="adam", loss=sampledsoftmaxloss)
# 模型训练,这里能够定义验证集的比例,如果设置为 0 的话就是全量数据间接进行训练
history = model.fit(train_model_input, train_label, batch_size=256, epochs=1, verbose=1, validation_split=0.0)
# 训练完模型之后, 提取训练的 Embedding,包含 user 端和 item 端
test_user_model_input = test_model_input
all_item_model_input = {"click_article_id": item_profile['click_article_id'].values}
user_embedding_model = Model(inputs=model.user_input, outputs=model.user_embedding)
item_embedding_model = Model(inputs=model.item_input, outputs=model.item_embedding)
# 保留以后的 item_embedding 和 user_embedding 排序的时候可能可能用到,然而须要留神保留的时候须要和原始的 id 对应
user_embs = user_embedding_model.predict(test_user_model_input, batch_size=2 ** 12)
item_embs = item_embedding_model.predict(all_item_model_input, batch_size=2 ** 12)
# embedding 保留之前归一化一下
user_embs = user_embs / np.linalg.norm(user_embs, axis=1, keepdims=True)
item_embs = item_embs / np.linalg.norm(item_embs, axis=1, keepdims=True)
# 将 Embedding 转换成字典的模式不便查问
raw_user_id_emb_dict = {user_index_2_rawid[k]:
v for k, v in zip(user_profile['user_id'], user_embs)}
raw_item_id_emb_dict = {item_index_2_rawid[k]:
v for k, v in zip(item_profile['click_article_id'], item_embs)}
# 将 Embedding 保留到本地
pickle.dump(raw_user_id_emb_dict, open(save_path + 'user_youtube_emb.pkl', 'wb'))
pickle.dump(raw_item_id_emb_dict, open(save_path + 'item_youtube_emb.pkl', 'wb'))
# faiss 紧邻搜寻,通过 user_embedding 搜寻与其相似性最高的 topk 个 item
index = faiss.IndexFlatIP(embedding_dim)
# 下面曾经进行了归一化,这里能够不进行归一化了
# faiss.normalize_L2(user_embs)
# faiss.normalize_L2(item_embs)
index.add(item_embs) # 将 item 向量构建索引
sim, idx = index.search(np.ascontiguousarray(user_embs), topk) # 通过 user 去查问最类似的 topk 个 item
user_recall_items_dict = collections.defaultdict(dict)
for target_idx, sim_value_list, rele_idx_list in tqdm(zip(test_user_model_input['user_id'], sim, idx)):
target_raw_id = user_index_2_rawid[target_idx]
# 从 1 开始是为了去掉商品自身, 所以最终取得的类似商品只有 topk-1
for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]):
rele_raw_id = item_index_2_rawid[rele_idx]
user_recall_items_dict[target_raw_id][rele_raw_id] = user_recall_items_dict.get(target_raw_id, {})
.get(rele_raw_id, 0) + sim_value
user_recall_items_dict = {k: sorted(v.items(), key=lambda x: x[1], reverse=True) for k, v in user_recall_items_dict.items()}
# 将召回的后果进行排序
# 保留召回的后果
# 这里是间接通过向量的形式失去了召回后果,相比于下面的召回办法,下面的只是失去了 i2i 及 u2u 的相似性矩阵,还须要进行协同过滤召回能力失去召回后果
# 能够间接对这个召回后果进行评估,为了不便能够对立写一个评估函数对所有的召回后果进行评估
pickle.dump(user_recall_items_dict, open(save_path + 'youtube_u2i_dict.pkl', 'wb'))
return user_recall_items_dict
# 因为这里须要做召回评估,所以讲训练集中的最初一次点击都提取了进去
if not metric_recall:
user_multi_recall_dict['youtubednn_recall'] = youtubednn_u2i_dict(all_click_df, topk=20)
else:
trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
user_multi_recall_dict['youtubednn_recall'] = youtubednn_u2i_dict(trn_hist_click_df, topk=20)
# 召回成果评估
metrics_recall(user_multi_recall_dict['youtubednn_recall'], trn_last_click_df, topk=20)
itemCF recall
下面曾经通过协同过滤,Embedding 检索的形式失去了文章的类似度矩阵,上面应用协同过滤的思维,给用户召回与其历史文章类似的文章。
这里在召回的时候,也是用了关联规定的形式:
- 思考类似文章与历史点击文章程序的权重(细节看代码)
- 思考文章创立工夫的权重,也就是思考类似文章与历史点击文章创立时间差的权重
- 思考文章内容类似度权重(应用 Embedding 计算类似文章类似度,然而这里须要留神,在 Embedding 的时候并没有计算所有商品两两之间的类似度,所以类似的文章与历史点击文章不存在类似度,须要做非凡解决)
# 基于商品的召回 i2i
def item_based_recommend(user_id, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim):
"""
基于文章协同过滤的召回
:param user_id: 用户 id
:param user_item_time_dict: 字典, 依据点击工夫获取用户的点击文章序列 {user1: [(item1, time1), (item2, time2)..]...}
:param i2i_sim: 字典,文章相似性矩阵
:param sim_item_topk: 整数,抉择与以后文章最类似的前 k 篇文章
:param recall_item_num: 整数,最初的召回文章数量
:param item_topk_click: 列表,点击次数最多的文章列表,用户召回补全
:param emb_i2i_sim: 字典基于内容 embedding 算的文章相似矩阵
return: 召回的文章列表 [(item1, score1), (item2, score2)...]
"""
# 获取用户历史交互的文章
user_hist_items = user_item_time_dict[user_id]
user_hist_items_ = {user_id for user_id, _ in user_hist_items}
item_rank = {}
for loc, (i, click_time) in enumerate(user_hist_items):
for j, wij in sorted(i2i_sim[i].items(), key=lambda x: x[1], reverse=True)[:sim_item_topk]:
if j in user_hist_items_:
continue
# 文章创立时间差权重
created_time_weight = np.exp(0.8 ** np.abs(item_created_time_dict[i] - item_created_time_dict[j]))
# 类似文章和历史点击文章序列中历史文章所在的地位权重
loc_weight = (0.9 ** (len(user_hist_items) - loc))
content_weight = 1.0
if emb_i2i_sim.get(i, {}).get(j, None) is not None:
content_weight += emb_i2i_sim[i][j]
if emb_i2i_sim.get(j, {}).get(i, None) is not None:
content_weight += emb_i2i_sim[j][i]
item_rank.setdefault(j, 0)
item_rank[j] += created_time_weight * loc_weight * content_weight * wij
# 有余 10 个,用热门商品补全
if len(item_rank) < recall_item_num:
for i, item in enumerate(item_topk_click):
if item in item_rank.items(): # 填充的 item 应该不在原来的列表中
continue
item_rank[item] = - i - 100 # 轻易给个正数就行
if len(item_rank) == recall_item_num:
break
item_rank = sorted(item_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]
return item_rank
itemCF sim 召回
# 先进行 itemcf 召回, 为了召回评估,所以提取最初一次点击
if metric_recall:
trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
else:
trn_hist_click_df = all_click_df
user_recall_items_dict = collections.defaultdict(dict)
user_item_time_dict = get_user_item_time(trn_hist_click_df)
i2i_sim = pickle.load(open(save_path + 'itemcf_i2i_sim.pkl', 'rb'))
emb_i2i_sim = pickle.load(open(save_path + 'emb_i2i_sim.pkl', 'rb'))
sim_item_topk = 20
recall_item_num = 10
item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)
for user in tqdm(trn_hist_click_df['user_id'].unique()):
user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, \
i2i_sim, sim_item_topk, recall_item_num, \
item_topk_click, item_created_time_dict, emb_i2i_sim)
user_multi_recall_dict['itemcf_sim_itemcf_recall'] = user_recall_items_dict
pickle.dump(user_multi_recall_dict['itemcf_sim_itemcf_recall'], open(save_path + 'itemcf_recall_dict.pkl', 'wb'))
if metric_recall:
# 召回成果评估
metrics_recall(user_multi_recall_dict['itemcf_sim_itemcf_recall'], trn_last_click_df, topk=recall_item_num)
embedding sim 召回
# 这里是为了召回评估,所以提取最初一次点击
if metric_recall:
trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
else:
trn_hist_click_df = all_click_df
user_recall_items_dict = collections.defaultdict(dict)
user_item_time_dict = get_user_item_time(trn_hist_click_df)
i2i_sim = pickle.load(open(save_path + 'emb_i2i_sim.pkl','rb'))
sim_item_topk = 20
recall_item_num = 10
item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)
for user in tqdm(trn_hist_click_df['user_id'].unique()):
user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, i2i_sim, sim_item_topk,
recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim)
user_multi_recall_dict['embedding_sim_item_recall'] = user_recall_items_dict
pickle.dump(user_multi_recall_dict['embedding_sim_item_recall'], open(save_path + 'embedding_sim_item_recall.pkl', 'wb'))
if metric_recall:
# 召回成果评估
metrics_recall(user_multi_recall_dict['embedding_sim_item_recall'], trn_last_click_df, topk=recall_item_num)
userCF 召回
基于用户协同过滤,核心思想是给用户举荐与其类似的用户历史点击文章,因为这里波及到了类似用户的历史文章,这里依然能够加上一些关联规定来给用户可能点击的文章进行加权,这里应用的关联规定次要是思考类似用户的历史点击文章与被举荐用户历史点击商品的关系权重,而这里的关系就能够间接借鉴基于物品的协同过滤类似的做法,只不过这里是对被举荐物品关系的一个累加的过程,上面是应用的一些关系权重,及相干的代码:
- 计算被举荐用户历史点击文章与类似用户历史点击文章的类似度,文章创立时间差,绝对地位的总和,作为各自的权重
# 基于用户的召回 u2u2i
def user_based_recommend(user_id, user_item_time_dict, u2u_sim, sim_user_topk, recall_item_num,
item_topk_click, item_created_time_dict, emb_i2i_sim):
"""
基于文章协同过滤的召回
:param user_id: 用户 id
:param user_item_time_dict: 字典, 依据点击工夫获取用户的点击文章序列 {user1: [(item1, time1), (item2, time2)..]...}
:param u2u_sim: 字典,文章相似性矩阵
:param sim_user_topk: 整数,抉择与以后用户最类似的前 k 个用户
:param recall_item_num: 整数,最初的召回文章数量
:param item_topk_click: 列表,点击次数最多的文章列表,用户召回补全
:param item_created_time_dict: 文章创立工夫列表
:param emb_i2i_sim: 字典基于内容 embedding 算的文章相似矩阵
return: 召回的文章列表 [(item1, score1), (item2, score2)...]
"""
# 历史交互
user_item_time_list = user_item_time_dict[user_id] # {item1: time1, item2: time2...}
user_hist_items = set([i for i, t in user_item_time_list]) # 存在一个用户与某篇文章的屡次交互,这里得去重
items_rank = {}
for sim_u, wuv in sorted(u2u_sim[user_id].items(), key=lambda x: x[1], reverse=True)[:sim_user_topk]:
for i, click_time in user_item_time_dict[sim_u]:
if i in user_hist_items:
continue
items_rank.setdefault(i, 0)
loc_weight = 1.0
content_weight = 1.0
created_time_weight = 1.0
# 以后文章与该用户看的历史文章进行一个权重交互
for loc, (j, click_time) in enumerate(user_item_time_list):
# 点击时的绝对地位权重
loc_weight += 0.9 ** (len(user_item_time_list) - loc)
# 内容相似性权重
if emb_i2i_sim.get(i, {}).get(j, None) is not None:
content_weight += emb_i2i_sim[i][j]
if emb_i2i_sim.get(j, {}).get(i, None) is not None:
content_weight += emb_i2i_sim[j][i]
# 创立时间差权重
created_time_weight += np.exp(0.8 * np.abs(item_created_time_dict[i] - item_created_time_dict[j]))
items_rank[i] += loc_weight * content_weight * created_time_weight * wuv
# 热度补全
if len(items_rank) < recall_item_num:
for i, item in enumerate(item_topk_click):
if item in items_rank.items(): # 填充的 item 应该不在原来的列表中
continue
items_rank[item] = - i - 100 # 轻易给个复数就行
if len(items_rank) == recall_item_num:
break
items_rank = sorted(items_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]
return items_rank
userCF sim 召回
# 这里是为了召回评估,所以提取最初一次点击
# 因为 usercf 中计算 user 之间的类似度的过程太费内存了,全量数据这里就没有跑,跑了一个采样之后的数据
if metric_recall:
trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
else:
trn_hist_click_df = all_click_df
user_recall_items_dict = collections.defaultdict(dict)
user_item_time_dict = get_user_item_time(trn_hist_click_df)
u2u_sim = pickle.load(open(save_path + 'usercf_u2u_sim.pkl', 'rb'))
sim_user_topk = 20
recall_item_num = 10
item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)
for user in tqdm(trn_hist_click_df['user_id'].unique()):
user_recall_items_dict[user] = user_based_recommend(user, user_item_time_dict, u2u_sim, sim_user_topk, \
recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim)
pickle.dump(user_recall_items_dict, open(save_path + 'usercf_u2u2i_recall.pkl', 'wb'))
if metric_recall:
# 召回成果评估
metrics_recall(user_recall_items_dict, trn_last_click_df, topk=recall_item_num)
user embedding sim 召回
尽管没有间接跑 usercf 的计算用户之间的类似度,为了验证上述基于用户的协同过滤的代码,上面应用了 YoutubeDNN 过程中产生的 user embedding 来进行向量检索每个 user 最类似的 topk 个 user,在应用这里失去的 u2u 的相似性矩阵,应用 usercf 进行召回,具体代码如下
# 应用 Embedding 的形式获取 u2u 的相似性矩阵
# topk 指的是每个 user, faiss 搜寻后返回最类似的 topk 个 user
def u2u_embdding_sim(click_df, user_emb_dict, save_path, topk):
user_list = []
user_emb_list = []
for user_id, user_emb in user_emb_dict.items():
user_list.append(user_id)
user_emb_list.append(user_emb)
user_index_2_rawid_dict = {k: v for k, v in zip(range(len(user_list)), user_list)}
user_emb_np = np.array(user_emb_list, dtype=np.float32)
# 建设 faiss 索引
user_index = faiss.IndexFlatIP(user_emb_np.shape[1])
user_index.add(user_emb_np)
# 类似度查问,给每个索引地位上的向量返回 topk 个 item 以及类似度
sim, idx = user_index.search(user_emb_np, topk) # 返回的是列表
# 将向量检索的后果保留成原始 id 的对应关系
user_sim_dict = collections.defaultdict(dict)
for target_idx, sim_value_list, rele_idx_list in tqdm(zip(range(len(user_emb_np)), sim, idx)):
target_raw_id = user_index_2_rawid_dict[target_idx]
# 从 1 开始是为了去掉商品自身, 所以最终取得的类似商品只有 topk-1
for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]):
rele_raw_id = user_index_2_rawid_dict[rele_idx]
user_sim_dict[target_raw_id][rele_raw_id] = user_sim_dict.get(target_raw_id, {}).get(rele_raw_id, 0) + sim_value
# 保留 i2i 类似度矩阵
pickle.dump(user_sim_dict, open(save_path + 'youtube_u2u_sim.pkl', 'wb'))
return user_sim_dict
# 读取 YoutubeDNN 过程中产生的 user embedding, 而后应用 faiss 计算用户之间的类似度
# 这里须要留神,这里失去的 user embedding 其实并不是很好,因为 YoutubeDNN 中应用的是用户点击序列来训练的 user embedding,
# 如果序列广泛都比拟短的话,其实成果并不是很好
user_emb_dict = pickle.load(open(save_path + 'user_youtube_emb.pkl', 'rb'))
u2u_sim = u2u_embdding_sim(all_click_df, user_emb_dict, save_path, topk=10)
通过 YoutubeDNN 失去的 user_embedding
# 应用召回评估函数验证以后召回形式的成果
if metric_recall:
trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
else:
trn_hist_click_df = all_click_df
user_recall_items_dict = collections.defaultdict(dict)
user_item_time_dict = get_user_item_time(trn_hist_click_df)
u2u_sim = pickle.load(open(save_path + 'youtube_u2u_sim.pkl', 'rb'))
sim_user_topk = 20
recall_item_num = 10
item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)
for user in tqdm(trn_hist_click_df['user_id'].unique()):
user_recall_items_dict[user] = user_based_recommend(user, user_item_time_dict, u2u_sim, sim_user_topk, \
recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim)
user_multi_recall_dict['youtubednn_usercf_recall'] = user_recall_items_dict
pickle.dump(user_multi_recall_dict['youtubednn_usercf_recall'], open(save_path + 'youtubednn_usercf_recall.pkl', 'wb'))
if metric_recall:
# 召回成果评估
metrics_recall(user_multi_recall_dict['youtubednn_usercf_recall'], trn_last_click_df, topk=recall_item_num)
冷启动问题
冷启动问题能够分成三类:文章冷启动,用户冷启动,零碎冷启动。
- 文章冷启动:对于一个平台零碎新退出的文章,该文章没有任何的交互记录,如何举荐给用户的问题。(对于咱们场景能够认为是,日志数据中没有呈现过的文章都能够认为是冷启动的文章)
- 用户冷启动:对于一个平台零碎新来的用户,该用户还没有文章的交互信息,如何给该用户进行举荐。(对于咱们场景就是,测试集中的用户是否在测试集对应的 log 数据中呈现过,如果没有呈现过,那么能够认为该用户是冷启动用户。然而有时候并没有这么严格,咱们也能够本人设定某些指标来判断哪些用户是冷启动用户,比方通过应用时长,点击率,留存率等等)
- 零碎冷启动:就是对于一个平台刚上线,还没有任何的相干历史数据,此时就是零碎冷启动,其实也就是后面两种的一个综合。
以后场景下冷启动问题的剖析:
对以后的数据进行剖析会发现,日志中所有呈现过的点击文章只有 3w 多个,而整个文章库中却有 30 多万,那么测试集中的用户最初一次点击是否会点击没有呈现在日志中的文章呢?如果存在这种状况,阐明用户点击的文章之前没有任何的交互信息,这也就是咱们所说的文章冷启动。通过数据分析还能够发现,测试集用户只有一次点击的数据占得比例还不少,其实仅仅通过用户的一次点击就给用户举荐文章应用模型的形式也是比拟难的,这里其实也能够思考用户冷启动的问题,然而这里只给出物品冷启动的一些解决方案及代码,对于用户冷启动的话提一些可行性的做法。
-
文章冷启动 (没有冷启动的摸索问题)
其实咱们这里不是为了做文章的冷启动而做冷启动,而是猜想用户可能会点击一些没有在 log 数据中呈现的文章,咱们要做的就是如何从将近 27 万的文章中抉择一些文章作为用户冷启动的文章,这里其实也能够看成是一种召回策略,咱们这里就采纳简略的比拟好了解的基于规定的召回策略来获取用户可能点击的未呈现在 log 数据中的文章。
当初的问题变成了:如何给每个用户思考从 27 万个商品中获取一小部分商品?随机选一些可能是一种计划。上面给出一些参考的计划。- 首先基于 Embedding 召回一部分与用户历史类似的文章
- 从基于 Embedding 召回的文章中通过一些规定过滤掉一些文章,使得留下的文章用户更可能点击。咱们这里的规定,能够是,留下那些与用户历史点击文章主题雷同的文章,或者字数相差不大的文章。并且留下的文章尽量是与测试集用户最初一次点击工夫更靠近的文章,或者是当天的文章也行。
- 用户冷启动
这里对测试集中的用户点击数据进行剖析会发现,测试集中有百分之 20 的用户只有一次点击,那么这些点击特地少的用户的召回是不是能够独自做一些策略上的补充呢?或者是在排序后间接基于规定加上一些文章呢?这些都能够去尝试,这里没有提供具体的做法。
留神:
这里看似和基于 embedding 计算的 item 之间类似度而后做 itemcf 是统一的,然而当初咱们的目标不一样,咱们这里的目标是找到类似的向量,并且还没有呈现在 log 日志中的商品,再加上一些其余的冷启动的策略,这里须要找回的数量会偏多一点,不然被筛选完之后可能都没有文章了
# 先进行 itemcf 召回,这里不须要做召回评估,这里只是一种策略
trn_hist_click_df = all_click_df
user_recall_items_dict = collections.defaultdict(dict)
user_item_time_dict = get_user_item_time(trn_hist_click_df)
i2i_sim = pickle.load(open(save_path + 'emb_i2i_sim.pkl','rb'))
sim_item_topk = 150
recall_item_num = 100 # 略微召回多一点文章,便于后续的规定筛选
item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)
for user in tqdm(trn_hist_click_df['user_id'].unique()):
user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, i2i_sim, sim_item_topk,
recall_item_num, item_topk_click,item_created_time_dict, emb_i2i_sim)
pickle.dump(user_recall_items_dict, open(save_path + 'cold_start_items_raw_dict.pkl', 'wb'))
# 基于规定进行文章过滤
# 保留文章主题与用户历史浏览主题类似的文章
# 保留文章字数与用户历史浏览文章字数相差不大的文章
# 保留最初一次点击当天的文章
# 依照类似度返回最终的后果
def get_click_article_ids_set(all_click_df):
return set(all_click_df.click_article_id.values)
def cold_start_items(user_recall_items_dict, user_hist_item_typs_dict, user_hist_item_words_dict, \
user_last_item_created_time_dict, item_type_dict, item_words_dict,
item_created_time_dict, click_article_ids_set, recall_item_num):
"""
冷启动的状况下召回一些文章
:param user_recall_items_dict: 基于内容 embedding 相似性召回来的很多文章,字典,{user1: [item1, item2, ..], }
:param user_hist_item_typs_dict: 字典,用户点击的文章的主题映射
:param user_hist_item_words_dict: 字典,用户点击的历史文章的字数映射
:param user_last_item_created_time_idct: 字典,用户点击的历史文章创立工夫映射
:param item_tpye_idct: 字典,文章主题映射
:param item_words_dict: 字典,文章字数映射
:param item_created_time_dict: 字典,文章创立工夫映射
:param click_article_ids_set: 汇合,用户点击过得文章, 也就是日志外面呈现过的文章
:param recall_item_num: 召回文章的数量,这个指的是没有呈现在日志外面的文章数量
"""
cold_start_user_items_dict = {}
for user, item_list in tqdm(user_recall_items_dict.items()):
cold_start_user_items_dict.setdefault(user, [])
for item, score in item_list:
# 获取历史文章信息
hist_item_type_set = user_hist_item_typs_dict[user]
hist_mean_words = user_hist_item_words_dict[user]
hist_last_item_created_time = user_last_item_created_time_dict[user]
hist_last_item_created_time = datetime.fromtimestamp(hist_last_item_created_time)
# 获取以后召回文章的信息
curr_item_type = item_type_dict[item]
curr_item_words = item_words_dict[item]
curr_item_created_time = item_created_time_dict[item]
curr_item_created_time = datetime.fromtimestamp(curr_item_created_time)
# 首先,文章不能呈现在用户的历史点击中,而后依据文章主题,文章单词数,文章创立工夫进行筛选
if curr_item_type not in hist_item_type_set or \
item in click_article_ids_set or \
abs(curr_item_words - hist_mean_words) > 200 or \
abs((curr_item_created_time - hist_last_item_created_time).days) > 90:
continue
cold_start_user_items_dict[user].append((item, score)) # {user1: [(item1, score1), (item2, score2)..]...}
# 须要管制一下冷启动召回的数量
cold_start_user_items_dict = {k: sorted(v, key=lambda x:x[1], reverse=True)[:recall_item_num] \
for k, v in cold_start_user_items_dict.items()}
pickle.dump(cold_start_user_items_dict, open(save_path + 'cold_start_user_items_dict.pkl', 'wb'))
return cold_start_user_items_dict
all_click_df_ = all_click_df.copy()
all_click_df_ = all_click_df_.merge(item_info_df, how='left', on='click_article_id')
user_hist_item_typs_dict, user_hist_item_ids_dict, user_hist_item_words_dict, user_last_item_created_time_dict = get_user_hist_item_info_dict(all_click_df_)
click_article_ids_set = get_click_article_ids_set(all_click_df)
# 须要留神的是
# 这里应用了很多规定来筛选冷启动的文章,所以后面再召回的阶段就应该尽可能的多召回一些文章,否则很容易被删掉
cold_start_user_items_dict = cold_start_items(user_recall_items_dict, user_hist_item_typs_dict, user_hist_item_words_dict,
user_last_item_created_time_dict, item_type_dict, item_words_dict,
item_created_time_dict, click_article_ids_set, recall_item_num)
user_multi_recall_dict['cold_start_recall'] = cold_start_user_items_dict
多路召回合并
多路召回合并就是将后面所有的召回策略失去的用户文章列表合并起来,上面是对后面所有召回后果的汇总
- 基于 itemcf 计算的 item 之间的类似度 sim 进行的召回
- 基于 embedding 搜寻失去的 item 之间的类似度进行的召回
- YoutubeDNN 召回
- YoutubeDNN 失去的 user 之间的类似度进行的召回
- 基于冷启动策略的召回
留神:
在做召回评估的时候就会发现有些召回的成果不错有些召回的成果很差,所以对每一路召回的后果,咱们能够认为的定义一些权重,来做最终的类似度交融
def combine_recall_results(user_multi_recall_dict, weight_dict=None, topk=25):
final_recall_items_dict = {}
# 对每一种召回后果依照用户进行归一化,不便前面多种召回后果,雷同用户的物品之间权重相加
def norm_user_recall_items_sim(sorted_item_list):
# 如果冷启动中没有文章或者只有一篇文章,间接返回,呈现这种状况的起因可能是冷启动召回的文章数量太少了,# 基于规定筛选之后就没有文章了, 这里还能够做一些其余的策略性的筛选
if len(sorted_item_list) < 2:
return sorted_item_list
min_sim = sorted_item_list[-1][1]
max_sim = sorted_item_list[0][1]
norm_sorted_item_list = []
for item, score in sorted_item_list:
if max_sim > 0:
norm_score = 1.0 * (score - min_sim) / (max_sim - min_sim) if max_sim > min_sim else 1.0
else:
norm_score = 0.0
norm_sorted_item_list.append((item, norm_score))
return norm_sorted_item_list
print('多路召回合并...')
for method, user_recall_items in tqdm(user_multi_recall_dict.items()):
print(method + '...')
# 在计算最终召回后果的时候,也能够为每一种召回后果设置一个权重
if weight_dict == None:
recall_method_weight = 1
else:
recall_method_weight = weight_dict[method]
for user_id, sorted_item_list in user_recall_items.items(): # 进行归一化
user_recall_items[user_id] = norm_user_recall_items_sim(sorted_item_list)
for user_id, sorted_item_list in user_recall_items.items():
# print('user_id')
final_recall_items_dict.setdefault(user_id, {})
for item, score in sorted_item_list:
final_recall_items_dict[user_id].setdefault(item, 0)
final_recall_items_dict[user_id][item] += recall_method_weight * score
final_recall_items_dict_rank = {}
# 多路召回时也能够管制最终的召回数量
for user, recall_item_dict in final_recall_items_dict.items():
final_recall_items_dict_rank[user] = sorted(recall_item_dict.items(), key=lambda x: x[1], reverse=True)[:topk]
# 将多路召回后的最终后果字典保留到本地
pickle.dump(final_recall_items_dict, open(os.path.join(save_path, 'final_recall_items_dict.pkl'),'wb'))
return final_recall_items_dict_rank
# 这里间接对多路召回的权重给了一个雷同的值,其实能够依据后面召回的状况来调整参数的值
weight_dict = {'itemcf_sim_itemcf_recall': 1.0,
'embedding_sim_item_recall': 1.0,
'youtubednn_recall': 1.0,
'youtubednn_usercf_recall': 1.0,
'cold_start_recall': 1.0}
# 最终合并之后每个用户召回 150 个商品进行排序
final_recall_items_dict_rank = combine_recall_results(user_multi_recall_dict, weight_dict, topk=150)
总结
上述实现了如下召回策略:
- 基于关联规定的 itemcf
- 基于关联规定的 usercf
- youtubednn 召回
- 冷启动召回
对于上述实现的召回策略其实都不是最优的后果,咱们只是做了个简略的尝试,其中还有很多中央能够优化,包含曾经实现的这些召回策略的参数或者新加一些,批改一些关联规定都能够。当然还能够尝试更多的召回策略,比方对新闻进行热度召回等等。
Reference
- 重读 Youtube 深度学习举荐零碎论文,字字珠玑,惊为神文
- YouTube 深度学习举荐零碎的十大工程问题
- YouTubeDNN 原理
- Word2Vec 知乎众赞文章 — word2vec 放到排序中的 w2v 的介绍局部