我們之前登錄認證的一些內容都直接寫入到代碼中,我們現在統一的給放到config檔案中,
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30
我們需要把uer.py修改
from config import * #對應的方法 def create_access_token(data: dict): to_encode = data.copy() encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt async def get_cure_user(request: Request, token: Optional[str] = Header(...)) -> UserBase: credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="驗證失敗" ) credentials_FOR_exception = HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="用戶未登錄或者登陸token已經失效" ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception useris = await request.app.state.redis.get(username) if not useris and useris!=token: raise credentials_FOR_exception user = UserBase(email=username) return user except JWTError: raise credentials_exception
在存盤redis的地方也需要修改
request.app.state.redis.set(user.email, token,
expire=ACCESS_TOKEN_EXPIRE_MINUTES * 60)
我們把main.py的redis相關的也放在配置里
#config.py redishost='127.0.0.1' redisport='6379' redisdb='0'
main.py修改
async def get_redis_pool() -> Redis: redis = await create_redis_pool(f"redis://:@"+redishost+":"+redisport+"/"+redisport+"?encoding=utf-8") return redis
調整之前的websocket和file相關的代碼,統一放在routers檔案下,創建
websoocket.py,
from fastapi import APIRouter,WebSocket,Depends,WebSocketDisconnect from typing import List, Dict from routers.user import get_cure_user socketRouter = APIRouter() class ConnectionManager: def __init__(self): # 存放**的鏈接 self.active_connections: List[Dict[str, WebSocket]] = [] async def connect(self, user: str, ws: WebSocket): # 鏈接 await ws.accept() self.active_connections.append({"user": user, "ws": ws}) def disconnect(self, user: str, ws: WebSocket): # 關閉時 移除ws物件 self.active_connections.remove({"user": user, "ws": ws}) async def send_other_message_json(self, message: dict, user: str): # 發送個人訊息 for connection in self.active_connections: if connection["user"] == user: await connection['ws'].send_json(message) async def broadcast_json(self, data: dict): # 廣播訊息 for connection in self.active_connections: await connection['ws'].send_json(data) @staticmethod async def send_personal_message(message: str, ws: WebSocket): # 發送個人訊息 await ws.send_text(message) async def send_other_message(self, message: str, user: str): # 發送個人訊息 for connection in self.active_connections: if connection["user"] == user: await connection['ws'].send_text(message) async def broadcast(self, data: str): # 廣播訊息 for connection in self.active_connections: await connection['ws'].send_text(data) manager = ConnectionManager() @socketRouter.websocket("/ws/{user}/") async def websocket_many_point( websocket: WebSocket, user: str, cookie_or_token: str = Depends(get_cure_user), ): await manager.connect(user, websocket) try: while True: data = await websocket.receive_json() senduser = data['username'] if senduser: await manager.send_other_message_json(data, senduser) else: await manager.broadcast_json(data) except WebSocketDisconnect as e: manager.disconnect(user, websocket)
對于file相關的也放在files檔案下
from fastapi import APIRouter, File, UploadFile from typing import List fileRouter = APIRouter() @fileRouter.post("/uploadfile/") async def upload_file(fileS: List[UploadFile] = File(...)): return {"filenames": [file.filename for file in fileS]}
然后我們在main.py優化
from routers.websoocket import socketRouter from routers.file import fileRouter #我們在最后注冊這個router app.include_router(socketRouter, prefix="/socket", tags=['socket']) app.include_router(fileRouter, prefix="/file", tags=['file'])
調整完整后,我們的介面更加清晰了,我們啟動下,看我們先有的介面,

代碼存盤
https://gitee.com/liwanlei/fastapistuday
文章首發在公眾號,歡迎關注,

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/338952.html
標籤:其他
