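The DataFrame has two columns (s3ObjectName, batchName) and tens of thousands of rows, for example:
The goal is to use the foreachPartition() and foreach() functions to retrieve objects from an S3 bucket and, using the details in each row of the DataFrame, write them to the data lake in parallel.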
```scala
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import org.apache.commons.io.IOUtils
import java.io.{BufferedWriter, File, FileWriter}

// S3 connector details defined as an object so it can be serialized and available on all executors in the cluster
object container {
  def getDataSource(): AmazonS3 = {
    val AccessKey = dbutils.secrets.get(scope = "ADBTEL_Scope", key = "Telematics-TrueMotion-AccessKey-ID")
    val SecretKey = dbutils.secrets.get(scope = "ADBTEL_Scope", key = "Telematics-TrueMotion-AccessKey-Secret")
    val creds = new BasicAWSCredentials(AccessKey, SecretKey)
    val clientRegion: Regions = Regions.US_EAST_1
    AmazonS3ClientBuilder.standard()
      .withRegion(clientRegion)
      .withCredentials(new AWSStaticCredentialsProvider(creds))
      .build()
  }
}

dataframe.foreachPartition(partition => {
  // Initialize one S3 connection per partition.
  val client: AmazonS3 = container.getDataSource()
  partition.foreach(row => {
    val s3ObjectName = row.getString(0)
    val batchname = row.getString(1)
    val inputS3Stream = client.getObject("s3bucketname", s3ObjectName).getObjectContent
    val inputS3String = IOUtils.toString(inputS3Stream, "UTF-8")
    inputS3Stream.close() // release the HTTP connection back to the S3 client's pool
    val filePath = s"/dbfs/mnt/test/${batchname}/${s3ObjectName}"
    val file = new File(filePath)
    val fileWriter = new FileWriter(file)
    val bw = new BufferedWriter(fileWriter)
    bw.write(inputS3String)
    bw.close()
    fileWriter.close()
  })
})
```
The above program gives me the error: `value foreach is not a member of Object`.
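A commonly cited cause of this message: `Dataset.foreachPartition` is overloaded, with one variant taking a Scala `Iterator[T] => Unit` and another taking Java's `ForeachPartitionFunction[T]`. When compiled with Scala 2.12, where a Scala lambda can also be converted to a Java functional interface, an unannotated `partition => ...` may not be inferred as a Scala `Iterator[Row]`, so `partition.foreach` does not type-check. Annotating the lambda's parameter type should make the Scala overload the only applicable one. The sketch below runs without Spark: `FakeDataset` and `JavaStylePartitionFn` are hypothetical stand-ins that only mimic the overload shape, not Spark classes, and the sketch shows the working annotated call rather than reproducing the error.

```scala
// Hypothetical stand-in for a Java functional interface such as ForeachPartitionFunction[T].
trait JavaStylePartitionFn[T] {
  def call(it: java.util.Iterator[T]): Unit
}

// Hypothetical class with the same overload shape as Dataset.foreachPartition (not a Spark API).
class FakeDataset[T](rows: Seq[T]) {
  // Scala-function overload
  def foreachPartition(f: Iterator[T] => Unit): Unit = f(rows.iterator)

  // Java-interface overload
  def foreachPartition(func: JavaStylePartitionFn[T]): Unit = {
    val list = new java.util.ArrayList[T]()
    rows.foreach(r => list.add(r))
    func.call(list.iterator())
  }
}

object OverloadDemo {
  def run(): Seq[String] = {
    val ds = new FakeDataset(Seq(("a.json", "b1"), ("b.json", "b2")))
    val seen = scala.collection.mutable.ArrayBuffer.empty[String]
    // The explicit parameter type means only the Scala-function overload applies.
    ds.foreachPartition((partition: Iterator[(String, String)]) => {
      partition.foreach { case (obj, batch) => seen += s"$batch/$obj" }
    })
    seen.toList
  }
}
```

In the Spark code itself, the equivalent change would be `dataframe.foreachPartition((partition: Iterator[Row]) => { ... })` with `org.apache.spark.sql.Row` imported.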
Answer:
Convert the DataFrame to an RDD before calling foreachPartition:
```scala
dataframe.rdd.foreachPartition(partition => {
  // same per-partition logic as in the question
})
```
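One more thing the question's loop does not handle: `new FileWriter(file)` throws a `FileNotFoundException` if the `/dbfs/mnt/test/<batchName>` directory does not exist yet. Below is a minimal sketch of a per-row write helper that creates the parent directory first, assuming the mount is reachable through local file paths (as the question's `/dbfs/...` path already assumes). `BatchWriter` and `writeObject` are hypothetical names, not part of any library.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths}

// Hypothetical helper: writes one S3 object's content under <baseDir>/<batchName>/<objectName>.
object BatchWriter {
  def writeObject(baseDir: String, batchName: String, objectName: String, content: String): Path = {
    val target = Paths.get(baseDir, batchName, objectName)
    // Create the batch directory (and any nested folders in objectName) if missing.
    Files.createDirectories(target.getParent)
    // Writes UTF-8 bytes, creating or truncating the file, and returns the path written.
    Files.write(target, content.getBytes(StandardCharsets.UTF_8))
  }
}
```

Inside `dataframe.rdd.foreachPartition`, the body of `partition.foreach` could then call `BatchWriter.writeObject("/dbfs/mnt/test", batchname, s3ObjectName, inputS3String)` in place of the `File`, `FileWriter` and `BufferedWriter` lines.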