Browse Source

add news

master
SunFree 11 months ago
parent
commit
6a3a51d4c8
  1. BIN
      __pycache__/dependencies.cpython-310.pyc
  2. BIN
      __pycache__/main.cpython-310.pyc
  3. 14
      dependencies.py
  4. BIN
      internal/__pycache__/__init__.cpython-310.pyc
  5. BIN
      internal/__pycache__/database.cpython-310.pyc
  6. BIN
      internal/__pycache__/models.cpython-310.pyc
  7. 27
      internal/models.py
  8. 22
      main.py

BIN
__pycache__/dependencies.cpython-310.pyc

BIN
__pycache__/main.cpython-310.pyc

14
dependencies.py

@ -22,6 +22,13 @@ def create_access_token(data: dict, expires_delta: timedelta):
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# 从数据库获取信息
def get_user(username: str):
query = "SELECT * FROM users WHERE username = %s"
result = execute_query(query, (username,), fetchall=False)
if result:
return UserInDB(**result)
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
@ -55,13 +62,6 @@ def verify_password(plain_password, hashed_password):
def get_password_hash(password):
return pwd_context.hash(password)
# 从数据库获取信息
def get_user(username: str):
query = "SELECT * FROM users WHERE username = %s"
result = execute_query(query, (username,), fetchall=False)
if result:
return UserInDB(**result)
# 验证用户密码
def authenticate_user(username: str, password: str):
user = get_user(username)

BIN
internal/__pycache__/__init__.cpython-310.pyc

BIN
internal/__pycache__/database.cpython-310.pyc

BIN
internal/__pycache__/models.cpython-310.pyc

27
internal/models.py

@ -1,4 +1,5 @@
from pydantic import BaseModel
from pydantic import BaseModel,Field
from typing import Annotated
# Token相关的模型
class Token(BaseModel):
@ -10,10 +11,24 @@ class TokenData(BaseModel):
# User相关的模型
class User(BaseModel):
username: str
email: str = None
full_name: str = None
disabled: bool = None
username: Annotated[str,Field(
title="用户",
examples=["admin"],
pattern=r'^.{4,20}$',
description="允许4-20的字符"
)]
email: Annotated[str,Field(
examples=["examples@example.com"],
max_length=50,
pattern=r'^[a-zA-Z0-9_-]+@[a-zA-Z0-9_-]+(\.[a-zA-Z0-9_-]+)+$',
description="邮箱需要满足正则标准"
)]
full_name: Annotated[str,Field(
examples=["admin"],
pattern=r'^.{2,20}$',
description="允许2-20个字符"
)]
disabled: bool = True
class UserInDB(User):
hashed_password: str
hashed_password: str = None

22
main.py

@ -3,8 +3,17 @@ from fastapi.security import OAuth2PasswordRequestForm
from fastapi import Depends, FastAPI, HTTPException, status
from dependencies import *
from internal.models import Token
from fastapi.middleware.cors import CORSMiddleware
app=FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=['http://localhost:5173'], # 允许的源
allow_credentials=True,
allow_methods=['GET', 'POST'], # 允许的请求方法
allow_headers=['Authorization', 'Content-Type'], # 允许的请求头
)
# 用户登录
@app.post("/token", response_model=Token)
async def login_for_access_token(
@ -24,21 +33,22 @@ async def login_for_access_token(
return {"access_token": access_token, "token_type": "bearer"}
# 注册新用户
@app.post("/register/", response_model=UserInDB)
async def register_user(user: UserInDB):
@app.post("/register/")
async def register_user(user: UserInDB, _: User = Depends(get_current_active_user)):
# 检查用户名是否已经存在
existing_user = get_user(user.username)
if existing_user:
raise HTTPException(status_code=400, detail="Username already registered")
if not user.hashed_password:
raise HTTPException(status_code=400,detail="password cannot be empty")
# 创建新用户并保存到数据库
hashed_password = get_password_hash(user.hashed_password)
insert_query = "INSERT INTO users (username, email, full_name, hashed_password) VALUES (%s, %s, %s, %s)"
user_data = (user.username, user.email, user.full_name, hashed_password)
insert_query = "INSERT INTO users (username, email, full_name, hashed_password, disabled) VALUES (%s, %s, %s, %s, %s)"
user_data = (user.username, user.email, user.full_name, hashed_password, user.disabled)
execute_query(insert_query, user_data)
# 返回创建的用户信息
return user
return {"status":status.HTTP_200_OK,"message":"users create successfully!"}
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):

Loading…
Cancel
Save