Originally, the DataFrame looked like this:
+----------+--------------------+
|     appId|                lang|
+----------+--------------------+
|1000098520|              ["EN"]|
|1001449696|              ["EN"]|
|1001780528|["AR","ZH","CS","...|
|1001892954|              ["EN"]|
|1001892954|              ["EN"]|
|1001976488|["EN","FR","DE","...|
|1002028916|              ["EN"]|
|1002908393|              ["EN"]|
|1003066972|["EN","FR","DE","...|
|1004217104|              ["EN"]|
|1004552566|              ["EN"]|
|1005192468|              ["EN"]|
|1005488142|["EN","JA","KO","...|
root
 |-- appId: string (nullable = true)
 |-- lang: string (nullable = true)
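I tried using json.loads() to convert the column into an array of strings, but I suspect some of the values are not valid JSON. How can I convert this column into an array of strings?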
+----------+--------------------+--------+
|     appId|                lang|len_lang|
+----------+--------------------+--------+
|1000098520|                [EN]|       1|
|1001449696|                [EN]|       1|
|1001780528|[AR, ZH, CS, NL, ...|      25|
|1001892954|                [EN]|       1|
|1001892954|                [EN]|       1|
|1001976488|    [EN, FR, DE, ES]|       4|
|1002028916|                [EN]|       1|
|1002908393|                [EN]|       1|
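This is the DataFrame I have now. The lang column used to be a string, but I converted it to an array type with a UDF that calls json.loads(). Next I want to keep only the appIds whose sole language is 'EN', i.e. rows where the array has size 1 and contains only 'EN'.
I tried a where() clause along the lines of F.size(F.col('lang'))==1 & F.array_contains(F.col('lang','EN'))..., but I get this error: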
21/11/19 10:07:32 ERROR PythonUDFRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/../server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 362, in main
eval_type = read_int(infile)
File "/../spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 724, in read_int
raise EOFError
EOFError
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:260)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 352, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 142, in dump_stream
for obj in iterator:
File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 341, in _batched
for item in iterator:
File "<string>", line 1, in <lambda>
File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
return lambda *a: f(*a)
File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
return f(*args, **kwargs)
File "/var/folders/w1/16mcxl3d3zg1831vc7k73fnh0000gn/T/ipykernel_49586/324191451.py", line 5, in <lambda>
File "/var/folders/w1/16mcxl3d3zg1831vc7k73fnh0000gn/T/ipykernel_49586/324191451.py", line 2, in parse_array_from_string
File "/../.pyenv/versions/3.7.9/lib/python3.7/json/__init__.py", line 348, in loads
return _default_decoder.decode(s)
File "/../.pyenv/versions/3.7.9/lib/python3.7/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/../.pyenv/versions/3.7.9/lib/python3.7/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
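The last frame of the traceback is json.loads() raising inside the UDF (parse_array_from_string), not the where() clause itself. The message "Expecting value: line 1 column 1 (char 0)" is what json.loads() reports when the input has no JSON value at position 0, for example an empty string. Which row triggers it is not shown in the question, so the sample values below are assumptions, and parse_lang is a hypothetical helper rather than the asker's function. A minimal sketch of a tolerant parser in plain Python:

```python
import json


def parse_lang(raw):
    """Parse a JSON array string; return None instead of raising on bad input."""
    # None and blank strings carry no JSON value at all.
    if raw is None or not raw.strip():
        return None
    try:
        value = json.loads(raw)
    except json.JSONDecodeError:
        # Not valid JSON (assumed example of a dirty value: "N/A").
        return None
    # Only accept JSON arrays; anything else is treated as unparseable.
    return value if isinstance(value, list) else None


# Reproduce the message from the traceback with an empty string.
try:
    json.loads("")
except json.JSONDecodeError as err:
    message = str(err)

# Assumed sample inputs, including values that would crash a bare json.loads().
samples = ['["EN"]', '["EN","FR","DE","ES"]', "", None, "N/A"]
parsed = [parse_lang(s) for s in samples]
print(message)
print(parsed)
```

Wrapping a function like this in a UDF would stop the worker from crashing, but the bad rows would then carry null and still need handling downstream.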
Answer from a uj5u.com user:
Use the from_json function, as in the example below.
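import pyspark.sql.functions as F
.....
# Parse the JSON string in lang into array<string>, then add the array length as len_lang
df = df.withColumn('lang', F.expr('from_json(lang,"array<string>")')).select('*', F.size('lang').alias('len_lang'))
# Confirm that lang is now array<string> and inspect the full, untruncated values
df.printSchema()
df.show(truncate=False)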
