需要根據來自兩個地方的過濾器動態地應用這些過濾器。(1) 配置 (2) 用戶/作業輸入
。需要應用的過濾器。(1) config.filters中提到的任何過濾器;(2) 用戶提供的過濾器,即基于運行日期的天數,即rundate-history_days。 如果用戶傳遞的rundate是2020-01-20,history_days是5,那么最終的過濾器應該是:
cust=123 and ( activity_day between rundate and rundate-5)
我能夠使用兩步過濾器來實作這一點。
(1) 使用 sql 方式從配置中過濾 df.filter(config['config'])
(2) 在1的基礎上進行第二輪過濾,使用
activity_day>=date_sub(rundate,history_days) & activity_day<rundate
我是否可以將兩個步驟的過濾器合并成一個?這樣我就可以在配置中保持兩個過濾器,并以某種方式替代用戶輸入?
資料:
df = spark.createDataFrame(
[
(123,"2020-01-01") 。
(124,"2020-01-01") 。
(123,"2019-01-01")
],
("cust", " activity_day")
)
Config:
config = ""
[ {
"source": "df_1",
"filters":"cust=123",
}
]
uj5u.com熱心網友回復:
你可以從決議你的配置開始,提取過濾條件'filters和相應的值到一個字典中,并將run_date條件添加到其中以進一步過濾DataFrame`
資料準備
sparkDF = sql.createDataFrame(
[
(123,"2020-01-01") 。
(124,"2020-01-01") 。
(123,"2019-01-01")
],
("cust", " activity_day")
)
sparkDF.show()
---- ------------
|cust|activity_day|
---- ------------
| 123| 202001-01|
| 124| 2020-01|
| 123| 2019-01|
---- ------------
決議配置和生成過濾條件
config = [ {"source"/span>: "df_1"," filters":"cust=123",}]
filter_dict = {}
for row in config:
if 'filters' in 行。
key,value = row[' filters'].split("=")
filter_dict[key] = value
filter_dict
{'cust': '123'}。
run_date = "2020-01-01"/span>
upper_range = F.to_date(F.lit(run_date))
lower_range = F.date_add(upper_range,-5)
secondary_condn = (F.col('activity_day').between(low_range,upper_range))
final_condn = (F.col(column) == filter_dict[column]) & (secondary_condn)
過濾資料框架
sparkDF.filter(final_condn).show()
---- ------------
|cust|activity_day|
---- ------------
| 123| 202001-01|
---- ------------
面向多條件的SQL方法
你可以利用createOrReplaceTempView來實作更復雜的過濾器。這個想法是建立WHERE過濾器,將其合并到SQL查詢中來過濾記錄
sparkDF = sql.createDataFrame(
[
(123,"2020-01-01",1,"NL") 。
(124,"2020-01-01",0,"US") 。
(123,"2019-01-01",1,"IN") 。
(124,"2020-01-02",0,"NL") 。
],
("cust", "activity_day","is_deleted","country")
)
sparkDF.show()
---- ------------ ---------- -------
|ust|activity_day|is_deleted|country|
---- ------------ ---------- -------
| 123| 202001-01| 1| NL|
| 124| 2020-01| 0| US|
| 123| 2019-01| 1| IN|
124| 2020-01-02| 0| NL|
---- ------------ ---------- -------
where_filter = ""/span>
logical_flag = " OR "/span>
for i,row in enumerate(config):
if ' filters' in row:
if i == 0:
where_filter = row['filters']
else:
where_filter = logical_flag "(" row['filters'] ")"。
where_filter # O/P -- 'cust=123 OR (is_deleted != 1 AND country="NL")'/span>
if where_filter != "":
sql.sql(f""
選擇 *
FROM customer_activity
其中{where_filter}
"").show()
---- ------------ ---------- -------
|ust|activity_day|is_deleted|country|
---- ------------ ---------- -------
| 123| 202001-01| 1| NL|
| 123| 2019-01| 1| IN|
124| 2020-01-02| 0| NL|
---- ------------ ---------- -------
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/309334.html
標籤:
上一篇:查看組合框內的資料庫列的內容
