You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
112 lines
4.0 KiB
112 lines
4.0 KiB
from fastapi import Depends, APIRouter, status, Query, Path, HTTPException
|
|
from internal.models import *
|
|
from internal.database import (
|
|
fetch_one,
|
|
fetch_all,
|
|
execute_query,
|
|
response_success,
|
|
raise_if_exists,
|
|
raise_if_not_found,
|
|
)
|
|
from dependencies import get_current_active_user
|
|
|
|
router = APIRouter(prefix="/blogs", tags=["博客管理"])
|
|
|
|
# 获取列表
|
|
|
|
|
|
@router.get("/list")
|
|
async def blog_list():
|
|
select_query = """
|
|
SELECT blogs.blogtitle, blogs.blogcontent, blogs.create_at, blogs.img,
|
|
blogs.readminite, blogs.readnum, blogs.wordcount, types.typename,
|
|
JSON_ARRAYAGG(labels.labelname) AS labelnames
|
|
FROM blogs
|
|
LEFT JOIN `types` ON blogs.typeid = types.id
|
|
LEFT JOIN blog_label ON blog_label.blogid = blogs.id
|
|
LEFT JOIN labels ON blog_label.labelid = labels.id
|
|
GROUP BY blogs.blogtitle, blogs.blogcontent, blogs.create_at, blogs.img,
|
|
blogs.readminite, blogs.readnum, blogs.wordcount, types.typename;
|
|
|
|
|
|
"""
|
|
blog_list = fetch_all(select_query)
|
|
return response_success(blog_list, "blog get list success")
|
|
|
|
|
|
# 博客新增
|
|
@router.post("/add")
|
|
async def blog_add(blog: Blog, _: User = Depends(get_current_active_user)):
|
|
select_query = "SELECT * FROM blogs WHERE blogtitle = %s"
|
|
existing_blog = fetch_one(select_query, (blog.blogtitle,))
|
|
raise_if_exists(existing_blog, "blog is already exists")
|
|
insert_query = (
|
|
"INSERT INTO blogs (blogtitle,blogcontent,typeid,descr) VALUES (%s,%s,%s,%s)"
|
|
)
|
|
insert_data = (blog.blogtitle, blog.blogcontent, blog.typeid, blog.descr)
|
|
execute_query(insert_query, insert_data)
|
|
return response_success(data=blog, message="blog create success")
|
|
|
|
|
|
# 博客删除
|
|
@router.delete("/delete/{id}")
|
|
async def blog_delete(id: str = Path(description="博客id")):
|
|
select_query = "SELECT * FROM blogs WHERE id = %s"
|
|
existing_blog = fetch_one(select_query, (id,))
|
|
raise_if_not_found(existing_blog, "blog not found")
|
|
insert_query = "DELETE FROM blogs WHERE id = %s"
|
|
execute_query(insert_query, (id,))
|
|
return response_success(message="blog delete success")
|
|
|
|
|
|
# 博客修改
|
|
@router.put("/update/{id}")
|
|
async def blog_put(blog: Blog, id: str = Path(description="博客id")):
|
|
select_query = "SELECT * FROM blogs WHERE id=%s"
|
|
existing_blog = fetch_one(select_query, (id,))
|
|
raise_if_not_found(existing_blog, "blog not found")
|
|
update_query = (
|
|
"UPDATE blogs SET blogtitle=%s,blogcontent=%s,typeid=%s,descr=%s WHERE id=%s;"
|
|
)
|
|
update_data = (blog.blogtitle, blog.blogcontent,
|
|
blog.typeid, blog.descr, id)
|
|
execute_query(update_query, update_data)
|
|
return response_success("blog update sucess")
|
|
|
|
|
|
# 博客模糊查询
|
|
@router.get("/list/search")
|
|
async def blog_list_search(
|
|
blogtitle: str = Query(None, description="博客标题"),
|
|
typename: str = Query(None, description="博客类型"),
|
|
start_date: str = Query(None, description="开始时间"),
|
|
end_date: str = Query(None, description="结束时间"),
|
|
):
|
|
select_query = "SELECT blogs.id, blogtitle, blogcontent, typename, create_at, update_at, blogs.descr FROM blogs LEFT JOIN `types` ON blogs.typeid = types.id WHERE 1=1"
|
|
params = []
|
|
if blogtitle:
|
|
select_query += " AND blogtitle LIKE %s"
|
|
params.append(f"%{blogtitle}%")
|
|
if typename:
|
|
select_query += " AND typename LIKE %s"
|
|
params.append(f"%{typename}%")
|
|
if start_date:
|
|
select_query += " AND create_at >= %s"
|
|
params.append(start_date)
|
|
if end_date:
|
|
select_query += " AND create_at <= %s"
|
|
params.append(end_date)
|
|
select_query += " ORDER BY create_at DESC"
|
|
blog_list = fetch_all(select_query, params=params, fetchall=True)
|
|
return response_success(data=blog_list, message="blog serach succuessfully!")
|
|
|
|
|
|
# 根据id查询博客
|
|
@router.get("/list/search/{id}")
|
|
async def get_id_blog(id: str = Path(description="博客id")):
|
|
select_query = "SELECT * FROM blogs WHERE id=%s"
|
|
blog_list = execute_query(select_query, (id,))
|
|
return response_success(data=blog_list, message="blog search success")
|
|
|
|
|
|
# 我就测试一下
|