我想在執行一些操作后合并兩個資料幀的列值以在 pyspark 中創建一個新的資料幀。每個資料幀的列是具有整數值的向量。完成的操作是取資料幀向量中每個值的平均值,并找到創建的新向量的最大元素的索引。
資料框1:
|id| |value1 |
|:.| |:......|
| 0| |[0,1,2]|
| 1| |[3,4,5]|
資料框2:
|id| |value2 |
|:.| |:......|
| 0| |[1,2,3]|
| 1| |[4,5,6]|
資料框3:
|value3 |
|:............|
|[0.5,1.5,2.5]|
|[3.5,4.5,5.5]|
資料框4:
|value4|
|:.....|
|2 |
|2 |
Dataframe3是通過取dataframe 1和2的每個向量的每個元素的平均值得到的,即:dataframe3的第一個向量[0.5,1.5,2.5]由[0 1/2,1 2/2,2 3]得到/2]。Dataframe4 是通過取每個向量的最大值的索引獲得的。取 dataframe3[0.5,1.5,2.5] 最大值的第一個向量是 2.5,它出現在索引 2 處,所以 Dataframe4 中的第一個元素是 2。我們如何在 pyspark 中實作這一點。
V1:
-------------------------------------- ---
|p1 |id |
-------------------------------------- ---
|[0.01426862, 0.010903089, 0.9748283] |0 |
|[0.068229124, 0.89613986, 0.035630997]|1 |
-------------------------------------- ---
V2:
------------------------- ---
|p2 |id |
------------------------- ---
|[0.0, 0.0, 1.0] |0 |
|[2.8160464E-27, 1.0, 0.0]|1 |
------------------------- ---
當使用 df3 = v1.join(v2,on="id") 時
df3=這就是我得到的
------------------------------------- ---------------
|p1 |p2 |
------------------------------------- ---------------
|[0.02203844, 0.010056663, 0.9679049] |[0.0, 0.0, 1.0]|
|[0.039553806, 0.015186918, 0.9452593]|[0.0, 0.0, 1.0]|
------------------------------------- ---------------
什么時候
df3 = df3.withColumn( "p3", F.expr("transform(arrays_zip(p1, p2), x -> (x.p1 x.p2) / 2)"),)
df4 = df3.withColumn("p4",F.expr("array_position(p3, array_max(p3))"))
p3 是平均值。我將 df4 的所有值都設為零
uj5u.com熱心網友回復:
首先,我重新創建您的測驗資料:
a = [
[0, [0,1,2]],
[1, [3,4,5]],
]
b = ["id", "value1"]
df1 = spark.createDataFrame(a,b)
c = [
[0, [1,2,3]],
[1, [4,5,6]],
]
d = ["id", "value2"]
df2 = spark.createDataFrame(c,d)
然后,我處理資料:
- 加入
df3 = df1.join(df2, on="id")
df3.show()
--- --------- ---------
| id| value1| value2|
--- --------- ---------
| 0|[0, 1, 2]|[1, 2, 3]|
| 1|[3, 4, 5]|[4, 5, 6]|
--- --------- ---------
- 創建平均陣列
from pyspark.sql import functions as F, types as T
@F.udf(T.ArrayType(T.FloatType()))
def avg_array(array1, array2):
return list(map(lambda x: (x[0] x[1]) / 2, zip(array1, array2)))
df3 = df3.withColumn("value3", avg_array(F.col("value1"), F.col("value2")))
# OR without UDF
df3 = df3.withColumn(
"value3",
F.expr("transform(arrays_zip(value1, value2), x -> (x.value1 x.value2) / 2)"),
)
df3.show()
--- --------- --------- ---------------
| id| value1| value2| value3|
--- --------- --------- ---------------
| 0|[0, 1, 2]|[1, 2, 3]|[0.5, 1.5, 2.5]|
| 1|[3, 4, 5]|[4, 5, 6]|[3.5, 4.5, 5.5]|
--- --------- --------- ---------------
- 獲取索引(
array_position從 1 開始,-1如有必要,您可以做一個)
df4 = df3.withColumn("value4",F.expr("array_position(value3, array_max(value3))"))
df4.show()
--- --------- --------- --------------- ------
| id| value1| value2| value3|value4|
--- --------- --------- --------------- ------
| 0|[0, 1, 2]|[1, 2, 3]|[0.5, 1.5, 2.5]| 3|
| 1|[3, 4, 5]|[4, 5, 6]|[3.5, 4.5, 5.5]| 3|
--- --------- --------- --------------- ------
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/360941.html
