你能幫我這個代碼中的錯誤是什么嗎?該檔案確實存在,但我知道你正在 HDFS sc.textFile("/user/spark/archivo.csv") 中尋找它
或者為什么會出現這個錯誤?
執行
export PYSPARK_PYTHON=python3
export PYSPARK_DRIVER_PYTHON=python3
spark-submit --queue=OID Proceso_Match1.py
Python
import os
import sys
from pyspark.sql import HiveContext, Row
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import *
if __name__ =='__main__':
conf=SparkConf().setAppName("Spark RDD").set("spark.speculation","true")
sc=SparkContext(conf=conf)
sc.setLogLevel("OFF")
sqlContext = HiveContext(sc)
#rddCentral = sc.textFile("hdfs:///user/spark/archivo.csv")
rddCentral = sc.textFile("/user/spark/archivo.csv")
rddCentralMap = rddCentral.map(lambda line : line.split(","))
print('paso 1')
dfCentral = sqlContext.createDataFrame(rddCentralMap, ["ROWID_CDR","DURACION","FECHA_LLAMADA","FECHA_LLAMADA_2","MATCH"])
dfCentral=dfCentral.withColumn("FECHA_LLAMADA_NUM",dfCentral.FECHA_LLAMADA_2.cast(IntegerType()))
dfCentral=dfCentral.withColumn("DURACION_NUM",dfCentral.DURACION.cast(IntegerType()))
dfCentral=dfCentral.withColumn("MATCH_NUM",dfCentral.MATCH.cast(IntegerType()))
sc.stop()
錯誤日志
22/09/30 12:49:14 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
paso 1
/usr/local/bin/python3/lib/python3.7/site-packages/pandas/compat/__init__.py:124: UserWarning: Could not import the lzma module. Your installed Python is incomplete. Attempting to use lzma compression will result in a RuntimeError.
warnings.warn(msg)
Traceback (most recent call last):
File "/home/aic_proceso_vfs/rjaimea/vfs_504/bin/Proceso_Match1.py", line 21, in <module>
dfCentral = sqlContext.createDataFrame(rddCentralMap, ["ROWID_CDR","DURACION","FECHA_LLAMADA","FECHA_LLAMADA_2","MATCH"])
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, cl-hdp-cdp-dn7.cse-cph.int, executor 1): java.io.IOException: Cannot run program "python3": error=2, No such file or directory
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
Caused by: java.io.IOException: error=2, No such file or directory
... 16 more
檔案 HDFS
hdfs dfs -ls /user/spark
Found 3 items
drwxr-xr-x - spark hdfs 0 2022-07-25 10:11 /user/spark/.sparkStaging
-rw------- 3 hadoopadmin hdfs 21 2022-09-30 12:25 /user/spark/archivo.csv
drwxrwxrwt - spark spark 0 2022-09-30 12:33 /user/spark/driverLogs

uj5u.com熱心網友回復:
我不確定,但您似乎誤用了正在創建的資料庫的架構
線
dfCentral = sqlContext.createDataFrame(rddCentralMap, ["ROWID_CDR","DURACION","FECHA_LLAMADA","FECHA_LLAMADA_2","MATCH"])
將表示您的資料幀資料的類 dict 物件作為第一個引數,將模式作為第二個引數。您剛剛給出了一個字串串列作為第二個引數。
為了創建具有某種模式的資料框,您必須構造該串列中的欄位,然后構造該串列
所以你的程式看起來像這樣
import os
import sys
from pyspark.sql import HiveContext, Row
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import * #where structType and structField come from
if __name__ =='__main__':
conf=SparkConf().setAppName("Spark RDD").set("spark.speculation","true")
sc=SparkContext(conf=conf)
sc.setLogLevel("OFF")
sqlContext = HiveContext(sc)
#rddCentral = sc.textFile("hdfs:///user/spark/archivo.csv")
rddCentral = sc.textFile("/user/spark/archivo.csv")
rddCentralMap = rddCentral.map(lambda line : line.split(","))
print('paso 1')
dfFields = ["ROWID_CDR","DURACION","FECHA_LLAMADA","FECHA_LLAMADA_2","MATCH"]
dfSchema = StructType([StructField(field_name, StringType(), True) for field_name in dfFields])
dfCentral = sqlContext.createDataFrame(rddCentralMap, dfSchema)
dfCentral=dfCentral.withColumn("FECHA_LLAMADA_NUM",dfCentral.FECHA_LLAMADA_2.cast(IntegerType()))
dfCentral=dfCentral.withColumn("DURACION_NUM",dfCentral.DURACION.cast(IntegerType()))
dfCentral=dfCentral.withColumn("MATCH_NUM",dfCentral.MATCH.cast(IntegerType()))
sc.stop()
或者,createDataFrame 函式將RDD作為第一個引數。映射通過讀取檔案創建的 RDD 也可能導致您的問題
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/511159.html
