关于算法:推荐算法基于隐语义模型的协同过滤推荐之商品相似度矩阵

6次阅读

共计 3302 个字符,预计需要花费 9 分钟才能阅读完成。

我的项目采纳 ALS 作为协同过滤算法,依据 MongoDB 中的用户评分表计算离线的用户商品举荐列表以及商品类似度矩阵。

通过 ALS 计算商品类似度矩阵,该矩阵用于查问以后商品的类似商品并为实时举荐零碎服务。

离线计算的 ALS 算法,算法最终会为用户、商品别离生成最终的特色矩阵,别离是示意用户特色矩阵的 U(m x k)矩阵,每个用户有 k 个特征描述;示意物品特色矩阵的 V(n x k)矩阵,每个物品也由 k 个特征描述。

V(n x k)示意物品特色矩阵,每一行是一个 k 维向量,尽管咱们并不知道每一个维度的特色意义是什么,然而 k 个维度的数学向量示意了该行对应商品的特色。

所以,每个商品用 V(n x k)每一行的向量示意其特色,于是任意两个商品 p:特征向量为,商品 q:特征向量为之间的类似度 sim(p,q)能够应用和的余弦值来示意:

举荐算法!基于隐语义模型的协同过滤举荐之商品类似度矩阵
数据集中任意两个商品间类似度都能够由公式计算失去,商品与商品之间的类似度在一段时间内根本是固定值。最初生成的数据保留到 MongoDB 的 ProductRecs 表中。

举荐算法!基于隐语义模型的协同过滤举荐之商品类似度矩阵
外围代码如下:

// 计算商品类似度矩阵

// 获取商品的特色矩阵,数据格式 RDD[(scala.Int, scala.Array[scala.Double])]

val productFeatures = model.productFeatures.map{case (productId,features) =>

  (productId, new DoubleMatrix(features))

}

// 计算笛卡尔积并过滤合并
val productRecs = productFeatures.cartesian(productFeatures)


  .filter{case (a,b) => a._1 != b._1} 
  .map{case (a,b) =>

    val simScore = this.consinSim(a._2,b._2) // 求余弦类似度

    (a._1,(b._1,simScore))

  }.filter(_._2._2 > 0.6)   
  .groupByKey()            
  .map{case (productId,items) =>

    ProductRecs(productId,items.toList.map(x => Recommendation(x._1,x._2)))

  }.toDF()

productRecs

  .write

  .option("uri", mongoConfig.uri)

  .option("collection",PRODUCT_RECS)

  .mode("overwrite")

  .format("com.mongodb.spark.sql")

  .save()

其中,consinSim 是求两个向量余弦类似度的函数,代码实现如下:

// 计算两个商品之间的余弦类似度
def consinSim(product1: DoubleMatrix, product2:DoubleMatrix): Double ={product1.dot(product2) / (product1.norm2()  * product2.norm2())
}

在上述模型训练的过程中,咱们间接给定了隐语义模型的 rank,iterations,lambda 三个参数。对于咱们的模型,这并不一定是最优的参数选取,所以咱们须要对模型进行评估。通常的做法是计算均方根误差(RMSE),考查预测评分与理论评分之间的误差。

举荐算法!基于隐语义模型的协同过滤举荐之商品类似度矩阵
有了 RMSE,咱们能够就能够通过屡次调整参数值,来选取 RMSE 最小的一组作为咱们模型的优化抉择。

在 scala/com.atguigu.offline/ 下新建单例对象 ALSTrainer,代码主体架构如下:

def main(args: Array[String]): Unit = {


  val config = Map("spark.cores" -> "local[*]",


    "mongo.uri" -> "mongodb://localhost:27017/recommender",


    "mongo.db" -> "recommender"


  )


  // 创立 SparkConf


  val sparkConf = new SparkConf().setAppName("ALSTrainer").setMaster(config("spark.cores"))


  // 创立 SparkSession


  val spark = SparkSession.builder().config(sparkConf).getOrCreate()

  val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))

  import spark.implicits._

  // 加载评分数据


  val ratingRDD = spark


    .read


    .option("uri",mongoConfig.uri)


    .option("collection",OfflineRecommender.MONGODB_RATING_COLLECTION)


    .format("com.mongodb.spark.sql")


    .load()


    .as[ProductRating]


    .rdd


    .map(rating => Rating(rating.userId,rating.productId,rating.score)).cache()


  // 将一个 RDD 随机切分成两个 RDD,用以划分训练集和测试集

  val splits = ratingRDD.randomSplit(Array(0.8, 0.2))

  val trainingRDD = splits(0)


  val testingRDD = splits(1)

  // 输入最优参数

  adjustALSParams(trainingRDD, testingRDD)

  // 敞开 Spark


  spark.close()}

其中 adjustALSParams 办法是模型评估的外围,输出一组训练数据和测试数据,输入计算失去最小 RMSE 的那组参数。代码实现如下:


// 输入最终的最优参数
def adjustALSParams(trainData:RDD[Rating], testData:RDD[Rating]): Unit ={
// 这里指定迭代次数为 5,rank 和 lambda 在几个值中选取调整  
val result = for(rank <- Array(100,200,250); lambda <- Array(1, 0.1, 0.01, 0.001))


    yield {val model = ALS.train(trainData,rank,5,lambda)


      val rmse = getRMSE(model, testData)


      (rank,lambda,rmse)


    }


  // 依照 rmse 排序


  println(result.sortBy(_._3).head)


}
计算 RMSE 的函数 getRMSE 代码实现如下:def getRMSE(model:MatrixFactorizationModel, data:RDD[Rating]):Double={val userProducts = data.map(item => (item.user,item.product))


  val predictRating = model.predict(userProducts)
val real = data.map(item => ((item.user,item.product),item.rating))


  val predict = predictRating.map(item => ((item.user,item.product),item.rating))


  // 计算 RMSE


  sqrt(real.join(predict).map{case ((userId,productId),(real,pre))=>


      // 实在值和预测值之间的差


      val err = real - pre


      err * err


    }.mean())


}

运行代码,咱们就能够失去目前数据的最优模型参数。

关键词:大数据培训

正文完
 0