1.本地向量
本地向量的基類是 Vector,我們提供了兩個實作 DenseVector 和 SparseVector。我們建議通過 Vectors中實作的工廠方法來創建本地向量:(注意:Scala語言默認引入的是 scala.collection.immutable.Vector,為了使用MLlib的Vector,你必須顯示引入org.apache.spark.mllib.linalg.Vector。)
import org.apache.spark.mllib.linalg.{Vector, Vectors}
// Create a dense vector (1.0, 0.0, 3.0).
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values
corresponding to nonzero entries.
val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries.
val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))2.含類標簽的點
含有類標簽的點通過case class LabeledPoint來表示。
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
// Create a labeled point with a positive label and a dense feature vector.
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
// Create a labeled point with a negative label and a sparse feature vector.
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
3.稀疏資料Sparse data
實際運用中,稀疏資料是很常見的。MLlib可以讀取以LIBSVM格式存盤的訓練實體,LIBSVM格式是 LIBSVM 和 LIBLINEAR的默認格式,這是一種文本格式,每行代表一個含類標簽的稀疏特征向量。格式如下:
label index1:value1 index2:value2 ...
索引是從 1 開始并且遞增。加載完成后,索引被轉換為從 0 開始。
通過 MLUtils.loadLibSVMFile讀取訓練實體并以LIBSVM 格式存盤。
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
4.本地矩陣
一個本地矩陣由整型的行列索引資料和對應的 double 型值資料組成,存盤在某一個機器中。MLlib 支持密集矩陣(暫無稀疏矩陣!),物體值以列優先的方式存盤在一個 double陣列中。
本 地 矩 陣 的 基 類 是 Matrix , 我 們 提 供 了 一 個 實 現 DenseMatrix 。 我 們 建 議 通過 Matrices 中實作的工廠方法來創建本地矩陣:
import org.apache.spark.mllib.linalg.{Matrix, Matrices}
// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))5.分布式矩陣
一個分布式矩陣由 long 型行列索引資料和對應的 double 型值資料組成,分布式存盤在一個或多個 RDD 中。對于巨大的分布式的矩陣來說,選擇正確的存盤格式非常重要。將一個分布式矩陣轉換為另一個不同格式需要全域洗牌(shuffle),所以代價很高。目前,實作了三類分布式矩陣存盤格式。最基本的型別是 RowMatrix。一個 RowMatrix 是一個面向行的分布式矩陣,其行索引是沒有具體含義的。比如一系列特征向量的一個集合。通過一個 RDD 來代表所有的行,每一行就是一個本地向量。對于 RowMatrix,我們假定其列數量并不巨大,所以一個本地向量可以恰當的與驅動節點(driver)交換資訊,并且能夠在某一節點中存盤和操作。
IndexedRowMatrix 與 RowMatrix 相似,但有行索引,可以用來識別行和進行 join 操作。而 CoordinateMatrix 是一個以三元組串列格式(coordinate list ,COO)存盤的分布式矩陣,其物體集合是一個 RDD。注 意 : 因 為 我 們 需 要 緩 存 矩 陣 大 小 , 分 布 式 矩 陣 的 底 層 RDD 必 須 是 確 定 的(deterministic)。通常來說,使用非確定的 RDD(non-deterministic RDDs)會導致錯誤。
5.1 面向行的分布式矩陣(RowMatrix)
一個 RowMatrix 是一個面向行的分布式矩陣,其行索引是沒有具體含義的。比如一系列特征向量的一個集合。通過一個 RDD 來代表所有的行,每一行就是一個本地向量。既然每一行由一個本地向量表示,所以其列數就被整型資料大小所限制,其實實踐中列數是一個很小的數值。
一個 RowMatrix可從一個RDD[Vector]實體創建。然后我們可以計算出其概要統計資訊。
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val rows: RDD[Vector] = ... // an RDD of local vectors
// Create a RowMatrix from an RDD[Vector].
val mat: RowMatrix = new RowMatrix(rows)
// Get its size.
val m = mat.numRows()
val n = mat.numCols()
5.2行索引矩陣(IndexedRowMatrix)
IndexedRowMatrix 與 RowMatrix 相似,但其行索引具有特定含義,本質上是一個含有索引資訊的行資料集合(an RDD of indexed rows)。每一行由 long 型索引和一個本地向量組成。一個 IndexedRowMatrix可從一個RDD[IndexedRow]實體創建,這里的 IndexedRow是 (Long, Vector) 的 封 裝 類 。 剔 除 IndexedRowMatrix 中 的 行 索 引 信 息 就 變 成 一 個RowMatrix。
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
val rows: RDD[IndexedRow] = ... // an RDD of indexed rows
// Create an IndexedRowMatrix from an RDD[IndexedRow].
val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)
// Get its size.
val m = mat.numRows()
val n = mat.numCols()
// Drop its row indices.
val rowMat: RowMatrix = mat.toRowMatrix()5.3三元組矩陣(CoordinateMatrix)
一個 CoordinateMatrix 是一個分布式矩陣,其物體集合是一個 RDD。每一個物體是一個(i: Long, j: Long, value: Double)三元組,其中 i 代表行索引,j 代表列索引,value 代表物體的值。只有當矩陣的行和列都很巨大,并且矩陣很稀疏時才使用 CoordinateMatrix。
一個 CoordinateMatrix可從一個RDD[MatrixEntry]實體創建,這里的 MatrixEntry是 (Long, Long, Double) 的 封 裝 類 。 通 過 調 用 toIndexedRowMatrix 可 以 將 一 個CoordinateMatrix轉變為一個IndexedRowMatrix(但其行是稀疏的)。目前暫不支持其他計算操作。
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
val entries: RDD[MatrixEntry] = ... // an RDD of matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val mat: CoordinateMatrix = new CoordinateMatrix(entries)
// Get its size.
val m = mat.numRows()
val n = mat.numCols()
// Convert it to an IndexRowMatrix whose rows are sparse vectors.
val indexedRowMatrix = mat.toIndexedRowMatrix()
uj5u.com熱心網友回復:
你好 請問scala> val rows: RDD[Vector] = ... // an RDD of local vectors
<console>:1: error: illegal start of simple expression
val rows: RDD[Vector] = ... // an RDD of local vectors
這段代碼為什么會報錯啊?? 我是復制你的代碼 但是 “...” 這個符號感覺有點不對
謝謝
uj5u.com熱心網友回復:
您好,我能跟您約一些spark方面的稿件嗎uj5u.com熱心網友回復:
...是引數,你得自己輸檔案地址uj5u.com熱心網友回復:
uj5u.com熱心網友回復:
深入淺出Spark機器學習實戰(用戶行為分析)課程觀看地址:http://www.xuetuwuyou.com/course/144
課程出自學途無憂網:http://www.xuetuwuyou.com
一、課程目標
熟練掌握SparkSQL的各種操作,深入了解Spark內部實作原理
深入了解SparkML機器學習各種演算法模型的構建和運行
熟練Spark的API并能靈活運用
能掌握Spark在作業當中的運用
二、適合人群
適合給,有java,scala基礎,想往大資料spark機器學習這塊發展
適合給想學習spark,往資料倉庫,大資料挖掘機器學習,方向發展的學員
三、課程用到的軟體及版本:
Spark2.0,Spark1.6.2,STS,maven,Linux Centos6.5,mysql,mongodb3.2
四、課程目錄:
課時1:Spark介紹
課時2:Spark2集群安裝
課時3:Spark RDD操作
課時4:SparkRDD原理剖析
課時5:Spark2sql從mysql中匯入
課時6:Spark1.6.2sql與mysql資料互動
課時7:SparkSQL java操作mysql資料
課時8:Spark統計用戶的收藏轉換率
課時9:Spark梳理用戶的收藏以及訂單轉換率
課時10:最侄訓取用戶的收藏以及訂單轉換率
課時11:Spark Pipeline構建隨機森林回歸預測模型
課時12:Spark 隨機森林回歸預測結果并存盤進mysql
課時13:Spark對收藏轉預測換率與真正的轉換率對比,以及決策樹模型構建
課時14:Spark機器學習對各種監督與非監督分類學習詳細介紹
課時15:Spark協同過濾演算法,構建用戶與產品模型
課時16:Spark協同演算法完成給用戶推薦產品
課時17:mongodb的安裝以及其基本操作
課時18:Spark與mongodb整合
課時19:Spark預測收藏以及給用戶推薦的產品存盤進mongodb
課時20:操作RDD需要注意點,以及Spark記憶體分配資源調優
課時21:Spark整個學習程序及其總結
推薦組合學習:《國內首部系統性介紹Scala語言培訓課程》
課程觀看地址:http://www.xuetuwuyou.com/course/12
Spark+Kafka 實時流機器學習實戰
課程觀看地址:http://www.xuetuwuyou.com/course/147
uj5u.com熱心網友回復:
/*** RowMatrix 行矩陣
*/
val rdd1 = sc.parallelize(Array(Array(1.0, 2.0, 3.0, 4.0), Array(2.0, 3.0, 4.0, 5.0), Array(3.0, 4.0, 5.0, 6.0)))
.map(f => Vectors.dense(f))
val rows: RDD[Vector] = rdd1 // an RDD of local vectors
println("本地向量RDD: "+rows)
// 從一個向量RDD創建一個行矩陣
val mat = new RowMatrix(rows)
// 獲得它的size
val m = mat.numRows()
val n = mat.numCols()
println("numRows: "+m+"\n"+"numCols: "+n)
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/56393.html
標籤:Spark
