"""Blog management routes: list, detail, create, delete, update and search."""

from fastapi import Depends, APIRouter, Query, Path, Request

# NOTE(review): Blog, Label and User used below presumably come from this
# wildcard import. Its position relative to the other imports is kept exactly
# as it was so that any name shadowing stays unchanged; switch to explicit
# names once the contents of internal.models are confirmed.
from internal.models import *
from datetime import date
from internal.database import (
    fetch_one,
    fetch_all,
    execute_query,
    response_success,
    raise_if_exists,
    raise_if_not_found,
)
from dependencies import get_current_active_user
import json
from limiter_config import limiter

router = APIRouter(prefix="/blogs", tags=["博客管理"])

# Statement shared by the create and update handlers to attach one label
# to one blog.
_INSERT_BLOG_LABEL_QUERY = "INSERT INTO blog_label (blogid, labelid) VALUES (%s, %s)"


# Blog list
@router.get("/list")
@limiter.limit("5/minute")
async def blog_list(
    request: Request,
    page: int = Query(None, ge=1),
    page_size: int = Query(None, ge=0),
):
    """Return blogs ordered by creation time (newest first) plus the total count.

    Pagination is applied only when both ``page`` and ``page_size`` are
    supplied; otherwise the query carries no LIMIT clause.

    Args:
        request: Incoming request; not read by this function body.
        page: 1-based page number. Values below 1 are rejected because they
            would produce a negative OFFSET.
        page_size: Rows per page. Negative values are rejected because they
            would produce a negative LIMIT.

    Returns:
        ``response_success`` wrapping ``{"blogs": [...], "total": <int>}``.
    """
    limit_clause = ""
    if page is not None and page_size is not None:
        offset = (page - 1) * page_size
        # Both values are validated integers, so interpolating them into the
        # statement cannot inject SQL.
        limit_clause = f"LIMIT {page_size} OFFSET {offset}"

    # Selected fields: id, title, content, creation time, image link,
    # word count, type name and the aggregated list of label names.
    select_query = f"""
        SELECT blogs.id,blogs.blogtitle, blogs.blogcontent, blogs.create_at,
               blogs.imglink, blogs.wordcount,
               blogtypes.typename,JSON_ARRAYAGG(labels.labelname) AS labelnames
        FROM blogs
        LEFT JOIN `blogtypes` ON blogs.typeid = blogtypes.id
        LEFT JOIN blog_label ON blog_label.blogid = blogs.id
        LEFT JOIN labels ON blog_label.labelid = labels.id
        GROUP BY blogs.id, blogs.blogtitle, blogs.blogcontent, blogs.create_at,
                 blogs.imglink, blogs.wordcount, blogtypes.typename
        ORDER BY create_at DESC
        {limit_clause};
    """
    blogs = fetch_all(select_query)

    count_query = "SELECT COUNT(*) AS total FROM blogs;"
    total_records = fetch_one(count_query)["total"]

    return response_success(
        {
            "blogs": blogs,
            "total": total_records,
        },
        "blog get list success",
    )


@router.get("/list/{id}")
@limiter.limit("5/minute")
async def blog_one(request: Request, id: int):
    """Return the id, title and content of the blog with the given id.

    Args:
        request: Incoming request; not read by this function body.
        id: Primary key of the blog to look up.

    Returns:
        ``response_success`` wrapping whatever ``fetch_one`` returned for
        the lookup.
    """
    select_query = """
        SELECT id, blogtitle, blogcontent
        FROM blogs
        WHERE id = %s
        ORDER BY create_at DESC;
    """
    blog = fetch_one(select_query, (id,))
    return response_success(blog, "blog get blog_one success")


# Create a blog
@router.post("/add")
@limiter.limit("5/minute")
async def blog_add(
    request: Request,
    blog: Blog,
    labels: list[Label],
    _: User = Depends(get_current_active_user),
):
    """Create a blog and link it to the given labels.

    Args:
        request: Incoming request; not read by this function body.
        blog: Payload providing blogtitle, blogcontent, imglink, typeid
            and descr.
        labels: Labels to associate with the new blog; only ``id`` is read.
        _: Result of the ``get_current_active_user`` dependency (unused).

    Returns:
        A dict with a confirmation message and the new blog's id.
    """
    select_query = "SELECT * FROM blogs WHERE blogtitle = %s"
    existing_blog = fetch_one(select_query, (blog.blogtitle,))
    # Titles are treated as unique: refuse to create a duplicate.
    raise_if_exists(existing_blog, "Blog already exists")

    insert_query = (
        "INSERT INTO blogs (blogtitle, blogcontent,imglink, typeid, descr) VALUES (%s, %s, %s, %s,%s)"
    )
    insert_data = (blog.blogtitle, blog.blogcontent, blog.imglink, blog.typeid, blog.descr)
    blog_id = execute_query(insert_query, insert_data, lastrowid=True)

    for label in labels:
        execute_query(_INSERT_BLOG_LABEL_QUERY, (blog_id, label.id))

    return {"message": "Blog created successfully", "blog_id": blog_id}


# Delete a blog
# NOTE(review): unlike /add and /update, this route has no
# get_current_active_user dependency, so it is callable without
# authentication. Confirm whether that is intentional before relying on it.
@router.delete("/delete/{id}")
@limiter.limit("5/minute")
async def blog_delete(request: Request, id: str = Path(description="博客id")):
    """Delete the blog with the given id after checking that it exists.

    Args:
        request: Incoming request; not read by this function body.
        id: Id of the blog to delete.

    Returns:
        ``response_success`` carrying only a confirmation message.
    """
    select_query = "SELECT * FROM blogs WHERE id = %s"
    existing_blog = fetch_one(select_query, (id,))
    raise_if_not_found(existing_blog, "blog not found")

    delete_query = "DELETE FROM blogs WHERE id = %s"
    execute_query(delete_query, (id,))
    return response_success(message="blog delete success")


@router.put("/update/{id}")
@limiter.limit("5/minute")
async def blog_update(
    request: Request,
    id: int,
    blog: Blog,
    labels: list[Label],
    _: User = Depends(get_current_active_user),
):
    """Overwrite a blog's fields and replace its label associations.

    Args:
        request: Incoming request; not read by this function body.
        id: Id of the blog to update.
        blog: Payload providing the new blogtitle, blogcontent, imglink,
            typeid and descr.
        labels: Complete new set of labels; only ``id`` is read.
        _: Result of the ``get_current_active_user`` dependency (unused).

    Returns:
        ``response_success`` carrying only a confirmation message.
    """
    # Make sure the blog being edited exists.
    select_query = "SELECT * FROM blogs WHERE id = %s"
    existing_blog = fetch_one(select_query, (id,))
    raise_if_not_found(existing_blog, "blog not found")

    # Update the blog's own columns.
    update_query = (
        "UPDATE blogs SET blogtitle = %s, blogcontent = %s, imglink = %s, typeid = %s, descr = %s WHERE id = %s"
    )
    update_data = (blog.blogtitle, blog.blogcontent, blog.imglink, blog.typeid, blog.descr, id)
    execute_query(update_query, update_data)

    # Drop the existing label links first ...
    delete_query = "DELETE FROM blog_label WHERE blogid = %s"
    execute_query(delete_query, (id,))

    # ... then insert the new ones.
    for label in labels:
        execute_query(_INSERT_BLOG_LABEL_QUERY, (id, label.id))

    # Pass the text as ``message``; positionally it would land in the data
    # slot, unlike every other response_success call in this module.
    return response_success(message="blog update sucess")


# Fuzzy blog search
@router.get("/search")
@limiter.limit("5/minute")
async def blog_list_search(
    request: Request,
    blogtitle: str = Query(None, description="博客标题"),
    typename: str = Query(None, description="博客类型"),
    start_date: str = Query(None, description="开始时间"),
    end_date: str = Query(None, description="结束时间"),
):
    """Search blogs by optional title, type name and creation-time range.

    Every filter is optional; only the ones supplied (and non-empty) are
    added to the WHERE clause. Title and type name use substring matching.

    Args:
        request: Incoming request; not read by this function body.
        blogtitle: Substring to look for in the blog title.
        typename: Substring to look for in the type name.
        start_date: Lower bound (inclusive) for ``create_at``.
        end_date: Upper bound (inclusive) for ``create_at``.

    Returns:
        ``response_success`` wrapping the matching rows, newest first.
    """
    select_query = """
        SELECT blogs.id, blogtitle, blogcontent,wordcount, typename, create_at,
               update_at, blogs.descr,JSON_ARRAYAGG(labels.labelname) AS labelnames
        FROM blogs
        LEFT JOIN `blogtypes` ON blogs.typeid = blogtypes.id
        LEFT JOIN blog_label ON blogs.id = blog_label.blogid
        LEFT JOIN labels ON blog_label.labelid = labels.id
        WHERE 1=1
    """
    params = []

    if blogtitle:
        select_query += " AND blogtitle LIKE %s"
        params.append(f"%{blogtitle}%")
    if typename:
        select_query += " AND typename LIKE %s"
        params.append(f"%{typename}%")
    if start_date:
        select_query += " AND create_at >= %s"
        params.append(start_date)
    if end_date:
        select_query += " AND create_at <= %s"
        params.append(end_date)

    # The leading space matters: without it the clause would be glued to the
    # last filter's placeholder ("... %sGROUP BY").
    select_query += (
        " GROUP BY blogs.id, blogs.blogtitle, blogs.blogcontent,blogs.wordcount,"
        " blogtypes.typename, blogs.create_at, blogs.update_at, blogs.descr"
        " ORDER BY create_at DESC"
    )

    blogs = fetch_all(select_query, params=params, fetchall=True)
    return response_success(data=blogs, message="blog serach succuessfully!")


# Look up a blog by id
@router.get("/search/{id}")
@limiter.limit("5/minute")
async def get_id_blog(request: Request, id: str = Path(description="博客id")):
    """Return one blog together with the ids of its labels.

    The aggregated label ids are exposed under the ``labelnames`` key. When
    that value arrives as a JSON string it is decoded into a list; if it
    cannot be decoded it is replaced with an empty list.

    Args:
        request: Incoming request; not read by this function body.
        id: Id of the blog to fetch.

    Returns:
        ``response_success`` wrapping the row returned by ``fetch_one``.
    """
    select_query = """
        SELECT blogs.id, blogtitle, blogcontent,wordcount, blogs.typeid,
               blogs.descr,JSON_ARRAYAGG(labels.id) AS labelnames,imglink
        FROM blogs
        LEFT JOIN `blogtypes` ON blogs.typeid = blogtypes.id
        LEFT JOIN blog_label ON blogs.id = blog_label.blogid
        LEFT JOIN labels ON blog_label.labelid = labels.id
        WHERE blogs.id = %s
        GROUP BY blogs.id;
    """
    # This is a read, so use the row-fetching helper (as blog_one does)
    # rather than execute_query, which the rest of the module uses for writes.
    blog = fetch_one(select_query, (id,))

    if blog and isinstance(blog, dict) and isinstance(blog.get("labelnames"), str):
        try:
            blog["labelnames"] = json.loads(blog["labelnames"])
        except json.JSONDecodeError:
            blog["labelnames"] = []

    return response_success(data=blog, message="blog search success")