You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
67 lines
2.6 KiB
67 lines
2.6 KiB
from fastapi import Depends, APIRouter, status, Query, Path, HTTPException,Request
|
|
from internal.models import *
|
|
from internal.database import fetch_one, fetch_all, execute_query, response_success, raise_if_exists,raise_if_not_found
|
|
from dependencies import get_current_active_user
|
|
from limiter_config import limiter
|
|
# Router for the blog-type endpoints; every route below is mounted under /blogtypes.
router = APIRouter(prefix="/blogtypes", tags=["博客分类"])
|
|
# List all blog types
|
|
@router.get("/list")
@limiter.limit("10/minute")
async def type_list(request: Request):
    """Fetch id, typename and descr for every row in ``blogtypes``.

    Args:
        request: Incoming request. Not read in this body; presumably
            required by the ``limiter.limit`` decorator (verify against
            limiter_config).

    Returns:
        Whatever ``response_success`` builds from the fetched rows and the
        message "type get list success".
    """
    rows = fetch_all("SELECT id,typename,descr FROM blogtypes;")
    return response_success(rows, "type get list success")
|
|
|
|
# Create a blog type
|
|
@router.post("/add")
@limiter.limit("10/minute")
async def type_add(
    request: Request,
    type: Type,
    _: User = Depends(get_current_active_user),
):
    """Insert a new row into ``blogtypes`` from the submitted payload.

    Args:
        request: Incoming request. Not read in this body; presumably
            required by the ``limiter.limit`` decorator (verify against
            limiter_config).
        type: Payload whose ``typename`` and ``descr`` are stored.
        _: Result of the ``get_current_active_user`` dependency; unused,
            declared only so the dependency runs for this route.

    Returns:
        Whatever ``response_success`` builds from the submitted payload and
        the message "type create success".
    """
    # Values are bound through placeholders rather than formatted into the SQL.
    execute_query(
        """INSERT INTO blogtypes (typename,descr) VALUES(%s,%s)""",
        (type.typename, type.descr),
    )
    return response_success(data=type, message="type create success")
|
|
|
|
# Search blog types by name (substring match)
|
|
@router.get("/list/search")
@limiter.limit("10/minute")
async def type_search(
    request: Request,
    typename: str = Query(None, description="类型名称"),
):
    """Search ``blogtypes`` by a substring of ``typename``.

    Args:
        request: Incoming request. Not read in this body; presumably
            required by the ``limiter.limit`` decorator (verify against
            limiter_config).
        typename: Optional query parameter. When it is empty or omitted no
            filter is added and the unfiltered query is run.

    Returns:
        Whatever ``response_success`` builds from the fetched rows and the
        message "type search success".
    """
    # "WHERE 1=1 " lets the optional condition be appended with a plain AND.
    base_sql = "SELECT id,typename,descr FROM blogtypes WHERE 1=1 "
    if typename:
        sql = base_sql + "AND typename LIKE %s"
        # Wrap in % on both sides so LIKE matches the text anywhere in the name.
        args = [f"%{typename}%"]
    else:
        sql = base_sql
        args = []
    matches = fetch_all(sql, params=args, fetchall=True)
    return response_success(data=matches, message="type search success")
|
|
|
|
# Update a blog type
|
|
@router.put("/update/{id}")
@limiter.limit("10/minute")
async def type_put(
    request: Request,
    type: Type,
    id: str = Path(description="类型id"),
    _: User = Depends(get_current_active_user),
):
    """Overwrite ``typename`` and ``descr`` of the blog type with the given id.

    Args:
        request: Incoming request. Not read in this body; presumably
            required by the ``limiter.limit`` decorator (verify against
            limiter_config).
        type: Payload carrying the new ``typename`` and ``descr``.
        id: Path parameter identifying the row to update.
        _: Result of the ``get_current_active_user`` dependency; unused,
            declared only so the dependency runs for this route.

    Returns:
        Whatever ``response_success`` builds from the message
        "type update success" with its default data.
    """
    update_query = "UPDATE blogtypes SET typename=%s,descr=%s WHERE id=%s;"
    update_data = (type.typename, type.descr, id)
    execute_query(update_query, update_data)
    # Pass the text by keyword: the first positional slot of response_success
    # is used for data by the other endpoints, so a positional string would
    # land in the data field instead of the message.
    return response_success(message="type update success")
|
|
|
|
# Delete a blog type
|
|
@router.delete("/delete/{id}")
@limiter.limit("10/minute")
async def type_del(
    request: Request,
    id: str = Path(description="类型id"),
    _: User = Depends(get_current_active_user),
):
    """Delete the blog type with the given id.

    Args:
        request: Incoming request. Not read in this body; presumably
            required by the ``limiter.limit`` decorator (verify against
            limiter_config).
        id: Path parameter identifying the row to delete.
        _: Result of the ``get_current_active_user`` dependency; unused,
            declared only so the dependency runs for this route.

    Returns:
        Whatever ``response_success`` builds when called with no arguments.
    """
    delete_query = "DELETE FROM blogtypes WHERE id=%s;"
    # The trailing comma matters: (id) is just the string itself, while (id,)
    # is the one-element parameter sequence the placeholder expects.
    delete_params = (id,)
    execute_query(delete_query, delete_params)
    return response_success()
|
|
|
|
# Look up a blog type by id
|
|
@router.get("/search/{id}")
@limiter.limit("10/minute")
async def type_search_id(
    request: Request,
    id: str = Path(description="类型id"),
):
    """Fetch the ``blogtypes`` row whose id equals the path parameter.

    Args:
        request: Incoming request. Not read in this body; presumably
            required by the ``limiter.limit`` decorator (verify against
            limiter_config).
        id: Path parameter identifying the row to fetch.

    Returns:
        Whatever ``response_success`` builds from the ``fetch_one`` result
        and the message "type search success".
    """
    record = fetch_one("SELECT * FROM blogtypes WHERE id=%s", (id,))
    return response_success(data=record, message="type search success")
|