我有一個資料框roles和ids扮演這些角色的人。在下表中,角色為a,b,c,d,人員為a3,36,79,38。
我想要的是人們到他們的角色陣列的地圖,如表格右側所示。
--- ---- ---- --- --- --------
|rec| a| b| c| d| ppl | pplmap
--- ---- ---- --- --- -------- -------------------------------------
| D| a3| 36| 36| 36|[a3, 36]| [ a3 -> ['a'], 36 -> ['b','c','d'] ]
| E| a3| 79| 79| a3|[a3, 79]| [ a3 -> ['a','d'], 79 -> ['b','c'] ]
| F|null|null| 38| 38| [38]| [ 38 -> ['c','d'] ]
--- ---- ---- --- --- --------
而且,實際上,我真正想要的是一個可讀性很好的報告,例如:
D
a3 roles: a
36 roles: b, c, d
E
a3 roles: a, d
79 roles: b, c
F
38 roles: c, d
我正在使用 PySpark 3。
有什么建議?謝謝!!
uj5u.com熱心網友回復:
您可以首先取消旋轉資料框,然后使用一些 groupby 來構建您想要的地圖列。
輸入資料框:
data = [
("D", "a3", "36", "36", "36", ["a3", "36"]),
("E", "a3", "79", "79", "a3", ["a3", "79"]),
("F", None, None, "38", "38", ["38"]),
]
df = spark.createDataFrame(data, ["id", "a", "b", "c", "d", "ppl"])
使用stack函式去旋轉和map_from_entries分組后:
import pyspark.sql.functions as F
df1 = df.selectExpr(
"id",
"stack(4, 'a', a, 'b', b, 'c', c, 'd', d) as (role, person)"
).filter(
"person is not null"
).groupBy("id", "person").agg(
F.collect_list("role").alias("roles")
).groupBy("id").agg(
F.map_from_entries(
F.collect_list(F.struct(F.col("person"), F.col("roles")))
).alias("pplmap")
)
df1.show(truncate=False)
# --- ----------------------------
#|id |pplmap |
# --- ----------------------------
#|F |{38 -> [c, d]} |
#|E |{79 -> [b, c], a3 -> [a, d]}|
#|D |{a3 -> [a], 36 -> [b, c, d]}|
# --- ----------------------------
如果你想動態生成堆疊運算式(如果你有很多角色列),你可以在這里看到我的另一個答案。
uj5u.com熱心網友回復:
設定:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
df = pd.DataFrame({
'rec': list('DEF'),
'a': ['a3', 'a3', None],
'b': [36, 79, None],
'c': [36, 79, 38],
'd': [36, 55, 38]
})
spark = SparkSession.builder \
.master("local[1]") \
.appName("SparkByExamples.com") \
.getOrCreate()
df = spark.createDataFrame(df)
然后相應地融化DataFrame,按值分組并按鍵聚合:
cols_to_melt = list('abcd')
res = df.withColumn(
"tmp",
explode(array(
[struct(lit(c).alias('key'), col(c).alias('val'))
for c in cols_to_melt]))) \
.select('rec', col('tmp.key'), col('tmp.val')) \
.dropna() \
.groupby(['rec', 'val']) \
.agg(collect_list('key').alias('keys')) \
.groupby('rec') \
.agg(map_from_entries(collect_list(struct("val","keys"))).alias('maps'))
res.show(truncate=False)
輸出:
--- ----------------------------------------------
|rec|maps |
--- ----------------------------------------------
|F |{38 -> [c, d], NaN -> [b]} |
|E |{79 -> [c], 79.0 -> [b], a3 -> [a], 55 -> [d]}|
|D |{36.0 -> [b], a3 -> [a], 36 -> [c, d]} |
--- ----------------------------------------------
要獲得報告,您只需遍歷收集的資料:
for row in res.collect():
print(row.rec)
print('\n'.join(f" {k} roles: {', '.join(v)}" for k, v in row.maps.items()))
那么你的最終報告應該是這樣的:
F
38 roles: c, d
NaN roles: b
E
55 roles: d
79 roles: c
a3 roles: a
79.0 roles: b
D
36.0 roles: b
a3 roles: a
36 roles: c, d
我在這里沒有處理的一個問題是,您的一列同時包含數字和字串值,這在 spark 中是不可能的。
如果您要將 Pandas DataFrame 轉換為 spark DataFrame(就像我在我的示例中所做的那樣),您應該傳遞一個顯式schema。
如果您從 CSV 檔案中讀取資料,則可能不必這樣做 - 型別將被自動推斷為String.
但是,在這種情況下,為了對某些具有類似值38和其他值的列進行分組,"38"您應該確保所有相關的數字列也轉換為String.
因此,在任何情況下,最好使用模式來確保您在 DataFrame 中獲得所需的型別。
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/378880.html
標籤:Python 阿帕奇火花 火花 apache-spark-sql 熔化
