def main(args: Array[String]): Unit = {
  val spark = SparkSession
    .builder()
    .master("local")
    .appName("SparkAndHive")
    .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse 2")
    .enableHiveSupport()
    .getOrCreate()
  GeoSparkSQLRegistrator.registerAll(spark.sqlContext)
  val sparkConf: SparkConf = new SparkConf().setAppName("Spark RDD foreach Example").setMaster("local[2]").set("spark.executor.memory", "2g")
  def displayFiles(files: Array[File], a: util.List[String], b: util.List[String]): Unit = {
    for (filename <- files) { // If a sub directory is found,
      if (filename.isDirectory) if (filename.getName.contains("fire")) {
        rds.add(filename.getAbsolutePath)
        println(filename.getAbsolutePath)
      }
      else if (filename.getName.contains("water")){
        rdd.add(filename.getAbsolutePath)
        println(filename.getAbsolutePath)
      }
      else {
        displayFiles(filename.listFiles, a, b)
      }
    }
  }
  val files = new File("C://folder").listFiles
  val list1 = new util.ArrayList[String]
  val list2 = new util.ArrayList[String]
  displayFiles(files, list1, list2)
  val a= Seq(list1)
  println(a)
  val b= Seq(list2)
  println(b)
  val rdd1 = spark.sparkContext.parallelize(Seq(a))
  rdd1.foreach(rrd)
  val rdd2 = spark.sparkContext.parallelize(Seq(a))
  rdd1.foreach(rrd2)
  val dfSeq1 = Seq(rdd1)
  println(dfSeq1)
  val mergeSeqDf1 = dfSeq1.reduce(_ union _)
  mergeSeqDf1.show()
  val dfSeq2 = Seq(rdd2)
  println(dfSeq2)
  val mergeSeqDf2 = dfSeq2.reduce(_ union _)
  mergeSeqDf2.show()
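I created a list of the subfolder paths whose names contain "fire"; it looks like List("C//1_fire", "C//2_fire", "C//3_fire").
I also created a second list of the subfolder paths whose names contain "water"; it looks like List("C//1_water", "C//2_water", "C//3_water").
I created RDDs from the lists and printed them, which shows List("C//1_fire", "C//2_fire", "C//3_fire") for fire and List("C//1_water", "C//2_water", "C//3_water") for water.
Then I merged all the fire RDDs into rdd1 and all the water RDDs into rdd2, but I get the error "value show is not a member of org.apache.spark.rdd.RDD[java.util.ArrayList[String]]" at mergeSeqDf1.show().
How can I convert the RDD to a DataFrame so that I can display it?
DataFrame structure: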
>
>>person1
>>>a_fire
>>>>a_fire
>>>>>1_fire
>>>>>2_fire
>>>>>3_fire
>>>>>4_fire
>>>>a_water
>>>>>1_water
>>>>>2_water
>>>>>3_fire
>>>>>4_fire
>>person2
>>>b_fire
>>>>b_fire
>>>>>1_fire
>>>>>2_fire
>>>>>3_fire
>>>>>4_fire
>>>>b_water
>>>>>1_water
>>>>>2_water
>>>>>3_fire
>>>>>4_fire
Reply from a uj5u.com user:
Spark has three main abstractions: RDD, Dataset, and DataFrame.
So suppose you have a simple list of tuples:
// list of tuples (String, String)
// each tuple contains the id and name of a person
val list: List[(String, String)] =
  List(
    ("1", "abc"),
    ("2", "def")
  )
The RDD API is the most readily available one and is accessed through SparkContext. You only need spark-core as a dependency in your project.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
val conf = new SparkConf().setAppName("appName").setMaster("local[*]")
// people generally use `sc` variable to refer to `SparkContext`
val sc = new SparkContext(conf)
val rdd: RDD[(String, String)] = sc.parallelize(list)
For Dataset and DataFrame you also need to add spark-sql as a dependency in your project. A SparkContext is not enough here; you also need a SparkSession.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
// people generally use `spark` variable to refer to `SparkSession`
val spark = SparkSession.builder().master("local[*]").appName("CustomerAddress").getOrCreate()
// you can get the SparkContext from SparkSession
val sc = spark.sparkContext
// then import the implicits required for working with the Dataset API
import spark.implicits._
// rdd of tuple (String, String)
val rdd: RDD[(String, String)] = sc.parallelize(list)
// you can get a Dataset of tuple (String, String)
val ds1: Dataset[(String, String)] = rdd.toDS()
ds1.show()
// +---+---+
// | _1| _2|
// +---+---+
// |  1|abc|
// |  2|def|
// +---+---+
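The two dependencies mentioned above could be declared in an sbt build roughly as follows. This is only a sketch: the original post names no build tool or version, so sbt and the version number are assumptions to replace with whatever matches your own Spark installation.

```scala
// build.sbt (sketch; the version below is an assumption, not from the original post)
val sparkVersion = "3.5.0" // pick the version that matches your Spark installation

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion, // RDD API and SparkContext
  "org.apache.spark" %% "spark-sql"  % sparkVersion  // Dataset/DataFrame API and SparkSession
)
```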
Now, DataFrame is really just another name for Dataset[Row], where Row is another Spark data structure that holds the columns.
// convert to df without giving specific column names
// the Rows will use the tuple index as column names
val df1: DataFrame = rdd.toDF()
df1.show()
// +---+---+
// | _1| _2|
// +---+---+
// |  1|abc|
// |  2|def|
// +---+---+
// remember, DataFrame is just another name for Dataset[Row]
val df11: Dataset[Row] = rdd.toDF()
df11.show()
// +---+---+
// | _1| _2|
// +---+---+
// |  1|abc|
// |  2|def|
// +---+---+
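To make the "Row holds the columns" point concrete, here is a small sketch of reading values back out of the rows of an untyped DataFrame, by position and by column name. The object name RowAccessSketch and the helper describeRows are hypothetical and exist only for this illustration.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object RowAccessSketch {
  // Hypothetical helper: formats every Row of a two-column string DataFrame.
  def describeRows(df: DataFrame): Seq[String] = {
    // collect() brings the rows to the driver; only do this for small data
    val rows: Array[Row] = df.collect()
    rows.toSeq.map { row =>
      val id = row.getString(0)          // access by position
      val name = row.getAs[String]("_2") // access by column name
      s"$id -> $name"
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("RowAccessSketch") // hypothetical app name
      .getOrCreate()
    import spark.implicits._

    // Without explicit names, the columns are called _1 and _2
    val df: DataFrame = Seq(("1", "abc"), ("2", "def")).toDF()
    describeRows(df).foreach(println)

    spark.stop()
  }
}
```

Because a Row is untyped, a wrong index or column name only fails at runtime, which is one reason to prefer a typed Dataset when the shape of the data is known.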
However, you can also provide column names:
val df2: DataFrame = rdd.toDF("id", "name")
df2.show()
// +---+----+
// | id|name|
// +---+----+
// |  1| abc|
// |  2| def|
// +---+----+
Instead of using a DataFrame (i.e. a Dataset[Row]), you can also use your own domain-specific data structure.
case class Person(id: String, name: String)
val ds2: Dataset[Person] = rdd.map(t => Person(t._1, t._2)).toDS()
ds2.show()
// +---+----+
// | id|name|
// +---+----+
// |  1| abc|
// |  2| def|
// +---+----+
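Applied to the question, the same idea might look like the sketch below. The error arises because show() is defined on Dataset/DataFrame, not on RDD, and because the whole java.util.ArrayList was placed into the RDD as a single element. Here the Java list is first converted to a Scala collection so that each path becomes one row. The object name PathsToDataFrame, the helper toPathDf, and the column name "path" are hypothetical, and the hard-coded lists stand in for the ones filled by displayFiles.

```scala
import java.util
// On Scala 2.13+, scala.jdk.CollectionConverters._ is the non-deprecated equivalent
import scala.collection.JavaConverters._
import org.apache.spark.sql.{DataFrame, SparkSession}

object PathsToDataFrame {
  // Hypothetical helper: turns a java.util.List of paths into a one-column DataFrame.
  def toPathDf(spark: SparkSession, paths: util.List[String]): DataFrame = {
    import spark.implicits._
    // asScala exposes the Java list as a Scala collection,
    // so each path becomes one element of the RDD
    val rdd = spark.sparkContext.parallelize(paths.asScala.toList)
    rdd.toDF("path") // RDD[String] -> DataFrame with a single "path" column
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("PathsToDataFrame") // hypothetical app name
      .getOrCreate()

    // Stand-ins for the lists that displayFiles fills in the question
    val firePaths = new util.ArrayList[String](
      util.Arrays.asList("C//1_fire", "C//2_fire", "C//3_fire"))
    val waterPaths = new util.ArrayList[String](
      util.Arrays.asList("C//1_water", "C//2_water", "C//3_water"))

    val fireDf = toPathDf(spark, firePaths)
    val waterDf = toPathDf(spark, waterPaths)

    fireDf.show(false) // show() exists on DataFrame, unlike on RDD
    // union works because both DataFrames have the same schema
    fireDf.union(waterDf).show(false)

    spark.stop()
  }
}
```

If the RDD step is not needed, paths.asScala.toList.toDF("path") should give the same DataFrame directly once spark.implicits._ is in scope.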
