"""Authentication helpers: password hashing, JWT creation and FastAPI dependencies."""

import os
from datetime import datetime, timedelta, timezone
from typing import Optional, Union

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext

from internal.database import execute_query
from internal.models import TokenData, UserInDB, User

# Generate a fresh key with: openssl rand -hex 32
# NOTE(review): the literal below is committed to source control, so it should
# be treated as compromised. Set JWT_SECRET_KEY in the environment to override
# it; the literal is kept only as a fallback so existing deployments (and
# tokens they already issued) keep working.
_FALLBACK_SECRET_KEY = "e86c54c19962d562dab09081e5a6ce0c8ef49ac9a49cdb7135aa670707bbc894"
SECRET_KEY = os.environ.get("JWT_SECRET_KEY") or _FALLBACK_SECRET_KEY
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 240

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/users/token")


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Return a signed JWT carrying *data* plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.
        expires_delta: Lifetime of the token. When omitted, the token lives
            for ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded token, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode = data.copy()
    # Timezone-aware "now" so the expiry is unambiguous UTC.
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def get_user(username: str) -> Optional[UserInDB]:
    """Look up *username* in the ``users`` table.

    Returns:
        A ``UserInDB`` built from the fetched row, or ``None`` when the query
        yields a falsy result.
    """
    query = "SELECT * FROM users WHERE username = %s"
    # Assumes execute_query(..., fetchall=False) returns a single row as a
    # mapping (it is unpacked with ** below) — confirm against internal.database.
    result = execute_query(query, (username,), fetchall=False)
    if result:
        return UserInDB(**result)
    return None


async def get_current_user(token: str = Depends(oauth2_scheme)):
    """FastAPI dependency: resolve the bearer *token* to a stored user.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, or names a user that ``get_user`` cannot find.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError as exc:
        # Keep the decoding failure attached as the cause for server-side logs.
        raise credentials_exception from exc

    user = get_user(username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """FastAPI dependency: return the current user unless it is disabled.

    Raises:
        HTTPException: 400 with detail ``"Inactive user"`` when
            ``current_user.disabled`` is truthy.
    """
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


def verify_password(plain_password, hashed_password):
    """Return the result of checking *plain_password* against *hashed_password*."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    """Return the hash of *password* produced by the module's ``pwd_context``."""
    return pwd_context.hash(password)


def authenticate_user(username: str, password: str) -> Union[UserInDB, bool]:
    """Check a username/password pair.

    Returns:
        The matching ``UserInDB`` on success, otherwise ``False`` (unknown
        user or wrong password).
    """
    user = get_user(username)
    if not user:
        # Spend roughly the time a real verification would take, so response
        # timing does not reveal whether the username exists.
        pwd_context.dummy_verify()
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user