我有一個記錄串列,其中許多是重復的,我正在嘗試使用 SQLAlchemy Core 1.4.35 和 Python 3.9.2 將“UPSERT”“UPSERT”到 SQLite3 資料庫中。
如果模塊嘗試插入表中不存在的記錄,我希望插入成功,并且我想回傳為該記錄創建的主鍵。
但是,如果模塊嘗試插入已經存在的記錄,那么我希望 INSERT 失敗。我想選擇并回傳現有記錄的 ID。
當我運行下面的代碼時遇到兩個問題,我無法弄清楚是什么導致了這兩個問題。
如果我洗掉該行下縮進的所有代碼
except IntegrityError as err,則模塊成功完成。但是資料庫表有一堆重復的記錄——盡管表上有 UNIQUE 約束。如果我保留該代碼,并在處理例外時嘗試選擇現有記錄的 ID - 我會收到以下錯誤。
是什么導致了這兩個問題?
我可以做些什么來防止重復記錄被插入,并且還要選擇現有記錄的 ID?
我還在學習 SQL,所以我可能在這里遺漏了一些明顯的東西。
可能相關的更新
我檢查的所有重復記錄都有一個共同點,那就是其中一個欄位包含 NULL。SQLite 在檢查 UNIQUE 表約束時是否無法識別 NULL?
輸入資料和表格內容
下面鏈接的電子表格包含我在此測驗中使用的示例資料,以及運行以下代碼后的 DB 表的內容。以黃色突出顯示的記錄是表中的重復記錄。
https://docs.google.com/spreadsheets/d/1dS75vmzzNAqGShqakRwN8FUkL7RugiTo/edit?usp=sharing&ouid=102857691407472073826&rtpof=true&sd=true
測驗資料庫
BEGIN TRANSACTION;
CREATE TABLE IF NOT EXISTS "Containers" (
"ContainerID" INTEGER NOT NULL UNIQUE,
"ContainerName" TEXT,
"Qty" INTEGER,
"Size" INTEGER,
"Unit" TEXT,
"ClientOwned" TEXT,
"VendorOwned" TEXT,
PRIMARY KEY("ContainerID"),
UNIQUE("ContainerName","Qty","Size","Unit","ClientOwned","VendorOwned")
);
COMMIT;
測驗 Python 模塊
# Built-In
import datetime
import sys
from timeit import default_timer as timer
# Third-Party
import pandas as pd
import pprint
import sqlalchemy
from loguru import logger
from sqlalchemy import create_engine, MetaData, Table, select
from sqlalchemy.exc import IntegrityError
db_file = 'upsert_bug_test.db'
data_file = 'container_sample_data.xlsx'
pp = pprint.PrettyPrinter(indent=3)
class TestDatabase:
def __init__(self):
self.engine = create_engine(f'sqlite:///{db_file}')
self.conn = self.engine.connect()
self.metadata_obj = MetaData()
self._reflect_db()
def _reflect_db(self):
"""Get the schema for the tables and views that already exist in the database."""
self.db_containers = Table('Containers', self.metadata_obj, autoload_with=self.engine)
# Dictionary to map the fields in the data file to fields in the database
# Mapping key = database table
# Mapping value = list of field dictionaries
# Field key = name of field in report
# Field value = name of field in database table
mapping = {
'Containers': [
{'Container': 'ContainerName'},
{'Qty': 'Qty'},
{'Size': 'Size'},
{'Unit': 'Unit'},
{'Owned by Client': 'ClientOwned'},
{'Owned by Vendor': 'VendorOwned'},
],
}
def build_insert_dict(record: dict, section_mapping: list):
'''Using the mapping dictionary, build and return a dictionary of parameters that will be
passed to insert() and select().'''
return_data = {}
for pair in section_mapping:
for report_field in pair:
report_value = record.get(report_field)
db_field = pair.get(report_field)
return_data.update({db_field: report_value})
return return_data
if __name__ == '__main__':
# Start the timer
start = timer()
# Create an instance of the database object
db = TestDatabase()
# Load the data from the sample file
logger.info('Loading records from sample file...')
data_df = pd.read_excel(pd.ExcelFile(data_file), 'Sheet1')
new_data_df = data_df.astype(object).where(data_df.notna(), None)
data = new_data_df.to_dict('records')
# Insert each record from the data file into the database. If a record already exists,
# get the ID for that record and return it.
for record in data:
container_data = build_insert_dict(record, mapping.get('Containers'))
try:
ins = db.db_containers.insert().values(**container_data)
res = db.conn.execute(ins)
container_id = res.inserted_primary_key[0]
except IntegrityError as err:
logger.info(f'Record already exists in the database. Attempting to find the ID for that record...')
sel = select(db.db_containers.c.ContainerID).filter_by(**container_data)
res = db.conn.execute(sel)
old_container_id = res.fetchone()
if not old_container_id:
logger.error('Failed to get previously inserted container id! Investigate')
exit(-1)
else:
container_id = old_container_id[0]
logger.info(f'Found ID {container_id}')
stop = timer()
done = stop - start
migration_time = datetime.timedelta(seconds=done)
logger.success(f'Inserted all unique records into the database in {migration_time}')
選擇錯誤
(oaktier-env) PS C:\Oaktier\oaktier\tasks> python .\upsert_test.py
2022-04-21 11:27:26.670 | INFO | __main__:<module>:76 - Loading records from sample file...
2022-04-21 11:27:27.205 | INFO | __main__:<module>:91 - Record already exists in the database. Attempting to find the ID for that record...
2022-04-21 11:27:27.210 | INFO | __main__:<module>:100 - Found ID 1
2022-04-21 11:27:27.211 | INFO | __main__:<module>:91 - Record already exists in the database. Attempting to find the ID for that record...
2022-04-21 11:27:27.215 | INFO | __main__:<module>:100 - Found ID 2
2022-04-21 11:27:27.216 | INFO | __main__:<module>:91 - Record already exists in the database. Attempting to find the ID for that record...
Traceback (most recent call last):
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1819, in _execute_context
self.dialect.do_execute(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\default.py", line 732, in do_execute
cursor.execute(statement, parameters)
sqlite3.IntegrityError: UNIQUE constraint failed: Containers.ContainerName, Containers.Qty, Containers.Size, Containers.Unit, Containers.ClientOwned, Containers.VendorOwned
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Oaktier\oaktier\tasks\upsert_test.py", line 88, in <module>
res = db.conn.execute(ins)
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1306, in execute
return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
File "C:\oaktier-env\lib\site-packages\sqlalchemy\sql\elements.py", line 325, in _execute_on_connection
return connection._execute_clauseelement(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1498, in _execute_clauseelement
ret = self._execute_context(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1862, in _execute_context
self._handle_dbapi_exception(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 2043, in _handle_dbapi_exception
util.raise_(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\util\compat.py", line 207, in raise_
raise exception
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1819, in _execute_context
self.dialect.do_execute(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\default.py", line 732, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: Containers.ContainerName, Containers.Qty, Containers.Size, Containers.Unit, Containers.ClientOwned, Containers.VendorOwned
[SQL: INSERT INTO "Containers" ("ContainerName", "Qty", "Size", "Unit", "ClientOwned", "VendorOwned") VALUES (?, ?, ?, ?, ?, ?)]
[parameters: ('Dumpster (FEL/REL)', 1, 2.0, 'cu. Yd.', 'None', 'Dumpster (FEL/REL)')]
(Background on this error at: https://sqlalche.me/e/14/gkpj)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1819, in _execute_context
self.dialect.do_execute(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\default.py", line 732, in do_execute
cursor.execute(statement, parameters)
sqlite3.InterfaceError: Error binding parameter 0 - probably unsupported type.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Oaktier\oaktier\tasks\upsert_test.py", line 93, in <module>
res = db.conn.execute(sel)
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1306, in execute
return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
File "C:\oaktier-env\lib\site-packages\sqlalchemy\sql\elements.py", line 325, in _execute_on_connection
return connection._execute_clauseelement(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1498, in _execute_clauseelement
ret = self._execute_context(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1862, in _execute_context
self._handle_dbapi_exception(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 2043, in _handle_dbapi_exception
util.raise_(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\util\compat.py", line 207, in raise_
raise exception
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1819, in _execute_context
self.dialect.do_execute(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\default.py", line 732, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.InterfaceError: (sqlite3.InterfaceError) Error binding parameter 0 - probably unsupported type.
[SQL: SELECT "Containers"."ContainerID"
FROM "Containers"
WHERE "Containers"."ContainerName" = ? AND "Containers"."Qty" = ? AND "Containers"."Size" = ? AND "Containers"."Unit" = ? AND "Containers"."ClientOwned" = ? AND "Containers"."VendorOwned" = ?]
[parameters: ('Dumpster (FEL/REL)', 1, 2.0, 'cu. Yd.', 'None', 'Dumpster (FEL/REL)')]
(Background on this error at: https://sqlalche.me/e/14/rvf5)
uj5u.com熱心網友回復:
根據關于UNIQUE約束的 SQLite 檔案:
出于 UNIQUE 約束的目的,NULL 值被認為不同于所有其他值,包括其他 NULL。
因此,NULL 不會被忽略,而是被視為唯一物體。事實上,即使None是 PythonNaN中的 s 和 Pandas/Numpy 中的 s 也不相等。因此,對于 SQLite,帶有NULLs 的突出顯示的行不是重復的行,而是實際上唯一的行。對于空字串,可能存在空格問題。
作為一種解決方法,請考慮填寫非 None 占位符或空字串以NULL在插入??查詢后替換。避免將整個資料框轉換為相同型別,但根據需要顯式處理每一列。
with pd.ExcelFile(data_file) as wb:
raw_df = pd.read_excel(wb, 'Sheet1')
# CLEAN EACH STRING COLUMN
clean_df = (
raw_df.assign(
ContainerName = lambda x: x["ContainerName"].astype("string").str.strip(),
Unit = lambda x: x["Unit"].astype("string").str.strip(),
ClientOwned = lambda x: x["ClientOwned"].astype("string").str.strip(),
VendorOwned = lambda x: x["VendorOwned"].astype("string").str.strip()
).where(data_df.notna(), "")
)
db_data = clean_df.to_dict('records')
下面轉換為 SQLAlchemy 語法:
UPDATE Containers SET ContainerName = NULL WHERE ContainerName = '';
UPDATE Containers SET Unit = NULL WHERE Unit = '';
UPDATE Containers SET ClientOwned = NULL WHERE ClientOwned = '';
UPDATE Containers SET VendorOwned = NULL WHERE VendorOwned = '';
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/463250.html
標籤:Python sql python-3.x sqlite sqlalchemy
上一篇:js數量限制
