diff --git a/dependencies.py b/dependencies.py index 262d5d5..04883b7 100644 --- a/dependencies.py +++ b/dependencies.py @@ -2,8 +2,8 @@ from datetime import datetime, timedelta, timezone from jose import JWTError, jwt from passlib.context import CryptContext from fastapi.security import OAuth2PasswordBearer -from fastapi import Depends,HTTPException,status -from internal.models import TokenData,UserInDB,User +from fastapi import Depends, HTTPException, status +from internal.models import TokenData, UserInDB, User from internal.database import execute_query # openssl rand -hex 32 @@ -15,6 +15,8 @@ pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/users/token") # 创建访问令牌 + + def create_access_token(data: dict, expires_delta: timedelta): to_encode = data.copy() expire = datetime.now(timezone.utc) + expires_delta @@ -23,12 +25,15 @@ def create_access_token(data: dict, expires_delta: timedelta): return encoded_jwt # 从数据库获取信息 + + def get_user(username: str): query = "SELECT * FROM users WHERE username = %s" result = execute_query(query, (username,), fetchall=False) if result: return UserInDB(**result) + async def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, @@ -49,20 +54,28 @@ async def get_current_user(token: str = Depends(oauth2_scheme)): return user # 验证用户是否为活跃用户 + + async def get_current_active_user(current_user: User = Depends(get_current_user)): if current_user.disabled: raise HTTPException(status_code=400, detail="Inactive user") return current_user # 验证密码 + + def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) # 获取密码哈希 + + def get_password_hash(password): return pwd_context.hash(password) # 验证用户密码 + + def authenticate_user(username: str, password: str): user = get_user(username) if not user: diff --git a/main.py b/main.py index 5d72d79..11575f9 100644 --- a/main.py +++ b/main.py @@ -24,51 +24,3 @@ app.add_middleware( allow_methods=['GET', 'POST','DELETE','PUT'], allow_headers=['*'], ) - -# # 用户登录 -# @app.post("/token", response_model=Token) -# async def login_for_access_token( -# form_data: OAuth2PasswordRequestForm = Depends(), -# ) -> Token: -# user = authenticate_user(form_data.username, form_data.password) -# if not user: -# raise HTTPException( -# status_code=status.HTTP_401_UNAUTHORIZED, -# detail="Incorrect username or password", -# headers={"WWW-Authenticate": "Bearer"}, -# ) -# access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) -# access_token = create_access_token( -# data={"sub": user.username}, expires_delta=access_token_expires -# ) -# return {"access_token": access_token, "token_type": "bearer"} - -# 注册新用户 -@app.post("/register/") -async def register_user(user: UserInDB): - # 检查用户名是否已经存在 - existing_user = get_user(user.username) - if existing_user: - raise HTTPException(status_code=400, detail="Username already registered") - if not user.hashed_password: - raise HTTPException(status_code=400,detail="password cannot be empty") - - # 创建新用户并保存到数据库 - hashed_password = get_password_hash(user.hashed_password) - insert_query = "INSERT INTO users (username, email, full_name, hashed_password, disabled) VALUES (%s, %s, %s, %s, %s)" - user_data = (user.username, user.email, user.full_name, hashed_password, user.disabled) - execute_query(insert_query, user_data) - # 返回创建的用户信息 - return {"status":status.HTTP_200_OK,"message":"users create successfully!"} - - - -@app.get("/users/me/items/") -async def read_own_items(current_user: User = Depends(get_current_active_user)): - return [{"item_id": "Foo", "owner": current_user.username}] - -# @app.get("/list",response_model=list[BlogList]) -# def read_type_all(): -# select_query="SELECT blogname,blogtype,addtime,descr FROM blogs;" -# result=execute_query(select_query,fetchall=True) -# return result \ No newline at end of file diff --git a/routers/blogmanage.py b/routers/blogmanage.py index 9c37fc7..b9856c3 100644 --- a/routers/blogmanage.py +++ b/routers/blogmanage.py @@ -13,6 +13,8 @@ import json router = APIRouter(prefix="/blogs", tags=["博客管理"]) # 获取列表 + + @router.get("/list") async def blog_list(page: int = Query(None), page_size: int = Query(None)): limit_clause = "" diff --git a/routers/usermanage.py b/routers/usermanage.py index 6dbdd7e..d3eec96 100644 --- a/routers/usermanage.py +++ b/routers/usermanage.py @@ -1,17 +1,19 @@ -from fastapi import APIRouter,Depends +from fastapi import APIRouter, Depends from fastapi.security import OAuth2PasswordRequestForm from datetime import timedelta from fastapi.security import OAuth2PasswordRequestForm -from fastapi import Depends, FastAPI, HTTPException, status +from fastapi import Depends, HTTPException, status from dependencies import * from internal.models import Token -from fastapi.middleware.cors import CORSMiddleware -router=APIRouter( + +router = APIRouter( prefix="/users", tags=["用户管理"] ) -# 用户登录 +# 登录获取token + + @router.post("/token", response_model=Token) async def login_for_access_token( form_data: OAuth2PasswordRequestForm = Depends(), @@ -29,6 +31,36 @@ async def login_for_access_token( ) return {"access_token": access_token, "token_type": "bearer"} +# 获取用户 + + @router.get("/me/", response_model=User) async def read_users_me(current_user: User = Depends(get_current_active_user)): - return current_user \ No newline at end of file + return current_user + +# 注册新用户 + + +@router.post("/register/") +async def register_user(user: UserInDB, _: User = Depends(get_current_active_user)): + # 检查用户名是否已经存在 + existing_user = get_user(user.username) + if existing_user: + raise HTTPException( + status_code=400, detail="Username already registered") + if not user.hashed_password: + raise HTTPException(status_code=400, detail="password cannot be empty") + + # 创建新用户并保存到数据库 + hashed_password = get_password_hash(user.hashed_password) + insert_query = "INSERT INTO users (username, email, full_name, hashed_password, disabled) VALUES (%s, %s, %s, %s, %s)" + user_data = (user.username, user.email, user.full_name, + hashed_password, user.disabled) + execute_query(insert_query, user_data) + # 返回创建的用户信息 + return {"status": status.HTTP_200_OK, "message": "users create successfully!"} + + +@router.get("/me/items/") +async def read_own_items(current_user: User = Depends(get_current_active_user)): + return [{"item_id": "Foo", "owner": current_user.username}]