主頁 > 軟體工程 > Pyspark-計算兩個資料框架之間的日期

Pyspark-計算兩個資料框架之間的日期

2021-10-21 08:24:26 軟體工程

我有兩個資料框,每個資料框都有一個日期列。

。
 ----------- 
| DEADLINES|
 ----------- 
| 2023-07-15|
| 2018-08-10|
| 2022-03-28|
| 2021-06-22|
| 2021-12-18|
| 2021-10-11|
| 2021-11-13|
 ----------- 

 ---------- 
| DT_DATE|
 ---------- 
|2021-04-02|
|2021-04-21|
|2021-05-01|
|2021-06-03|
|2021-09-07|
|2021-10-12|
|2021-11-02||
 ---------- 

我需要計算在給定的參考日期和每個DEADLINES日期之間有多少個DT_DATE的日期。

例如:使用2021-03-31作為參考日期應該得到以下結果集。

 ----------- ------------ 
| DEADLINES| dt_count|
 ----------- ------------ 
| 2023-07-15| 7|
| 2018-08-10| 0|
| 2022-03-28| 7|
| 2021-06-22| 4|
| 2021-12-18| 7|
| 2021-10-11| 5|
| 2021-11-13| 7|
 ----------- ------------ 

我設法使其在死線資料框架的每一行中進行迭代,但對于較大的資料集,性能變得非常差。

有誰有更好的解決方案嗎?

編輯:這是我目前的解決方案:

def count_daysdeadlines_df, dates_df, ref_date)。
    for row in deadlines_df.collect():
        qtt = dates_df.filter(dates_df.DT_DATE.between(ref_date, row.DEADLINES)).count()
        yield row.deadlines, qtt


new_df = spark.createDataFrame(count_days(deadlines_df, dates_df, "2021-03-31"), ["DEADLINES", "dt_count"] )

uj5u.com熱心網友回復:

兩個資料框可以用不同的權重聯合起來,并使用Window函式,范圍從開始到當前行(Scala):

val deadlines = Seq(
  ("2023-07-15"),
  ("2018-08-10"),
  ("2022-03-28"),
  ("2021-06-22"),
  ("2021-12-18"),
  ("2021-10-11"),
  ("2021-11-13")
).toDF("DEADLINES")

val dates = Seq(
  ("2021-04-02"),
  ("2021-04-21"),
  ("2021-05-01"),
  ("2021-06-03"),
  ("2021-09-07"),
  ("2021-10-12"),
  ("2021-11-02")
).toDF("DT_DATE")

Val referenceDate = "2021-03-31""weight", lit(0)
  .unionAll(
    日期
      .where($"DT_DATE"/span> >= referenceDate)
      .withColumn("weight"/span>, lit(1)
  )

val fromStartToCurrentRowWindow = Window.orderBy("DEADLINES").rangeBetween(Window.unboundedPreceding, Window.currentRow)

val result = united
  .withColumn("dt_count", sum("weight") .over(fromStartToCurrentRowWindow)
  .where($"weight" === lit(0)
  .drop("weight")

輸出:

 ---------- -------- 
|DEADLINES |dt_count||
 ---------- -------- 
|2018-08-10|0 |
|2021-06-22|4 |
|2021-10-11|5 |
|2021-11-13|7 |
|2021-12-18|7 |
|2022-28|7 |
|2023-07-15|7 !
 ---------- -------- 

注意:計算將在一個磁區中執行,Spark顯示這樣的警告。 WARN Logging - No Partition Defined for Window operation! 將所有的資料移到一個磁區中,這可能會導致嚴重的性能下降。

還有其他可能的解決方案。

還有其他可能的解決方案,通過范圍連接兩個資料幀,這導致了cartesian join.

uj5u.com熱心網友回復:

如果你有少量的截止日期,你可以:

如果你有少量的截止日期,你可以:

你可以

  • dates_df資料幀上按截止日期添加一列,當DT_DATEref_date和截止日期之間時,其值為1,否則為0
  • 然后將每個截止日期列相加
  • 最后對結果資料框進行轉置,以獲得你想要的資料框
  • 讓我們來看看一步一步的步驟

    按截止日期添加一列:from pyspark.sql import functions as F deadline_rows = deadlines_df.collect() dates_with_deadlines = dates_df for row in deadline_rows。 dates_with_deadlines = dates_with_deadlines.withColumn( str(row.DEADLINES), F.when( dates_df.DT_DATE.between(ref_date, row.DEADLINES), F.lit(1) .否則( F.lit(0) ) )

    通過你的例子,你可以得到以下dates_with_deadlines資料框架:

     ---------- ---------- ---------- ---------- ---------- ---------- ---------- ---------- 
    |DT_DATE |2023-07-15|2018-08-10|2022-03-28|2021-06-22|2021-12-18|2021-10-11|2021-11-13|
     ---------- ---------- ---------- ---------- ---------- ---------- ---------- ---------- 
    |2021-04-02|1|0|1 |1 |1 |1 |1 |1 !
    |2021-04-21|1 |0 |1 |1 |1 |1 |1 |1 !
    |2021-05-01|1|0|1 |1 |1 |1 |1 |1 !
    |2021-06-03|1|0|1 |1 |1 |1 |1 |1 !
    |2021-09-07|1 |0|1 |0 |1 |1 |1 |1 !
    |2021-10-12|1 |0 |1 |0 |1 |0 |1 !
    |2021-11-02|1 |0 |1 |0 |1 |0 |1 !
     ---------- ---------- ---------- ---------- ---------- ---------- ---------- ---------- 
    

    最后期限

    aggregated_df = dates_with_deadlines.agg(*[F。 sum(str(x.DEADLINES)).alias(str(x.DEADLINES) for x in deadline_rows] )
    

    在這一步之后,你會得到以下aggregated_df資料框架:

     ---------- ---------- ---------- ---------- ---------- ---------- ---------- 
    |2023-07-15|2018-08-10|2022-03-28|2021-06-22|2021-12-18|2021-10-11|2021-11-13|
     ---------- ---------- ---------- ---------- ---------- ---------- ---------- 
    |7 |0 |7 |4 |7 |5 |7 !
     ---------- ---------- ---------- ---------- ---------- ---------- ---------- 
    

    Transpose dataframe

    result_df = aggregated_df 
      .withColumn('merged', F.array(*[F.struct(F.lit(x.DEADLINES).alias('DEADLINES'), F. col(str(x.deadlines)).alias('dt_count') for x in deadline_rows]) 
      .drop(*[str(x.DEADLINES) for x in deadline_rows] ) 
      .withColumn('data'/span>, F.explode('merged'/span>) 
      .drop('merged') 
      .withColumn('DEADLINES', F.col('data.DEADLINES') 
      .withColumn('dt_count', F.col('data.dt_count') 
      .drop('data')
    

    這樣你就有了預期的result_df資料框架:

     ---------- -------- 
    |DEADLINES |dt_count|?
     ---------- -------- 
    |2023-07-15|7 |
    |2018-08-10|0 |
    |2022-03-28|7 |
    |2021-06-22|4 |
    |2021-12-18|7 |
    |2021-10-11|5 |
    |2021-11-13|7 |
     ---------- -------- 
    

    完整的代碼

    from pyspark.sql import functions as F
    
    deadline_rows = deadlines_df.collect()
    
    dates_with_deadlines = dates_df
    for row in deadline_rows。
        dates_with_deadlines = dates_with_deadlines.withColumn(
            str(row.DEADLINES),
            F.when(
              dates_df.DT_DATE.between(ref_date, row.DEADLINES), F.lit(1)
            .否則(
              F.lit(0)
            )
        )
    
    aggregated_df = dates_with_deadlines.agg(*[F.sum(str(x.DEADLINES)).alias(str(x.DEADLINES) for x in deadline_rows])
    
    result_df = aggregated_df 
      .withColumn('merged', F.array(*[F.struct(F.lit(x.DEADLINES).alias('DEADLINES'), F. col(str(x.deadlines)).alias('dt_count') for x in deadline_rows]) 
      .drop(*[str(x.DEADLINES) for x in deadline_rows] ) 
      .withColumn('data'/span>, F.explode('merged'/span>) 
      .drop('merged') 
      .withColumn('DEADLINES', F.col('data.DEADLINES') 
      .withColumn('dt_count', F.col('data.dt_count') 
      .drop('data')
    

    本解決方案的優點和局限性

    在這個解決方案中,唯一不能使用分布式系統的步驟是轉置步驟。

    此外,與您當前的解決方案不同,我們以并行方式對每個截止日期列進行所有聚合,而不是按順序進行。

    然而,該解決方案僅在截止日期較少(數百或數千個截止日期)的情況下才有效,首先是因為我們在 Spark 驅動程式中使用 .collect() 檢索了所有這些截止日期,其次是因為在第一步中我們為每個截止日期創建了一個列,創建了具有大量資料的行,最后是因為最后一步也僅在一個執行器上執行。

    轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/328160.html

    標籤:

    上一篇:SSD-Mobilenetv2300x300-Tensorflow異議檢測API

    下一篇:如何有條件地呼叫路由器級中間件?

    標籤雲
    其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

    熱門瀏覽
    • Git本地庫既關聯GitHub又關聯Gitee

      創建代碼倉庫 使用gitee舉例(github和gitee差不多) 1.在gitee右上角點擊+,選擇新建倉庫 ? 2.選擇填寫倉庫資訊,然后進行創建 ? 3.服務端已經準備好了,本地開始作準備 (1)Git 全域設定 git config --global user.name "成鈺" git c ......

      uj5u.com 2020-09-10 05:04:14 more
    • CODING DevOps 代碼質量實戰系列第二課,相約周三

      隨著 ToB(企業服務)的興起和 ToC(消費互聯網)產品進入成熟期,線上故障帶來的損失越來越大,代碼質量越來越重要,而「質量內建」正是 DevOps 核心理念之一。**《DevOps 代碼質量實戰(PHP 版)》**為 CODING DevOps 代碼質量實戰系列的第二課,同時也是本系列的 PHP ......

      uj5u.com 2020-09-10 05:07:43 more
    • 推薦Scrum書籍

      推薦Scrum書籍 直接上干貨,推薦書籍清單如下(推薦有順序的哦) Scrum指南 Scrum精髓 Scrum敏捷軟體開發 Scrum捷徑 硝煙中的Scrum和XP : 我們如何實施Scrum 敏捷軟體開發:Scrum實戰指南 Scrum要素 大規模Scrum:大規模敏捷組織的設計 用戶故事地圖 用 ......

      uj5u.com 2020-09-10 05:07:45 more
    • CODING DevOps 代碼質量實戰系列最后一課,周四發車

      隨著 ToB(企業服務)的興起和 ToC(消費互聯網)產品進入成熟期,線上故障帶來的損失越來越大,代碼質量越來越重要,而「質量內建」正是 DevOps 核心理念之一。 **《DevOps 代碼質量實戰(Java 版)》**為 CODING DevOps 代碼質量實戰系列的最后一課,同時也是本系列的 ......

      uj5u.com 2020-09-10 05:07:52 more
    • 敏捷軟體工程實踐書籍

      Scrum轉型想要做好,第一步先了解并真正落實Scrum,那么我推薦的Scrum書籍是要看懂并實踐的。第二步是團隊的工程實踐要做扎實。 下面推薦工程實踐書單: 重構:改善既有代碼的設計 決議極限編程 : 擁抱變化 代碼整潔代碼 程式員的職業素養 修改代碼的藝術 撰寫可讀代碼的藝術 測驗驅動開發 : ......

      uj5u.com 2020-09-10 05:07:55 more
    • Jenkins+svn+nginx實作windows環境自動部署vue前端專案

      前面文章介紹了Jenkins+svn+tomcat實作自動化部署,現在終于有空抽時間出來寫下Jenkins+svn+nginx實作自動部署vue前端專案。 jenkins的安裝和配置已經在前面文章進行介紹,下面介紹實作vue前端專案需要進行的哪些額外的步驟。 注意:在安裝jenkins和nginx的 ......

      uj5u.com 2020-09-10 05:08:49 more
    • CODING DevOps 微服務專案實戰系列第一課,明天等你

      CODING DevOps 微服務專案實戰系列第一課**《DevOps 微服務專案實戰:DevOps 初體驗》**將由 CODING DevOps 開發工程師 王寬老師 向大家介紹 DevOps 的基本理念,并探討為什么現代開發活動需要 DevOps,同時將以 eShopOnContainers 項 ......

      uj5u.com 2020-09-10 05:09:14 more
    • CODING DevOps 微服務專案實戰系列第二課來啦!

      近年來,工程專案的結構越來越復雜,需要接入合適的持續集成流水線形式,才能滿足更多變的需求,那么如何優雅地使用 CI 能力提升生產效率呢?CODING DevOps 微服務專案實戰系列第二課 《DevOps 微服務專案實戰:CI 進階用法》 將由 CODING DevOps 全堆疊工程師 何晨哲老師 向 ......

      uj5u.com 2020-09-10 05:09:33 more
    • CODING DevOps 微服務專案實戰系列最后一課,周四開講!

      隨著軟體工程越來越復雜化,如何在 Kubernetes 集群進行灰度發布成為了生產部署的”必修課“,而如何實作安全可控、自動化的灰度發布也成為了持續部署重點關注的問題。CODING DevOps 微服務專案實戰系列最后一課:**《DevOps 微服務專案實戰:基于 Nginx-ingress 的自動 ......

      uj5u.com 2020-09-10 05:10:00 more
    • CODING 儀表盤功能正式推出,實作作業資料可視化!

      CODING 儀表盤功能現已正式推出!該功能旨在用一張張統計卡片的形式,統計并展示使用 CODING 中所產生的資料。這意味著無需額外的設定,就可以收集歸納寶貴的作業資料并予之量化分析。這些海量的資料皆會以圖表或串列的方式躍然紙上,方便團隊成員隨時查看各專案的進度、狀態和指標,云端協作迎來真正意義上 ......

      uj5u.com 2020-09-10 05:11:01 more
    最新发布
    • windows系統git使用ssh方式和gitee/github進行同步

      使用git來clone專案有兩種方式:HTTPS和SSH:
      HTTPS:不管是誰,拿到url隨便clone,但是在push的時候需要驗證用戶名和密碼;
      SSH:clone的專案你必須是擁有者或者管理員,而且需要在clone前添加SSH Key。SSH 在push的時候,是不需要輸入用戶名的,如果配置... ......

      uj5u.com 2023-04-19 08:41:12 more
    • windows系統git使用ssh方式和gitee/github進行同步

      使用git來clone專案有兩種方式:HTTPS和SSH:
      HTTPS:不管是誰,拿到url隨便clone,但是在push的時候需要驗證用戶名和密碼;
      SSH:clone的專案你必須是擁有者或者管理員,而且需要在clone前添加SSH Key。SSH 在push的時候,是不需要輸入用戶名的,如果配置... ......

      uj5u.com 2023-04-19 08:35:34 more
    • 2023年農牧行業6大CRM系統、5大場景盤點

      在物聯網、大資料、云計算、人工智能、自動化技術等現代資訊技術蓬勃發展與逐步成熟的背景下,數字化正成為農牧行業供給側結構性變革與高質量發展的核心驅動因素。因此,改造和提升傳統農牧業、開拓創新現代智慧農牧業,加快推進農牧業的現代化、資訊化、數字化建設已成為農牧業發展的重要方向。 當下,企業數字化轉型已經 ......

      uj5u.com 2023-04-18 08:05:44 more
    • 2023年農牧行業6大CRM系統、5大場景盤點

      在物聯網、大資料、云計算、人工智能、自動化技術等現代資訊技術蓬勃發展與逐步成熟的背景下,數字化正成為農牧行業供給側結構性變革與高質量發展的核心驅動因素。因此,改造和提升傳統農牧業、開拓創新現代智慧農牧業,加快推進農牧業的現代化、資訊化、數字化建設已成為農牧業發展的重要方向。 當下,企業數字化轉型已經 ......

      uj5u.com 2023-04-18 08:00:18 more
    • 計算機組成原理—存盤器

      計算機組成原理—硬體結構 二、存盤器 1.概述 存盤器是計算機系統中的記憶設備,用來存放程式和資料 1.1存盤器的層次結構 快取-主存層次主要解決CPU和主存速度不匹配的問題,速度接近快取 主存-輔存層次主要解決存盤系統的容量問題,容量接近與價位接近于主存 2.主存盤器 2.1概述 主存與CPU的聯 ......

      uj5u.com 2023-04-17 08:20:31 more
    • 談一談我對協同開發的一些認識

      如今各互聯網公司普通都使用敏捷開發,采用小步快跑的形式來進行專案開發。如果是小專案或者小需求,那一個開發可能就搞定了。但對于電商等復雜的系統,其功能多,結構復雜,一個人肯定是搞不定的,所以都是很多人來共同開發維護。以我曾經待過的商城團隊為例,光是后端開發就有七十多人。 為了更好地開發這類大型系統,往 ......

      uj5u.com 2023-04-17 08:18:55 more
    • 專案管理PRINCE2核心知識點整理

      PRINCE2,即 PRoject IN Controlled Environment(受控環境中的專案)是一種結構化的專案管理方法論,由英國政府內閣商務部(OGC)推出,是英國專案管理標準。
      PRINCE2 作為一種開放的方法論,是一套結構化的專案管理流程,描述了如何以一種邏輯性的、有組織的方法,... ......

      uj5u.com 2023-04-17 08:18:51 more
    • 談一談我對協同開發的一些認識

      如今各互聯網公司普通都使用敏捷開發,采用小步快跑的形式來進行專案開發。如果是小專案或者小需求,那一個開發可能就搞定了。但對于電商等復雜的系統,其功能多,結構復雜,一個人肯定是搞不定的,所以都是很多人來共同開發維護。以我曾經待過的商城團隊為例,光是后端開發就有七十多人。 為了更好地開發這類大型系統,往 ......

      uj5u.com 2023-04-17 08:18:00 more
    • 專案管理PRINCE2核心知識點整理

      PRINCE2,即 PRoject IN Controlled Environment(受控環境中的專案)是一種結構化的專案管理方法論,由英國政府內閣商務部(OGC)推出,是英國專案管理標準。
      PRINCE2 作為一種開放的方法論,是一套結構化的專案管理流程,描述了如何以一種邏輯性的、有組織的方法,... ......

      uj5u.com 2023-04-17 08:17:55 more
    • 計算機組成原理—存盤器

      計算機組成原理—硬體結構 二、存盤器 1.概述 存盤器是計算機系統中的記憶設備,用來存放程式和資料 1.1存盤器的層次結構 快取-主存層次主要解決CPU和主存速度不匹配的問題,速度接近快取 主存-輔存層次主要解決存盤系統的容量問題,容量接近與價位接近于主存 2.主存盤器 2.1概述 主存與CPU的聯 ......

      uj5u.com 2023-04-17 08:12:06 more