Suppose I have the following Spark DataFrame:
+--------------------------+-----+
|timestamp                 |name |
+--------------------------+-----+
|2021-11-06 16:29:00.004204|Alice|
|2021-11-06 16:29:00.004204|Bob  |
+--------------------------+-----+
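Now I want to count the records/rows based on the timestamp, specifically for name == 'Alice', within each of the following shifts:
- First 12-hour work shift (00:00-11:59:59)
- Second 12-hour work shift (12:00-23:59:59)
- First 8-hour work shift (00:00-07:59:59)
- Second 8-hour work shift (08:00-15:59:59)
- Third 8-hour work shift (16:00-23:59:59)
and return the results as a Spark DataFrame. I tried the following without success: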
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Reuse the active session if there is one (e.g. in a notebook), otherwise create it
spark = SparkSession.builder.getOrCreate()

# Rows carry only the name; timestamp and date are filled in below
data = [{'name': 'Alice'},
        {'name': 'Bob'}]
schema = StructType([
    StructField("timestamp", TimestampType(), True),
    StructField("date", StringType(), True),
    StructField("name", StringType(), True),
])
# Create a Spark DataFrame
sdf = spark.createDataFrame(data=data, schema=schema)
sdf.printSchema()
sdf.show(truncate=False)
# Generate date and timestamp
new_df = sdf.withColumn('timestamp', F.current_timestamp()) \
    .withColumn('date', F.current_date()) \
    .withColumn('day_of_month', F.dayofmonth('timestamp')) \
    .withColumn('day_of_week', ((F.dayofweek('timestamp') + 5) % 7) + 1)  # Monday = 1 (the default is Sunday = 1)
# Attempts that did not work:
#.withColumn("No. records in 1st 12-hrs", F.from_unixtime(F.unix_timestamp(F.col("timestamp"), "yyyy-MM-dd HH:mm:ss"), "HH:mm:ss")) \
#.filter(F.col("timestamp").between("00:00", "11:59")) \
#.groupBy("No. records in 1st 12-hrs", "name").sum("Count") \
#.withColumn("No. records in 1st 12-hrs", F.from_unixtime(F.unix_timestamp(F.col("timestamp"), "yyyy-MM-dd HH:mm:ss"), "HH:mm:ss")) \
#.filter(F.col("timestamp").between("12:00", "23:59")) \
#.groupBy("No. records in 1st 12-hrs", "name").sum("Count") \
#.withColumn('# No. records in 1st 8-hrs shift (00:00-07:59:59)', ????('timestamp')) \
#.withColumn('# No. records in 2nd 8-hrs shift (08:00-15:59:59)', ????('timestamp')) \
#.withColumn('# No. records in 3rd 8-hrs shift (16:00-23:59:59)', ????('timestamp')) \
new_df.show(truncate=False)
So far my output is as follows; you can try it in a Colab notebook:
+--------------------------+----------+-----+------------+-----------+
|timestamp                 |date      |name |day_of_month|day_of_week|
+--------------------------+----------+-----+------------+-----------+
|2021-11-06 16:17:43.698815|2021-11-06|Alice|6           |6          |
|2021-11-06 16:17:43.698815|2021-11-06|Bob  |6           |6          |
+--------------------------+----------+-----+------------+-----------+
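I also looked at some posts on filtering Spark data by time, a helpful answer, and a post on grouping a Spark DataFrame by date, trying to adapt them to a specific name and the shift ranges above in my main Spark DataFrame.
Please note that I am not interested in using a UDF or hacking around it with toPandas().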
So the expected result for name == 'Alice' should look like this:
+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+
|No. records in 1st 12-hrs |No. records in 2nd 12-hrs |No. records in 1st 8-hrs  |No. records in 2nd 8-hrs  |No. records in 3rd 8-hrs  |
+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+
|                          |                          |                          |                          |                          |
+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+
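Answer:
You can achieve this by checking whether the hour part of the timestamp falls within [0, 11], [12, 23], and so on. Since between() is inclusive on both ends and the hour is an integer, [0, 11] covers 00:00:00 through 11:59:59.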
import pyspark.sql.functions as F
new_df = sdf.groupBy("name").agg(
F.sum(F.hour("timestamp").between(0, 11).cast("int")).alias("1st-12-hrs"),
F.sum(F.hour("timestamp").between(12, 23).cast("int")).alias("2nd-12-hrs"),
F.sum(F.hour("timestamp").between(0, 7).cast("int")).alias("1st-8-hrs"),
F.sum(F.hour("timestamp").between(8, 15).cast("int")).alias("2nd-8-hrs"),
F.sum(F.hour("timestamp").between(16, 23).cast("int")).alias("3rd-8-hrs"),
)
new_df.show()
#+-----+----------+----------+---------+---------+---------+
#|name |1st-12-hrs|2nd-12-hrs|1st-8-hrs|2nd-8-hrs|3rd-8-hrs|
#+-----+----------+----------+---------+---------+---------+
#|Bob  |0         |1         |0        |0        |1        |
#|Alice|0         |1         |0        |0        |1        |
#+-----+----------+----------+---------+---------+---------+
