https://github.com/wzhe06/SparrowRecSys
Table of contents
- Debugging from the endpoints
- RecommendationService
- MovieService
- SimilarMovieService
- RecForYouService
- Scala code for Spark offline computation
- Embedding
- processItemSequence
- item2vec
- deepwalk
- generateTransitionMatrix
- oneRandomWalk
- generateUserEmb
- FeatureEngineering
- oneHotEncoderExample
- multiHotEncoderExample
- ratingFeatures
Debugging from the endpoints
RecommendationService
In the main function, RecommendationService is bound to the getrecommendation endpoint:
context.addServlet(new ServletHolder(new RecommendationService()), "/getrecommendation");
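Set a breakpoint in com.sparrowrecsys.online.service.RecommendationService#doGet.

Open webroot/index.html and look at the JavaScript at the end of the file.

Then read the JS code in webroot/js/recsys.js.

It first requests the movies whose genre is Action, with size=8.
Now that the origin of the call is known, look at what the Java side calls:
com.sparrowrecsys.online.datamanager.DataManager#getMoviesByGenre
It looks up the movies for the given genre in an inverted index: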
List<Movie> movies = new ArrayList<>(this.genreReverseIndexMap.get(genre));
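To make the inverted-index idea concrete, here is a minimal sketch in plain Java. The class GenreIndexSketch and its addMovie method are hypothetical names made up for illustration, and the sketch stores bare movie IDs instead of Movie objects; it is not the project's DataManager code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a genre inverted index; not the project's DataManager.
final class GenreIndexSketch {
    // genre -> ids of the movies tagged with that genre
    private final Map<String, List<Integer>> genreReverseIndex = new HashMap<>();

    // Register a movie under each of its genres.
    void addMovie(int movieId, List<String> genres) {
        for (String genre : genres) {
            genreReverseIndex.computeIfAbsent(genre, g -> new ArrayList<>()).add(movieId);
        }
    }

    // Return up to `size` movie ids for the genre, in insertion order.
    // An unknown genre yields an empty list.
    List<Integer> getMoviesByGenre(String genre, int size) {
        List<Integer> ids = genreReverseIndex.getOrDefault(genre, Collections.emptyList());
        return new ArrayList<>(ids.subList(0, Math.min(size, ids.size())));
    }
}
```

The lookup copies the list before returning it, so callers can sort or truncate the result without touching the index itself.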
MovieService
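Click any movie and execution stops at this breakpoint:
com.sparrowrecsys.online.service.MovieService#doGet
In the main function, MovieService is bound to the getmovie endpoint:
context.addServlet(new ServletHolder(new MovieService()), "/getmovie");
My guess is that the front-end request comes from the addMovieDetails function at webroot/js/recsys.js:182.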
SimilarMovieService
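In the main function, SimilarMovieService is bound to the getsimilarmovie endpoint:
context.addServlet(new ServletHolder(new SimilarMovieService()), "/getsimilarmovie");
It calls the static method getRecList of the online.recprocess.SimilarMovieProcess class.
The process is split into recall (candidate generation) and ranking, which show up as two methods: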
List<Movie> candidates = candidateGenerator(movie);
List<Movie> rankedList = ranker(movie, candidates, model);
The stock candidateGenerator does a single-channel recall by genre, and the ranking uses embedding similarity. The emb is a field of the Movie class, and it is loaded into memory!
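The ranking step can be sketched as follows, assuming cosine similarity is the similarity measure (the notes only say "embedding similarity"). SimilarityRankSketch and its method names are hypothetical, and candidates are represented as a map from movie ID to embedding rather than as Movie objects.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of ranking candidates by embedding similarity.
final class SimilarityRankSketch {
    // Cosine similarity of two equal-length vectors; 0 if either has zero norm.
    static double cosine(double[] a, double[] b) {
        double dot = 0, normA = 0, normB = 0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            normA += a[i] * a[i];
            normB += b[i] * b[i];
        }
        return (normA == 0 || normB == 0) ? 0 : dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }

    // Order candidate ids by descending similarity to the target embedding.
    static List<Integer> rank(double[] target, Map<Integer, double[]> candidates) {
        List<Integer> ids = new ArrayList<>(candidates.keySet());
        ids.sort(Comparator.comparingDouble((Integer id) -> -cosine(target, candidates.get(id))));
        return ids;
    }
}
```

Because every embedding lives in memory, ranking is a single pass over the candidates with no network round trip, which is the trade-off the notes point out.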
RecForYouService
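The previous service takes a movie ID as its parameter; this one takes a user ID. So is this collaborative filtering?
From a business point of view, it can be read as the feed that the user logged in with this ID sees on the home page.
online.recprocess.RecForYouProcess#getRecList
Item embeddings are computed with graph embedding; a user's embedding is the average of the embeddings of the items that user likes.
In a normal setup the features would be fetched from Redis rather than all dumped into memory.
The candidates are 800 movies selected by rating; ranked first is Schindler's List with a rating of 4.3.
online.recprocess.RecForYouProcess#ranker
In essence this is just two-tower recall.
The code looks like nothing special at first, but on reflection the path collaborative filtering → matrix factorization → embeddings does make some sense.
Scala code for Spark offline computation
Scala's syntactic sugar is far too sweet; my head is spinning and I can't take it.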
Embedding
processItemSequence
Single-machine local mode (pseudo-distributed):
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("ctrModel")
  .set("spark.submit.deployMode", "client")
Build the sample sequences that item2vec needs:
val samples = processItemSequence(spark, rawSampleDataPath)
ratingSamples holds the ratings together with their timestamps.

Define the sorting UDF:
val sortUdf: UserDefinedFunction = udf((rows: Seq[Row]) => {
  rows.map { case Row(movieId: String, timestamp: String) => (movieId, timestamp) }
    .sortBy { case (_, timestamp) => timestamp }
    .map { case (movieId, _) => movieId }
})
At this point I was stuck; it takes an interactive session (a REPL) to sort it out:
scala> ratingSamples
res6: org.apache.spark.sql.DataFrame = [userId: string, movieId: string ... 2 more fields]
scala> ratingSamples.where(col("rating") >= 3.5).groupBy("userId")
res7: org.apache.spark.sql.RelationalGroupedDataset = RelationalGroupedDataset: [grouping expressions: [userId: string], value: [userId: string, movieId: string ... 2 more fields], type: GroupBy]
scala> var tmp = ratingSamples.where(col("rating") >= 3.5).groupBy("userId").agg( collect_list(struct("movieId", "timestamp")) as "tmp" ).take(1)
tmp: Array[org.apache.spark.sql.Row] = Array([10096,WrappedArray([50,954365515], [457,954365571], [593,954365552], [858,954364961])])
scala> tmp
res12: Array[org.apache.spark.sql.Row] = Array([10096,WrappedArray([50,954365515], [457,954365571], [593,954365552], [858,954364961])])
scala> tmp(0)
res13: org.apache.spark.sql.Row = [10096,WrappedArray([50,954365515], [457,954365571], [593,954365552], [858,954364961])]
scala> tmp(0)(0)
res14: Any = 10096
scala> tmp(0)(1)
res15: Any = WrappedArray([50,954365515], [457,954365571], [593,954365552], [858,954364961])
scala> userSeq.select("userId", "movieIdStr").show(10, truncate = false)
+------+--------------------------------------------------------------------------------------------------------------------------------------------+
|userId|movieIdStr |
+------+--------------------------------------------------------------------------------------------------------------------------------------------+
|10096 |858 50 593 457 |
|10351 |1 25 32 6 608 52 58 26 30 103 582 588 |
In essence, it collects every movie a user rated highly (rating >= 3.5) and sorts those movies by timestamp.
Finally it returns an RDD[Seq[String]]:
userSeq.select("movieIdStr").rdd.map(r => r.getAs[String]("movieIdStr").split(" ").toSeq)
scala> userSeq.select("movieIdStr").rdd.map(r => r.getAs[String]("movieIdStr").split(" ").toSeq).take(2)
res5: Array[Seq[String]] = Array(WrappedArray(858, 50, 593, 457), WrappedArray(1, 25, 32, 6, 608, 52, 58, 26, 30, 103, 582, 588))
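The same filter, group and sort logic can be sketched without Spark. ItemSequenceSketch and its Rating class are hypothetical names. The movie IDs and timestamps in the usage note come from the REPL output above, while the rating values are made up for illustration. The sketch compares timestamps as numbers; the UDF above compares them as strings, which gives the same order only while all timestamps have the same number of digits.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java version of the processItemSequence idea.
final class ItemSequenceSketch {
    // One rating event; field names mirror the ratingSamples columns.
    static final class Rating {
        final String userId;
        final String movieId;
        final double rating;
        final long timestamp;

        Rating(String userId, String movieId, double rating, long timestamp) {
            this.userId = userId;
            this.movieId = movieId;
            this.rating = rating;
            this.timestamp = timestamp;
        }
    }

    // Keep ratings >= minRating, group them by user, sort each group by timestamp,
    // and return userId -> ordered list of movie ids.
    static Map<String, List<String>> buildSequences(List<Rating> ratings, double minRating) {
        Map<String, List<Rating>> byUser = new LinkedHashMap<>();
        for (Rating r : ratings) {
            if (r.rating >= minRating) {
                byUser.computeIfAbsent(r.userId, u -> new ArrayList<>()).add(r);
            }
        }
        Map<String, List<String>> sequences = new LinkedHashMap<>();
        for (Map.Entry<String, List<Rating>> entry : byUser.entrySet()) {
            List<Rating> sorted = new ArrayList<>(entry.getValue());
            sorted.sort(Comparator.comparingLong((Rating r) -> r.timestamp));
            List<String> movieIds = new ArrayList<>();
            for (Rating r : sorted) {
                movieIds.add(r.movieId);
            }
            sequences.put(entry.getKey(), movieIds);
        }
        return sequences;
    }
}
```

Feeding it the four (movieId, timestamp) pairs of user 10096 with any rating of 3.5 or more reproduces the order 858, 50, 593, 457 shown in the userSeq table.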
item2vec
samples : RDD[Seq[String]]
val word2vec = new Word2Vec()
  .setVectorSize(embLength)
  .setWindowSize(5)
  .setNumIterations(10)
val model = word2vec.fit(samples)
deepwalk
In essence, only the way the training sequences are sampled differs.
generateTransitionMatrix
Take each user's logged item-browsing sequence and build edges from consecutive pairs in time order:
val pairSamples = samples.flatMap[(String, String)]( sample => {
  var pairSeq = Seq[(String,String)]()
  var previousItem:String = null
  sample.foreach((element:String) => {
    if(previousItem != null){
      pairSeq = pairSeq :+ (previousItem, element)
    }
    previousItem = element
  })
  pairSeq
})
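Counting
Note that pairSamples is an RDD, countByValue is an action, and pairCountMap is a scala.collection.Map:
val pairCountMap = pairSamples.countByValue()

This is really just a simple adjacency-list data structure:
val transitionCountMatrix = mutable.Map[String, mutable.Map[String, Long]]()
When I first saw this line I wondered why it does not use an edge list. On reflection the adjacency list is the right choice, because an edge list cannot give you the neighbors of a node without scanning every edge.
Note that the edges are directed: if a user's browsing history is ABC, the graph that gets built is A → B → C.
Building the adjacency list is essentially a foreach over the hash map:
// accumulators updated inside the loop (declared before it in the source)
val itemCountMap = mutable.Map[String, Long]()
var pairTotalCount = 0L
pairCountMap.foreach( pair => {
  val pairItems = pair._1
  val count = pair._2
  if(!transitionCountMatrix.contains(pairItems._1)){
    transitionCountMatrix(pairItems._1) = mutable.Map[String, Long]()
  }
  transitionCountMatrix(pairItems._1)(pairItems._2) = count
  itemCountMap(pairItems._1) = itemCountMap.getOrElse[Long](pairItems._1, 0) + count
  pairTotalCount = pairTotalCount + count
})
This loop produces three key values that the later computation uses:
- transitionCountMatrix: the transition count matrix
- itemCountMap: for each item, the number of edges that start at it
- pairTotalCount: the total number of edges
From these, the method finally builds the item-to-item transition probability matrix plus a univariate multinomial distribution over the items.
The return value of generateTransitionMatrix is exactly these two things:
(mutable.Map[String, mutable.Map[String, Double]], mutable.Map[String, Double])
The printed size is 956.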
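A compact, Spark-free sketch of the whole step may help: count the directed edges, normalise each row into transition probabilities, and derive the item distribution as each item's share of all outgoing edges. TransitionMatrixSketch and its method names are hypothetical, and the normalisation shown (row count divided by the row total, row total divided by the overall edge count) is my reading of the counters described above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java sketch of the transition-matrix construction.
final class TransitionMatrixSketch {
    // Count directed edges between consecutive items: from -> (to -> count).
    static Map<String, Map<String, Long>> countTransitions(List<List<String>> sequences) {
        Map<String, Map<String, Long>> counts = new HashMap<>();
        for (List<String> seq : sequences) {
            for (int i = 1; i < seq.size(); i++) {
                counts.computeIfAbsent(seq.get(i - 1), k -> new HashMap<>())
                      .merge(seq.get(i), 1L, Long::sum);
            }
        }
        return counts;
    }

    // Sum of all outgoing edge counts of one row.
    private static long rowTotal(Map<String, Long> row) {
        long total = 0;
        for (long c : row.values()) {
            total += c;
        }
        return total;
    }

    // Normalise each row so that its outgoing probabilities sum to 1.
    static Map<String, Map<String, Double>> toProbabilities(Map<String, Map<String, Long>> counts) {
        Map<String, Map<String, Double>> probs = new HashMap<>();
        for (Map.Entry<String, Map<String, Long>> row : counts.entrySet()) {
            long total = rowTotal(row.getValue());
            Map<String, Double> p = new HashMap<>();
            for (Map.Entry<String, Long> e : row.getValue().entrySet()) {
                p.put(e.getKey(), e.getValue() / (double) total);
            }
            probs.put(row.getKey(), p);
        }
        return probs;
    }

    // Item distribution: the share of all edges that start at each item.
    static Map<String, Double> itemDistribution(Map<String, Map<String, Long>> counts) {
        long allEdges = 0;
        for (Map<String, Long> row : counts.values()) {
            allEdges += rowTotal(row);
        }
        Map<String, Double> dist = new HashMap<>();
        for (Map.Entry<String, Map<String, Long>> row : counts.entrySet()) {
            dist.put(row.getKey(), rowTotal(row.getValue()) / (double) allEdges);
        }
        return dist;
    }
}
```

For the sequences ABC, AB and AC this gives A→B = 2/3, A→C = 1/3 and B→C = 1, with an item distribution of 3/4 for A and 1/4 for B. C never appears as a source, so it has no row.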
oneRandomWalk
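- Pick a random starting vertex
- Transition randomly until the sequence reaches the required length
I find the code rather puzzling; it needs to be checked against other implementations.
Finally the walks are converted to an RDD and thrown into the trainItem2vec pot to stew.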
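The two steps can be sketched as a weighted random walk. RandomWalkSketch, sample and the parameter names are hypothetical. The sketch assumes non-empty distributions whose weights sum to about 1, and it stops early when the current item has no outgoing edge.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical plain-Java sketch of a single weighted random walk.
final class RandomWalkSketch {
    // Draw a key with probability equal to its weight.
    // Assumes a non-empty map whose weights sum to about 1.
    static String sample(Map<String, Double> distribution, Random rng) {
        double r = rng.nextDouble();
        double cumulative = 0.0;
        String last = null;
        for (Map.Entry<String, Double> e : distribution.entrySet()) {
            cumulative += e.getValue();
            last = e.getKey();
            if (r < cumulative) {
                return last;
            }
        }
        return last; // fallback when rounding leaves the weights slightly under 1
    }

    // Start from an item drawn from startDistribution, then follow the
    // transition probabilities until walkLength items are collected or
    // the current item has no outgoing edge.
    static List<String> oneRandomWalk(Map<String, Map<String, Double>> transitions,
                                      Map<String, Double> startDistribution,
                                      int walkLength, Random rng) {
        List<String> walk = new ArrayList<>();
        String current = sample(startDistribution, rng);
        walk.add(current);
        while (walk.size() < walkLength && transitions.containsKey(current)) {
            current = sample(transitions.get(current), rng);
            walk.add(current);
        }
        return walk;
    }
}
```

Passing a seeded Random makes a walk reproducible, which is handy when comparing this logic against another implementation.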
generateUserEmb
I know it is just a very simple average, but code this flashy got the better of me.
I can't follow it.
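Stripped of the Scala idioms, the averaging itself is short. The sketch below is a plain element-wise mean under the assumption that every movie has an embedding of the same length; UserEmbeddingSketch is a hypothetical name, and how the project treats movies without an embedding is not covered here.

```java
import java.util.List;

// Hypothetical sketch: a user embedding as the mean of item embeddings.
final class UserEmbeddingSketch {
    // Element-wise mean of the given embeddings; all must share one length.
    static double[] average(List<double[]> itemEmbeddings) {
        if (itemEmbeddings.isEmpty()) {
            throw new IllegalArgumentException("no embeddings to average");
        }
        int dim = itemEmbeddings.get(0).length;
        double[] mean = new double[dim];
        for (double[] emb : itemEmbeddings) {
            for (int i = 0; i < dim; i++) {
                mean[i] += emb[i];
            }
        }
        for (int i = 0; i < dim; i++) {
            mean[i] /= itemEmbeddings.size();
        }
        return mean;
    }
}
```

Averaging keeps the user vector in the same space as the item vectors, so the same similarity function can compare a user with a movie.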
FeatureEngineering
oneHotEncoderExample
+-------+--------------------+--------------------+
|movieId| title| genres|
+-------+--------------------+--------------------+
| 1| Toy Story (1995)|Adventure|Animati...|
| 2| Jumanji (1995)|Adventure|Childre...|
| 3|Grumpier Old Men ...| Comedy|Romance|
Cast movieId from string to integer:
val samplesWithIdNumber = samples.withColumn("movieIdNumber", col("movieId").cast(sql.types.IntegerType))
With .setDropLast(true) this becomes dummy encoding:
val oneHotEncoder = new OneHotEncoderEstimator()
  .setInputCols(Array("movieIdNumber"))
  .setOutputCols(Array("movieIdVector"))
  .setDropLast(false)
Nothing tricky in this part.
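The difference between the two settings is easy to see on a tiny example. The sketch below uses dense arrays and a hypothetical OneHotSketch class. With dropLast the vector is one element shorter and the last category is represented by all zeros, which is what makes it dummy encoding.

```java
// Hypothetical dense-array sketch of one-hot versus dummy encoding.
final class OneHotSketch {
    // Encode `index` among `numCategories` categories.
    // With dropLast, the vector has numCategories - 1 slots and the
    // last category is encoded as the all-zero vector.
    static double[] encode(int index, int numCategories, boolean dropLast) {
        if (index < 0 || index >= numCategories) {
            throw new IllegalArgumentException("index out of range: " + index);
        }
        int size = dropLast ? numCategories - 1 : numCategories;
        double[] vector = new double[size];
        if (index < size) {
            vector[index] = 1.0;
        }
        return vector;
    }
}
```

With three categories, index 1 becomes [0, 1, 0] without dropLast, and index 2 becomes [0, 0] with it.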
multiHotEncoderExample
val samplesWithGenre = samples.select(col("movieId"), col("title"),explode(split(col("genres"), "\\|").cast("array<string>")).as("genre"))


Multi-hot encoding of genres
Index (ordinal-encode) the genres produced by the split and explode:
val genreIndexer = new StringIndexer().setInputCol("genre").setOutputCol("genreIndex")
val stringIndexerModel : StringIndexerModel = genreIndexer.fit(samplesWithGenre)
val genreIndexSamples = stringIndexerModel.transform(samplesWithGenre)
  .withColumn("genreIndexInt", col("genreIndex").cast(sql.types.IntegerType))
indexSize is 19: the largest genre index plus one, that is, the number of distinct genres and therefore the length of the multi-hot vector (not the maximum number of genres a single movie can have).
val indexSize = genreIndexSamples.agg(max(col("genreIndexInt"))).head().getAs[Int](0) + 1
The map-reduce chain is long, so take it apart:
portion 1
genreIndexSamples
  .groupBy(col("movieId")).agg(collect_list("genreIndexInt").as("genreIndexes"))

portion 2
With indexSize = 19, add it as a constant column:
val processedSamples = genreIndexSamples
  .groupBy(col("movieId")).agg(collect_list("genreIndexInt").as("genreIndexes"))
  .withColumn("indexSize", typedLit(indexSize))

Finally, a UDF called array2vec builds the multi-hot vector from genreIndexes and indexSize:
- UDF
val array2vec: UserDefinedFunction = udf {
  (a: Seq[Int], length: Int) => Vectors.sparse(
    length, a.sortWith(_ < _).toArray, Array.fill[Double](a.length)(1.0)
  )
}

- DataFrame transform
val finalSample = processedSamples.withColumn("vector", array2vec(col("genreIndexes"), col("indexSize")))
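For comparison, here is a dense-array version of what array2vec computes. MultiHotSketch is a hypothetical name. The Spark UDF returns a sparse vector instead, and it sorts the indexes first because a sparse vector's indices must be strictly increasing.

```java
import java.util.List;

// Hypothetical dense-array sketch of multi-hot encoding.
final class MultiHotSketch {
    // 1.0 at every listed genre index, 0.0 elsewhere.
    static double[] encode(List<Integer> genreIndexes, int indexSize) {
        double[] vector = new double[indexSize];
        for (int index : genreIndexes) {
            vector[index] = 1.0;
        }
        return vector;
    }
}
```

A movie with genre indexes 2 and 0 and an indexSize of 4 becomes [1, 0, 1, 0]; the order in which the indexes arrive does not matter for the dense form.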
ratingFeatures
val movieFeatures = samples.groupBy(col("movieId"))
  .agg(count(lit(1)).as("ratingCount"), // note the lit(1): it counts every row in the group
    avg(col("rating")).as("avgRating"),
    variance(col("rating")).as("ratingVar"))
  .withColumn("avgRatingVec", double2vec(col("avgRating")))

Count, mean and variance.
A UDF wraps each scalar in a one-element vector, x → [x].
Why turn a scalar into a vector? So that it can be fed to MinMaxScaler, which works on a vector column.
val double2vec: UserDefinedFunction = udf {
  (value: Double) => Vectors.dense(value)
}
Feature transformer 1: bucketing
//bucketing
val ratingCountDiscretizer = new QuantileDiscretizer()
  .setInputCol("ratingCount")
  .setOutputCol("ratingCountBucket")
  .setNumBuckets(100)
Feature transformer 2: normalization
//Normalization
val ratingScaler = new MinMaxScaler()
  .setInputCol("avgRatingVec")
  .setOutputCol("scaleAvgRating")
Pipeline
val pipelineStage: Array[PipelineStage] = Array(ratingCountDiscretizer, ratingScaler)
val featurePipeline = new Pipeline().setStages(pipelineStage)
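What the two stages compute can be sketched on plain arrays. ScalingSketch and its method names are hypothetical. The bucketing here uses exact ranks, which is a simplification: Spark's QuantileDiscretizer derives its split points from approximate quantiles, so bucket boundaries on real data may differ.

```java
// Hypothetical array-based sketch of min-max scaling and equal-frequency bucketing.
final class ScalingSketch {
    // Rescale values linearly to [0, 1]; a constant column maps to 0.5.
    static double[] minMaxScale(double[] values) {
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
        for (double v : values) {
            min = Math.min(min, v);
            max = Math.max(max, v);
        }
        double[] scaled = new double[values.length];
        for (int i = 0; i < values.length; i++) {
            scaled[i] = (max == min) ? 0.5 : (values[i] - min) / (max - min);
        }
        return scaled;
    }

    // Assign each value a bucket in [0, numBuckets) from its exact rank,
    // where rank = number of strictly smaller values. Ties share a bucket.
    static int[] quantileBuckets(double[] values, int numBuckets) {
        int[] buckets = new int[values.length];
        for (int i = 0; i < values.length; i++) {
            int rank = 0;
            for (double other : values) {
                if (other < values[i]) {
                    rank++;
                }
            }
            buckets[i] = (int) ((long) rank * numBuckets / values.length);
        }
        return buckets;
    }
}
```

Bucketing suits a heavy-tailed count such as ratingCount because it depends only on rank, while min-max scaling keeps the relative spacing of a bounded value such as the average rating.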
