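I don't understand why this doesn't work in PySpark...
I'm trying to split the data into an `approved` DataFrame and a `rejected` DataFrame based on column values. So `rejected` should look at the `language` column values in `approved` and return only the rows whose `language` does not exist in the `language` column of the `approved` DataFrame: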
# Data
columns = ["language", "users_count"]
data = [("Java", 20000), ("Python", 100000), ("Scala", 3000), ("C ", 10000), ("C#", 32195432), ("C", 238135), ("R", 134315), ("Ruby", 235), ("C", 1000), ("R", 2000), ("Ruby", 4000)]
df = spark.createDataFrame(data, columns)
df.show()
# +--------+-----------+
# |language|users_count|
# +--------+-----------+
# |    Java|      20000|
# |  Python|     100000|
# |   Scala|       3000|
# |      C |      10000|
# |      C#|   32195432|
# |       C|     238135|
# |       R|     134315|
# |    Ruby|        235|
# |       C|       1000|
# |       R|       2000|
# |    Ruby|       4000|
# +--------+-----------+
# Approved
is_approved = df.users_count > 10000
df_approved = df.filter(is_approved)
df_approved.show()
# +--------+-----------+
# |language|users_count|
# +--------+-----------+
# |    Java|      20000|
# |  Python|     100000|
# |      C#|   32195432|
# |       C|     238135|
# |       R|     134315|
# +--------+-----------+
# Rejected
is_not_approved = ~df.language.isin(df_approved.language)
df_rejected = df.filter(is_not_approved)
df_rejected.show()
# +--------+-----------+
# |language|users_count|
# +--------+-----------+
# +--------+-----------+
# Also tried
df.filter( ~df.language.contains(df_approved.language) ).show()
# +--------+-----------+
# |language|users_count|
# +--------+-----------+
# +--------+-----------+
So this makes no sense: why is `df_rejected` empty?
Expected results using other approaches:
SQL:
SELECT * FROM df
WHERE language NOT IN ( SELECT language FROM df_approved )
Python:
data_approved = []
for language, users_count in data:
    if users_count > 10000:
        data_approved.append((language, users_count))
data_rejected = []
for language, users_count in data:
    if language not in [row[0] for row in data_approved]:
        data_rejected.append((language, users_count))
print(data_approved)
print(data_rejected)
# [('Java', 20000), ('Python', 100000), ('C#', 32195432), ('C', 238135), ('R', 134315)]
# [('Scala', 3000), ('C ', 10000), ('Ruby', 235), ('Ruby', 4000)]
Why doesn't PySpark filter as expected?
A uj5u.com user replied:
First, you will want to use a window to select the row with the maximum `users_count` for each `language`.
from pyspark.sql import Window, functions
columns = ["language", "users_count"]
data = [("Java", 20000), ("Python", 100000), ("Scala", 3000), ("C ", 10000), ("C#", 32195432), ("C", 238135), ("R", 134315), ("Ruby", 235), ("C", 1000), ("R", 2000), ("Ruby", 4000)]
df = spark.createDataFrame(data, columns)
# Window over all rows that share the same language
w = Window.partitionBy('language')
# Keep only the row(s) holding the per-language maximum users_count
df = (df.withColumn('max_users_count',
                    functions.max('users_count').over(w))
        .where(functions.col('users_count')
               == functions.col('max_users_count'))
        .drop('max_users_count'))
df.show()
+--------+-----------+
|language|users_count|
+--------+-----------+
|      C#|   32195432|
|      C |      10000|
|       C|     238135|
|       R|     134315|
|   Scala|       3000|
|    Ruby|       4000|
|  Python|     100000|
|    Java|      20000|
+--------+-----------+
Then you can filter on the given condition.
is_approved = df.users_count > 10000
df_approved = df.filter(is_approved)
df_approved.show()
+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|      C#|   32195432|
|       C|     238135|
|       R|     134315|
+--------+-----------+
Then, to invert the condition, add the `~` operator in the filter statement:
is_not_approved = df.filter(~is_approved)
is_not_approved.show()
+--------+-----------+
|language|users_count|
+--------+-----------+
|   Scala|       3000|
|      C |      10000|
|    Ruby|        235|
|       C|       1000|
|       R|       2000|
|    Ruby|       4000|
+--------+-----------+
A uj5u.com user replied:
Going the SQL route:
columns = ["language", "users_count"]
data = [("Java", 20000), ("Python", 100000), ("Scala", 3000), ("C ", 10000), ("C#", 32195432), ("C", 238135), ("R", 134315), ("Ruby", 235), ("C", 1000), ("R", 2000), ("Ruby", 4000)]
df = spark.createDataFrame(data, columns)
df_approved = df.filter(df.users_count > 10000)
df.createOrReplaceTempView("df")
df_approved.createOrReplaceTempView("df_approved")
df_not_approved = spark.sql("""
SELECT * FROM df WHERE NOT EXISTS (
SELECT 1 FROM df_approved
WHERE df.language = df_approved.language
)
""")
df_not_approved.show()
# +--------+-----------+
# |language|users_count|
# +--------+-----------+
# |      C |      10000|
# |    Ruby|        235|
# |    Ruby|       4000|
# |   Scala|       3000|
# +--------+-----------+
A uj5u.com user replied:
Try:
df.subtract(df_approved).show()
+--------+-----------+
|language|users_count|
+--------+-----------+
|       R|       2000|
|    Ruby|       4000|
|   Scala|       3000|
|       C|       1000|
|      C |      10000|
|    Ruby|        235|
+--------+-----------+
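Note that `subtract` compares whole rows, not just the `language` column, and returns distinct rows. That is why `C` with 1000 and `R` with 2000 still show up above. A rough plain-Python sketch of those semantics (the variable names are just for illustration):

```python
data = [("Java", 20000), ("Python", 100000), ("Scala", 3000), ("C ", 10000),
        ("C#", 32195432), ("C", 238135), ("R", 134315), ("Ruby", 235),
        ("C", 1000), ("R", 2000), ("Ruby", 4000)]

# Rows that pass the approval condition, compared as whole (language, users_count) tuples.
approved_rows = {row for row in data if row[1] > 10000}

# Distinct set difference on whole rows, which is roughly what subtract does.
remaining_rows = sorted(set(data) - approved_rows)
print(remaining_rows)
```

Because `("C", 1000)` is a different row from `("C", 238135)`, it survives the difference even though the language `C` is approved.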
UPD: If you want to keep using your existing code and avoid the duplicated languages, use a Python list instead of a Spark `Column` object.
import pyspark.sql.functions as f
column_list = df_approved.select(f.collect_list('language')).first()[0]
# output ['Java', 'Python', 'C#', 'C', 'R']
is_not_approved = ~df.language.isin(column_list)
df_rejected = df.filter(is_not_approved)
df_rejected.show()
+--------+-----------+
|language|users_count|
+--------+-----------+
|   Scala|       3000|
|      C |      10000|
|    Ruby|        235|
|    Ruby|       4000|
+--------+-----------+
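As for why the original filter came back empty, here is a plain-Python sketch of my understanding. When `isin` is given a `Column`, the comparison is evaluated row by row, and because `df_approved` is derived from `df`, `df_approved.language` refers to the same column as `df.language`. Each row is therefore checked against its own value, which always matches, so the negation drops every row. The comprehensions below only emulate those semantics, and the variable names are made up for illustration.

```python
data = [("Java", 20000), ("Python", 100000), ("Scala", 3000), ("C ", 10000),
        ("C#", 32195432), ("C", 238135), ("R", 134315), ("Ruby", 235),
        ("C", 1000), ("R", 2000), ("Ruby", 4000)]

# Emulates ~df.language.isin(df_approved.language):
# every row's language is tested against the language of that same row.
rejected_rowwise = [(language, users_count)
                    for language, users_count in data
                    if language not in [language]]

# Emulates ~df.language.isin(column_list):
# every row's language is tested against the collected approved values.
approved_languages = [language for language, users_count in data
                      if users_count > 10000]
rejected_with_list = [(language, users_count)
                      for language, users_count in data
                      if language not in approved_languages]

print(rejected_rowwise)
print(rejected_with_list)
```

The first list is empty, matching the empty `df_rejected`, while the second matches the expected rejected rows from the question.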
