You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

66 lines
2.3 KiB

from fastapi import APIRouter, Depends
from fastapi.security import OAuth2PasswordRequestForm
from datetime import timedelta
from fastapi.security import OAuth2PasswordRequestForm
from fastapi import Depends, HTTPException, status
from dependencies import *
from internal.models import Token
router = APIRouter(
prefix="/users",
tags=["用户管理"]
)
# 登录获取token
@router.post("/token", response_model=Token)
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
# 获取用户
@router.get("/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
# 注册新用户
@router.post("/register/")
async def register_user(user: UserInDB, _: User = Depends(get_current_active_user)):
# 检查用户名是否已经存在
existing_user = get_user(user.username)
if existing_user:
raise HTTPException(
status_code=400, detail="Username already registered")
if not user.hashed_password:
raise HTTPException(status_code=400, detail="password cannot be empty")
# 创建新用户并保存到数据库
hashed_password = get_password_hash(user.hashed_password)
insert_query = "INSERT INTO users (username, email, full_name, hashed_password, disabled) VALUES (%s, %s, %s, %s, %s)"
user_data = (user.username, user.email, user.full_name,
hashed_password, user.disabled)
execute_query(insert_query, user_data)
# 返回创建的用户信息
return {"status": status.HTTP_200_OK, "message": "users create successfully!"}
# @router.get("/me/items/")
# async def read_own_items(current_user: User = Depends(get_current_active_user)):
# return [{"item_id": "Foo", "owner": current_user.username}]