# spark **Repository Path**: lieutenant-colonel-wang/spark ## Basic Information - **Project Name**: spark - **Description**: 这是基于spark的数字仓库推荐系统 - **Primary Language**: Scala - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 2 - **Forks**: 1 - **Created**: 2023-07-04 - **Last Updated**: 2024-01-07 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 大数据实训(数仓推荐系统) ## 第一章 项目体系架构设计 ### 1.1 项目系统架构 项目以推荐系统建设领域知名的经过修改过的中文亚马逊电商数据集作为依托,以某电商网站真实业务数据架构为基础,构建了符合教学体系的一体化的电商推荐系统,包含了离线推荐与实时推荐体系,综合利用了协同过滤算法以及基于内容的推荐方法来提供混合推荐。提供了从前端应用、后台服务、算法设计实现、平台部署等多方位的闭环的业务实现。 **用户可视化** :主要负责实现和用户的交互以及业务数据的展示,主体采用AngularJS2进行实现,部署在 Apache服务上。 **综合业务服务**:主要实现JavaEE层面整体的业务逻辑,通过Spring进行构建,对接业务需求。部署在 Tomcat上。 **【数据存储部分】** **业务数据库**:项目采用广泛应用的文档数据库MongDB作为主数据库,主要负责平台业务逻辑数据的存储。 **缓存数据库**:项目采用Redis作为缓存数据库,主要用来支撑实时推荐系统部分对于数据的高速获取需 求。 **【离线推荐部分】** **离线统计服务**:批处理统计性业务采用Spark Core + Spark SQL进行实现,实现对指标类数据的统计任务。 **离线推荐服务**:离线推荐业务采用Spark Core + Spark MLlib进行实现,采用ALS算法进行实现。 **【实时推荐部分】** **日志采集服务**:通过利用Flume-ng对业务平台中用户对于商品的一次评分行为进行采集,实时发送到 Kafka集群。 **消息缓冲服务**:项目采用Kafka作为流式数据的缓存组件,接受来自Flume的数据采集请求。并将数据 推送到项目的实时推荐系统部分。 **实时推荐服务**:项目采用Spark Streaming作为实时推荐系统,通过接收Kafka中缓存的数据,通过设 计的推荐算法实现对实时推荐的数据处理,并将结构合并更新到MongoDB数据库。 ### 1.2 项目数据流程 ![](img\1-2.png) ## 第二章 创建项目并初始化业务数据 我们的项目主体用Scala编写,采用IDEA作为开发环境进行项目编写,采用maven作为项目构建和管理工具。 ### 2.1 在IDEA中创建maven项目 打开IDEA,创建一个maven项目,命名为hn-recommender。具体流程:idea--->file--->new ---->project--->左导航选择maven---> 选择在new window中开。 #### 2.1.1 项目框架搭建 1. 确保项目maven是否是自己安装的maven。 2. 在项目src/main/的目录下创建scala目录(流程:选中main目录--->右键--->new --->directory ---> 写入Scala),然后并将其设置为资源目录(idea--->file--->project structure 3. 为项目添加Scala的sdk依赖,接着3的步骤做如下流程: #### 2.1.2 添加项目依赖 首先,对于整个项目而言,需要引入很多相关jar包依赖,将如下代码10行及后面所有内容覆盖hn-recommender项目中的pom.xml的第10行及以后的内容: ``` 4.0.0 com.qianfeng ECRecommand 1.0 8 8 1.2.17 1.7.22 2.4.3 3.1.1 4.0.2 2.4.1 3.1.2 1.2.1 org.slf4j jcl-over-slf4j ${slf4j.version} org.slf4j slf4j-api ${slf4j.version} org.slf4j slf4j-log4j12 ${slf4j.version} log4j log4j ${log4j.version} org.apache.spark spark-core_2.12 ${spark.version} org.apache.spark spark-sql_2.12 ${spark.version} org.mongodb casbah-core_2.12 ${casbah.version} org.mongodb.spark mongo-spark-connector_2.12 ${mongodb-spark.version} org.apache.kafka kafka-streams ${kafka.version} org.apache.kafka kafka-clients ${kafka.version} org.scalanlp jblas ${jblas.version} org.apache.spark spark-mllib_2.12 ${spark.version} org.apache.spark spark-streaming_2.12 ${spark.version} redis.clients jedis 3.0.0 org.apache.spark spark-streaming-kafka-0-10_2.12 ${spark.version} org.apache.maven.plugins maven-compiler-plugin 3.6.1 1.8 1.8 org.apache.maven.plugins maven-assembly-plugin 3.0.0 make-assembly package single net.alchim31.maven scala-maven-plugin 3.2.2 compile testCompile ``` ### 2.2 日志配置 在项目resources目录下创建log4j.properties文件,内容如下: ``` log4j.rootLogger=ERROR, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n ``` ### 2.3 数据初始化 将product.csv和ratings.csv数据拷贝到hn-recommender项目下的resources包目录下。使用SparkSQL读取两份csv数并写入MongoDB中,MongoDB数据库为recommender,集合分别为Product和Rating。当然数据库和集合都可以自定义。 - Product数据集 ```` productId,name,categoryIds,amazonId,imageUrl,categoryies,tags ```` - Rating数据集 数据格式: ``` userId,productId,rating,timestamp ``` - 数据初始化实现编码 ``` package com.qianfeng.dataload import com.mongodb.casbah.commons.MongoDBObject import com.mongodb.casbah.{MongoClient, MongoClientURI} import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} /** * 商品数据、用户商品评分数据装载到MongoDB中的Recommender库下面 */ //封装mongoDB的连接信息 case class MongoConfig(url:String,db:String) //封装商品数据 case class Product(productID:Int,name:String,imageUrl:String,categoryies:String,tags:String) //封装用户商品评分数据 case class Rating(userID:Int,productID:Int,rating:Double,timestamp:Long) /** * 加载数据的核心方法 */ object DataLoader { //集合常量 val RATTING_COLLECTION = "Rating" val PRODUCT_COLLECTION = "Product" def main(args: Array[String]): Unit = { //Mongo的连接配置 val configMap = Map( "url" -> "mongodb://qianfeng01:27017/recommender", "db" -> "recommender" ) //定义一个隐士参数 implicit val config = MongoConfig(configMap.getOrElse("url", ""), configMap.getOrElse("db", "")) //获取SparkSQL的上下文 val spark = SparkSession.builder() .appName("dataloader") .master("local[*]") //如果往服务器部署,需要修改一下即可 .getOrCreate() //引入SparkSQL的隐士转换 import spark.implicits._ //构建ProductDF val productDF = spark.sparkContext.textFile("/Users/liyadong/IdeaProjects/hn-recommender/src/main/resources/products.csv") .filter(_.length > 0) .map(line => { //将每行数据按照^进行拆分 val fileds = line.split("\\^") //封装数据 Product(fileds(0).trim.toInt, fileds(1).trim, fileds(4).trim, fileds(5).trim, fileds(6).trim) } ) .toDF() productDF.show() //构建RatingDF val ratingDF = spark.sparkContext.textFile("/Users/liyadong/IdeaProjects/hn-recommender/src/main/resources/ratings.csv") .filter(_.length > 0) .map(line => { //将每行数据按照^进行拆分 val fileds = line.split("\\,") //封装数据 Rating(fileds(0).trim.toInt, fileds(1).trim.toInt, fileds(2).trim.toDouble, fileds(3).trim.toLong) } ) .toDF() ratingDF.show() //将productDF和ratingDF写入MongoDB中 writeDataToMongoDB(productDF,ratingDF) //关闭spark spark.stop() } /** * 将productDF和ratingDF写入数库的集合中 * @param spark * @param productDF * @param ratingDF * @param config */ def writeDataToMongoDB(productDF:DataFrame,ratingDF:DataFrame)(implicit config:MongoConfig): Unit ={ //获取Mongo的client val client = MongoClient(MongoClientURI(config.url)) //获取操作集合 val productCollection = client(config.db)(RATTING_COLLECTION) val ratingCollection = client(config.db)(RATTING_COLLECTION) //删除集合 productCollection.dropCollection() ratingCollection.dropCollection() //将productDF写入collection中 productDF .write .option("uri",config.url) .option("collection",PRODUCT_COLLECTION) .mode(SaveMode.Overwrite) .format("com.mongodb.spark.sql") .save() //将ratingDF写入collection中 ratingDF .write .option("uri",config.url) .option("collection",RATTING_COLLECTION) .mode(SaveMode.Overwrite) .format("com.mongodb.spark.sql") .save() //对数据创建索引 productCollection.createIndex(MongoDBObject("productID"->1)) ratingCollection.createIndex(MongoDBObject("productID"->1)) ratingCollection.createIndex(MongoDBObject("userID"->1)) } } ``` ## 第三章 离线推荐服务建设 ### 3.1 静态推荐 根据Mongo中的Rating数据,使用Spark统计如下指标,通常用于静态推荐。 - 统计所有历史数据当中每个商品的评分数次数,并存储到MongoDB的RateMoreProducts集合中。 - 统计以月为单位拟每个商品的评分数次数,并存储到MongoDB的RateMoreRecentlyProducts集合中。 - 统计每个商品的平均评分的分数,并存储到MongoDB的AverageProducts集合中。 代码如下: ``` package com.qianfeng.statics import com.mongodb.casbah.{MongoClient, MongoClientURI} import com.mongodb.casbah.commons.MongoDBObject import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} import java.text.SimpleDateFormat import java.util.Date /** * 基于评分统计 * ● 统计所有历史数据当中每个商品的评分数次数,并存储到MongoDB的RateMoreProducts集合中。 * ● 统计以月为单位拟每个商品的评分数次数,并存储到MongoDB的RateMoreRecentlyProducts集合中。 * ● 统计每个商品的平均评分的分数,并存储到MongoDB的AverageProducts集合中。 */ //封装mongoDB的连接信息 case class MongoConfig(url:String,db:String) //封装用户商品评分数据 case class Rating(userID:Int,productID:Int,rating:Double,timestamp:Long) /** * 静态推荐 */ object StaticsRecommender { def main(args: Array[String]): Unit = { //Mongo的连接配置 val configMap = Map( "url" -> "mongodb://qianfeng01:27017/recommender", "db" -> "recommender" ) //定义集合常量 val RATING_COLLECTION = "Rating" val RATE_MORE_COLLECTION = "RateMoreProducts" val RATE_MORE_RECENTLY_COLLECTION = "RateMoreRecentlyProducts" val AVERAGE_COLLECTION = "AverageProducts" //定义一个隐士参数 implicit val config = MongoConfig(configMap.getOrElse("url", ""), configMap.getOrElse("db", "")) //获取SparkSQL的上下文 val spark = SparkSession.builder() .appName("dataloader") .master("local[*]") //如果往服务器部署,需要修改一下即可 .getOrCreate() //引入SparkSQL的隐士转换 import spark.implicits._ //加载Rating数据集 val ratingDF = spark .read .option("uri", config.url) .option("collection", RATING_COLLECTION) .format("com.mongodb.spark.sql") .load() //返回DataFrame .as[Rating] //将df转换成DS, .toDF() //将ratingDF注册成表 ratingDF.createOrReplaceTempView("ratings") /** * 1、统计所有历史数据当中每个商品的评分数次数,并存储到MongoDB的RateMoreProducts集合中。 */ val rateMoreDF = spark.sql( """ |select |productID productId, |count(*) count |from ratings |group by productID |""".stripMargin) rateMoreDF.show() //将历史商品中每个商品的评分次数统计存储到MongoDB中 writeDataToMongoDB(rateMoreDF,RATE_MORE_COLLECTION) /** * 2、统计以月为单位拟每个商品的评分数次数,并存储到MongoDB的RateMoreRecentlyProducts集合中。 */ //自定义一个函数,将时间戳转换为YYYYMM val sdf = new SimpleDateFormat("YYYYMM") spark.udf.register("getMonth",(timeStamp:Long)=>sdf.format(new Date(timeStamp * 1000))) /* //计算每月每商品的评分次数---方法一 val rateMoreRecentlyDF = spark.sql( """ |select |productID productId, |count(*) count, |getMonth(timestamp) yearmonth |from ratings |group by productID,getMonth(timestamp) |""".stripMargin)*/ //计算每月每商品的评分次数---方法二 val rateMoreRecentlyDF = spark.sql( """ |select |productID productId, |count(*) count, |from_unixtime(cast(timestamp as bigint),"yyyyMM") yearmonth |from ratings |group by productID,from_unixtime(cast(timestamp as bigint),"yyyyMM") |""".stripMargin) rateMoreRecentlyDF.show() //将最近的年月商品评分次数存储到MongoDB中 writeDataToMongoDB(rateMoreRecentlyDF,RATE_MORE_RECENTLY_COLLECTION) /** * 3、统计每个商品的平均评分的分数,并存储到MongoDB的AverageProducts集合中。 */ val avgDF = spark.sql( """ |select |productID productId, |avg(rating) avg |from ratings |group by productID |""".stripMargin) avgDF.show() //将avgDF存储到Mongo中 writeDataToMongoDB(avgDF,AVERAGE_COLLECTION) //关闭spark spark.stop() } /** * 将productDF和ratingDF写入数库的集合中 * @param spark * @param productDF * @param ratingDF * @param config */ def writeDataToMongoDB(df: DataFrame,col:String)(implicit config:MongoConfig): Unit ={ //获取Mongo的client val client = MongoClient(MongoClientURI(config.url)) //获取操作集合 val collection = client(config.db)(col) //删除集合 collection.dropCollection() //将productDF写入collection中 df .write .option("uri",config.url) .option("collection",col) .mode(SaveMode.Overwrite) .format("com.mongodb.spark.sql") .save() //对数据创建索引 collection.createIndex(MongoDBObject("productID"->1)) } } ``` ### 3.2 离线推荐 **OfflineRecommender** 服务主要是采用 ALS 作为协同过滤算法,根据 MongoDB 中的用户评分表计算离线的用户商品推荐列表以及商品相似度矩阵。 显式数据与隐式数据(Explicit data and implicit data) 1. 显式数据(Explicit Data) 是指那些有评价得分的数据,比如对电影的评分。此类数据明确指出用户对物品的喜好程度,但往往很难获取到。 2. 隐式数据(Implicit Data) 是指从用户行为中收集到的数据,缺少评分或评分所必须的特定行为。这可能是一个用户购买了某个物品、重复播放了多少次歌曲、观看某部电影多长时间以及阅读某篇文章的时长等等。此类数据优点是数据量大,缺点是噪音较多并且往往含义不明显。举个例子,通过给商品打星的数量,我们知道 1 表示用户不喜欢该商品,5 表示用户非常喜爱。用户播放了某歌曲,我们推测用户可能喜欢该歌曲或者讨厌该歌曲,也可能介于两者之间。如果用户没有播放某歌曲,有可能是因为用户不喜欢它,也可能只是不知道这首歌曲的存在。 协同模型(Neighborhood models) 1. UserBased CF - 基于用户的协同过滤 2. ItemBase CF - 基于物品的协同过滤 3. Mode CF - 基于模型的协同过滤 所有以物品为中心的模型在处理隐式数据时有个共同的劣势 -- 他们都不提供区分用户偏好与偏好的置信度的能力。 #### 3.2.1 协同过滤 利用集体智慧,把大家都喜欢的东西互相推荐。所以就要求提前收集用户偏好,有大量的数据可供分析。 #### 3.2.1.1 分类 **基于领域的** - 基于物品的 - 基于用户的 **基于模型的** - 基于隐语义模型的 ALS 算法 - 基于贝叶斯网络 - 基于 SVM 的 **基于混合的** - Item-User CF ##### 3.2.1.2 ALS ALS 算法属于一种协同过滤算法 ``` ALS 是交替最小二乘 (alternating least squares)的简称。 在机器学习的上下文中,ALS 特指使用交替最小二乘求解的一个协同推荐算法。它通过观察到的所有用户给产品的打分,来推断每个用户的喜好并向用户推荐适合的产品。 从协同过滤的分类来说,ALS算法属于 User-Item CF,也叫做混合 CF。它同时考虑了 User 和 Item 两个方面。 用户和商品的关系,可以抽象为一个三元组: ``` ##### 3.2.1.3 余弦相似度 余弦相似度用向量空间中两个向量夹角的余弦值作为衡量两个个体间差异的大小。 余弦值越接近1,就表明夹角越接近 0 度,也就是两个向量越相似,这就叫"余弦相似性"。 #### 3.2.2 实现流程 ALS 这个东西的使用在 mllib 里面已经有实现了,他需要先训练一个模型出来,再通过模型预测数据 训练数据需要使用 ALS.train() 方法,传入训练数据以及控制参数 预测数据需要使用 model.predict() 方法,传入测试数据即可 1. 加载用户评分数据 2. 把 userId 和 productId 提取出来形成两个独立的 RDD,计算他们的笛卡尔积得到 userProducts,作为训练数据集 3. 把 userId 和 productId 以及 score 提取出来形成一个三元组构造训练数据集以及测试数据集 4. 训练数据得到 model,再通过 model 预测得到推荐数据,然后把推荐数据写入到 MongoDB(该推荐是基于用户的,如下图) 5. 通过 model 得到的不只有预测数据,还可以拿到 productFeatures,即各个商品以及它的特征 1. 使用 可以构造商品的特征向量:(productId, new DoubleMatrix(features)) 2. 把这个特征向量拿来和它自己做笛卡尔积可以得到 A 商品和其他所有非 A 商品的相似度关系 3. 点击查看为什么要使用笛卡尔积 4. 然后计算余弦相似度 ``` // a 是做笛卡尔积的 this // b 是做笛卡尔积的 other case (a, b) => val simScore = consinSim(a._2, b._2) // a._2 就是 new DoubleMatrix(features) (a._1, (b._1, simScore)) // 这样就得到了 a 商品和 b 商品的相似度评分 ``` #### 3.2.3 训练参数的选取 在 ALS.train 方法中涉及三个控制参数,他们的选取影响着预测的精度。 ```` // 定义模型训练的参数,rank隐特征个数,iterations迭代词数,lambda正则化系数 val (rank, iterations, lambda) = (5, 10, 0.1) // rank:表示隐特征的维度 K,要使用多少个特征 // iterations:迭代次数,交替相乘的次数 // lambda:正则化参数 val model = ALS.train(trainData, rank, iterations, lambda) //得到一个成熟的预测模型,下面使用该模型去预测 ```` 通过计算均方根误差(RMSE)可以找到最佳的训练参数,就是不断地调整参数值,来选取 RMSE 最小的一组作为我们模型的优化选择 具体过程: 1. 加载评分数据 2. 把数据按照三七分、二八分等规则切成两部分,一部分是训练数据,一部分是测试数据(要试很多次) 3. 然后给 rank 和 lambda 也给很多值,挨个测试,得到 model 4. 把 model 和测试数据丢带 RMSE 公式里面计算标准误差 5. 在这么多次尝试中,输出误差值最小的结果 #### 3.2.4 编码实施 ALS案例 1. 将sample_movielens_ratings.txt拷贝到resource目录下面。 2. 编写代码: ```` import org.apache.spark.ml.evaluation.RegressionEvaluator import org.apache.spark.ml.recommendation.ALS import org.apache.spark.sql.SparkSession /** * ALS的案例 */ //封装数据 case class Ratings(userId:Int,movieId:Int,rating:Int) object ALSTest { def main(args: Array[String]): Unit = { //获取sparkSQL的上下文 val spark = SparkSession.builder() .appName("dataloader") .master("local[*]") //如果往服务器部署,需要修改一下即可 .getOrCreate() //初始化数据 import spark.implicits._ val ratings = spark.sparkContext .textFile("/Users/liyadong/IdeaProjects/hn-recommender/src/main/resources/sample_movielens_ratings.txt") .map(line => { val fileds = line.split("::") //封装ratings Ratings(fileds(0).trim.toInt, fileds(1).trim.toInt, fileds(2).trim.toInt) }) .toDF() ratings.show() //划分训练和测试数据集 val Array(train,test) = ratings.randomSplit(Array(0.8, 0.2)) //构建ALS模型 val als = new ALS() .setRank(10) //隐士因子,其实就是维度数量 .setMaxIter(10) //迭代次数 .setRegParam(0.01) //正则系数 .setColdStartStrategy("drop") //冷启动策略 .setUserCol("userId") .setItemCol("movieId") .setRatingCol("rating") //训练数据 val model = als.fit(train) //测试模型 --- 使用已经训练好的模型进行预测 val prediction = model.transform(test) //构建一个计算器 val evaluator = new RegressionEvaluator() .setMetricName("rmse") .setLabelCol("rating") .setPredictionCol("prediction") //使用计算器对预测数据集进行均方根误差计算 val rmse = evaluator.evaluate(prediction) println(s"均方根误差为: $rmse") //数据推荐 //为每个用户推荐10个电影 val forUserMoivesDF = model.recommendForAllUsers(10) forUserMoivesDF.show() //为每个电影推荐10个用户 val forMovieUsersDF = model.recommendForAllItems(10) forMovieUsersDF.show() //为3个用户推荐3个电影 val frame = model.recommendForUserSubset(ratings.select(als.getUserCol).distinct().limit(3), 3) frame.show() //为3个电影推荐3个用户 val frame1 = model.recommendForItemSubset(ratings.select(als.getItemCol).limit(3), 3) frame1.show() //关闭spark spark.stop() } } ```` ALS模型参数选取实施 ``` package com.qianfeng.recommand.offline import breeze.numerics.sqrt import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating} import org.apache.spark.sql.SparkSession /** * ALS模型 */ object ALSTrainer { def main(args: Array[String]): Unit = { val config = Map( "spark.cores" -> "local[*]", "mongo.uri" -> "mongodb://qianfeng01:27017/recommender", "mongo.db" -> "recommender" ) //创建SparkConf val sparkConf = new SparkConf().setAppName("ALSTrainer").setMaster(config("spark.cores")) //创建SparkSession val spark = SparkSession.builder().config(sparkConf).getOrCreate() val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db")) import spark.implicits._ //加载评分数据 val ratingRDD = spark .read .option("uri",mongoConfig.uri) .option("collection",OfflineRecommender.MONGODB_RATING_COLLECTION) .format("com.mongodb.spark.sql") .load() .as[ProductRating] .rdd .map(rating => Rating(rating.userId,rating.productId,rating.score)) //Rating是SparkMLlib依赖库自带的 .cache() // 将一个RDD随机切分成两个RDD,用以划分训练集和测试集 val splits = ratingRDD.randomSplit(Array(0.8, 0.2)) val trainingRDD = splits(0) val testingRDD = splits(1) //输出最优参数 adjustALSParams(trainingRDD, testingRDD) //关闭Spark spark.close() } // 输出最终的最优参数 def adjustALSParams(trainData:RDD[Rating], testData:RDD[Rating]): Unit ={ // 这里指定迭代次数为5,rank和lambda在几个值中选取调整 val result = for(rank <- Array(100,200,250); lambda <- Array(0.1, 0.01, 0.001)) yield { val model = ALS.train(trainData,rank,5,lambda) val rmse = getRMSE(model, testData) (rank,lambda,rmse) } // 按照rmse排序 println(result.sortBy(_._3).head) } def getRMSE(model:MatrixFactorizationModel, data:RDD[Rating]):Double={ val userProducts = data.map(item => (item.user,item.product)) val predictRating = model.predict(userProducts) val real = data.map(item => ((item.user,item.product),item.rating)) val predict = predictRating.map(item => ((item.user,item.product),item.rating)) // 计算RMSE sqrt( real.join(predict).map{case ((userId,productId),(real,pre))=> // 真实值和预测值之间的差 val err = real - pre err * err }.mean() ) } } ``` ALS推荐实施 ```` package com.qianfeng.offline import com.mongodb.casbah.Imports.{MongoClientURI, MongoDBObject} import com.mongodb.casbah.MongoClient import org.apache.spark.mllib.recommendation.ALS import org.apache.spark.mllib.recommendation.Rating import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} import org.jblas.DoubleMatrix /** * 离线推荐服务和相似商品的加工 */ //封装评分数据 //封装用户商品评分数据 case class ProductRating(userId:Int,productId:Int,rating:Double,timestamp:Long) //封装mongoDB的连接信息 case class MongoConfig(url:String,db:String) //封装商品类 case class Recommendation(productId: Int,score:Double) //封装产品相似矩阵 case class ProductRecs(productId:Int,recs:Seq[Recommendation]) //封装用户推荐列表 case class UserRecs(userId:Int,recs:Seq[Recommendation]) object OffLineRecommender { def main(args: Array[String]): Unit = { //Mongo的连接配置 val configMap = Map( "url" -> "mongodb://qianfeng01:27017/recommender", "db" -> "recommender" ) //定义一个隐士参数 implicit val config = MongoConfig(configMap.getOrElse("url", ""), configMap.getOrElse("db", "")) //定义集合常量 val USER_RECS_COLLECTION = "UserRecs" //用户推荐存储在MongoDB中的集合 val PRODUCT_RECS_COLLECTION = "ProductRecs" //商品相似矩阵存储在MongoDB中的集合 val RATING_COLLECTION = "Rating" //用户商品评分集合 val MAX_RECOMMEND_NUMBER = 20 //最大推荐数量 //获取SparkSQL的上下文 val spark = SparkSession.builder() .appName("offline-recommender") .master("local[*]") //如果往服务器部署,需要修改一下即可 .getOrCreate() //加载Ratings的数据,然后转换成RDD,并将其进行缓存 import spark.implicits._ val ratingRDD = spark .read .option("uri", config.url) .option("collection", RATING_COLLECTION) .format("com.mongodb.spark.sql") .load() //返回DataFrame .as[ProductRating] //将df转换成DS .rdd //将ds转换成RDD .map(line => (line.userId, line.productId, line.rating)) //构建用户、商品、打分三元组 .cache() //缓存 //ratingRDD.foreach(println(_)) //构建训练集 --- 封装的Rating一定是recommendation包下的 val trainData = ratingRDD.map(x => org.apache.spark.mllib.recommendation.Rating(x._1, x._2, x._3)) //获取userId和productId的去重列表 val userRDD = ratingRDD.map(x => x._1).distinct() //去重后的用户列表 val productRDD = ratingRDD.map(x => x._2).distinct() //去重后的商品列表 //使用去重后的userRDD和productRDD进行笛卡尔积计算,形成一个空的用户商品矩阵 val userProduct = userRDD.cartesian(productRDD) //userProduct.foreach(println(_)) //构建ALS模型 val (rank,iterations,regex) = (10,10,0.01) //分别为隐士因子个数;迭代次数;正则化系数 //引入mllib包下的ALS --->使用训练集来训练ALS模型 val alsMode = ALS.train(trainData,rank,iterations,regex) //预测 --->使用userProduct来进行预测 val preRDD = alsMode.predict(userProduct) //对预测结果集进行过滤和封装 val userRecsDF = preRDD //RDD[Rating] .filter(x => x.rating > 0) //将预测的评分大于0的商品过滤出来 .map(rating => { (rating.user, (rating.product, rating.rating)) //将数据封装成 (userid,(productId,rating)) }) .groupByKey() //按照user来进行分组 //(12,Iterable[(),(),(),(),()]) .map(x => { //封装成最终的结果数据 UserRecs( x._1, //用户Id x._2.toList //将Iterable[(),(),(),(),()]转换成List .sortWith(_._2 > _._2) //使用得分进行降序排序;;大于是降序 .take(MAX_RECOMMEND_NUMBER) //获取得分最高的前20个商品 .map(x => Recommendation(x._1, x._2)) //将商品Id和预测得分封装到Recommendation中 ) }) .toDF() //userRecsDF.show() //将数据写出MongoDB中UserRecs集合中 writeDataToMongoDB(userRecsDF,USER_RECS_COLLECTION) //商品相似矩阵 //1、获取商品的特征向量 --->10维度 val productFeatures:RDD[(Int,DoubleMatrix)] = alsMode.productFeatures .map(x => { (x._1, new DoubleMatrix(x._2)) //(商品ID,对应商品的特征向量) }) //2、获取每一个商品的余弦相似度 val productRecsDF = productFeatures.cartesian(productFeatures) //商品特征向量笛卡尔积 .filter(x => x._1._1 != x._2._1) //过滤掉两个相同的商品 .map(x => { val consin = consinSim(x._1._2, x._2._2) //使用商品的特征向量调用consinSim方法 //封装返回 (x._1._1, (x._2._1, consin)) //(p1商品Id,(p2商品Id,p1和p2的余弦相似度)) }) .filter(x => x._2._2 > 0.6) //过滤余弦相似度小于0.6的商品 .groupByKey() //根据商品ID(x._1._1)来进行分组 .map(x => { //封装返回的数据ProductRecs ProductRecs( x._1, //商品ID x._2 .toList .map(x => Recommendation(x._1, x._2)) //封装(商品Id,余弦相似度) ) }) .toDF() //productRecsDF.show() //将商品的相似矩阵存储到MongoDB中,--->已被实时推荐需要 writeDataToMongoDB(productRecsDF,PRODUCT_RECS_COLLECTION) //关闭spark spark.stop() } /** * 将productDF和ratingDF写入数库的集合中 * @param spark * @param productDF * @param ratingDF * @param config */ def writeDataToMongoDB(df: DataFrame,col:String)(implicit config:MongoConfig): Unit ={ //获取Mongo的client val client = MongoClient(MongoClientURI(config.url)) //获取操作集合 val collection = client(config.db)(col) //删除集合 collection.dropCollection() //将productDF写入collection中 df .write .option("uri",config.url) .option("collection",col) .mode(SaveMode.Overwrite) .format("com.mongodb.spark.sql") .save() } /** * 余弦相似度 * @param product1 * @param product2 */ def consinSim(product1:DoubleMatrix,product2:DoubleMatrix): Double = { //相似公式:y = (DXi * DYi)的加和值 / (DXi平方和 + DYi平方和) i属于(1-n) product1.dot(product2) / (product1.norm2() * product2.norm2()) } } ```` ## 第四章 实时推荐服务建设 简易的 Spring Web 后台,用 maven 打包后直接跑起来即可,注意打包的时候要将依赖也打进去。 用户的评分数据除了通过 Kafka 发送到实时推荐系统,还要记录到 Redis 里面,这样就可以获取同一个用户最近几次的全部评分数据了,使实时计算更加精准。 1. 使用idea打开businessServer项目 2. 安装tomcate,直接解压到某个目录即可s 3. 修改recommend.properties 4. 如果redis设置密码了,需要修改Configure类中为jedis设置密码。(jedis.auth("root");) ### 4.2 实时推荐 #### 4.2.1 特点 离线服务是综合历史数据来计算的,但实时服务应该根据用户最近的行为来推荐 实时推荐要求响应迅速,所以不能再使用 ALS 了 为了快速响应,应该提前预热数据,在已有的数据集上再次少量计算即可 实时推荐系统更关心推荐结果的动态变化能力,只要更新推荐结果的理由合理即可,至于推荐的精度则可以适当放宽 #### 4.2.2 算法设计 总体目标:用户对某一个商品进行了评分,所以选取与该商品最相似的 K 个商品作为备选商品 每个商品按照【推荐优先级】这一权重作为推荐给用户的先后顺序。根据用户最近的若干评分,计算出每个备选商品的【推荐优先级】,同时,根据上一次的推荐结果进行基于优先级的合并、替换,得到新的推荐结果。 具体而言: 首先,获取用户 u 按时间顺序最近的 K 个评分,记为 RK;获取商品 p 的最相似的 K 个商品集合,记为 S; 然后,对于每个备选商品 q ,计算其推荐优先级 其中: Rr 表示用户 u 对商品 r 的评分; sim(q,r) 表示商品 q 与商品 r 的相似度,设定最小相似度为 0.6,当商品 q 和商品 r 相似度低于 0.6 的阈值,则视为两者不相关并忽略; sim_sum 表示 q 与 RK 中商品相似度大于最小阈值的个数; incount 表示 RK 中与商品 q 相似的、且本身评分较高(>=3)的商品个数; recount 表示 RK 中与商品 q 相似的、且本身评分较低(<3)的商品个数; #### 4.2.3 实现流程 1. 加载 MongoDB 里面的 ProductRecs 作为实时计算的基础数据 2. 用户 u 对商品 p 进行了评分,就触发一次实时计算 3. 从 ProductRecs 中选出与商品 p 最相似的 K 个商品作为集合 S 1. 要过滤掉用户 u 自己评分过的其他全部商品,过滤掉之后推荐的东西才是他没有见过的 4. 从 Redis 中获取用户 u 最近时间内的 K 条评分,包含本次评分,作为集合 RK 5. 把从1、2、3 里面拿到的数据作为参数,开始计算商品的推荐优先级,产生 集合 updated_S 6. 将 updated_S 与上次对用户 u 的推荐结果 Rec 利用公式进行合并,产生新的推荐结果 NewRec 作为最终输出 #### 4.2.4 冷启动问题处理 对于新注册的用户,没有任何评分数据可以使用,也就是不知道他的偏好,导致以上的所有推荐失效。 解决办法: 1. 强制用户选择个人喜好(弹个框) 2. 收集用户浏览行为,然后预测用户偏好(最常用的办法) 3. 提前给所有的商品计算相似关联商品 下面介绍项目里面使用的第三个解决办法,通过基于内容与物品的离线计算可以得到商品展示页面类似于【猜你喜欢】【看过这个商品的人也看了..】的数据 #### 4.2.5 编码实施 ```` package com.qianfeng.recommender.online import com.mongodb.casbah.commons.MongoDBObject import com.mongodb.casbah.{MongoClient, MongoClientURI} import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext} import redis.clients.jedis.Jedis // 定义一个连接助手对象,建立到redis和mongodb的连接 object ConnHelper extends Serializable{ // 懒变量定义,使用的时候才初始化 lazy val jedis = new Jedis("qianfeng01") jedis.auth("root") lazy val mongoClient = MongoClient(MongoClientURI("mongodb://qianfeng01:27017/recommender")) } //mongoDB的连接配置封装 case class MongoConfig( uri: String, db: String ) // 定义标准推荐对象 case class Recommendation( productId: Int, score: Double ) // 定义用户的推荐列表 case class UserRecs( userId: Int, recs: Seq[Recommendation] ) // 定义商品相似度列表 case class ProductRecs( productId: Int, recs: Seq[Recommendation] ) /** * 实时推荐 */ object OnlineRecommender { // 定义常量和表名 val MONGODB_RATING_COLLECTION = "Rating" val STREAM_RECS = "StreamRecs" val PRODUCT_RECS = "ProductRecs" val MAX_USER_RATING_NUM = 20 val MAX_SIM_PRODUCTS_NUM = 20 def main(args: Array[String]): Unit = { val config = Map( "spark.cores" -> "local[*]", "mongo.uri" -> "mongodb://qianfeng01:27017/recommender", "mongo.db" -> "recommender", "kafka.topic" -> "recommender" ) // 创建spark conf val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OnlineRecommender") val spark = SparkSession.builder().config(sparkConf).getOrCreate() val sc = spark.sparkContext val ssc = new StreamingContext(sc, Seconds(2)) import spark.implicits._ implicit val mongoConfig = MongoConfig( config("mongo.uri"), config("mongo.db") ) // 加载商品相似度矩阵,广播出去 val simProductsMatrix = spark .read .option("uri", mongoConfig.uri) .option("collection", PRODUCT_RECS) .format("com.mongodb.spark.sql") .load() .as[ProductRecs] .rdd // 为了后续查询相似度方便,把数据转换成map形式 .map{item => ( item.productId, item.recs.map( x=>(x.productId, x.score) ).toMap ) } .collectAsMap() // 将simProductsMatrix定义为广播变量 val simProcutsMatrixBC = sc.broadcast(simProductsMatrix) // 创建kafka配置参数 val kafkaParam = Map( "bootstrap.servers" -> "qianfeng01:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "recommender", "auto.offset.reset" -> "latest" //从主题的最近offset开始消费 ) // 创建一个DStream val kafkaStream = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String]( Array(config("kafka.topic")), kafkaParam ) ) // 对kafkaStream进行处理,产生评分流,userId|productId|score|timestamp val ratingStream = kafkaStream.map{msg=> val attr = msg.value().split("\\|") ( attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt ) } // 核心算法部分,定义评分流的处理流程 ratingStream.foreachRDD{ rdds => rdds.foreach{ case ( userId, productId, score, timestamp ) => println("rating data coming!>>>>>>>>>>>>>>>>>>") // TODO: 核心算法流程 // 1. 从redis里取出当前用户的最近评分,保存成一个数组Array[(productId, score)] val userRecentlyRatings = getUserRecentlyRatings( MAX_USER_RATING_NUM, userId, ConnHelper.jedis ) // 2. 从商品的相似度矩阵中获取当前商品最相似的商品列表,作为备选列表,保存成一个数组Array[productId] val candidateProducts = getTopSimProducts( MAX_SIM_PRODUCTS_NUM, productId, userId, simProcutsMatrixBC.value ) // 3. 计算每个备选商品的推荐优先级,得到当前用户的实时推荐列表,保存成 Array[(productId, score)] val streamRecs = computeProductScore( candidateProducts, userRecentlyRatings, simProcutsMatrixBC.value ) // 4. 把推荐列表保存到mongodb saveDataToMongoDB( userId, streamRecs ) } } // 启动streaming ssc.start() println("streaming started!") ssc.awaitTermination() } /** * 从redis里获取最近num次评分 */ import scala.collection.JavaConversions._ def getUserRecentlyRatings(num: Int, userId: Int, jedis: Jedis): Array[(Int, Double)] = { // 从redis中用户的评分队列里获取评分数据,list键名为uid:USERID,值格式是 PRODUCTID:SCORE jedis.lrange( "userId:" + userId.toString, 0, num ) //因为这儿返回的是Java版本的List,所以需要进行Java代码转换scala .map{ item => val attr = item.split("\\:") ( attr(0).trim.toInt, attr(1).trim.toDouble ) } .toArray } // 获取当前商品的相似列表,并过滤掉用户已经评分过的,作为备选列表 def getTopSimProducts(num: Int, productId: Int, userId: Int, simProducts: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]) (implicit mongoConfig: MongoConfig): Array[Int] ={ // 从广播变量相似度矩阵中拿到当前商品的相似度列表 val allSimProducts = simProducts(productId).toArray // 获得用户已经评分过的商品,过滤掉,排序输出 val ratingCollection = ConnHelper.mongoClient( mongoConfig.db )( MONGODB_RATING_COLLECTION ) //从Rating集合中获取当前登陆用户评过分的productId val ratingExist = ratingCollection .find( MongoDBObject("userId"->userId) ) .toArray .map{item=> // 只需要productId item.get("productId").toString.toInt } // 从所有的商品相似度举证中进行过滤 allSimProducts .filter( x => ! ratingExist.contains(x._1) ) .sortWith(_._2 > _._2) .take(num) .map(x=>x._1) } // 自定义log函数,以N为底 def log(m: Int): Double = { val N = 10 math.log(m)/math.log(N) } //获取产品相似度 def getProductsSimScore(product1: Int, product2: Int, simProducts: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Double ={ simProducts.get(product1) match { case Some(sims) => sims.get(product2) match { case Some(score) => score case None => 0.0 } case None => 0.0 } } // 计算每个备选商品的推荐得分 def computeProductScore(candidateProducts: Array[Int], userRecentlyRatings: Array[(Int, Double)], simProducts: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Array[(Int, Double)] ={ // 定义一个长度可变数组ArrayBuffer,用于保存每一个备选商品的基础得分,(productId, score) val scores = scala.collection.mutable.ArrayBuffer[(Int, Double)]() // 定义两个map,用于保存每个商品的高分和低分的计数器,productId -> count val increMap = scala.collection.mutable.HashMap[Int, Int]() val decreMap = scala.collection.mutable.HashMap[Int, Int]() // 遍历每个备选商品,计算和已评分商品的相似度 for( candidateProduct <- candidateProducts; userRecentlyRating <- userRecentlyRatings ){ // 从相似度矩阵中获取当前备选商品和当前已评分商品间的相似度 val simScore = getProductsSimScore( candidateProduct, userRecentlyRating._1, simProducts ) if( simScore > 0.4 ){ // 按照公式进行加权计算,得到基础评分 scores += ( (candidateProduct, simScore * userRecentlyRating._2) ) if( userRecentlyRating._2 > 3 ){ increMap(candidateProduct) = increMap.getOrDefault(candidateProduct, 0) + 1 } else { decreMap(candidateProduct) = decreMap.getOrDefault(candidateProduct, 0) + 1 } } } // 根据公式计算所有的推荐优先级,首先以productId做groupby scores .groupBy(_._1) .map{ case (productId, scoreList) => ( productId, scoreList.map(_._2).sum / scoreList.length //平均分 + log(increMap.getOrDefault(productId, 1)) //加强分 - log(decreMap.getOrDefault(productId, 1)) //削弱分 ) } // 返回推荐列表,按照得分排序 .toArray .sortWith(_._2>_._2) } // 写入mongodb def saveDataToMongoDB(userId: Int, streamRecs: Array[(Int, Double)])(implicit mongoConfig: MongoConfig): Unit ={ val streamRecsCollection = ConnHelper.mongoClient(mongoConfig.db)(STREAM_RECS) // 按照userId查询并更新 streamRecsCollection.findAndRemove( MongoDBObject( "userId" -> userId ) ) streamRecsCollection.insert( MongoDBObject( "userId" -> userId, "recs" -> streamRecs.map(x=>MongoDBObject("productId"->x._1, "score"->x._2)) ) ) } } ```` ## 第五章 推荐服务 使用Spring web+Vue等前后端技术进行推荐可视化。 操作流程: 1. 解压businessServer.zip到某一个目录即可。 2. 使用IDEA-->open--->找到businessServer.zip解压的根目录--->open--->点击new window即可。 3. 使用IDEA--->settings--->搜索maven--->将maven修改成自己安装的maven(目录、settings.xml、仓库路径)。 4. 在本地安装Tomcate--->将安装包解压到某个目录即可。 5. 修改business代码中的几个配置: src/main/resources/recommend.properties :将该文件中的所有ip或者主机名换成自己的对应服务的ip和主机名 src/main/java/com/tqz/business/utils/Configure.java :该代码中的23行,修改成你的redis的服务密码;如果redis没有设置密码,将23行注释掉或者删除即可。 src/main/resources/log4j.properties :修改该代码中的第6行,将路径设置为自己宿主机tomcate安装目录下的logs,最后的文件名称可以不变。如果上传服务器运行,该行的目录需要服务器中的目录。 6. 在idea中部署businessServer到宿主机的tomcat中,步骤如下: idea--->Edit Configurations --->点击弹框中最左上脚的 + ---->下滑找到Tomcat Server ---->点击下面的local ----> 7. 启动businessServer之前将服务器中的Redis、mongoDB、Kafka集群都启动并可用。 8. dea中右上脚--->点击绿色三角运行按钮--->启动tomcat服务 9. 会自动弹出浏览器显示界面 ## 第六章 拓展推荐(选学) ### 6.1 基于内容推荐 基于内容(UGC)的推荐 商品数据集格式为 字段名|字段类型|字段描述|字段备注 ---|---|---|---- productId|Int|商品的ID name||String|商品的名称 categories|String|商品所属类别|每一项用“ | ”分割 imageUrl| String |商品图片的URL tags|String|商品的UGC标签|每一项用“ | ”分割 这个 tags 是用户自定义的标签,他可以很好的说明该商品的特性,可以将 tags 的内容进行提取,得到商品的内容特征向量,进而可以通过求解相似度矩阵,再按照离线推荐那一套来处理矩阵。为了避免热门标签对特征提取的影响,我们还可以通过TF-IDF算法对标签的权重进行调整,从而尽可能地接近用户偏好。 这部分内容也可以与实时推荐系统直接对接,计算出与用户当前评分商品的相似商品,实现基于内容的实时推荐 ```` package com.qianfeng.recommender.content import org.apache.spark.SparkConf import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer} import org.apache.spark.ml.linalg.SparseVector import org.apache.spark.sql.SparkSession import org.jblas.DoubleMatrix case class Product(productId: Int, name: String, imageUrl: String, categories: String, tags: String) case class MongoConfig(uri: String, db: String) // 定义标准推荐对象 case class Recommendation(productId: Int, score: Double) // 定义商品相似度列表 case class ProductRecs(productId: Int, recs: Seq[Recommendation]) object ContentBaseRecommender { // 定义mongodb中存储的表名 val MONGODB_PRODUCT_COLLECTION = "Product" val CONTENT_PRODUCT_RECS = "ContentBasedProductRecs" def main(args: Array[String]): Unit = { val config = Map( "spark.cores" -> "local[*]", "mongo.uri" -> "mongodb://qianfeng01:27017/recommender", "mongo.db" -> "recommender" ) // 创建一个spark config val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ContentRecommender") // 创建spark session val spark = SparkSession.builder().config(sparkConf).getOrCreate() import spark.implicits._ implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db")) // 载入数据,做预处理 val productTagsDF = spark .read .option("uri", mongoConfig.uri) .option("collection", MONGODB_PRODUCT_COLLECTION) .format("com.mongodb.spark.sql") .load() .as[Product] .map( // 分词器默认按照空格分词,这里把 tag 的 | 替换为空格 x => (x.productId, x.name, x.tags.map(c => if (c == '|') ' ' else c)) ) .toDF("productId", "name", "tags") .cache() // 使用 cache 优化性能 // 用 TF-IDF 提取商品特征向量 // 1. 实例化一个分词器,用来做分词,默认按照空格分词。下面的意思就是输入的数据的列名叫 tags,分词结束后输出的列叫 words val tokenizer = new Tokenizer().setInputCol("tags").setOutputCol("words") // 用分词器做转换,得到增加一个新列 words 的 DF val wordsDataDF = tokenizer.transform(productTagsDF) /* 2. 定义一个 HashingTF 工具,计算频次 原始特征通过 hash 函数,映射到一个索引值。 后面只需要统计这些索引值的频率,就可以知道对应词的频率。 transform 方法会把词哈希成向量,结果类似于这样:(800,[67,259,267,350,579,652],[1.0,1.0,1.0,1.0,1.0,1.0]) 800 表示纬度、特征向量数、哈希表的桶数 [67,259,267,350,579,652] 表示哈希到下标为这些数字上的词语 [1.0,1.0,1.0,1.0,1.0,1.0] 表示上面这些词出现的次数 注意:这是一个稀疏矩阵,只显示非 0 的结果 */ val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(800) val featurizedDataDF = hashingTF.transform(wordsDataDF) /* 3. 定义一个 IDF 工具,计算 TF-IDF 调用 IDF 的方法来重新构造特征向量的规模,生成的 idf 是一个 Estimator, 在特征向量上应用它的 fit() 方法,会产生一个 IDFModel */ val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features") val idfModel = idf.fit(featurizedDataDF) /* 4.同时,调用 IDFModel 的 transform 方法,可以得到每一个单词对应的 TF-IDF 度量值。 得到增加新列 features 的 DF,该 DF 的格式是 ( productId, name, tags, words, rawFeatures, features) ( 259637, 小狗钱钱, 书 少儿图书 教育类 童书 不错 孩子很喜欢 , [书, 少儿图书, 教育类, 童书, 不错, 孩子很喜欢] , (800,[67,259,267,350,579,652],[1.0,1.0,1.0,1.0,1.0,1.0]) , (800,[67,259,267,350,579,652],[0.4638371143300716,2.272125885509337,3.188416617383492,3.4760986898352733,1.8021222562636017,3.8815637979434374]) | |--> 这里最后一行的最后一组代表的就是 TF-IDF 度量值,通过观察发现 3.88 是最大值,也就是哈希到 652 桶里面的词最能代表本数据 */ val rescaledDataDF = idfModel.transform(featurizedDataDF) /* 经过 TF-IDF 提取之后,会过滤掉热门标签等因子对数据的干扰,现在的数据会更加符合用户喜好 然后可以开始操作现在的数据了,对数据进行转换,得到 RDD 形式的 features 二元组的形式:(productId, features) */ val productFeatures = rescaledDataDF.map { row => (row.getAs[Int]("productId"), row.getAs[SparseVector]("features").toArray) } .rdd .map { case (productId, features) => (productId, new DoubleMatrix(features)) } // 此处与 OfflineRecommender 第三步一致,利用商品的特征向量,计算商品的相似度列表 // 两两配对商品,计算余弦相似度 val productRecs = productFeatures.cartesian(productFeatures) .filter { case (a, b) => a._1 != b._1 } // 计算余弦相似度 .map { case (a, b) => val simScore = consinSim(a._2, b._2) (a._1, (b._1, simScore)) } .filter(_._2._2 > 0.4) .groupByKey() .map { case (productId, recs) => ProductRecs(productId, recs.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2))) } .toDF() productRecs.write .option("uri", mongoConfig.uri) .option("collection", CONTENT_PRODUCT_RECS) .mode("overwrite") .format("com.mongodb.spark.sql") .save() spark.stop() } //于弦相似度 def consinSim(product1: DoubleMatrix, product2: DoubleMatrix): Double = { product1.dot(product2) / (product1.norm2() * product2.norm2()) } } ```` ### 6.2 基于物品协同推荐 ```` package com.qianfeng import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession case class ProductRating(userId: Int, productId: Int, rating: Double, timestamp: Long) case class MongoConfig(uri: String, db: String) // 定义标准推荐对象 case class Recommendation(productId: Int, score: Double) // 定义商品相似度列表 case class ProductRecs(productId: Int, recs: Seq[Recommendation]) /** * 基于物品的协同 */ object ItemCFRecommender { // 定义常量和表名 val MONGODB_RATING_COLLECTION = "Rating" val ITEM_CF_PRODUCT_RECS = "ItemCFProductRecs" val MAX_RECOMMENDATION = 10 def main(args: Array[String]): Unit = { val config = Map( "spark.cores" -> "local[*]", "mongo.uri" -> "mongodb://qianfeng01:27017/recommender", "mongo.db" -> "recommender" ) // 创建一个spark config val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ItemCFRecommender") // 创建spark session val spark = SparkSession.builder().config(sparkConf).getOrCreate() import spark.implicits._ implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db")) // 加载数据,转换成 DF 进行处理,进行缓存 val ratingDF = spark.read .option("uri", mongoConfig.uri) .option("collection", MONGODB_RATING_COLLECTION) .format("com.mongodb.spark.sql") .load() .as[ProductRating] .map( x => (x.userId, x.productId, x.rating) ) .toDF("userId", "productId", "rating") .cache() // 核心算法,计算同现相似度,得到商品的相似列表 /* 统计每个商品的评分个数,按照 productId 来做 groupBy 得到的数据是: | productId | count | | 12345689 | 10 | */ val productRatingCountDF = ratingDF.groupBy("productId").count() /* 在原有的评分表上 rating 添加 count join 结果: +---------+------+-----+-----+ |productId|userId|score|count| +---------+------+-----+-----+ |505556 |13784 |3.0 |172 | ...... */ val ratingWithCountDF = ratingDF.join(productRatingCountDF, "productId") // 将评分按照用户 id 两两配对,统计两个商品被同一个用户评分过的次数 // 也就是计算公式里面分子处的交集 val joinedDF = ratingWithCountDF.join(ratingWithCountDF, "userId") .toDF("userId", "product1", "score1", "count1", "product2", "score2", "count2") .select("userId", "product1", "count1", "product2", "count2") // 创建一张临时表,用于写 sql 查询 joinedDF.createOrReplaceTempView("joined") // 按照 product1, product2 做 groupBy,统计 userId 的数量,就是对两个商品同时评分的人数 val cooccurrenceDF = spark.sql( """ |select | product1 |, product2 |, count(userId) as cocount |, first(count1) as count1 |, first(count2) as count2 |from joined |group by product1, product2 """.stripMargin ).cache() // 提取需要的数据,包装成( productId1, (productId2, score) ) val simDF = cooccurrenceDF.map { row => val coocSim = cooccurrenceSim(row.getAs[Long]("cocount") , row.getAs[Long]("count1") , row.getAs[Long]("count2")) (row.getInt(0), (row.getInt(1), coocSim)) } .rdd .groupByKey() .map { case (productId, recs) => ProductRecs(productId, recs.toList .filter(x => x._1 != productId) // 在 recs 中过滤掉与 productId 相同的商品,不同自己推荐自己 .sortWith(_._2 > _._2) // 按照分数降序排序 .take(MAX_RECOMMENDATION) .map(x => Recommendation(x._1, x._2)) // 组装成被推荐列表的格式 (productId, score) ) // 最后封装成存入 MongoDB 的格式 (productId, 多个(productId, score)) } .toDF() // 保存到mongodb simDF.write .option("uri", mongoConfig.uri) .option("collection", ITEM_CF_PRODUCT_RECS) .mode("overwrite") .format("com.mongodb.spark.sql") .save() spark.stop() } // 按照公式计算同现相似度 def cooccurrenceSim(coCount: Long, count1: Long, count2: Long): Double = { // y = AI ^ CI / AI coCount / math.sqrt(count1 * count2) } } ````