我不明白為什么我的代碼不作業了。最后一行是問題所在:
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
from pyspark.sql.type import StringType
from pyspark import SQLContext
conf=SparkConf().setMaster("local"/span>).setAppName("mein soft"/span>)
sc=SparkContext(conf=conf)
sqlContext=SQLContext(sc)
lines=sc.textFile("File.txt")
#lines.repartition(3)/span>
lines.getNumPartitions()
def lan_map(x)。
if "word1" and "word2" in x。
return ("count",(1,1)
elif "word1" in x。
return ("Count",("1,0")
elif "word2" in x。
return ("Count",("0,1")
else:
return ("Count", ("0,0"))
mapfun=lines.map(lan_map)
mapfun. reduceByKey(lambda x, y: (x[0] y[0], x[1] y[1]) 。) 收集()
而錯誤:
--------------------------------------------------------------------------- Py4JJavaError Traceback (most recent call 最后一次)在 1 #Esto resume lo que se hicimos 3 celdas atrás. ----> 2 mapfun.reduceByKey(lambda x,y: (x[0] y[0], x[1] y[1])).collect() 3 4 #mapfun.reduceByKey(noMeFuncaLambdaAsiQueHagoEsto(mapfun.x,mupfun.y))。 5 #這樣我們就可以直接回憶一下,有多少次出現了 "Python",有多少次出現了 "Spark"
。C:spark-3.1.2-bin-hadoop3.2pythonpyspark dd.py in collect(self) 947 """ 948 with SCCallSiteSync(self.context) as css: --> 949 sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd() ) 950 return list(_load_from_socket(sock_info, self._jrdd_deserializer)) 951
C:spark-3.1.2-bin-hadoop3.2pythonlibpy4j-0.10.9-src.zippy4jjava_gateway.py in call(self, *args) 1302 1303 answer = self.gateway_client.send_command(command) -> 1304 return_value = get_return_value( 1305 answer, self.gateway_client, self.target_id, self.name ) 1306
C:spark-3.1.2-bin-hadoop3.2pythonpysparksqlutils.py in deco(*a, **kw) 109 def deco(*a, **kw): 110 try: --> 111 return f(*a, **kw) 112 except py4j.protocol.Py4JJavaError as e: 113 converted = convert_exception(e.java_exception)
C:spark-3.1.2-bin-hadoop3.2pythonlibpy4j-0.10.9-src.zippy4jprotocol.py in get_return_value(answer, gateway_client, target_id, name) 324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client) 325 如果答案[1] == REFERENCE_TYPE: --> 326 raise Py4JJavaError( 327 "呼叫{0}{1}{2}時發生錯誤。 ". 328 format(target_id, ".", name), value)
。Py4JJavaError。在呼叫 z:org.apache.spark.api.python.PythonRDD.collectAndServe。: org.apache.spark.SparkException。作業因階段性失敗而中止。 階段0.0中的任務0失敗了1次,最近一次失敗。丟失任務0.0 (TID 0) (LAPTOP-PB7QDPVE執行器驅動)。 org.apache.spark.api.python.PythonException。回溯(最近一次 最后一次呼叫)。) 檔案 "C:spark-3.1.2-bin-hadoop3.2pythonlibpyspark.zippysparkworker.py" 。 第604行,在main中 檔案 "C:spark-3.1.2-bin-hadoop3.2pythonlibpyspark.zippysparkworker.py"。 第594行,in process 檔案 "C:spark-3.1.2-bin-hadoop3.2pythonpyspark dd.py",第2916行,在 管道_func return func(split, prev_func(split, iterator)) 檔案 "C:spark-3.1.2-bin-hadoop3.2pythonpyspark dd.py",第2916行,在 管線_func return func(split, prev_func(split, iterator)) 檔案 "C:spark-3.1.2-bin-hadoop3.2pythonpyspark dd.py",第418行,在 函式 return f(iterator) File "C:spark-3.1.2-bin-hadoop3.2pythonpyspark dd.py",第2144行,在 合并本地 merge.mergeValues(iterator) 檔案 "C:spark-3.1.2-bin-hadoop3.2pythonlibpyspark.zippysparkshuffle.py"。 第242行,在mergeValues中 d[k] = comb(d[k], v) if k in d else creator(v) File "C:spark-3.1.2-bin-hadoop3.2pythonpysparkutil.py", line 73, in 封裝器 return f(*args, **kwargs) File "", line 2, in TypeError: unsupported operand type(s) for : 'int'和'str'
。at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517) 在 org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652) 在 org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635) 在 org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470) 在 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) 在 scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1209) 在 scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1215) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132) 在 org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) 在 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) 在 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) at java.util.concurrent.ThreadPoolExecutor.runWorker(未知來源) 在java.util.concurrent.ThreadPoolExecutor$Worker.run(未知來源) 在java.lang.Thread.run(Unknown Source)
。驅動程式的堆疊跟蹤:在 org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258) 在 org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207) 在 org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206) 在 scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 在 scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) 在 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206) 在 org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079) 在 org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) 在 org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2236) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2261) 在 org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:414) at org.apache.spark.rdd.RDD.collect(RDD.scala:1029) 在 org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180) 在 org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(未知來源) at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)在 java.lang.reflect.Method.invoke(Unknown Source) 在 py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 在 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 在 py4j.Gateway.invoke(Gateway.java:282) 在 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) 在 java.lang.Thread.run(Unknown Source) 導致的。 org.apache.spark.api.python.PythonException。回溯(最近一次 最后一次呼叫)。) 檔案 "C:spark-3.1.2-bin-hadoop3.2pythonlibpyspark.zippysparkworker.py" 。 第604行,在main中 檔案 "C:spark-3.1.2-bin-hadoop3.2pythonlibpyspark.zippysparkworker.py"。 第594行,in process 檔案 "C:spark-3.1.2-bin-hadoop3.2pythonpyspark dd.py",第2916行,在 管道_func return func(split, prev_func(split, iterator)) 檔案 "C:spark-3.1.2-bin-hadoop3.2pythonpyspark dd.py",第2916行,在 管線_func return func(split, prev_func(split, iterator)) 檔案 "C:spark-3.1.2-bin-hadoop3.2pythonpyspark dd.py",第418行,在 函式 return f(iterator) File "C:spark-3.1.2-bin-hadoop3.2pythonpyspark dd.py",第2144行,在 合并本地 merge.mergeValues(iterator) 檔案 "C:spark-3.1.2-bin-hadoop3.2pythonlibpyspark.zippysparkshuffle.py"。 第242行,在mergeValues中 d[k] = comb(d[k], v) if k in d else creator(v) File "C:spark-3.1.2-bin-hadoop3.2pythonpysparkutil.py", line 73, in 封裝器 return f(*args, **kwargs) File "", line 2, in TypeError: unsupported operand type(s) for : 'int'和'str'
。at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517) 在 org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652) 在 org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635) 在 org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470) 在 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) 在 scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1209) 在 scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1215) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132) 在 org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) 在 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) 在 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) at java.util.concurrent.ThreadPoolExecutor.runWorker(未知來源) at java.util.concurrent.ThreadPoolExecutor$Worker.run(未知來源) ... 還有1個
我感到非常迷茫,我甚至不能從我的funmap中回傳一個占有率。我的意思是,這應該是可行的:
mapfun[1]
我試著用一個函式來代替。但我失敗得更慘:
def fun2(x,y)。
x[0] y[0]
x[1] y[1]
mapfun.reduceByKey(fun2(x,y)).collect()
uj5u.com熱心網友回復:
你收到的是錯誤的資訊
TypeError: unsupported operand type(s) for : 'int' and 'str'
因為你的元組值是字串,即("1,0")而不是(1,0),Python目前不會應用這個運算子 或添加int和str(字串)資料型別。
此外,在你的map函式中,當你的x中有"word1 "和 "word2 "時,似乎有一個邏輯錯誤,因為這將只檢查"word2"是否在x。我建議采用以下重寫方式:
def lan_map(x)。
if "word1" in x and "word2" in x:
return ("count",(1,1)
elif "word1" in x。
return ("Count",(1,0)
elif "word2" in x:
return ("Count",(0,1)
else:
return ("Count",(0, 0)
或者可能更短
def lan_map(x)。
return ("count"/span>, (
1 if "word1" in x else0,
1 if "word2" in x else 0.
))
讓我知道這是否對你有用。
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/310753.html
標籤:
