總的來說,我對資料幀和 pyspark 完全陌生。
在 python 中,我想做的事情很簡單 - 但是我似乎找不到一種使用 pyspark 不需要很長時間的方法。
我有一個包含大約 4000 行的 pyspark 資料框,其架構如下:
root
|-- waveformData: struct (nullable = true)
| |-- elements: array (nullable = true)
| | |-- element: double (containsNull = true)
| |-- dimensions: array (nullable = true)
| | |-- element: integer (containsNull = true)
每個陣列大約有 20000 個雙打。
搜索此陣列以查找最大值和閾值(最大值的 50% 的第一個實體)只需要很少的時間 - 但僅當資料處于“正常”格式(numpy 陣列)時。
我正在使用一個基本的:
wav_df = temp_data.select("waveformData").toPandas()
wav = wav_df.to_numpy()[0][0].get("elements")
然后搜索最大值/閾值
但是“toPandas”步驟需要永遠(例如單行 30 秒)
為什么?
我一直在嘗試對 pyspark 資料框進行操作,以避免使用 .collect 等進行這種轉換,但我嘗試的一切都需要很長時間。
如果 pyspark 是用于大資料的,我一定是做錯了,這不可能是這種資料量的正常處理時間。
我錯過了什么?
uj5u.com熱心網友回復:
我創建了一些隨機測驗資料
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions as F
import numpy as np
np.random.seed(123)
spark = SparkSession.builder.getOrCreate()
n = 20000
for i in range(100):
df = spark.createDataFrame([
Row(waveFormData=Row(elements=[float(v) for v in np.random.randn(n)], dimensions=[n])) for i in range(40)
])
df.write.parquet('waveFormData.parquet', mode='append')
當我加載資料并選擇在 2 秒內運行的陣列的最大值時:
df = spark.read.parquet('waveFormData.parquet')
df.select(F.array_max('waveFormData.elements')).toPandas()
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/371342.html
下一篇:機器學習訓練資料和未知值查詢
