diff --git a/backend/apps/datasource/api/datasource.py b/backend/apps/datasource/api/datasource.py index 5724f92a..084d7647 100644 --- a/backend/apps/datasource/api/datasource.py +++ b/backend/apps/datasource/api/datasource.py @@ -1,14 +1,18 @@ import asyncio import hashlib +import io import os import traceback import uuid from io import StringIO from typing import List +from urllib.parse import quote import orjson import pandas as pd from fastapi import APIRouter, File, UploadFile, HTTPException, Path +from fastapi.responses import StreamingResponse +from sqlalchemy import and_ from apps.db.db import get_schema from apps.db.engine import get_engine_conn @@ -81,7 +85,6 @@ def inner(): await asyncio.to_thread(inner) - @router.post("/update", response_model=CoreDatasource, summary=f"{PLACEHOLDER_PREFIX}ds_update") @require_permissions(permission=SqlbotPermission(type='ds', keyExpression="ds.id")) async def update(session: SessionDep, trans: Trans, user: CurrentUser, ds: CoreDatasource): @@ -360,3 +363,132 @@ def insert_pg(df, tableName, engine): finally: cursor.close() conn.close() + + +t_sheet = "数据表列表" +t_n_col = "表名" +t_c_col = "表备注" +f_n_col = "字段名" +f_c_col = "字段备注" + + +@router.get("/exportDsSchema/{id}") +async def export_ds_schema(session: SessionDep, id: int = Path(..., description=f"{PLACEHOLDER_PREFIX}ds_id")): + # { + # 'sheet':'', sheet name + # 'c1_h':'', column1 column name + # 'c2_h':'', column2 column name + # 'c1':[], column1 data + # 'c2':[], column2 data + # } + def inner(): + if id == 0: # download template + file_name = '批量上传备注' + df_list = [ + {'sheet': t_sheet, 'c1_h': t_n_col, 'c2_h': t_c_col, 'c1': ["user", "score"], + 'c2': ["用来存放用户信息的数据表", "用来存放用户课程信息的数据表"]}, + {'sheet': '数据表1', 'c1_h': f_n_col, 'c2_h': f_c_col, 'c1': ["id", "name"], + 'c2': ["用户id", "用户姓名"]}, + {'sheet': '数据表2', 'c1_h': f_n_col, 'c2_h': f_c_col, 'c1': ["course", "user_id", "score"], + 'c2': ["课程名称", "用户ID", "课程得分"]}, + ] + else: + ds = session.query(CoreDatasource).filter(CoreDatasource.id == id).first() + file_name = ds.name + tables = session.query(CoreTable).filter(CoreTable.ds_id == id).all() + if len(tables) == 0: + raise HTTPException(400, "No tables") + + df_list = [] + df1 = {'sheet': t_sheet, 'c1_h': t_n_col, 'c2_h': t_c_col, 'c1': [], 'c2': []} + df_list.append(df1) + for table in tables: + df1['c1'].append(table.table_name) + df1['c2'].append(table.custom_comment) + + fields = session.query(CoreField).filter(CoreField.table_id == table.id).all() + df_fields = {'sheet': table.table_name, 'c1_h': f_n_col, 'c2_h': f_c_col, 'c1': [], 'c2': []} + for field in fields: + df_fields['c1'].append(field.field_name) + df_fields['c2'].append(field.custom_comment) + df_list.append(df_fields) + + # build dataframe and export + output = io.BytesIO() + + with pd.ExcelWriter(output, engine='xlsxwriter') as writer: + for df in df_list: + pd.DataFrame({df['c1_h']: df['c1'], df['c2_h']: df['c2']}).to_excel(writer, sheet_name=df['sheet'], + index=False) + + output.seek(0) + + filename = f'{file_name}.xlsx' + encoded_filename = quote(filename) + return io.BytesIO(output.getvalue()) + + # headers = { + # 'Content-Disposition': f"attachment; filename*=UTF-8''{encoded_filename}" + # } + + result = await asyncio.to_thread(inner) + return StreamingResponse( + result, + media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" + ) + + +@router.post("/uploadDsSchema/{id}") +async def upload_ds_schema(session: SessionDep, id: int = Path(..., description=f"{PLACEHOLDER_PREFIX}ds_id"), + file: UploadFile = File(...)): + ALLOWED_EXTENSIONS = {"xlsx", "xls"} + if not file.filename.lower().endswith(tuple(ALLOWED_EXTENSIONS)): + raise HTTPException(400, "Only support .xlsx/.xls") + + try: + contents = await file.read() + excel_file = io.BytesIO(contents) + + sheet_names = pd.ExcelFile(excel_file, engine="openpyxl").sheet_names + + excel_file.seek(0) + + field_sheets = [] + table_sheet = None # [] + for sheet in sheet_names: + df = pd.read_excel(excel_file, sheet_name=sheet, engine="openpyxl") + if sheet == t_sheet: + table_sheet = df.where(pd.notnull(df), None).to_dict(orient="records") + else: + field_sheets.append( + {'sheet_name': sheet, 'data': df.where(pd.notnull(df), None).to_dict(orient="records")}) + + # print(field_sheets) + + # get data and update + # update table comment + if table_sheet and len(table_sheet) > 0: + for table in table_sheet: + session.query(CoreTable).filter( + and_(CoreTable.ds_id == id, CoreTable.table_name == table[t_n_col])).update( + {'custom_comment': table[t_c_col]}) + + # update field comment + if field_sheets and len(field_sheets) > 0: + for fields in field_sheets: + if len(fields['data']) > 0: + # get table id + table = session.query(CoreTable).filter( + and_(CoreTable.ds_id == id, CoreTable.table_name == fields['sheet_name'])).first() + if table: + for field in fields['data']: + session.query(CoreField).filter( + and_(CoreField.ds_id == id, + CoreField.table_id == table.id, + CoreField.field_name == field[f_n_col])).update( + {'custom_comment': field[f_c_col]}) + session.commit() + + return True + except Exception as e: + raise HTTPException(status_code=500, detail=f"解析 Excel 失败: {str(e)}")