我的資料框名為 df,有 123729 行,如下所示:
--- ------ ------
| HR|maxABP|Second|
--- ------ ------
|110| 128.0| 1|
|110| 127.0| 2|
|111| 127.0| 3|
|111| 127.0| 4|
|111| 126.0| 5|
|111| 127.0| 6|
|109| 126.0| 7|
|111| 126.0| 8|
我需要每 60 行或秒聚合到多個值。每一分鐘,我都想知道最小心率、平均心率、最大心率,以及 maxABP 是否在任何一秒內低于 85。所需的輸出類似于下表,如果 maxABP < 85,則警報為 1,否則為 0。
| 最小心率 | 最大心率 | 平均_HR | 警報 | 分鐘 |
|---|---|---|---|---|
| 70 | 100 | 80 | 1 | 1 |
| 60 | 90 | 75 | 0 | 2 |
我想知道是否可以使用 mapreduce 將每 60 行聚合到這些單個值。我知道有很多錯誤,但也許是這樣的:
def max_HR(df, i):
x = i
y = i 60
return reduce(lambda x, y: max(df[x:y]))
df_maxHR = map(lambda i: max_HR(i))
哪里i應該是df的部分。
uj5u.com熱心網友回復:
我認為groupBy足以獲得所需的結果。
df.show()
--- ------ ------
| HR|maxABP|Second|
--- ------ ------
|110| 128.0| 10|
|110| 127.0| 20|
|111| 127.0| 30|
|111| 127.0| 40|
|111| 126.0| 50|
|111| 127.0| 60|
|109| 126.0| 70|
|111| 126.0| 80|
--- ------ ------
df.withColumn('Minute', f.expr('cast(Second / 60 as int)')) \
.groupBy('Minute').agg( \
f.round(f.min('HR'), 2).alias('Min_HR'), \
f.round(f.max('HR'), 2).alias('Max_HR'), \
f.round(f.avg('HR'), 2).alias('Avg_HR'), \
f.max('maxABP').alias('maxABP')) \
.withColumn('Alarm', f.expr('if(maxABP < 85, 1, 0)')) \
.show()
------ ------ ------ ------ ------ -----
|Minute|Min_HR|Max_HR|Avg_HR|maxABP|Alarm|
------ ------ ------ ------ ------ -----
| 1| 109| 111|110.33| 127.0| 0|
| 0| 110| 111| 110.6| 128.0| 0|
------ ------ ------ ------ ------ -----
uj5u.com熱心網友回復:
示例 DF:
df = spark.createDataFrame(
[
(110, 128.0, 1),(110, 127.0, 2),(111, 127.0, 3),(111, 127.0, 4)
,(111, 126.0, 5),(111, 127.0, 6),(109, 126.0, 7),(111, 126.0, 1001)
,(114, 126.0, 1003),(115, 83.0, 1064),(116, 127.0, 1066)
], ['HR', 'maxABP', 'Second']
)
--- ------ ------
| HR|maxABP|Second|
--- ------ ------
|110| 128.0| 1|
|110| 127.0| 2|
|111| 127.0| 3|
|111| 127.0| 4|
|111| 126.0| 5|
|111| 127.0| 6|
|109| 126.0| 7|
|111| 126.0| 1001|
|114| 126.0| 1003|
|115| 83.0| 1064|
|116| 127.0| 1066|
然后使用視窗函式:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
w1 = (Window.partitionBy(F.col('Minute')))
df\
.withColumn('Minute', F.round(F.col('Second')/60,0) 1)\
.withColumn('Min_HR', F.min('HR').over(w1))\
.withColumn('Max_HR', F.max('HR').over(w1))\
.withColumn('Avg_HR', F.round(F.avg('HR').over(w1),0))\
.withColumn('Min_ABP', F.round(F.min('maxABP').over(w1),0))\
.select('Min_HR','Max_HR','Min_ABP','Avg_HR','Minute')\
.dropDuplicates()\
.withColumn('Alarm', F.when(F.col('Min_ABP')<85, 1).otherwise(F.lit('0')))\
.select('Min_HR','Max_HR','Avg_HR','Alarm','Minute')\
.orderBy('Minute')\
.show()
------ ------ ------ ----- ------
|Min_HR|Max_HR|Avg_HR|Alarm|Minute|
------ ------ ------ ----- ------
| 109| 111| 110.0| 0| 1.0|
| 111| 114| 113.0| 0| 18.0|
| 115| 116| 116.0| 1| 19.0|
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/371346.html
上一篇:PythonDataFrame如何獲取所有具有相同名稱的行并將值加在一起
下一篇:如何在r中追加資料之前驗證重復
