Real-time recommendation project, practice demo

Shared by 123456789987654321 ⋅ 2022-08-27 19:05:05 ⋅ 1228 views

Data cleaning and loading into MongoDB

import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Product dataset (fields separated by '^'):
  * 3982                            product ID
  * Fuhlen 富勒 M8眩光舞者时尚节能    product name
  * 1057,439,736                    product category IDs, not needed
  * B009EJN4T2                      Amazon ID, not needed
  * https://images-cn-4.ssl-image   product image URL
  * 外设产品|鼠标|电脑/办公           product categories
  * 富勒|鼠标|电子产品|好用|外观漂亮   product UGC tags
  */
case class Product(productId: Int, name: String, imageUrl: String, categories: String, tags: String)

/**
  * Rating dataset (comma-separated):
  * 4867        user ID
  * 457976      product ID
  * 5.0         rating score
  * 1395676800  timestamp
  */
case class Rating(userId: Int, productId: Int, score: Double, timestamp: Int)

/**
  * MongoDB connection configuration
  *
  * @param uri MongoDB connection URI
  * @param db  database to operate on
  */
case class MongoConfig(uri: String, db: String)

object DataLoader {
    // data file paths
    val PRODUCT_DATA_PATH = "D:\\Projects\\BigData\\ECommerceRecommendSystem\\recommender\\DataLoader\\src\\main\\resources\\products.csv"
    val RATING_DATA_PATH = "D:\\Projects\\BigData\\ECommerceRecommendSystem\\recommender\\DataLoader\\src\\main\\resources\\ratings.csv"
    // collection names used in MongoDB
    val MONGODB_PRODUCT_COLLECTION = "Product"
    val MONGODB_RATING_COLLECTION = "Rating"
    def main(args: Array[String]): Unit = {
        val config = Map(
            "spark.cores" -> "local[*]",
            "mongo.uri" -> "mongodb://localhost:27017/recommender",
            "mongo.db" -> "recommender"
        )
        // create a SparkConf
        val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("DataLoader")
        // create a SparkSession
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()
        // implicits for .toDF() below; this must come after the SparkSession is created
        import spark.implicits._

        // load the data
        val productRDD = spark.sparkContext.textFile(PRODUCT_DATA_PATH)
        val productDF = productRDD.map(item => {
            // product fields are separated by '^'; split them apart
            val attr = item.split("\\^")
            // convert to a Product
            Product(attr(0).toInt, attr(1).trim, attr(4).trim, attr(5).trim, attr(6).trim)
        }).toDF()

        val ratingRDD = spark.sparkContext.textFile(RATING_DATA_PATH)
        val ratingDF = ratingRDD.map(item => {
            val attr = item.split(",")
            Rating(attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt)
        }).toDF()

        implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
        storeDataInMongoDB(productDF, ratingDF)

        spark.stop()
    }

    def storeDataInMongoDB(productDF: DataFrame, ratingDF: DataFrame)(implicit mongoConfig: MongoConfig): Unit = {
        // create a MongoDB client connection
        val mongoClient = MongoClient(MongoClientURI(mongoConfig.uri))
        // define the MongoDB collections to operate on, e.g. db.Product
        val productCollection = mongoClient(mongoConfig.db)(MONGODB_PRODUCT_COLLECTION)
        val ratingCollection = mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION)

        // drop the collections if they already exist
        productCollection.dropCollection()
        ratingCollection.dropCollection()

        // write the current data into the corresponding collections
        productDF.write
                .option("uri", mongoConfig.uri)
                .option("collection", MONGODB_PRODUCT_COLLECTION)
                .mode("overwrite")
                .format("com.mongodb.spark.sql")
                .save()

        ratingDF.write
                .option("uri", mongoConfig.uri)
                .option("collection", MONGODB_RATING_COLLECTION)
                .mode("overwrite")
                .format("com.mongodb.spark.sql")
                .save()

        // create indexes on the collections
        productCollection.createIndex(MongoDBObject("productId" -> 1))
        ratingCollection.createIndex(MongoDBObject("productId" -> 1))
        ratingCollection.createIndex(MongoDBObject("userId" -> 1))

        mongoClient.close()
    }
}
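For reference, each line of products.csv carries the seven caret-separated fields described in the Product comment above. A minimal sketch of how the split in DataLoader maps one such line onto a Product (the sample values are copied from that comment; the actual file contents may differ):

val sampleLine = "3982^Fuhlen 富勒 M8眩光舞者时尚节能^1057,439,736^B009EJN4T2^https://images-cn-4.ssl-image^外设产品|鼠标|电脑/办公^富勒|鼠标|电子产品|好用|外观漂亮"
val attr = sampleLine.split("\\^")
// attr(0) = productId, attr(1) = name, attr(2) and attr(3) are skipped,
// attr(4) = imageUrl, attr(5) = categories, attr(6) = tags
val product = Product(attr(0).toInt, attr(1).trim, attr(4).trim, attr(5).trim, attr(6).trim)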

Offline statistics-based recommendations

/**
1. Historically popular products: count the number of ratings per product -> productId, count
2. Recently popular products: convert the timestamp to yyyyMM and count ratings per product and month -> productId, count, yearmonth
3. Quality products: average rating per product -> productId, avg
*/

import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

case class Rating(userId: Int, productId: Int, score: Double, timestamp: Int)
case class MongoConfig(uri: String, db: String)

object StatisticsRecommender {
    // collection names used in MongoDB
    val MONGODB_RATING_COLLECTION = "Rating"

    val RATE_MORE_PRODUCTS = "RateMoreProducts"
    val RATE_MORE_RECENTLY_PRODUCTS = "RateMoreRecentlyProducts"
    val AVERAGE_PRODUCTS = "AverageProducts"

    def main(args: Array[String]): Unit = {
        val config = Map(
            "spark.cores" -> "local[1]",
            "mongo.uri" -> "mongodb://localhost:27017/recommender",
            "mongo.db" -> "recommender"
        )
        // create a SparkConf
        val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StatisticsRecommender")
        // create a SparkSession
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()

        import spark.implicits._
        implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

        // load the rating data
        val ratingDF = spark.read
                .option("uri", mongoConfig.uri)
                .option("collection", MONGODB_RATING_COLLECTION)
                .format("com.mongodb.spark.sql")
                .load()
                .as[Rating]
                .toDF()

        // register a temporary view named ratings
        ratingDF.createOrReplaceTempView("ratings")

        // TODO: use Spark SQL for the different statistics
        // 1. historically popular products: count ratings per product -> productId, count
        val rateMoreProductsDF = spark.sql("select productId, count(productId) as count from ratings group by productId order by count desc")
        storeDFInMongoDB(rateMoreProductsDF, RATE_MORE_PRODUCTS)

        // 2. recently popular products: convert the timestamp to yyyyMM and count ratings per product and month -> productId, count, yearmonth
        // create a date formatter
        val simpleDateFormat = new SimpleDateFormat("yyyyMM")
        // register a UDF that converts a timestamp (in seconds) to the year-month format yyyyMM
        spark.udf.register("changeDate", (x: Int) => simpleDateFormat.format(new Date(x * 1000L)).toInt)
        // reshape the raw ratings into productId, score, yearmonth
        val ratingOfYearMonthDF = spark.sql("select productId, score, changeDate(timestamp) as yearmonth from ratings")
        ratingOfYearMonthDF.createOrReplaceTempView("ratingOfMonth")
        val rateMoreRecentlyProductsDF = spark.sql("select productId, count(productId) as count, yearmonth from ratingOfMonth group by yearmonth, productId order by yearmonth desc, count desc")
        // save the DataFrame to MongoDB
        storeDFInMongoDB(rateMoreRecentlyProductsDF, RATE_MORE_RECENTLY_PRODUCTS)

        // 3. quality products: average rating per product -> productId, avg
        val averageProductsDF = spark.sql("select productId, avg(score) as avg from ratings group by productId order by avg desc")
        storeDFInMongoDB(averageProductsDF, AVERAGE_PRODUCTS)

        spark.stop()
    }

    def storeDFInMongoDB(df: DataFrame, collection_name: String)(implicit mongoConfig: MongoConfig): Unit = {
        df.write
                .option("uri", mongoConfig.uri)
                .option("collection", collection_name)
                .mode("overwrite")
                .format("com.mongodb.spark.sql")
                .save()
    }
}
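A quick sanity check of the changeDate UDF, using the sample timestamp 1395676800 from the Rating comment in the DataLoader section (a sketch that runs in any Scala REPL; it only exercises SimpleDateFormat, not Spark):

import java.text.SimpleDateFormat
import java.util.Date

val formatter = new SimpleDateFormat("yyyyMM")
// the rating timestamp is in seconds, so multiply by 1000L to get milliseconds
formatter.format(new Date(1395676800 * 1000L)).toInt   // 201403, i.e. March 2014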

Collaborative filtering recommendations based on LFM (latent factor model)

// use ALS as the collaborative filtering algorithm: from the user ratings stored in MongoDB, compute offline user-product recommendation lists and a product similarity matrix
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix

case class ProductRating(userId: Int, productId: Int, score: Double, timestamp: Int)

case class MongoConfig(uri: String, db: String)

// standard recommendation object
case class Recommendation(productId: Int, score: Double)

// a user's recommendation list
case class UserRecs(userId: Int, recs: Seq[Recommendation])

// a product's similarity list
case class ProductRecs(productId: Int, recs: Seq[Recommendation])

object OfflineRecommender {
    // collection names used in MongoDB
    val MONGODB_RATING_COLLECTION = "Rating"

    val USER_RECS = "UserRecs"
    val PRODUCT_RECS = "ProductRecs"
    val USER_MAX_RECOMMENDATION = 20

    def main(args: Array[String]): Unit = {
        val config = Map(
            "spark.cores" -> "local[*]",
            "mongo.uri" -> "mongodb://localhost:27017/recommender",
            "mongo.db" -> "recommender"
        )
        // create a SparkConf
        val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
        // create a SparkSession
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()

        import spark.implicits._
        implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

        // load the rating data
        val ratingRDD = spark.read
                .option("uri", mongoConfig.uri)
                .option("collection", MONGODB_RATING_COLLECTION)
                .format("com.mongodb.spark.sql")
                .load()
                .as[ProductRating]
                .rdd
                .map(
                    rating => (rating.userId, rating.productId, rating.score)
                ).cache()

        // extract all distinct users and products
        val userRDD = ratingRDD.map(_._1).distinct()
        val productRDD = ratingRDD.map(_._2).distinct()

        // core computation
        // 1. train the latent factor model
        val trainData = ratingRDD.map(x => Rating(x._1, x._2, x._3))

        // training parameters: rank = number of latent features, iterations = number of iterations, lambda = regularization coefficient
        val (rank, iterations, lambda) = (5, 10, 0.01)
        val model = ALS.train(trainData, rank, iterations, lambda)

        // 2. obtain the predicted rating matrix and derive each user's recommendation list
        // take the Cartesian product of userRDD and productRDD to get an empty user-product rating matrix to predict on
        val userProducts = userRDD.cartesian(productRDD)
        val preRating = model.predict(userProducts)

        // extract each user's recommendation list from the predicted ratings
        val userRecs = preRating.filter(_.rating > 0)
                .map(
                    rating => (rating.user, (rating.product, rating.rating))
                )
                .groupByKey()
                .map {
                    case (userId, recs) => // sort recommendations in descending order of score
                        UserRecs(userId, recs.toList.sortWith(_._2 > _._2).take(USER_MAX_RECOMMENDATION).map(x => Recommendation(x._1, x._2)))
                }
                .toDF()
        userRecs.write
                .option("uri", mongoConfig.uri)
                .option("collection", USER_RECS)
                .mode("overwrite")
                .format("com.mongodb.spark.sql")
                .save()

        // 3. use the product feature vectors to compute product similarity lists
        val productFeatures = model.productFeatures.map {
            case (productId, features) => (productId, new DoubleMatrix(features))
        }
        // pair up products and compute cosine similarity
        val productRecs = productFeatures.cartesian(productFeatures)
                .filter {
                    case (a, b) => a._1 != b._1
                }
                // compute cosine similarity
                .map {
                    case (a, b) =>
                        val simScore = consinSim(a._2, b._2)
                        (a._1, (b._1, simScore))
                }
                .filter(_._2._2 > 0.4)
                .groupByKey()
                .map {
                    case (productId, recs) =>
                        ProductRecs(productId, recs.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)))
                }
                .toDF()
        productRecs.write
                .option("uri", mongoConfig.uri)
                .option("collection", PRODUCT_RECS)
                .mode("overwrite")
                .format("com.mongodb.spark.sql")
                .save()

        spark.stop()
    }

    def consinSim(product1: DoubleMatrix, product2: DoubleMatrix): Double = {
        product1.dot(product2) / (product1.norm2() * product2.norm2())
    }
}
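The consinSim helper is the cosine similarity of two product feature vectors; written out from the jblas calls (dot product divided by the product of the L2 norms):

$$ \mathrm{sim}(p, q) = \frac{p \cdot q}{\lVert p \rVert_2 \, \lVert q \rVert_2} = \frac{\sum_{k=1}^{K} p_k q_k}{\sqrt{\sum_{k=1}^{K} p_k^2}\,\sqrt{\sum_{k=1}^{K} q_k^2}} $$

where K is the rank of the ALS model (5 with the parameters above).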

ALS model evaluation and parameter selection

// parameters being tuned; OfflineRecommender currently uses (rank, iterations, lambda) = (5, 10, 0.01)
import breeze.numerics.sqrt
import com.atguigu.offline.OfflineRecommender.MONGODB_RATING_COLLECTION
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object ALSTrainer {
    def main(args: Array[String]): Unit = {
        val config = Map(
            "spark.cores" -> "local[*]",
            "mongo.uri" -> "mongodb://localhost:27017/recommender",
            "mongo.db" -> "recommender"
        )
        // create a SparkConf
        val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ALSTrainer")
        // create a SparkSession
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()

        import spark.implicits._
        implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

        // load the rating data
        val ratingRDD = spark.read
                .option("uri", mongoConfig.uri)
                .option("collection", MONGODB_RATING_COLLECTION)
                .format("com.mongodb.spark.sql")
                .load()
                .as[ProductRating]
                .rdd
                .map(
                    rating => Rating(rating.userId, rating.productId, rating.score)
                ).cache()

        // split the dataset into training and test sets
        val splits = ratingRDD.randomSplit(Array(0.8, 0.2))
        val trainingRDD = splits(0)
        val testingRDD = splits(1)

        // core step: output the optimal parameters
        adjustALSParams(trainingRDD, testingRDD)

        spark.stop()
    }

    def adjustALSParams(trainData: RDD[Rating], testData: RDD[Rating]): Unit = {
        // iterate over the parameter values defined in the arrays
        val result = for (rank <- Array(5, 10, 20, 50); lambda <- Array(1, 0.1, 0.01))
            yield {
                val model = ALS.train(trainData, rank, 10, lambda)
                val rmse = getRMSE(model, testData)
                (rank, lambda, rmse)
            }
        // pick the combination with the smallest RMSE and print it
        println(result.minBy(_._3))
    }

    def getRMSE(model: MatrixFactorizationModel, data: RDD[Rating]): Double = {
        // build userProducts and get the predicted ratings
        val userProducts = data.map(item => (item.user, item.product))
        val predictRating = model.predict(userProducts)

        // compute the RMSE: first join the predicted and observed ratings on (userId, productId)
        val observed = data.map(item => ((item.user, item.product), item.rating))
        val predict = predictRating.map(item => ((item.user, item.product), item.rating))

        sqrt(
            observed.join(predict).map {
                case ((userId, productId), (actual, pre)) =>
                    val err = actual - pre
                    err * err
            }.mean()
        )
    }
}
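getRMSE is the standard root-mean-square error over the test set, computed after joining predicted and observed ratings on (userId, productId):

$$ \mathrm{RMSE} = \sqrt{\frac{1}{N} \sum_{(u,i)} \left( r_{ui} - \hat{r}_{ui} \right)^2 } $$

where N is the number of joined (user, product) pairs, r_{ui} is the observed score and \hat{r}_{ui} is the score predicted by the ALS model. The grid search in adjustALSParams simply picks the (rank, lambda) combination with the smallest RMSE.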

Real-time recommendation module

import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis

// connection helper object holding the Redis and MongoDB connections
object ConnHelper extends Serializable {
    // lazy vals, initialized only on first use
    lazy val jedis = new Jedis("localhost")
    lazy val mongoClient = MongoClient(MongoClientURI("mongodb://localhost:27017/recommender"))
}

case class MongoConfig(uri: String, db: String)

// standard recommendation object
case class Recommendation(productId: Int, score: Double)

// a user's recommendation list
case class UserRecs(userId: Int, recs: Seq[Recommendation])

// a product's similarity list
case class ProductRecs(productId: Int, recs: Seq[Recommendation])

object OnlineRecommender {
    // constants and collection names
    val MONGODB_RATING_COLLECTION = "Rating"
    val STREAM_RECS = "StreamRecs"
    val PRODUCT_RECS = "ProductRecs"

    val MAX_USER_RATING_NUM = 20  // number of recent user ratings to use
    val MAX_SIM_PRODUCTS_NUM = 20 // number of candidate similar products

    def main(args: Array[String]): Unit = {
        val config = Map(
            "spark.cores" -> "local[*]",
            "mongo.uri" -> "mongodb://localhost:27017/recommender",
            "mongo.db" -> "recommender",
            "kafka.topic" -> "recommender"
        )

        // create a SparkConf
        val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OnlineRecommender")
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()
        val sc = spark.sparkContext
        val ssc = new StreamingContext(sc, Seconds(2))

        import spark.implicits._
        implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

        // load the product similarity matrix and broadcast it
        val simProductsMatrix = spark.read
                .option("uri", mongoConfig.uri)
                .option("collection", PRODUCT_RECS)
                .format("com.mongodb.spark.sql")
                .load()
                .as[ProductRecs]
                .rdd
                // convert to a Map for convenient similarity lookups later
                .map { item =>
                    (item.productId, item.recs.map(x => (x.productId, x.score)).toMap)
                }
                .collectAsMap()
        // define a broadcast variable
        val simProductsMatrixBC = sc.broadcast(simProductsMatrix)

        // Kafka configuration parameters
        val kafkaParam = Map(
            "bootstrap.servers" -> "localhost:9092",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "recommender",
            "auto.offset.reset" -> "latest"
        )
        // create a DStream from Kafka
        val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](Array(config("kafka.topic")), kafkaParam)
        )
        // process the Kafka stream to produce a rating stream: userId|productId|score|timestamp
        val ratingStream = kafkaStream.map { msg =>
            val attr = msg.value().split("\\|")
            (attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt)
        }

        // core algorithm: define how the rating stream is processed
        ratingStream.foreachRDD {
            rdds =>
                rdds.foreach {
                    case (userId, productId, score, timestamp) =>
                        println("rating data coming!>>>>>>>>>>>>>>>>>>")

                        // TODO: core algorithm flow
                        // 1. get the user's recent ratings from Redis, as an Array[(productId, score)]
                        val userRecentlyRatings = getUserRecentlyRatings(MAX_USER_RATING_NUM, userId, ConnHelper.jedis)

                        // 2. get the products most similar to the current product from the similarity matrix, as the candidate list Array[productId]
                        val candidateProducts = getTopSimProducts(MAX_SIM_PRODUCTS_NUM, productId, userId, simProductsMatrixBC.value)

                        // 3. compute the recommendation priority of each candidate, producing the user's real-time recommendation list Array[(productId, score)]
                        val streamRecs = computeProductScore(candidateProducts, userRecentlyRatings, simProductsMatrixBC.value)

                        // 4. save the recommendation list to MongoDB
                        saveDataToMongoDB(userId, streamRecs)
                }
        }

        // start the streaming context
        ssc.start()
        println("streaming started!")
        ssc.awaitTermination()

    }

    /**
      * Get the user's most recent num ratings from Redis
      */

    import scala.collection.JavaConversions._

    def getUserRecentlyRatings(num: Int, userId: Int, jedis: Jedis): Array[(Int, Double)] = {
        // read the user's rating queue from Redis; the list key is userId:USERID and each element has the form PRODUCTID:SCORE
        jedis.lrange("userId:" + userId.toString, 0, num)
                .map { item =>
                    val attr = item.split("\\:")
                    (attr(0).trim.toInt, attr(1).trim.toDouble)
                }
                .toArray
    }

    // get the current product's similarity list, filter out products the user has already rated, and use the rest as candidates
    def getTopSimProducts(num: Int,
                          productId: Int,
                          userId: Int,
                          simProducts: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]])
                         (implicit mongoConfig: MongoConfig): Array[Int] = {
        // get the current product's similarity list from the broadcast similarity matrix
        val allSimProducts = simProducts(productId).toArray

        // get the products the user has already rated, so they can be filtered out
        val ratingCollection = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION)
        val ratingExist = ratingCollection.find(MongoDBObject("userId" -> userId))
                .toArray
                .map { item => // only the productId is needed
                    item.get("productId").toString.toInt
                }
        // filter out already-rated products from all similar products, sort, and take the top num
        allSimProducts.filter(x => !ratingExist.contains(x._1))
                .sortWith(_._2 > _._2)
                .take(num)
                .map(x => x._1)
    }

    // compute the recommendation score of each candidate product
    def computeProductScore(candidateProducts: Array[Int],
                            userRecentlyRatings: Array[(Int, Double)],
                            simProducts: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]])
    : Array[(Int, Double)] = {
        // a mutable ArrayBuffer holding each candidate product's base score, (productId, score)
        val scores = scala.collection.mutable.ArrayBuffer[(Int, Double)]()
        // two maps used as counters of high and low ratings per product, productId -> count
        val increMap = scala.collection.mutable.HashMap[Int, Int]()
        val decreMap = scala.collection.mutable.HashMap[Int, Int]()

        // for each candidate product, iterate over the user's recently rated products and compute their similarity
        for (candidateProduct <- candidateProducts; userRecentlyRating <- userRecentlyRatings) {
            // similarity between the candidate product and the recently rated product, from the similarity matrix
            val simScore = getProductsSimScore(candidateProduct, userRecentlyRating._1, simProducts)
            if (simScore > 0.4) {
                // weighted base score, following the priority formula
                scores += ((candidateProduct, simScore * userRecentlyRating._2))
                if (userRecentlyRating._2 > 3) {
                    increMap(candidateProduct) = increMap.getOrElse(candidateProduct, 0) + 1
                } else {
                    decreMap(candidateProduct) = decreMap.getOrElse(candidateProduct, 0) + 1
                }
            }
        }

        // compute the final recommendation priority for every candidate, grouping by productId first
        scores.groupBy(_._1).map {
            case (productId, scoreList) =>
                (productId, scoreList.map(_._2).sum / scoreList.length + log(increMap.getOrElse(productId, 1)) - log(decreMap.getOrElse(productId, 1)))
        }
                // return the recommendation list sorted by score
                .toArray
                .sortWith(_._2 > _._2)
    }

    def getProductsSimScore(product1: Int, product2: Int,
                            simProducts: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Double = {
        simProducts.get(product1) match {
            case Some(sims) => sims.get(product2) match {
                case Some(score) => score
                case None => 0.0
            }
            case None => 0.0
        }
    }

    // custom log function with base N
    def log(m: Int): Double = {
        val N = 10
        math.log(m) / math.log(N)
    }

    // write to MongoDB
    def saveDataToMongoDB(userId: Int, streamRecs: Array[(Int, Double)])(implicit mongoConfig: MongoConfig): Unit = {
        val streamRecsCollection = ConnHelper.mongoClient(mongoConfig.db)(STREAM_RECS)
        // look up the user's record by userId and replace it
        streamRecsCollection.findAndRemove(MongoDBObject("userId" -> userId))
        streamRecsCollection.insert(MongoDBObject("userId" -> userId,
            "recs" -> streamRecs.map(x => MongoDBObject("productId" -> x._1, "score" -> x._2))))
    }

}
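The recommendation priority computed in computeProductScore can be written out as follows (my reading of the code above):

$$ E_{uq} = \frac{\sum_{r \in RK_q} \mathrm{sim}(q, r) \cdot R_r}{\lvert RK_q \rvert} + \lg\big(\mathrm{incount}_q\big) - \lg\big(\mathrm{recount}_q\big) $$

where q is a candidate product, RK_q is the set of the user's recent ratings whose product has similarity greater than 0.4 with q, R_r is the score of rating r, incount_q counts those recent ratings above 3, recount_q counts the rest, and lg is the base-10 log from the log helper. Both counters default to 1, so a missing counter contributes 0.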

Content-based offline recommendation module

import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix

case class Product(productId: Int, name: String, imageUrl: String, categories: String, tags: String)

case class MongoConfig(uri: String, db: String)

// standard recommendation object
case class Recommendation(productId: Int, score: Double)

// a product's similarity list
case class ProductRecs(productId: Int, recs: Seq[Recommendation])

object ContentRecommender {
    // collection names used in MongoDB
    val MONGODB_PRODUCT_COLLECTION = "Product"
    val CONTENT_PRODUCT_RECS = "ContentBasedProductRecs"

    def main(args: Array[String]): Unit = {
        val config = Map(
            "spark.cores" -> "local[*]",
            "mongo.uri" -> "mongodb://localhost:27017/recommender",
            "mongo.db" -> "recommender"
        )
        // create a SparkConf
        val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ContentRecommender")
        // create a SparkSession
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()

        import spark.implicits._
        implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

        // load the data and preprocess it
        val productTagsDF = spark.read
                .option("uri", mongoConfig.uri)
                .option("collection", MONGODB_PRODUCT_COLLECTION)
                .format("com.mongodb.spark.sql")
                .load()
                .as[Product]
                .map(
                    x => (x.productId, x.name, x.tags.map(c => if (c == '|') ' ' else c))
                )
                .toDF("productId", "name", "tags")
                .cache()

        // TODO: extract product feature vectors with TF-IDF
        // 1. instantiate a tokenizer; by default it splits on whitespace
        val tokenizer = new Tokenizer().setInputCol("tags").setOutputCol("words")
        // apply the tokenizer, producing a DataFrame with an extra words column
        val wordsDataDF = tokenizer.transform(productTagsDF)

        // 2. define a HashingTF transformer to compute term frequencies
        val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(800)
        val featurizedDataDF = hashingTF.transform(wordsDataDF)

        // 3. define an IDF estimator to compute TF-IDF
        val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
        // fit an IDF model
        val idfModel = idf.fit(featurizedDataDF)
        // produce a DataFrame with the new features column
        val rescaledDataDF = idfModel.transform(featurizedDataDF)

        // convert the data to an RDD of feature vectors
        val productFeatures = rescaledDataDF.map {
            row => (row.getAs[Int]("productId"), row.getAs[SparseVector]("features").toArray)
        }
                .rdd
                .map {
                    case (productId, features) => (productId, new DoubleMatrix(features))
                }

        // pair up products and compute cosine similarity
        val productRecs = productFeatures.cartesian(productFeatures)
                .filter {
                    case (a, b) => a._1 != b._1
                }
                // compute cosine similarity
                .map {
                    case (a, b) =>
                        val simScore = consinSim(a._2, b._2)
                        (a._1, (b._1, simScore))
                }
                .filter(_._2._2 > 0.4)
                .groupByKey()
                .map {
                    case (productId, recs) =>
                        ProductRecs(productId, recs.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)))
                }
                .toDF()
        productRecs.write
                .option("uri", mongoConfig.uri)
                .option("collection", CONTENT_PRODUCT_RECS)
                .mode("overwrite")
                .format("com.mongodb.spark.sql")
                .save()

        spark.stop()
    }

    def consinSim(product1: DoubleMatrix, product2: DoubleMatrix): Double = {
        product1.dot(product2) / (product1.norm2() * product2.norm2())
    }
}
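The HashingTF + IDF pipeline above gives each product a sparse vector of TF-IDF weights over its tag words. With Spark MLlib's smoothed IDF definition this is:

$$ \mathrm{TFIDF}(t, d) = \mathrm{TF}(t, d) \times \log\frac{m + 1}{\mathrm{df}(t) + 1} $$

where TF(t, d) is the (hashed, 800-bucket) frequency of word t in product d's tags, m is the total number of products and df(t) is the number of products whose tags contain t. The resulting vectors then go through the same cosine-similarity pairing as in OfflineRecommender.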

Offline recommendations based on ItemCF

package com.atguigu.itemcf

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

case class ProductRating(userId: Int, productId: Int, score: Double, timestamp: Int)

case class MongoConfig(uri: String, db: String)

// standard recommendation object
case class Recommendation(productId: Int, score: Double)

// a product's similarity list
case class ProductRecs(productId: Int, recs: Seq[Recommendation])

object ItemCFRecommender {
    // constants and collection names
    val MONGODB_RATING_COLLECTION = "Rating"
    val ITEM_CF_PRODUCT_RECS = "ItemCFProductRecs"
    val MAX_RECOMMENDATION = 10

    def main(args: Array[String]): Unit = {
        val config = Map(
            "spark.cores" -> "local[*]",
            "mongo.uri" -> "mongodb://localhost:27017/recommender",
            "mongo.db" -> "recommender"
        )
        // create a SparkConf
        val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ItemCFRecommender")
        // create a SparkSession
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()

        import spark.implicits._
        implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

        // load the data and convert it to a DataFrame for processing
        val ratingDF = spark.read
                .option("uri", mongoConfig.uri)
                .option("collection", MONGODB_RATING_COLLECTION)
                .format("com.mongodb.spark.sql")
                .load()
                .as[ProductRating]
                .map(
                    x => (x.userId, x.productId, x.score)
                )
                .toDF("userId", "productId", "score")
                .cache()

        // TODO: core algorithm: compute co-occurrence similarity to build each product's similarity list
        // count the number of ratings per product, grouping by productId
        val productRatingCountDF = ratingDF.groupBy("productId").count()
        // add the count to the original rating table
        val ratingWithCountDF = ratingDF.join(productRatingCountDF, "productId")

        // pair up ratings by userId to count how many users rated both products
        val joinedDF = ratingWithCountDF.join(ratingWithCountDF, "userId")
                .toDF("userId", "product1", "score1", "count1", "product2", "score2", "count2")
                .select("userId", "product1", "count1", "product2", "count2")
        // register a temporary view for SQL queries
        joinedDF.createOrReplaceTempView("joined")

        // group by product1, product2 and count userId: the number of users who rated both products
        val cooccurrenceDF = spark.sql(
            """
              |select product1
              |, product2
              |, count(userId) as cocount
              |, first(count1) as count1
              |, first(count2) as count2
              |from joined
              |group by product1, product2
      """.stripMargin
        ).cache()

        // extract the needed fields and package them as (productId1, (productId2, score))
        val simDF = cooccurrenceDF.map {
            row =>
                val coocSim = cooccurrenceSim(row.getAs[Long]("cocount"), row.getAs[Long]("count1"), row.getAs[Long]("count2"))
                (row.getInt(0), (row.getInt(1), coocSim))
        }
                .rdd
                .groupByKey()
                .map {
                    case (productId, recs) =>
                        ProductRecs(productId, recs.toList
                                .filter(x => x._1 != productId)
                                .sortWith(_._2 > _._2)
                                .take(MAX_RECOMMENDATION)
                                .map(x => Recommendation(x._1, x._2)))
                }
                .toDF()

        // save to MongoDB
        simDF.write
                .option("uri", mongoConfig.uri)
                .option("collection", ITEM_CF_PRODUCT_RECS)
                .mode("overwrite")
                .format("com.mongodb.spark.sql")
                .save()

        spark.stop()
    }

    // compute the co-occurrence similarity according to the formula
    def cooccurrenceSim(coCount: Long, count1: Long, count2: Long): Double = {
        coCount / math.sqrt(count1 * count2)
    }
}
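cooccurrenceSim implements the usual co-occurrence (ItemCF) similarity:

$$ \mathrm{sim}(i, j) = \frac{\lvert N(i) \cap N(j) \rvert}{\sqrt{\lvert N(i) \rvert \cdot \lvert N(j) \rvert}} $$

where N(i) is the set of users who rated product i: the numerator (cocount) is the number of users who rated both products, and the denominator normalizes by each product's overall popularity.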
Copyright notice: original work. Reposting is permitted provided the source and author are credited with a hyperlink; otherwise legal liability will be pursued. From 海汼部落 - 123456789987654321, http://hainiubl.com/topics/75898