Scenario:
- A ticket has a StartTime and an EndTime. If both a StartTime and an EndTime exist, create a new DataFrame as shown in the desired output below.
The PySpark dataset looks like this:
#base Schema for Testing purpose
#Dataset
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
#Create User defined Custom Schema using StructType
schema = StructType([ StructField('CaseNumber', StringType(), True)\
,StructField('StartTime', StringType(), True)\
,StructField('EndTime', StringType(), True)])
data = [
{"CaseNumber": 'Ticket1', "StartTime": '1/22/19 10:00', "EndTime": ''},
{"CaseNumber": 'Ticket1', "StartTime": '', "EndTime": '1/23/19 11:00'},
{"CaseNumber": 'Ticket1', "StartTime": '1/25/19 7:00', "EndTime": ''},
{"CaseNumber": 'Ticket1', "StartTime": '1/27/19 3:00', "EndTime": ''},
{"CaseNumber": 'Ticket2', "StartTime": '1/29/19 10:00', "EndTime": ''},
{"CaseNumber": 'Ticket2', "StartTime": '', "EndTime": '2/23/19 2:00'},
{"CaseNumber": 'Ticket2', "StartTime": '3/25/19 7:00', "EndTime": ''},
{"CaseNumber": 'Ticket2', "StartTime": '', "EndTime": '3/27/19 8:00'},
{"CaseNumber": 'Ticket2', "StartTime": '', "EndTime": '3/27/19 10:00'},
{"CaseNumber": 'Ticket3', "StartTime": '4/25/19 1:00', "EndTime": ''}
]
from pyspark.sql import SparkSession
#Create PySpark SparkSession
spark = SparkSession.builder \
.master('local[1]') \
.appName('SparkByExamples.com') \
.getOrCreate()
# Creation of a dummy dataframe:
df1 = spark.createDataFrame(data,schema=schema)
df1.show()
The resulting DataFrame:
+----------+-------------+-------------+
|CaseNumber|    StartTime|      EndTime|
+----------+-------------+-------------+
|   Ticket1|1/22/19 10:00|          NaN|
|   Ticket1|          NaN|1/23/19 11:00|
|   Ticket1| 1/25/19 7:00|          NaN|
|   Ticket1| 1/27/19 3:00|          NaN|
|   Ticket2|1/29/19 10:00|          NaN|
|   Ticket2|          NaN| 2/23/19 2:00|
|   Ticket2| 3/25/19 7:00|          NaN|
|   Ticket2|          NaN| 3/27/19 8:00|
|   Ticket2|          NaN|3/27/19 10:00|
|   Ticket3| 4/25/19 1:00|          NaN|
+----------+-------------+-------------+
The desired output is:
+----------+-------------+-------------+
|CaseNumber|    StartTime|      EndTime|
+----------+-------------+-------------+
|   Ticket1|1/22/19 10:00|1/23/19 11:00|
|   Ticket2|1/29/19 10:00| 2/23/19 2:00|
|   Ticket2| 3/25/19 7:00| 3/27/19 8:00|
+----------+-------------+-------------+
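To pin down the pairing rule the desired output implies, here is a minimal plain-Python sketch (stdlib only, no Spark). It assumes the rows are already in chronological order within each ticket, as in the sample. Each StartTime is paired with the first EndTime that follows it before the next StartTime. Unmatched StartTimes are dropped, and so are extra EndTimes. The function name `pair_start_end` is hypothetical.

```python
# Sample rows copied from the question; "" means "no value".
ROWS = [
    ("Ticket1", "1/22/19 10:00", ""),
    ("Ticket1", "", "1/23/19 11:00"),
    ("Ticket1", "1/25/19 7:00", ""),
    ("Ticket1", "1/27/19 3:00", ""),
    ("Ticket2", "1/29/19 10:00", ""),
    ("Ticket2", "", "2/23/19 2:00"),
    ("Ticket2", "3/25/19 7:00", ""),
    ("Ticket2", "", "3/27/19 8:00"),
    ("Ticket2", "", "3/27/19 10:00"),
    ("Ticket3", "4/25/19 1:00", ""),
]


def pair_start_end(rows):
    """Hypothetical helper: pair each StartTime with the first EndTime that
    follows it within the same ticket, before the next StartTime.

    Assumes rows are already sorted chronologically per ticket.
    """
    pairs = []
    current_case = None
    open_start = None  # StartTime still waiting for an EndTime
    for case, start, end in rows:
        if case != current_case:
            current_case, open_start = case, None  # new ticket: reset state
        if start:
            open_start = start  # a newer StartTime replaces an unmatched one
        elif end and open_start is not None:
            pairs.append((case, open_start, end))
            open_start = None  # later EndTimes in the same interval are ignored
    return pairs


for pair in pair_start_end(ROWS):
    print(pair)
```

This scan yields exactly the three rows of the desired output. The window-based answer below expresses the same rule in a form Spark can parallelise.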
Applying the lead function to check whether the ticket's EndTime exists:
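from pyspark.sql.window import Window
import pyspark.sql.functions as psf
# Note: ordering by the partition column gives every row in a partition the same
# sort key, so the row order within a ticket (and therefore lead()) is not guaranteed.
windowSpec = Window.partitionBy("CaseNumber").orderBy("CaseNumber")
# lead() has to be referenced through the psf alias imported above
df = df1.withColumn("lead", psf.lead("EndTime", 1).over(windowSpec))
df.show()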
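As a rough illustration of what `lead("EndTime", 1)` does, here is a plain-Python analogue over one already-ordered partition. It is an assumption-level sketch, not Spark's implementation, and the helper name `lead` here is hypothetical. Row i receives the value from row i + 1, and the last row receives None (null). Only that last row becomes null. A missing EndTime stored as a string ("" in the code above, "NaN" in the output below) is carried over as a string, so a `!= None` check does not filter it out.

```python
def lead(values, offset=1, default=None):
    """Hypothetical plain-Python analogue of lead(col, offset) over one
    ordered partition: row i gets values[i + offset], or `default` past the end.
    Only positive offsets are handled."""
    return [
        values[i + offset] if i + offset < len(values) else default
        for i in range(len(values))
    ]


# EndTime column of Ticket1 in the row order shown above ("" = missing)
ticket1_end = ["", "1/23/19 11:00", "", ""]
print(lead(ticket1_end))
```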
import pandas as pd
pysparkdf = df.toPandas()
tickets = pysparkdf.groupby('CaseNumber')
def isLeadnull(e):
    # Despite its name, this returns True when the lead value is NOT null
    return e['lead'] is not None
my_list = []
for i, ticket in tickets:
    for j, e in ticket.iterrows():
        if isLeadnull(e):
            my_list.append({'CaseNumber': e['CaseNumber'], 'Start': e['StartTime'], 'EndTime': e['lead']})
        else:
            print(e['lead'], 'Do nothing as condition not met')
The output of this code (the contents of my_list) is:
[{'CaseNumber': 'Ticket1',
'Start': '1/22/19 10:00',
'EndTime': '1/23/19 11:00'},
{'CaseNumber': 'Ticket1', 'Start': 'NaN', 'EndTime': 'NaN'},
{'CaseNumber': 'Ticket1', 'Start': '1/25/19 7:00', 'EndTime': 'NaN'},
{'CaseNumber': 'Ticket2',
'Start': '1/29/19 10:00',
'EndTime': '2/23/19 2:00'},
{'CaseNumber': 'Ticket2', 'Start': 'NaN', 'EndTime': 'NaN'},
{'CaseNumber': 'Ticket2', 'Start': '3/25/19 7:00', 'EndTime': '3/27/19 8:00'},
{'CaseNumber': 'Ticket2', 'Start': 'NaN', 'EndTime': '3/27/19 10:00'}]
Answer:
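This is a kind of gaps-and-islands problem. You can identify each "island" by creating a group column with a conditional cumulative sum. Then group by CaseNumber and group, and aggregate the max StartTime and the min EndTime of each group: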
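As a quick illustration of the conditional cumulative sum, here is a plain-Python sketch (stdlib only; the helper name `island_ids` is hypothetical). It assumes the values are already sorted the way the window below sorts them, by coalesce(StartTime, EndTime). Every non-empty StartTime adds 1 to the running total, so it opens a new island. EndTime-only rows add nothing and inherit the current island number. One difference from Spark: rows before the first StartTime get 0 here, whereas `F.sum(F.when(...))` without `otherwise` would give them null.

```python
from itertools import accumulate


def island_ids(start_times):
    """Hypothetical helper: running count of non-empty StartTime values,
    mirroring sum(when(StartTime is not null, 1)) over an ordered window."""
    flags = [1 if s else 0 for s in start_times]  # 1 where a StartTime exists
    return list(accumulate(flags))


# StartTime columns in window order ("" = null)
ticket1_start = ["1/22/19 10:00", "", "1/25/19 7:00", "1/27/19 3:00"]
ticket2_start = ["1/29/19 10:00", "", "3/25/19 7:00", "", ""]
print(island_ids(ticket1_start))
print(island_ids(ticket2_start))
```

The two results match the group column in the intermediate output shown further down: 1, 1, 2, 3 for Ticket1 and 1, 1, 2, 2, 2 for Ticket2.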
from pyspark.sql import functions as F, Window
# First, replace empty strings with nulls, then convert the strings to timestamps
df1 = df1.replace("", None) \
    .withColumn("StartTime", F.to_timestamp("StartTime", "M/dd/yy H:mm")) \
    .withColumn("EndTime", F.to_timestamp("EndTime", "M/dd/yy H:mm"))
# Order each ticket's rows by whichever timestamp the row carries
w = Window.partitionBy("CaseNumber").orderBy(F.coalesce("StartTime", "EndTime"))
# group = running count of StartTime rows, so every StartTime opens a new "island"
df2 = df1.withColumn("group", F.sum(F.when(F.col("StartTime").isNotNull(), 1)).over(w)) \
    .groupBy("CaseNumber", "group") \
    .agg(F.max("StartTime").alias("StartTime"), F.min("EndTime").alias("EndTime")) \
    .filter(F.col("EndTime").isNotNull()) \
    .drop("group")
df2.show()
#+----------+-------------------+-------------------+
#|CaseNumber|          StartTime|            EndTime|
#+----------+-------------------+-------------------+
#|   Ticket1|2019-01-22 10:00:00|2019-01-23 11:00:00|
#|   Ticket2|2019-01-29 10:00:00|2019-02-23 02:00:00|
#|   Ticket2|2019-03-25 07:00:00|2019-03-27 08:00:00|
#+----------+-------------------+-------------------+
To understand the logic, you can display the intermediate group column before grouping:
df1.withColumn("group", F.sum(F.when(F.col("StartTime").isNotNull(), 1)).over(w)).show()
#+----------+-------------------+-------------------+-----+
#|CaseNumber|          StartTime|            EndTime|group|
#+----------+-------------------+-------------------+-----+
#|   Ticket1|2019-01-22 10:00:00|               null|    1|
#|   Ticket1|               null|2019-01-23 11:00:00|    1|
#|   Ticket1|2019-01-25 07:00:00|               null|    2|
#|   Ticket1|2019-01-27 03:00:00|               null|    3|
#|   Ticket2|2019-01-29 10:00:00|               null|    1|
#|   Ticket2|               null|2019-02-23 02:00:00|    1|
#|   Ticket2|2019-03-25 07:00:00|               null|    2|
#|   Ticket2|               null|2019-03-27 08:00:00|    2|
#|   Ticket2|               null|2019-03-27 10:00:00|    2|
#|   Ticket3|2019-04-25 01:00:00|               null|    1|
#+----------+-------------------+-------------------+-----+
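Here is a plain-Python sketch of the aggregation step applied to the intermediate rows shown here (stdlib only; `aggregate_islands` is a hypothetical name). It groups by (CaseNumber, group) and takes the max StartTime and min EndTime, skipping nulls the way Spark's `max`/`min` do. It then drops groups with no EndTime, like `.filter(F.col("EndTime").isNotNull())`. Two assumptions apply. First, timestamps are kept as ISO-formatted strings, which sort chronologically. Second, output order follows dict insertion order, whereas Spark's groupBy makes no ordering guarantee.

```python
from collections import defaultdict

# (CaseNumber, group, StartTime, EndTime) from the intermediate table; None = null
INTERMEDIATE_ROWS = [
    ("Ticket1", 1, "2019-01-22 10:00:00", None),
    ("Ticket1", 1, None, "2019-01-23 11:00:00"),
    ("Ticket1", 2, "2019-01-25 07:00:00", None),
    ("Ticket1", 3, "2019-01-27 03:00:00", None),
    ("Ticket2", 1, "2019-01-29 10:00:00", None),
    ("Ticket2", 1, None, "2019-02-23 02:00:00"),
    ("Ticket2", 2, "2019-03-25 07:00:00", None),
    ("Ticket2", 2, None, "2019-03-27 08:00:00"),
    ("Ticket2", 2, None, "2019-03-27 10:00:00"),
    ("Ticket3", 1, "2019-04-25 01:00:00", None),
]


def aggregate_islands(rows):
    """Hypothetical helper mirroring groupBy(CaseNumber, group)
    .agg(max(StartTime), min(EndTime)).filter(EndTime is not null)."""
    groups = defaultdict(lambda: ([], []))  # key -> (start values, end values)
    for case, grp, start, end in rows:
        starts, ends = groups[(case, grp)]
        if start is not None:  # max/min ignore nulls
            starts.append(start)
        if end is not None:
            ends.append(end)
    result = []
    for (case, _grp), (starts, ends) in groups.items():
        if ends:  # drop islands that never got an EndTime
            result.append((case, max(starts) if starts else None, min(ends)))
    return result


for row in aggregate_islands(INTERMEDIATE_ROWS):
    print(row)
```

The three tuples it prints correspond to the rows of df2 above.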
