Browse Source

add all

master
SunFree 11 months ago
commit
a38dc6b093
  1. 0
      __init__.py
  2. BIN
      __pycache__/dependencies.cpython-310.pyc
  3. BIN
      __pycache__/main.cpython-310.pyc
  4. 74
      dependencies.py
  5. 0
      internal/__init__.py
  6. BIN
      internal/__pycache__/__init__.cpython-310.pyc
  7. BIN
      internal/__pycache__/database.cpython-310.pyc
  8. BIN
      internal/__pycache__/schemas.cpython-310.pyc
  9. 27
      internal/database.py
  10. 11
      internal/schemas.py
  11. 24
      main.py
  12. 0
      routers/__init__.py

0
__init__.py

BIN
__pycache__/dependencies.cpython-310.pyc

BIN
__pycache__/main.cpython-310.pyc

74
dependencies.py

@ -0,0 +1,74 @@
from datetime import datetime, timedelta
from fastapi import HTTPException, Depends, status
from jose import JWTError, jwt
from passlib.context import CryptContext
from internal.database import execute_query, create_connection
# 设置密码上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 随机秘钥
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# JWT签名算法变量
ALGORITHM = "HS256"
# 设置令牌过期时间
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# 校验密码方法
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
# 将密码进行hash
def get_password_hash(password):
return pwd_context.hash(password)
# 认证用户
def authenticate_user(username: str, password: str):
query = "SELECT * FROM users WHERE username = %s"
user_data = execute_query(query, (username,))
if not user_data:
return None
stored_password = user_data["password"]
if not verify_password(password, stored_password):
return None
return user_data
# 创建访问令牌
def create_access_token(data: dict, expires_delta: timedelta = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# 获取当前用户
async def get_current_user(token: str = Depends(create_connection)):
# 首先定义一个 HTTPException 用于处理认证失败的情况
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
# 解码 JWT 令牌,验证签名和有效期
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
# 如果 JWT 解码失败,则返回认证异常
raise credentials_exception
# 根据用户名从数据库中获取用户数据
user = authenticate_user(username)
if user is None:
raise credentials_exception
# 返回获取到的用户数据
return user

0
internal/__init__.py

BIN
internal/__pycache__/__init__.cpython-310.pyc

BIN
internal/__pycache__/database.cpython-310.pyc

BIN
internal/__pycache__/schemas.cpython-310.pyc

27
internal/database.py

@ -0,0 +1,27 @@
import pymysql
DB_CONFIG = {
"host": "111.229.38.129",
"user": "root",
"password": "zl981023",
"database": "blogapi",
"charset": "utf8mb4",
"cursorclass": pymysql.cursors.DictCursor,
}
# 创建数据库连接
def create_connection():
return pymysql.connect(**DB_CONFIG)
# 执行 SQL 查询
def execute_query(query, params=None, fetchall=False):
conn = create_connection()
with conn.cursor() as cursor:
cursor.execute(query, params)
if fetchall:
result = cursor.fetchall()
else:
result = cursor.fetchone()
conn.commit()
conn.close()
return result

11
internal/schemas.py

@ -0,0 +1,11 @@
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None=None
class User(BaseModel):
username: str

24
main.py

@ -0,0 +1,24 @@
from fastapi import FastAPI,HTTPException
from dependencies import *
from internal.schemas import *
# 初始化 FastAPI 应用
app = FastAPI()
# 数据库连接参数
# 定义登录接口
@app.post("/token", response_model=Token)
async def login_for_access_token(username: str, password: str):
user = authenticate_user(username,password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")

0
routers/__init__.py

Loading…
Cancel
Save