Browse Source

add new

master
SunFree 10 months ago
parent
commit
0fd999b47c
  1. 51
      internal/database.py
  2. 5
      internal/models.py
  3. 107
      routers/blogmanage.py

51
internal/database.py

@ -1,4 +1,5 @@
import pymysql
from fastapi import HTTPException, status
DB_CONFIG = {
"host": "111.229.38.129",
"user": "root",
@ -9,10 +10,14 @@ DB_CONFIG = {
}
# 创建数据库连接
def create_connection():
return pymysql.connect(**DB_CONFIG)
# 执行 SQL 查询
def execute_query(query, params=None, fetchall=False):
conn = create_connection()
with conn.cursor() as cursor:
@ -24,3 +29,49 @@ def execute_query(query, params=None, fetchall=False):
conn.commit()
conn.close()
return result
# 查询单个数据
def fetch_one(query, params=None):
result = execute_query(query, params)
return result if result else None
# 查询所有数据
def fetch_all(query, params=None, fetchall=True):
return execute_query(query, params, fetchall)
# 验证数据是否存在,不存在抛出异常
def raise_if_not_found(item, message="Item not found"):
if not item:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=message
)
# 验证唯一,非唯一抛出异常
def raise_if_exists(item, message="Item already exists"):
if item:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=message
)
# 成功响应数据
def response_success(data={}, message="data do success"):
if not data:
return {
"status": status.HTTP_200_OK,
"message": message,
}
else:
return {
"status": status.HTTP_200_OK,
"message": message,
"data": data
}

5
internal/models.py

@ -59,11 +59,6 @@ class Blog(BaseModel):
description="备注允许为空"
)]
class ResponseMessage(BaseModel):
status:int
message:str
data: object
class TypeList(BaseModel):
blogid:Annotated[int,Field(

107
routers/blogmanage.py

@ -1,90 +1,75 @@
from fastapi import Depends, APIRouter, status,Query,Path,HTTPException
from fastapi import Depends, APIRouter, status, Query, Path, HTTPException
from internal.models import *
from dependencies import get_current_active_user, execute_query
from datetime import datetime
from internal.database import fetch_one, fetch_all, execute_query, response_success, raise_if_exists,raise_if_not_found
from dependencies import get_current_active_user
router = APIRouter(
prefix="/blogs",
tags=['博客管理']
)
# 获取列表
@router.get("/list")
async def blog_list():
select_query = "SELECT blogs.id, blogtitle, blogcontent, typename, create_at, update_at, blogs.descr FROM blogs LEFT JOIN `types` ON blogs.typeid = types.id ORDER BY create_at DESC;"
blog_list = fetch_all(select_query)
return response_success(blog_list, "blog get list success")
@router.post('/add', response_model=ResponseMessage)
# 博客新增
@router.post('/add')
async def blog_add(blog: Blog, _: User = Depends(get_current_active_user)):
select_query = "INSERT INTO blogs (blogtitle,blogcontent,typeid,descr) VALUES (%s,%s,%s,%s)"
blog_data = (blog.blogtitle, blog.blogcontent, blog.typeid, blog.descr)
execute_query(select_query, blog_data)
return {
'status': status.HTTP_200_OK,
'message': 'blog create successfully!',
'data': blog
}
select_query = "SELECT * FROM blogs WHERE blogtitle = %s"
existing_blog = fetch_one(select_query, (blog.blogtitle,))
raise_if_exists(existing_blog, "blog is already exists")
insert_query = "INSERT INTO blogs (blogtitle,blogcontent,typeid,descr) VALUES (%s,%s,%s,%s)"
insert_data = (blog.blogtitle, blog.blogcontent, blog.typeid, blog.descr)
execute_query(insert_query, insert_data)
return response_success(data=blog,message="blog create success")
# 博客删除
@router.delete("/delete/{id}")
async def blog_delete(id: str = Path(description="博客id")):
select_query = "SELECT * FROM blogs WHERE id = %s"
existing_blog=fetch_one(select_query,(id,))
raise_if_not_found(existing_blog,"blog not found")
insert_query = "DELETE FROM blogs WHERE id = %s"
execute_query(insert_query, (id,))
return response_success(message="blog delete success")
@router.get("/list")
async def blog_list():
select_query = "SELECT blogs.id, blogtitle, blogcontent, typename, create_at, update_at, blogs.descr FROM blogs LEFT JOIN `types` ON blogs.typeid = types.id ORDER BY create_at DESC;"
blog_list = execute_query(select_query, fetchall=True)
return {
'status': status.HTTP_200_OK,
'message': 'blog search succuessfully!',
'data': blog_list
}
# 博客修改
@router.put("/update/{id}")
async def blog_put(blog: Blog, id: str = Path(description="博客id")):
select_query="SELECT * FROM blogs WHERE id=%s"
existing_blog=fetch_one(select_query,(id,))
raise_if_not_found(existing_blog,"blog not found")
update_query = "UPDATE blogs SET blogtitle=%s,blogcontent=%s,typeid=%s,descr=%s WHERE id=%s;"
update_data = (blog.blogtitle, blog.blogcontent,
blog.typeid, blog.descr, id)
execute_query(update_query, update_data)
return response_success("blog update sucess")
# 博客查询
@router.get("/list/search")
async def blog_list_search(
blogtitle: str= Query(None, description="博客标题"),
blogtitle: str = Query(None, description="博客标题"),
typename: str = Query(None, description="博客类型"),
start_date:str=Query(None,description="开始时间"),
end_date:str=Query(None,description="结束时间"),
):
start_date: str = Query(None, description="开始时间"),
end_date: str = Query(None, description="结束时间"),
):
select_query = "SELECT blogs.id, blogtitle, blogcontent, typename, create_at, update_at, blogs.descr FROM blogs LEFT JOIN `types` ON blogs.typeid = types.id WHERE 1=1"
params = []
if blogtitle:
select_query += " AND blogtitle LIKE %s"
params.append(f"%{blogtitle}%")
if typename:
select_query += " AND typename LIKE %s"
params.append(f"%{typename}%")
if start_date:
select_query += " AND create_at >= %s"
params.append(start_date)
if end_date:
select_query += " AND create_at <= %s"
params.append(end_date)
# 添加排序条件
select_query += " ORDER BY create_at DESC"
# 执行查询
blog_list = execute_query(select_query, params=params, fetchall=True)
return {
'status': status.HTTP_200_OK,
'message': 'blog serach succuessfully!',
'data': blog_list
}
@router.delete("/delete/{id}")
async def blog_delete(id:str=Path(description="博客id")):
select_query="DELETE FROM blogs WHERE id = %s"
delete_data=(id,)
execute_query(select_query,delete_data)
return {
"status": status.HTTP_200_OK,
"message": "Blog deleted successfully!"
}
@router.put("/update/{id}")
async def blog_put(blog:Blog,id:str=Path(description="博客id")):
select_query="UPDATE blogs SET blogtitle=%s,blogcontent=%s,typeid=%s,descr=%s WHERE id=%s;"
update_data=(blog.blogtitle,blog.blogcontent,blog.typeid,blog.descr,id)
execute_query(select_query,update_data)
return{
"status": status.HTTP_200_OK,
"message": "Blog deleted successfully!"
}
blog_list = fetch_all(select_query, params=params, fetchall=True)
return response_success(data=blog_list,message="blog serach succuessfully!")
Loading…
Cancel
Save