我有兩個資料框,left和right. 后者right是 的子集left,left包含所有行right。我想通過做一個簡單的“left_anti”連接right來洗掉多余的行left。
我發現如果我使用left右側的過濾版本,連接將不起作用。只有當我從頭開始重建正確的資料框時它才有效。
- 這里發生了什么?
- 是否有不涉及重新創建正確資料框的解決方法?
from pyspark.sql import Row, SparkSession
import pyspark.sql.types as t
schema = t.StructType(
[
t.StructField("street_number", t.IntegerType()),
t.StructField("street_name", t.StringType()),
t.StructField("lower_street_number", t.IntegerType()),
t.StructField("upper_street_number", t.IntegerType()),
]
)
data = [
# Row that conflicts w/ range row, and should be removed
Row(
street_number=123,
street_name="Main St",
lower_street_number=None,
upper_street_number=None,
),
# Range row
Row(
street_number=None,
street_name="Main St",
lower_street_number=120,
upper_street_number=130,
),
]
def join_files(left_side, right_side):
join_condition = [
(
(right_side.lower_street_number.isNotNull())
& (right_side.upper_street_number.isNotNull())
& (right_side.lower_street_number <= left_side.street_number)
& (right_side.upper_street_number >= left_side.street_number)
)
]
return left_side.join(right_side, join_condition, "left_anti")
spark = SparkSession.builder.getOrCreate()
left = spark.createDataFrame(data, schema)
right_fail = left.filter("lower_street_number IS NOT NULL")
result = join_files(left, right_fail)
result.count() # Returns 2 - both rows still present
right_success = spark.createDataFrame([data[1]], schema)
result = join_files(left, right_success)
result.count() # Returns 1 - the "left_anti" join worked as expected
uj5u.com熱心網友回復:
您可以為 DF 取別名:
import pyspark.sql.functions as F
def join_files(left_side, right_side):
join_condition = [
(
(F.col("right_side.lower_street_number").isNotNull())
& (F.col("right_side.upper_street_number").isNotNull())
& (F.col("right_side.lower_street_number") <= F.col("left_side.street_number"))
& (F.col("right_side.upper_street_number") >= F.col("left_side.street_number"))
)
]
return left_side.join(right_side, join_condition, "left_anti")
spark = SparkSession.builder.getOrCreate()
left = spark.createDataFrame(data, schema).alias("left_side")
right_fail = left.filter("lower_street_number IS NOT NULL").alias("right_side")
result = join_files(left, right_fail)
print(result.count()) # Returns 2 - both rows still present
right_success = spark.createDataFrame([data[1]], schema).alias("right_side")
result = join_files(left, right_success)
result.count() # Returns 1 - the "left_anti" join worked as expected
不知道您使用的是哪個 pyspark 版本,但是pyspark==3.0.1我收到以下解釋性錯誤。
AnalysisException: Column lower_street_number#522, upper_street_number#523, lower_street_number#522, upper_street_number#523 are ambiguous. It's probably because you joined several Datasets together, and some of these Datasets are the same. This column points to one of the Datasets but Spark is unable to figure out which one. Please alias the Datasets with different names via `Dataset.as` before joining them, and specify the column using qualified name, e.g. `df.as("a").join(df.as("b"), $"a.id" > $"b.id")`. You can also set spark.sql.analyzer.failAmbiguousSelfJoin to false to disable this check.;
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/328335.html
標籤:加入 火花 apache-spark-sql 反连接
