我有一個RDDwhich 是 type (String,Iterable[GenericData.Record])。現在我想根據這個 RDD 的鍵將這些迭代保存到路徑中。例如,如果 RDD 包含
("a",[1,2,3,4])
("b",[5,6,7,9])
我需要堅持 [1,2,3,4] underresult-path/a和 [5,6,7,8,9] under result-path/b。執行此操作的一種方法 - 編譯但在運行時失敗 - 如下:
implicit val spark: SparkSession = SparkSessionUtils.initSparkSession("Test")
implicit val sc: SparkContext = spark.sparkContext
val re:RDD[(String,Iterable[GenericData.Record])] = ???
val hadoopConf = new Configuration(sc.hadoopConfiguration)
re.forearch {
case (key,collection) =>
val reRDD = sc.makeRDD(collection)
reRDD.saveAsNewAPIHadoopFile(s"$uri/$key",
classOf[SpecificRecord],
classOf[NullWritable],
classOf[AvroKeyOutputFormat[SpecificRecord]],
hadoopConf)
}
這里的問題是我不能這樣做,因為 SparkContext 不可序列化。re如此RDD呼叫foreach它必須序列化內部 lambda 并發送到作業節點。所以我試圖想一種方法可以將初始re轉換為 aMap[(String,RDD[GenericData.Record])]以便我可以執行以下操作:
implicit val spark: SparkSession = SparkSessionUtils.initSparkSession("Test")
implicit val sc: SparkContext = spark.sparkContext
val re:Map[(String,RDD[GenericData.Record])] = ???
val hadoopConf = new Configuration(sc.hadoopConfiguration)
re.forearch {
case (key,rddCollection) =>
rddCollection.saveAsNewAPIHadoopFile(s"$uri/$key",
classOf[SpecificRecord],
classOf[NullWritable],
classOf[AvroKeyOutputFormat[SpecificRecord]],
hadoopConf)
}
uj5u.com熱心網友回復:
可以收集密鑰,并為每個密鑰過濾原始 RDD:
val re = rdd
.keys
.collect()
.map(v => v -> rdd.filter(_._1 == v).values)
.toMap
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/477859.html
