我正在嘗試在 Databricks 中創建一個金表筆記本,但是完全重新處理歷史資料(43GB,35k parquet 檔案)需要 9 天。我嘗試擴大集群,但它沒有超過 5000 條記錄/秒。瓶頸似乎是applyInPandas()功能。我想知道是否可以用其他任何東西替換 pandas 以使金色筆記本執行得更快。
Silver 表有 60 列 ( read_id, reader_id, tracker_timestamp, event_type, ebook_id, page_id, agent_ip, agent_device_type, ...)。每行資料代表電子書的閱讀事件。例如“翻頁”、“點擊圖片”、“點擊鏈接”……所有在單個會話中發生的事件都具有相同的read.id. 在黃金表中,我試圖將這些事件分組到會話中,并計算每個事件在單個會話中發生的次數。因此,在銀表中的讀取會話中沒有 100 多行資料,我最終只會在金表中得到一個聚合行。
輸入是銀三角表:
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql.functions import pandas_udf
input = (spark
.readStream
.format("delta")
.option("withEventTimeOrder", "true")
.option("maxFilesPerTrigger", 100)
.load(f"path_to_silver_bucket")
)
我使用withWatermark和session_window函式來確保我最終對來自單個讀取會話的所有事件進行分組。(閱讀會話在最后一次閱讀活動后 30 分鐘自動結束)
group = input.withWatermark("tracker_timestamp", "10 minutes").groupBy("read_id", F.session_window(input.tracker_timestamp, "30 minutes"))
在下一步中,我使用如下applyInPandas函式:
sessions = group.applyInPandas(processing_function, schema=processing_function_output_schema)
processing_function中使用的定義applyInPandas:
def processing_function(df):
surf_time_ms = df.query('event_type == "surf"')['duration'].sum()
immerse_time_ms = df.query('event_type == "immersion"')['duration'].sum()
min_timestamp = df['tracker_timestamp'].min()
max_timestamp = df['tracker_timestamp'].max()
shares = len(df.query('event_type == "share"'))
leads = len(df.query('event_type == "lead_store"'))
is_read = len(df.query('event_type == "surf"')) > 0
distinct_pages = df['page_id'].nunique()
data = {
"read_id": df['read_id'].values[0],
"surf_time_ms": surf_time_ms,
"immerse_time_ms": immerse_time_ms,
"min_timestamp": min_timestamp,
"max_timestamp": max_timestamp,
"shares": shares,
"leads": leads,
"is_read": is_read,
"number_of_events": len(df),
"distinct_pages": distinct_pages
}
for field in not_calculated_string_fields:
data[field] = df[field].values[0]
new_df = pd.DataFrame(data=data, index=['read_id'])
for x in all_events:
new_df[f"count_{x}"] = df.query(f"type == '{x}'").count()
for x in duration_events:
duration = df.query(f"event_type == '{x}'")['duration']
duration_sum = duration.sum()
new_df[f"duration_{x}_ms"] = duration_sum
if duration_sum > 0:
new_df[f"mean_duration_{x}_ms"] = duration.mean()
else:
new_df[f"mean_duration_{x}_ms"] = 0
return new_df
最后,我將計算的行寫入黃金表,如下所示:
for_partitioning = (sessions
.withColumn("tenant", F.col("story_tenant"))
.withColumn("year", F.year(F.col("min_timestamp")))
.withColumn("month", F.month(F.col("min_timestamp"))))
checkpoint_path = "checkpoint-path"
gold_path = f"gold-bucket"
(for_partitioning
.writeStream
.format('delta')
.partitionBy('year', 'month', 'tenant')
.option("mergeSchema", "true")
.option("checkpointLocation", checkpoint_path)
.outputMode("append")
.start(gold_path))
誰能想到比applyInPandas上述示例更有效的在 PySpark 中執行 UDF 的方法?我根本無法等待 9 天來重新處理 43GB 的資料......
我嘗試過使用不同的輸入和輸出選項(例如.option("maxFilesPerTrigger", 100)),但真正的問題似乎是applyInPandas.
uj5u.com熱心網友回復:
processing_function如果你真的想要,你可以將你的本地 Spark 重寫。
"read_id": df['read_id'].values[0]
F.first('read_id').alias('read_id')
"surf_time_ms": df.query('event_type == "surf"')['duration'].sum()
F.sum(F.when(F.col('event_type') == 'surf', F.col('duration'))).alias('surf_time_ms')
"immerse_time_ms": df.query('event_type == "immersion"')['duration'].sum()
F.sum(F.when(F.col('event_type') == 'immersion', F.col('duration'))).alias('immerse_time_ms')
"min_timestamp": df['tracker_timestamp'].min()
F.min('tracker_timestamp').alias('min_timestamp')
"max_timestamp": df['tracker_timestamp'].max()
F.max('tracker_timestamp').alias('max_timestamp')
"shares": len(df.query('event_type == "share"'))
F.count(F.when(F.col('event_type') == 'share', F.lit(1))).alias('shares')
"leads": len(df.query('event_type == "lead_store"'))
F.count(F.when(F.col('event_type') == 'lead_store', F.lit(1))).alias('leads')
"is_read": len(df.query('event_type == "surf"')) > 0
(F.count(F.when(F.col('event_type') == 'surf', F.lit(1))) > 0).alias('is_read')
"number_of_events": len(df)
F.count(F.lit(1)).alias('number_of_events')
"distinct_pages": df['page_id'].nunique()
F.countDistinct('page_id').alias('distinct_pages')
for field in not_calculated_string_fields:
data[field] = df[field].values[0]
*[F.first(field).alias(field) for field in not_calculated_string_fields]
for x in all_events:
new_df[f"count_{x}"] = df.query(f"type == '{x}'").count()
以上大概可以跳過吧?就我的測驗而言,新列得到 NaN 值,因為.count()回傳一個 Series 物件而不是一個簡單的值。
for x in duration_events:
duration = df.query(f"event_type == '{x}'")['duration']
duration_sum = duration.sum()
new_df[f"duration_{x}_ms"] = duration_sum
if duration_sum > 0:
new_df[f"mean_duration_{x}_ms"] = duration.mean()
else:
new_df[f"mean_duration_{x}_ms"] = 0
*[F.sum(F.when(F.col('event_type') == x, F.col('duration'))).alias(f"duration_{x}_ms") for x in duration_events]
*[F.mean(F.when(F.col('event_type') == x, F.col('duration'))).alias(f"mean_duration_{x}_ms") for x in duration_events]
所以,而不是
def processing_function(df):
...
...
sessions = group.applyInPandas(processing_function, schema=processing_function_output_schema)
你可以使用高效的原生 Spark:
sessions = group.agg(
F.first('read_id').alias('read_id'),
F.sum(F.when(F.col('event_type') == 'surf', F.col('duration'))).alias('surf_time_ms'),
F.sum(F.when(F.col('event_type') == 'immersion', F.col('duration'))).alias('immerse_time_ms'),
F.min('tracker_timestamp').alias('min_timestamp'),
F.max('tracker_timestamp').alias('max_timestamp'),
F.count(F.when(F.col('event_type') == 'share', F.lit(1))).alias('shares'),
F.count(F.when(F.col('event_type') == 'lead_store', F.lit(1))).alias('leads'),
(F.count(F.when(F.col('event_type') == 'surf', F.lit(1))) > 0).alias('is_read'),
F.count(F.lit(1)).alias('number_of_events'),
F.countDistinct('page_id').alias('distinct_pages'),
*[F.first(field).alias(field) for field in not_calculated_string_fields],
# skipped count_{x}
*[F.sum(F.when(F.col('event_type') == x, F.col('duration'))).alias(f"duration_{x}_ms") for x in duration_events],
*[F.mean(F.when(F.col('event_type') == x, F.col('duration'))).alias(f"mean_duration_{x}_ms") for x in duration_events],
)
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/529656.html
標籤:表现优化pyspark聚合delta-live-tables
