My Lambda function triggers a Glue job through boto3's glue.start_job_run.
Here is my Glue job script:
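A minimal sketch of what such a trigger could look like, assuming the Lambda is fired by an S3 event. The job name `my-glue-job`, the handler name, and the event shape are assumptions for illustration and are not stated in the post. Glue passes each entry of `Arguments` to the script as `--name value`, which is why the keys carry a leading `--` while getResolvedOptions reads them by their bare names.

```python
import urllib.parse

GLUE_JOB_NAME = "my-glue-job"  # hypothetical job name, not from the original post


def build_job_arguments(bucket, key):
    """Build the Arguments dict for glue.start_job_run.

    The keys carry a leading "--" because Glue hands them to the script
    as command-line options; getResolvedOptions looks them up by bare name.
    """
    return {
        "--s3_target_path_bucket": bucket,
        "--s3_target_path_key": key,
    }


def lambda_handler(event, context):
    # Assumes an S3 "ObjectCreated" notification; the event shape is an assumption.
    import boto3  # imported lazily so build_job_arguments works without boto3

    record = event["Records"][0]["s3"]
    bucket = record["bucket"]["name"]
    # Object keys in S3 event notifications are URL-encoded.
    key = urllib.parse.unquote_plus(record["object"]["key"])

    glue = boto3.client("glue")
    response = glue.start_job_run(
        JobName=GLUE_JOB_NAME,
        Arguments=build_job_arguments(bucket, key),
    )
    return {"JobRunId": response["JobRunId"]}
```

Keeping the argument construction in its own function makes it easy to check that the names match what the job script passes to getResolvedOptions.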
from awsglue.utils import getResolvedOptions
import sys
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from operator import add
from pyspark.sql.functions import col, regexp_extract, max
conf = SparkConf().setAppName("pyspark-etl")
sc = SparkContext.getOrCreate(conf=conf)
args = getResolvedOptions(sys.argv,['s3_target_path_key','s3_target_path_bucket'])
bucket = args['s3_target_path_bucket']
fileName = args['s3_target_path_key']
inputFilePath = f"s3a://{bucket}/{fileName}"
finalFilePath = f"s3a://glu-job-final-juiceb"
print(bucket, fileName)
rdd = sc.textFile(inputFilePath)
rdd = rdd.flatMap(lambda x: x.split(" ")).map(lambda x : (x.split(" ")[0], 1)).reduceByKey(add)
df = rdd.toDF(schema=('rawEntities string, Count int'))
df = df.withColumn("Entities", regexp_extract(col("rawEntities"),'[^!".?@:,\'*…_()] ',0))
df = df.filter(col("Entities") != "")
df = df.select("Entities","Count").groupBy("Entities").agg(max("Count").alias("Count"))
df.write.mode("append").options(header='True').parquet(finalFilePath)
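The Glue job fails with the error "AttributeError: 'PipelinedRDD' object has no attribute 'toDF'".
After searching on Google, I noticed that in Glue, "toDF" refers to converting a DynamicFrame to a DataFrame.
It does not mean RDD to DataFrame.
How can I convert an RDD to a DataFrame in Glue?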
Answer from a uj5u.com user:
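toDF() is not available on your RDD here: PySpark only attaches it to RDDs once a SparkSession has been created, and the script builds a SparkContext but never a session. Create a SparkSession and call createDataFrame() with an explicit StructType, which also gives you full control over the schema.
See the logic below: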
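from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
spark = SparkSession.builder.getOrCreate()  # reuses the existing SparkContext; also makes rdd.toDF() available
schema = StructType([StructField('rawEntities', StringType()), StructField('Count', IntegerType())])
df = spark.createDataFrame(data=rdd, schema=schema)  # rdd is the (word, count) RDD from the question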
