I created an AWS EMR Spark cluster with release label emr-6.2.0 (Hadoop distribution: Amazon; applications: Spark 3.0.1, Zeppelin 0.9.0) and copied all my local files (.jar, .py, .csv and .sas7bdat) to the cluster's master node.
When I run:
[hadoop@ip-172-31-22-207 ~]$ ls -al /home/hadoop/sas_data1/
total 1071812
drwxrwxr-x 2 hadoop hadoop 66 Sep 13 04:08 .
drwxr-xr-x 7 hadoop hadoop 4096 Sep 13 04:38 ..
-rw-r--r-- 1 hadoop hadoop 471990272 Sep 13 04:7 file1.sas7bdat
-rw-r--r-- 1 hadoop hadoop 625541120 Sep 13 04:08 file2.sas7bdat
The output shows that the files exist. My program is also located in /home/hadoop:
import os
from pyspark.sql import SparkSession

def process_raw_data(inputs, output):
    spark = SparkSession.builder \
        .config("spark.jars.packages", "saurfang:spark-sas7bdat:3.0.0-s_2.12") \
        .enableHiveSupport().getOrCreate()
    # Directory on the master node's local disk
    sas_dir = f'{os.getcwd()}/sas_data1'
    for filename in os.listdir(sas_dir):
        extension = os.path.splitext(filename)[1]
        print("!!!!!!!!!!", f'{sas_dir}/{filename}')
        df_spark = spark.read.format('com.github.saurfang.sas.spark') \
            .load(f'{sas_dir}/{filename}')
        raw_df = df_spark.select('field1', 'field2')
        raw_df.write.mode('append').parquet(output + '/raw_data_output')
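I am iterating over the files in the sas_data1 directory, and the output correctly shows the file name as !!!!!!!!!! /home/hadoop/sas_data1/file1.sas7bdat, which is only possible if the file exists. Yet the error I get says the file does not exist. I ran the following command:
spark-submit --jars parso-2.0.11.jar,spark-sas7bdat-3.0.0-s_2.12.jar,hadoop-aws-2.7.4.jar,aws-java-sdk-1.7.4.jar --master yarn process_raw_files.py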
File "/home/hadoop/process_raw_files.py", line 112, in <module>
output_bucket % 'raw_immigration_output')
File "/home/hadoop/process_raw_files.py", line 21, in process_raw_immigration
load(f'{sas_dir}/{filename}')
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 178, in load
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 128, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o67.load.
: java.io.FileNotFoundException: File does not exist: /home/hadoop/sas_data1/i94_apr16_sub.sas7bdat
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:158)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1962)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:755)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:439)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:528)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:999)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:927)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2915)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:866)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:853)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:842)
at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1010)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:319)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:315)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:327)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:906)
at com.github.saurfang.sas.spark.SasRelation.inferSchema(SasRelation.scala:181)
at com.github.saurfang.sas.spark.SasRelation.<init>(SasRelation.scala:73)
at com.github.saurfang.sas.spark.SasRelation$.apply(SasRelation.scala:45)
at com.github.saurfang.sas.spark.DefaultSource.createRelation(DefaultSource.scala:209)
at com.github.saurfang.sas.spark.DefaultSource.createRelation(DefaultSource.scala:42)
at com.github.saurfang.sas.spark.DefaultSource.createRelation(DefaultSource.scala:27)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:232)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /home/hadoop/sas_data1/i94_apr16_sub.sas7bdat
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:158)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1962)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:755)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:439)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:528)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:999)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:927)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2915)
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1545)
at org.apache.hadoop.ipc.Client.call(Client.java:1491)
at org.apache.hadoop.ipc.Client.call(Client.java:1388)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118)
at com.sun.proxy.$Proxy17.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:324)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
at com.sun.proxy.$Proxy18.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:864)
... 31 more
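I stored these files on the regular (local) storage of the EMR cluster's master node. Why does the debug output show the file name while the error says the file does not exist? Is this related to the worker nodes, to which I did not copy the files?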
Answer:
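Spark looks for these files on HDFS. os.listdir reads the master node's local disk, which is why the file name is printed, but the path passed to load() has no scheme, so it is resolved against the cluster's default file system; the stack trace shows the lookup going through DistributedFileSystem and the HDFS NameNode. Copy the files to HDFS and rerun the job.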
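The copy step described above could look roughly like the following. This is only a minimal sketch, assuming the hdfs command-line client is on the PATH of the master node; the target directory /user/hadoop/sas_data1 and the helper names are hypothetical choices, not something from the original post.

```python
import posixpath
import subprocess


def hdfs_put_command(local_path, hdfs_dir):
    """Build the `hdfs dfs -put` command that uploads one local file.

    -f overwrites an existing file of the same name in hdfs_dir.
    """
    return ["hdfs", "dfs", "-put", "-f", local_path, hdfs_dir]


def hdfs_target(local_path, hdfs_dir):
    """Return the path the file will have on HDFS after the upload."""
    return posixpath.join(hdfs_dir, posixpath.basename(local_path))


def copy_to_hdfs(local_paths, hdfs_dir):
    """Create hdfs_dir (if needed) and upload every file in local_paths.

    Assumption: this runs on a node where the `hdfs` CLI is installed,
    such as the EMR master node. Returns the resulting HDFS paths.
    """
    subprocess.run(["hdfs", "dfs", "-mkdir", "-p", hdfs_dir], check=True)
    for path in local_paths:
        subprocess.run(hdfs_put_command(path, hdfs_dir), check=True)
    return [hdfs_target(path, hdfs_dir) for path in local_paths]
```

Each path returned by copy_to_hdfs can then be passed to spark.read.format('com.github.saurfang.sas.spark').load(...) in place of the local /home/hadoop/sas_data1/... path.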
