主頁 > 軟體工程 > applyInPandas()聚合在大增量表上運行緩慢

applyInPandas()聚合在大增量表上運行緩慢

2022-11-09 03:26:32 軟體工程

我正在嘗試在 Databricks 中創建一個金表筆記本,但是完全重新處理歷史資料(43GB,35k parquet 檔案)需要 9 天。我嘗試擴大集群,但它沒有超過 5000 條記錄/秒。瓶頸似乎是applyInPandas()功能。我想知道是否可以用其他任何東西替換 pandas 以使金色筆記本執行得更快。

Silver 表有 60 列 ( read_id, reader_id, tracker_timestamp, event_type, ebook_id, page_id, agent_ip, agent_device_type, ...)。每行資料代表電子書的閱讀事件。例如“翻頁”、“點擊圖片”、“點擊鏈接”……所有在單個會話中發生的事件都具有相同的read.id. 在黃金表中,我試圖將這些事件分組到會話中,并計算每個事件在單個會話中發生的次數。因此,在銀表中的讀取會話中沒有 100 多行資料,我最終只會在金表中得到一個聚合行。

輸入是銀三角表:

import pyspark.sql.functions as F
import pyspark.sql.types as T

import pandas as pd
from pyspark.sql.functions import pandas_udf

input = (spark
         .readStream
         .format("delta")
         .option("withEventTimeOrder", "true")
         .option("maxFilesPerTrigger", 100)
         .load(f"path_to_silver_bucket")
        )

我使用withWatermarksession_window函式來確保我最終對來自單個讀取會話的所有事件進行分組。(閱讀會話在最后一次閱讀活動后 30 分鐘自動結束)

group = input.withWatermark("tracker_timestamp", "10 minutes").groupBy("read_id", F.session_window(input.tracker_timestamp, "30 minutes"))

在下一步中,我使用如下applyInPandas函式: sessions = group.applyInPandas(processing_function, schema=processing_function_output_schema)

processing_function中使用的定義applyInPandas

def processing_function(df):
    surf_time_ms = df.query('event_type == "surf"')['duration'].sum()
    immerse_time_ms = df.query('event_type == "immersion"')['duration'].sum()
    min_timestamp = df['tracker_timestamp'].min()
    max_timestamp = df['tracker_timestamp'].max()
    shares = len(df.query('event_type == "share"'))
    leads = len(df.query('event_type == "lead_store"'))
    is_read = len(df.query('event_type == "surf"')) > 0
    distinct_pages = df['page_id'].nunique()

    data = {
        "read_id": df['read_id'].values[0],
        "surf_time_ms": surf_time_ms,
        "immerse_time_ms": immerse_time_ms,
        "min_timestamp": min_timestamp,
        "max_timestamp": max_timestamp,
        "shares": shares,
        "leads": leads,
        "is_read": is_read,
        "number_of_events": len(df),
        "distinct_pages": distinct_pages
    }
    
    for field in not_calculated_string_fields:
      data[field] = df[field].values[0]
    
    new_df = pd.DataFrame(data=data, index=['read_id'])
    
    for x in all_events:
        new_df[f"count_{x}"] = df.query(f"type == '{x}'").count()
            
    for x in duration_events:
        duration = df.query(f"event_type == '{x}'")['duration']
        duration_sum = duration.sum()
        new_df[f"duration_{x}_ms"] = duration_sum
        if duration_sum > 0:
            new_df[f"mean_duration_{x}_ms"] = duration.mean()
        else:
            new_df[f"mean_duration_{x}_ms"] = 0

    return new_df

最后,我將計算的行寫入黃金表,如下所示:

for_partitioning = (sessions
      .withColumn("tenant", F.col("story_tenant"))
      .withColumn("year", F.year(F.col("min_timestamp")))
      .withColumn("month", F.month(F.col("min_timestamp"))))

checkpoint_path = "checkpoint-path"
gold_path = f"gold-bucket"
(for_partitioning
      .writeStream
      .format('delta')
      .partitionBy('year', 'month', 'tenant')
      .option("mergeSchema", "true")
      .option("checkpointLocation", checkpoint_path)
      .outputMode("append")
      .start(gold_path))

誰能想到比applyInPandas上述示例更有效的在 PySpark 中執行 UDF 的方法?我根本無法等待 9 天來重新處理 43GB 的資料......

我嘗試過使用不同的輸入和輸出選項(例如.option("maxFilesPerTrigger", 100)),但真正的問題似乎是applyInPandas.

uj5u.com熱心網友回復:

processing_function如果你真的想要,你可以將你的本地 Spark 重寫。

"read_id": df['read_id'].values[0]

F.first('read_id').alias('read_id')

"surf_time_ms": df.query('event_type == "surf"')['duration'].sum()

F.sum(F.when(F.col('event_type') == 'surf', F.col('duration'))).alias('surf_time_ms')

"immerse_time_ms": df.query('event_type == "immersion"')['duration'].sum()

F.sum(F.when(F.col('event_type') == 'immersion', F.col('duration'))).alias('immerse_time_ms')

"min_timestamp": df['tracker_timestamp'].min()

F.min('tracker_timestamp').alias('min_timestamp')

"max_timestamp": df['tracker_timestamp'].max()

F.max('tracker_timestamp').alias('max_timestamp')

"shares": len(df.query('event_type == "share"'))

F.count(F.when(F.col('event_type') == 'share', F.lit(1))).alias('shares')

"leads": len(df.query('event_type == "lead_store"'))

F.count(F.when(F.col('event_type') == 'lead_store', F.lit(1))).alias('leads')

"is_read": len(df.query('event_type == "surf"')) > 0

(F.count(F.when(F.col('event_type') == 'surf', F.lit(1))) > 0).alias('is_read')

"number_of_events": len(df)

F.count(F.lit(1)).alias('number_of_events')

"distinct_pages": df['page_id'].nunique()

F.countDistinct('page_id').alias('distinct_pages')

for field in not_calculated_string_fields:
data[field] = df[field].values[0]

*[F.first(field).alias(field) for field in not_calculated_string_fields]

for x in all_events:
new_df[f"count_{x}"] = df.query(f"type == '{x}'").count()

以上大概可以跳過吧?就我的測驗而言,新列得到 NaN 值,因為.count()回傳一個 Series 物件而不是一個簡單的值。

for x in duration_events:
duration = df.query(f"event_type == '{x}'")['duration']
duration_sum = duration.sum()
new_df[f"duration_{x}_ms"] = duration_sum
if duration_sum > 0:
new_df[f"mean_duration_{x}_ms"] = duration.mean()
else:
new_df[f"mean_duration_{x}_ms"] = 0

*[F.sum(F.when(F.col('event_type') == x, F.col('duration'))).alias(f"duration_{x}_ms") for x in duration_events]
*[F.mean(F.when(F.col('event_type') == x, F.col('duration'))).alias(f"mean_duration_{x}_ms") for x in duration_events]

所以,而不是

def processing_function(df):
    ...
    ...

sessions = group.applyInPandas(processing_function, schema=processing_function_output_schema)

你可以使用高效的原生 Spark:

sessions = group.agg(
    F.first('read_id').alias('read_id'),
    F.sum(F.when(F.col('event_type') == 'surf', F.col('duration'))).alias('surf_time_ms'),
    F.sum(F.when(F.col('event_type') == 'immersion', F.col('duration'))).alias('immerse_time_ms'),
    F.min('tracker_timestamp').alias('min_timestamp'),
    F.max('tracker_timestamp').alias('max_timestamp'),
    F.count(F.when(F.col('event_type') == 'share', F.lit(1))).alias('shares'),
    F.count(F.when(F.col('event_type') == 'lead_store', F.lit(1))).alias('leads'),
    (F.count(F.when(F.col('event_type') == 'surf', F.lit(1))) > 0).alias('is_read'),
    F.count(F.lit(1)).alias('number_of_events'),
    F.countDistinct('page_id').alias('distinct_pages'),
    *[F.first(field).alias(field) for field in not_calculated_string_fields],
    # skipped count_{x} 
    *[F.sum(F.when(F.col('event_type') == x, F.col('duration'))).alias(f"duration_{x}_ms") for x in duration_events],
    *[F.mean(F.when(F.col('event_type') == x, F.col('duration'))).alias(f"mean_duration_{x}_ms") for x in duration_events],
)

轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/529656.html

標籤:表现优化pyspark聚合delta-live-tables

上一篇:將列從longtext更改為mediumtext需要超過1小時

下一篇:比較python和ctypes等效代碼的性能

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • Git本地庫既關聯GitHub又關聯Gitee

    創建代碼倉庫 使用gitee舉例(github和gitee差不多) 1.在gitee右上角點擊+,選擇新建倉庫 ? 2.選擇填寫倉庫資訊,然后進行創建 ? 3.服務端已經準備好了,本地開始作準備 (1)Git 全域設定 git config --global user.name "成鈺" git c ......

    uj5u.com 2020-09-10 05:04:14 more
  • CODING DevOps 代碼質量實戰系列第二課,相約周三

    隨著 ToB(企業服務)的興起和 ToC(消費互聯網)產品進入成熟期,線上故障帶來的損失越來越大,代碼質量越來越重要,而「質量內建」正是 DevOps 核心理念之一。**《DevOps 代碼質量實戰(PHP 版)》**為 CODING DevOps 代碼質量實戰系列的第二課,同時也是本系列的 PHP ......

    uj5u.com 2020-09-10 05:07:43 more
  • 推薦Scrum書籍

    推薦Scrum書籍 直接上干貨,推薦書籍清單如下(推薦有順序的哦) Scrum指南 Scrum精髓 Scrum敏捷軟體開發 Scrum捷徑 硝煙中的Scrum和XP : 我們如何實施Scrum 敏捷軟體開發:Scrum實戰指南 Scrum要素 大規模Scrum:大規模敏捷組織的設計 用戶故事地圖 用 ......

    uj5u.com 2020-09-10 05:07:45 more
  • CODING DevOps 代碼質量實戰系列最后一課,周四發車

    隨著 ToB(企業服務)的興起和 ToC(消費互聯網)產品進入成熟期,線上故障帶來的損失越來越大,代碼質量越來越重要,而「質量內建」正是 DevOps 核心理念之一。 **《DevOps 代碼質量實戰(Java 版)》**為 CODING DevOps 代碼質量實戰系列的最后一課,同時也是本系列的 ......

    uj5u.com 2020-09-10 05:07:52 more
  • 敏捷軟體工程實踐書籍

    Scrum轉型想要做好,第一步先了解并真正落實Scrum,那么我推薦的Scrum書籍是要看懂并實踐的。第二步是團隊的工程實踐要做扎實。 下面推薦工程實踐書單: 重構:改善既有代碼的設計 決議極限編程 : 擁抱變化 代碼整潔代碼 程式員的職業素養 修改代碼的藝術 撰寫可讀代碼的藝術 測驗驅動開發 : ......

    uj5u.com 2020-09-10 05:07:55 more
  • Jenkins+svn+nginx實作windows環境自動部署vue前端專案

    前面文章介紹了Jenkins+svn+tomcat實作自動化部署,現在終于有空抽時間出來寫下Jenkins+svn+nginx實作自動部署vue前端專案。 jenkins的安裝和配置已經在前面文章進行介紹,下面介紹實作vue前端專案需要進行的哪些額外的步驟。 注意:在安裝jenkins和nginx的 ......

    uj5u.com 2020-09-10 05:08:49 more
  • CODING DevOps 微服務專案實戰系列第一課,明天等你

    CODING DevOps 微服務專案實戰系列第一課**《DevOps 微服務專案實戰:DevOps 初體驗》**將由 CODING DevOps 開發工程師 王寬老師 向大家介紹 DevOps 的基本理念,并探討為什么現代開發活動需要 DevOps,同時將以 eShopOnContainers 項 ......

    uj5u.com 2020-09-10 05:09:14 more
  • CODING DevOps 微服務專案實戰系列第二課來啦!

    近年來,工程專案的結構越來越復雜,需要接入合適的持續集成流水線形式,才能滿足更多變的需求,那么如何優雅地使用 CI 能力提升生產效率呢?CODING DevOps 微服務專案實戰系列第二課 《DevOps 微服務專案實戰:CI 進階用法》 將由 CODING DevOps 全堆疊工程師 何晨哲老師 向 ......

    uj5u.com 2020-09-10 05:09:33 more
  • CODING DevOps 微服務專案實戰系列最后一課,周四開講!

    隨著軟體工程越來越復雜化,如何在 Kubernetes 集群進行灰度發布成為了生產部署的”必修課“,而如何實作安全可控、自動化的灰度發布也成為了持續部署重點關注的問題。CODING DevOps 微服務專案實戰系列最后一課:**《DevOps 微服務專案實戰:基于 Nginx-ingress 的自動 ......

    uj5u.com 2020-09-10 05:10:00 more
  • CODING 儀表盤功能正式推出,實作作業資料可視化!

    CODING 儀表盤功能現已正式推出!該功能旨在用一張張統計卡片的形式,統計并展示使用 CODING 中所產生的資料。這意味著無需額外的設定,就可以收集歸納寶貴的作業資料并予之量化分析。這些海量的資料皆會以圖表或串列的方式躍然紙上,方便團隊成員隨時查看各專案的進度、狀態和指標,云端協作迎來真正意義上 ......

    uj5u.com 2020-09-10 05:11:01 more
最新发布
  • windows系統git使用ssh方式和gitee/github進行同步

    使用git來clone專案有兩種方式:HTTPS和SSH:
    HTTPS:不管是誰,拿到url隨便clone,但是在push的時候需要驗證用戶名和密碼;
    SSH:clone的專案你必須是擁有者或者管理員,而且需要在clone前添加SSH Key。SSH 在push的時候,是不需要輸入用戶名的,如果配置... ......

    uj5u.com 2023-04-19 08:41:12 more
  • windows系統git使用ssh方式和gitee/github進行同步

    使用git來clone專案有兩種方式:HTTPS和SSH:
    HTTPS:不管是誰,拿到url隨便clone,但是在push的時候需要驗證用戶名和密碼;
    SSH:clone的專案你必須是擁有者或者管理員,而且需要在clone前添加SSH Key。SSH 在push的時候,是不需要輸入用戶名的,如果配置... ......

    uj5u.com 2023-04-19 08:35:34 more
  • 2023年農牧行業6大CRM系統、5大場景盤點

    在物聯網、大資料、云計算、人工智能、自動化技術等現代資訊技術蓬勃發展與逐步成熟的背景下,數字化正成為農牧行業供給側結構性變革與高質量發展的核心驅動因素。因此,改造和提升傳統農牧業、開拓創新現代智慧農牧業,加快推進農牧業的現代化、資訊化、數字化建設已成為農牧業發展的重要方向。 當下,企業數字化轉型已經 ......

    uj5u.com 2023-04-18 08:05:44 more
  • 2023年農牧行業6大CRM系統、5大場景盤點

    在物聯網、大資料、云計算、人工智能、自動化技術等現代資訊技術蓬勃發展與逐步成熟的背景下,數字化正成為農牧行業供給側結構性變革與高質量發展的核心驅動因素。因此,改造和提升傳統農牧業、開拓創新現代智慧農牧業,加快推進農牧業的現代化、資訊化、數字化建設已成為農牧業發展的重要方向。 當下,企業數字化轉型已經 ......

    uj5u.com 2023-04-18 08:00:18 more
  • 計算機組成原理—存盤器

    計算機組成原理—硬體結構 二、存盤器 1.概述 存盤器是計算機系統中的記憶設備,用來存放程式和資料 1.1存盤器的層次結構 快取-主存層次主要解決CPU和主存速度不匹配的問題,速度接近快取 主存-輔存層次主要解決存盤系統的容量問題,容量接近與價位接近于主存 2.主存盤器 2.1概述 主存與CPU的聯 ......

    uj5u.com 2023-04-17 08:20:31 more
  • 談一談我對協同開發的一些認識

    如今各互聯網公司普通都使用敏捷開發,采用小步快跑的形式來進行專案開發。如果是小專案或者小需求,那一個開發可能就搞定了。但對于電商等復雜的系統,其功能多,結構復雜,一個人肯定是搞不定的,所以都是很多人來共同開發維護。以我曾經待過的商城團隊為例,光是后端開發就有七十多人。 為了更好地開發這類大型系統,往 ......

    uj5u.com 2023-04-17 08:18:55 more
  • 專案管理PRINCE2核心知識點整理

    PRINCE2,即 PRoject IN Controlled Environment(受控環境中的專案)是一種結構化的專案管理方法論,由英國政府內閣商務部(OGC)推出,是英國專案管理標準。
    PRINCE2 作為一種開放的方法論,是一套結構化的專案管理流程,描述了如何以一種邏輯性的、有組織的方法,... ......

    uj5u.com 2023-04-17 08:18:51 more
  • 談一談我對協同開發的一些認識

    如今各互聯網公司普通都使用敏捷開發,采用小步快跑的形式來進行專案開發。如果是小專案或者小需求,那一個開發可能就搞定了。但對于電商等復雜的系統,其功能多,結構復雜,一個人肯定是搞不定的,所以都是很多人來共同開發維護。以我曾經待過的商城團隊為例,光是后端開發就有七十多人。 為了更好地開發這類大型系統,往 ......

    uj5u.com 2023-04-17 08:18:00 more
  • 專案管理PRINCE2核心知識點整理

    PRINCE2,即 PRoject IN Controlled Environment(受控環境中的專案)是一種結構化的專案管理方法論,由英國政府內閣商務部(OGC)推出,是英國專案管理標準。
    PRINCE2 作為一種開放的方法論,是一套結構化的專案管理流程,描述了如何以一種邏輯性的、有組織的方法,... ......

    uj5u.com 2023-04-17 08:17:55 more
  • 計算機組成原理—存盤器

    計算機組成原理—硬體結構 二、存盤器 1.概述 存盤器是計算機系統中的記憶設備,用來存放程式和資料 1.1存盤器的層次結構 快取-主存層次主要解決CPU和主存速度不匹配的問題,速度接近快取 主存-輔存層次主要解決存盤系統的容量問題,容量接近與價位接近于主存 2.主存盤器 2.1概述 主存與CPU的聯 ......

    uj5u.com 2023-04-17 08:12:06 more