我想計算在非常大的資料幀上滿足條件的行數,可以通過
df.filter(col("value") >= thresh).count()
我想知道 range 中每個閾值的結果[1, 10]。列舉每個閾值然后執行此操作將掃描資料幀 10 次。它很慢。
如果我可以通過只掃描一次df來實作它?
uj5u.com熱心網友回復:
為每個閾值創建一個指標列,然后求和:
import random
import pyspark.sql.functions as F
from pyspark.sql import Row
df = spark.createDataFrame([Row(value=random.randint(0,10)) for _ in range(1_000_000)])
df.select([
(F.col("value") >= thresh)
.cast("int")
.alias(f"ind_{thresh}")
for thresh in range(1,11)
]).groupBy().sum().show()
# ---------- ---------- ---------- ---------- ---------- ---------- ---------- ---------- ---------- -----------
# |sum(ind_1)|sum(ind_2)|sum(ind_3)|sum(ind_4)|sum(ind_5)|sum(ind_6)|sum(ind_7)|sum(ind_8)|sum(ind_9)|sum(ind_10)|
# ---------- ---------- ---------- ---------- ---------- ---------- ---------- ---------- ---------- -----------
# | 908971| 818171| 727240| 636334| 545463| 454279| 363143| 272460| 181729| 90965|
# ---------- ---------- ---------- ---------- ---------- ---------- ---------- ---------- ---------- -----------
uj5u.com熱心網友回復:
使用帶有when運算式的條件聚合應該可以完成這項作業。
這是一個例子:
from pyspark.sql import functions as F
df = spark.createDataFrame([(1,), (2,), (3,), (4,), (4,), (6,), (7,)], ["value"])
count_expr = [
F.count(F.when(F.col("value") >= th, 1)).alias(f"gte_{th}")
for th in range(1, 11)
]
df.select(*count_expr).show()
# ----- ----- ----- ----- ----- ----- ----- ----- ----- ------
#|gte_1|gte_2|gte_3|gte_4|gte_5|gte_6|gte_7|gte_8|gte_9|gte_10|
# ----- ----- ----- ----- ----- ----- ----- ----- ----- ------
#| 7| 6| 5| 4| 2| 2| 1| 0| 0| 0|
# ----- ----- ----- ----- ----- ----- ----- ----- ----- ------
uj5u.com熱心網友回復:
使用udf來自的用戶定義函式pyspark.sql.functions:
import pandas as pd
import numpy as np
df = pd.DataFrame(np.random.randint(0,100, size=(20)), columns=['val'])
thres = [90, 80, 30] # these are the thresholds
thres.sort(reverse=True) # list needs to be sorted
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
spark = SparkSession.builder \
.master("local[2]") \
.appName("myApp") \
.getOrCreate()
sparkDF = spark.createDataFrame(df)
myUdf = udf(lambda x: 0 if x>thres[0] else 1 if x>thres[1] else 2 if x>thres[2] else 3)
sparkDF = sparkDF.withColumn("rank", myUdf(sparkDF.val))
sparkDF.show()
# --- ----
# |val|rank|
# --- ----
# | 28| 3|
# | 54| 2|
# | 19| 3|
# | 4| 3|
# | 74| 2|
# | 62| 2|
# | 95| 0|
# | 19| 3|
# | 55| 2|
# | 62| 2|
# | 33| 2|
# | 93| 0|
# | 81| 1|
# | 41| 2|
# | 80| 2|
# | 53| 2|
# | 14| 3|
# | 16| 3|
# | 30| 3|
# | 77| 2|
# --- ----
sparkDF.groupby(['rank']).count().show()
# Out:
# ---- -----
# |rank|count|
# ---- -----
# | 3| 7|
# | 0| 2|
# | 1| 1|
# | 2| 10|
# ---- -----
i如果一個值嚴格大于thres[i]但小于或等于,則該值獲得排名thres[i-1]。這應該最大限度地減少比較次數。
因為thres = [90, 80, 30]我們有等級 0-> [max, 90[, 1-> [90, 80[, 2-> [80, 30[, 3->[30, min]
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/453286.html
標籤:Python 阿帕奇火花 pyspark apache-spark-sql
上一篇:根據條件應用函式的最有效方法
