乐趣区

关于php:基于Surprise协同过滤实现短视频推荐

前言

后面一文介绍了通过根底的 web 我的项目构造实现简略的内容举荐,与其说那个是举荐不如说是一个排序算法。因为热度计算形式尽管解决了内容的时效品质动态化。然而绝对用户而言,大家看到的都是简直统一的内容(不一样也可能只是某工夫里某视频的排前或靠后),没有做到个性化的千人千面。

尽管如此,基于内容的热度举荐仍然有他独特的利用场景——热门榜单。所以只须要把这个性能换一个模块就能够了,将个性化举荐留给更善于做这方面的算法。

当然了,做举荐零碎的办法很多,平台层面的像 spark 和明天要讲的 Surprise。办法层面能够用深度学习做,也能够用协同过滤,或综合一起等等。大厂可能就更欠缺了,在召回阶段就有很多通道,比方基于卷积截帧辨认视频内容,文本类似度计算和现有数据撑持,前面又通过荡涤,粗排,精排,重排等等流程,可能他们更多的是要保障平台内容的多样性。

那咱们这里仍然走入门理论应用为主,能让咱们的我的项目疾速对接上个性化举荐,以下就是在起因 PHP 我的项目构造上对接 Surprise,实现用户和物品的类似度举荐。

环境

  • python3.8
  • Flask2.0
  • pandas2.0
  • mysql-connector-python     
  • surpriseopenpyxlgunicorn 

Surprise 介绍

Surprise 库是一款用于构建和剖析举荐零碎的工具库,他提供了多种举荐算法,包含基线算法、邻域办法、基于矩阵合成的算法(如 SVD、PMF、SVD++、NMF)等。内置了多种相似性度量办法,如余弦相似性、均方差(MSD)、皮尔逊相关系数等。这些相似性度量办法能够用于评估用户之间的相似性,从而为举荐零碎提供重要的数据反对。

协同过滤数据集

既然要基于工具库实现协同过滤举荐,天然就须要按该库的规范进行。Surprise 也和大多数协同过滤框架相似,数据集只须要有用户对某个物品打分分值,如果本人没有能够在网上下载收费的 Movielens 或 Jester,以下是我依据业务创立的表格,自行参考。

CREATE TABLE `short_video_rating` (`id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` varchar(120) DEFAULT '',
  `item_id` int(11) DEFAULT '0',
  `rating` int(11) unsigned DEFAULT '0' COMMENT '评分',
  `scoring_set` json DEFAULT NULL COMMENT '行为汇合',
  `create_time` int(11) DEFAULT '0',
  `action_day_time` int(11) DEFAULT '0' COMMENT '更新当天工夫',
  `update_time` int(11) DEFAULT '0' COMMENT '更新工夫',
  `delete_time` int(11) DEFAULT '0' COMMENT '删除工夫',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=107 DEFAULT CHARSET=utf8mb4 COMMENT='用户对视频评分表';

业务介绍

Web 业务端通过接口或埋点,在用户操作的中央依据预设的规范记录评分记录。当打分表有数据后,用 python 将 SQL 记录转为表格再导入 Surprise,依据不同的算法训练,最初依据接管的参数返回对应的举荐 top 列表。python 局部由 Flask 启动的服务,与 php 进行 http 交互,前面将以片段代码阐明。

编码局部

1. PHP 申请封装

<?php
/**
 * Created by ZERO 开发.
 * User: 北桥苏
 * Date: 2023/6/26 0026
 * Time: 14:43
 */

namespace app\common\service;


class Recommend
{
    private $condition;

    private $cfRecommends = [];

    private $output = [];

    public function __construct($flag = 1, $lastRecommendIds = [], $userId = "")
    {$this->condition['flag'] = $flag;
        $this->condition['last_recommend_ids'] = $lastRecommendIds;
        $this->condition['user_id'] = $userId;
    }

    public function addObserver($cfRecommend)
    {$this->cfRecommends[] = $cfRecommend;
    }

    public function startRecommend()
    {foreach ($this->cfRecommends as $cfRecommend) {$res = $cfRecommend->recommend($this->condition);
            $this->output = array_merge($res, $this->output);
        }

        $this->output = array_values(array_unique($this->output));

        return $this->output;
    }
}


abstract class cfRecommendBase
{

    protected $cfGatewayUrl = "127.0.0.1:6016";
    protected $limit = 15;

    public function __construct($limit = 15)
    {
        $this->limit = $limit;
        $this->cfGatewayUrl = config('api.video_recommend.gateway_url');
    }

    abstract public function recommend($condition);
}


class mcf extends cfRecommendBase
{public function recommend($condition)
    {
        //echo "mcf\n";
        $videoIdArr = [];

        $flag = $condition['flag'] ?? 1;
        $userId = $condition['user_id'] ?? '';
        $url = "{$this->cfGatewayUrl}/mcf_recommend";

        if ($flag == 1 && $userId) {
            //echo "mcf2\n";
            $param['raw_uid'] = (string)$userId;
            $param['top_k'] = $this->limit;

            $list = httpRequest($url, $param, 'json');
            $videoIdArr = json_decode($list, true) ?? [];}

        return $videoIdArr;
    }
}


class icf extends cfRecommendBase
{public function recommend($condition)
    {
        //echo "icf\n";
        $videoIdArr = [];

        $flag = $condition['flag'] ?? 1;
        $userId = $condition['user_id'] ?? '';
        $lastRecommendIds = $condition['last_recommend_ids'] ?? [];
        $url = "{$this->cfGatewayUrl}/icf_recommend";

        if ($flag > 1 && $lastRecommendIds && $userId) {
            //echo "icf2\n";
            $itemId = $lastRecommendIds[0] ?? 0;
            $param['raw_item_id'] = $itemId;
            $param['top_k'] = $this->limit;

            $list = httpRequest($url, $param, 'json');
            $videoIdArr = json_decode($list, true) ?? [];}

        return $videoIdArr;
    }
}

2. PHP 发动举荐获取

因为思考到后期视频存量有余,是采纳协同过滤加热度榜单联合的形式,前端获取视频举荐,接口返回视频举荐列表的同时也带了下次申请的标识 (分页码)。这个分页码用于当协同过滤服务挂了或没有举荐时,放在榜单列表的分页。然而又要保障分页数是否理论无效,所以当页码太大没有数据返回就通过递归重置为第一页,也把页码返回前端让数据获取更晦涩。

public static function recommend($flag, $videoIds, $userId)
 
    {
        $nexFlag = $flag + 1;
        $formatterVideoList = [];
 
        try {
            // 协同过滤举荐
            $isOpen = config('api.video_recommend.is_open');
            $cfVideoIds = [];
            if ($isOpen == 1) {$recommend = new Recommend($flag, $videoIds, $userId);
                $recommend->addObserver(new mcf(15));
                $recommend->addObserver(new icf(15));
                $cfVideoIds = $recommend->startRecommend();}
 
            // 已读视频
            $nowTime = strtotime(date('Ymd'));
            $timeBefore = $nowTime - 60 * 60 * 24 * 100;
            $videoIdsFilter = self::getUserVideoRatingByTime($userId, $timeBefore);
            $cfVideoIds = array_diff($cfVideoIds, $videoIdsFilter);
 
            // 违规视频过滤
            $videoPool = [];
            $cfVideoIds && $videoPool = ShortVideoModel::listByOrderRaw($cfVideoIds, $flag);
 
            // 冷启动举荐
            !$videoPool && $videoPool = self::hotRank($userId, $videoIdsFilter, $flag);
 
            if ($videoPool) {list($nexFlag, $videoList) = $videoPool;
                $formatterVideoList = self::formatterVideoList($videoList, $userId);
            }
        } catch (\Exception $e) {$preFileName = str::snake(__FUNCTION__);
            $path = self::getClassName();
            write_log("msg:" . $e->getMessage(), $preFileName . "_error", $path);
        }
 
        return [$nexFlag, $formatterVideoList];
    }

3. 数据集生成

import os
import mysql.connector
import datetime
import pandas as pd

now = datetime.datetime.now()
year = now.year
month = now.month
day = now.day
fullDate = str(year) + str(month) + str(day)

dir_data = './collaborative_filtering/cf_excel'
file_path = '{}/dataset_{}.xlsx'.format(dir_data, fullDate)
db_config = {
    "host": "127.0.0.1",
    "database": "database",
    "user": "user",
    "password": "password"
}

if not os.path.exists(file_path):
    cnx = mysql.connector.connect(user=db_config['user'], password=db_config['password'],
                                  host=db_config['host'], database=db_config['database'])

    df = pd.read_sql_query("SELECT user_id, item_id, rating FROM short_video_rating", cnx)

    print('--------------- 插入数据集 ----------------')

    # 将数据帧写入 Excel 文件
    df.to_excel(file_path, index=False)

if not os.path.exists(file_path):
    raise IOError("Dataset file is not exists!")

4. 协同过滤服务

from flask import Flask, request, json, Response, abort
from collaborative_filtering import cf_item
from collaborative_filtering import cf_user
from collaborative_filtering import cf_mix
from werkzeug.middleware.proxy_fix import ProxyFix

app = Flask(__name__)

@app.route('/')
def hello_world():
    return abort(404)

@app.route('/mcf_recommend', methods=["POST", "GET"])
def get_mcf_recommendation():
    json_data = request.get_json()

    raw_uid = json_data.get("raw_uid")
    top_k = json_data.get("top_k")

    recommend_result = cf_mix.collaborative_fitlering(raw_uid, top_k)

    return Response(json.dumps(recommend_result), mimetype='application/json')

@app.route('/ucf_recommend', methods=["POST", "GET"])
def get_ucf_recommendation():
    json_data = request.get_json()

    raw_uid = json_data.get("raw_uid")
    top_k = json_data.get("top_k")

    recommend_result = cf_user.collaborative_fitlering(raw_uid, top_k)

    return Response(json.dumps(recommend_result), mimetype='application/json')

@app.route('/icf_recommend', methods=["POST", "GET"])
def get_icf_recommendation():
    json_data = request.get_json()

    raw_item_id = json_data.get("raw_item_id")
    top_k = json_data.get("top_k")

    recommend_result = cf_item.collaborative_fitlering(raw_item_id, top_k)

    return Response(json.dumps(recommend_result), mimetype='application/json')

if __name__ == '__main__':
    app.run(host="0.0.0.0",
            debug=True,
            port=6016
            )

5. 基于用户举荐


# -*- coding: utf-8 -*-
# @File    : cf_recommendation.py
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)
from collections import defaultdict

import os
from surprise import Dataset
from surprise import Reader
from surprise import BaselineOnly
from surprise import KNNBasic
from surprise import KNNBaseline
from heapq import nlargest
import pandas as pd
import datetime
import time

def get_top_n(predictions, n=10):
    top_n = defaultdict(list)
    for uid, iid, true_r, est, _ in predictions:
        top_n[uid].append((iid, est))

    for uid, user_ratings in top_n.items():
        top_n[uid] = nlargest(n, user_ratings, key=lambda s: s[1])

    return top_n

class PredictionSet():

    def __init__(self, algo, trainset, user_raw_id=None, k=40):
        self.algo = algo
        self.trainset = trainset
        self.k = k
        if user_raw_id is not None:
            self.r_uid = user_raw_id
            self.i_uid = trainset.to_inner_uid(user_raw_id)
            self.knn_userset = self.algo.get_neighbors(self.i_uid, self.k)
            user_items = set([j for (j, _) in self.trainset.ur[self.i_uid]])
            self.neighbor_items = set()
            for nnu in self.knn_userset:
                for (j, _) in trainset.ur[nnu]:
                    if j not in user_items:
                        self.neighbor_items.add(j)

    def user_build_anti_testset(self, fill=None):
        fill = self.trainset.global_mean if fill is None else float(fill)

        anti_testset = []
        user_items = set([j for (j, _) in self.trainset.ur[self.i_uid]])
        anti_testset += [(self.r_uid, self.trainset.to_raw_iid(i), fill) for
                         i in self.neighbor_items if
                         i not in user_items]
        return anti_testset

def user_build_anti_testset(trainset, user_raw_id, fill=None):
    fill = trainset.global_mean if fill is None else float(fill)

    i_uid = trainset.to_inner_uid(user_raw_id)

    anti_testset = []

    user_items = set([j for (j, _) in trainset.ur[i_uid]])

    anti_testset += [(user_raw_id, trainset.to_raw_iid(i), fill) for
                     i in trainset.all_items() if
                     i not in user_items]

    return anti_testset


# ================= surprise 举荐局部 ====================
def collaborative_fitlering(raw_uid, top_k):

    now = datetime.datetime.now()
    year = now.year
    month = now.month
    day = now.day
    fullDate = str(year) + str(month) + str(day)

    dir_data = './collaborative_filtering/cf_excel'
    file_path = '{}/dataset_{}.xlsx'.format(dir_data, fullDate)

    if not os.path.exists(file_path):
        raise IOError("Dataset file is not exists!")

    # 读取数据集 #####################
    alldata = pd.read_excel(file_path)

    reader = Reader(line_format='user item rating')
    dataset = Dataset.load_from_df(alldata, reader=reader)

    # 所有数据生成训练集
    trainset = dataset.build_full_trainset()

    # ================= BaselineOnly  ==================
    bsl_options = {'method': 'sgd', 'learning_rate': 0.0005}
    algo_BaselineOnly = BaselineOnly(bsl_options=bsl_options)
    algo_BaselineOnly.fit(trainset)

    # 取得举荐后果
    rset = user_build_anti_testset(trainset, raw_uid)

    # 测试休眠 5 秒,让客户端超时
    # time.sleep(5)
    # print(rset)
    # exit()

    predictions = algo_BaselineOnly.test(rset)
    top_n_baselineonly = get_top_n(predictions, n=5)

    # ================= KNNBasic  ==================
    sim_options = {'name': 'pearson', 'user_based': True}
    algo_KNNBasic = KNNBasic(sim_options=sim_options)
    algo_KNNBasic.fit(trainset)

    # 取得举荐后果  ---  只思考 knn 用户的
    predictor = PredictionSet(algo_KNNBasic, trainset, raw_uid)
    knn_anti_set = predictor.user_build_anti_testset()
    predictions = algo_KNNBasic.test(knn_anti_set)
    top_n_knnbasic = get_top_n(predictions, n=top_k)

    # ================= KNNBaseline  ==================
    sim_options = {'name': 'pearson_baseline', 'user_based': True}
    algo_KNNBaseline = KNNBaseline(sim_options=sim_options)
    algo_KNNBaseline.fit(trainset)

    # 取得举荐后果  ---  只思考 knn 用户的
    predictor = PredictionSet(algo_KNNBaseline, trainset, raw_uid)
    knn_anti_set = predictor.user_build_anti_testset()
    predictions = algo_KNNBaseline.test(knn_anti_set)
    top_n_knnbaseline = get_top_n(predictions, n=top_k)

    # =============== 按比例生成举荐后果 ==================
    recommendset = set()
    for results in [top_n_baselineonly, top_n_knnbasic, top_n_knnbaseline]:
        for key in results.keys():
            for recommendations in results[key]:
                iid, rating = recommendations
                recommendset.add(iid)

    items_baselineonly = set()
    for key in top_n_baselineonly.keys():
        for recommendations in top_n_baselineonly[key]:
            iid, rating = recommendations
            items_baselineonly.add(iid)

    items_knnbasic = set()
    for key in top_n_knnbasic.keys():
        for recommendations in top_n_knnbasic[key]:
            iid, rating = recommendations
            items_knnbasic.add(iid)

    items_knnbaseline = set()
    for key in top_n_knnbaseline.keys():
        for recommendations in top_n_knnbaseline[key]:
            iid, rating = recommendations
            items_knnbaseline.add(iid)

    rank = dict()
    for recommendation in recommendset:
        if recommendation not in rank:
            rank[recommendation] = 0
        if recommendation in items_baselineonly:
            rank[recommendation] += 1
        if recommendation in items_knnbasic:
            rank[recommendation] += 1
        if recommendation in items_knnbaseline:
            rank[recommendation] += 1

    max_rank = max(rank, key=lambda s: rank[s])
    if max_rank == 1:
        return list(items_baselineonly)
    else:
        result = nlargest(top_k, rank, key=lambda s: rank[s])

        return list(result)

        # print("排名后果: {}".format(result))

6. 基于物品举荐

-*- coding: utf-8 -*-
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)
from collections import defaultdict

import io
import os
from surprise import SVD, KNNBaseline, Reader, Dataset
import pandas as pd
import datetime
import mysql.connector
import pickle

# ================= surprise 举荐局部 ====================
def collaborative_fitlering(raw_item_id, top_k):

    now = datetime.datetime.now()
    year = now.year
    month = now.month
    day = now.day
    fullDate = str(year) + str(month) + str(day)

    # dir_data = './collaborative_filtering/cf_excel'
    dir_data = './cf_excel'
    file_path = '{}/dataset_{}.xlsx'.format(dir_data, fullDate)

    if not os.path.exists(file_path):
        raise IOError("Dataset file is not exists!")

    # 读取数据集 #####################
    alldata = pd.read_excel(file_path)

    reader = Reader(line_format='user item rating')
    dataset = Dataset.load_from_df(alldata, reader=reader)

    # 应用协同过滤必须有这行,将咱们的算法使用于整个数据集,而不进行穿插验证,构建了新的矩阵
    trainset = dataset.build_full_trainset()

    # print(pd.DataFrame(list(trainset.global_mean())))
    # exit()

    # 度量准则:pearson 间隔,协同过滤:基于 item
    sim_options = {'name': 'pearson_baseline', 'user_based': False}
    algo = KNNBaseline(sim_options=sim_options)
    algo.fit(trainset)

    # 将训练好的模型序列化到磁盘上
    # with open('./cf_models/cf_item_model.pkl', 'wb') as f:
    #     pickle.dump(algo, f)

    #从磁盘中读取训练好的模型
    # with open('cf_item_model.pkl', 'rb') as f:
    #     algo = pickle.load(f)

    # 转换为外部 id
    toy_story_inner_id = algo.trainset.to_inner_iid(raw_item_id)
    # 依据外部 id 找到最近的 10 个街坊
    toy_story_neighbors = algo.get_neighbors(toy_story_inner_id, k=top_k)
    # 将 10 个街坊的外部 id 转换为 item id 也就是 raw
    toy_story_neighbors_rids = (algo.trainset.to_raw_iid(inner_id) for inner_id in toy_story_neighbors)

    result = list(toy_story_neighbors_rids)

    return result

    # print(list(toy_story_neighbors_rids))


if __name__ == "__main__":
    res = collaborative_fitlering(15, 20)
    print(res)

其余

  1. 举荐服务生产部署开发环境下能够通过 python recommend_service.py 启动,前面部署环境须要用到 gunicorn,形式是装置后配置环境变量。代码里导入 werkzeug.middleware.proxy_fix,批改以下的启动局部以下内容,启动改为 gunicorn -w 5 -b 0.0.0.0:6016 app:appapp.wsgi_app = ProxyFix(app.wsgi_app)
    app.run()
  2. 模型本地保留随着业务数据的累计,天然须要训练的数据集也越来越大,所以前期对于模型训练周期,能够缩短。也就是定时训练模型后保留到本地,而后依据线上的数据做出举荐,模型存储与读取办法如下。
    2.1. 模型存储

    sim_options = {'name': 'pearson_baseline', 'user_based': False}
     algo = KNNBaseline(sim_options=sim_options)
     algo.fit(trainset)
    
     # 将训练好的模型序列化到磁盘上
     with open('./cf_models/cf_item_model.pkl', 'wb') as f:
         pickle.dump(algo, f)

2.2. 模型读取

with open('cf_item_model.pkl', 'rb') as f:
        algo = pickle.load(f)

    # 转换为外部 id
    toy_story_inner_id = algo.trainset.to_inner_iid(raw_item_id)
    # 依据外部 id 找到最近的 10 个街坊
    toy_story_neighbors = algo.get_neighbors(toy_story_inner_id, k=top_k)
    # 将 10 个街坊的外部 id 转换为 item id 也就是 raw
    toy_story_neighbors_rids = (algo.trainset.to_raw_iid(inner_id) for inner_id in toy_story_neighbors)

    result = list(toy_story_neighbors_rids)

    return result

写在最初

下面的仍然只是实现了举荐零碎的一小部分,在做数据召回不论能够对视频截帧还能够拆散音频,通过卷积神经网络辨认音频品种和视频大抵内容。再依据用户以往浏览记录造成的标签实现内容匹配等等,这个还要前期一直学习和欠缺的。​

退出移动版