我有一種情況,我需要比較多對列(對的數量會有所不同,并且可以來自如下代碼片段所示的串列)并分別獲取匹配/不匹配的 1/0 標志。最終使用它來識別不匹配的記錄/行數和不匹配的記錄百分比
NONKEYCOLS= ['Marks', 'Qualification']
第一個影像是源 df,第二個影像是預期的 df。

[

由于回圈中的多對都會發生這種情況,因此對于大約十億條記錄來說它非常慢。需要一些有效的幫助。
我有以下代碼,但計算更改記錄的部分需要很長時間。
for ind,cols in enumerate(NONKEYCOLS):
print(ind)
print(cols)
globals()['new_dataset' '_char_changes_tmp']=globals()['new_dataset' '_char_changes_tmp']\
.withColumn("records_changed" str(ind),\
F.sum(col("records_ch_flag_" str(ind)))\
.over(w1))
globals()['new_dataset' '_char_changes_tmp']=globals()['new_dataset' '_char_changes_tmp']\
.withColumn("records_changed" str(ind),\
F.sum(col("records_ch_flag_" str(ind)))\
.over(w1))
globals()['new_dataset' '_char_changes_tmp']=globals()['new_dataset' '_char_changes_tmp']\
.withColumn("records_changed_cnt" str(ind),\
F.count(col("records_ch_flag_" str(ind)))\
.over(w1))
uj5u.com熱心網友回復:
我不確定你在運行什么回圈,但這是一個在select.
data_ls = [
(10, 11, 'foo', 'foo'),
(12, 12, 'bar', 'bar'),
(10, 12, 'foo', 'bar')
]
data_sdf = spark.sparkContext.parallelize(data_ls). \
toDF(['marks_1', 'marks_2', 'qualification_1', 'qualification_2'])
col_pairs = ['marks','qualification']
data_sdf. \
select('*',
*[(func.col(c '_1') == func.col(c '_2')).cast('int').alias(c '_check') for c in col_pairs]
). \
show()
# ------- ------- --------------- --------------- ----------- -------------------
# |marks_1|marks_2|qualification_1|qualification_2|marks_check|qualification_check|
# ------- ------- --------------- --------------- ----------- -------------------
# | 10| 11| foo| foo| 0| 1|
# | 12| 12| bar| bar| 1| 1|
# | 10| 12| foo| bar| 0| 0|
# ------- ------- --------------- --------------- ----------- -------------------
串列理解將產生以下內容
[(func.col(c '_1') == func.col(c '_2')).cast('int').alias(c '_check') for c in col_pairs]
# [Column<'CAST((marks_1 = marks_2) AS INT) AS `marks_check`'>,
# Column<'CAST((qualification_1 = qualification_2) AS INT) AS `qualification_check`'>]
編輯
根據附加(更新)資訊,您需要計算該對的不匹配記錄數,然后計算不匹配百分比。
顛倒上述邏輯來計算不匹配的記錄
col_pairs = ['marks','qualification']
data_sdf. \
agg(*[func.sum((func.col(c '_1') != func.col(c '_2')).cast('int')).alias(c '_unmatch') for c in col_pairs],
func.count('*').alias('row_cnt')
). \
select('*',
*[(func.col(c '_unmatch') / func.col('row_cnt')).alias(c '_unmatch_perc') for c in col_pairs]
). \
show()
# ------------- --------------------- ------- ------------------ --------------------------
# |marks_unmatch|qualification_unmatch|row_cnt|marks_unmatch_perc|qualification_unmatch_perc|
# ------------- --------------------- ------- ------------------ --------------------------
# | 2| 1| 3|0.6666666666666666| 0.3333333333333333|
# ------------- --------------------- ------- ------------------ --------------------------
代碼標記(為 1)該對不匹配的記錄并獲取該標志的總和 - 這為我們提供了該對的不匹配記錄計數。將其除以總行數將得出百分比。
串列理解將產生以下內容
[func.sum((func.col(c '_1') != func.col(c '_2')).cast('int')).alias(c '_unmatch') for c in col_pairs]
# [Column<'sum(CAST((NOT (marks_1 = marks_2)) AS INT)) AS `marks_unmatch`'>,
# Column<'sum(CAST((NOT (qualification_1 = qualification_2)) AS INT)) AS `qualification_unmatch`'>]
這是非常有效的,因為所有這些都發生在一個select陳述句中,該陳述句只會在 spark 計劃中投影一次,而您的方法會在您每次執行時都投影withColumn- 而且火花效率低下。
uj5u.com熱心網友回復:
df.colRegex可以很好地為您服務。如果與正則運算式匹配的列中的所有值都相等,則得到 1。該腳本很有效,因為一切都在一個中完成select。
輸入:
from pyspark.sql import functions as F
df = spark.createDataFrame(
[('p', 1, 2, 'g', 'm'),
('a', 3, 3, 'g', 'g'),
('b', 4, 5, 'g', 'g'),
('r', 8, 8, 'm', 'm'),
('d', 2, 1, 'u', 'g')],
['Name', 'Marks_1', 'Marks_2', 'Qualification_1', 'Qualification_2'])
col_pairs = ['Marks', 'Qualification']
腳本:
def equals(*cols):
return (F.size(F.array_distinct(F.array(*cols))) == 1).cast('int')
df = df.select(
'*',
*[equals(df.colRegex(f"`^{c}.*`")).alias(f'{c}_result') for c in col_pairs]
)
df.show()
# ---- ------- ------- --------------- --------------- ------------ --------------------
# |Name|Marks_1|Marks_2|Qualification_1|Qualification_2|Marks_result|Qualification_result|
# ---- ------- ------- --------------- --------------- ------------ --------------------
# | p| 1| 2| g| m| 0| 0|
# | a| 3| 3| g| g| 1| 1|
# | b| 4| 5| g| g| 0| 1|
# | r| 8| 8| m| m| 1| 1|
# | d| 2| 1| u| g| 0| 0|
# ---- ------- ------- --------------- --------------- ------------ --------------------
效率證明:
df.explain()
# == Physical Plan ==
# *(1) Project [Name#636, Marks_1#637L, Marks_2#638L, Qualification_1#639, Qualification_2#640, cast((size(array_distinct(array(Marks_1#637L, Marks_2#638L)), true) = 1) as int) AS Marks_result#646, cast((size(array_distinct(array(Qualification_1#639, Qualification_2#640)), true) = 1) as int) AS Qualification_result#647]
# - Scan ExistingRDD[Name#636,Marks_1#637L,Marks_2#638L,Qualification_1#639,Qualification_2#640]
編輯:
def equals(*cols):
return (F.size(F.array_distinct(F.array(*cols))) != 1).cast('int')
df = df.select(
'*',
*[equals(df.colRegex(f"`^{c}.*`")).alias(f'{c}_result') for c in col_pairs]
).agg(
*[F.sum(f'{c}_result').alias(f'rec_changed_{c}') for c in col_pairs],
*[(F.sum(f'{c}_result') / F.count(f'{c}_result')).alias(f'{c}_%_rec_changed') for c in col_pairs]
)
df.show()
# ----------------- ------------------------- ------------------- ---------------------------
# |rec_changed_Marks|rec_changed_Qualification|Marks_%_rec_changed|Qualification_%_rec_changed|
# ----------------- ------------------------- ------------------- ---------------------------
# | 3| 2| 0.6| 0.4|
# ----------------- ------------------------- ------------------- ---------------------------
轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/527076.html
上一篇:Python-增加兩個串列的組合
下一篇:如何從字串中洗掉所有第一篇文章?
