在網上看到了一個分析某些資料的一個帖子,于是就down下來運行,錯誤出了一大堆,本人新手一枚,求各位大神幫忙看看怎么解決這個問題

以下是原始碼:
# -*- coding:utf-8 -*-
from pyspark import SparkConf, SparkContext
import re
conf = SparkConf().setMaster("local").setAppName("MY First App")
sc = SparkContext(conf=conf)
csdnRDD = sc.textFile("data/test.txt")
tmprdd1 = csdnRDD.map(lambda x: (x.split("\t")[2]))
tmprdd2 = tmprdd1.map(lambda x: x.split("___csdn_1quot")[0])
tmprdd4 = tmprdd2.filter(lambda x: re.match("\\w+([‐+.]\\w+)*@\\w+([‐.]\\w+)*\\.\\w+([‐.]\\w+)*", str(x)))
tmprdd5 = tmprdd4.map(lambda x: str(x).split("@")[1])
tmprdd6 = tmprdd5.map(lambda x: x.lower())
tmprdd7 = tmprdd6.map(lambda x: (x, 1)).cache()
num = tmprdd7.count()
tmprdd8 = tmprdd7.reduceByKey(lambda x, y: x + y).cache()
tmprdd9 = tmprdd8.map(lambda x: [x[1], x[0]])
tmprdd10 = tmprdd9.sortBy(ascending=False, numPartitions=None, keyfunc=lambda x: x)
res = tmprdd10.take(10)
for each in res:
print(each, str((each[0] / num) * 100) + "%")
以下是錯誤代碼:
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\eclipse\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 172, in main
File "C:\eclipse\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 167, in process
File "C:\eclipse\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "C:\workspace\Python_Spark\Spark_Study\test.py", line 9, in <lambda>
tmprdd1 = csdnRDD.map(lambda x: (x.split("\t")[2]))
IndexError: list index out of range
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/03/15 22:13:23 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\eclipse\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 172, in main
File "C:\eclipse\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 167, in process
File "C:\eclipse\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "C:\workspace\Python_Spark\Spark_Study\test.py", line 9, in <lambda>
tmprdd1 = csdnRDD.map(lambda x: (x.split("\t")[2]))
IndexError: list index out of range
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/03/15 22:13:23 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
Traceback (most recent call last):
File "C:\workspace\Python_Spark\Spark_Study\test.py", line 15, in <module>
num = tmprdd7.count()
File "C:\Python27\lib\pyspark\rdd.py", line 1008, in count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "C:\Python27\lib\pyspark\rdd.py", line 999, in sum
return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
File "C:\Python27\lib\pyspark\rdd.py", line 873, in fold
vals = self.mapPartitions(func).collect()
File "C:\Python27\lib\pyspark\rdd.py", line 776, in collect
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "C:\Python27\lib\site-packages\py4j\java_gateway.py", line 1133, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "C:\Python27\lib\site-packages\py4j\protocol.py", line 319, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\eclipse\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 172, in main
File "C:\eclipse\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 167, in process
File "C:\eclipse\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "C:\workspace\Python_Spark\Spark_Study\test.py", line 9, in <lambda>
tmprdd1 = csdnRDD.map(lambda x: (x.split("\t")[2]))
IndexError: list index out of range
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
uj5u.com熱心網友回復:
上邊錯誤代碼沒有貼完,這是剩下的:Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.collect(RDD.scala:911)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\eclipse\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 172, in main
File "C:\eclipse\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 167, in process
File "C:\eclipse\spark-2.0.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "C:\workspace\Python_Spark\Spark_Study\test.py", line 9, in <lambda>
tmprdd1 = csdnRDD.map(lambda x: (x.split("\t")[2]))
IndexError: list index out of range
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
uj5u.com熱心網友回復:
自己查了半天也沒查出個所以然,本人新手一枚,求大神幫忙啊!!!!
uj5u.com熱心網友回復:
我是參考這個帖子搭建的Python+spark,帖子結尾的程式也運行成功了http://blog.csdn.net/hjxinkkl/article/details/57083549?winzoom=1
uj5u.com熱心網友回復:
你全是win環境代碼沒有什么太多的問題 spark環境檢查 測驗pyspark能否正常使用
再像你這樣提交spark作業
uj5u.com熱心網友回復:
遇到了和樓主一樣的問題,樓主解決了么,求教撒
uj5u.com熱心網友回復:
tmprdd1 = csdnRDD.map(lambda x: (x.split("\t")[2]))x.split("\t")會產生一個list,有些資料是例外例外,產生的list不一定會有三個元素,所以就會例外退出。
你可以使用csdnRDD.map(lambda x:x.split("\t")).filter(lambda x:len(x)<3) 看看有哪一寫例外資料,然后確定如何過濾掉這些例外資料。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/65949.html
標籤:Spark
