From a6c371226a16a10f7fd352d9059fa210af4ecc06 Mon Sep 17 00:00:00 2001 From: Dallas98 <990259227@qq.com> Date: Thu, 11 Dec 2025 16:41:07 +0800 Subject: [PATCH 01/17] fix(chart): update Helm chart helpers and values for improved configuration --- deployment/helm/label-studio/templates/_helpers.tpl | 7 +++---- deployment/helm/label-studio/values.yaml | 10 ++++++---- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/deployment/helm/label-studio/templates/_helpers.tpl b/deployment/helm/label-studio/templates/_helpers.tpl index 0e6b76957..04d95dd99 100644 --- a/deployment/helm/label-studio/templates/_helpers.tpl +++ b/deployment/helm/label-studio/templates/_helpers.tpl @@ -2,7 +2,7 @@ Expand the name of the chart. */}} {{- define "label-studio.name" -}} -{{- default .Chart.name .Values.nameOverride | trunc 63 | trimSuffix "-" -}} +{{- default .Values.nameOverride .Chart.Name | trunc 63 | trimSuffix "-" -}} {{- end -}} {{/* @@ -12,7 +12,7 @@ Create a default fully qualified app name. {{- if .Values.fullnameOverride -}} {{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" -}} {{- else -}} -{{- $name := default .Chart.name .Values.nameOverride -}} +{{- $name := default .Values.nameOverride .Chart.Name -}} {{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" -}} {{- end -}} {{- end -}} @@ -21,6 +21,5 @@ Create a default fully qualified app name. Create chart name and version as used by the chart label. */}} {{- define "label-studio.chart" -}} -{{- printf "%s-%s" .Chart.name .Chart.version | replace "+" "_" | trunc 63 | trimSuffix "-" -}} +{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" -}} {{- end -}} - diff --git a/deployment/helm/label-studio/values.yaml b/deployment/helm/label-studio/values.yaml index 347be476e..696e5a40d 100644 --- a/deployment/helm/label-studio/values.yaml +++ b/deployment/helm/label-studio/values.yaml @@ -1,5 +1,6 @@ # Default values for label-studio Helm chart. # This mirrors the configuration from deployment/docker/label-studio/docker-compose.yml +fullnameOverride: label-studio replicaCount: 1 @@ -24,9 +25,9 @@ postgres: size: 10Gi service: - type: ClusterIP + type: NodePort port: 8000 - nodePort: null + nodePort: 30001 # Corresponds to docker-compose port mapping 30001:8000 ingress: @@ -54,7 +55,7 @@ env: POSTGRE_USER: "postgres" POSTGRE_PASSWORD: "" POSTGRE_PORT: 5432 - POSTGRE_HOST: "db" + POSTGRE_HOST: "label-studio-postgres" LABEL_STUDIO_HOST: "" # can be overridden LOCAL_FILES_SERVING_ENABLED: "true" LOCAL_FILES_DOCUMENT_ROOT: "/label-studio/local" @@ -75,5 +76,6 @@ persistence: # If not set and persistence.enabled=true, a PVC will be created automatically. datasetVolume: enabled: true - claimName: "" # if empty, uses same PVC as persistence or creates a dedicated one + claimName: datamate-dataset-pvc # if empty, uses same PVC as persistence or creates a dedicated one + From 175cdb9f5351a0965e60862d94941d4be79ffd1b Mon Sep 17 00:00:00 2001 From: Dallas98 <990259227@qq.com> Date: Thu, 11 Dec 2025 16:56:19 +0800 Subject: [PATCH 02/17] feat(SynthesisTaskTab): enhance task table with tooltip support and improved column widths --- .../components/SynthesisTaskTab.tsx | 63 ++++++++++++++----- 1 file changed, 46 insertions(+), 17 deletions(-) diff --git a/frontend/src/pages/SynthesisTask/components/SynthesisTaskTab.tsx b/frontend/src/pages/SynthesisTask/components/SynthesisTaskTab.tsx index 832fa3231..01a0b538a 100644 --- a/frontend/src/pages/SynthesisTask/components/SynthesisTaskTab.tsx +++ b/frontend/src/pages/SynthesisTask/components/SynthesisTaskTab.tsx @@ -1,4 +1,4 @@ -import { useState, useEffect, useCallback } from "react"; +import { useState, useEffect } from "react"; import { Card, Button, Table, Modal, message, Tooltip, Form, Input, Select } from "antd"; import { Plus, @@ -50,11 +50,6 @@ interface SynthesisTask { updated_by?: string; } -interface SynthesisDataItem { - id: string; - [key: string]: any; -} - export default function SynthesisTaskTab() { const navigate = useNavigate(); const [searchQuery, setSearchQuery] = useState(""); @@ -74,11 +69,6 @@ export default function SynthesisTaskTab() { const [evalForm] = Form.useForm(); - // 合成数据相关状态 - const [activeChunkId, setActiveChunkId] = useState(null); - const [synthesisData, setSynthesisData] = useState([]); - const [selectedDataIds, setSelectedDataIds] = useState([]); - // 获取任务列表 const loadTasks = async () => { setLoading(true); @@ -118,7 +108,16 @@ export default function SynthesisTaskTab() { }; // 表格列 + const ellipsisStyle = { + maxWidth: 100, + overflow: "hidden", + textOverflow: "ellipsis", + whiteSpace: "nowrap", + display: "inline-block", + verticalAlign: "middle", + }; const taskColumns = [ + { title: ( @@ -113,13 +154,13 @@ export default function SynthFileTask() { render: (status?: string) => { let badgeStatus: BadgeProps["status"] = "default"; let text = status || "未知"; - if (status === "pending" || status === "processing") { + if (status === "pending" || status === "PROCESSING" || status === "processing") { badgeStatus = "processing"; text = "处理中"; - } else if (status === "completed") { + } else if (status === "COMPLETED" || status === "completed") { badgeStatus = "success"; text = "已完成"; - } else if (status === "failed") { + } else if (status === "FAILED" || status === "failed") { badgeStatus = "error"; text = "失败"; } @@ -155,40 +196,122 @@ export default function SynthFileTask() { }, ]; + const handleRefresh = async () => { + // 刷新按钮:明确触发一次顶部 loading,让用户看到“闪一下”的效果 + window.dispatchEvent(new Event("loading:show")); + try { + await fetchTaskDetail(); + await fetchData(pagination.current || 1, pagination.pageSize || 10, false); + } finally { + window.dispatchEvent(new Event("loading:hide")); + } + }; + + const handleDelete = async () => { + if (!taskId) return; + try { + await deleteSynthesisTaskByIdUsingDelete(taskId); + message.success("合成任务已删除"); + navigate("/data/synthesis/task"); + } catch { + message.error("删除合成任务失败"); + } + }; + + // 头部统计与操作 + const headerData: Record = taskInfo + ? { + name: taskInfo.name, + id: taskInfo.id, + icon: , + description: taskInfo.description, + createdAt: taskInfo.created_at ? formatDateTime(taskInfo.created_at) : "--", + } + : {}; + + const statistics = [ + { + key: "type", + icon: , + label: "类型", + value: + taskInfo?.synthesis_type === "QA" + ? "问答对生成" + : taskInfo?.synthesis_type === "COT" + ? "链式推理生成" + : taskInfo?.synthesis_type || "--", + }, + { + key: "fileCount", + icon: , + label: "文件数", + value: taskInfo?.total_files ?? "--", + }, + ]; + + const operations = [ + { + key: "refresh", + label: "刷新", + icon: , + onClick: handleRefresh, + }, + { + key: "delete", + label: "删除任务", + icon: , + danger: true, + confirm: { + title: "确认删除该合成任务?", + description: "删除后将无法恢复,请谨慎操作。", + okText: "确认删除", + cancelText: "取消", + onConfirm: handleDelete, + placement: "top", + overlayStyle: { + marginTop: 40, + }, + }, + }, + ]; + + const tabList = [ + { + key: "files", + label: "处理文件", + }, + ]; + + const breadItems = [ + { + title: 合成任务, + }, + { + title: taskInfo?.name || "任务详情", + }, + ]; + return ( -
- {/* 顶部任务信息和返回按钮 */} -
-
- {taskInfo && ( - <> -
- {taskInfo.name} - - {taskInfo.synthesis_type === "QA" ? "问答对生成" : taskInfo.synthesis_type === "COT" ? "链式推理生成" : taskInfo.synthesis_type} - - - 状态:{taskInfo.status === "pending" ? "等待中" : taskInfo.status === "completed" ? "已完成" : taskInfo.status === "failed" ? "失败" : taskInfo.status} - -
-
- 创建时间:{formatDateTime(taskInfo.created_at)} - 模型ID:{taskInfo.model_id} -
- + <> + +
+ +
+
+ +
+ {activeTab === "files" && ( + + rowKey="id" + loading={loading} + dataSource={data} + columns={columns} + pagination={pagination} + onChange={handleTableChange} + /> )}
-
- {/* 文件任务表格 */} - - rowKey="id" - loading={loading} - dataSource={data} - columns={columns} - pagination={pagination} - onChange={handleTableChange} - /> -
+ ); } diff --git a/runtime/datamate-python/app/module/generation/schema/generation.py b/runtime/datamate-python/app/module/generation/schema/generation.py index 9fe16efa9..f279aa03b 100644 --- a/runtime/datamate-python/app/module/generation/schema/generation.py +++ b/runtime/datamate-python/app/module/generation/schema/generation.py @@ -2,7 +2,7 @@ from enum import Enum from typing import List, Optional, Dict, Any -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, field_validator class TextSplitConfig(BaseModel): @@ -27,13 +27,21 @@ class SynthesisType(Enum): class CreateSynthesisTaskRequest(BaseModel): """创建数据合成任务请求""" name: str = Field(..., description="合成任务名称") - description: str = Field(None, description="合成任务描述") + description: Optional[str] = Field(None, description="合成任务描述") model_id: str = Field(..., description="模型ID") source_file_id: list[str] = Field(..., description="原始文件ID列表") text_split_config: TextSplitConfig = Field(None, description="文本切片配置") synthesis_config: SynthesisConfig = Field(..., description="合成配置") synthesis_type: SynthesisType = Field(..., description="合成类型") + @field_validator("description") + @classmethod + def empty_string_to_none(cls, v: Optional[str]) -> Optional[str]: + """前端如果传入空字符串,将其统一转换为 None,避免存库时看起来像有描述但实际上为空。""" + if isinstance(v, str) and v.strip() == "": + return None + return v + class DataSynthesisTaskItem(BaseModel): """数据合成任务列表/详情项""" From f92dd5981e27d5cefdf234bfbce07b00639fc4e2 Mon Sep 17 00:00:00 2001 From: Dallas98 <990259227@qq.com> Date: Thu, 11 Dec 2025 18:55:34 +0800 Subject: [PATCH 04/17] feat(SynthFileTask): enhance file display with progress tracking and delete action --- .../src/pages/SynthesisTask/SynthFileTask.tsx | 79 ++++++++++++------- 1 file changed, 52 insertions(+), 27 deletions(-) diff --git a/frontend/src/pages/SynthesisTask/SynthFileTask.tsx b/frontend/src/pages/SynthesisTask/SynthFileTask.tsx index 14c8a3fa5..7fc9c12d3 100644 --- a/frontend/src/pages/SynthesisTask/SynthFileTask.tsx +++ b/frontend/src/pages/SynthesisTask/SynthFileTask.tsx @@ -2,7 +2,7 @@ import { App } from "antd"; import { useEffect, useState } from "react"; import { Link, useNavigate, useParams } from "react-router"; import { DeleteOutlined, ReloadOutlined } from "@ant-design/icons"; -import { Badge, Breadcrumb, Button, Table, Tabs } from "antd"; +import { Badge, Breadcrumb, Button, Table, Tabs, Progress, Tooltip } from "antd"; import type { BadgeProps } from "antd"; import type { ColumnsType, TablePaginationConfig } from "antd/es/table"; @@ -13,7 +13,7 @@ import { deleteSynthesisTaskByIdUsingDelete, } from "@/pages/SynthesisTask/synthesis-api"; import { formatDateTime } from "@/utils/unit"; -import { Folder, Sparkles } from "lucide-react"; +import { Folder, Sparkles, Trash2 } from "lucide-react"; interface SynthesisFileTaskItem { id: string; @@ -131,20 +131,22 @@ export default function SynthFileTask() { const columns: ColumnsType = [ { - title: "文件名", - dataIndex: "file_name", - key: "file_name", - render: (text: string, record) => ( - + title: "文件", + key: "file", + render: (_text, record) => ( +
+ + +
), }, { @@ -168,19 +170,28 @@ export default function SynthFileTask() { }, }, { - title: "切片进度", - key: "chunks", - render: (_text, record) => ( - - {record.processed_chunks}/{record.total_chunks} - - ), + title: "切片总数", + dataIndex: "total_chunks", + key: "total_chunks", }, { - title: "目标文件路径", - dataIndex: "target_file_location", - key: "target_file_location", - ellipsis: true, + title: "处理进度", + key: "progress", + render: (_text, record) => { + const total = record.total_chunks || 0; + const processed = record.processed_chunks || 0; + const percent = total > 0 ? Math.min(100, Math.round((processed / total) * 100)) : 0; + return ( +
+ `${processed}/${total}`} + /> +
+ ); + }, }, { title: "创建时间", @@ -194,6 +205,20 @@ export default function SynthFileTask() { key: "updated_at", render: (val?: string) => (val ? formatDateTime(val) : "-"), }, + { + title: "操作", + key: "actions", + render: () => ( + + -
+ const breadItems = [ + { + title: 合成任务, + }, + { + title: state.taskId ? ( + {taskInfo?.name || "任务详情"} + ) : ( + taskInfo?.name || "任务详情" + ), + }, + { + title: state.fileName || "文件详情", + }, + ]; - {/* 主体左右布局 */} -
- {/* 左侧 Chunk 列表:占比 2/5 */} -
-
- Chunk 列表 -
-
- {chunkLoading ? ( -
- + return ( + <> + +
+
+ {/* 左侧 Chunk 列表 */} +
+
+
+ Chunk 列表 + {chunkPagination.total ? ( + 共 {chunkPagination.total} 条 + ) : null}
- ) : chunks.length === 0 ? ( - - ) : ( - { - const active = item.id === selectedChunkId; - return ( - -
-
setSelectedChunkId(item.id)} - > - Chunk #{item.chunk_index} - -
-
+ {chunkLoading ? ( +
+ +
+ ) : chunks.length === 0 ? ( + + ) : ( + { + const active = item.id === selectedChunkId; + return ( + setSelectedChunkId(item.id)} > - {item.chunk_content} -
-
- { - setSelectedChunkId(item.id); - handleDeleteCurrentChunk(); - }} - okText="删除" - cancelText="取消" - > - - - 删除该 Chunk 及合成数据 - - - ), - }, - ], - }} - trigger={["click"]} - > -
-
-
- ); - }} - /> - )} -
-
- `共 ${total} 条`} - /> +
+
+
+ + Chunk #{item.chunk_index} + +
+ {/* 右侧显示 Chunk ID,完整展示 */} + + ID: {item.id} + +
+
+ {item.chunk_content} +
+
+ + ); + }} + /> + )} +
+ {chunkPagination.total ? ( +
+ `共 ${total} 条`} + /> +
+ ) : null} +
-
- {/* 右侧合成数据展示:占比 3/5 */} -
-
- 合成数据 - {currentChunk && ( - - 当前 Chunk #{currentChunk.chunk_index} - - )} -
-
- {dataLoading ? ( -
- + {/* 右侧合成数据展示 */} +
+
+
+ 合成数据 + {currentChunk && ( + + 当前 Chunk #{currentChunk.chunk_index} + + )}
- ) : !selectedChunkId ? ( - - ) : synthDataList.length === 0 ? ( - - ) : ( -
- {synthDataList.map((item, index) => { - const isEditing = editingId === item.id; - return ( -
-
- 记录 {index + 1} - ID:{item.id} -
+
+ {dataLoading ? ( +
+ +
+ ) : !selectedChunkId ? ( + + ) : synthDataList.length === 0 ? ( + + ) : ( +
+ {synthDataList.map((item, index) => { + const isEditing = editingId === item.id; + return ( +
+
+ 记录 {index + 1} + ID:{item.id} +
- {/* 右下角更多操作按钮:编辑 & 删除 */} -
- - - 编辑 - - ), - onClick: (info) => { - info.domEvent.stopPropagation(); - startEdit(item); - }, - }, - { - key: "delete-data", - danger: true, - label: ( - { - e?.stopPropagation(); - handleDeleteSingleSynthesisData(item.id); - }} - okText="删除" - cancelText="取消" + {/* 顶部操作按钮:编辑 & 删除,更显眼 */} +
+ {!isEditing && ( + <> + + handleDeleteSingleSynthesisData(item.id)} + > +
+ 删除 + +
+ + )} +
- {/* 表格形式的 key-value 展示 + 可编辑 value */} -
- {getDataEntries(item.data).map(([key, value], rowIdx) => { - const displayValue = - typeof value === "string" || - typeof value === "number" || - typeof value === "boolean" - ? String(value) - : JSON.stringify(value, null, 2); + {/* key-value 展示区域:不再截断,完整展示 */} +
+ {getDataEntries(item.data).map(([key, value], rowIdx) => { + const displayValue = + typeof value === "string" || + typeof value === "number" || + typeof value === "boolean" + ? String(value) + : JSON.stringify(value, null, 2); - return ( -
-
- {key} -
-
- {isEditing ? ( - { - const v = e.target.value; - setEditingMap((prev) => ({ ...prev, [key]: v })); - }} - autoSize={{ minRows: 1, maxRows: 4 }} - /> - ) : ( - displayValue - )} -
-
- ); - })} -
+ return ( +
+
+ {key} +
+
+ {isEditing ? ( + { + const v = e.target.value; + setEditingMap((prev) => ({ ...prev, [key]: v })); + }} + autoSize={{ minRows: 1, maxRows: 6 }} + /> + ) : ( + displayValue + )} +
+
+ ); + })} +
- {isEditing && ( -
- - + {isEditing && ( +
+ + +
+ )}
- )} -
- ); - })} + ); + })} +
+ )}
- )} +
-
+ ); } From 951b065af36a9b90adf70ae7371153f187aa23d4 Mon Sep 17 00:00:00 2001 From: Dallas98 <990259227@qq.com> Date: Thu, 11 Dec 2025 20:50:38 +0800 Subject: [PATCH 06/17] feat(SynthDataDetail): add delete action for chunks with confirmation prompt --- .../pages/SynthesisTask/SynthDataDetail.tsx | 32 ++++++++++++++++--- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/frontend/src/pages/SynthesisTask/SynthDataDetail.tsx b/frontend/src/pages/SynthesisTask/SynthDataDetail.tsx index 6ce20ada7..aa9be354b 100644 --- a/frontend/src/pages/SynthesisTask/SynthDataDetail.tsx +++ b/frontend/src/pages/SynthesisTask/SynthDataDetail.tsx @@ -299,7 +299,7 @@ export default function SynthDataDetail() { return (
- {/* 右侧显示 Chunk ID,完整展示 */} - - ID: {item.id} - +
+ {/* 右侧显示 Chunk ID,完整展示 */} + + ID: {item.id} + + {/* 删除该 Chunk 按钮 */} + { + e?.stopPropagation(); + setSelectedChunkId(item.id); + handleDeleteCurrentChunk(); + }} + onCancel={(e) => e?.stopPropagation()} + > +
{item.chunk_content} From a7463ca0d1b1ea6fb02975525c8db5591bc2b8b6 Mon Sep 17 00:00:00 2001 From: Dallas98 <990259227@qq.com> Date: Thu, 11 Dec 2025 20:52:35 +0800 Subject: [PATCH 07/17] feat(SynthDataDetail): update edit and delete buttons to icon-only format --- .../pages/SynthesisTask/SynthDataDetail.tsx | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/frontend/src/pages/SynthesisTask/SynthDataDetail.tsx b/frontend/src/pages/SynthesisTask/SynthDataDetail.tsx index aa9be354b..33c07bdbb 100644 --- a/frontend/src/pages/SynthesisTask/SynthDataDetail.tsx +++ b/frontend/src/pages/SynthesisTask/SynthDataDetail.tsx @@ -400,18 +400,17 @@ export default function SynthDataDetail() { ID:{item.id}
- {/* 顶部操作按钮:编辑 & 删除,更显眼 */} -
+ {/* 顶部操作按钮:编辑 & 删除,改为仅图标按钮 */} +
{!isEditing && ( <> + /> handleDeleteSingleSynthesisData(item.id)} > + icon={} + onClick={(e) => e.stopPropagation()} + /> )} From 61230fafd6083ae6a2f42e3c8f6bccfc27fb5637 Mon Sep 17 00:00:00 2001 From: Dallas98 <990259227@qq.com> Date: Thu, 11 Dec 2025 21:00:04 +0800 Subject: [PATCH 08/17] feat(SynthDataDetail): add confirmation modals for chunk and synthesis data deletion --- .../pages/SynthesisTask/SynthDataDetail.tsx | 137 ++++++++++++------ 1 file changed, 93 insertions(+), 44 deletions(-) diff --git a/frontend/src/pages/SynthesisTask/SynthDataDetail.tsx b/frontend/src/pages/SynthesisTask/SynthDataDetail.tsx index 33c07bdbb..bfc364b1e 100644 --- a/frontend/src/pages/SynthesisTask/SynthDataDetail.tsx +++ b/frontend/src/pages/SynthesisTask/SynthDataDetail.tsx @@ -83,7 +83,8 @@ export default function SynthDataDetail() { const [chunkLoading, setChunkLoading] = useState(false); const [dataLoading, setDataLoading] = useState(false); const [synthDataList, setSynthDataList] = useState([]); - // 展开/收起逻辑已去除,尽量直接展示完整数据 + const [chunkConfirmVisibleId, setChunkConfirmVisibleId] = useState(null); + const [dataConfirmVisibleId, setDataConfirmVisibleId] = useState(null); // 加载任务信息(用于顶部展示) useEffect(() => { @@ -268,9 +269,72 @@ export default function SynthDataDetail() { }, ]; + const showChunkConfirm = (id: string) => setChunkConfirmVisibleId(id); + const hideChunkConfirm = () => setChunkConfirmVisibleId(null); + + const showDataConfirm = (id: string) => setDataConfirmVisibleId(id); + const hideDataConfirm = () => setDataConfirmVisibleId(null); + return ( <> + {/* 全局删除确认遮罩:Chunk */} + {chunkConfirmVisibleId && ( +
+
+
确认删除该 Chunk 及其合成数据?
+
+ ID: {chunkConfirmVisibleId} +
+
+ + +
+
+
+ )} + + {/* 全局删除确认遮罩:合成数据 */} + {dataConfirmVisibleId && ( +
+
+
确认删除该条合成数据?
+
+ ID: {dataConfirmVisibleId} +
+
+ + +
+
+
+ )} +
{/* 左侧 Chunk 列表 */} @@ -318,26 +382,17 @@ export default function SynthDataDetail() { ID: {item.id} - {/* 删除该 Chunk 按钮 */} - { - e?.stopPropagation(); - setSelectedChunkId(item.id); - handleDeleteCurrentChunk(); +
@@ -397,37 +452,31 @@ export default function SynthDataDetail() { >
记录 {index + 1} - ID:{item.id} -
- - {/* 顶部操作按钮:编辑 & 删除,改为仅图标按钮 */} -
- {!isEditing && ( - <> -
{/* key-value 展示区域:不再截断,完整展示 */} From 4aaf0fd63c5376673c362843e6fe10d0c1a09679 Mon Sep 17 00:00:00 2001 From: Dallas98 <990259227@qq.com> Date: Sat, 13 Dec 2025 18:42:27 +0800 Subject: [PATCH 09/17] feat(DocumentSplitter): add enhanced document splitting functionality with CJK support and metadata detection --- .../datamate-python/app/common/text_split.py | 0 .../{ => module/shared}/common/__init__.py | 0 .../shared}/common/document_loaders.py | 0 .../app/module/shared/common/text_split.py | 169 ++++++++++++++++++ 4 files changed, 169 insertions(+) delete mode 100644 runtime/datamate-python/app/common/text_split.py rename runtime/datamate-python/app/{ => module/shared}/common/__init__.py (100%) rename runtime/datamate-python/app/{ => module/shared}/common/document_loaders.py (100%) create mode 100644 runtime/datamate-python/app/module/shared/common/text_split.py diff --git a/runtime/datamate-python/app/common/text_split.py b/runtime/datamate-python/app/common/text_split.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/runtime/datamate-python/app/common/__init__.py b/runtime/datamate-python/app/module/shared/common/__init__.py similarity index 100% rename from runtime/datamate-python/app/common/__init__.py rename to runtime/datamate-python/app/module/shared/common/__init__.py diff --git a/runtime/datamate-python/app/common/document_loaders.py b/runtime/datamate-python/app/module/shared/common/document_loaders.py similarity index 100% rename from runtime/datamate-python/app/common/document_loaders.py rename to runtime/datamate-python/app/module/shared/common/document_loaders.py diff --git a/runtime/datamate-python/app/module/shared/common/text_split.py b/runtime/datamate-python/app/module/shared/common/text_split.py new file mode 100644 index 000000000..c445fadeb --- /dev/null +++ b/runtime/datamate-python/app/module/shared/common/text_split.py @@ -0,0 +1,169 @@ +import os +from typing import List, Optional, Tuple + +from langchain_core.documents import Document +from langchain_text_splitters import ( + RecursiveCharacterTextSplitter, + MarkdownHeaderTextSplitter +) + + +class DocumentSplitter: + """ + 文档分割器类 - 增强版,优先通过元数据识别文档类型 + 核心特性: + 1. 优先从metadata的source字段(文件扩展名)识别Markdown + 2. 元数据缺失时,通过内容特征降级检测 + 3. 支持CJK(中日韩)语言优化 + """ + + def __init__( + self, + chunk_size: int = 2000, + chunk_overlap: int = 200, + is_cjk_language: bool = True, + markdown_headers: Optional[List[Tuple[str, str]]] = None + ): + """ + 初始化文档分割器 + + Args: + chunk_size: 每个文本块的最大长度(默认2000字符) + chunk_overlap: 文本块之间的重叠长度(默认200字符) + is_cjk_language: 是否处理中日韩等无词边界语言(默认True) + markdown_headers: Markdown标题分割规则(默认:#/##/###/####) + """ + self.chunk_size = chunk_size + self.chunk_overlap = chunk_overlap + self.is_cjk_language = is_cjk_language + + # 默认Markdown标题分割规则 + self.markdown_headers = markdown_headers or [ + ("#", "header_1"), + ("##", "header_2"), + ("###", "header_3"), + ("####", "header_4"), + ] + + # 初始化基础文本分割器 + self.text_splitter = self._create_text_splitter() + + def _create_text_splitter(self) -> RecursiveCharacterTextSplitter: + """创建递归字符分割器(内部方法)""" + # 优化后的CJK分隔符列表(修复语法错误,调整优先级) + if self.is_cjk_language: + separators = [ + "\n\n", "\n", # 段落/换行(最高优先级) + "。", ".", # 句号(中文/英文) + "!", "!", # 感叹号(中文/英文) + "?", "?", # 问号(中文/英文) + ";", ";", # 分号(中文/英文) + ",", ",", # 逗号(中文/英文) + "、", # 顿号(中文) + ":", ":", # 冒号(中文/英文) + " ", # 空格 + "\u200b", "", # 零宽空格/兜底 + ] + else: + separators = ["\n\n", "\n", " ", ".", "!", "?", ";", ":", ",", ""] + + return RecursiveCharacterTextSplitter( + chunk_size=self.chunk_size, + chunk_overlap=self.chunk_overlap, + separators=separators, + length_function=len, + is_separator_regex=False + ) + + @staticmethod + def _is_markdown(doc: Document) -> bool: + """ + 优先从元数据判断是否为Markdown + 规则:检查metadata中的source字段扩展名是否为.md/.markdown/.mdx等 + """ + # 获取source字段(忽略大小写) + source = doc.metadata.get("source", "").lower() + if not source: + return False + + # 获取文件扩展名 + ext = os.path.splitext(source)[-1].lower() + # Markdown常见扩展名列表 + md_ext = [".md", ".markdown", ".mdx", ".mkd", ".mkdown"] + return ext in md_ext + + def split(self, documents: List[Document], is_markdown: bool = False) -> List[Document]: + """ + 核心分割方法 + + Args: + documents: 待分割的Document列表 + is_markdown: 是否为Markdown文档(默认False) + + Returns: + 分割后的Document列表 + """ + if not documents: + return [] + + # Markdown文档处理:先按标题分割,再按字符分割 + if is_markdown: + # 初始化Markdown标题分割器 + md_splitter = MarkdownHeaderTextSplitter( + headers_to_split_on=self.markdown_headers, + strip_headers=True, + return_each_line=False + ) + + # 按标题分割并继承元数据 + md_chunks = [] + for doc in documents: + chunks = md_splitter.split_text(doc.page_content) + for chunk in chunks: + chunk.metadata.update(doc.metadata) + md_chunks.extend(chunks) + + # 对标题分割后的内容进行字符分割 + final_chunks = self.text_splitter.split_documents(md_chunks) + + # 普通文本直接分割 + else: + final_chunks = self.text_splitter.split_documents(documents) + + return final_chunks + + # 核心自动分割方法(元数据优先) + @classmethod + def auto_split( + cls, + documents: List[Document], + chunk_size: int = 2000, + chunk_overlap: int = 200 + ) -> List[Document]: + """ + 极简快捷方法:自动识别文档类型并分割(元数据优先) + 仅需传入3个参数,无需初始化类实例 + + Args: + documents: 待分割的Document列表 + chunk_size: 每个文本块的最大长度(默认2000字符) + chunk_overlap: 文本块之间的重叠长度(默认200字符) + + Returns: + 分割后的Document列表 + """ + if not documents: + return [] + + # 初始化分割器实例(使用CJK默认优化) + splitter = cls( + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + is_cjk_language=True + ) + + # 自动检测文档类型(元数据优先) + is_md = splitter._is_markdown(documents[0]) + + # 根据检测结果选择分割方式 + return splitter.split(documents, is_markdown=is_md) From 2ec96d93fa989b578d4d29aaec463a21c153968f Mon Sep 17 00:00:00 2001 From: Dallas98 <990259227@qq.com> Date: Sat, 13 Dec 2025 18:43:08 +0800 Subject: [PATCH 10/17] feat(DataSynthesis): refactor data synthesis models and update task handling logic --- .../app/db/models/data_synthesis.py | 89 ++++++----- .../module/evaluation/service/evaluation.py | 4 +- .../generation/interface/generation_api.py | 119 +++++++++----- .../module/generation/schema/generation.py | 19 ++- .../generation/service/export_service.py | 4 +- .../generation/service/generation_service.py | 148 ++++++++---------- .../app/module/generation/service/prompt.py | 84 ++++++++++ .../app/module/shared/util/model_chat.py | 2 +- scripts/db/data-synthesis-init.sql | 14 +- 9 files changed, 300 insertions(+), 183 deletions(-) diff --git a/runtime/datamate-python/app/db/models/data_synthesis.py b/runtime/datamate-python/app/db/models/data_synthesis.py index 809cd2524..e83a0e01f 100644 --- a/runtime/datamate-python/app/db/models/data_synthesis.py +++ b/runtime/datamate-python/app/db/models/data_synthesis.py @@ -1,66 +1,75 @@ import uuid -from xml.etree.ElementTree import tostring -from sqlalchemy import Column, String, Text, Integer, JSON, TIMESTAMP, ForeignKey, func -from sqlalchemy.orm import relationship +from sqlalchemy import Column, String, Text, Integer, JSON, TIMESTAMP, func from app.db.session import Base from app.module.generation.schema.generation import CreateSynthesisTaskRequest async def save_synthesis_task(db_session, synthesis_task: CreateSynthesisTaskRequest): - """保存数据合成任务。""" - # 转换为模型实例 + """保存数据合成任务。 + + 注意:当前 MySQL 表 `t_data_synth_instances` 结构中只包含 synth_type / synth_config 等字段, + 没有 model_id、text_split_config、source_file_id、result_data_location 等列,因此这里只保存 + 与表结构一致的字段,其他信息由上层逻辑或其它表负责管理。 + """ gid = str(uuid.uuid4()) - synthesis_task_instance = DataSynthesisInstance( + + # 兼容旧请求结构:从请求对象中提取必要字段, + # - 合成类型:synthesis_type -> synth_type + # - 合成配置:text_split_config + synthesis_config 合并后写入 synth_config + synth_config = { + "text_split_config": synthesis_task.text_split_config.model_dump() + if synthesis_task.text_split_config + else None, + "synthesis_config": synthesis_task.synthesis_config.model_dump() + if synthesis_task.synthesis_config + else None, + "model_id": synthesis_task.model_id, + "source_file_id": list(synthesis_task.source_file_id or []), + } + + synth_task_instance = DataSynthInstance( id=gid, name=synthesis_task.name, description=synthesis_task.description, status="pending", - model_id=synthesis_task.model_id, - synthesis_type=synthesis_task.synthesis_type.value, + synth_type=synthesis_task.synthesis_type.value, progress=0, - result_data_location=f"/dataset/synthesis_results/{gid}/", - text_split_config=synthesis_task.text_split_config.model_dump(), - synthesis_config=synthesis_task.synthesis_config.model_dump(), - source_file_id=synthesis_task.source_file_id, - total_files=len(synthesis_task.source_file_id), + synth_config=synth_config, + total_files=len(synthesis_task.source_file_id or []), processed_files=0, total_chunks=0, processed_chunks=0, - total_synthesis_data=0, + total_synth_data=0, created_at=func.now(), updated_at=func.now(), created_by="system", - updated_by="system" + updated_by="system", ) - db_session.add(synthesis_task_instance) + db_session.add(synth_task_instance) await db_session.commit() - await db_session.refresh(synthesis_task_instance) - return synthesis_task_instance + await db_session.refresh(synth_task_instance) + return synth_task_instance -class DataSynthesisInstance(Base): - """数据合成任务表,对应表 t_data_synthesis_instances +class DataSynthInstance(Base): + """数据合成任务表,对应表 t_data_synth_instances - create table if not exists t_data_synthesis_instances + create table if not exists t_data_synth_instances ( id VARCHAR(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci PRIMARY KEY COMMENT 'UUID', name VARCHAR(255) NOT NULL COMMENT '任务名称', description TEXT COMMENT '任务描述', status VARCHAR(20) COMMENT '任务状态', - synthesis_type VARCHAR(20) NOT NULL COMMENT '合成类型', - model_id VARCHAR(255) NOT NULL COMMENT '模型ID', + synth_type VARCHAR(20) NOT NULL COMMENT '合成类型', progress INT DEFAULT 0 COMMENT '任务进度(百分比)', - result_data_location VARCHAR(1000) COMMENT '结果数据存储位置', - text_split_config JSON NOT NULL COMMENT '文本切片配置', - synthesis_config JSON NOT NULL COMMENT '合成配置', - source_file_id JSON NOT NULL COMMENT '原始文件ID列表', + synth_config JSON NOT NULL COMMENT '合成配置', total_files INT DEFAULT 0 COMMENT '总文件数', processed_files INT DEFAULT 0 COMMENT '已处理文件数', total_chunks INT DEFAULT 0 COMMENT '总文本块数', processed_chunks INT DEFAULT 0 COMMENT '已处理文本块数', - total_synthesis_data INT DEFAULT 0 COMMENT '总合成数据量', + total_synth_data INT DEFAULT 0 COMMENT '总合成数据量', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', created_by VARCHAR(255) COMMENT '创建者', @@ -68,27 +77,29 @@ class DataSynthesisInstance(Base): ) COMMENT='数据合成任务表(UUID 主键)'; """ - __tablename__ = "t_data_synthesis_instances" + __tablename__ = "t_data_synth_instances" id = Column(String(36), primary_key=True, index=True, comment="UUID") name = Column(String(255), nullable=False, comment="任务名称") description = Column(Text, nullable=True, comment="任务描述") status = Column(String(20), nullable=True, comment="任务状态") - synthesis_type = Column(String(20), nullable=False, comment="合成类型") - model_id = Column(String(255), nullable=False, comment="模型ID") + # 与数据库字段保持一致:synth_type / synth_config + synth_type = Column(String(20), nullable=False, comment="合成类型") progress = Column(Integer, nullable=False, default=0, comment="任务进度(百分比)") - result_data_location = Column(String(1000), nullable=True, comment="结果数据存储位置") - text_split_config = Column(JSON, nullable=False, comment="文本切片配置") - synthesis_config = Column(JSON, nullable=False, comment="合成配置") - source_file_id = Column(JSON, nullable=False, comment="原始文件ID列表") + synth_config = Column(JSON, nullable=False, comment="合成配置") total_files = Column(Integer, nullable=False, default=0, comment="总文件数") processed_files = Column(Integer, nullable=False, default=0, comment="已处理文件数") total_chunks = Column(Integer, nullable=False, default=0, comment="总文本块数") processed_chunks = Column(Integer, nullable=False, default=0, comment="已处理文本块数") - total_synthesis_data = Column(Integer, nullable=False, default=0, comment="总合成数据量") - - created_at = Column(TIMESTAMP, server_default=func.current_timestamp(), nullable=True, comment="创建时间") - updated_at = Column(TIMESTAMP, server_default=func.current_timestamp(), onupdate=func.current_timestamp(), nullable=True, comment="更新时间") + total_synth_data = Column(Integer, nullable=False, default=0, comment="总合成数据量") + created_at = Column(TIMESTAMP, nullable=False, default=func.now(), comment="创建时间") + updated_at = Column( + TIMESTAMP, + nullable=False, + default=func.now(), + onupdate=func.now(), + comment="更新时间", + ) created_by = Column(String(255), nullable=True, comment="创建者") updated_by = Column(String(255), nullable=True, comment="更新者") diff --git a/runtime/datamate-python/app/module/evaluation/service/evaluation.py b/runtime/datamate-python/app/module/evaluation/service/evaluation.py index a691028c8..372a7c571 100644 --- a/runtime/datamate-python/app/module/evaluation/service/evaluation.py +++ b/runtime/datamate-python/app/module/evaluation/service/evaluation.py @@ -13,7 +13,7 @@ from app.db.session import AsyncSessionLocal from app.module.evaluation.schema.evaluation import SourceType from app.module.shared.schema import TaskStatus -from app.module.shared.util.model_chat import call_openai_style_model, _extract_json_substring +from app.module.shared.util.model_chat import call_openai_style_model, extract_json_substring from app.module.evaluation.schema.prompt import get_prompt from app.module.shared.util.structured_file import StructuredFileHandlerFactory from app.module.system.service.common_service import get_model_by_id @@ -73,7 +73,7 @@ async def evaluate_item(self, model_config, item: EvaluationItem, semaphore: asy call_openai_style_model, model_config.base_url, model_config.api_key, model_config.model_name, prompt_text, ) - resp_text = _extract_json_substring(resp_text) + resp_text = extract_json_substring(resp_text) try: json.loads(resp_text) except Exception as e: diff --git a/runtime/datamate-python/app/module/generation/interface/generation_api.py b/runtime/datamate-python/app/module/generation/interface/generation_api.py index b37c09747..a07a76e72 100644 --- a/runtime/datamate-python/app/module/generation/interface/generation_api.py +++ b/runtime/datamate-python/app/module/generation/interface/generation_api.py @@ -1,4 +1,5 @@ import uuid +from typing import cast from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks from sqlalchemy import select, func, delete @@ -7,7 +8,7 @@ from app.core.logging import get_logger from app.db.models.data_synthesis import ( save_synthesis_task, - DataSynthesisInstance, + DataSynthInstance, DataSynthesisFileInstance, DataSynthesisChunkInstance, SynthesisData, @@ -65,32 +66,64 @@ async def create_synthesis_task( synthesis_task = await save_synthesis_task(db, request) # 将已有的 DatasetFiles 记录保存到 t_data_synthesis_file_instances + synth_files = [] for f in dataset_files: file_instance = DataSynthesisFileInstance( id=str(uuid.uuid4()), # 使用新的 UUID 作为文件任务记录的主键,避免与 DatasetFiles 主键冲突 synthesis_instance_id=synthesis_task.id, file_name=f.file_name, source_file_id=str(f.id), - target_file_location=synthesis_task.result_data_location or "", status="pending", total_chunks=0, processed_chunks=0, created_by="system", updated_by="system", ) - db.add(file_instance) + synth_files.append(file_instance) if dataset_files: + db.add_all(synth_files) await db.commit() generation_service = GenerationService(db) # 异步处理任务:只传任务 ID,后台任务中使用新的 DB 会话重新加载任务对象 background_tasks.add_task(generation_service.process_task, synthesis_task.id) + # 将 ORM 对象包装成 DataSynthesisTaskItem,兼容新字段从 synth_config 还原 + synth_cfg = getattr(synthesis_task, "synth_config", {}) or {} + text_split_cfg = synth_cfg.get("text_split_config") or {} + synthesis_cfg = synth_cfg.get("synthesis_config") or {} + source_file_ids = synth_cfg.get("source_file_id") or request.source_file_id or [] + model_id = synth_cfg.get("model_id") or request.model_id + result_location = synth_cfg.get("result_data_location") + + task_item = DataSynthesisTaskItem( + id=synthesis_task.id, + name=synthesis_task.name, + description=synthesis_task.description, + status=synthesis_task.status, + synthesis_type=synthesis_task.synth_type, + model_id=model_id, + progress=synthesis_task.progress, + result_data_location=result_location, + text_split_config=text_split_cfg, + synthesis_config=synthesis_cfg, + source_file_id=list(source_file_ids), + total_files=synthesis_task.total_files, + processed_files=synthesis_task.processed_files, + total_chunks=synthesis_task.total_chunks, + processed_chunks=synthesis_task.processed_chunks, + total_synthesis_data=synthesis_task.total_synth_data, + created_at=synthesis_task.created_at, + updated_at=synthesis_task.updated_at, + created_by=synthesis_task.created_by, + updated_by=synthesis_task.updated_by, + ) + return StandardResponse( code=200, message="success", - data=synthesis_task, + data=task_item, ) @@ -100,7 +133,7 @@ async def get_synthesis_task( db: AsyncSession = Depends(get_db) ): """获取数据合成任务详情""" - result = await db.get(DataSynthesisInstance, task_id) + result = await db.get(DataSynthInstance, task_id) if not result: raise HTTPException(status_code=404, detail="Synthesis task not found") @@ -121,16 +154,16 @@ async def list_synthesis_tasks( db: AsyncSession = Depends(get_db) ): """分页列出所有数据合成任务,默认按创建时间倒序""" - query = select(DataSynthesisInstance) + query = select(DataSynthInstance) if synthesis_type: - query = query.filter(DataSynthesisInstance.synthesis_type == synthesis_type) + query = query.filter(DataSynthInstance.synth_type == synthesis_type) if status: - query = query.filter(DataSynthesisInstance.status == status) + query = query.filter(DataSynthInstance.status == status) if name: - query = query.filter(DataSynthesisInstance.name.like(f"%{name}%")) + query = query.filter(DataSynthInstance.name.like(f"%{name}%")) # 默认按创建时间倒序排列 - query = query.order_by(DataSynthesisInstance.created_at.desc()) + query = query.order_by(DataSynthInstance.created_at.desc()) count_q = select(func.count()).select_from(query.subquery()) total = (await db.execute(count_q)).scalar_one() @@ -143,31 +176,39 @@ async def list_synthesis_tasks( result = await db.execute(query.offset((page - 1) * page_size).limit(page_size)) rows = result.scalars().all() - task_items = [ - DataSynthesisTaskItem( - id=row.id, - name=row.name, - description=row.description, - status=row.status, - synthesis_type=row.synthesis_type, - model_id=row.model_id, - progress=row.progress, - result_data_location=row.result_data_location, - text_split_config=row.text_split_config, - synthesis_config=row.synthesis_config, - source_file_id=row.source_file_id, - total_files=row.total_files, - processed_files=row.processed_files, - total_chunks=row.total_chunks, - processed_chunks=row.processed_chunks, - total_synthesis_data=row.total_synthesis_data, - created_at=row.created_at, - updated_at=row.updated_at, - created_by=row.created_by, - updated_by=row.updated_by, + task_items: list[DataSynthesisTaskItem] = [] + for row in rows: + synth_cfg = getattr(row, "synth_config", {}) or {} + text_split_cfg = synth_cfg.get("text_split_config") or {} + synthesis_cfg = synth_cfg.get("synthesis_config") or {} + source_file_ids = synth_cfg.get("source_file_id") or [] + model_id = synth_cfg.get("model_id") + result_location = synth_cfg.get("result_data_location") + + task_items.append( + DataSynthesisTaskItem( + id=str(row.id), + name=str(row.name), + description=cast(str | None, row.description), + status=cast(str | None, row.status), + synthesis_type=str(row.synth_type), + model_id=model_id or "", + progress=int(cast(int, row.progress)), + result_data_location=result_location, + text_split_config=text_split_cfg, + synthesis_config=synthesis_cfg, + source_file_id=list(source_file_ids), + total_files=int(cast(int, row.total_files)), + processed_files=int(cast(int, row.processed_files)), + total_chunks=int(cast(int, row.total_chunks)), + processed_chunks=int(cast(int, row.processed_chunks)), + total_synthesis_data=int(cast(int, row.total_synth_data)), + created_at=row.created_at, + updated_at=row.updated_at, + created_by=row.created_by, + updated_by=row.updated_by, + ) ) - for row in rows - ] paged = PagedDataSynthesisTaskResponse( content=task_items, @@ -190,7 +231,7 @@ async def delete_synthesis_task( db: AsyncSession = Depends(get_db) ): """删除数据合成任务""" - task = await db.get(DataSynthesisInstance, task_id) + task = await db.get(DataSynthInstance, task_id) if not task: raise HTTPException(status_code=404, detail="Synthesis task not found") @@ -241,7 +282,7 @@ async def delete_synthesis_file_task( ): """删除数据合成任务中的文件任务,同时刷新任务表中的文件/切片数量""" # 先获取任务和文件任务记录 - task = await db.get(DataSynthesisInstance, task_id) + task = await db.get(DataSynthInstance, task_id) if not task: raise HTTPException(status_code=404, detail="Synthesis task not found") @@ -306,7 +347,7 @@ async def list_synthesis_file_tasks( ): """分页获取某个数据合成任务下的文件任务列表""" # 先校验任务是否存在 - task = await db.get(DataSynthesisInstance, task_id) + task = await db.get(DataSynthInstance, task_id) if not task: raise HTTPException(status_code=404, detail="Synthesis task not found") @@ -523,7 +564,7 @@ async def delete_synthesis_data_by_chunk( result = await db.execute( delete(SynthesisData).where(SynthesisData.chunk_instance_id == chunk_id) ) - deleted = result.rowcount or 0 + deleted = int(getattr(result, "rowcount", 0) or 0) await db.commit() @@ -542,7 +583,7 @@ async def batch_delete_synthesis_data( result = await db.execute( delete(SynthesisData).where(SynthesisData.id.in_(request.ids)) ) - deleted = result.rowcount or 0 + deleted = int(getattr(result, "rowcount", 0) or 0) await db.commit() return StandardResponse(code=200, message="success", data=deleted) diff --git a/runtime/datamate-python/app/module/generation/schema/generation.py b/runtime/datamate-python/app/module/generation/schema/generation.py index f279aa03b..172fe5141 100644 --- a/runtime/datamate-python/app/module/generation/schema/generation.py +++ b/runtime/datamate-python/app/module/generation/schema/generation.py @@ -1,6 +1,6 @@ from datetime import datetime from enum import Enum -from typing import List, Optional, Dict, Any +from typing import List, Optional, Dict, Any, Union from pydantic import BaseModel, Field, field_validator @@ -11,12 +11,19 @@ class TextSplitConfig(BaseModel): chunk_overlap: int = Field(..., description="重叠令牌数") -class SynthesisConfig(BaseModel): +class SyntheConfig(BaseModel): """合成配置""" + model_id: str = Field(..., description="模型ID") prompt_template: str = Field(..., description="合成提示模板") - synthesis_count: int = Field(None, description="单个chunk合成的数据数量") + number: int = Field(None, description="单个chunk合成的数据数量") temperature: Optional[float] = Field(None, description="温度参数") +class Config(BaseModel): + """配置""" + text_split_config: TextSplitConfig = Field(None, description="文本切片配置") + question_synth_config: SyntheConfig = Field(None, description="问题合成配置") + answer_synth_config: SyntheConfig = Field(None, description="答案合成配置") + class SynthesisType(Enum): """合成类型""" @@ -28,11 +35,9 @@ class CreateSynthesisTaskRequest(BaseModel): """创建数据合成任务请求""" name: str = Field(..., description="合成任务名称") description: Optional[str] = Field(None, description="合成任务描述") - model_id: str = Field(..., description="模型ID") - source_file_id: list[str] = Field(..., description="原始文件ID列表") - text_split_config: TextSplitConfig = Field(None, description="文本切片配置") - synthesis_config: SynthesisConfig = Field(..., description="合成配置") synthesis_type: SynthesisType = Field(..., description="合成类型") + source_file_id: list[str] = Field(..., description="原始文件ID列表") + synth_config: Config = Field(..., description="合成配置") @field_validator("description") @classmethod diff --git a/runtime/datamate-python/app/module/generation/service/export_service.py b/runtime/datamate-python/app/module/generation/service/export_service.py index 2f2dde61f..fd8037729 100644 --- a/runtime/datamate-python/app/module/generation/service/export_service.py +++ b/runtime/datamate-python/app/module/generation/service/export_service.py @@ -9,7 +9,7 @@ from app.core.logging import get_logger from app.db.models.data_synthesis import ( - DataSynthesisInstance, + DataSynthInstance, DataSynthesisFileInstance, SynthesisData, ) @@ -43,7 +43,7 @@ async def export_task_to_dataset( Optimized to process one file at a time to reduce memory usage. """ - task = await self._db.get(DataSynthesisInstance, task_id) + task = await self._db.get(DataSynthInstance, task_id) if not task: raise SynthesisExportError(f"Synthesis task {task_id} not found") diff --git a/runtime/datamate-python/app/module/generation/service/generation_service.py b/runtime/datamate-python/app/module/generation/service/generation_service.py index b21683921..26974689c 100644 --- a/runtime/datamate-python/app/module/generation/service/generation_service.py +++ b/runtime/datamate-python/app/module/generation/service/generation_service.py @@ -3,124 +3,103 @@ import uuid from pathlib import Path -from langchain_text_splitters import RecursiveCharacterTextSplitter from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.db.models.data_synthesis import ( - DataSynthesisInstance, + DataSynthInstance, DataSynthesisFileInstance, DataSynthesisChunkInstance, SynthesisData, ) from app.db.models.dataset_management import DatasetFiles -from app.db.models.model_config import get_model_by_id from app.db.session import logger -from app.module.shared.util.model_chat import _extract_json_substring -from app.module.system.service.common_service import get_chat_client, chat -from app.common.document_loaders import load_documents +from app.module.generation.schema.generation import Config +from app.module.shared.common.document_loaders import load_documents +from app.module.shared.common.text_split import DocumentSplitter +from app.module.shared.util.model_chat import extract_json_substring +from app.module.system.service.common_service import chat class GenerationService: def __init__(self, db: AsyncSession): self.db = db + self.semaphore = asyncio.Semaphore(10) async def process_task(self, task_id: str): """处理数据合成任务入口:根据任务ID加载任务并逐个处理源文件。""" - synthesis_task: DataSynthesisInstance | None = await self.db.get(DataSynthesisInstance, task_id) - if not synthesis_task: + synth_task: DataSynthInstance | None = await self.db.get(DataSynthInstance, task_id) + if not synth_task: logger.error(f"Synthesis task {task_id} not found, abort processing") return - logger.info(f"Processing synthesis task {task_id}") - file_ids = synthesis_task.source_file_id or [] + logger.info(f"Start processing synthe task {task_id}") - # 获取模型客户端 - model_result = await get_model_by_id(self.db, str(synthesis_task.model_id)) - if not model_result: - logger.error( - f"Model config not found for id={synthesis_task.model_id}, abort task {synthesis_task.id}" - ) + # 获取任务关联的文件原始ID列表 + file_ids = await self._get_file_ids_for_task(task_id) + if not file_ids: + logger.warning(f"No files associated with task {task_id}, abort processing") return - chat_client = get_chat_client(model_result) - - # 控制并发度的信号量(限制全任务范围内最多 10 个并发调用) - semaphore = asyncio.Semaphore(10) # 逐个文件处理 for file_id in file_ids: try: - success = await self._process_single_file( - synthesis_task=synthesis_task, - file_id=file_id, - chat_client=chat_client, - semaphore=semaphore, - ) + success = await self._process_single_file(synth_task,file_id) except Exception as e: logger.exception(f"Unexpected error when processing file {file_id} for task {task_id}: {e}") # 确保对应文件任务状态标记为失败 - await self._mark_file_failed(str(synthesis_task.id), file_id, str(e)) + await self._mark_file_failed(str(synth_task.id), file_id, str(e)) success = False if success: # 每处理完一个文件,简单增加 processed_files 计数 - synthesis_task.processed_files = (synthesis_task.processed_files or 0) + 1 + synth_task.processed_files = (synth_task.processed_files or 0) + 1 await self.db.commit() - await self.db.refresh(synthesis_task) + await self.db.refresh(synth_task) - logger.info(f"Finished processing synthesis task {synthesis_task.id}") + logger.info(f"Finished processing synthesis task {synth_task.id}") async def _process_single_file( self, - synthesis_task: DataSynthesisInstance, - file_id: str, - chat_client, - semaphore: asyncio.Semaphore, + synth_task: DataSynthInstance, + file_id: str ) -> bool: """处理单个源文件:解析路径、切片、保存分块并触发 LLM 调用。""" file_path = await self._resolve_file_path(file_id) if not file_path: logger.warning(f"File path not found for file_id={file_id}, skip") - await self._mark_file_failed(str(synthesis_task.id), file_id, "file_path_not_found") + await self._mark_file_failed(str(synth_task.id), file_id, "file_path_not_found") return False logger.info(f"Processing file_id={file_id}, path={file_path}") - split_cfg = synthesis_task.text_split_config or {} - synthesis_cfg = synthesis_task.synthesis_config or {} - chunk_size = int(split_cfg.get("chunk_size", 800)) - chunk_overlap = int(split_cfg.get("chunk_overlap", 50)) - # 加载并切片 - try: - chunks = self._load_and_split(file_path, chunk_size, chunk_overlap) - except Exception as e: - logger.error(f"Failed to load/split file {file_path}: {e}") - await self._mark_file_failed(str(synthesis_task.id), file_id, f"load_split_error: {e}") - return False + synth_config = synth_task.synth_config or {} + config = Config(**synth_config) + chunks = self._load_and_split(file_path, config.text_split_config.chunk_size, + config.text_split_config.chunk_overlap) if not chunks: logger.warning(f"No chunks generated for file_id={file_id}") - await self._mark_file_failed(str(synthesis_task.id), file_id, "no_chunks_generated") + await self._mark_file_failed(str(synth_task.id), file_id, "no_chunks_generated") return False logger.info(f"File {file_id} split into {len(chunks)} chunks by LangChain") # 保存文件任务记录 + 分块记录 file_task = await self._get_or_create_file_instance( - synthesis_task_id=str(synthesis_task.id), + synthesis_task_id=str(synth_task.id), source_file_id=file_id, file_path=file_path, ) - await self._persist_chunks(synthesis_task, file_task, file_id, chunks) + await self._persist_chunks(synth_task, file_task, file_id, chunks) # 针对每个切片并发调用大模型 await self._invoke_llm_for_chunks( - synthesis_task=synthesis_task, + synthesis_task=synth_task, file_id=file_id, chunks=chunks, synthesis_cfg=synthesis_cfg, - chat_client=chat_client, - semaphore=semaphore, + chat_client=chat_client ) # 如果执行到此处,说明该文件的切片与 LLM 调用流程均未抛出异常,标记为完成 @@ -132,7 +111,7 @@ async def _process_single_file( async def _persist_chunks( self, - synthesis_task: DataSynthesisInstance, + synthesis_task: DataSynthInstance, file_task: DataSynthesisFileInstance, file_id: str, chunks, @@ -166,17 +145,16 @@ async def _persist_chunks( async def _invoke_llm_for_chunks( self, - synthesis_task: DataSynthesisInstance, + synthesis_task: DataSynthInstance, file_id: str, chunks, synthesis_cfg: dict, chat_client, - semaphore: asyncio.Semaphore, ) -> None: """针对每个分片并发调用大模型生成数据。""" # 需要将 answer 和对应 chunk 建立关系,因此这里保留 chunk_index tasks = [ - self._call_llm(doc, file_id, idx, synthesis_task, synthesis_cfg, chat_client, semaphore) + self._call_llm(doc, file_id, idx, synthesis_task, synthesis_cfg, chat_client) for idx, doc in enumerate(chunks, start=1) ] await asyncio.gather(*tasks, return_exceptions=True) @@ -189,7 +167,6 @@ async def _call_llm( synthesis_task, synthesis_cfg: dict, chat_client, - semaphore: asyncio.Semaphore, ): """单次大模型调用逻辑,带并发控制。 @@ -199,7 +176,7 @@ async def _call_llm( - 在拿到 LLM 返回后,解析为 JSON 并批量写入 SynthesisData, 同时更新文件级 processed_chunks / 进度等信息。 """ - async with semaphore: + async with self.semaphore: prompt = self._build_qa_prompt(doc.page_content, synthesis_cfg) try: loop = asyncio.get_running_loop() @@ -230,25 +207,21 @@ async def _resolve_file_path(self, file_id: str) -> str | None: return None return file_obj.file_path - def _load_and_split(self, file_path: str, chunk_size: int, chunk_overlap: int): + @staticmethod + def _load_and_split(file_path: str, chunk_size: int, chunk_overlap: int): """使用 LangChain 加载文本并进行切片,直接返回 Document 列表。 - - 当前实现: - - 使用 TextLoader 加载纯文本/Markdown/JSON 等文本文件 - - 使用 RecursiveCharacterTextSplitter 做基于字符的递归切片 - - 保留每个 Document 的 metadata,方便后续追加例如文件ID、chunk序号等信息。 + Args: + file_path: 待切片的文件路径 + chunk_size: 切片大小 + chunk_overlap: 切片重叠大小 """ - docs = load_documents(file_path) - - splitter = RecursiveCharacterTextSplitter( - chunk_size=chunk_size, - chunk_overlap=chunk_overlap, - # 尝试按这些分隔符优先切分,再退化到字符级 - separators=["\n\n", "\n", "。", "!", "?", "!", "?", "。\n", "\t", " "] - ) - split_docs = splitter.split_documents(docs) - return split_docs + try: + docs = load_documents(file_path) + split_docs = DocumentSplitter.auto_split(docs, chunk_size, chunk_overlap) + return split_docs + except Exception as e: + logger.error(f"Error loading or splitting file {file_path}: {e}") + raise @staticmethod @@ -293,7 +266,7 @@ async def _handle_llm_answer( return # 1. 预处理原始回答:尝试从中截取出最可能的 JSON 片段 - cleaned = _extract_json_substring(raw_answer) + cleaned = extract_json_substring(raw_answer) # 2. 解析 JSON,统一成列表结构 try: @@ -378,8 +351,7 @@ async def _get_or_create_file_instance( return file_task # 查询任务以获取 result_data_location - task = await self.db.get(DataSynthesisInstance, synthesis_task_id) - target_location = task.result_data_location if task else "" + task = await self.db.get(DataSynthInstance, synthesis_task_id) # 创建新的文件任务记录,初始状态为 processing file_task = DataSynthesisFileInstance( @@ -387,7 +359,6 @@ async def _get_or_create_file_instance( synthesis_instance_id=synthesis_task_id, file_name=Path(file_path).name, source_file_id=source_file_id, - target_file_location=target_location or "", status="processing", total_chunks=0, processed_chunks=0, @@ -399,7 +370,7 @@ async def _get_or_create_file_instance( await self.db.refresh(file_task) return file_task - async def _mark_file_failed(self, synthesis_task_id: str, file_id: str, reason: str | None = None) -> None: + async def _mark_file_failed(self, synth_task_id: str, file_id: str, reason: str | None = None) -> None: """将指定任务下的单个文件任务标记为失败状态,兜底错误处理。 - 如果找到对应的 DataSynthesisFileInstance,则更新其 status="failed"。 @@ -409,14 +380,14 @@ async def _mark_file_failed(self, synthesis_task_id: str, file_id: str, reason: try: result = await self.db.execute( select(DataSynthesisFileInstance).where( - DataSynthesisFileInstance.synthesis_instance_id == synthesis_task_id, + DataSynthesisFileInstance.synthesis_instance_id == synth_task_id, DataSynthesisFileInstance.source_file_id == file_id, ) ) file_task = result.scalar_one_or_none() if not file_task: logger.warning( - f"Failed to mark file as failed: no DataSynthesisFileInstance found for task={synthesis_task_id}, file_id={file_id}, reason={reason}" + f"Failed to mark file as failed: no DataSynthesisFileInstance found for task={synth_task_id}, file_id={file_id}, reason={reason}" ) return @@ -424,10 +395,19 @@ async def _mark_file_failed(self, synthesis_task_id: str, file_id: str, reason: await self.db.commit() await self.db.refresh(file_task) logger.info( - f"Marked file task as failed for task={synthesis_task_id}, file_id={file_id}, reason={reason}" + f"Marked file task as failed for task={synth_task_id}, file_id={file_id}, reason={reason}" ) except Exception as e: # 兜底日志,避免异常向外传播影响其它文件处理 logger.exception( - f"Unexpected error when marking file failed for task={synthesis_task_id}, file_id={file_id}, original_reason={reason}, error={e}" + f"Unexpected error when marking file failed for task={synth_task_id}, file_id={file_id}, original_reason={reason}, error={e}" ) + + async def _get_file_ids_for_task(self, synth_task_id: str): + """根据任务ID查询关联的文件原始ID列表""" + result = await self.db.execute( + select(DataSynthesisFileInstance.source_file_id) + .where(DataSynthesisFileInstance.synthesis_instance_id == synth_task_id) + ) + file_ids = result.scalars().all() + return file_ids diff --git a/runtime/datamate-python/app/module/generation/service/prompt.py b/runtime/datamate-python/app/module/generation/service/prompt.py index 42ea87f82..a2bf8489d 100644 --- a/runtime/datamate-python/app/module/generation/service/prompt.py +++ b/runtime/datamate-python/app/module/generation/service/prompt.py @@ -1,5 +1,89 @@ from app.module.generation.schema.generation import SynthesisType +QUESTION_GENERATOR_PROMPT=f""" +# Role: 文本问题生成专家 +## Profile: +- Description: 你是一名专业的文本分析与问题设计专家,能够从复杂文本中提炼关键信息并产出可用于模型微调的高质量问题集合。 +- Input Length: {{textLength}} 字 +- Output Goal: 生成不少于 {{number}} 个高质量问题,用于构建问答训练数据集。 + +## Skills: +1. 能够全面理解原文内容,识别核心概念、事实与逻辑结构。 +2. 擅长设计具有明确答案指向性的问题,覆盖文本多个侧面。 +3. 善于控制问题难度与类型,保证多样性与代表性。 +4. 严格遵守格式规范,确保输出可直接用于程序化处理。 + +## Workflow: +1. **文本解析**:通读全文,分段识别关键实体、事件、数值与结论。 +2. **问题设计**:基于信息密度和重要性选择最佳提问切入点。 +3. **质量检查**:逐条校验问题,确保: + - 问题答案可在原文中直接找到依据。 + - 问题之间主题不重复、角度不雷同。 + - 语言表述准确、无歧义且符合常规问句形式。 + +## Constraints: +1. 所有问题必须严格依据原文内容,不得添加外部信息或假设情境。 +2. 问题需覆盖文本的不同主题、层级或视角,避免集中于单一片段。 +3. 禁止输出与材料元信息相关的问题(如作者、章节、目录等)。 +4. 问题不得包含“报告/文章/文献/表格中提到”等表述,需自然流畅。 +5. 输出不少于 {{number}} 个问题,且保持格式一致。 + +## Output Format: +- 使用合法的 JSON 数组,仅包含字符串元素。 +- 字段必须使用英文双引号。 +- 严格遵循以下结构: +``` +["问题1", "问题2", "..."] +``` + +## Output Example: +``` +["人工智能伦理框架应包含哪些核心要素?", "民法典对个人数据保护有哪些新规定?"] +``` + +## Text to Analyze: +{{text}} +""" + +ANSWER_GENERATOR_PROMPT=f""" +# Role: 微调数据集生成专家 +## Profile: +- Description: 你是一名微调数据集生成专家,擅长从给定的内容中生成准确的问题答案,确保答案的准确性和相关性,你要直接回答用户问题,所有信息已内化为你的专业知识。 + +## Skills: +1. 答案必须基于给定的内容 +2. 答案必须准确,不能胡编乱造 +3. 答案必须与问题相关 +4. 答案必须符合逻辑 +5. 基于给定参考内容,用自然流畅的语言整合成一个完整答案,不需要提及文献来源或引用标记 + +## Workflow: +1. Take a deep breath and work on this problem step-by-step. +2. 首先,分析给定的文件内容 +3. 然后,从内容中提取关键信息 +4. 接着,生成与问题相关的准确答案 +5. 最后,确保答案的准确性和相关性 + +## 参考内容: + +------ 参考内容 Start ------ +{{text}} +------ 参考内容 End ------ + +## 问题 +{{question}} + +## Constrains: +1. 答案必须基于给定的内容 +2. 答案必须准确,必须与问题相关,不能胡编乱造 +3. 答案必须充分、详细、包含所有必要的信息、适合微调大模型训练使用 +4. 答案中不得出现 ' 参考 / 依据 / 文献中提到 ' 等任何引用性表述,只需呈现最终结论 +{{templatePrompt}} +{{outputFormatPrompt}} +""" + + + QA_PROMPT="""# 角色 你是一位专业的AI助手,擅长从给定的文本中提取关键信息并创建用于教学和测试的问答对。 diff --git a/runtime/datamate-python/app/module/shared/util/model_chat.py b/runtime/datamate-python/app/module/shared/util/model_chat.py index 29161de73..7fb1c830a 100644 --- a/runtime/datamate-python/app/module/shared/util/model_chat.py +++ b/runtime/datamate-python/app/module/shared/util/model_chat.py @@ -14,7 +14,7 @@ def call_openai_style_model(base_url, api_key, model_name, prompt, **kwargs): ) return response.choices[0].message.content -def _extract_json_substring(raw: str) -> str: +def extract_json_substring(raw: str) -> str: """从 LLM 的原始回答中提取最可能的 JSON 字符串片段。 处理思路: diff --git a/scripts/db/data-synthesis-init.sql b/scripts/db/data-synthesis-init.sql index 0909d6b8a..52066262a 100644 --- a/scripts/db/data-synthesis-init.sql +++ b/scripts/db/data-synthesis-init.sql @@ -2,24 +2,20 @@ USE datamate; -- =============================== -- t_data_synthesis_instances (数据合成任务表) -create table if not exists t_data_synthesis_instances +create table if not exists t_data_synth_instances ( id VARCHAR(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci PRIMARY KEY COMMENT 'UUID', name VARCHAR(255) NOT NULL COMMENT '任务名称', description TEXT COMMENT '任务描述', status VARCHAR(20) COMMENT '任务状态', - synthesis_type VARCHAR(20) NOT NULL COMMENT '合成类型', - model_id VARCHAR(255) NOT NULL COMMENT '模型ID', + synth_type VARCHAR(20) NOT NULL COMMENT '合成类型', progress INT DEFAULT 0 COMMENT '任务进度(百分比)', - result_data_location VARCHAR(1000) COMMENT '结果数据存储位置', - text_split_config JSON NOT NULL COMMENT '文本切片配置', - synthesis_config JSON NOT NULL COMMENT '合成配置', - source_file_id JSON NOT NULL COMMENT '原始文件ID列表', + synth_config JSON NOT NULL COMMENT '合成配置', total_files INT DEFAULT 0 COMMENT '总文件数', processed_files INT DEFAULT 0 COMMENT '已处理文件数', total_chunks INT DEFAULT 0 COMMENT '总文本块数', processed_chunks INT DEFAULT 0 COMMENT '已处理文本块数', - total_synthesis_data INT DEFAULT 0 COMMENT '总合成数据量', + total_synth_data INT DEFAULT 0 COMMENT '总合成数据量', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', created_by VARCHAR(255) COMMENT '创建者', @@ -61,4 +57,4 @@ create table if not exists t_data_synthesis_data data json COMMENT '合成的数据', synthesis_file_instance_id VARCHAR(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci COMMENT '数据合成文件任务ID', chunk_instance_id VARCHAR(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci COMMENT '分块任务ID' -) COMMENT='数据合成任务队列表(UUID 主键)'; \ No newline at end of file +) COMMENT='数据合成任务队列表(UUID 主键)'; From efc32df3897b81d253bf731be723c89e82d64212 Mon Sep 17 00:00:00 2001 From: Dallas98 <990259227@qq.com> Date: Mon, 15 Dec 2025 10:53:42 +0800 Subject: [PATCH 11/17] feat(DataSynthesis): streamline synthesis task handling and enhance chunk processing logic --- .../app/db/models/data_synthesis.py | 14 +- .../generation/interface/generation_api.py | 42 +- .../module/generation/schema/generation.py | 13 +- .../generation/service/generation_service.py | 535 +++++++++++------- .../app/module/generation/service/prompt.py | 4 +- scripts/db/data-synthesis-init.sql | 2 +- 6 files changed, 346 insertions(+), 264 deletions(-) diff --git a/runtime/datamate-python/app/db/models/data_synthesis.py b/runtime/datamate-python/app/db/models/data_synthesis.py index e83a0e01f..b294a1bc4 100644 --- a/runtime/datamate-python/app/db/models/data_synthesis.py +++ b/runtime/datamate-python/app/db/models/data_synthesis.py @@ -18,16 +18,6 @@ async def save_synthesis_task(db_session, synthesis_task: CreateSynthesisTaskReq # 兼容旧请求结构:从请求对象中提取必要字段, # - 合成类型:synthesis_type -> synth_type # - 合成配置:text_split_config + synthesis_config 合并后写入 synth_config - synth_config = { - "text_split_config": synthesis_task.text_split_config.model_dump() - if synthesis_task.text_split_config - else None, - "synthesis_config": synthesis_task.synthesis_config.model_dump() - if synthesis_task.synthesis_config - else None, - "model_id": synthesis_task.model_id, - "source_file_id": list(synthesis_task.source_file_id or []), - } synth_task_instance = DataSynthInstance( id=gid, @@ -36,7 +26,7 @@ async def save_synthesis_task(db_session, synthesis_task: CreateSynthesisTaskReq status="pending", synth_type=synthesis_task.synthesis_type.value, progress=0, - synth_config=synth_config, + synth_config=synthesis_task.synth_config.model_dump(), total_files=len(synthesis_task.source_file_id or []), processed_files=0, total_chunks=0, @@ -134,7 +124,7 @@ class DataSynthesisFileInstance(Base): ) file_name = Column(String(255), nullable=False, comment="文件名") source_file_id = Column(String(255), nullable=False, comment="原始文件ID") - target_file_location = Column(String(1000), nullable=False, comment="目标文件存储位置") + target_file_location = Column(String(1000), nullable=True, comment="目标文件存储位置") status = Column(String(20), nullable=True, comment="任务状态") total_chunks = Column(Integer, nullable=False, default=0, comment="总文本块数") processed_chunks = Column(Integer, nullable=False, default=0, comment="已处理文本块数") diff --git a/runtime/datamate-python/app/module/generation/interface/generation_api.py b/runtime/datamate-python/app/module/generation/interface/generation_api.py index a07a76e72..bee0fba97 100644 --- a/runtime/datamate-python/app/module/generation/interface/generation_api.py +++ b/runtime/datamate-python/app/module/generation/interface/generation_api.py @@ -14,7 +14,6 @@ SynthesisData, ) from app.db.models.dataset_management import DatasetFiles -from app.db.models.model_config import get_model_by_id from app.db.session import get_db from app.module.generation.schema.generation import ( CreateSynthesisTaskRequest, @@ -29,9 +28,9 @@ SynthesisDataUpdateRequest, BatchDeleteSynthesisDataRequest, ) +from app.module.generation.service.export_service import SynthesisDatasetExporter, SynthesisExportError from app.module.generation.service.generation_service import GenerationService from app.module.generation.service.prompt import get_prompt -from app.module.generation.service.export_service import SynthesisDatasetExporter, SynthesisExportError from app.module.shared.schema import StandardResponse router = APIRouter( @@ -48,10 +47,6 @@ async def create_synthesis_task( db: AsyncSession = Depends(get_db), ): """创建数据合成任务""" - result = await get_model_by_id(db, request.model_id) - if not result: - raise HTTPException(status_code=404, detail="Model not found") - # 先根据 source_file_id 在 DatasetFiles 中查出已有文件信息 file_ids = request.source_file_id or [] dataset_files = [] @@ -90,12 +85,6 @@ async def create_synthesis_task( background_tasks.add_task(generation_service.process_task, synthesis_task.id) # 将 ORM 对象包装成 DataSynthesisTaskItem,兼容新字段从 synth_config 还原 - synth_cfg = getattr(synthesis_task, "synth_config", {}) or {} - text_split_cfg = synth_cfg.get("text_split_config") or {} - synthesis_cfg = synth_cfg.get("synthesis_config") or {} - source_file_ids = synth_cfg.get("source_file_id") or request.source_file_id or [] - model_id = synth_cfg.get("model_id") or request.model_id - result_location = synth_cfg.get("result_data_location") task_item = DataSynthesisTaskItem( id=synthesis_task.id, @@ -103,17 +92,7 @@ async def create_synthesis_task( description=synthesis_task.description, status=synthesis_task.status, synthesis_type=synthesis_task.synth_type, - model_id=model_id, - progress=synthesis_task.progress, - result_data_location=result_location, - text_split_config=text_split_cfg, - synthesis_config=synthesis_cfg, - source_file_id=list(source_file_ids), total_files=synthesis_task.total_files, - processed_files=synthesis_task.processed_files, - total_chunks=synthesis_task.total_chunks, - processed_chunks=synthesis_task.processed_chunks, - total_synthesis_data=synthesis_task.total_synth_data, created_at=synthesis_task.created_at, updated_at=synthesis_task.updated_at, created_by=synthesis_task.created_by, @@ -133,14 +112,26 @@ async def get_synthesis_task( db: AsyncSession = Depends(get_db) ): """获取数据合成任务详情""" - result = await db.get(DataSynthInstance, task_id) - if not result: + synthesis_task = await db.get(DataSynthInstance, task_id) + if not synthesis_task: raise HTTPException(status_code=404, detail="Synthesis task not found") + task_item = DataSynthesisTaskItem( + id=synthesis_task.id, + name=synthesis_task.name, + description=synthesis_task.description, + status=synthesis_task.status, + synthesis_type=synthesis_task.synth_type, + total_files=synthesis_task.total_files, + created_at=synthesis_task.created_at, + updated_at=synthesis_task.updated_at, + created_by=synthesis_task.created_by, + updated_by=synthesis_task.updated_by, + ) return StandardResponse( code=200, message="success", - data=result, + data=task_item, ) @@ -374,7 +365,6 @@ async def list_synthesis_file_tasks( synthesis_instance_id=row.synthesis_instance_id, file_name=row.file_name, source_file_id=row.source_file_id, - target_file_location=row.target_file_location, status=row.status, total_chunks=row.total_chunks, processed_chunks=row.processed_chunks, diff --git a/runtime/datamate-python/app/module/generation/schema/generation.py b/runtime/datamate-python/app/module/generation/schema/generation.py index 172fe5141..9ac388821 100644 --- a/runtime/datamate-python/app/module/generation/schema/generation.py +++ b/runtime/datamate-python/app/module/generation/schema/generation.py @@ -14,7 +14,7 @@ class TextSplitConfig(BaseModel): class SyntheConfig(BaseModel): """合成配置""" model_id: str = Field(..., description="模型ID") - prompt_template: str = Field(..., description="合成提示模板") + prompt_template: str = Field(None, description="合成提示模板") number: int = Field(None, description="单个chunk合成的数据数量") temperature: Optional[float] = Field(None, description="温度参数") @@ -55,17 +55,7 @@ class DataSynthesisTaskItem(BaseModel): description: Optional[str] = None status: Optional[str] = None synthesis_type: str - model_id: str - progress: int - result_data_location: Optional[str] = None - text_split_config: Dict[str, Any] - synthesis_config: Dict[str, Any] - source_file_id: list[str] total_files: int - processed_files: int - total_chunks: int - processed_chunks: int - total_synthesis_data: int created_at: Optional[datetime] = None updated_at: Optional[datetime] = None created_by: Optional[str] = None @@ -90,7 +80,6 @@ class DataSynthesisFileTaskItem(BaseModel): synthesis_instance_id: str file_name: str source_file_id: str - target_file_location: str status: Optional[str] = None total_chunks: int processed_chunks: int diff --git a/runtime/datamate-python/app/module/generation/service/generation_service.py b/runtime/datamate-python/app/module/generation/service/generation_service.py index 26974689c..a26bc17a6 100644 --- a/runtime/datamate-python/app/module/generation/service/generation_service.py +++ b/runtime/datamate-python/app/module/generation/service/generation_service.py @@ -1,8 +1,8 @@ import asyncio import json import uuid -from pathlib import Path +from langchain_core.language_models import BaseChatModel from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession @@ -14,17 +14,23 @@ ) from app.db.models.dataset_management import DatasetFiles from app.db.session import logger -from app.module.generation.schema.generation import Config +from app.module.generation.schema.generation import Config, SyntheConfig +from app.module.generation.service.prompt import ( + QUESTION_GENERATOR_PROMPT, + ANSWER_GENERATOR_PROMPT, +) from app.module.shared.common.document_loaders import load_documents from app.module.shared.common.text_split import DocumentSplitter from app.module.shared.util.model_chat import extract_json_substring -from app.module.system.service.common_service import chat +from app.module.system.service.common_service import chat, get_model_by_id, get_chat_client class GenerationService: def __init__(self, db: AsyncSession): self.db = db - self.semaphore = asyncio.Semaphore(10) + # 全局并发信号量:保证任意时刻最多 10 次模型调用 + self.question_semaphore = asyncio.Semaphore(10) + self.answer_semaphore = asyncio.Semaphore(10) async def process_task(self, task_id: str): """处理数据合成任务入口:根据任务ID加载任务并逐个处理源文件。""" @@ -44,7 +50,7 @@ async def process_task(self, task_id: str): # 逐个文件处理 for file_id in file_ids: try: - success = await self._process_single_file(synth_task,file_id) + success = await self._process_single_file(synth_task, file_id) except Exception as e: logger.exception(f"Unexpected error when processing file {file_id} for task {task_id}: {e}") # 确保对应文件任务状态标记为失败 @@ -59,12 +65,22 @@ async def process_task(self, task_id: str): logger.info(f"Finished processing synthesis task {synth_task.id}") + # ==================== 高层文件处理流程 ==================== async def _process_single_file( self, synth_task: DataSynthInstance, - file_id: str + file_id: str, ) -> bool: - """处理单个源文件:解析路径、切片、保存分块并触发 LLM 调用。""" + """按 chunk 批量流式处理单个源文件。 + + 流程: + 1. 切片并将所有 chunk 持久化到 DB 后释放内存; + 2. 从 DB 按 chunk_index 升序批量读取 chunk; + 3. 对批次中的每个 chunk:先生成指定数量的问题,再基于这些问题生成答案; + 4. 每成功处理完一个 chunk(即该 chunk 至少生成一条 QA)就更新一次 processed_chunks; + 5. 全部完成后将文件实例标记为 completed。 + """ + # 解析文件路径与配置 file_path = await self._resolve_file_path(file_id) if not file_path: logger.warning(f"File path not found for file_id={file_id}, skip") @@ -73,11 +89,19 @@ async def _process_single_file( logger.info(f"Processing file_id={file_id}, path={file_path}") - synth_config = synth_task.synth_config or {} - config = Config(**synth_config) + try: + config = Config(**(synth_task.synth_config or {})) + except Exception as e: + logger.error(f"Invalid synth_config for task={synth_task.id}: {e}") + await self._mark_file_failed(str(synth_task.id), file_id, "invalid_synth_config") + return False - chunks = self._load_and_split(file_path, config.text_split_config.chunk_size, - config.text_split_config.chunk_overlap) + # 1. 加载并切片(仅在此处占用内存) + chunks = self._load_and_split( + file_path, + config.text_split_config.chunk_size, + config.text_split_config.chunk_overlap, + ) if not chunks: logger.warning(f"No chunks generated for file_id={file_id}") await self._mark_file_failed(str(synth_task.id), file_id, "no_chunks_generated") @@ -85,118 +109,250 @@ async def _process_single_file( logger.info(f"File {file_id} split into {len(chunks)} chunks by LangChain") - # 保存文件任务记录 + 分块记录 + # 2. 获取文件实例并持久化 chunk 记录 file_task = await self._get_or_create_file_instance( synthesis_task_id=str(synth_task.id), source_file_id=file_id, - file_path=file_path, ) + if not file_task: + logger.error( + f"DataSynthesisFileInstance not found for task={synth_task.id}, file_id={file_id}" + ) + await self._mark_file_failed(str(synth_task.id), file_id, "file_instance_not_found") + return False + await self._persist_chunks(synth_task, file_task, file_id, chunks) + total_chunks = len(chunks) + # 释放内存中的切片 + del chunks + + # 3. 读取问答配置 + question_cfg: SyntheConfig | None = config.question_synth_config + answer_cfg: SyntheConfig | None = config.answer_synth_config + if not question_cfg or not answer_cfg: + logger.error( + f"Question/Answer synth config missing for task={synth_task.id}, file={file_id}" + ) + await self._mark_file_failed(str(synth_task.id), file_id, "qa_config_missing") + return False - # 针对每个切片并发调用大模型 - await self._invoke_llm_for_chunks( - synthesis_task=synth_task, - file_id=file_id, - chunks=chunks, - synthesis_cfg=synthesis_cfg, - chat_client=chat_client + logger.info( + f"Start QA generation for task={synth_task.id}, file={file_id}, total_chunks={total_chunks}" ) - # 如果执行到此处,说明该文件的切片与 LLM 调用流程均未抛出异常,标记为完成 + # 为本文件构建模型 client + question_model = await get_model_by_id(self.db, question_cfg.model_id) + answer_model = await get_model_by_id(self.db, answer_cfg.model_id) + question_chat = get_chat_client(question_model) + answer_chat = get_chat_client(answer_model) + + # 分批次从 DB 读取并处理 chunk + batch_size = 16 + current_index = 1 + processed_chunks = 0 + + while current_index <= total_chunks: + end_index = min(current_index + batch_size - 1, total_chunks) + chunk_batch = await self._load_chunk_batch( + file_task_id=file_task.id, + start_index=current_index, + end_index=end_index, + ) + if not chunk_batch: + logger.warning( + f"Empty chunk batch loaded for file={file_id}, range=[{current_index}, {end_index}]" + ) + current_index = end_index + 1 + continue + + # 对本批中的每个 chunk 并发处理(内部受 semaphore 限流) + async def process_one(chunk: DataSynthesisChunkInstance) -> bool: + return await self._process_single_chunk_qa( + file_task=file_task, + chunk=chunk, + question_cfg=question_cfg, + answer_cfg=answer_cfg, + question_chat=question_chat, + answer_chat=answer_chat, + ) + + tasks = [process_one(chunk) for chunk in chunk_batch] + await asyncio.gather(*tasks, return_exceptions=True) + + current_index = end_index + 1 + + # 全部完成 file_task.status = "completed" await self.db.commit() await self.db.refresh(file_task) - return True - async def _persist_chunks( + async def _process_single_chunk_qa( self, - synthesis_task: DataSynthInstance, file_task: DataSynthesisFileInstance, - file_id: str, - chunks, - ) -> None: - """将切片结果保存到 t_data_synthesis_chunk_instances,并更新文件级分块计数。""" - for idx, doc in enumerate(chunks, start=1): - # 先复制原始 Document.metadata,再在其上追加任务相关字段,避免覆盖原有元数据 - base_metadata = dict(getattr(doc, "metadata", {}) or {}) - base_metadata.update( - { - "task_id": str(synthesis_task.id), - "file_id": file_id - } + chunk: DataSynthesisChunkInstance, + question_cfg: SyntheConfig, + answer_cfg: SyntheConfig, + question_chat: BaseChatModel, + answer_chat: BaseChatModel, + ) -> bool: + """处理单个 chunk:生成问题列表,然后为每个问题生成答案并落库。 + + 返回:该 chunk 是否至少成功生成一条 QA。 + """ + chunk_index = chunk.chunk_index + chunk_text = chunk.chunk_content or "" + if not chunk_text.strip(): + logger.warning(f"Empty chunk text for file_task={file_task.id}, chunk_index={chunk_index}") + return False + + # 1. 生成问题 + try: + questions = await self._generate_questions_for_one_chunk( + chunk_text=chunk_text, + question_cfg=question_cfg, + question_chat=question_chat, + ) + except Exception as e: + logger.error( + f"Generate questions failed for file_task={file_task.id}, chunk_index={chunk_index}: {e}" ) + return False - chunk_record = DataSynthesisChunkInstance( - id=str(uuid.uuid4()), - synthesis_file_instance_id=file_task.id, - chunk_index=idx, - chunk_content=doc.page_content, - chunk_metadata=base_metadata, + if not questions: + logger.info( + f"No questions generated for file_task={file_task.id}, chunk_index={chunk_index}" ) - self.db.add(chunk_record) + return False - # 更新文件任务的分块数量 - file_task.total_chunks = len(chunks) - file_task.status = "processing" + # 2. 针对每个问题生成答案并入库 + success_any = await self._generate_answers_for_one_chunk( + file_task=file_task, + chunk=chunk, + questions=questions, + answer_cfg=answer_cfg, + answer_chat=answer_chat, + ) - await self.db.commit() - await self.db.refresh(file_task) + # todo:每次处理完一个chunk,更新已经处理的chunk数量,要避免并发写冲突 - async def _invoke_llm_for_chunks( + return success_any + + async def _generate_questions_for_one_chunk( self, - synthesis_task: DataSynthInstance, - file_id: str, - chunks, - synthesis_cfg: dict, - chat_client, - ) -> None: - """针对每个分片并发调用大模型生成数据。""" - # 需要将 answer 和对应 chunk 建立关系,因此这里保留 chunk_index - tasks = [ - self._call_llm(doc, file_id, idx, synthesis_task, synthesis_cfg, chat_client) - for idx, doc in enumerate(chunks, start=1) - ] - await asyncio.gather(*tasks, return_exceptions=True) + chunk_text: str, + question_cfg: SyntheConfig, + question_chat: BaseChatModel, + ) -> list[str]: + """针对单个 chunk 文本,调用 question_chat 生成问题列表。""" + number = question_cfg.number or 5 + template = getattr(question_cfg, "prompt_template", QUESTION_GENERATOR_PROMPT) + template = template if (template is not None and template.strip() != "") else QUESTION_GENERATOR_PROMPT + + prompt = ( + template + .replace("{{text}}", chunk_text) + .replace("{{number}}", str(number)) + .replace("{{textLength}}", str(len(chunk_text))) + ) + + async with self.question_semaphore: + loop = asyncio.get_running_loop() + raw_answer = await loop.run_in_executor( + None, + chat, + question_chat, + prompt, + ) - async def _call_llm( + # 解析为问题列表 + questions = self._parse_questions_from_answer( + raw_answer, + ) + return questions + + async def _generate_answers_for_one_chunk( self, - doc, - file_id: str, - idx: int, - synthesis_task, - synthesis_cfg: dict, - chat_client, - ): - """单次大模型调用逻辑,带并发控制。 - - 说明: - - 使用信号量限制全局并发量(当前为 10)。 - - 使用线程池执行同步的 chat 调用,避免阻塞事件循环。 - - 在拿到 LLM 返回后,解析为 JSON 并批量写入 SynthesisData, - 同时更新文件级 processed_chunks / 进度等信息。 + file_task: DataSynthesisFileInstance, + chunk: DataSynthesisChunkInstance, + questions: list[str], + answer_cfg: SyntheConfig, + answer_chat: BaseChatModel, + ) -> bool: + """为一个 chunk 的所有问题生成答案并写入 SynthesisData。 + + 返回:是否至少成功写入一条 QA。 """ - async with self.semaphore: - prompt = self._build_qa_prompt(doc.page_content, synthesis_cfg) - try: + if not questions: + return False + + chunk_text = chunk.chunk_content or "" + template = getattr(answer_cfg, "prompt_template", ANSWER_GENERATOR_PROMPT) + template = template if (template is not None and template.strip() != "") else ANSWER_GENERATOR_PROMPT + extra_vars = getattr(answer_cfg, "extra_prompt_vars", {}) or {} + + success_flags: list[bool] = [] + + async def process_single_question(question: str): + prompt = template.replace("{{text}}", chunk_text).replace("{{question}}", question) + for k, v in extra_vars.items(): + prompt.replace(f"{{{{{k}}}}}", str(v)) + else: + prompt_local = prompt + + async with self.answer_semaphore: loop = asyncio.get_running_loop() - answer = await loop.run_in_executor(None, chat, chat_client, prompt) - logger.debug( - f"Generated QA for task={synthesis_task.id}, file={file_id}, chunk={idx}" - ) - await self._handle_llm_answer( - synthesis_task_id=str(synthesis_task.id), - file_id=file_id, - chunk_index=idx, - raw_answer=answer, - ) - return answer - except Exception as e: - logger.error( - f"LLM generation failed for task={synthesis_task.id}, file={file_id}, chunk={idx}: {e}" + answer = await loop.run_in_executor( + None, + chat, + answer_chat, + prompt_local, ) - return None + data_obj = {"instruction": question, "output": answer} + record = SynthesisData( + id=str(uuid.uuid4()), + data=data_obj, + synthesis_file_instance_id=file_task.id, + chunk_instance_id=chunk.id, + ) + self.db.add(record) + success_flags.append(True) + + tasks = [process_single_question(q) for q in questions] + await asyncio.gather(*tasks, return_exceptions=True) + + if success_flags: + await self.db.commit() + return True + return False + + @staticmethod + def _parse_questions_from_answer( + raw_answer: str, + ) -> list[str]: + """从大模型返回中解析问题数组。""" + if not raw_answer: + return [] + + cleaned = extract_json_substring(raw_answer) + try: + data = json.loads(cleaned) + except Exception as e: + logger.error( + f"Failed to parse question list JSON for task: {e}. " + ) + return [] + + if isinstance(data, list): + return [str(q) for q in data if isinstance(q, str) and q.strip()] + # 容错:如果是单个字符串 + if isinstance(data, str) and data.strip(): + return [data.strip()] + return [] + + + # ==================== 原有辅助方法(文件路径/切片/持久化等) ==================== async def _resolve_file_path(self, file_id: str) -> str | None: """根据文件ID查询 t_dm_dataset_files 并返回 file_path(仅 ACTIVE 文件)。""" result = await self.db.execute( @@ -223,115 +379,44 @@ def _load_and_split(file_path: str, chunk_size: int, chunk_overlap: int): logger.error(f"Error loading or splitting file {file_path}: {e}") raise - - @staticmethod - def _build_qa_prompt(chunk: str, synthesis_cfg: dict) -> str: - """构造 QA 数据合成的提示词。 - - 要求: - - synthesis_cfg["prompt_template"] 是一个字符串,其中包含 {document} 占位符; - - 将当前切片内容替换到 {document}。 - 如果未提供或模板非法,则使用内置默认模板。 - """ - template = None - if isinstance(synthesis_cfg, dict): - template = synthesis_cfg.get("prompt_template") - synthesis_count = synthesis_cfg["synthesis_count"] if ("synthesis_count" in synthesis_cfg and synthesis_cfg["synthesis_count"]) else 5 - try: - prompt = template.format(document=chunk, synthesis_count=synthesis_count) - except Exception: - # 防御性处理:如果 format 出现异常,则退回到简单拼接 - prompt = f"{template}\n\n文档内容:{chunk}\n\n请根据文档内容生成 {synthesis_count} 条符合要求的问答数据。" - return prompt - - async def _handle_llm_answer( + async def _persist_chunks( self, - synthesis_task_id: str, + synthesis_task: DataSynthInstance, + file_task: DataSynthesisFileInstance, file_id: str, - chunk_index: int, - raw_answer: str, + chunks, ) -> None: - """解析 LLM 返回内容为 JSON,批量保存到 SynthesisData,并更新文件任务进度。 - - 约定: - - LLM 返回的 raw_answer 是 JSON 字符串,可以是: - 1)单个对象:{"question": ..., "answer": ...} - 2)对象数组:[{}, {}, ...] - - 我们将其规范化为列表,每个元素作为一条 SynthesisData.data 写入。 - - 根据 synthesis_task_id + file_id + chunk_index 找到对应的 DataSynthesisChunkInstance, - 以便设置 chunk_instance_id 和 synthesis_file_instance_id。 - - 每处理完一个 chunk,递增对应 DataSynthesisFileInstance.processed_chunks,并按比例更新进度。 - """ - if not raw_answer: - return - - # 1. 预处理原始回答:尝试从中截取出最可能的 JSON 片段 - cleaned = extract_json_substring(raw_answer) - - # 2. 解析 JSON,统一成列表结构 - try: - parsed = json.loads(cleaned) - except Exception as e: - logger.error( - f"Failed to parse LLM answer as JSON for task={synthesis_task_id}, file={file_id}, chunk={chunk_index}: {e}. Raw answer: {raw_answer!r}" - ) - return - - if isinstance(parsed, dict): - items = [parsed] - elif isinstance(parsed, list): - items = [p for p in parsed if isinstance(p, dict)] - else: - logger.error(f"Unexpected JSON structure from LLM answer for task={synthesis_task_id}, file={file_id}, chunk={chunk_index}: {type(parsed)}") - return - - if not items: - return - - # 3. 找到对应的 chunk 记录(一个 chunk_index 对应一条记录) - chunk_result = await self.db.execute( - select(DataSynthesisChunkInstance, DataSynthesisFileInstance) - .join( - DataSynthesisFileInstance, - DataSynthesisFileInstance.id == DataSynthesisChunkInstance.synthesis_file_instance_id, - ) - .where( - DataSynthesisFileInstance.synthesis_instance_id == synthesis_task_id, - DataSynthesisFileInstance.source_file_id == file_id, - DataSynthesisChunkInstance.chunk_index == chunk_index, - ) - ) - row = chunk_result.first() - if not row: - logger.error( - f"Chunk record not found for task={synthesis_task_id}, file={file_id}, chunk_index={chunk_index}, skip saving SynthesisData." + """将切片结果保存到 t_data_synthesis_chunk_instances,并更新文件级分块计数。""" + for idx, doc in enumerate(chunks, start=1): + # 先复制原始 Document.metadata,再在其上追加任务相关字段,避免覆盖原有元数据 + base_metadata = dict(getattr(doc, "metadata", {}) or {}) + base_metadata.update( + { + "task_id": str(synthesis_task.id), + "file_id": file_id + } ) - return - - chunk_instance, file_instance = row - # 4. 批量写入 SynthesisData - for data_obj in items: - record = SynthesisData( + chunk_record = DataSynthesisChunkInstance( id=str(uuid.uuid4()), - data=data_obj, - synthesis_file_instance_id=file_instance.id, - chunk_instance_id=chunk_instance.id, + synthesis_file_instance_id=file_task.id, + chunk_index=idx, + chunk_content=doc.page_content, + chunk_metadata=base_metadata, ) - self.db.add(record) - - # 5. 更新文件级 processed_chunks / 进度 - file_instance.processed_chunks = (file_instance.processed_chunks or 0) + 1 + self.db.add(chunk_record) + # 更新文件任务的分块数量 + file_task.total_chunks = len(chunks) + file_task.status = "processing" await self.db.commit() - await self.db.refresh(file_instance) + await self.db.refresh(file_task) async def _get_or_create_file_instance( self, synthesis_task_id: str, source_file_id: str, - file_path: str, ) -> DataSynthesisFileInstance: """根据任务ID和原始文件ID,查找或创建对应的 DataSynthesisFileInstance 记录。 @@ -347,27 +432,6 @@ async def _get_or_create_file_instance( ) ) file_task = result.scalar_one_or_none() - if file_task is not None: - return file_task - - # 查询任务以获取 result_data_location - task = await self.db.get(DataSynthInstance, synthesis_task_id) - - # 创建新的文件任务记录,初始状态为 processing - file_task = DataSynthesisFileInstance( - id=str(uuid.uuid4()), - synthesis_instance_id=synthesis_task_id, - file_name=Path(file_path).name, - source_file_id=source_file_id, - status="processing", - total_chunks=0, - processed_chunks=0, - created_by="system", - updated_by="system", - ) - self.db.add(file_task) - await self.db.commit() - await self.db.refresh(file_task) return file_task async def _mark_file_failed(self, synth_task_id: str, file_id: str, reason: str | None = None) -> None: @@ -411,3 +475,54 @@ async def _get_file_ids_for_task(self, synth_task_id: str): ) file_ids = result.scalars().all() return file_ids + + # ========== 新增:chunk 计数与批量加载、processed_chunks 安全更新辅助方法 ========== + async def _count_chunks_for_file(self, synth_file_instance_id: str) -> int: + """统计指定任务与文件下的 chunk 总数。""" + from sqlalchemy import func + + result = await self.db.execute( + select(func.count(DataSynthesisChunkInstance.id)).where( + DataSynthesisChunkInstance.synthesis_file_instance_id == synth_file_instance_id + ) + ) + return int(result.scalar() or 0) + + async def _load_chunk_batch( + self, + file_task_id: str, + start_index: int, + end_index: int, + ) -> list[DataSynthesisChunkInstance]: + """按索引范围加载指定文件任务下的一批 chunk 记录(含边界)。""" + result = await self.db.execute( + select(DataSynthesisChunkInstance) + .where( + DataSynthesisChunkInstance.synthesis_file_instance_id == file_task_id, + DataSynthesisChunkInstance.chunk_index >= start_index, + DataSynthesisChunkInstance.chunk_index <= end_index, + ) + .order_by(DataSynthesisChunkInstance.chunk_index.asc()) + ) + return list(result.scalars().all()) + + async def _increment_processed_chunks(self, file_task_id: str, delta: int) -> None: + """安全地增加文件级 processed_chunks 计数。 + + 本方法在单协程上下文中被顺序调用(每个文件一个逻辑写入者), + 避免了并发写冲突;同时采用读取 + 增加 + 提交的方式保证最终一致性。 + """ + # 重新加载最新的 file_task 记录,避免使用过期实例 + result = await self.db.execute( + select(DataSynthesisFileInstance).where( + DataSynthesisFileInstance.id == file_task_id, + ) + ) + file_task = result.scalar_one_or_none() + if not file_task: + logger.error(f"Failed to increment processed_chunks: file_task {file_task_id} not found") + return + + file_task.processed_chunks = (file_task.processed_chunks or 0) + int(delta) + await self.db.commit() + await self.db.refresh(file_task) diff --git a/runtime/datamate-python/app/module/generation/service/prompt.py b/runtime/datamate-python/app/module/generation/service/prompt.py index a2bf8489d..5094be229 100644 --- a/runtime/datamate-python/app/module/generation/service/prompt.py +++ b/runtime/datamate-python/app/module/generation/service/prompt.py @@ -58,7 +58,7 @@ 5. 基于给定参考内容,用自然流畅的语言整合成一个完整答案,不需要提及文献来源或引用标记 ## Workflow: -1. Take a deep breath and work on this problem step-by-step. +1. 请一步一步地解决这个问题 2. 首先,分析给定的文件内容 3. 然后,从内容中提取关键信息 4. 接着,生成与问题相关的准确答案 @@ -78,8 +78,6 @@ 2. 答案必须准确,必须与问题相关,不能胡编乱造 3. 答案必须充分、详细、包含所有必要的信息、适合微调大模型训练使用 4. 答案中不得出现 ' 参考 / 依据 / 文献中提到 ' 等任何引用性表述,只需呈现最终结论 -{{templatePrompt}} -{{outputFormatPrompt}} """ diff --git a/scripts/db/data-synthesis-init.sql b/scripts/db/data-synthesis-init.sql index 52066262a..1356787dc 100644 --- a/scripts/db/data-synthesis-init.sql +++ b/scripts/db/data-synthesis-init.sql @@ -30,7 +30,7 @@ create table if not exists t_data_synthesis_file_instances synthesis_instance_id VARCHAR(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci COMMENT '数据合成任务ID', file_name VARCHAR(255) NOT NULL COMMENT '文件名', source_file_id VARCHAR(255) NOT NULL COMMENT '原始文件ID', - target_file_location VARCHAR(1000) NOT NULL COMMENT '目标文件存储位置', + target_file_location VARCHAR(1000) NULL COMMENT '目标文件存储位置', status VARCHAR(20) COMMENT '任务状态', total_chunks INT DEFAULT 0 COMMENT '总文本块数', processed_chunks INT DEFAULT 0 COMMENT '已处理文本块数', From 59f8319a121bb2d0561ed23286ded82c7442d9c9 Mon Sep 17 00:00:00 2001 From: Dallas98 <990259227@qq.com> Date: Wed, 17 Dec 2025 21:16:38 +0800 Subject: [PATCH 12/17] feat(DataSynthesis): refactor data synthesis models and update task handling logic --- .../generation/service/generation_service.py | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/runtime/datamate-python/app/module/generation/service/generation_service.py b/runtime/datamate-python/app/module/generation/service/generation_service.py index a26bc17a6..913cd9127 100644 --- a/runtime/datamate-python/app/module/generation/service/generation_service.py +++ b/runtime/datamate-python/app/module/generation/service/generation_service.py @@ -29,8 +29,8 @@ class GenerationService: def __init__(self, db: AsyncSession): self.db = db # 全局并发信号量:保证任意时刻最多 10 次模型调用 - self.question_semaphore = asyncio.Semaphore(10) - self.answer_semaphore = asyncio.Semaphore(10) + self.question_semaphore = asyncio.Semaphore(100) + self.answer_semaphore = asyncio.Semaphore(100) async def process_task(self, task_id: str): """处理数据合成任务入口:根据任务ID加载任务并逐个处理源文件。""" @@ -147,9 +147,8 @@ async def _process_single_file( answer_chat = get_chat_client(answer_model) # 分批次从 DB 读取并处理 chunk - batch_size = 16 + batch_size = 100 current_index = 1 - processed_chunks = 0 while current_index <= total_chunks: end_index = min(current_index + batch_size - 1, total_chunks) @@ -234,7 +233,14 @@ async def _process_single_chunk_qa( answer_chat=answer_chat, ) - # todo:每次处理完一个chunk,更新已经处理的chunk数量,要避免并发写冲突 + # 每次处理完一个chunk,若至少生成一条QA,则安全更新已处理的chunk数量,避免并发冲突 + if success_any: + try: + await self._increment_processed_chunks(file_task.id, 1) + except Exception as e: + logger.exception( + f"Failed to increment processed_chunks for file_task={file_task.id}, chunk_index={chunk_index}: {e}" + ) return success_any @@ -246,6 +252,8 @@ async def _generate_questions_for_one_chunk( ) -> list[str]: """针对单个 chunk 文本,调用 question_chat 生成问题列表。""" number = question_cfg.number or 5 + number = number if number is not None else 5 + number = int(len(chunk_text) / 1000 * number) template = getattr(question_cfg, "prompt_template", QUESTION_GENERATOR_PROMPT) template = template if (template is not None and template.strip() != "") else QUESTION_GENERATOR_PROMPT @@ -507,12 +515,6 @@ async def _load_chunk_batch( return list(result.scalars().all()) async def _increment_processed_chunks(self, file_task_id: str, delta: int) -> None: - """安全地增加文件级 processed_chunks 计数。 - - 本方法在单协程上下文中被顺序调用(每个文件一个逻辑写入者), - 避免了并发写冲突;同时采用读取 + 增加 + 提交的方式保证最终一致性。 - """ - # 重新加载最新的 file_task 记录,避免使用过期实例 result = await self.db.execute( select(DataSynthesisFileInstance).where( DataSynthesisFileInstance.id == file_task_id, From 81d0ed895d4b21baaeb59e5791f164b052ec4e49 Mon Sep 17 00:00:00 2001 From: Dallas98 <990259227@qq.com> Date: Thu, 18 Dec 2025 00:49:46 +0800 Subject: [PATCH 13/17] fix(generation_service): ensure processed chunks are incremented regardless of question generation success --- .../generation/service/generation_service.py | 50 +++++++++++-------- .../app/module/generation/service/prompt.py | 6 +-- 2 files changed, 33 insertions(+), 23 deletions(-) diff --git a/runtime/datamate-python/app/module/generation/service/generation_service.py b/runtime/datamate-python/app/module/generation/service/generation_service.py index 913cd9127..04d2146fb 100644 --- a/runtime/datamate-python/app/module/generation/service/generation_service.py +++ b/runtime/datamate-python/app/module/generation/service/generation_service.py @@ -202,9 +202,20 @@ async def _process_single_chunk_qa( chunk_index = chunk.chunk_index chunk_text = chunk.chunk_content or "" if not chunk_text.strip(): - logger.warning(f"Empty chunk text for file_task={file_task.id}, chunk_index={chunk_index}") + logger.warning( + f"Empty chunk text for file_task={file_task.id}, chunk_index={chunk_index}" + ) + # 无论成功或失败,均视为该 chunk 已处理完成 + try: + await self._increment_processed_chunks(file_task.id, 1) + except Exception as e: + logger.exception( + f"Failed to increment processed_chunks for file_task={file_task.id}, chunk_index={chunk_index}: {e}" + ) return False + success_any = False + # 1. 生成问题 try: questions = await self._generate_questions_for_one_chunk( @@ -216,31 +227,30 @@ async def _process_single_chunk_qa( logger.error( f"Generate questions failed for file_task={file_task.id}, chunk_index={chunk_index}: {e}" ) - return False + questions = [] if not questions: logger.info( f"No questions generated for file_task={file_task.id}, chunk_index={chunk_index}" ) - return False - - # 2. 针对每个问题生成答案并入库 - success_any = await self._generate_answers_for_one_chunk( - file_task=file_task, - chunk=chunk, - questions=questions, - answer_cfg=answer_cfg, - answer_chat=answer_chat, - ) + else: + # 2. 针对每个问题生成答案并入库 + qa_success = await self._generate_answers_for_one_chunk( + file_task=file_task, + chunk=chunk, + questions=questions, + answer_cfg=answer_cfg, + answer_chat=answer_chat, + ) + success_any = bool(qa_success) - # 每次处理完一个chunk,若至少生成一条QA,则安全更新已处理的chunk数量,避免并发冲突 - if success_any: - try: - await self._increment_processed_chunks(file_task.id, 1) - except Exception as e: - logger.exception( - f"Failed to increment processed_chunks for file_task={file_task.id}, chunk_index={chunk_index}: {e}" - ) + # 无论本 chunk 处理是否成功,都增加 processed_chunks 计数,避免任务长时间卡住 + try: + await self._increment_processed_chunks(file_task.id, 1) + except Exception as e: + logger.exception( + f"Failed to increment processed_chunks for file_task={file_task.id}, chunk_index={chunk_index}: {e}" + ) return success_any diff --git a/runtime/datamate-python/app/module/generation/service/prompt.py b/runtime/datamate-python/app/module/generation/service/prompt.py index 5094be229..0aa420491 100644 --- a/runtime/datamate-python/app/module/generation/service/prompt.py +++ b/runtime/datamate-python/app/module/generation/service/prompt.py @@ -25,8 +25,8 @@ 1. 所有问题必须严格依据原文内容,不得添加外部信息或假设情境。 2. 问题需覆盖文本的不同主题、层级或视角,避免集中于单一片段。 3. 禁止输出与材料元信息相关的问题(如作者、章节、目录等)。 -4. 问题不得包含“报告/文章/文献/表格中提到”等表述,需自然流畅。 -5. 输出不少于 {{number}} 个问题,且保持格式一致。 +4. 提问时请假设没有相应的文章可供参考,因此不要在问题中使用"这个"或"这些"等指示代词,也不得包含“报告/文章/文献/表格中提到”等表述。 +5. 输出不少于 {{number}} 个问题,问题语言与原文主要语言保持一致。 ## Output Format: - 使用合法的 JSON 数组,仅包含字符串元素。 @@ -38,7 +38,7 @@ ## Output Example: ``` -["人工智能伦理框架应包含哪些核心要素?", "民法典对个人数据保护有哪些新规定?"] +["人工智能伦理框架应包含哪些核心要素?", "民法典对个人数据保护有哪些新规定"] ``` ## Text to Analyze: From 3bcc48c574e96639ebe1deb102f3ff7ff0d3fd1a Mon Sep 17 00:00:00 2001 From: Dallas98 <990259227@qq.com> Date: Thu, 18 Dec 2025 14:31:52 +0800 Subject: [PATCH 14/17] feat(CreateTask): enhance task creation with new synthesis templates and improved configuration options --- .../src/pages/SynthesisTask/CreateTask.tsx | 539 ++++++++++++++---- .../module/generation/schema/generation.py | 15 +- .../generation/service/generation_service.py | 8 +- .../app/module/generation/service/prompt.py | 14 +- .../app/module/shared/util/model_chat.py | 11 + 5 files changed, 456 insertions(+), 131 deletions(-) diff --git a/frontend/src/pages/SynthesisTask/CreateTask.tsx b/frontend/src/pages/SynthesisTask/CreateTask.tsx index 5c00a2ce3..a753481a1 100644 --- a/frontend/src/pages/SynthesisTask/CreateTask.tsx +++ b/frontend/src/pages/SynthesisTask/CreateTask.tsx @@ -1,7 +1,7 @@ import { useEffect, useState } from "react"; import type { Dataset, DatasetFile } from "@/pages/DataManagement/dataset.model"; -import { Steps, Card, Select, Input, Checkbox, Button, Form, message } from "antd"; -import { Eye, ArrowLeft, ArrowRight, Play, Search, MoreHorizontal } from "lucide-react"; +import { Steps, Card, Select, Input, Button, Form, message, Tag, Tooltip, InputNumber } from "antd"; +import { Eye, ArrowLeft, ArrowRight, Play, Search, Sparkles, Brain, Layers } from "lucide-react"; import { Link, useNavigate } from "react-router"; import { queryDatasetsUsingGet } from "../DataManagement/dataset.api"; import DatasetFileTransfer from "@/components/business/DatasetFileTransfer"; @@ -31,13 +31,18 @@ export default function SynthesisTaskCreate() { const [selectedFiles, setSelectedFiles] = useState([]); const [selectedMap, setSelectedMap] = useState>({}); const [selectedDataset, setSelectedDataset] = useState(null); + // 当前选中的模板类型(QA / COT),用于高亮展示 const [selectedSynthesisTypes, setSelectedSynthesisTypes] = useState(["qa"]); const [taskType, setTaskType] = useState<"qa" | "cot">("qa"); - const [promptTemplate, setPromptTemplate] = useState(""); + const [questionPrompt, setQuestionPrompt] = useState(""); + const [answerPrompt, setAnswerPrompt] = useState(""); const [submitting, setSubmitting] = useState(false); const [modelOptions, setModelOptions] = useState<{ label: string; value: string }[]>([]); const [modelsLoading, setModelsLoading] = useState(false); - const [selectedModel, setSelectedModel] = useState(undefined); + const [questionModelId, setQuestionModelId] = useState(undefined); + const [answerModelId, setAnswerModelId] = useState(undefined); + + // 文本切片配置 const [sliceConfig, setSliceConfig] = useState({ processType: "DEFAULT_CHUNK" as | "DEFAULT_CHUNK" @@ -45,10 +50,23 @@ export default function SynthesisTaskCreate() { | "PARAGRAPH_CHUNK" | "FIXED_LENGTH_CHUNK" | "CUSTOM_SEPARATOR_CHUNK", - chunkSize: 500, - overlapSize: 50, + chunkSize: 2000, + overlapSize: 100, delimiter: "", }); + + // 问题/答案合成配置(与后端 question_synth_config / answer_synth_config 对齐) + const [questionConfig, setQuestionConfig] = useState({ + number: 1, + temperature: 0.7, + }); + const [answerConfig, setAnswerConfig] = useState({ + // 答案侧不再需要 number,只保留温度 + temperature: 0.7, + }); + // 合成总数上限,默认 5000 + const [maxQaPairs, setMaxQaPairs] = useState(5000); + const sliceOptions = [ { label: "默认分块", value: "DEFAULT_CHUNK" }, { label: "按章节分块", value: "CHAPTER_CHUNK" }, @@ -62,33 +80,43 @@ export default function SynthesisTaskCreate() { return data; }; - const fetchPrompt = async (type: "qa" | "cot") => { + // 问题 Prompt:固定使用 QUESTION 类型获取 + const fetchQuestionPrompt = async () => { try { - const synthTypeParam = type.toUpperCase(); - const res = await getPromptByTypeUsingGet(synthTypeParam); + const res = await getPromptByTypeUsingGet("QUESTION"); const prompt = typeof res === "string" ? res : (res as { data?: string })?.data ?? ""; - setPromptTemplate(prompt || ""); + setQuestionPrompt(prompt || ""); } catch (e) { console.error(e); - message.error("获取提示词模板失败"); - setPromptTemplate(""); + message.error("获取问题 Prompt 模板失败"); + setQuestionPrompt(""); } }; - useEffect(() => { - fetchDatasets(); - }, []); - - useEffect(() => { - fetchPrompt(taskType); - }, [taskType]); + // 答案 Prompt:根据当前任务类型获取 QA/COT 模板 + const fetchAnswerPrompt = async (type: "qa" | "cot") => { + try { + const synthTypeParam = type === "qa" ? "QA" : "COT"; + const res = await getPromptByTypeUsingGet(synthTypeParam); + const prompt = typeof res === "string" ? res : (res as { data?: string })?.data ?? ""; + setAnswerPrompt(prompt || ""); + } catch (e) { + console.error(e); + message.error("获取答案 Prompt 模板失败"); + setAnswerPrompt(""); + } + }; + // 拉取模型列表,仅保留 CHAT 模型 useEffect(() => { const loadModels = async () => { setModelsLoading(true); try { const { data } = await queryModelListUsingGet({ page: 0, size: 1000 }); - const options = (data?.content || []).map((model: ModelI) => ({ + const chatModels: ModelI[] = (data?.content || []).filter( + (model: ModelI) => model.type === "CHAT" + ); + const options = chatModels.map((model) => ({ label: `${model.modelName} (${model.provider})`, value: model.id, })); @@ -102,11 +130,22 @@ export default function SynthesisTaskCreate() { loadModels(); }, []); + // 默认选中第一个 CHAT 模型作为问题/答案模型 useEffect(() => { - if (!selectedModel && modelOptions.length > 0) { - setSelectedModel(modelOptions[0].value); + if (modelOptions.length > 0) { + setQuestionModelId((prev) => prev ?? modelOptions[0].value); + setAnswerModelId((prev) => prev ?? modelOptions[0].value); } - }, [modelOptions, selectedModel]); + }, [modelOptions]); + + useEffect(() => { + fetchDatasets(); + }, []); + + useEffect(() => { + fetchQuestionPrompt(); + fetchAnswerPrompt(taskType); + }, [taskType]); // 表单数据 const [formValues, setFormValues] = useState({ @@ -131,13 +170,12 @@ export default function SynthesisTaskCreate() { const handleCreateTask = async () => { try { const values = (await form.validateFields()) as CreateTaskFormValues; - // precise validation if (!(taskType === "qa" || taskType === "cot")) { message.error("请选择一个合成类型"); return; } - if (!selectedModel) { - message.error("请选择模型"); + if (!questionModelId || !answerModelId) { + message.error("请选择问题和答案使用的模型"); return; } if (selectedFiles.length === 0) { @@ -145,25 +183,42 @@ export default function SynthesisTaskCreate() { return; } - // 构造后端要求的参数格式 - const payload: Record = { - name: values.name || form.getFieldValue("name"), - model_id: selectedModel, - source_file_id: selectedFiles, + const synthConfig: Record = { text_split_config: { chunk_size: sliceConfig.chunkSize, chunk_overlap: sliceConfig.overlapSize, }, - synthesis_config: { - prompt_template: promptTemplate, + question_synth_config: { + model_id: questionModelId, + prompt_template: questionPrompt, + number: questionConfig.number, + temperature: questionConfig.temperature, }, + answer_synth_config: { + model_id: answerModelId, + prompt_template: answerPrompt, + temperature: answerConfig.temperature, + }, + max_qa_pairs: typeof maxQaPairs === "number" && maxQaPairs > 0 ? maxQaPairs : undefined, + }; + + const payload: Record = { + name: values.name || form.getFieldValue("name"), + description: values.description ?? form.getFieldValue("description"), synthesis_type: taskType === "qa" ? "QA" : "COT", + source_file_id: selectedFiles, + synth_config: synthConfig, }; - // 只有在有真实内容时携带 description,避免强制传空字符串 - const desc = values.description ?? form.getFieldValue("description"); - if (typeof desc === "string" && desc.trim().length > 0) { - payload.description = desc.trim(); + // 清洗 description:空字符串转为 undefined,让后端用 validator 处理为 None + const desc = payload.description; + if (typeof desc === "string" && desc.trim().length === 0) { + delete payload.description; + } + + // 如果未设置 max_qa_pairs,则从 synth_config 中移除该字段,避免传递 undefined + if (synthConfig.max_qa_pairs === undefined) { + delete (synthConfig as { max_qa_pairs?: number }).max_qa_pairs; } setSubmitting(true); @@ -187,25 +242,43 @@ export default function SynthesisTaskCreate() { return; } console.error(error); - message.error((error instanceof Error ? error.message : "合成任务创建失败")); + message.error(error instanceof Error ? error.message : "合成任务创建失败"); } finally { setSubmitting(false); } }; - // 仅两个一级类型,无二级目录 - const synthesisTypes = [ - { id: "qa", name: "生成问答对" }, - { id: "cot", name: "生成COT链式推理" }, - ] as const; - - const handleSynthesisTypeSelect = (typeId: "qa" | "cot") => { - setSelectedSynthesisTypes((prev) => { - const next = prev.includes(typeId) ? [] : [typeId]; - if (next[0] === "qa") setTaskType("qa"); - if (next[0] === "cot") setTaskType("cot"); - return next; - }); + // 仅两个一级类型,无二级目录 -> 扩展为模板配置 + const synthesisTemplates = [ + { + id: "sft-qa", + type: "qa" as const, + title: "SFT 问答数据合成", + subtitle: "从长文档自动生成高质量问答样本", + badge: "推荐", + description: + "适用于构建监督微调(SFT)问答数据集,支持从知识库或长文档中抽取关键问答对。", + colorClass: "from-sky-500/10 via-sky-400/5 to-transparent", + borderClass: "border-sky-100 hover:border-sky-300", + icon: Sparkles, + }, + { + id: "cot-reasoning", + type: "cot" as const, + title: "COT 链式推理合成", + subtitle: "一步步推理过程与最终答案", + badge: "推理增强", + description: + "生成包含模型推理中间过程的 COT 数据,用于提升模型的复杂推理和解释能力。", + colorClass: "from-violet-500/10 via-violet-400/5 to-transparent", + borderClass: "border-violet-100 hover:border-violet-300", + icon: Brain, + }, + ]; + + const handleTemplateClick = (tpl: (typeof synthesisTemplates)[number]) => { + setTaskType(tpl.type); + setSelectedSynthesisTypes([tpl.type]); }; useEffect(() => { @@ -247,120 +320,353 @@ export default function SynthesisTaskCreate() { if (createStep === 2) { return ( -
-
- {/* 左侧合成指令(仅两个一级类型,单选) */} +
+
+ {/* 左侧合成指令模板区:占 1/3 宽度 */}
- -

合成指令(仅支持单选)

-
-
- - + +
+
+

+ + 合成指令模板 +

+

+ 从左侧选择一个模板,我们会自动为你填充合适的 Prompt 与合成策略。 +

+ + 单选 +
-
- {synthesisTypes.map((type) => ( -
handleSynthesisTypeSelect(type.id)} - > - handleSynthesisTypeSelect(type.id)} - /> - {type.name} - -
- ))} + +
+
+ + +
+ +
+ {synthesisTemplates.map((tpl) => { + const Icon = tpl.icon; + const active = selectedSynthesisTypes.includes(tpl.type); + + return ( +
handleTemplateClick(tpl)} + className={`group relative rounded-xl border p-2.5 text-xs transition-all duration-200 cursor-pointer bg-white/80 hover:bg-white/100 ${ + tpl.borderClass + } ${ + active + ? "ring-1 ring-offset-1 ring-blue-500/60 border-blue-400/70 shadow-sm bg-gradient-to-r " + + tpl.colorClass + : "border-slate-100 hover:shadow-sm" + }`} + > +
+
+ +
+
+
+ + {tpl.title} + + {tpl.badge && ( + + {tpl.badge} + + )} +
+

+ {tpl.subtitle} +

+

+ {tpl.description} +

+
+
+ +
+ +
+ {active ? "✓" : ""} +
+
+
+
+ ); + })} +
- {/* 右侧合成配置 */} + {/* 右侧合成配置:占 2/3 宽度 */}
- -
-

合成配置

+ +
+
+

+ + 合成配置 +

+

+ 根据左侧模板自动带出配置,你也可以在此基础上进行微调。 +

+
- + + +
- {/* 切片配置 */} - + {/* 步骤说明条 */} +
+ 1 + 设置合成总数 + / + 2 + 配置文本切片策略 + / + 3 + 配置问题合成参数 + / + 4 + 配置答案合成参数 +
+ + {/* 1. 合成总数配置 */} +
+
+
+ 1 + 合成总数上限 +
+ 控制整个任务最多生成的 QA 对数量 +
+
+ setMaxQaPairs(typeof v === "number" ? v : undefined)} + /> + 可选项,建议在大规模合成时设置上限 +
+
+ + {/* 2. 文本切片配置 */} +
+
+
+ 2 + 文本切片配置 +
+ 影响上下文长度与召回粒度 +
- 分块策略 + 分块策略 setSliceConfig((p) => ({ ...p, chunkSize: Number(e.target.value) }))} + size="small" />
- 重叠大小 + 重叠大小 setSliceConfig((p) => ({ ...p, overlapSize: Number(e.target.value) }))} + size="small" />
{sliceConfig.processType === "CUSTOM_SEPARATOR_CHUNK" && (
- 自定义分隔符 + 自定义分隔符 setSliceConfig((p) => ({ ...p, delimiter: e.target.value }))} + size="small" />
)} - - - {/* 模型选择 */} - - 模型选择 - setQuestionModelId(v)} + /> +
+
+ 问题 Prompt 模板 +