diff --git a/backend/api-gateway/src/main/java/com/datamate/gateway/ApiGatewayApplication.java b/backend/api-gateway/src/main/java/com/datamate/gateway/ApiGatewayApplication.java index bf1c13237..7ab4fdda9 100644 --- a/backend/api-gateway/src/main/java/com/datamate/gateway/ApiGatewayApplication.java +++ b/backend/api-gateway/src/main/java/com/datamate/gateway/ApiGatewayApplication.java @@ -55,6 +55,9 @@ public RouteLocator customRouteLocator(RouteLocatorBuilder builder) { .route("data-cleaning", r -> r.path("/api/cleaning/**") .uri("http://datamate-backend-python:18000")) + .route("data-setting", r -> r.path("/api/sys-param/**") + .uri("http://datamate-backend-python:18000")) + .route("deer-flow-frontend", r -> r.path("/chat/**") .uri("http://deer-flow-frontend:3000")) diff --git a/deployment/docker/datamate/docker-compose.yml b/deployment/docker/datamate/docker-compose.yml index 1bb368b04..fda8417fd 100644 --- a/deployment/docker/datamate/docker-compose.yml +++ b/deployment/docker/datamate/docker-compose.yml @@ -204,16 +204,15 @@ services: # ============================== label-studio-pgbouncer: container_name: label-studio-pgbouncer - image: pgbouncer/pgbouncer:latest + image: edoburu/pgbouncer:latest restart: on-failure - ports: - - "6432:6432" environment: - - DATABASES_HOST=datamate-database - - DATABASES_PORT=5432 - - DATABASES_NAME=labelstudio - - DATABASES_USER=postgres - - DATABASES_PASSWORD=${DB_PASSWORD:-password} + - DB_HOST=datamate-database + - DB_PORT=5432 + - DB_NAME=labelstudio + - DB_USER=postgres + - DB_PASSWORD=${DB_PASSWORD:-password} + - AUTH_TYPE=scram-sha-256 - POOL_MODE=transaction - MAX_CLIENT_CONN=100 - DEFAULT_POOL_SIZE=20 @@ -241,7 +240,7 @@ services: - POSTGRE_NAME=labelstudio - POSTGRE_USER=postgres - POSTGRE_PASSWORD=${DB_PASSWORD:-password} - - POSTGRE_PORT=6432 + - POSTGRE_PORT=5432 - POSTGRE_HOST=label-studio-pgbouncer - LABEL_STUDIO_HOST=${LABEL_STUDIO_HOST:-} - LOCAL_FILES_SERVING_ENABLED=true diff --git 
a/deployment/helm/datamate/values.yaml b/deployment/helm/datamate/values.yaml index fb5044ef0..78570f4fb 100644 --- a/deployment/helm/datamate/values.yaml +++ b/deployment/helm/datamate/values.yaml @@ -42,6 +42,7 @@ public: DB_PASSWORD: "password" CERT_PASS: "" DOMAIN: "" + HOME_PAGE_URL: "" datasetVolume: &datasetVolume name: dataset-volume @@ -90,6 +91,11 @@ database: secretKeyRef: name: datamate-conf key: DB_PASSWORD + - name: HOME_PAGE_URL + valueFrom: + secretKeyRef: + name: datamate-conf + key: HOME_PAGE_URL volumes: - *dataVolume - *logVolume @@ -420,3 +426,60 @@ ray-cluster: subPath: site-packages - mountPath: /usr/local/Ascend name: ascend + gpuGroup: + disabled: false + replicas: 0 + minReplicas: 0 + maxReplicas: 8 + rayStartParams: + resources: '"{\"gpu\": 1}"' + containerEnv: + - name: RAY_DEDUP_LOGS + value: "0" + - name: RAY_TQDM_PATCH_PRINT + value: "0" + - name: PG_HOST + value: "datamate-database" + - name: PG_PORT + value: "5432" + - name: PG_USER + value: "postgres" + - name: PG_PASSWORD + valueFrom: + secretKeyRef: + name: datamate-conf + key: DB_PASSWORD + - name: PG_DATABASE + value: "datamate" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + resources: + limits: + cpu: "8" + memory: "64G" + nvidia.com/gpu: 1 + requests: + cpu: "1" + memory: "2G" + nvidia.com/gpu: 1 + volumes: + - *datasetVolume + - *flowVolume + - *logVolume + - *operatorVolume + volumeMounts: + - mountPath: /tmp/ray + name: log-volume + subPathExpr: ray/$(POD_NAME) + - mountPath: /dataset + name: dataset-volume + - mountPath: /flow + name: flow-volume + - mountPath: /opt/runtime/datamate/ops/user + name: operator-volume + subPath: extract + - mountPath: /usr/local/lib/ops/site-packages + name: operator-volume + subPath: site-packages diff --git a/deployment/helm/label-studio/templates/deployment.yaml b/deployment/helm/label-studio/templates/deployment.yaml index 1cbb90294..e11f0028e 100644 --- a/deployment/helm/label-studio/templates/deployment.yaml 
+++ b/deployment/helm/label-studio/templates/deployment.yaml @@ -38,7 +38,7 @@ spec: - name: POSTGRE_PASSWORD value: {{ .Values.env.POSTGRE_PASSWORD | quote }} - name: POSTGRE_PORT - value: {{ if .Values.pgbouncer.enabled }}{{ "6432" | quote }}{{ else }}{{ .Values.env.POSTGRE_PORT | quote }}{{ end }} + value: "5432" - name: POSTGRE_HOST value: {{ if .Values.pgbouncer.enabled }}{{ printf "%s-pgbouncer" (include "label-studio.fullname" .) | quote }}{{ else }}{{ .Values.env.POSTGRE_HOST | quote }}{{ end }} - name: LABEL_STUDIO_HOST diff --git a/deployment/helm/label-studio/templates/pgbouncer-deployment.yaml b/deployment/helm/label-studio/templates/pgbouncer-deployment.yaml index a00aba8b6..5595269e3 100644 --- a/deployment/helm/label-studio/templates/pgbouncer-deployment.yaml +++ b/deployment/helm/label-studio/templates/pgbouncer-deployment.yaml @@ -30,18 +30,18 @@ spec: imagePullPolicy: {{ .Values.pgbouncer.image.pullPolicy }} ports: - name: pgbouncer - containerPort: 6432 + containerPort: 5432 protocol: TCP env: - - name: DATABASES_HOST + - name: DB_HOST value: {{ .Values.env.POSTGRE_HOST | quote }} - - name: DATABASES_PORT + - name: DB_PORT value: {{ .Values.env.POSTGRE_PORT | quote }} - - name: DATABASES_NAME + - name: DB_NAME value: {{ .Values.env.POSTGRE_NAME | quote }} - - name: DATABASES_USER + - name: DB_USER value: {{ .Values.env.POSTGRE_USER | quote }} - - name: DATABASES_PASSWORD + - name: DB_PASSWORD value: {{ .Values.env.POSTGRE_PASSWORD | quote }} - name: POOL_MODE value: {{ .Values.pgbouncer.poolMode | quote }} @@ -51,14 +51,16 @@ spec: value: {{ .Values.pgbouncer.defaultPoolSize | quote }} - name: MAX_DB_CONNECTIONS value: {{ .Values.pgbouncer.maxDbConnections | quote }} + - name: AUTH_TYPE + value: {{ .Values.pgbouncer.authType | quote }} livenessProbe: tcpSocket: - port: 6432 + port: 5432 initialDelaySeconds: 15 periodSeconds: 20 readinessProbe: tcpSocket: - port: 6432 + port: 5432 initialDelaySeconds: 5 periodSeconds: 10 resources: diff --git 
a/deployment/helm/label-studio/templates/pgbouncer-service.yaml b/deployment/helm/label-studio/templates/pgbouncer-service.yaml index 77f1e7432..0a00f0855 100644 --- a/deployment/helm/label-studio/templates/pgbouncer-service.yaml +++ b/deployment/helm/label-studio/templates/pgbouncer-service.yaml @@ -11,7 +11,7 @@ metadata: spec: type: ClusterIP ports: - - port: 6432 + - port: 5432 targetPort: pgbouncer protocol: TCP name: pgbouncer diff --git a/deployment/helm/label-studio/values.yaml b/deployment/helm/label-studio/values.yaml index 10b530120..7ab0aec18 100644 --- a/deployment/helm/label-studio/values.yaml +++ b/deployment/helm/label-studio/values.yaml @@ -72,7 +72,7 @@ pgbouncer: replicaCount: 1 image: - repository: pgbouncer/pgbouncer + repository: edoburu/pgbouncer tag: "latest" pullPolicy: IfNotPresent @@ -81,6 +81,7 @@ pgbouncer: maxClientConn: 100 # Maximum number of client connections defaultPoolSize: 20 # Default pool size per database (max connections to PostgreSQL) maxDbConnections: 20 # Maximum database connections (hard limit) + authType: scram-sha-256 resources: {} diff --git a/frontend/src/i18n/locales/en/common.json b/frontend/src/i18n/locales/en/common.json index 48be552c4..2c56b472c 100644 --- a/frontend/src/i18n/locales/en/common.json +++ b/frontend/src/i18n/locales/en/common.json @@ -1432,6 +1432,8 @@ "processedFileType": "Processed File Type", "beforeSize": "Before Size", "afterSize": "After Size", + "result": "Result", + "resultDetail": "Result Detail", "status": "Status", "actions": "Actions", "searchFileName": "Search file name", @@ -1450,12 +1452,14 @@ "sizeOptimization": "File Size Optimization", "reduced": "Reduced by {{percent}}%" }, -"logTable": { + "logTable": { "selectRun": "Select Run", "currentDisplay": "Current Display: {{num}}th Run", "nthRun": "{{num}}th Run", "noLogs": "No logs available for this task", - "streaming": "Streaming..." 
+ "streaming": "Streaming...", + "download": "Download Log", + "downloadFailed": "Failed to download log file" }, "operatorTable": { "serialNumber": "Serial Number", diff --git a/frontend/src/i18n/locales/zh/common.json b/frontend/src/i18n/locales/zh/common.json index b62fad01e..1a020a04f 100644 --- a/frontend/src/i18n/locales/zh/common.json +++ b/frontend/src/i18n/locales/zh/common.json @@ -1432,6 +1432,8 @@ "processedFileType": "处理后文件类型", "beforeSize": "处理前大小", "afterSize": "处理后大小", + "result": "处理结果", + "resultDetail": "处理结果详情", "status": "状态", "actions": "操作", "searchFileName": "搜索文件名", @@ -1455,7 +1457,9 @@ "currentDisplay": "当前展示: 第 {{num}} 次", "nthRun": "第 {{num}} 次", "noLogs": "当前任务无可用日志", - "streaming": "实时流式输出中..." + "streaming": "实时流式输出中...", + "download": "下载日志", + "downloadFailed": "下载日志文件失败" }, "operatorTable": { "serialNumber": "序号", diff --git a/frontend/src/main.tsx b/frontend/src/main.tsx index 95de11ee2..f1d826e62 100644 --- a/frontend/src/main.tsx +++ b/frontend/src/main.tsx @@ -11,39 +11,96 @@ import theme from "./theme"; import {errorConfigStore} from "@/utils/errorConfigStore.ts"; import "@/i18n"; +async function checkHomePageRedirect(): Promise { + try { + const response = await fetch('/api/sys-param/sys.home.page.url', { + cache: 'no-store' + }); + + if (response.ok) { + const result = await response.json(); + return result.data?.paramValue?.trim() || null; + } + } catch (error) { + console.error('Failed to fetch home page URL:', error); + } + + return null; +} + +function showLoadingUI() { + const container = document.getElementById("root"); + if (!container) return; + + container.innerHTML = ` +
+
+
+ +
+
+ `; +} + async function bootstrap() { const container = document.getElementById("root"); if (!container) return; - const root = createRoot(container); + showLoadingUI(); try { - // 2. 【关键步骤】在渲染前,等待配置文件加载完成 - // 这一步会发起 fetch 请求去拿 /config/error-code.json - await errorConfigStore.loadConfig(); + const [, homePageUrl] = await Promise.all([ + errorConfigStore.loadConfig(), + checkHomePageRedirect() + ]); + + if (homePageUrl) { + const currentPath = window.location.pathname; + const targetPath = new URL(homePageUrl, window.location.origin).pathname; + + if (currentPath === '/' && currentPath !== targetPath) { + window.location.href = homePageUrl; + return; + } + } } catch (e) { - // 容错处理:即使配置文件加载失败(比如404),也不应该导致整个 App 白屏崩溃 - // 此时 App 会使用代码里的默认兜底文案 - console.error('Error config load failed, using default messages.', e); - } finally { - // 3. 无论配置加载成功与否,最后都执行渲染 - root.render( - - - - - }> - - - - - - - - ); + console.error('Config load failed:', e); } + + const root = createRoot(container); + + root.render( + + + + + }> + + + + + + + + ); } -// 4. 
执行启动 bootstrap(); diff --git a/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx b/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx index 8aa576a0c..0d5203efc 100644 --- a/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx +++ b/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx @@ -217,7 +217,7 @@ export default function CleansingTaskDetail() { )} {activeTab === "operators" && } {activeTab === "files" && } - {activeTab === "logs" && } + {activeTab === "logs" && } diff --git a/frontend/src/pages/DataCleansing/Detail/components/FileTable.tsx b/frontend/src/pages/DataCleansing/Detail/components/FileTable.tsx index b3c1c1ed0..0b025645a 100644 --- a/frontend/src/pages/DataCleansing/Detail/components/FileTable.tsx +++ b/frontend/src/pages/DataCleansing/Detail/components/FileTable.tsx @@ -262,6 +262,53 @@ export default function FileTable({result, fetchTaskResult}) { {formatFileSize(number)} ), }, + { + title: t("dataCleansing.detail.fileTable.result"), + dataIndex: "result", + key: "result", + width: 200, + render: (text: string) => { + if (!text) return -; + + try { + const parsed = JSON.parse(text); + const jsonString = JSON.stringify(parsed, null, 2); + const displayText = typeof parsed === 'object' + ? (Array.isArray(parsed) ? `[${parsed.length} items]` : '{...}') + : String(parsed); + + return ( + + {jsonString} + + } + title={t("dataCleansing.detail.fileTable.resultDetail")} + trigger="click" + > + + {displayText} + + + ); + } catch { + const displayText = text.length > 30 ? text.substring(0, 30) + '...' 
: text; + return ( + {text}} + title={t("dataCleansing.detail.fileTable.resultDetail")} + trigger="click" + disabled={text.length <= 30} + > + + {displayText} + + + ); + } + }, + }, { title: t("dataCleansing.detail.fileTable.status"), dataIndex: "status", diff --git a/frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx b/frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx index b72ceb3eb..7a7bb3cc2 100644 --- a/frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx +++ b/frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx @@ -1,15 +1,16 @@ import { useEffect, useState, useRef } from "react"; import { useParams } from "react-router"; -import { FileClock } from "lucide-react"; +import { FileClock, Download } from "lucide-react"; import { useTranslation } from "react-i18next"; -import { streamCleaningTaskLog } from "../../cleansing.api"; +import { App } from "antd"; +import { streamCleaningTaskLog, downloadCleaningTaskLog } from "../../cleansing.api"; interface LogEntry { level: string; message: string; } -export default function LogsTable({ taskLog: initialLogs, fetchTaskLog, retryCount }: { taskLog: any[], fetchTaskLog: () => Promise, retryCount: number }) { +export default function LogsTable({ taskLog: initialLogs, fetchTaskLog, retryCount, taskName }: { taskLog: LogEntry[], fetchTaskLog: () => Promise, retryCount: number, taskName: string }) { const { id = "" } = useParams(); const { t } = useTranslation(); const [selectedLog, setSelectedLog] = useState(retryCount + 1); @@ -17,6 +18,7 @@ export default function LogsTable({ taskLog: initialLogs, fetchTaskLog, retryCou const [isStreaming, setIsStreaming] = useState(false); const logContainerRef = useRef(null); const eventSourceRef = useRef(null); + const { message } = App.useApp(); useEffect(() => { if (selectedLog - 1 === retryCount) { @@ -34,7 +36,7 @@ export default function LogsTable({ taskLog: initialLogs, fetchTaskLog, retryCou } }, [streamingLogs, 
isStreaming]); -const startStreaming = () => { + const startStreaming = () => { stopStreaming(); setStreamingLogs([]); setIsStreaming(true); @@ -78,6 +80,23 @@ const startStreaming = () => { setStreamingLogs([]); }; + const handleDownload = async () => { + try { + const blob = await downloadCleaningTaskLog(id, selectedLog - 1); + const url = window.URL.createObjectURL(blob); + const a = document.createElement('a'); + a.href = url; + a.download = `${taskName}_第${selectedLog}次运行.log`; + document.body.appendChild(a); + a.click(); + window.URL.revokeObjectURL(url); + document.body.removeChild(a); + } catch (error) { + console.error("Failed to download log:", error); + message.error(t("dataCleansing.detail.logTable.downloadFailed")); + } + }; + return displayLogs?.length > 0 || isStreaming ? ( <>
@@ -98,7 +117,17 @@ const startStreaming = () => { {t("dataCleansing.detail.logTable.streaming")} )}
- {t("dataCleansing.detail.logTable.nthRun", { num: selectedLog })} +
+ {t("dataCleansing.detail.logTable.nthRun", { num: selectedLog })} + +
{ + const url = `/api/cleaning/tasks/${taskId}/log/${retryCount}/download`; + const response = await fetch(url); + if (!response.ok) { + throw new Error('Failed to download log file'); + } + return response.blob(); +} + export function updateCleaningTaskByIdUsingPut(taskId: string | number, data: any) { return put(`/api/cleaning/tasks/${taskId}`, data); } @@ -62,9 +71,3 @@ export function updateCleaningTemplateByIdUsingPut(templateId: string | number, export function deleteCleaningTemplateByIdUsingDelete(templateId: string | number) { return del(`/api/cleaning/templates/${templateId}`); } - - - - - - diff --git a/frontend/src/pages/Home/Home.tsx b/frontend/src/pages/Home/Home.tsx index 354815490..30d46ca20 100644 --- a/frontend/src/pages/Home/Home.tsx +++ b/frontend/src/pages/Home/Home.tsx @@ -2,27 +2,67 @@ import { FolderOpen, Settings, ArrowRight, - Sparkles, Target, Zap, Database, MessageSquare, GitBranch, + Sparkles, } from "lucide-react"; import { features, menuItems } from "../Layout/Menu.tsx"; -import { useState } from 'react'; +import { useState, useEffect } from 'react'; import { useNavigate } from "react-router"; -import { Card, Dropdown, Button } from "antd"; +import { Card, Dropdown, Button, Spin } from "antd"; import type { MenuProps } from 'antd'; import { Globe } from "lucide-react"; import { useTranslation } from "react-i18next"; import i18n from "@/i18n"; +import { getHomePageUrl } from '@/utils/systemParam'; export default function WelcomePage() { const navigate = useNavigate(); const [isChecking, setIsChecking] = useState(false); + const [isCheckingRedirect, setIsCheckingRedirect] = useState(true); const { t } = useTranslation(); + useEffect(() => { + let isMounted = true; + + const checkAndRedirect = async () => { + try { + const homePageUrl = await getHomePageUrl(); + + if (!isMounted) return; + + if (homePageUrl) { + window.location.href = homePageUrl; + return; + } + + setIsCheckingRedirect(false); + } catch (error) { + console.error('Failed 
to check home page URL:', error); + if (isMounted) { + setIsCheckingRedirect(false); + } + } + }; + + checkAndRedirect(); + + return () => { + isMounted = false; + }; + }, []); + + if (isCheckingRedirect) { + return ( +
+ +
+ ); + } + const languageMenuItems: MenuProps['items'] = [ { key: 'zh', @@ -42,7 +82,6 @@ export default function WelcomePage() { } ]; - // 检查接口连通性的函数 const checkDeerFlowDeploy = async (): Promise => { try { const controller = new AbortController(); @@ -59,7 +98,6 @@ export default function WelcomePage() { clearTimeout(timeoutId); - // 检查 HTTP 状态码在 200-299 范围内 if (response.ok) { return true; } @@ -70,7 +108,7 @@ export default function WelcomePage() { }; const handleChatClick = async () => { - if (isChecking) return; // 防止重复点击 + if (isChecking) return; setIsChecking(true); @@ -78,14 +116,11 @@ export default function WelcomePage() { const isDeerFlowDeploy = await checkDeerFlowDeploy(); if (isDeerFlowDeploy) { - // 接口正常,执行原有逻辑 window.location.href = "/chat"; } else { - // 接口异常,使用 navigate 跳转 navigate("/chat"); } } catch (error) { - // 发生错误时也使用 navigate 跳转 console.error('检查过程中发生错误:', error); navigate("/chat"); } finally { diff --git a/frontend/src/utils/systemParam.ts b/frontend/src/utils/systemParam.ts new file mode 100644 index 000000000..44412806a --- /dev/null +++ b/frontend/src/utils/systemParam.ts @@ -0,0 +1,53 @@ +/** + * System Parameter API + * 系统参数 API 接口 + */ +import { get } from '@/utils/request'; + +export interface SysParam { + id: string; + paramValue: string; + paramType: string; + optionList?: string; + description?: string; + isBuiltIn: boolean; + canModify: boolean; + isEnabled: boolean; + createdAt?: string; + updatedAt?: string; + createdBy?: string; + updatedBy?: string; +} + +/** + * 获取所有系统参数 + */ +export async function getSystemParams(): Promise { + const response = await get<{ code: string; message: string; data: SysParam[] }>('/api/sys-param/list'); + return response.data || []; +} + +/** + * 根据ID获取系统参数 + */ +export async function getSystemParamById(paramId: string): Promise { + try { + const response = await get<{ code: string; message: string; data: SysParam }>(`/api/sys-param/${paramId}`); + return response.data; + } catch (error) { + 
return null; + } +} + +/** + * 获取首页URL配置 + */ +export async function getHomePageUrl(): Promise { + try { + const param = await getSystemParamById('sys.home.page.url'); + return param?.paramValue?.trim() || null; + } catch (error) { + console.error('Failed to get home page URL:', error); + return null; + } +} diff --git a/runtime/datamate-python/app/core/exception/codes.py b/runtime/datamate-python/app/core/exception/codes.py index 406b0778e..1c6358fe8 100644 --- a/runtime/datamate-python/app/core/exception/codes.py +++ b/runtime/datamate-python/app/core/exception/codes.py @@ -174,6 +174,9 @@ def __init__(self): CLEANING_CANNOT_DELETE_PRESET_TEMPLATE: Final = ErrorCode( "cleaning.0011", "Cannot delete preset template", 400 ) + CLEANING_TASK_LOG_NOT_FOUND: Final = ErrorCode( + "cleaning.0012", "Cleaning task log file not found", 404 + ) # ========== 算子市场模块 ========== OPERATOR_NOT_FOUND: Final = ErrorCode("operator.0001", "Operator not found", 404) diff --git a/runtime/datamate-python/app/db/models/__init__.py b/runtime/datamate-python/app/db/models/__init__.py index ddc80dc57..5ef25e7c0 100644 --- a/runtime/datamate-python/app/db/models/__init__.py +++ b/runtime/datamate-python/app/db/models/__init__.py @@ -1,39 +1,25 @@ - from .dataset_management import ( Dataset, DatasetTag, DatasetFiles, DatasetStatistics, - Tag + Tag, ) -from .user_management import ( - User -) +from .user_management import User -from .annotation_management import ( - AnnotationTemplate, - LabelingProject -) +from .annotation_management import AnnotationTemplate, LabelingProject -from .data_evaluation import ( - EvaluationTask, - EvaluationItem -) +from .data_evaluation import EvaluationTask, EvaluationItem -from .operator import ( - Operator, - Category, - CategoryRelation, - OperatorRelease -) +from .operator import Operator, Category, CategoryRelation, OperatorRelease -from .chunk_upload import ( - ChunkUploadPreRequest -) +from .chunk_upload import ChunkUploadPreRequest from .knowledge_gen 
import KnowledgeBase, RagFile +from .sys_param import SysParam + __all__ = [ "Dataset", "DatasetTag", @@ -52,4 +38,5 @@ "ChunkUploadPreRequest", "KnowledgeBase", "RagFile", + "SysParam", ] diff --git a/runtime/datamate-python/app/db/models/sys_param.py b/runtime/datamate-python/app/db/models/sys_param.py new file mode 100644 index 000000000..c2dad78d8 --- /dev/null +++ b/runtime/datamate-python/app/db/models/sys_param.py @@ -0,0 +1,30 @@ +""" +System Parameter Model +系统参数模型 +""" + +from sqlalchemy import Column, String, Boolean, Text +from app.db.models.base_entity import BaseEntity + + +class SysParam(BaseEntity): + """系统参数实体""" + + __tablename__ = "t_sys_param" + __ignore_data_scope__ = True # 系统参数不进行数据权限过滤 + + id = Column(String(100), primary_key=True, comment="参数ID") + param_value = Column(Text, nullable=False, comment="参数值") + param_type = Column( + String(50), + nullable=False, + default="string", + comment="参数类型(string、integer、boolean)", + ) + option_list = Column( + Text, nullable=True, comment="选项列表(JSON格式,仅对enum类型有效)" + ) + description = Column(String(255), nullable=True, default="", comment="参数描述") + is_built_in = Column(Boolean, nullable=False, default=False, comment="是否内置") + can_modify = Column(Boolean, nullable=False, default=True, comment="是否可修改") + is_enabled = Column(Boolean, nullable=False, default=True, comment="是否启用") diff --git a/runtime/datamate-python/app/main.py b/runtime/datamate-python/app/main.py index 00dcbd0d8..a0a5499d0 100644 --- a/runtime/datamate-python/app/main.py +++ b/runtime/datamate-python/app/main.py @@ -2,9 +2,11 @@ from typing import Literal from urllib.parse import urlparse, urlunparse -from fastapi import FastAPI +from fastapi import FastAPI, Depends from fastapi_mcp import FastApiMCP +from fastapi.responses import RedirectResponse from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession from app.core.config import settings from app.core.exception import ( @@ -15,15 +17,19 @@ BusinessError, ) from 
app.core.logging import setup_logging, get_logger -from app.db.session import AsyncSessionLocal +from app.db.session import AsyncSessionLocal, get_db from app.middleware import UserContextMiddleware from app.module import router -from app.module.collection.schedule import load_scheduled_collection_tasks, set_collection_scheduler +from app.module.collection.schedule import ( + load_scheduled_collection_tasks, + set_collection_scheduler, +) from app.module.shared.schedule import Scheduler setup_logging() logger = get_logger(__name__) + @asynccontextmanager async def lifespan(app: FastAPI): @@ -68,13 +74,14 @@ def mask_db_url(url: str) -> Literal[b""] | str: collection_scheduler.shutdown() logger.info("DataMate Python Backend shutting down ...\n\n") + # 创建FastAPI应用 app = FastAPI( title=settings.app_name, description=settings.app_description, version=settings.app_version, debug=settings.debug, - lifespan=lifespan + lifespan=lifespan, ) # 注册全局异常捕获中间件(最外层,确保捕获所有异常) @@ -90,22 +97,26 @@ def mask_db_url(url: str) -> Literal[b""] | str: # 注册全局异常处理器 register_exception_handlers(app) + # 测试端点:验证异常处理 @app.get("/test-success", include_in_schema=False) async def test_success(): """测试成功响应""" return SuccessResponse(data={"message": "Test successful"}) + @app.get("/test-business-error", include_in_schema=False) async def test_business_error(): """测试业务错误响应""" raise BusinessError(ErrorCodes.ANNOTATION_TASK_NOT_FOUND) + @app.get("/test-system-error", include_in_schema=False) async def test_system_error(): """测试系统错误响应""" raise SystemError(ErrorCodes.DATABASE_ERROR) + # 根路径 @app.get("/", include_in_schema=False) async def root(): @@ -119,7 +130,13 @@ async def root(): } ) -mcp = FastApiMCP(app, name="DataMate MCP", description="DataMate python mcp server", include_tags=["mcp"]) + +mcp = FastApiMCP( + app, + name="DataMate MCP", + description="DataMate python mcp server", + include_tags=["mcp"], +) mcp.mount_http(mount_path="/api/mcp") if __name__ == "__main__": @@ -130,5 +147,5 @@ async 
def root(): host=settings.host, port=settings.port, reload=settings.debug, - log_level=settings.log_level.lower() + log_level=settings.log_level.lower(), ) diff --git a/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py b/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py index f40213b42..cbf38fb59 100644 --- a/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py +++ b/runtime/datamate-python/app/module/cleaning/interface/cleaning_task_routes.py @@ -348,3 +348,56 @@ async def get_cleaning_task_log( task_service = _get_task_service(db) logs = await task_service.get_task_log(db, task_id, retry_count) return StandardResponse(code="0", message="success", data=logs) + + +@router.get( + "/{task_id}/log/{retry_count}/download", + summary="下载清洗任务日志文件", + description="下载指定清洗任务的日志文件", +) +async def download_cleaning_task_log( + task_id: str, + retry_count: int, + db: AsyncSession = Depends(get_db), +): + """Download cleaning task log file""" + from pathlib import Path + from fastapi.responses import FileResponse + + FLOW_PATH = "/flow" + + task_service = _get_task_service(db) + task = await task_service.task_repo.find_task_by_id(db, task_id) + + if not task: + from app.core.exception import BusinessError, ErrorCodes + + raise BusinessError(ErrorCodes.CLEANING_TASK_NOT_FOUND, task_id) + + log_path = Path(f"{FLOW_PATH}/{task_id}/output.log") + if retry_count > 0: + log_path = Path(f"{FLOW_PATH}/{task_id}/output.log.{retry_count}") + + if not log_path.exists(): + from app.core.exception import BusinessError, ErrorCodes + + raise BusinessError( + ErrorCodes.CLEANING_TASK_LOG_NOT_FOUND, + f"Log file not found for task {task_id}, retry {retry_count}", + ) + + # Generate filename with task name and retry count + import re + + task_name = task.name or "未命名任务" + safe_task_name = re.sub( + r"[^\w\u4e00-\u9fff\-]", "_", task_name + ) # Keep alphanumeric, Chinese, and hyphens + run_number = retry_count + 1 # 
retry_count is 0-indexed, so add 1 for display + filename = f"{safe_task_name}_第{run_number}次运行.log" + + return FileResponse( + path=log_path, + media_type="text/plain", + filename=filename, + ) diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py index d8c02c41a..96ea61660 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py @@ -326,7 +326,7 @@ async def scan_dataset( target_file_path.parent.mkdir(parents=True, exist_ok=True) query = text(""" - SELECT id, file_name, file_path, file_type, file_size + SELECT id, file_name, file_path, file_type, file_size, metadata FROM t_dm_dataset_files WHERE dataset_id = :dataset_id ORDER BY created_at @@ -340,12 +340,22 @@ async def scan_dataset( if succeed_files and file.id in succeed_files: continue + metadata_dict = {} + if file.metadata: + try: + parsed = json.loads(file.metadata) + if isinstance(parsed, dict): + metadata_dict = parsed + except (json.JSONDecodeError, TypeError): + pass + file_info = { "fileId": file.id, "fileName": file.file_name, "filePath": file.file_path, "fileType": file.file_type, "fileSize": file.file_size, + "metadata": metadata_dict, } f.write(json.dumps(file_info, ensure_ascii=False) + "\n") diff --git a/runtime/datamate-python/app/module/system/interface/__init__.py b/runtime/datamate-python/app/module/system/interface/__init__.py index 15438e988..bf56c5d86 100644 --- a/runtime/datamate-python/app/module/system/interface/__init__.py +++ b/runtime/datamate-python/app/module/system/interface/__init__.py @@ -2,8 +2,10 @@ from .about import router as about_router from app.module.system.interface.models import router as models_router +from .sys_param import router as sys_param_router router = APIRouter() router.include_router(about_router) 
router.include_router(models_router) +router.include_router(sys_param_router) diff --git a/runtime/datamate-python/app/module/system/interface/sys_param.py b/runtime/datamate-python/app/module/system/interface/sys_param.py new file mode 100644 index 000000000..d47645b3c --- /dev/null +++ b/runtime/datamate-python/app/module/system/interface/sys_param.py @@ -0,0 +1,106 @@ +""" +System Parameter API Routes +系统参数 REST API 路由 +""" + +from typing import List +from fastapi import APIRouter, Depends +from sqlalchemy.ext.asyncio import AsyncSession + +from app.db.session import get_db +from app.module.shared.schema.common import StandardResponse +from app.module.system.schema.sys_param import ( + SysParamDto, + UpdateParamValueRequest, + CreateSysParamRequest, +) +from app.module.system.service.sys_param_service import SysParamService + +router = APIRouter(prefix="/sys-param", tags=["System Parameters"]) + + +@router.get( + "/list", + response_model=StandardResponse[List[SysParamDto]], + summary="获取系统参数列表", + description="获取所有系统参数配置", +) +async def list_params(db: AsyncSession = Depends(get_db)): + """获取系统参数列表""" + service = SysParamService(db) + params = await service.list_params() + return StandardResponse(code="0", message="success", data=params) + + +@router.get( + "/{param_id}", + response_model=StandardResponse[SysParamDto], + summary="获取系统参数详情", + description="根据ID获取系统参数详情", +) +async def get_param( + param_id: str, + db: AsyncSession = Depends(get_db), +): + """获取系统参数详情""" + service = SysParamService(db) + param = await service.get_param_by_id(param_id) + if not param: + from app.core.exception import BusinessError, ErrorCodes + + raise BusinessError( + ErrorCodes.NOT_FOUND, f"System parameter {param_id} not found" + ) + return StandardResponse(code="0", message="success", data=param) + + +@router.post( + "", + response_model=StandardResponse[SysParamDto], + summary="创建系统参数", + description="创建新的系统参数", +) +async def create_param( + request: CreateSysParamRequest, + 
db: AsyncSession = Depends(get_db), +): + """创建系统参数""" + service = SysParamService(db) + param = await service.create_param(request) + await db.commit() + return StandardResponse(code="0", message="success", data=param) + + +@router.put( + "/{param_id}", + response_model=StandardResponse[SysParamDto], + summary="更新系统参数值", + description="更新指定系统参数的值", +) +async def update_param_value( + param_id: str, + request: UpdateParamValueRequest, + db: AsyncSession = Depends(get_db), +): + """更新系统参数值""" + service = SysParamService(db) + param = await service.update_param_value(param_id, request.param_value) + await db.commit() + return StandardResponse(code="0", message="success", data=param) + + +@router.delete( + "/{param_id}", + response_model=StandardResponse[str], + summary="删除系统参数", + description="删除指定的系统参数", +) +async def delete_param( + param_id: str, + db: AsyncSession = Depends(get_db), +): + """删除系统参数""" + service = SysParamService(db) + await service.delete_param(param_id) + await db.commit() + return StandardResponse(code="0", message="success", data=param_id) diff --git a/runtime/datamate-python/app/module/system/schema/sys_param.py b/runtime/datamate-python/app/module/system/schema/sys_param.py new file mode 100644 index 000000000..0f4e7606d --- /dev/null +++ b/runtime/datamate-python/app/module/system/schema/sys_param.py @@ -0,0 +1,51 @@ +""" +System Parameter DTOs +系统参数数据传输对象 +""" + +from typing import Optional +from datetime import datetime +from pydantic import BaseModel, Field + +from app.module.shared.schema import BaseResponseModel + + +class SysParamDto(BaseResponseModel): + """系统参数 DTO""" + + id: str = Field(..., description="参数ID") + param_value: str = Field(..., description="参数值") + param_type: str = Field( + default="string", description="参数类型(string、integer、boolean)" + ) + option_list: Optional[str] = Field(None, description="选项列表(JSON格式)") + description: Optional[str] = Field(None, description="参数描述") + is_built_in: bool = Field(default=False, 
description="是否内置") + can_modify: bool = Field(default=True, description="是否可修改") + is_enabled: bool = Field(default=True, description="是否启用") + created_at: Optional[datetime] = Field(None, description="创建时间") + updated_at: Optional[datetime] = Field(None, description="更新时间") + created_by: Optional[str] = Field(None, description="创建者") + updated_by: Optional[str] = Field(None, description="更新者") + + class Config: + from_attributes = True + + +class UpdateParamValueRequest(BaseResponseModel): + """更新参数值请求""" + + param_value: str = Field(..., description="参数值") + + +class CreateSysParamRequest(BaseResponseModel): + """创建系统参数请求""" + + id: str = Field(..., description="参数ID") + param_value: str = Field(..., description="参数值") + param_type: str = Field(default="string", description="参数类型") + option_list: Optional[str] = Field(None, description="选项列表(JSON格式)") + description: Optional[str] = Field(None, description="参数描述") + is_built_in: bool = Field(default=False, description="是否内置") + can_modify: bool = Field(default=True, description="是否可修改") + is_enabled: bool = Field(default=True, description="是否启用") diff --git a/runtime/datamate-python/app/module/system/service/sys_param_service.py b/runtime/datamate-python/app/module/system/service/sys_param_service.py new file mode 100644 index 000000000..1e4dad77c --- /dev/null +++ b/runtime/datamate-python/app/module/system/service/sys_param_service.py @@ -0,0 +1,141 @@ +""" +System Parameter Service +系统参数服务 +""" + +from typing import List, Optional +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.logging import get_logger +from app.core.exception import BusinessError, ErrorCodes +from app.db.models.sys_param import SysParam +from app.module.system.schema.sys_param import ( + SysParamDto, + UpdateParamValueRequest, + CreateSysParamRequest, +) + +logger = get_logger(__name__) + + +class SysParamService: + """系统参数服务""" + + def __init__(self, db: AsyncSession): + self.db = db + + 
async def list_params(self) -> List[SysParamDto]: + """获取系统参数列表""" + query = select(SysParam).order_by(SysParam.param_type, SysParam.id) + result = await self.db.execute(query) + params = result.scalars().all() + return [self._orm_to_response(p) for p in params] + + async def get_param_by_id(self, param_id: str) -> Optional[SysParamDto]: + """根据ID获取系统参数""" + query = select(SysParam).where(SysParam.id == param_id) + result = await self.db.execute(query) + param = result.scalar_one_or_none() + if not param: + return None + return self._orm_to_response(param) + + async def update_param_value(self, param_id: str, param_value: str) -> SysParamDto: + """更新系统参数值""" + # 查询参数 + query = select(SysParam).where(SysParam.id == param_id) + result = await self.db.execute(query) + param = result.scalar_one_or_none() + + if not param: + raise BusinessError( + ErrorCodes.NOT_FOUND, f"System parameter {param_id} not found" + ) + + # 检查是否可修改 + if not param.can_modify: + raise BusinessError( + ErrorCodes.OPERATION_FAILED, + f"System parameter {param_id} cannot be modified", + ) + + # 更新值 + param.param_value = param_value + await self.db.commit() + await self.db.refresh(param) + + logger.info(f"Updated system parameter {param_id} = {param_value}") + return self._orm_to_response(param) + + async def create_param(self, request: CreateSysParamRequest) -> SysParamDto: + """创建系统参数""" + # 检查是否已存在 + existing = await self.get_param_by_id(request.id) + if existing: + raise BusinessError( + ErrorCodes.OPERATION_FAILED, + f"System parameter {request.id} already exists", + ) + + # 创建新参数 + param = SysParam( + id=request.id, + param_value=request.param_value, + param_type=request.param_type, + option_list=request.option_list, + description=request.description, + is_built_in=request.is_built_in, + can_modify=request.can_modify, + is_enabled=request.is_enabled, + ) + + self.db.add(param) + await self.db.commit() + await self.db.refresh(param) + + logger.info(f"Created system parameter {request.id}") + 
return self._orm_to_response(param) + + async def delete_param(self, param_id: str) -> None: + """删除系统参数""" + # 查询参数 + query = select(SysParam).where(SysParam.id == param_id) + result = await self.db.execute(query) + param = result.scalar_one_or_none() + + if not param: + raise BusinessError( + ErrorCodes.NOT_FOUND, f"System parameter {param_id} not found" + ) + + # 检查是否为内置参数 + if param.is_built_in: + raise BusinessError( + ErrorCodes.OPERATION_FAILED, + f"Cannot delete built-in system parameter {param_id}", + ) + + # 删除参数 + await self.db.delete(param) + await self.db.commit() + + logger.info(f"Deleted system parameter {param_id}") + + @staticmethod + def _orm_to_response(row: SysParam) -> SysParamDto: + """Convert ORM model to response DTO""" + return SysParamDto( + id=row.id, + param_value=row.param_value, + param_type=row.param_type or "string", + option_list=row.option_list, + description=row.description, + is_built_in=bool(row.is_built_in) if row.is_built_in is not None else False, + can_modify=bool(row.can_modify) if row.can_modify is not None else True, + is_enabled=bool(row.is_enabled) if row.is_enabled is not None else True, + created_at=row.created_at, + updated_at=row.updated_at, + created_by=row.created_by, + updated_by=row.updated_by, + ) diff --git a/runtime/python-executor/datamate/wrappers/executor.py b/runtime/python-executor/datamate/wrappers/executor.py index b44ef2a2f..4b40948f7 100644 --- a/runtime/python-executor/datamate/wrappers/executor.py +++ b/runtime/python-executor/datamate/wrappers/executor.py @@ -43,6 +43,8 @@ def load_meta(self, line): meta["sourceFileType"] = meta.get("fileType") if meta.get("fileSize"): meta["sourceFileSize"] = meta.get("fileSize") + else: + meta["sourceFileSize"] = 0 if not meta.get("totalPageNum"): meta["totalPageNum"] = 0 if not meta.get("extraFilePath"): diff --git a/scripts/db/setting-management-init.sql b/scripts/db/setting-management-init.sql index 92103579c..9903d97de 100644 --- 
a/scripts/db/setting-management-init.sql +++ b/scripts/db/setting-management-init.sql @@ -98,7 +98,7 @@ CREATE TRIGGER update_t_sys_param_updated_at -- 插入初始数据 INSERT INTO t_sys_param (id, param_value, param_type, option_list, description, is_built_in, - can_modify, is_enabled, created_by, updated_by) + can_modify, is_enabled, created_by, updated_by) VALUES ('sys.knowledge.base.count', '200', 'number', '10,200,500', '知识库最大数量', true, true, true, 'system', 'system'), ('SEARCH_API', 'tavily', 'string', 'tavily,infoquest,duckduckgo,brave_search,arxiv', 'deer-flow使用的搜索引擎', true, true, true, 'system', 'system'), @@ -106,5 +106,6 @@ VALUES ('BRAVE_SEARCH_API_KEY', 'api-xxx', 'string', '', 'deer-flow使用的搜索引擎所需的apiKey', true, true, true, 'system', 'system'), ('JINA_API_KEY', '', 'string', '', 'deer-flow使用的JINA搜索引擎所需的apiKey', true, true, true, 'system', 'system'), ('sys.management.dataset.pvc.name', 'datamate-dataset-pvc', 'string', '', '数据集所在pvc名称', true, false,true, 'system', 'system'), - ('DATA_JUICER_EXECUTOR', 'default', 'string', 'default,ray', 'data-juicer使用的执行器', true, true, true, 'system', 'system') + ('DATA_JUICER_EXECUTOR', 'default', 'string', 'default,ray', 'data-juicer使用的执行器', true, true, true, 'system', 'system'), + ('sys.home.page.url', '', 'string', '', '首页URL,访问根路径时自动重定向到此URL(为空则不重定向)', true, true, true, 'system', 'system') ON CONFLICT (id) DO NOTHING; diff --git a/scripts/db/zz-final-task.sh b/scripts/db/zz-final-task.sh new file mode 100644 index 000000000..170a71a5f --- /dev/null +++ b/scripts/db/zz-final-task.sh @@ -0,0 +1,22 @@ +#!/bin/bash +# 遇到错误即退出 +set -e + +# 使用 -n 判断环境变量 HOME_PAGE_URL 是否非空(即已设置且有值) +if [ -n "$HOME_PAGE_URL" ]; then + echo "检测到 HOME_PAGE_URL 环境变量已配置,值为: $HOME_PAGE_URL。准备更新数据库..." + + # 只有变量非空时,才会进入这里执行 SQL + psql -v ON_ERROR_STOP=1 -U "$POSTGRES_USER" -d "datamate" -v my_url="$HOME_PAGE_URL" <<-EOSQL + + UPDATE t_sys_param + SET param_value = :'my_url' + WHERE id = 'sys.home.page.url'; + +EOSQL + + echo "sys.home.page.url 更新完成!" 
+else
+    # 如果变量为空或未设置,打印提示并直接跳过
+    echo "未配置 HOME_PAGE_URL 环境变量或值为空,跳过 sys.home.page.url 的更新操作。"
+fi