From 9fee091755f598b83e9c7b92ad6e0f98fe5799b4 Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Thu, 5 Mar 2026 11:30:53 +0800 Subject: [PATCH 01/10] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=BA=90=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E9=9B=86=E9=99=90=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/src/i18n/locales/en/common.json | 6 +- frontend/src/i18n/locales/zh/common.json | 6 +- .../pages/DataCleansing/Create/CreateTask.tsx | 10 ++- .../Create/components/CreateTaskStepOne.tsx | 78 ++++++++++++++++++- 4 files changed, 94 insertions(+), 6 deletions(-) diff --git a/frontend/src/i18n/locales/en/common.json b/frontend/src/i18n/locales/en/common.json index a35b9549c..9f76893ee 100644 --- a/frontend/src/i18n/locales/en/common.json +++ b/frontend/src/i18n/locales/en/common.json @@ -1367,7 +1367,11 @@ "destDatasetName": "Target Dataset Name", "destDatasetNamePlaceholder": "Enter or select target dataset name", "destDatasetType": "Target Dataset Type", - "destDatasetTypeRequired": "Please select target dataset type" + "destDatasetTypeRequired": "Please select target dataset type", + "useSourceDataset": "Use Source Dataset", + "useSourceDatasetHint": "When checked, target dataset will use source dataset and cannot be modified", + "destDatasetNameRequired": "Please enter target dataset name", + "cannotUseSourceDataset": "Cannot use source dataset as target. Check 'Use Source Dataset' or enter a different name" }, "sections": { "taskInfo": "Task Info", diff --git a/frontend/src/i18n/locales/zh/common.json b/frontend/src/i18n/locales/zh/common.json index 26873eed9..5b9b41ddb 100644 --- a/frontend/src/i18n/locales/zh/common.json +++ b/frontend/src/i18n/locales/zh/common.json @@ -1367,7 +1367,11 @@ "destDatasetName": "目标数据集名称", "destDatasetNamePlaceholder": "输入或选择目标数据集名称", "destDatasetType": "目标数据集类型", - "destDatasetTypeRequired": "请选择目标数据集类型" + "destDatasetTypeRequired": "请选择目标数据集类型", + "useSourceDataset": "选择源数据集", + "useSourceDatasetHint": "勾选后目标数据集将使用源数据集,不可修改", + "destDatasetNameRequired": "请输入目标数据集名称", + "cannotUseSourceDataset": "不能使用源数据集作为目标数据集,请勾选\"选择源数据集\"或输入其他名称" }, "sections": { "taskInfo": "任务信息", diff --git a/frontend/src/pages/DataCleansing/Create/CreateTask.tsx b/frontend/src/pages/DataCleansing/Create/CreateTask.tsx index 8738111ec..293851b5c 100644 --- a/frontend/src/pages/DataCleansing/Create/CreateTask.tsx +++ b/frontend/src/pages/DataCleansing/Create/CreateTask.tsx @@ -22,6 +22,7 @@ export default function CleansingTaskCreate() { destDatasetType: DatasetType.TEXT, type: DatasetType.TEXT, }); + const [useSourceDataset, setUseSourceDataset] = useState(false); const { renderStepTwo, @@ -54,12 +55,17 @@ export default function CleansingTaskCreate() { switch (currentStep) { case 1: { const values = form.getFieldsValue(); - return ( + const hasBasicFields = ( values.name && values.srcDatasetId && values.destDatasetName && values.destDatasetType ); + if (!hasBasicFields) return false; + if (!useSourceDataset && values.destDatasetName === taskConfig.srcDatasetName) { + return false; + } + return true; } case 2: return selectedOperators.length > 0; @@ -76,6 +82,8 @@ export default function CleansingTaskCreate() { form={form} taskConfig={taskConfig} setTaskConfig={setTaskConfig} + useSourceDataset={useSourceDataset} + setUseSourceDataset={setUseSourceDataset} /> ); case 2: diff --git a/frontend/src/pages/DataCleansing/Create/components/CreateTaskStepOne.tsx b/frontend/src/pages/DataCleansing/Create/components/CreateTaskStepOne.tsx index 3b9d552de..e4f1764dd 100644 --- a/frontend/src/pages/DataCleansing/Create/components/CreateTaskStepOne.tsx +++ b/frontend/src/pages/DataCleansing/Create/components/CreateTaskStepOne.tsx @@ -6,10 +6,11 @@ import { DatasetSubType, DatasetType, } from "@/pages/DataManagement/dataset.model"; -import { Input, Select, Form, AutoComplete } from "antd"; +import { Input, Select, Form, AutoComplete, Checkbox, Tooltip } from "antd"; import TextArea from "antd/es/input/TextArea"; import { useEffect, useState } from "react"; import { useTranslation } from "react-i18next"; +import { Lock } from "lucide-react"; export default function CreateTaskStepOne({ form, @@ -24,11 +25,14 @@ export default function CreateTaskStepOne({ destDatasetName: string; type: DatasetType; destDatasetType: DatasetSubType; + srcDatasetId?: string; + srcDatasetName?: string; }; setTaskConfig: (config: any) => void; }) { const { t } = useTranslation(); const [datasets, setDatasets] = useState([]); + const [useSourceDataset, setUseSourceDataset] = useState(false); const datasetTypes = [...Object.values(getDatasetTypeMap(t))]; const fetchDatasets = async () => { @@ -45,10 +49,14 @@ export default function CreateTaskStepOne({ let dataset = null; if (key === "srcDatasetId") { dataset = datasets.find((d) => d.id === value); + // 如果勾选了"选择源数据集",自动更新目标数据集名称 + const newDestName = useSourceDataset ? (dataset?.name || "") : allValues.destDatasetName; + form.setFieldValue("destDatasetName", newDestName); setTaskConfig({ ...taskConfig, ...allValues, srcDatasetName: dataset?.name || "", + destDatasetName: newDestName, }); } else if (key === "destDatasetName") { dataset = datasets.find((d) => d.name === value); @@ -62,6 +70,35 @@ export default function CreateTaskStepOne({ } }; + const handleUseSourceDatasetChange = (checked: boolean) => { + setUseSourceDataset(checked); + if (checked) { + const srcDatasetId = form.getFieldValue("srcDatasetId"); + const srcDataset = datasets.find((d) => d.id === srcDatasetId); + const srcName = srcDataset?.name || ""; + form.setFieldValue("destDatasetName", srcName); + setTaskConfig({ + ...taskConfig, + destDatasetName: srcName, + }); + } else { + form.setFieldValue("destDatasetName", ""); + setTaskConfig({ + ...taskConfig, + destDatasetName: "", + }); + } + }; + + // 过滤掉当前选中的源数据集(当不勾选"选择源数据集"时) + const getFilteredDatasetOptions = () => { + const srcDatasetId = form.getFieldValue("srcDatasetId"); + if (useSourceDataset || !srcDatasetId) { + return datasets; + } + return datasets.filter((d) => d.id !== srcDatasetId); + }; + return (
- +
+ * + + handleUseSourceDatasetChange(e.target.checked)} + > + + {t("dataCleansing.task.form.useSourceDataset")} + + + {useSourceDataset && ( + + + + )} +
+ { + if (useSourceDataset) return Promise.resolve(); + const srcDatasetId = form.getFieldValue("srcDatasetId"); + const srcDataset = datasets.find((d) => d.id === srcDatasetId); + if (srcDataset && value === srcDataset.name) { + return Promise.reject(new Error(t("dataCleansing.task.form.cannotUseSourceDataset"))); + } + return Promise.resolve(); + } + } + ]} + > { + options={getFilteredDatasetOptions().map((dataset) => { return { label: (
@@ -118,6 +189,7 @@ export default function CreateTaskStepOne({ return option.value.toLowerCase().startsWith(inputValue.toLowerCase()); }} placeholder={t("dataCleansing.task.form.destDatasetNamePlaceholder")} + disabled={useSourceDataset} /> Date: Thu, 5 Mar 2026 12:01:08 +0800 Subject: [PATCH 02/10] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=B8=8A=E4=B8=80?= =?UTF-8?q?=E6=AD=A5=E4=B9=8B=E5=90=8E=E8=A6=81=E6=93=8D=E4=BD=9C=E4=B9=8B?= =?UTF-8?q?=E5=90=8E=E6=89=8D=E8=83=BD=E7=82=B9=E4=B8=8B=E4=B8=80=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/pages/DataCleansing/Create/CreateTask.tsx | 12 ++++++------ .../Create/components/CreateTaskStepOne.tsx | 5 ++++- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/frontend/src/pages/DataCleansing/Create/CreateTask.tsx b/frontend/src/pages/DataCleansing/Create/CreateTask.tsx index 293851b5c..e0b0534f0 100644 --- a/frontend/src/pages/DataCleansing/Create/CreateTask.tsx +++ b/frontend/src/pages/DataCleansing/Create/CreateTask.tsx @@ -54,15 +54,15 @@ export default function CleansingTaskCreate() { const canProceed = () => { switch (currentStep) { case 1: { - const values = form.getFieldsValue(); const hasBasicFields = ( - values.name && - values.srcDatasetId && - values.destDatasetName && - values.destDatasetType + taskConfig.name && + taskConfig.srcDatasetId && + taskConfig.destDatasetName && + taskConfig.destDatasetType ); if (!hasBasicFields) return false; - if (!useSourceDataset && values.destDatasetName === taskConfig.srcDatasetName) { + if (useSourceDataset) return true; + if (taskConfig.destDatasetName === taskConfig.srcDatasetName) { return false; } return true; diff --git a/frontend/src/pages/DataCleansing/Create/components/CreateTaskStepOne.tsx b/frontend/src/pages/DataCleansing/Create/components/CreateTaskStepOne.tsx index e4f1764dd..6a60c8e80 100644 --- a/frontend/src/pages/DataCleansing/Create/components/CreateTaskStepOne.tsx +++ b/frontend/src/pages/DataCleansing/Create/components/CreateTaskStepOne.tsx @@ -16,6 +16,8 @@ export default function CreateTaskStepOne({ form, taskConfig, setTaskConfig, + useSourceDataset, + setUseSourceDataset, }: { form: any; taskConfig: { @@ -29,10 +31,11 @@ export default function CreateTaskStepOne({ srcDatasetName?: string; }; setTaskConfig: (config: any) => void; + useSourceDataset: boolean; + setUseSourceDataset: (checked: boolean) => void; }) { const { t } = useTranslation(); const [datasets, setDatasets] = useState([]); - const [useSourceDataset, setUseSourceDataset] = useState(false); const datasetTypes = [...Object.values(getDatasetTypeMap(t))]; const fetchDatasets = async () => { From d3d2bcc9076a57faa01ca533fc596889b56036b8 Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Thu, 5 Mar 2026 14:18:05 +0800 Subject: [PATCH 03/10] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=AE=97=E5=AD=90?= =?UTF-8?q?=E7=BC=96=E6=8E=92=E6=8B=96=E5=8A=A8=E6=95=88=E6=9E=9C=E6=9C=89?= =?UTF-8?q?=E9=97=AE=E9=A2=98=EF=BC=8C=E5=BE=80=E6=9C=80=E5=90=8E=E4=B8=80?= =?UTF-8?q?=E4=B8=AA=E6=8B=96=E5=8A=A8=E4=B8=8D=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DataCleansing/Create/hooks/useDragOperators.ts | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/frontend/src/pages/DataCleansing/Create/hooks/useDragOperators.ts b/frontend/src/pages/DataCleansing/Create/hooks/useDragOperators.ts index d5b8e90aa..bf3573301 100644 --- a/frontend/src/pages/DataCleansing/Create/hooks/useDragOperators.ts +++ b/frontend/src/pages/DataCleansing/Create/hooks/useDragOperators.ts @@ -84,6 +84,18 @@ export function useDragOperators({ setOperators([...operators, draggingItem]); } } + // 如果是算子编排区域内的重新排序,移动到末尾 + else if (draggingSource === "sort") { + const draggedIndex = operators.findIndex( + (item) => item.id === draggingItem.id + ); + if (draggedIndex !== -1 && draggedIndex !== operators.length - 1) { + const newItems = [...operators]; + const [draggedItem] = newItems.splice(draggedIndex, 1); + newItems.push(draggedItem); + setOperators(newItems); + } + } resetDragState(); }; From 93cfc4872ed5a6bbc9a65aea3f7b95aa4b9d5b49 Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Thu, 5 Mar 2026 14:42:29 +0800 Subject: [PATCH 04/10] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=A4=9A=E9=80=89?= =?UTF-8?q?=E5=8F=96=E6=B6=88=E4=B8=80=E4=B8=8B=E5=A4=9A=E4=B8=AA=E9=83=BD?= =?UTF-8?q?=E8=A2=AB=E5=8F=96=E6=B6=88=E4=BA=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pages/DataCleansing/Create/components/ParamConfig.tsx | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/frontend/src/pages/DataCleansing/Create/components/ParamConfig.tsx b/frontend/src/pages/DataCleansing/Create/components/ParamConfig.tsx index 8636b7dbf..8be0d8ea1 100644 --- a/frontend/src/pages/DataCleansing/Create/components/ParamConfig.tsx +++ b/frontend/src/pages/DataCleansing/Create/components/ParamConfig.tsx @@ -100,7 +100,8 @@ const ParamConfig: React.FC = ({ ); - case "checkbox": + case "checkbox": { + const group = Array.isArray(value) ? value: value.split(",").map(item => item.trim()).filter(Boolean); return ( = ({ key={paramKey} > ); + } case "slider": return ( Date: Thu, 5 Mar 2026 15:11:18 +0800 Subject: [PATCH 05/10] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=AE=97=E5=AD=90?= =?UTF-8?q?=E9=A1=B5=E9=9D=A2=E4=B8=8D=E8=B0=83=E6=8E=A5=E5=8F=A3=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/src/hooks/useFetchData.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/frontend/src/hooks/useFetchData.ts b/frontend/src/hooks/useFetchData.ts index 79084e4bb..3af160e38 100644 --- a/frontend/src/hooks/useFetchData.ts +++ b/frontend/src/hooks/useFetchData.ts @@ -266,6 +266,11 @@ export default function useFetchData( }; }, [searchParams, fetchData]); + // 组件挂载时重置 prevSearchParamsRef,解决 StrictMode 双重挂载问题 + useEffect(() => { + prevSearchParamsRef.current = ""; + }, []); + // 组件卸载时清理轮询和状态 useEffect(() => { isMountedRef.current = true; From 9a58f5ca7216eb43e3826c999a527cf2e1f57d40 Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Thu, 5 Mar 2026 15:18:16 +0800 Subject: [PATCH 06/10] =?UTF-8?q?=E5=88=B7=E6=96=B0=E6=97=B6=E4=BC=9A?= =?UTF-8?q?=E5=88=B7=E6=96=B0=E4=BE=A7=E8=BE=B9=E6=A0=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/src/pages/OperatorMarket/Home/OperatorMarket.tsx | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/frontend/src/pages/OperatorMarket/Home/OperatorMarket.tsx b/frontend/src/pages/OperatorMarket/Home/OperatorMarket.tsx index 04dcf2103..ee756cb7d 100644 --- a/frontend/src/pages/OperatorMarket/Home/OperatorMarket.tsx +++ b/frontend/src/pages/OperatorMarket/Home/OperatorMarket.tsx @@ -62,6 +62,11 @@ export default function OperatorMarketPage() { handleKeywordChange, } = useFetchData(queryOperatorsUsingPost, (op) => mapOperator(op, t)); + const handleReload = async () => { + fetchData(); + await initCategoriesTree(); + }; + const handleUploadOperator = () => { navigate(`/data/operator-market/create`); }; @@ -192,7 +197,7 @@ export default function OperatorMarketPage() { viewMode={viewMode} onViewModeChange={setViewMode} showViewToggle={true} - onReload={fetchData} + onReload={handleReload} />
From 5bf982f09aa853859fd9dc532d3d2a393cacee13 Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Thu, 5 Mar 2026 16:06:08 +0800 Subject: [PATCH 07/10] =?UTF-8?q?=E7=A6=81=E6=AD=A2=E5=88=A0=E9=99=A4?= =?UTF-8?q?=E9=A2=84=E5=88=B6=E6=A8=A1=E7=89=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/public/config/error-code.json | 1 + .../app/core/exception/codes.py | 156 +++++++++++++----- .../service/cleaning_template_service.py | 78 +++++---- 3 files changed, 167 insertions(+), 68 deletions(-) diff --git a/frontend/public/config/error-code.json b/frontend/public/config/error-code.json index 0a7bc6518..1a1390106 100644 --- a/frontend/public/config/error-code.json +++ b/frontend/public/config/error-code.json @@ -10,6 +10,7 @@ "cleaning.0008": "文件系统错误", "cleaning.0009": "设置解析错误", "cleaning.0010": "任务ID不能为空", + "cleaning.0011": "无法删除预制模板", "operator.0001": "算子不存在", "operator.0002": "算子被编排于模版中或处在正在进行的任务中,无法删除", "operator.0003": "无法删除预置算子", diff --git a/runtime/datamate-python/app/core/exception/codes.py b/runtime/datamate-python/app/core/exception/codes.py index d23539ddd..406b0778e 100644 --- a/runtime/datamate-python/app/core/exception/codes.py +++ b/runtime/datamate-python/app/core/exception/codes.py @@ -13,6 +13,7 @@ - rag: RAG模块 - ratio: 配比模块 """ + from typing import Final from .base import ErrorCode @@ -51,43 +52,87 @@ def __init__(self): SERVICE_UNAVAILABLE: Final = ErrorCode("system.0005", "Service unavailable", 503) # ========== 标注模块 ========== - ANNOTATION_TASK_NOT_FOUND: Final = ErrorCode("annotation.0001", "Annotation task not found", 404) - ANNOTATION_PROJECT_NOT_FOUND: Final = ErrorCode("annotation.0002", "Annotation project not found", 404) - ANNOTATION_TEMPLATE_NOT_FOUND: Final = ErrorCode("annotation.0003", "Annotation template not found", 404) - ANNOTATION_FILE_NOT_FOUND: Final = ErrorCode("annotation.0004", "File not found", 404) - ANNOTATION_TAG_UPDATE_FAILED: Final = ErrorCode("annotation.0005", "Failed to update tags", 500) + ANNOTATION_TASK_NOT_FOUND: Final = ErrorCode( + "annotation.0001", "Annotation task not found", 404 + ) + ANNOTATION_PROJECT_NOT_FOUND: Final = ErrorCode( + "annotation.0002", "Annotation project not found", 404 + ) + ANNOTATION_TEMPLATE_NOT_FOUND: Final = ErrorCode( + "annotation.0003", "Annotation template not found", 404 + ) + ANNOTATION_FILE_NOT_FOUND: Final = ErrorCode( + "annotation.0004", "File not found", 404 + ) + ANNOTATION_TAG_UPDATE_FAILED: Final = ErrorCode( + "annotation.0005", "Failed to update tags", 500 + ) # ========== 归集模块 ========== - COLLECTION_TASK_NOT_FOUND: Final = ErrorCode("collection.0001", "Collection task not found", 404) - COLLECTION_TEMPLATE_NOT_FOUND: Final = ErrorCode("collection.0002", "Collection template not found", 404) - COLLECTION_EXECUTION_NOT_FOUND: Final = ErrorCode("collection.0003", "Execution record not found", 404) - COLLECTION_LOG_NOT_FOUND: Final = ErrorCode("collection.0004", "Log file not found", 404) + COLLECTION_TASK_NOT_FOUND: Final = ErrorCode( + "collection.0001", "Collection task not found", 404 + ) + COLLECTION_TEMPLATE_NOT_FOUND: Final = ErrorCode( + "collection.0002", "Collection template not found", 404 + ) + COLLECTION_EXECUTION_NOT_FOUND: Final = ErrorCode( + "collection.0003", "Execution record not found", 404 + ) + COLLECTION_LOG_NOT_FOUND: Final = ErrorCode( + "collection.0004", "Log file not found", 404 + ) # ========== 评估模块 ========== - EVALUATION_TASK_NOT_FOUND: Final = ErrorCode("evaluation.0001", "Evaluation task not found", 404) - EVALUATION_TASK_TYPE_ERROR: Final = ErrorCode("evaluation.0002", "Invalid task type", 400) - EVALUATION_MODEL_NOT_FOUND: Final = ErrorCode("evaluation.0003", "Evaluation model not found", 404) + EVALUATION_TASK_NOT_FOUND: Final = ErrorCode( + "evaluation.0001", "Evaluation task not found", 404 + ) + EVALUATION_TASK_TYPE_ERROR: Final = ErrorCode( + "evaluation.0002", "Invalid task type", 400 + ) + EVALUATION_MODEL_NOT_FOUND: Final = ErrorCode( + "evaluation.0003", "Evaluation model not found", 404 + ) # ========== 生成模块 ========== - GENERATION_TASK_NOT_FOUND: Final = ErrorCode("generation.0001", "Generation task not found", 404) - GENERATION_FILE_NOT_FOUND: Final = ErrorCode("generation.0002", "Generation file not found", 404) - GENERATION_CHUNK_NOT_FOUND: Final = ErrorCode("generation.0003", "Data chunk not found", 404) - GENERATION_DATA_NOT_FOUND: Final = ErrorCode("generation.0004", "Generation data not found", 404) + GENERATION_TASK_NOT_FOUND: Final = ErrorCode( + "generation.0001", "Generation task not found", 404 + ) + GENERATION_FILE_NOT_FOUND: Final = ErrorCode( + "generation.0002", "Generation file not found", 404 + ) + GENERATION_CHUNK_NOT_FOUND: Final = ErrorCode( + "generation.0003", "Data chunk not found", 404 + ) + GENERATION_DATA_NOT_FOUND: Final = ErrorCode( + "generation.0004", "Generation data not found", 404 + ) # ========== RAG 模块 ========== RAG_CONFIG_ERROR: Final = ErrorCode("rag.0001", "RAG configuration error", 400) - RAG_KNOWLEDGE_BASE_NOT_FOUND: Final = ErrorCode("rag.0002", "Knowledge base not found", 404) - RAG_KNOWLEDGE_BASE_ALREADY_EXISTS: Final = ErrorCode("rag.0003", "Knowledge base already exists", 400) - RAG_KNOWLEDGE_BASE_NAME_INVALID: Final = ErrorCode("rag.0004", "Knowledge base name is invalid", 400) + RAG_KNOWLEDGE_BASE_NOT_FOUND: Final = ErrorCode( + "rag.0002", "Knowledge base not found", 404 + ) + RAG_KNOWLEDGE_BASE_ALREADY_EXISTS: Final = ErrorCode( + "rag.0003", "Knowledge base already exists", 400 + ) + RAG_KNOWLEDGE_BASE_NAME_INVALID: Final = ErrorCode( + "rag.0004", "Knowledge base name is invalid", 400 + ) RAG_FILE_NOT_FOUND: Final = ErrorCode("rag.0005", "RAG file not found", 404) - RAG_FILE_PROCESS_FAILED: Final = ErrorCode("rag.0006", "File processing failed", 500) + RAG_FILE_PROCESS_FAILED: Final = ErrorCode( + "rag.0006", "File processing failed", 500 + ) RAG_FILE_PARSE_FAILED: Final = ErrorCode("rag.0007", "File parsing failed", 500) RAG_CHUNK_NOT_FOUND: Final = ErrorCode("rag.0008", "Chunk not found", 404) RAG_MODEL_NOT_FOUND: Final = ErrorCode("rag.0009", "RAG model not found", 404) RAG_QUERY_FAILED: Final = ErrorCode("rag.0010", "RAG query failed", 500) RAG_MILVUS_ERROR: Final = ErrorCode("rag.0011", "Milvus operation failed", 500) - RAG_COLLECTION_NOT_FOUND: Final = ErrorCode("rag.0012", "Milvus collection not found", 404) - RAG_EMBEDDING_FAILED: Final = ErrorCode("rag.0013", "Embedding generation failed", 500) + RAG_COLLECTION_NOT_FOUND: Final = ErrorCode( + "rag.0012", "Milvus collection not found", 404 + ) + RAG_EMBEDDING_FAILED: Final = ErrorCode( + "rag.0013", "Embedding generation failed", 500 + ) # ========== 配比模块 ========== RATIO_TASK_NOT_FOUND: Final = ErrorCode("ratio.0001", "Ratio task not found", 404) @@ -96,25 +141,60 @@ def __init__(self): RATIO_DELETE_FAILED: Final = ErrorCode("ratio.0004", "Failed to delete task", 500) # ========== 清洗模块 ========== - CLEANING_TASK_NOT_FOUND: Final = ErrorCode("cleaning.0001", "Cleaning task not found", 404) - CLEANING_NAME_DUPLICATED: Final = ErrorCode("cleaning.0002", "Cleaning task name is duplicated", 400) - CLEANING_TEMPLATE_NOT_FOUND: Final = ErrorCode("cleaning.0003", "Cleaning template not found", 404) - CLEANING_TEMPLATE_NAME_DUPLICATED: Final = ErrorCode("cleaning.0004", "Cleaning template name is duplicated", 400) - CLEANING_INVALID_OPERATOR_INPUT: Final = ErrorCode("cleaning.0005", "Invalid operator input/output types", 400) - CLEANING_INVALID_EXECUTOR_TYPE: Final = ErrorCode("cleaning.0006", "Invalid executor type", 400) - CLEANING_DATASET_NOT_FOUND: Final = ErrorCode("cleaning.0007", "Dataset not found", 404) - CLEANING_FILE_SYSTEM_ERROR: Final = ErrorCode("cleaning.0008", "File system error", 500) - CLEANING_SETTINGS_PARSE_ERROR: Final = ErrorCode("cleaning.0009", "Settings parse error", 400) - CLEANING_TASK_ID_REQUIRED: Final = ErrorCode("cleaning.0010", "Task ID is required", 400) + CLEANING_TASK_NOT_FOUND: Final = ErrorCode( + "cleaning.0001", "Cleaning task not found", 404 + ) + CLEANING_NAME_DUPLICATED: Final = ErrorCode( + "cleaning.0002", "Cleaning task name is duplicated", 400 + ) + CLEANING_TEMPLATE_NOT_FOUND: Final = ErrorCode( + "cleaning.0003", "Cleaning template not found", 404 + ) + CLEANING_TEMPLATE_NAME_DUPLICATED: Final = ErrorCode( + "cleaning.0004", "Cleaning template name is duplicated", 400 + ) + CLEANING_INVALID_OPERATOR_INPUT: Final = ErrorCode( + "cleaning.0005", "Invalid operator input/output types", 400 + ) + CLEANING_INVALID_EXECUTOR_TYPE: Final = ErrorCode( + "cleaning.0006", "Invalid executor type", 400 + ) + CLEANING_DATASET_NOT_FOUND: Final = ErrorCode( + "cleaning.0007", "Dataset not found", 404 + ) + CLEANING_FILE_SYSTEM_ERROR: Final = ErrorCode( + "cleaning.0008", "File system error", 500 + ) + CLEANING_SETTINGS_PARSE_ERROR: Final = ErrorCode( + "cleaning.0009", "Settings parse error", 400 + ) + CLEANING_TASK_ID_REQUIRED: Final = ErrorCode( + "cleaning.0010", "Task ID is required", 400 + ) + CLEANING_CANNOT_DELETE_PRESET_TEMPLATE: Final = ErrorCode( + "cleaning.0011", "Cannot delete preset template", 400 + ) # ========== 算子市场模块 ========== OPERATOR_NOT_FOUND: Final = ErrorCode("operator.0001", "Operator not found", 404) OPERATOR_IN_INSTANCE: Final = ErrorCode("operator.0002", "Operator is in use", 400) - OPERATOR_CANNOT_DELETE_PREDEFINED: Final = ErrorCode("operator.0003", "Cannot delete predefined operator", 400) - OPERATOR_UNSUPPORTED_FILE_TYPE: Final = ErrorCode("operator.0004", "Unsupported file type", 400) - OPERATOR_PARSE_FAILED: Final = ErrorCode("operator.0005", "Failed to parse operator package", 400) - OPERATOR_FIELD_NOT_FOUND: Final = ErrorCode("operator.0006", "Required field is missing", 400) + OPERATOR_CANNOT_DELETE_PREDEFINED: Final = ErrorCode( + "operator.0003", "Cannot delete predefined operator", 400 + ) + OPERATOR_UNSUPPORTED_FILE_TYPE: Final = ErrorCode( + "operator.0004", "Unsupported file type", 400 + ) + OPERATOR_PARSE_FAILED: Final = ErrorCode( + "operator.0005", "Failed to parse operator package", 400 + ) + OPERATOR_FIELD_NOT_FOUND: Final = ErrorCode( + "operator.0006", "Required field is missing", 400 + ) # ========== 系统模块 ========== - SYSTEM_MODEL_NOT_FOUND: Final = ErrorCode("system.0006", "Model configuration not found", 404) - SYSTEM_MODEL_HEALTH_CHECK_FAILED: Final = ErrorCode("system.0007", "Model health check failed", 500) + SYSTEM_MODEL_NOT_FOUND: Final = ErrorCode( + "system.0006", "Model configuration not found", 404 + ) + SYSTEM_MODEL_HEALTH_CHECK_FAILED: Final = ErrorCode( + "system.0007", "Model health check failed", 500 + ) diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_template_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_template_service.py index 2443bf4fb..36ed7bbb0 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_template_service.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_template_service.py @@ -20,6 +20,14 @@ logger = get_logger(__name__) +PRESET_TEMPLATE_IDS = { + "550e8400-e29b-41d4-a716-446655440001", + "661f9500-f3ac-52e5-b827-557766550002", + "772a0611-a4bd-63f6-c938-668877660003", + "883b1722-b5ce-7407-d049-779988770004", + "994c2833-c6df-8518-e150-880099880005", +} + class CleaningTemplateService: """Service for managing cleaning templates""" @@ -37,9 +45,7 @@ def __init__( self.validator = validator async def get_templates( - self, - db: AsyncSession, - keyword: str | None = None + self, db: AsyncSession, keyword: str | None = None ) -> List[CleaningTemplateDto]: """Get all templates""" templates = await self.template_repo.find_all_templates(db, keyword) @@ -47,12 +53,15 @@ async def get_templates( # Collect all operator IDs template_instances_map = {} for template in templates: - instances = await self.operator_instance_repo.find_operator_by_instance_id(db, template.id) + instances = await self.operator_instance_repo.find_operator_by_instance_id( + db, template.id + ) template_instances_map[template.id] = instances # Batch query all operators - all_operators = await self.operator_service.get_operators(db=db, page=0, size=1000, categories=[], keyword=None, - is_star=None) + all_operators = await self.operator_service.get_operators( + db=db, page=0, size=1000, categories=[], keyword=None, is_star=None + ) operator_map = {op.id: op for op in all_operators} # Build result @@ -84,7 +93,9 @@ async def get_templates( try: operator_dto.overrides = json.loads(inst.settings_override) except json.JSONDecodeError as e: - logger.error(f"Failed to parse settings for {inst.operator_id}: {e}") + logger.error( + f"Failed to parse settings for {inst.operator_id}: {e}" + ) template_dto.instance.append(operator_dto) result.append(template_dto) @@ -92,9 +103,7 @@ async def get_templates( return result async def get_template( - self, - db: AsyncSession, - template_id: str + self, db: AsyncSession, template_id: str ) -> CleaningTemplateDto: """Get template by ID""" template = await self.template_repo.find_template_by_id(db, template_id) @@ -110,11 +119,14 @@ async def get_template( updated_at=template.updated_at, ) - instances = await self.operator_instance_repo.find_operator_by_instance_id(db, template_id) + instances = await self.operator_instance_repo.find_operator_by_instance_id( + db, template_id + ) # Batch query operators - all_operators = await self.operator_service.get_operators(db=db, page=0, size=1000, categories=[], keyword=None, - is_star=None) + all_operators = await self.operator_service.get_operators( + db=db, page=0, size=1000, categories=[], keyword=None, is_star=None + ) operator_map = {op.id: op for op in all_operators} for inst in instances: @@ -133,15 +145,15 @@ async def get_template( try: operator_dto.overrides = json.loads(inst.settings_override) except json.JSONDecodeError as e: - logger.error(f"Failed to parse settings for {inst.operator_id}: {e}") + logger.error( + f"Failed to parse settings for {inst.operator_id}: {e}" + ) template_dto.instance.append(operator_dto) return template_dto async def create_template( - self, - db: AsyncSession, - request: CreateCleaningTemplateRequest + self, db: AsyncSession, request: CreateCleaningTemplateRequest ) -> CleaningTemplateDto: """Create new template""" from app.db.models.cleaning import CleaningTemplate @@ -159,15 +171,14 @@ async def create_template( await self.template_repo.insert_template(db, template) - await self.operator_instance_repo.insert_instance(db, template_id, request.instance) + await self.operator_instance_repo.insert_instance( + db, template_id, request.instance + ) return await self.get_template(db, template_id) async def update_template( - self, - db: AsyncSession, - template_id: str, - request: UpdateCleaningTemplateRequest + self, db: AsyncSession, template_id: str, request: UpdateCleaningTemplateRequest ) -> CleaningTemplateDto: """Update template""" @@ -181,26 +192,31 @@ async def update_template( await self.template_repo.update_template(db, template) await self.operator_instance_repo.delete_by_instance_id(db, template_id) - await self.operator_instance_repo.insert_instance(db, template_id, request.instance) + await self.operator_instance_repo.insert_instance( + db, template_id, request.instance + ) return await self.get_template(db, template_id) async def delete_template(self, db: AsyncSession, template_id: str) -> None: """Delete template""" + if template_id in PRESET_TEMPLATE_IDS: + raise BusinessError(ErrorCodes.CLEANING_CANNOT_DELETE_PRESET_TEMPLATE) await self.template_repo.delete_template(db, template_id) await self.operator_instance_repo.delete_by_instance_id(db, template_id) async def get_instance_by_template_id( - self, - db: AsyncSession, - template_id: str + self, db: AsyncSession, template_id: str ) -> List[OperatorInstanceDto]: """Get operator instances by template ID""" - instances = await self.operator_instance_repo.find_operator_by_instance_id(db, template_id) + instances = await self.operator_instance_repo.find_operator_by_instance_id( + db, template_id + ) # Batch query operators - all_operators = await self.operator_service.get_operators(db=db, page=0, size=1000, categories=[], keyword=None, - is_star=None) + all_operators = await self.operator_service.get_operators( + db=db, page=0, size=1000, categories=[], keyword=None, is_star=None + ) operator_map = {op.id: op for op in all_operators} result = [] @@ -220,7 +236,9 @@ async def get_instance_by_template_id( try: operator_dto.overrides = json.loads(inst.settings_override) except json.JSONDecodeError as e: - logger.error(f"Failed to parse settings for {inst.operator_id}: {e}") + logger.error( + f"Failed to parse settings for {inst.operator_id}: {e}" + ) result.append(operator_dto) return result From 15b949b8be19cda9f00e67464d9516c20af31a1b Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Thu, 5 Mar 2026 17:03:34 +0800 Subject: [PATCH 08/10] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=A4=84=E7=90=86?= =?UTF-8?q?=E6=B5=81=E7=A8=8B=E5=B1=95=E7=A4=BA=E6=94=AF=E6=8C=81=E6=94=B6?= =?UTF-8?q?=E8=B5=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/pages/DataCleansing/Home/DataCleansing.tsx | 14 ++++++++++++-- .../Home/components/ProcessFlowDiagram.tsx | 2 +- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/frontend/src/pages/DataCleansing/Home/DataCleansing.tsx b/frontend/src/pages/DataCleansing/Home/DataCleansing.tsx index 8b3a19d24..4e5f7d2be 100644 --- a/frontend/src/pages/DataCleansing/Home/DataCleansing.tsx +++ b/frontend/src/pages/DataCleansing/Home/DataCleansing.tsx @@ -1,6 +1,7 @@ import { useEffect, useState } from "react"; import { Tabs, Button } from "antd"; import { PlusOutlined } from "@ant-design/icons"; +import { ChevronDown, ChevronUp } from "lucide-react"; import { useNavigate } from "react-router"; import { useTranslation } from "react-i18next"; import TaskList from "./components/TaskList"; @@ -13,6 +14,7 @@ export default function DataProcessingPage() { const navigate = useNavigate(); const urlParams = useSearchParams(); const [currentView, setCurrentView] = useState<"task" | "template">("task"); + const [isFlowCollapsed, setIsFlowCollapsed] = useState(false); useEffect(() => { if (urlParams.view) { @@ -24,7 +26,15 @@ export default function DataProcessingPage() {
{/* Header */}
-

{t("dataCleansing.title")}

+
+

{t("dataCleansing.title")}

+
- + {!isFlowCollapsed && } setCurrentView(key as any)} diff --git a/frontend/src/pages/DataCleansing/Home/components/ProcessFlowDiagram.tsx b/frontend/src/pages/DataCleansing/Home/components/ProcessFlowDiagram.tsx index 6038487d1..44c8894fb 100644 --- a/frontend/src/pages/DataCleansing/Home/components/ProcessFlowDiagram.tsx +++ b/frontend/src/pages/DataCleansing/Home/components/ProcessFlowDiagram.tsx @@ -9,9 +9,9 @@ import { } from "lucide-react"; import { useTranslation } from "react-i18next"; -// 流程图组件 export default function ProcessFlowDiagram() { const { t } = useTranslation(); + const flowSteps = [ { id: "start", From b34a54ff4137a4c8603af54a06967dfcdd2cb302 Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Fri, 6 Mar 2026 09:41:12 +0800 Subject: [PATCH 09/10] =?UTF-8?q?=E6=97=A5=E5=BF=97=E6=94=B9=E4=B8=BA?= =?UTF-8?q?=E6=B5=81=E5=BC=8F=E8=8E=B7=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/src/i18n/locales/en/common.json | 5 +- frontend/src/i18n/locales/zh/common.json | 3 +- .../Detail/components/LogsTable.tsx | 121 +++++++++++++--- .../src/pages/DataCleansing/cleansing.api.ts | 5 + .../interface/cleaning_task_routes.py | 137 ++++++++++++++++-- 5 files changed, 235 insertions(+), 36 deletions(-) diff --git a/frontend/src/i18n/locales/en/common.json b/frontend/src/i18n/locales/en/common.json index 9f76893ee..978088fad 100644 --- a/frontend/src/i18n/locales/en/common.json +++ b/frontend/src/i18n/locales/en/common.json @@ -1446,11 +1446,12 @@ "sizeOptimization": "File Size Optimization", "reduced": "Reduced by {{percent}}%" }, - "logTable": { +"logTable": { "selectRun": "Select Run", "currentDisplay": "Current Display: {{num}}th Run", "nthRun": "{{num}}th Run", - "noLogs": "No logs available for this task" + "noLogs": "No logs available for this task", + "streaming": "Streaming..." }, "operatorTable": { "serialNumber": "Serial Number", diff --git a/frontend/src/i18n/locales/zh/common.json b/frontend/src/i18n/locales/zh/common.json index 5b9b41ddb..5b7823f96 100644 --- a/frontend/src/i18n/locales/zh/common.json +++ b/frontend/src/i18n/locales/zh/common.json @@ -1450,7 +1450,8 @@ "selectRun": "选择运行轮次", "currentDisplay": "当前展示: 第 {{num}} 次", "nthRun": "第 {{num}} 次", - "noLogs": "当前任务无可用日志" + "noLogs": "当前任务无可用日志", + "streaming": "实时流式输出中..." }, "operatorTable": { "serialNumber": "序号", diff --git a/frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx b/frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx index 48dd13d04..b72ceb3eb 100644 --- a/frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx +++ b/frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx @@ -1,26 +1,91 @@ -import {useEffect, useState} from "react"; -import {useParams} from "react-router"; -import {FileClock} from "lucide-react"; +import { useEffect, useState, useRef } from "react"; +import { useParams } from "react-router"; +import { FileClock } from "lucide-react"; import { useTranslation } from "react-i18next"; +import { streamCleaningTaskLog } from "../../cleansing.api"; -export default function LogsTable({taskLog, fetchTaskLog, retryCount} : {taskLog: any[], fetchTaskLog: () => Promise, retryCount: number}) { +interface LogEntry { + level: string; + message: string; +} + +export default function LogsTable({ taskLog: initialLogs, fetchTaskLog, retryCount }: { taskLog: any[], fetchTaskLog: () => Promise, retryCount: number }) { const { id = "" } = useParams(); const { t } = useTranslation(); const [selectedLog, setSelectedLog] = useState(retryCount + 1); + const [streamingLogs, setStreamingLogs] = useState([]); + const [isStreaming, setIsStreaming] = useState(false); + const logContainerRef = useRef(null); + const eventSourceRef = useRef(null); + + useEffect(() => { + if (selectedLog - 1 === retryCount) { + startStreaming(); + } else { + stopStreaming(); + fetchTaskLog(selectedLog - 1); + } + return () => stopStreaming(); + }, [id, selectedLog, retryCount]); useEffect(() => { - fetchTaskLog(selectedLog - 1); - }, [id, selectedLog]); + if (logContainerRef.current && isStreaming) { + logContainerRef.current.scrollTop = logContainerRef.current.scrollHeight; + } + }, [streamingLogs, isStreaming]); + +const startStreaming = () => { + stopStreaming(); + setStreamingLogs([]); + setIsStreaming(true); + + const eventSource = streamCleaningTaskLog(id, selectedLog - 1); + eventSourceRef.current = eventSource; - return taskLog?.length > 0 ? ( + eventSource.onmessage = (event) => { + try { + const logEntry: LogEntry = JSON.parse(event.data); + if (logEntry.message === "[END_OF_STREAM]" || logEntry.message === "[HEARTBEAT]") { + if (logEntry.message === "[END_OF_STREAM]") { + stopStreaming(); + } + return; + } + setStreamingLogs(prev => [...prev, logEntry]); + } catch (e) { + console.error("Failed to parse log entry:", e); + } + }; + + eventSource.onerror = (error) => { + console.error("SSE error:", error); + stopStreaming(); + }; + }; + + const stopStreaming = () => { + if (eventSourceRef.current) { + eventSourceRef.current.close(); + eventSourceRef.current = null; + } + setIsStreaming(false); + }; + + const displayLogs = selectedLog - 1 === retryCount ? streamingLogs : initialLogs; + + const handleSelectChange = (value: number) => { + setSelectedLog(value); + setStreamingLogs([]); + }; + + return displayLogs?.length > 0 || isStreaming ? ( <> - {/* --- 新增区域:左上角 Select 组件 --- */}
+ {isStreaming && ( + {t("dataCleansing.detail.logTable.streaming")} + )}
{t("dataCleansing.detail.logTable.nthRun", { num: selectedLog })}
-
- {taskLog?.map?.((log, index) => ( +
+ {displayLogs?.map?.((log, index) => (
- - [{log.level}] - + + [{log.level}] + {log.message}
))} + {isStreaming && ( +
+ [INFO] + ... +
+ )}
diff --git a/frontend/src/pages/DataCleansing/cleansing.api.ts b/frontend/src/pages/DataCleansing/cleansing.api.ts index edf1d8e8b..c589793cb 100644 --- a/frontend/src/pages/DataCleansing/cleansing.api.ts +++ b/frontend/src/pages/DataCleansing/cleansing.api.ts @@ -21,6 +21,11 @@ export function queryCleaningTaskLogByIdUsingGet(taskId: string | number, retryC return get(`/api/cleaning/tasks/${taskId}/log/${retryCount}`); } +export function streamCleaningTaskLog(taskId: string | number, retryCount: number = 0): EventSource { + const url = `/api/cleaning/tasks/${taskId}/log/stream?retry_count=${retryCount}`; + return new EventSource(url); +} + export function updateCleaningTaskByIdUsingPut(taskId: string | number, data: any) { return put(`/api/cleaning/tasks/${taskId}`, data); } diff --git a/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py b/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py index 732c1266a..f40213b42 100644 --- a/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py +++ b/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py @@ -1,6 +1,7 @@ from typing import Optional from fastapi import APIRouter, Depends +from fastapi.responses import StreamingResponse from sqlalchemy.ext.asyncio import AsyncSession from app.core.logging import get_logger @@ -57,8 +58,7 @@ def _get_task_service(db: AsyncSession) -> CleaningTaskService: runtime_client = RuntimeClient() scheduler = CleaningTaskScheduler( - task_repo=CleaningTaskRepository(None), - runtime_client=runtime_client + task_repo=CleaningTaskRepository(None), runtime_client=runtime_client ) operator_service = _get_operator_service() dataset_service = DatasetManagementService(db) @@ -83,7 +83,7 @@ def _get_task_service(db: AsyncSession) -> CleaningTaskService: response_model=StandardResponse[PaginatedData[CleaningTaskDto]], summary="查询清洗任务列表", description="根据参数查询清洗任务列表(支持分页、状态过滤、关键词搜索)", - tags=['mcp'] + tags=["mcp"], ) async def get_cleaning_tasks( page: int = 0, @@ -108,7 +108,7 @@ async def get_cleaning_tasks( total_elements=count, total_pages=total_pages, content=tasks, - ) + ), ) @@ -117,7 +117,7 @@ async def get_cleaning_tasks( response_model=StandardResponse[CleaningTaskDto], summary="创建清洗任务", description="根据模板ID或算子列表创建清洗任务", - tags=['mcp'] + tags=["mcp"], ) async def create_cleaning_task( request: CreateCleaningTaskRequest, @@ -139,7 +139,7 @@ async def create_cleaning_task( "/{task_id}", response_model=StandardResponse[CleaningTaskDto], summary="获取清洗任务详情", - description="根据ID获取清洗任务详细信息" + description="根据ID获取清洗任务详细信息", ) async def get_cleaning_task( task_id: str, @@ -155,7 +155,7 @@ async def get_cleaning_task( "/{task_id}", response_model=StandardResponse[str], summary="删除清洗任务", - description="删除指定的清洗任务" + description="删除指定的清洗任务", ) async def delete_cleaning_task( task_id: str, @@ -172,7 +172,7 @@ async def delete_cleaning_task( "/{task_id}/stop", response_model=StandardResponse[str], summary="停止清洗任务", - description="停止正在运行的清洗任务" + description="停止正在运行的清洗任务", ) async def stop_cleaning_task( task_id: str, @@ -189,7 +189,7 @@ async def stop_cleaning_task( "/{task_id}/execute", response_model=StandardResponse[str], summary="执行清洗任务", - description="重新执行清洗任务" + description="重新执行清洗任务", ) async def execute_cleaning_task( task_id: str, @@ -206,7 +206,7 @@ async def execute_cleaning_task( "/{task_id}/result", response_model=StandardResponse[list[CleaningResultDto]], summary="获取清洗任务结果", - description="获取指定清洗任务的执行结果" + description="获取指定清洗任务的执行结果", ) async def get_cleaning_task_results( task_id: str, @@ -218,11 +218,126 @@ async def get_cleaning_task_results( return StandardResponse(code="0", message="success", data=results) +@router.get( + "/{task_id}/log/stream", + summary="流式获取清洗任务日志", + description="通过SSE流式获取清洗任务日志", +) +async def stream_cleaning_task_log( + task_id: str, + retry_count: int = 0, + db: AsyncSession = Depends(get_db), +): + """Stream cleaning task log via SSE""" + import asyncio + import json + import re + from pathlib import Path + + FLOW_PATH = "/flow" + + task_service = _get_task_service(db) + task = await task_service.task_repo.find_task_by_id(db, task_id) + + if not task: + + async def error_generator(): + yield f"data: {json.dumps({'level': 'ERROR', 'message': 'Task not found'}, ensure_ascii=False)}\n\n" + + return StreamingResponse(error_generator(), media_type="text/event-stream") + + log_path = Path(f"{FLOW_PATH}/{task_id}/output.log") + if retry_count > 0: + log_path = Path(f"{FLOW_PATH}/{task_id}/output.log.{retry_count}") + + standard_level_pattern = re.compile( + r"\b(DEBUG|Debug|INFO|Info|WARN|Warn|WARNING|Warning|ERROR|Error|FATAL|Fatal)\b" + ) + exception_suffix_pattern = re.compile(r"\b\w+(Warning|Error|Exception)\b") + + def get_log_level(line: str, default_level: str) -> str: + if not line or not line.strip(): + return default_level + std_match = standard_level_pattern.search(line) + if std_match: + return std_match.group(1).upper() + ex_match = exception_suffix_pattern.search(line) + if ex_match: + match = ex_match.group(1).upper() + if match == "WARNING": + return "WARN" + if match in ["ERROR", "EXCEPTION"]: + return "ERROR" + return default_level + + async def log_generator(): + last_position = 0 + last_level = "INFO" + heartbeat_counter = 0 + HEARTBEAT_INTERVAL = 10 + + while True: + try: + current_task = await task_service.task_repo.find_task_by_id(db, task_id) + is_task_finished = current_task and current_task.status in [ + "COMPLETED", + "FAILED", + "STOPPED", + ] + + if log_path.exists(): + with open(log_path, "r", encoding="utf-8") as f: + f.seek(last_position) + new_content = f.read() + if new_content: + for line in new_content.splitlines(keepends=True): + last_level = get_log_level(line, last_level) + log_entry = json.dumps( + {"level": last_level, "message": line.rstrip()}, + ensure_ascii=False, + ) + yield f"data: {log_entry}\n\n" + last_position = f.tell() + heartbeat_counter = 0 + else: + heartbeat_counter += 1 + + f.seek(0, 2) + current_size = f.tell() + if current_size < last_position: + last_position = 0 + else: + heartbeat_counter += 1 + + if is_task_finished: + yield f"data: {json.dumps({'level': 'INFO', 'message': '[END_OF_STREAM]'}, ensure_ascii=False)}\n\n" + break + + if heartbeat_counter >= HEARTBEAT_INTERVAL: + yield f"data: {json.dumps({'level': 'DEBUG', 'message': '[HEARTBEAT]'}, ensure_ascii=False)}\n\n" + heartbeat_counter = 0 + + await asyncio.sleep(1) + except Exception as e: + logger.error(f"Error reading log file: {e}") + await asyncio.sleep(2) + + return StreamingResponse( + log_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + + @router.get( "/{task_id}/log/{retry_count}", response_model=StandardResponse[list[CleaningTaskLog]], summary="获取清洗任务日志", - description="获取指定清洗任务的执行日志" + description="获取指定清洗任务的执行日志", ) async def get_cleaning_task_log( task_id: str, From 989b25f87cfbfea75d99b665a423d1a020fbb828 Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Fri, 6 Mar 2026 11:37:51 +0800 Subject: [PATCH 10/10] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=A4=84=E7=90=86?= =?UTF-8?q?=E5=92=8C=E6=A8=A1=E7=89=88=E6=94=AF=E6=8C=81=E5=88=86=E9=A1=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Create/components/CreateTaskStepOne.tsx | 1 + frontend/vite.config.ts | 19 ++- .../interface/cleaning_template_routes.py | 30 ++--- .../repository/cleaning_task_repository.py | 2 +- .../cleaning_template_repository.py | 46 ++++++-- .../cleaning/service/cleaning_task_service.py | 109 ++++++++++++------ .../service/cleaning_template_service.py | 15 ++- 7 files changed, 149 insertions(+), 73 deletions(-) diff --git a/frontend/src/pages/DataCleansing/Create/components/CreateTaskStepOne.tsx b/frontend/src/pages/DataCleansing/Create/components/CreateTaskStepOne.tsx index 6a60c8e80..19a7e96f1 100644 --- a/frontend/src/pages/DataCleansing/Create/components/CreateTaskStepOne.tsx +++ b/frontend/src/pages/DataCleansing/Create/components/CreateTaskStepOne.tsx @@ -82,6 +82,7 @@ export default function CreateTaskStepOne({ form.setFieldValue("destDatasetName", srcName); setTaskConfig({ ...taskConfig, + destDatasetId: srcDataset?.id || "", destDatasetName: srcName, }); } else { diff --git a/frontend/vite.config.ts b/frontend/vite.config.ts index 981776741..f9c490bde 100644 --- a/frontend/vite.config.ts +++ b/frontend/vite.config.ts @@ -49,11 +49,28 @@ export default defineConfig({ }; // Python 服务: rag, synthesis, annotation, evaluation, models - const pythonPaths = ["rag", "operators", "cleaning", "synthesis", "annotation", "knowledge-base", "data-collection", "evaluation", "models"]; + const pythonPaths = ["rag", "operators", "categories", "synthesis", "annotation", "knowledge-base", "data-collection", "evaluation", "models"]; // Java 服务: data-management, knowledge-base const javaPaths = ["data-management"]; const proxy: Record = {}; + // SSE 端点需要禁用缓冲 + proxy["/api/cleaning"] = { + target: "http://localhost:8080", + changeOrigin: true, + secure: false, + configure: (proxy: { on: (event: string, handler: (arg: unknown) => void) => void }) => { + proxy.on("proxyReq", (proxyReq: unknown) => { + (proxyReq as { removeHeader: (name: string) => void }).removeHeader("referer"); + (proxyReq as { removeHeader: (name: string) => void }).removeHeader("origin"); + }); + proxy.on("proxyRes", (proxyRes: unknown) => { + const res = proxyRes as { headers: Record }; + delete res.headers["set-cookie"]; + res.headers["cookies"] = ""; + }); + }, + }; for (const p of pythonPaths) { proxy[`/api/${p}`] = pythonProxyConfig; } diff --git a/runtime/datamate-python/app/module/cleaning/interface/cleaning_template_routes.py b/runtime/datamate-python/app/module/cleaning/interface/cleaning_template_routes.py index 102a625e7..6121d496f 100644 --- a/runtime/datamate-python/app/module/cleaning/interface/cleaning_template_routes.py +++ b/runtime/datamate-python/app/module/cleaning/interface/cleaning_template_routes.py @@ -2,7 +2,6 @@ from typing import Optional from fastapi import APIRouter, Depends, Query -from sqlalchemy import select, func from sqlalchemy.ext.asyncio import AsyncSession from app.core.logging import get_logger @@ -65,31 +64,18 @@ def _get_template_service(db: AsyncSession) -> CleaningTemplateService: "", response_model=StandardResponse[PaginatedData[CleaningTemplateDto]], summary="查询清洗模板列表", - description="分页查询清洗模板" + description="分页查询清洗模板", ) async def get_cleaning_templates( - page: int = Query(1, description="页码"), + page: int = Query(0, description="页码"), size: int = Query(20, description="每页数量"), keyword: Optional[str] = Query(None, description="关键词搜索"), db: AsyncSession = Depends(get_db), ): """Query cleaning templates with pagination""" - from app.db.models.cleaning import CleaningTemplate - template_service = _get_template_service(db) - query = select(CleaningTemplate) - - if keyword: - keyword_pattern = f"%{keyword}%" - query = query.where( - CleaningTemplate.name.ilike(keyword_pattern) | CleaningTemplate.description.ilike(keyword_pattern) - ) - - count_query = select(func.count()).select_from(query.subquery()) - total = (await db.execute(count_query)).scalar_one() - - items = await template_service.get_templates(db, keyword) + items, total = await template_service.get_templates(db, keyword, page, size) total_pages = math.ceil(total / size) if total > 0 else 0 @@ -102,7 +88,7 @@ async def get_cleaning_templates( total_pages=total_pages, page=page, size=size, - ) + ), ) @@ -110,7 +96,7 @@ async def get_cleaning_templates( "", response_model=StandardResponse[CleaningTemplateDto], summary="创建清洗模板", - description="创建新的清洗模板" + description="创建新的清洗模板", ) async def create_cleaning_template( request: CreateCleaningTemplateRequest, @@ -129,7 +115,7 @@ async def create_cleaning_template( "/{template_id}", response_model=StandardResponse[CleaningTemplateDto], summary="获取清洗模板详情", - description="根据ID获取清洗模板详细信息" + description="根据ID获取清洗模板详细信息", ) async def get_cleaning_template( template_id: str, @@ -146,7 +132,7 @@ async def get_cleaning_template( "/{template_id}", response_model=StandardResponse[CleaningTemplateDto], summary="更新清洗模板", - description="更新清洗模板信息" + description="更新清洗模板信息", ) async def update_cleaning_template( template_id: str, @@ -166,7 +152,7 @@ async def update_cleaning_template( "/{template_id}", response_model=StandardResponse[str], summary="删除清洗模板", - description="删除指定的清洗模板" + description="删除指定的清洗模板", ) async def delete_cleaning_template( template_id: str, diff --git a/runtime/datamate-python/app/module/cleaning/repository/cleaning_task_repository.py b/runtime/datamate-python/app/module/cleaning/repository/cleaning_task_repository.py index 7c83d9a2d..f136ffa53 100644 --- a/runtime/datamate-python/app/module/cleaning/repository/cleaning_task_repository.py +++ b/runtime/datamate-python/app/module/cleaning/repository/cleaning_task_repository.py @@ -34,7 +34,7 @@ async def find_tasks( query = query.order_by(self.model.created_at.desc()) if page is not None and size is not None: - offset = max((page - 1) * size, 0) + offset = max(page * size, 0) query = query.offset(offset).limit(size) result = await db.execute(query) diff --git a/runtime/datamate-python/app/module/cleaning/repository/cleaning_template_repository.py b/runtime/datamate-python/app/module/cleaning/repository/cleaning_template_repository.py index aa35ba712..325aa971e 100644 --- a/runtime/datamate-python/app/module/cleaning/repository/cleaning_template_repository.py +++ b/runtime/datamate-python/app/module/cleaning/repository/cleaning_template_repository.py @@ -13,33 +13,63 @@ def __init__(self, model=None): async def find_all_templates( self, db: AsyncSession, - keyword: Optional[str] = None + keyword: Optional[str] = None, + page: Optional[int] = None, + size: Optional[int] = None, ) -> List[CleaningTemplate]: - """Query all templates""" + """Query all templates with optional pagination""" query = select(self.model) if keyword: keyword_pattern = f"%{keyword}%" query = query.where( - self.model.name.ilike(keyword_pattern) | self.model.description.ilike(keyword_pattern) + self.model.name.ilike(keyword_pattern) + | self.model.description.ilike(keyword_pattern) ) query = query.order_by(self.model.created_at.desc()) + + if page is not None and size is not None: + offset = page * size + query = query.offset(offset).limit(size) + result = await db.execute(query) return result.scalars().all() - async def find_template_by_id(self, db: AsyncSession, template_id: str) -> Optional[CleaningTemplate]: + async def count_templates( + self, db: AsyncSession, keyword: Optional[str] = None + ) -> int: + """Count templates""" + query = select(func.count()).select_from(self.model) + + if keyword: + keyword_pattern = f"%{keyword}%" + query = query.where( + self.model.name.ilike(keyword_pattern) + | self.model.description.ilike(keyword_pattern) + ) + + result = await db.execute(query) + return result.scalar_one() + + async def find_template_by_id( + self, db: AsyncSession, template_id: str + ) -> Optional[CleaningTemplate]: """Query template by ID""" query = select(self.model).where(self.model.id == template_id) result = await db.execute(query) return result.scalar_one_or_none() - async def insert_template(self, db: AsyncSession, template: CleaningTemplate) -> None: + async def insert_template( + self, db: AsyncSession, template: CleaningTemplate + ) -> None: """Insert new template""" db.add(template) await db.flush() - async def update_template(self, db: AsyncSession, template: CleaningTemplate) -> None: + async def update_template( + self, db: AsyncSession, template: CleaningTemplate + ) -> None: """Update template""" query = select(self.model).where(self.model.id == template.id) result = await db.execute(query) @@ -58,6 +88,8 @@ async def delete_template(self, db: AsyncSession, template_id: str) -> None: async def is_name_exist(self, db: AsyncSession, name: str) -> bool: """Check if template name exists""" - query = select(func.count()).select_from(self.model).where(self.model.name == name) + query = ( + select(func.count()).select_from(self.model).where(self.model.name == name) + ) result = await db.execute(query) return result.scalar_one() > 0 if result else False diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py index 5ce0bb191..d8c02c41a 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py @@ -98,35 +98,38 @@ async def get_task(self, db: AsyncSession, task_id: str) -> CleaningTaskDto: await self._set_process(db, task) - instances = await self.operator_instance_repo.find_operator_by_instance_id(db, task_id) + instances = await self.operator_instance_repo.find_operator_by_instance_id( + db, task_id + ) # Batch query operators - all_operators = await self.operator_service.get_operators(db=db, page=0, size=1000, categories=[], keyword=None, - is_star=None) + all_operators = await self.operator_service.get_operators( + db=db, page=0, size=1000, categories=[], keyword=None, is_star=None + ) operator_map = {op.id: op for op in all_operators} task.instance = [] for inst in instances: operator = operator_map.get(inst.operator_id) if operator: - task.instance.append(OperatorInstanceDto( - id=operator.id, - name=operator.name, - description=operator.description, - inputs=operator.inputs, - outputs=operator.outputs, - settings=operator.settings, - categories=operator.categories, - )) + task.instance.append( + OperatorInstanceDto( + id=operator.id, + name=operator.name, + description=operator.description, + inputs=operator.inputs, + outputs=operator.outputs, + settings=operator.settings, + categories=operator.categories, + ) + ) else: task.instance.append(OperatorInstanceDto(id=inst.operator_id)) return task async def create_task( - self, - db: AsyncSession, - request: CreateCleaningTaskRequest + self, db: AsyncSession, request: CreateCleaningTaskRequest ) -> CleaningTaskDto: """Create new cleaning task""" if request.instance and request.template_id: @@ -143,22 +146,28 @@ async def create_task( dest_dataset_name = request.dest_dataset_name if not dest_dataset_id: - logger.info(f"Creating new dataset: {dest_dataset_name}, type: {request.dest_dataset_type}") + logger.info( + f"Creating new dataset: {dest_dataset_name}, type: {request.dest_dataset_type}" + ) dest_dataset_response = await self.dataset_service.create_dataset( name=dest_dataset_name, dataset_type=request.dest_dataset_type, description="", - status="ACTIVE" + status="ACTIVE", ) dest_dataset_id = dest_dataset_response.id logger.info(f"Successfully created dataset: {dest_dataset_id}") else: logger.info(f"Using existing dataset: {dest_dataset_id}") - dest_dataset_response = await self.dataset_service.get_dataset(dest_dataset_id) + dest_dataset_response = await self.dataset_service.get_dataset( + dest_dataset_id + ) src_dataset = await self.dataset_service.get_dataset(request.src_dataset_id) if not src_dataset: - raise BusinessError(ErrorCodes.CLEANING_DATASET_NOT_FOUND, request.src_dataset_id) + raise BusinessError( + ErrorCodes.CLEANING_DATASET_NOT_FOUND, request.src_dataset_id + ) task_dto = CleaningTaskDto( id=task_id, @@ -185,10 +194,14 @@ async def create_task( if operator_ids: await self.operator_service.increment_usage_count(operator_ids, db) - all_operators = await self.operator_service.get_operators(db=db, page=0, size=1000, categories=[], keyword=None, is_star=None) + all_operators = await self.operator_service.get_operators( + db=db, page=0, size=1000, categories=[], keyword=None, is_star=None + ) operator_map = {op.id: op for op in all_operators} - await self.prepare_task(dest_dataset_id, task_id, request.instance, operator_map, executor_type) + await self.prepare_task( + dest_dataset_id, task_id, request.instance, operator_map, executor_type + ) return await self.get_task(db, task_id) @@ -261,9 +274,12 @@ async def prepare_task( config_file_path.parent.mkdir(parents=True, exist_ok=True) import yaml + try: - with open(config_file_path, 'w', encoding='utf-8') as f: - yaml.dump(process_config, f, default_flow_style=False, allow_unicode=True) + with open(config_file_path, "w", encoding="utf-8") as f: + yaml.dump( + process_config, f, default_flow_style=False, allow_unicode=True + ) except Exception as e: logger.error(f"Failed to write process.yaml: {e}") raise BusinessError(ErrorCodes.CLEANING_FILE_SYSTEM_ERROR, str(e)) @@ -319,7 +335,7 @@ async def scan_dataset( result = await db.execute(query, {"dataset_id": src_dataset_id}) files = result.fetchall() - with open(target_file_path, 'w', encoding='utf-8') as f: + with open(target_file_path, "w", encoding="utf-8") as f: for file in files: if succeed_files and file.id in succeed_files: continue @@ -333,11 +349,15 @@ async def scan_dataset( } f.write(json.dumps(file_info, ensure_ascii=False) + "\n") - async def get_task_results(self, db: AsyncSession, task_id: str) -> List[CleaningResultDto]: + async def get_task_results( + self, db: AsyncSession, task_id: str + ) -> List[CleaningResultDto]: """Get task results""" return await self.result_repo.find_by_instance_id(db, task_id) - async def get_task_log(self, db: AsyncSession, task_id: str, retry_count: int) -> List[CleaningTaskLog]: + async def get_task_log( + self, db: AsyncSession, task_id: str, retry_count: int + ) -> List[CleaningTaskLog]: """Get task log""" self.validator.check_task_id(task_id) @@ -356,14 +376,18 @@ async def get_task_log(self, db: AsyncSession, task_id: str, retry_count: int) - ) exception_suffix_pattern = re.compile(r"\b\w+(Warning|Error|Exception)\b") - with open(log_path, 'r', encoding='utf-8') as f: + with open(log_path, "r", encoding="utf-8") as f: for line in f: - last_level = self._get_log_level(line, last_level, standard_level_pattern, exception_suffix_pattern) + last_level = self._get_log_level( + line, last_level, standard_level_pattern, exception_suffix_pattern + ) logs.append(CleaningTaskLog(level=last_level, message=line.rstrip())) return logs - def _get_log_level(self, line: str, default_level: str, std_pattern, ex_pattern) -> str: + def _get_log_level( + self, line: str, default_level: str, std_pattern, ex_pattern + ) -> str: """Extract log level from log line""" if not line or not line.strip(): return default_level @@ -406,26 +430,35 @@ async def execute_task(self, db: AsyncSession, task_id: str) -> bool: if not task: raise BusinessError(ErrorCodes.CLEANING_TASK_NOT_FOUND, task_id) + src_dataset = await self.dataset_service.get_dataset(task.src_dataset_id) + if src_dataset: + task.before_size = src_dataset.totalSize + task.file_count = src_dataset.fileCount + await self.task_repo.update_task(db, task) + await self.scan_dataset(db, task_id, task.src_dataset_id, succeed_set) await self.result_repo.delete_by_instance_id(db, task_id, "FAILED") - return await self.scheduler.execute_task(db, task_id, (task.retry_count or 0) + 1) + return await self.scheduler.execute_task( + db, task_id, (task.retry_count or 0) + 1 + ) async def stop_task(self, db: AsyncSession, task_id: str) -> bool: """Stop task""" return await self.scheduler.stop_task(db, task_id) async def get_instance_by_template_id( - self, - db: AsyncSession, - template_id: str + self, db: AsyncSession, template_id: str ) -> List[OperatorInstanceDto]: """Get instances by template ID (delegated to template service)""" - instances = await self.operator_instance_repo.find_operator_by_instance_id(db, template_id) + instances = await self.operator_instance_repo.find_operator_by_instance_id( + db, template_id + ) # Batch query operators - all_operators = await self.operator_service.get_operators(db=db, page=0, size=1000, categories=[], keyword=None, - is_star=None) + all_operators = await self.operator_service.get_operators( + db=db, page=0, size=1000, categories=[], keyword=None, is_star=None + ) operator_map = {op.id: op for op in all_operators} result = [] @@ -445,7 +478,9 @@ async def get_instance_by_template_id( try: operator_dto.overrides = json.loads(inst.settings_override) except json.JSONDecodeError as e: - logger.error(f"Failed to parse settings for {inst.operator_id}: {e}") + logger.error( + f"Failed to parse settings for {inst.operator_id}: {e}" + ) result.append(operator_dto) return result diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_template_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_template_service.py index 36ed7bbb0..6633088a6 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_template_service.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_template_service.py @@ -45,10 +45,15 @@ def __init__( self.validator = validator async def get_templates( - self, db: AsyncSession, keyword: str | None = None - ) -> List[CleaningTemplateDto]: - """Get all templates""" - templates = await self.template_repo.find_all_templates(db, keyword) + self, + db: AsyncSession, + keyword: str | None = None, + page: int | None = None, + size: int | None = None, + ) -> tuple[List[CleaningTemplateDto], int]: + """Get templates with pagination""" + total = await self.template_repo.count_templates(db, keyword) + templates = await self.template_repo.find_all_templates(db, keyword, page, size) # Collect all operator IDs template_instances_map = {} @@ -100,7 +105,7 @@ async def get_templates( result.append(template_dto) - return result + return result, total async def get_template( self, db: AsyncSession, template_id: str