楷模:
class Team(Base):
id = Column(Integer, primary_key=True)
name = Column(String, nullable=False)
players = relationship("Player", backref="team")
class Player(Base):
id = Column(Integer, primary_key=True)
name = Column(String(255), unique=True)
team_id = Column(Integer, ForeignKey("team.id"))
positions = relationship("Position", backref="player")
class Position(Base):
id = Column(Integer(), primary_key=True)
name = Column(String(255), unique=True)
player_id = Column(Integer, ForeignKey("player.id"))
goals = relationship("Goal", backref="position")
class Goal(Base):
id = Column(Integer(), primary_key=True)
distance = Column(Integer)
position_id = Column(Integer, ForeignKey("position.id"))
# Query to get all goals of all players of a team
query = (
select(Team)
.select_from(Player, Position, Goal)
.options(joinedload(Team.players))
.options(
joinedload(
Team.players,
Player.positions,
)
)
.options(
joinedload(
Team.players,
Player.positions,
Position.goals,
)
)
result = await db.execute(query)
response = result.scalar()
來自上述查詢的示例 json 輸出,
{
"id": 3,
"players": [
{
"id": 3,
"positions": []
},
{
"id": 5,
"positions": [
{
"id": 7,
"goals": [
{
"id": 13,
}
]
}
]
},
{
"id": 1,
"positions": [
{
"id": 1,
"goals": [
{
"id": 16,
},
{
"id": 15,
},
{
"id": 14,
}
]
},
{
"id": 2,
"goals": [
{
"id": 4,
}
]
}
]
}
]
}
從示例 json 中,我們可以清楚地看到,對于 id=1 的玩家回傳了多個目標。
現在,我需要將查詢限制為每個玩家的最后一個目標,而不是該玩家的所有目標。
所以我嘗試了,
subquery = (
select(Goal)
.order_by(Goal.id.desc())
.limit(1)
.subquery()
.lateral()
)
query = (
select(Team)
.select_from(Player, Position, Goal)
.options(joinedload(Team.players))
.options(
joinedload(
Team.players,
Player.positions,
)
)
.outerjoin(subquery)
.options(
contains_eager(
Team.players,
Player.positions,
Position.goals,
alias=subquery,
)
)
result = await db.execute(query)
response = result.scalar()
來自上述查詢的示例 json 輸出
{
"id": 3,
"players": [
{
"id": 3,
"positions": []
},
{
"id": 5,
"positions": [
{
"id": 7,
"goals": [
{
"id": 16,
}
]
}
]
},
{
"id": 1,
"positions": [
{
"id": 1,
"goals": [
{
"id": 16,
}
]
},
{
"id": 2,
"goals": [
{
"id": 16,
}
]
}
]
}
]
}
這會獲取任何玩家的最后一個目標,但不會獲取相應玩家的最后一個目標。
過濾器如Goal.position_id == Position.idin outerjoinorsubquery不作業或導致錯誤。
編輯:
看起來我需要populate_existing()select ,但它在新方法中不可用。
編輯2:
為了簡化這些查詢,我還考慮last_goal_id在表中創建列position并更新position表以存盤最后插入的 id goal。2個表中的外鍵是否正常?goal會有position_id而且position會有last_goal_id。
uj5u.com熱心網友回復:
我認為您可以通過使用DISTINCT ON子句洗掉從Goal物件中檢索到的重復行來實作您想要的:
query = (
# Select from Goal and join all the required tables
select(Goal)
.join(Goal.position)
.join(Position.player)
.join(Player.team)
# Remove duplicate rows based on the Player id
.distinct(Player.id)
# Order by `Player.id` (required for distinct) and descending on the goal_id to have the latest added goals (newest) first
.order_by(Player.id, Goal.id.desc())
)
當使用下面的示例日期時,會導致:
{
"id": 3,
"players": [
{
"id": 5,
"positions": [
{
"id": 7,
"goals": [
{
"id": 13,
}
]
}
]
},
{
"id": 1,
"positions": [
{
"id": 1,
"goals": [
{
"id": 16,
}
]
}
]
}
]
}
我認為這里的問題Player是缺少 3,因為他沒有目標。
您還可以使用DISTINCT ON導致以下結果的子句打開查詢:
query = (
# Select all the required tables
select(Team, Player, Position, Goal)
# outerjoin all required tables resulting in a `LEFT OUTER JOIN`
.outerjoin(Team.players)
.outerjoin(Player.positions)
.outerjoin(Position.goals)
# Remove duplicate rows based on the Player id
.distinct(Player.id)
# Order by `Player.id` (required for distinct) and descending on the goal_id to have the latest added goals (newest) first
.order_by(Player.id, Goal.id.desc())
)
正如我們現在開始Team和下降的那樣,還包括Player沒有任何內容,從而產生以下資料:Goal
{
"id": 3,
"players": [
{
"id": 3,
"positions": []
},
{
"id": 5,
"positions": [
{
"id": 7,
"goals": [
{
"id": 13,
}
]
}
]
},
{
"id": 1,
"positions": [
{
"id": 1,
"goals": [
{
"id": 16,
}
]
}
]
}
]
}
樣本資料
{
"id": 3,
"players": [
{
"id": 3,
"positions": []
},
{
"id": 5,
"positions": [
{
"id": 7,
"goals": [
{
"id": 13,
}
]
}
]
},
{
"id": 1,
"positions": [
{
"id": 1,
"goals": [
{
"id": 16,
},
{
"id": 15,
},
{
"id": 14,
}
]
},
{
"id": 2,
"goals": [
{
"id": 4,
}
]
}
]
}
]
}
uj5u.com熱心網友回復:
前言
首先,我認為下面的行不應該是查詢的一部分,因為它將創建一個笛卡爾積。在執行查詢時查找 sqlalchemy 警告:
.select_from(Player, Position, Goal) # DELETE this as it creates cartesian product
其次,您可以稍微簡化原始查詢。下面產生一個與您的問題中的查詢等效的查詢:
# Query to get all goals of all players of a team
query1 = (
select(Team)
# .select_from(Player, Position, Goal) # DELETE this as it creates cartesian product
.options(
joinedload(Team.players)
.joinedload(Player.positions)
.joinedload(Position.goals)
)
)
contains_eager作為替代joinedload
上面的查詢也可以通過 a) 顯式連接相關表和 b) 向 sqlalchemy 提示查詢已經包含所需的關系來實作不同的查詢:
query2 = (
select(Team)
.outerjoin(Team.players)
.outerjoin(Player.positions)
.outerjoin(Position.goals)
.options(contains_eager(
Team.players,
Player.positions,
Position.goals,
))
)
解決方案:
鑒于我們現在可以更明確地了解關系連接條件,實作查詢的一種方法如下:
# subquery to use in the join for getting only the last 1 goal for each Position
subq = (
select(Goal.id.label("last_goal_id"))
.filter(Goal.position_id == Position.id)
.order_by(Goal.id.desc())
.limit(1)
.scalar_subquery()
.correlate(Position)
)
query3 = (
select(Team)
.outerjoin(Team.players)
.outerjoin(Player.positions)
.outerjoin(Goal, Goal.id == subq) # use the JOIN which includes ONLY last Goal, but ...
.options(contains_eager(
Team.players,
Player.positions,
Position.goals, # ... tell sqlalchemy that we actually loaded ALL `.goals`
))
)
它產生以下SQL(sqlite):
SELECT goal.id,
goal.distance,
goal.position_id,
position.id AS id_1,
position.name,
position.player_id,
player.id AS id_2,
player.name AS name_1,
player.team_id,
team.id AS id_3,
team.name AS name_2
FROM team
LEFT OUTER JOIN player ON team.id = player.team_id
LEFT OUTER JOIN position ON player.id = position.player_id
LEFT OUTER JOIN goal ON goal.id =
(SELECT goal.id AS last_goal_id
FROM goal
WHERE goal.position_id = position.id
ORDER BY goal.id DESC
LIMIT 1)
替代解決方案:
您還可以做什么來創建一個hybrid_property指向最后一個的計算列,Goal.id并Position使用它來定義一個僅包含串列中最后一個目標的關系:
class Position(Base):
__tablename__ = "position"
id = Column(Integer(), primary_key=True)
name = Column(String(255), unique=True)
player_id = Column(Integer, ForeignKey("player.id"))
goals = relationship("Goal", backref="position")
@hybrid_property
def last_goal_id(self):
...
@last_goal_id.expression
def last_goal_id(cls):
stmt = (
select(Goal.id.label("last_goal_id"))
# .filter(Goal.position_id == Position.id)
.filter(Goal.position_id == cls.id)
.order_by(Goal.id.desc())
.limit(1)
.scalar_subquery()
.correlate(cls)
# .correlate_except(Goal)
)
return stmt
last_goals = relationship(
lambda: Goal,
primaryjoin=lambda: and_(
Goal.position_id == Position.id,
Goal.id == Position.last_goal_id,
),
viewonly=True,
uselist=True,
)
在這種情況下,您可以使用以下查詢,但您不應導航Position.goals關系,因為它將加載整個串列。json鍵的名稱也會不同。
query1 = (
select(Team)
.options(
joinedload(Team.players)
.joinedload(Player.positions)
.joinedload(Position.last_goals) # use `.last_goals` instead of `.goals`
)
)
注意:我個人最喜歡這個,因為它簡潔明了。
您甚至可以混合使用這些技術來獲得雙方:使用.last_goals關系,但欺騙SA 認為它是滿載的.goals:
query2 = (
select(Team)
.outerjoin(Team.players)
.outerjoin(Player.positions)
.outerjoin(Position.last_goals) # join via `.last_goals` relationship join, but ...
.options(contains_eager(
Team.players,
Player.positions,
Position.goals, # ... tell sqlalchemy that we actually loaded `.goals`
))
)
uj5u.com熱心網友回復:
看看 using RANK,它可能會滿足您的需求,盡管它需要一些查詢/子查詢而不是一個大的joinedload.
我有一個子查詢來對目標日期進行排名并按位置或球員對其進行劃分,并將其過濾到排名等于 1 的位置。這將為您提供每個位置的最新目標,您可以為其創建一個字典。通過您的主查詢,您可以使用位置 ID 使用該字典查找最新目標。
像這樣的東西:
# Rank goals by id and position
subquery = select(
Goal.id.label('goal_id'),
Goal.position_id,
func.rank().over(order_by=Goal.id.desc(), partition_by(Goal.position_id)).label('rank'),
).subquery()
# Create dict of {position_id: latest_goal_id} to use as a lookup
latest_goal_query = (
select(subquery.c.goal_id, subquery.c.position_id)
.where(subquery.c.rank == 1)
)
latest_goal_ids = {pos_id: goal_id for goal_id, pos_id in session.execute(latest_goals).fetchall()}
# Get goal objects from the IDs
goal_query = select(Goal).where(Goal.id.in_(latest_goals.values()))
goals = {goal.id: goal for goal in session.execute(goal_query).scalars()}
# Map position ID to the latest goal object
latest_goals = {pos_id: goals[goal_id] for pos_id, goal_id in latest_goal_ids.items()}
# Read the team and position, and you can use the position_id to get the latest goal
query = ...
順便說一句,我曾經嘗試joinedload過所有事情,直到 SQLAlchemy 的作者告訴我selectinload應該盡可能使用它,因為它只獲取您需要的資料,而連接可能有大量重復資料(例如,如果您的團隊有20名球員,每人5個位置,每人20個進球,那么我認為加入這將導致每個團隊名稱被發送2000次,每個球員名稱被發送100次)。
編輯:column_property只是想到作為替代解決方案。不幸的是,我一直無法弄清楚如何映射實際的目標模型,所以這并不完美,但這里有一個示例,說明如何將最新目標的 ID 直接添加到 Player 模型。
class Player(Base):
...
latest_goal_id = column_property(
select(Goal.id)
.where(Goal.position.has(Position.player_id == id)),
.order_by(Goal.id.desc()).limit(1)
)
從查詢的角度來看,它只是被視為另一列,因此您可以對其進行選擇和過濾。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/480143.html
標籤:Python sql PostgreSQL 加入 sqlalchemy
上一篇:沒有子查詢的左連接
