我開始了解 spark 并想將串列(大約 1000 個條目)轉換為 spark df。
不幸的是,我在標題中得到了提到的錯誤。我無法真正弄清楚導致此錯誤的原因,如果有人可以幫助我,我將不勝感激。到目前為止,這是我的代碼:
# Pyspark SQL library
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col,isnan, when, count
from pyspark.sql.functions import countDistinct
import findspark
findspark.init("/usr/local/spark/")
spark = SparkSession.builder \
.master("local[*]") \
.appName("project") \
.config("spark.executor.memory", "1gb") \
.getOrCreate()
comments = ["string1", "string2", "string3",...]
schema = StructType([StructField('Comments', StringType(), True),])
# Convert list to RDD
rdd = spark.sparkContext.parallelize(comments)
# Create data frame
df = spark.createDataFrame(rdd,schema)
print(df.schema)
df.show(5)
這是完整的錯誤訊息:
Py4JJavaError Traceback (most recent call last)
<ipython-input-36-ff4bb233dd51> in <module>
5 df = spark.createDataFrame(rdd,schema)
6 print(df.schema)
----> 7 df.show(5)
/usr/local/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
438 """
439 if isinstance(truncate, bool) and truncate:
--> 440 print(self._jdf.showString(n, 20, vertical))
441 else:
442 print(self._jdf.showString(n, int(truncate), vertical))
/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
126 def deco(*a, **kw):
127 try:
--> 128 return f(*a, **kw)
129 except py4j.protocol.Py4JJavaError as e:
130 converted = convert_exception(e.java_exception)
/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o244.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4, jupyter-h11910677, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
serializer.dump_stream(out_iter, outfile)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 271, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
return f(*args, **kwargs)
File "/usr/local/spark/python/pyspark/sql/session.py", line 612, in prepare
verify_func(obj)
File "/usr/local/spark/python/pyspark/sql/types.py", line 1408, in verify
verify_value(obj)
File "/usr/local/spark/python/pyspark/sql/types.py", line 1395, in verify_struct
raise TypeError(new_msg("StructType can not accept object %r in type %s"
TypeError: StructType can not accept object 'Free PDF version\r \n\r \n[https://canvas.umn.edu/courses/188156/files/16432145](https://canvas.umn.edu/courses/188156/files/16432145) \r \n\r \nPhysical copy for purchase\r \n\r \n[https://www.akpress.org/undoing-border-imperialism.html](https://www.akpress.org/undoing-border-imperialism.html) \r \n\r \nText to speech reader for audio form\r \n\r \n[https://www.naturalreaders.com/online/](https://www.naturalreaders.com/online/) \n\nSubreddit for further discussion r/AnarchoBooks' in type <class 'str'>
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
serializer.dump_stream(out_iter, outfile)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 271, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
return f(*args, **kwargs)
File "/usr/local/spark/python/pyspark/sql/session.py", line 612, in prepare
verify_func(obj)
File "/usr/local/spark/python/pyspark/sql/types.py", line 1408, in verify
verify_value(obj)
File "/usr/local/spark/python/pyspark/sql/types.py", line 1395, in verify_struct
raise TypeError(new_msg("StructType can not accept object %r in type %s"
TypeError: StructType can not accept object 'Free PDF version\r \n\r \n[https://canvas.umn.edu/courses/188156/files/16432145](https://canvas.umn.edu/courses/188156/files/16432145) \r \n\r \nPhysical copy for purchase\r \n\r \n[https://www.akpress.org/undoing-border-imperialism.html](https://www.akpress.org/undoing-border-imperialism.html) \r \n\r \nText to speech reader for audio form\r \n\r \n[https://www.naturalreaders.com/online/](https://www.naturalreaders.com/online/) \n\nSubreddit for further discussion r/AnarchoBooks' in type <class 'str'>
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
uj5u.com熱心網友回復:
您需要創建一個型別的 RDD,RDD[Tuple[str]]但在您的代碼中,該行:
rdd = spark.sparkContext.parallelize(comments)
RDD[str]當您嘗試將其轉換為具有該給定架構的資料框時,回傳然后失敗。
嘗試將該行修改為:
rdd = spark.sparkContext.parallelize([(c,) for c in comments])
請注意,您實際上可以將元組串列直接傳遞給spark.createDataFrame這樣的:
df = spark.createDataFrame([(c,) for c in comments], schema=schema)
df.show()
# --------
#|Comments|
# --------
#| string1|
#| string2|
#| string3|
# --------
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/420945.html
標籤:
上一篇:洗掉名稱不是特定值的行
