I'm trying to join 2 DataFrames with PySpark, where a record in DataFrame 1 can refer to multiple records in the lookup DataFrame.
>>> df = spark.createDataFrame([(1, 4, 'date_from,date_to'),(1, 8, 'emp_name')], ['col1', 'col2', 'error_cloumn'])
>>> df.show()
+----+----+-----------------+
|col1|col2|     error_cloumn|
+----+----+-----------------+
|   1|   4|date_from,date_to|
|   1|   8|         emp_name|
+----+----+-----------------+
>>> look_up_df = spark.createDataFrame([('date_from','DD-MM-YY', ' text msg1'),('date_to', 'DD-MM-YY', 'test msg2'),('emp_name', 'VARCHAR(100)', 'test msg3'),('emp_type', 'VARCHAR(100)', 'test msg4')], ['column_nm', 'clmn1', 'comment'])
>>> look_up_df.show()
+---------+------------+----------+
|column_nm|       clmn1|   comment|
+---------+------------+----------+
|date_from|    DD-MM-YY| text msg1|
|  date_to|    DD-MM-YY| test msg2|
| emp_name|VARCHAR(100)| test msg3|
| emp_type|VARCHAR(100)| test msg4|
+---------+------------+----------+
Expected output: error_desc should be look_up_df[column_nm] + ' expected as ' + look_up_df[clmn1] + ' and comment is ' + look_up_df[comment], with one such phrase for every column name listed in error_cloumn, separated by ', '.
output_df:
+----+----+-----------------+--------------------------------------------------------------------------------------------------------------+
|col1|col2|error_cloumn     |error_desc                                                                                                    |
+----+----+-----------------+--------------------------------------------------------------------------------------------------------------+
|1   |4   |date_from,date_to|date_from expected as DD-MM-YY and comment is text msg1, date_to expected as DD-MM-YY and comment is test msg2|
|1   |8   |emp_name         |emp_name expected as VARCHAR(100) and comment is test msg3                                                    |
+----+----+-----------------+--------------------------------------------------------------------------------------------------------------+
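To pin down exactly what error_desc should contain, here is a minimal plain-Python sketch (no Spark involved) that computes it from the sample rows above. The helper name expected_desc is hypothetical, and the sketch strips the comment values because ' text msg1' in the sample lookup data carries a leading space. It is only meant as a reference to compare a Spark result against.

```python
# Plain-Python reference model of the expected output (not Spark code).
# Uses the sample rows from the question; comments are stripped because
# ' text msg1' in the sample lookup data has a leading space.

lookup_rows = [
    ('date_from', 'DD-MM-YY', ' text msg1'),
    ('date_to', 'DD-MM-YY', 'test msg2'),
    ('emp_name', 'VARCHAR(100)', 'test msg3'),
    ('emp_type', 'VARCHAR(100)', 'test msg4'),
]
df_rows = [(1, 4, 'date_from,date_to'), (1, 8, 'emp_name')]

# column_nm -> (clmn1, comment)
lookup = {name: (fmt, comment.strip()) for name, fmt, comment in lookup_rows}


def expected_desc(error_cloumn):
    """Hypothetical helper: one phrase per listed column name, joined by ', '."""
    phrases = []
    for name in error_cloumn.split(','):
        fmt, comment = lookup[name]
        phrases.append(f'{name} expected as {fmt} and comment is {comment}')
    return ', '.join(phrases)


output_rows = [(c1, c2, err, expected_desc(err)) for c1, c2, err in df_rows]
for row in output_rows:
    print(row)
```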
I'm trying the following code:
from pyspark.sql import functions as F
df = spark.createDataFrame([(1, 4, 'date_from,date_to'),(1, 8, 'emp_name')], ['col1', 'col2', 'error_cloumn'])
df.show()
look_up_df = spark.createDataFrame([('date_from','DD-MM-YY', ' text msg1'),('date_to', 'DD-MM-YY', 'test msg2'),('emp_name', 'VARCHAR(100)', 'test msg3'),('emp_type', 'VARCHAR(100)', 'test msg4')], ['column_nm', 'clmn1', 'comment'])
look_up_df.show()
output_df = df.join(look_up_df, df["error_cloumn"] == look_up_df["column_nm"]).withColumn(
    "error_desc",
    F.concat(F.col('column_nm'), F.lit(' expected as '), F.col('clmn1'), F.lit(' and comment is '), F.col('comment')),
)
This code works for a record that names a single column, but fails for records that list several columns, such as date_from,date_to.
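Because DataFrame.join performs an inner join by default, the date_from,date_to record is not so much mis-joined as silently dropped: the condition compares the whole string 'date_from,date_to' with each column_nm value, and nothing matches. The plain-Python sketch below models the join semantics (it is not Spark code, and inner_join is a hypothetical helper) to show the difference between joining on the raw string and joining after splitting it into one row per name.

```python
# Plain-Python model of an inner equi-join, showing why
# df["error_cloumn"] == look_up_df["column_nm"] cannot match 'date_from,date_to'.

lookup_keys = ['date_from', 'date_to', 'emp_name', 'emp_type']
df_rows = [(1, 4, 'date_from,date_to'), (1, 8, 'emp_name')]


def inner_join(rows, keys, key_of):
    """Hypothetical helper: keep (row, key) pairs where key_of(row) == key."""
    return [(row, k) for row in rows for k in keys if key_of(row) == k]


# Joining on the raw string: the multi-column record has no partner and is dropped.
raw_join = inner_join(df_rows, lookup_keys, key_of=lambda r: r[2])

# Splitting first (one row per name, like F.explode(F.split(...))) lets every name match.
exploded = [(c1, c2, name) for c1, c2, err in df_rows for name in err.split(',')]
split_join = inner_join(exploded, lookup_keys, key_of=lambda r: r[2])

print([k for _, k in raw_join])    # ['emp_name']
print([k for _, k in split_join])  # ['date_from', 'date_to', 'emp_name']
```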
Answer:
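Well, what you're looking for is fairly simple. It just takes several steps, and it can get a bit confusing if you don't write it carefully. You may find this answer basically similar to the other one, but better structured. I've added a comment on each step; feel free to run the steps one at a time to follow the logic.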
from pyspark.sql import functions as F
(df
    .withColumnRenamed('error_cloumn', 'error_column')  # the question's DataFrame spells this column 'error_cloumn'
    .withColumn('error_column', F.explode(F.split('error_column', ',')))  # break multiple errors down into separate rows
    .join(look_up_df.withColumnRenamed('column_nm', 'error_column'), on=['error_column'], how='inner')  # rename the column so the `on` condition is just the column name
    .withColumn('error_desc', F.concat(  # build the description as required
        F.col('error_column'),
        F.lit(' expected '),
        F.col('clmn1'),
        F.lit(' and comment is '),
        F.col('comment'),
    ))
    .groupBy('col1', 'col2')  # now that the errors are split apart, group them back together
    .agg(
        F.concat_ws(', ', F.collect_list('error_column')).alias('error_column'),  # join the error names with commas
        F.concat_ws(', ', F.collect_list('error_desc')).alias('error_desc'),  # join the descriptions with commas
    )
    .show(10, False)
)
+----+----+------------------+--------------------------------------------------------------------------------------------------------+
|col1|col2|error_column      |error_desc                                                                                              |
+----+----+------------------+--------------------------------------------------------------------------------------------------------+
|1   |4   |date_from, date_to|date_from expected DD-MM-YY and comment is text msg1, date_to expected DD-MM-YY and comment is test msg2|
|1   |8   |emp_name          |emp_name expected VARCHAR(100) and comment is test msg3                                                 |
+----+----+------------------+--------------------------------------------------------------------------------------------------------+
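One caveat with this approach: Spark documents collect_list as non-deterministic, because the order of the collected values depends on the row order, which may change after a shuffle. So "date_from, date_to" could in principle come back as "date_to, date_from". A common remedy is to carry each name's position through the pipeline (for example F.posexplode instead of F.explode) and sort by it before concatenating, e.g. by collecting F.struct('pos', 'error_desc') and applying F.sort_array. The sketch below models that idea in plain Python rather than Spark; reversing the list to stand in for a shuffle is an assumption made for illustration.

```python
# Plain-Python model of explode -> join -> regroup that keeps each name's
# position (what F.posexplode would carry in Spark), so the final order
# does not depend on the order in which rows arrive at the grouping step.
from collections import defaultdict

lookup = {
    'date_from': ('DD-MM-YY', 'text msg1'),
    'date_to': ('DD-MM-YY', 'test msg2'),
    'emp_name': ('VARCHAR(100)', 'test msg3'),
}
df_rows = [(1, 4, 'date_from,date_to'), (1, 8, 'emp_name')]

# 1. "posexplode": one row per name, together with its position in the string.
exploded = [(c1, c2, pos, name)
            for c1, c2, err in df_rows
            for pos, name in enumerate(err.split(','))]

# 2. Inner join with the lookup and build one description per name.
joined = [(c1, c2, pos, name,
           f'{name} expected {lookup[name][0]} and comment is {lookup[name][1]}')
          for c1, c2, pos, name in exploded if name in lookup]

# 3. Simulate a shuffle that scrambles the row order.
shuffled = list(reversed(joined))

# 4. Group by (col1, col2), then sort each group by position before concatenating.
groups = defaultdict(list)
for c1, c2, pos, name, desc in shuffled:
    groups[(c1, c2)].append((pos, name, desc))

result = {}
for key, items in groups.items():
    items.sort()  # tuples sort by pos first, restoring the original order
    result[key] = (', '.join(n for _, n, _ in items),
                   ', '.join(d for _, _, d in items))

print(result)
```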
Answer:
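I'd suggest splitting error_cloumn and exploding df (split and explode are both in the pyspark.sql.functions module), then joining. You'll get multiple rows, and by grouping on col1, col2 (assuming those can serve as grouping keys) you can aggregate the text the way you showed it in your expected result.
Let me know if you need more help.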
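The assumption in this answer matters: grouping on col1, col2 is only safe if that pair identifies a single record. If two records share it, their error lists are merged into one. A hedged workaround is to tag each record with a unique id before exploding (in Spark, F.monotonically_increasing_id() produces ids that are unique but not consecutive) and group on that id, optionally together with col1, col2 so they stay in the output. The plain-Python sketch below illustrates the effect with a hypothetical third row that reuses (1, 4); regroup is a hypothetical helper, not a Spark function.

```python
# Plain-Python model: grouping on (col1, col2) vs. on a per-record id
# when (col1, col2) is not unique. The third row is hypothetical.
from collections import defaultdict

df_rows = [(1, 4, 'date_from,date_to'), (1, 8, 'emp_name'), (1, 4, 'emp_type')]


def regroup(rows, key_of):
    """Hypothetical helper: explode the names, then group them back under key_of."""
    groups = defaultdict(list)
    for row_id, row in enumerate(rows):
        for name in row[2].split(','):
            groups[key_of(row_id, row)].append(name)
    return {k: ','.join(v) for k, v in groups.items()}


# Grouping on (col1, col2): the two (1, 4) records collapse into one.
by_cols = regroup(df_rows, key_of=lambda row_id, row: (row[0], row[1]))

# Grouping on an id assigned before the explode keeps every record separate.
by_id = regroup(df_rows, key_of=lambda row_id, row: row_id)

print(by_cols)
print(by_id)
```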
Answer:
from pyspark.sql import functions as F
from pyspark.sql.functions import concat, lit, expr, when, explode, split, collect_list, concat_ws
df = spark.createDataFrame([(1, 4, 'date_from,date_to'),(1, 8, 'emp_name')], ['col1', 'col2', 'error_column'])
df = df.withColumn('all_values',explode(split('error_column',',')))
df.show()
look_up_df = spark.createDataFrame([('date_from','DD-MM-YY', ' text_msg1'),('date_to', 'DD-MM-YY', 'test_msg2'),('emp_name', 'VARCHAR(100)', 'test_msg3'),('emp_type', 'VARCHAR(100)', 'test_msg4')], ['column_name', 'clmn1', 'comment'])
look_up_df = (
look_up_df.withColumn('msg',
when(look_up_df.column_name.like('
Tags: apache-spark pyspark apache-spark-sql
