Browse Source

add new

master
panda 8 months ago
parent
commit
85bd7fbd6d
  1. 3
      main.py
  2. 134
      routers/expensemanage.py
  3. 61
      routers/expensetype.py
  4. 42
      routers/statistic.py

3
main.py

@ -4,7 +4,7 @@ from fastapi import Depends, FastAPI, HTTPException, status
from dependencies import *
from internal.models import Token
from fastapi.middleware.cors import CORSMiddleware
from routers import blogtype, usermanage,blogmanage,classticmanage,commonlinkmanage,labelmanage,diarymanage,diarytype,statistic
from routers import blogtype, usermanage,blogmanage,classticmanage,commonlinkmanage,labelmanage,diarymanage,diarytype,statistic,expensetype
app=FastAPI()
app.include_router(usermanage.router)
app.include_router(blogtype.router)
@ -15,6 +15,7 @@ app.include_router(classticmanage.router)
app.include_router(commonlinkmanage.router)
app.include_router(labelmanage.router)
app.include_router(statistic.router)
app.include_router(expensetype.router)
# 解决跨域
app.add_middleware(
CORSMiddleware,

134
routers/expensemanage.py

@ -0,0 +1,134 @@
from fastapi import Depends, APIRouter, Query, Path
from internal.models import *
from internal.database import (
fetch_one,
fetch_all,
execute_query,
response_success,
raise_if_exists,
raise_if_not_found,
)
from dependencies import get_current_active_user
import json
router = APIRouter(prefix="/expenses", tags=["收支管理"])
# 获取列表
@router.get("/list")
async def expense_list(page: int = Query(None), page_size: int = Query(None)):
limit_clause = ""
if page is not None and page_size is not None:
offset = (page - 1) * page_size
limit_clause = f"LIMIT {page_size} OFFSET {offset}"
select_query = f"""
SELECT expenses.id,expenses.pay_price, expenses.live_price, expenses.create_at, expensetypes.typename,expenses.descr FROM expenses
LEFT JOIN `expensetypes` ON expenses.typeid = expensetypes.id
ORDER BY create_at DESC
{limit_clause};
"""
expense_list = fetch_all(select_query)
return response_success(expense_list, "expense get list success")
@router.get("/list/{id}")
async def expense_one(id: int):
# 列表参数:收支名称、收支内容、创建时间、收支图片、收支查看时间、收支阅读次数、收支字数、类型名称、标签名列表
select_query = """
SELECT expenses.id,expenses.pay_price, expenses.live_price, expenses.create_at, expensetypes.typename,expenses.descr FROM expenses
LEFT JOIN `expensetypes` ON expenses.typeid = expensetypes.id
WHERE id = %s
ORDER BY create_at DESC;
"""
expense_one = fetch_one(select_query, (id,))
return response_success(expense_one, "expense get expense_one success")
# 收支新增
@router.post("/add")
async def expense_add(expense: expense, _: User = Depends(get_current_active_user)):
select_query = "SELECT * FROM expenses WHERE expensetitle = %s"
existing_expense = fetch_one(select_query, (expense.expensetitle,))
raise_if_exists(existing_expense, "expense already exists")
insert_query = (
"INSERT INTO expenses (expensetitle, expensecontent,imglink, typeid, descr) VALUES (%s, %s, %s, %s,%s)"
)
insert_value=(expense.expensetitle,expense.expensecontent,expense.imglink,expense.typeid,expense.descr)
execute_query(insert_query,insert_value)
return {"message": "expense created successfully"}
# 收支删除
@router.delete("/delete/{id}")
async def expense_delete(id: str = Path(description="收支id")):
select_query = "SELECT * FROM expenses WHERE id = %s"
existing_expense = fetch_one(select_query, (id,))
raise_if_not_found(existing_expense, "expense not found")
delete_query = "DELETE FROM expenses WHERE id = %s"
execute_query(delete_query, (id,))
return response_success(message="expense delete success")
@router.put("/update/{id}")
async def expense_update(id: int, expense: expense, _: User = Depends(get_current_active_user)):
# 检查要编辑的收支是否存在
select_query = "SELECT * FROM expenses WHERE id = %s"
existing_expense = fetch_one(select_query, (id,))
raise_if_not_found(existing_expense, "expense not found")
# 更新收支信息
update_query = (
"UPDATE expenses SET expensetitle = %s, expensecontent = %s, imglink = %s, typeid = %s, descr = %s WHERE id = %s"
)
update_data = (expense.expensetitle, expense.expensecontent,
expense.imglink, expense.typeid, expense.descr, id)
execute_query(update_query, update_data)
return response_success("expense update sucess")
# 收支模糊查询
@router.get("/list/search")
async def expense_list_search(
expensetitle: str = Query(None, description="收支标题"),
typename: str = Query(None, description="收支类型"),
start_date: str = Query(None, description="开始时间"),
end_date: str = Query(None, description="结束时间"),
):
select_query = """
SELECT expenses.id, expensetitle, expensecontent,wordcount, typename, create_at, update_at, expenses.descr
FROM expenses
LEFT JOIN `expensetypes` ON expenses.typeid = expensetypes.id
WHERE 1=1
"""
params = []
if expensetitle:
select_query += " AND expensetitle LIKE %s"
params.append(f"%{expensetitle}%")
if typename:
select_query += " AND typename LIKE %s"
params.append(f"%{typename}%")
if start_date:
select_query += " AND create_at >= %s"
params.append(start_date)
if end_date:
select_query += " AND create_at <= %s"
params.append(end_date)
select_query += "ORDER BY create_at DESC"
expense_list = fetch_all(select_query, params=params, fetchall=True)
return response_success(data=expense_list, message="expense serach succuessfully!")
# 根据id查询收支
@router.get("/list/search/{id}")
async def get_id_expense(id: str = Path(description="收支id")):
select_query = """SELECT expenses.id, expensetitle, expensecontent,wordcount, expenses.typeid, expenses.descr,imglink FROM expenses
LEFT JOIN `expensetypes` ON expenses.typeid = expensetypes.id
WHERE expenses.id = %s
"""
expense_list = execute_query(select_query, (id,))
return response_success(data=expense_list, message="expense search success")
# 我就测试一下

61
routers/expensetype.py

@ -0,0 +1,61 @@
from fastapi import Depends, APIRouter, status, Query, Path, HTTPException
from internal.models import *
from internal.database import fetch_one, fetch_all, execute_query, response_success, raise_if_exists,raise_if_not_found
from dependencies import get_current_active_user
router = APIRouter(
prefix="/expensetypes",
tags=['收支分类']
)
# 获取列表
@router.get("/list")
async def type_list():
select_query = "SELECT id,typename,descr FROM expensetypes;"
type_list = fetch_all(select_query)
return response_success(type_list, "type get list success")
# 新增
@router.post("/add")
async def type_add(type:Type):
insert_query="""INSERT INTO expensetypes (typename,descr) VALUES(%s,%s)"""
insert_value=(type.typename,type.descr)
execute_query(insert_query,insert_value)
return response_success(data=type,message="type create success")
# 单条数据查询
@router.get("/list/search")
async def type_search(typename:str=Query(description="类型名称")):
select_query="SELECT id,typename,descr FROM expensetypes WHERE 1=1 "
params=[]
if typename:
select_query+="AND typename LIKE %s"
params.append(f"%{typename}%")
type_query=fetch_all(select_query,params=params,fetchall=True)
return response_success(data=type_query,message="type search success")
# 语录修改
@router.put("/update/{id}")
async def type_put(type:Type,id: str = Path(description="类型id")):
update_query = (
"UPDATE expensetypes SET typename=%s,descr=%s WHERE id=%s;"
)
update_data = (type.typename,type.descr,id)
execute_query(update_query, update_data)
return response_success("type update sucess")
# 语录删除
@router.delete("/delete/{id}")
async def type_del(id: str = Path(description="类型id")):
update_query = (
"DELETE FROM expensetypes WHERE id=%s;"
)
update_data = (id)
execute_query(update_query, update_data)
return response_success()
# 根据id查询语录
@router.get("/list/search/{id}")
async def type_search_id(id:str=Path(description="类型id")):
select_query="SELECT * FROM expensetypes WHERE id=%s"
type_query=fetch_one(select_query,(id,))
return response_success(data=type_query,message="type search success")

42
routers/statistic.py

@ -25,34 +25,34 @@ async def statistic_list():
# 记录所有blog和diary数据
@router.get("/comments")
# 获取首页数据
@router.get("/homepage")
async def get_homepage_data():
# 获取博客数据
select_blog = """
select id,blogtitle,blogcontent,imglink,create_at,update_at from blogs ORDER BY create_at DESC;
"""
SELECT blogs.id, blogs.blogtitle, blogs.blogcontent, blogs.create_at, blogs.imglink,
blogs.wordcount, blogtypes.typename, JSON_ARRAYAGG(labels.labelname) AS labelnames
FROM blogs
LEFT JOIN `blogtypes` ON blogs.typeid = blogtypes.id
LEFT JOIN blog_label ON blog_label.blogid = blogs.id
LEFT JOIN labels ON blog_label.labelid = labels.id
GROUP BY blogs.id, blogs.blogtitle, blogs.blogcontent, blogs.create_at, blogs.imglink,
blogs.wordcount, blogtypes.typename
ORDER BY create_at DESC
"""
blog_list = fetch_all(select_blog)
# 获取日记数据
select_diary = """
select id,diarytitle,diarycontent,imglink,create_at,update_at from diarys ORDER BY create_at DESC;
SELECT diarys.id, diarys.diarytitle, diarys.diarycontent, diarys.imglink,
diarytypes.typename, diarys.create_at, diarys.update_at
FROM diarys
LEFT JOIN diarytypes ON diarys.typeid = diarytypes.id
ORDER BY create_at DESC
"""
diary_list = fetch_all(select_diary)
# 按日期分组数据
grouped_data = defaultdict(lambda: {'blogs': [], 'diaries': []})
for blog in blog_list:
date_key = blog['create_at'].date()
grouped_data[date_key]['blogs'].append(blog)
for diary in diary_list:
date_key = diary['create_at'].date()
grouped_data[date_key]['diaries'].append(diary)
# 将分组后的数据转换为列表并按日期排序
result = [
{'date': date, 'blogs': data['blogs'], 'diaries': data['diaries']}
for date, data in sorted(grouped_data.items(), key=lambda x: x[0], reverse=True)
]
# 合并博客和日记数据,按照创建日期降序排序
combined_data = sorted(blog_list + diary_list, key=lambda x: x['create_at'], reverse=True)
return {"data": result}
return {"data": combined_data}
Loading…
Cancel
Save