You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

170 lines
6.9 KiB

9 months ago
9 months ago
10 months ago
8 months ago
9 months ago
10 months ago
8 months ago
10 months ago
8 months ago
9 months ago
8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
9 months ago
10 months ago
8 months ago
10 months ago
8 months ago
8 months ago
8 months ago
9 months ago
10 months ago
9 months ago
8 months ago
10 months ago
9 months ago
9 months ago
9 months ago
9 months ago
8 months ago
9 months ago
8 months ago
9 months ago
9 months ago
10 months ago
10 months ago
9 months ago
10 months ago
10 months ago
8 months ago
10 months ago
8 months ago
9 months ago
8 months ago
9 months ago
8 months ago
9 months ago
8 months ago
10 months ago
8 months ago
8 months ago
8 months ago
10 months ago
10 months ago
9 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
9 months ago
8 months ago
9 months ago
10 months ago
10 months ago
8 months ago
10 months ago
9 months ago
10 months ago
9 months ago
8 months ago
8 months ago
8 months ago
9 months ago
8 months ago
9 months ago
  1. from fastapi import Depends, APIRouter, Query, Path
  2. from internal.models import *
  3. from internal.database import (
  4. fetch_one,
  5. fetch_all,
  6. execute_query,
  7. response_success,
  8. raise_if_exists,
  9. raise_if_not_found,
  10. )
  11. from dependencies import get_current_active_user
  12. import json
  13. router = APIRouter(prefix="/blogs", tags=["博客管理"])
  # List blogs, optionally paginated.
  @router.get("/list")
  async def blog_list(
      page: int = Query(None, ge=1),
      page_size: int = Query(None, ge=1),
  ):
      """Return blogs, newest first, together with the total number of blogs.

      Pagination is applied only when both ``page`` and ``page_size`` are
      supplied; otherwise every blog is returned.

      Args:
          page: 1-based page number. Values below 1 are rejected during
              request validation because they would produce a negative
              ``OFFSET`` and make the query fail.
          page_size: Number of rows per page; must be at least 1.

      Returns:
          The result of ``response_success`` wrapping a dict with ``blogs``
          (the selected rows) and ``total`` (row count of the ``blogs`` table).
      """
      limit_clause = ""
      if page is not None and page_size is not None:
          offset = (page - 1) * page_size
          # Both values are integers validated as >= 1, so interpolating them
          # into the statement cannot inject SQL.
          limit_clause = f"LIMIT {page_size} OFFSET {offset}"

      # Selected columns: id, title, content, creation time, image link,
      # word count, type name and the aggregated list of label names.
      select_query = f"""
          SELECT blogs.id,blogs.blogtitle, blogs.blogcontent, blogs.create_at, blogs.imglink,
          blogs.wordcount, blogtypes.typename,JSON_ARRAYAGG(labels.labelname) AS labelnames
          FROM blogs
          LEFT JOIN `blogtypes` ON blogs.typeid = blogtypes.id
          LEFT JOIN blog_label ON blog_label.blogid = blogs.id
          LEFT JOIN labels ON blog_label.labelid = labels.id
          GROUP BY blogs.id, blogs.blogtitle, blogs.blogcontent, blogs.create_at, blogs.imglink,
          blogs.wordcount, blogtypes.typename ORDER BY create_at DESC
          {limit_clause};
      """
      # Named ``rows`` rather than ``blog_list`` so the endpoint function is
      # not shadowed inside its own body.
      rows = fetch_all(select_query)

      # The total ignores pagination so clients can compute the page count.
      count_query = "SELECT COUNT(*) AS total FROM blogs;"
      total_records = fetch_one(count_query)["total"]

      return response_success({
          "blogs": rows,
          "total": total_records,
      }, "blog get list success")
  # The ``:int`` convertor makes this route match numeric segments only.
  # Routes are tried in declaration order, so a plain ``/list/{id}`` here
  # would capture ``/list/search`` (declared later on this router) and reject
  # it with a validation error instead of letting the search endpoint run.
  @router.get("/list/{id:int}")
  async def blog_one(id: int):
      """Return the id, title and content of a single blog.

      Args:
          id: Value matched against ``blogs.id``.

      Returns:
          The result of ``response_success`` wrapping whatever ``fetch_one``
          returned for the lookup.
      """
      # NOTE(review): a missing blog is not reported through
      # ``raise_if_not_found`` as the delete/update endpoints do; the lookup
      # result is passed through unchanged. Confirm that is intended.
      select_query = """
          SELECT id, blogtitle, blogcontent FROM blogs
          WHERE id = %s
          ORDER BY create_at DESC;
      """
      blog = fetch_one(select_query, (id,))
      return response_success(blog, "blog get blog_one success")
  # Create a blog.
  @router.post("/add")
  async def blog_add(blog: Blog, labels: list[Label], _: User = Depends(get_current_active_user)):
      """Insert a new blog and attach the given labels to it.

      The title is looked up first and the result is handed to
      ``raise_if_exists`` with the message "Blog already exists".

      Args:
          blog: Supplies ``blogtitle``, ``blogcontent``, ``imglink``,
              ``typeid`` and ``descr`` for the new row.
          labels: Labels to associate; only each label's ``id`` is used.
          _: Result of the ``get_current_active_user`` dependency; unused
              in the body.

      Returns:
          A dict with a confirmation ``message`` and the new ``blog_id``.
      """
      duplicate = fetch_one(
          "SELECT * FROM blogs WHERE blogtitle = %s",
          (blog.blogtitle,),
      )
      raise_if_exists(duplicate, "Blog already exists")

      # ``lastrowid=True`` asks the helper for the id of the inserted row.
      blog_id = execute_query(
          "INSERT INTO blogs (blogtitle, blogcontent,imglink, typeid, descr) VALUES (%s, %s, %s, %s,%s)",
          (blog.blogtitle, blog.blogcontent, blog.imglink, blog.typeid, blog.descr),
          lastrowid=True,
      )

      # One association row per label.
      link_label_sql = "INSERT INTO blog_label (blogid, labelid) VALUES (%s, %s)"
      for label in labels:
          execute_query(link_label_sql, (blog_id, label.id))

      return {"message": "Blog created successfully", "blog_id": blog_id}
  # Delete a blog.
  @router.delete("/delete/{id}")
  async def blog_delete(id: str = Path(description="博客id")):
      """Delete a blog together with its label associations.

      The blog is looked up first and the result is handed to
      ``raise_if_not_found`` with the message "blog not found".

      Args:
          id: Value matched against ``blogs.id``.

      Returns:
          The result of ``response_success`` carrying only a message.
      """
      # NOTE(review): unlike the add/update endpoints, this one declares no
      # ``get_current_active_user`` dependency. Confirm whether deleting
      # without authentication is intended.
      existing_blog = fetch_one("SELECT * FROM blogs WHERE id = %s", (id,))
      raise_if_not_found(existing_blog, "blog not found")

      # Remove the association rows before the blog itself so that no
      # blog_label rows are left pointing at a blog that no longer exists.
      execute_query("DELETE FROM blog_label WHERE blogid = %s", (id,))
      execute_query("DELETE FROM blogs WHERE id = %s", (id,))

      return response_success(message="blog delete success")
  # Update a blog.
  @router.put("/update/{id}")
  async def blog_update(id: int, blog: Blog, labels: list[Label], _: User = Depends(get_current_active_user)):
      """Overwrite a blog's fields and replace its label associations.

      The blog is looked up first and the result is handed to
      ``raise_if_not_found`` with the message "blog not found".

      Args:
          id: Value matched against ``blogs.id``.
          blog: Supplies the new ``blogtitle``, ``blogcontent``, ``imglink``,
              ``typeid`` and ``descr``.
          labels: The complete new label set; only each label's ``id`` is
              used.
          _: Result of the ``get_current_active_user`` dependency; unused
              in the body.

      Returns:
          The result of ``response_success`` carrying only a message.
      """
      # Make sure the blog to edit exists.
      existing_blog = fetch_one("SELECT * FROM blogs WHERE id = %s", (id,))
      raise_if_not_found(existing_blog, "blog not found")

      # Update the blog's own columns.
      update_query = (
          "UPDATE blogs SET blogtitle = %s, blogcontent = %s, imglink = %s, typeid = %s, descr = %s WHERE id = %s"
      )
      update_data = (blog.blogtitle, blog.blogcontent,
                     blog.imglink, blog.typeid, blog.descr, id)
      execute_query(update_query, update_data)

      # Replace the label set: drop every existing association, then insert
      # one row per requested label.
      execute_query("DELETE FROM blog_label WHERE blogid = %s", (id,))
      insert_label_query = "INSERT INTO blog_label (blogid, labelid) VALUES (%s, %s)"
      for label in labels:
          execute_query(insert_label_query, (id, label.id))

      # Pass the text as ``message=``: the other calls in this file give
      # ``response_success`` its data first, so a positional string would be
      # returned as the payload instead of the message.
      return response_success(message="blog update sucess")
  # Fuzzy blog search.
  # NOTE(review): routes are matched in declaration order, so no earlier
  # ``/list/{...}`` route may capture the literal ``search`` segment.
  @router.get("/list/search")
  async def blog_list_search(
      blogtitle: str = Query(None, description="博客标题"),
      typename: str = Query(None, description="博客类型"),
      start_date: str = Query(None, description="开始时间"),
      end_date: str = Query(None, description="结束时间"),
  ):
      """Search blogs by title, type name and creation-time range.

      Every filter is optional; an omitted or empty filter is skipped.

      Args:
          blogtitle: Substring matched against ``blogtitle`` with ``LIKE``.
          typename: Substring matched against ``typename`` with ``LIKE``.
          start_date: Inclusive lower bound for ``create_at``.
          end_date: Inclusive upper bound for ``create_at``.

      Returns:
          The result of ``response_success`` wrapping the matching rows,
          newest first, each with its aggregated label names.
      """
      # ``WHERE 1=1`` lets every filter be appended uniformly as " AND ...".
      select_query = """
          SELECT blogs.id, blogtitle, blogcontent,wordcount, typename, create_at, update_at, blogs.descr,JSON_ARRAYAGG(labels.labelname) AS labelnames
          FROM blogs
          LEFT JOIN `blogtypes` ON blogs.typeid = blogtypes.id
          LEFT JOIN blog_label ON blogs.id = blog_label.blogid
          LEFT JOIN labels ON blog_label.labelid = labels.id
          WHERE 1=1
      """
      params = []
      if blogtitle:
          select_query += " AND blogtitle LIKE %s"
          params.append(f"%{blogtitle}%")
      if typename:
          select_query += " AND typename LIKE %s"
          params.append(f"%{typename}%")
      if start_date:
          select_query += " AND create_at >= %s"
          params.append(start_date)
      if end_date:
          select_query += " AND create_at <= %s"
          params.append(end_date)

      # The leading space matters: without it the clause is glued directly
      # onto the last filter's placeholder ("... %sGROUP BY ...").
      select_query += (
          " GROUP BY blogs.id, blogs.blogtitle, blogs.blogcontent,blogs.wordcount,"
          " blogtypes.typename, blogs.create_at, blogs.update_at, blogs.descr"
          " ORDER BY create_at DESC"
      )

      rows = fetch_all(select_query, params=params, fetchall=True)
      return response_success(data=rows, message="blog serach succuessfully!")
  # Fetch one blog by id.
  @router.get("/list/search/{id}")
  async def get_id_blog(id: str = Path(description="博客id")):
      """Return one blog with the ids of its labels.

      Args:
          id: Value matched against ``blogs.id``.

      Returns:
          The result of ``response_success`` wrapping the fetched row. When
          the row is a dict, its ``labelnames`` entry (which holds the
          aggregated label *ids*, despite the name) is decoded from JSON
          text into a list, or replaced by ``[]`` if it cannot be decoded.
      """
      select_query = """SELECT blogs.id, blogtitle, blogcontent,wordcount, blogs.typeid, blogs.descr,JSON_ARRAYAGG(labels.id) AS labelnames,imglink FROM blogs
          LEFT JOIN `blogtypes` ON blogs.typeid = blogtypes.id
          LEFT JOIN blog_label ON blogs.id = blog_label.blogid
          LEFT JOIN labels ON blog_label.labelid = labels.id
          WHERE blogs.id = %s
          GROUP BY blogs.id;
      """
      # Read the row with ``fetch_one``, the helper the rest of this file
      # uses for single-row SELECTs and indexes like a dict;
      # ``execute_query`` is used for INSERT/UPDATE/DELETE statements.
      blog = fetch_one(select_query, (id,))

      # JSON_ARRAYAGG may come back as JSON text; decode it when it does.
      label_ids = blog.get("labelnames") if isinstance(blog, dict) else None
      if isinstance(label_ids, str):
          try:
              blog["labelnames"] = json.loads(label_ids)
          except json.JSONDecodeError:
              # Fall back to an empty list instead of failing the request.
              blog["labelnames"] = []

      return response_success(data=blog, message="blog search success")
  146. # Just a quick test (leftover comment).