寫在前面
在上一章節參悟python元類(又稱metaclass)系列實戰(四)完成了
Mysql類, 用來連接資料庫以及執行sql陳述句;
繼續豐富系列實戰(三)的users類, 忘記的小伙伴請戳此處;
本章內容為該系列的終結篇, 感謝大家從一而終;
有誤的地方懇請大神指正下,
熱身預備
-
Mysql中, 嚴謹的sql陳述句都長的類似下面的格式
SELECT `uid`, `name` FROM `users`; -- 關鍵字都帶"反引號" -
Python中如何把一個
list中的元素都加上反引號呢?fields = ['uid', 'name'] new_fields = list(map(lambda a: f'`{a}`', fields)) # ['`uid`', '`name`'] -
如何把
fields帶入到sql中呢?f"SELECT {', '.join(new_fields)} FROM `users`" -
我們上一章定義
Mysql.execute時, 對傳入的sql字串做了特殊要求await cur.execute(sql.replace('?', '%s'), args) # 即在寫sql時希望用 ? 代替占位符 %s, 執行時再通過 replace 替換回來 # 該要求用于`INSERT`時, 就得構造類似如下的sql "INSERT INTO `users`(`uid`, 'name') VALUES('?', '?')" # 定義createArgsStr方法, 傳入長度, 生成 ? 占位符 def createArgsStr(num): return ', '.join(['?' for _ in range(num)]) # 那 INSERT陳述句就可以這樣構造 f"INSERT INTO `users`({', '.join(new_fields)}) VALUES({createArgsStr(len(new_fields))})" -
熱身完畢, 接下來我們把
insert, select, delete, update的功能加到ORM中
更新元類ModelMetaClass
class ModelMetaClass(type):
def __new__(cls, name, bases, attrs):
if name == 'Model':
# 當出現與'Model'同名的類時, 直接創建這類
return type.__new__(cls, name, bases, attrs)
# 定義表名: 要么在類中定義__table__屬性(目的是"類名可以與表名不相同"), 否則與類名相同
tableName = attrs.get('__table__') or name
print(f'建立映射關系: {name}類 --> {tableName}表')
mappings = Dict() # 存盤column與Field 子類的對應關系, Field在上一章中定義的, 忘了回去翻
fields = [] # 用來存盤除主鍵以外的所有欄位名
primaryKey = None # 用來記錄主鍵欄位的名字, 初始沒有
for k, v in attrs.items():
# 遍歷所有屬性, 即映射表的欄位, 讀不懂請回看第二章 Users 的定義
if isinstance(v, Field): # Field類, 所有欄位型別的父類
print(f'建立映射... column: {k} ==> class: {v}')
mappings[k] = v
if v.primaryKey: # 判斷欄位是否被設定成了主鍵
if primaryKey: # 因為一張表只能有一個主鍵
raise Exception(f'Duplicate primary key for field {k}')
primaryKey = k
else:
fields.append(k)
if not primaryKey: # 這里做了一步強制要求設定主鍵, 你也可以去掉
raise Exception(f'請給表{tableName}設定主鍵')
for k in mappings.keys():
# 洗掉原屬性, 避免實體的屬性遮蓋類的同名屬性, 況且我們已經保存到 mappings 中了, 不怕丟
attrs.pop(k)
# 接下來給本元類(ModelMetaClass)創建的class(如 Model)設定私有屬性
attrs['__mappings__'] = mappings
attrs['__table__'] = tableName
attrs['__primaryKey__'] = primaryKey
attrs['__fields__'] = fields
# 以下 8 行代碼是新增的內容(注意提前定義好 createArgsStr), update 單拎出來, 這里不做處理
escapedFields = list(map(lambda a: f'`{a}`', fields))
attrs['__select__'] = f"SELECT `{primaryKey}`, {', '.join(escapedFields)} FROM `{tableName}`"
attrs['__delete__'] = f"DELETE FROM `{tableName}` WHERE `{primaryKey}`=?"
# 由于 update_at & created_at 可以由mysql自動寫入, 我們寫入時可以不傳
fields.remove('updated_at')
fields.remove('created_at')
attrs['__fields_2__'] = fields
escapedFields_2 = list(map(lambda a: f'`{a}`', fields))
attrs['__insert__'] = f"INSERT INTO `{tableName}` ({', '.join(escapedFields_2)}) VALUES ({createArgsStr(len(escapedFields_2))})"
return type.__new__(cls, name, bases, attrs)
更新Model類, 新增findAll方法
class Model(Dict, metaclass=ModelMetaClass):
"""指定metaclass, 以實作動態定制"""
def __init__(self, **kw):
super().__init__(**kw)
def getValue(self, key):
return getattr(self, key, None)
def getValueOrDefault(self, key):
value = https://www.cnblogs.com/z417/archive/2020/11/17/getattr(self, key, None)
if value is None:
field = self.__mappings__[key] # 從所有column中獲取value
if field.default is not None:
# 如果default指向是方法(如time.time), 則呼叫方法獲取其值; 否則直接賦值
value = field.default() if callable(field.default) else field.default
print(f'using defalut value for {key}: {value}')
setattr(self, key, value) # 其實是調 Dict.__setattr__, 以支持用"."訪問
return value
# 以下內容為更新(新增)
@classmethod
def appendCondition(cls, sql, where, args, **kw):
if where:
sql.append('WHERE')
sql.append(where)
if not args:
args = []
orderBy = kw.get('orderBy')
if orderBy:
sql.append('ORDER BY')
sql.append(orderBy)
limit = kw.get('limit')
if limit:
sql.append('LIMIT')
if isinstance(limit, int):
sql.append('?')
args.append(limit)
elif isinstance(limit, tuple) and len(limit) == 2:
sql.append('?, ?')
args.extend(limit) # 在串列末尾一次性追加另一個序列中的多個值
else:
raise Exception(f'Invalid limit value: {limit}')
return sql
@classmethod
async def findAll(cls, where=None, args=None, **kw):
'''回傳結果集'''
sql = [cls.__select__]
sql = cls.appendCondition(sql, where, args, **kw)
rs = await Mysql.select(' '.join(sql), args)
# cls(**r)是呼叫Model.__init__方法, 將dict轉為Dict, 就可以 . 的方式獲取value
return [cls(**r) for r in rs]
- 挨個特性測驗下
findAll
if __name__ == '__main__':
import asyncio
loop = asyncio.get_event_loop()
loop.run_until_complete(Mysql.createPool())
u = users()
# 測驗查詢整張表
rs = loop.run_until_complete(u.findAll())
for i in rs:
print(i.uid)
# 測驗where條件
rs = loop.run_until_complete(u.findAll(where='is_deleted=?', args=[1]))
print(rs[0].name)
# 測驗排序
rs = loop.run_until_complete(u.findAll(where='is_deleted=?', args=[0], orderBy='uid DESC'))
print(rs)
# 測驗分頁查詢
rs = loop.run_until_complete(u.findAll(where='is_deleted=?', args=[1], orderBy='uid DESC', limit=1))
print(rs[0].email)
再給Model類新增一個findFieldValue方法, 可以查詢指定欄位
@classmethod
async def findFieldValue(cls, selectField, where=None, args=None, **kw):
sql = [f"SELECT {selectField} FROM `{cls.__table__}`"]
sql = cls.appendCondition(sql, where, args, **kw)
rs = await Mysql.select(' '.join(sql), args)
return [cls(**r) for r in rs]
# 測驗代碼
if __name__ == '__main__':
import asyncio
loop = asyncio.get_event_loop()
loop.run_until_complete(Mysql.createPool())
u = users()
rs = loop.run_until_complete(u.findFieldValue(
'uid, name, email', 'is_deleted=?', [1], orderBy='uid DESC', limit=(0, 2)))
print(rs)
再新增findByPrimaryKey, 根據主鍵id查找
@classmethod
async def findByPrimaryKey(cls, pk):
'''find object by primary key.'''
rs = await Mysql.select(f'{cls.__select__} where `{cls.__primaryKey__}`=?', [pk], 1)
if len(rs) == 0:
return {}
return cls(**rs[0])
還有insert, update, delete, logicDelete
async def insert(self):
# 因為insert都是具體的一行資料, 即單個實體, 所以沒有定義成類方法
args = list(map(self.getValueOrDefault, self.__fields_2__)) # __fields_2__不含update_at & created_at
rows = await Mysql.execute(self.__insert__, args)
if rows != 1:
print(f'插入資料失敗, 影響行數: {rows}')
@classmethod
async def update(cls, setField, where, args):
"""這樣可以批量update"""
sql = f"UPDATE `{cls.__table__}` SET {', '.join(map(lambda a: f'`{a}`=?', setField))} WHERE {where}"
rows = await Mysql.execute(sql, args)
if not rows:
print(f'failed to update by {where}, affected rows: {rows}')
@classmethod
async def delete(cls, pk):
"""delete by primaryKey"""
rows = await Mysql.execute(cls.__delete__, [pk])
if rows != 1:
print(
f'failed to delete by {cls.__primaryKey__}, affected rows: {rows}')
@classmethod
async def logicDelete(cls, pk):
await cls.update(setField=['is_deleted'],
where=f'{cls.__primaryKey__}=?', args=[1, pk])
- 測驗代碼
if __name__ == '__main__':
import asyncio
loop = asyncio.get_event_loop()
loop.run_until_complete(Mysql.createPool())
u = users()
import hashlib
passwd = hashlib.md5('123456'.encode(encoding='utf-8')).hexdigest()
test = users(email='[email protected]', passwd=passwd, name='test',
created_by=100, updated_by=100)
loop.run_until_complete(test.insert())
loop.run_until_complete(u.update(setField=['name'], where='uid=106', args=['newTest']))
loop.run_until_complete(u.logicDelete(106))
print(loop.run_until_complete(u.findByPrimaryKey(106)))
總結
-
到這里小伙伴可能有看懵的, 不要緊, 原始碼和sql我都打包好了戳這里下載, 請修改資料庫密碼啥的等配置項
-
其實此ORM缺點很多, 普適性差, 對異步也沒能應用的合適; 只作為理解
metaclass的小練習 -
不要重復造輪子, 推薦小伙伴還是用成熟的ORM框架, 如
SQLAlchemy, aiomysql對它做了支持
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/222460.html
標籤:其他
