diff --git a/src/memos/mem_reader/multi_modal_struct.py b/src/memos/mem_reader/multi_modal_struct.py index 4d4faff30..ed139f958 100644 --- a/src/memos/mem_reader/multi_modal_struct.py +++ b/src/memos/mem_reader/multi_modal_struct.py @@ -422,7 +422,7 @@ def _process_one_item(fast_item: TextualMemoryItem) -> list[TextualMemoryItem]: fine_memory_items: list[TextualMemoryItem] = [] - with ContextThreadPoolExecutor(max_workers=8) as executor: + with ContextThreadPoolExecutor(max_workers=30) as executor: futures = [executor.submit(_process_one_item, item) for item in fast_memory_items] for future in concurrent.futures.as_completed(futures): diff --git a/src/memos/mem_reader/read_multi_modal/file_content_parser.py b/src/memos/mem_reader/read_multi_modal/file_content_parser.py index 9efb58263..cce99e76a 100644 --- a/src/memos/mem_reader/read_multi_modal/file_content_parser.py +++ b/src/memos/mem_reader/read_multi_modal/file_content_parser.py @@ -167,28 +167,38 @@ def create_source( self, message: File, info: dict[str, Any], + chunk_index: int | None = None, + chunk_total: int | None = None, chunk_content: str | None = None, ) -> SourceMessage: """Create SourceMessage from file content part.""" if isinstance(message, dict): file_info = message.get("file", {}) - return SourceMessage( - type="file", - doc_path=file_info.get("filename") or file_info.get("file_id", ""), - content=chunk_content if chunk_content else file_info.get("file_data", ""), - original_part=message, - ) - return SourceMessage(type="file", doc_path=str(message)) + source_dict = { + "type": "file", + "doc_path": file_info.get("filename") or file_info.get("file_id", ""), + "content": chunk_content if chunk_content else file_info.get("file_data", ""), + } + # Add chunk ordering information if provided + if chunk_index is not None: + source_dict["chunk_index"] = chunk_index + if chunk_total is not None: + source_dict["chunk_total"] = chunk_total + return SourceMessage(**source_dict) + source_dict = {"type": "file", "doc_path": str(message)} + if chunk_index is not None: + source_dict["chunk_index"] = chunk_index + if chunk_total is not None: + source_dict["chunk_total"] = chunk_total + if chunk_content is not None: + source_dict["content"] = chunk_content + return SourceMessage(**source_dict) def rebuild_from_source( self, source: SourceMessage, ) -> File: """Rebuild file content part from SourceMessage.""" - # Use original_part if available - if hasattr(source, "original_part") and source.original_part: - return source.original_part - # Rebuild from source fields return { "type": "file", @@ -312,9 +322,6 @@ def parse_fast( # Split content into chunks content_chunks = self._split_text(content) - # Create source - source = self.create_source(message, info) - # Extract info fields info_ = info.copy() if file_id: @@ -326,12 +333,23 @@ def parse_fast( # (since we don't have role information at this level) memory_type = "LongTermMemory" file_ids = [file_id] if file_id else [] + total_chunks = len(content_chunks) + # Create memory items for each chunk memory_items = [] for chunk_idx, chunk_text in enumerate(content_chunks): if not chunk_text.strip(): continue + # Create source for this specific chunk with its index and content + source = self.create_source( + message, + info, + chunk_index=chunk_idx, + chunk_total=total_chunks, + chunk_content=chunk_text, + ) + memory_item = TextualMemoryItem( memory=chunk_text, metadata=TreeNodeTextualMemoryMetadata( @@ -342,7 +360,7 @@ def parse_fast( tags=[ "mode:fast", "multimodal:file", - f"chunk:{chunk_idx + 1}/{len(content_chunks)}", + f"chunk:{chunk_idx + 1}/{total_chunks}", ], key=_derive_key(chunk_text), embedding=self.embedder.embed([chunk_text])[0], @@ -359,6 +377,14 @@ def parse_fast( # If no chunks were created, create a placeholder if not memory_items: + # Create source for placeholder (no chunk index since there are no chunks) + placeholder_source = self.create_source( + message, + info, + chunk_index=None, + chunk_total=0, + chunk_content=content, + ) memory_item = TextualMemoryItem( memory=content, metadata=TreeNodeTextualMemoryMetadata( @@ -370,7 +396,7 @@ def parse_fast( key=_derive_key(content), embedding=self.embedder.embed([content])[0], usage=[], - sources=[source], + sources=[placeholder_source], background="", confidence=0.99, type="fact", @@ -463,7 +489,9 @@ def parse_fine( parsed_text = self._handle_base64(file_data) else: - parsed_text = file_data + # TODO: discuss the proper place for processing + # string file-data + return [] # Priority 2: If file_id is provided but no file_data, try to use file_id as path elif file_id: logger.warning(f"[FileContentParser] File data not provided for file_id: {file_id}") @@ -518,10 +546,26 @@ def _make_memory_item( mem_type: str = memory_type, tags: list[str] | None = None, key: str | None = None, + chunk_idx: int | None = None, chunk_content: str | None = None, ) -> TextualMemoryItem: - """Construct memory item with common fields.""" - source = self.create_source(message, info, chunk_content) + """Construct memory item with common fields. + + Args: + value: Memory content (chunk text) + mem_type: Memory type + tags: Tags for the memory item + key: Key for the memory item + chunk_idx: Index of the chunk in the document (0-based) + """ + # Create source for this specific chunk with its index and content + chunk_source = self.create_source( + message, + info, + chunk_index=chunk_idx, + chunk_total=total_chunks, + chunk_content=chunk_content, + ) return TextualMemoryItem( memory=value, metadata=TreeNodeTextualMemoryMetadata( @@ -533,7 +577,7 @@ def _make_memory_item( key=key if key is not None else _derive_key(value), embedding=self.embedder.embed([value])[0], usage=[], - sources=[source], + sources=[chunk_source], background="", confidence=0.99, type="fact", @@ -555,6 +599,8 @@ def _make_fallback( f"fallback:{reason}", f"chunk:{chunk_idx + 1}/{total_chunks}", ], + chunk_idx=chunk_idx, + chunk_content=chunk_text, ) # Handle empty chunks case @@ -563,6 +609,7 @@ def _make_fallback( _make_memory_item( value=parsed_text or "[File: empty content]", tags=["mode:fine", "multimodal:file"], + chunk_idx=None, ) ] @@ -591,6 +638,7 @@ def _process_chunk(chunk_idx: int, chunk_text: str) -> TextualMemoryItem: mem_type=llm_mem_type, tags=tags, key=response_json.get("key"), + chunk_idx=chunk_idx, chunk_content=chunk_text, ) except Exception as e: @@ -638,6 +686,8 @@ def _process_chunk(chunk_idx: int, chunk_text: str) -> TextualMemoryItem: return memory_items or [ _make_memory_item( - value=parsed_text or "[File: empty content]", tags=["mode:fine", "multimodal:file"] + value=parsed_text or "[File: empty content]", + tags=["mode:fine", "multimodal:file"], + chunk_idx=None, ) ] diff --git a/src/memos/mem_reader/read_multi_modal/image_parser.py b/src/memos/mem_reader/read_multi_modal/image_parser.py index 88991fbe7..5a19393a9 100644 --- a/src/memos/mem_reader/read_multi_modal/image_parser.py +++ b/src/memos/mem_reader/read_multi_modal/image_parser.py @@ -53,7 +53,6 @@ def create_source( return SourceMessage( type="image", content=url, - original_part=message, url=url, detail=detail, ) @@ -64,10 +63,6 @@ def rebuild_from_source( source: SourceMessage, ) -> ChatCompletionContentPartImageParam: """Rebuild image_url content part from SourceMessage.""" - # Use original_part if available - if hasattr(source, "original_part") and source.original_part: - return source.original_part - # Rebuild from source fields url = getattr(source, "url", "") or (source.content or "").replace("[image_url]: ", "") detail = getattr(source, "detail", "auto") diff --git a/src/memos/mem_reader/read_multi_modal/text_content_parser.py b/src/memos/mem_reader/read_multi_modal/text_content_parser.py index 5ff0a76fd..febc166ec 100644 --- a/src/memos/mem_reader/read_multi_modal/text_content_parser.py +++ b/src/memos/mem_reader/read_multi_modal/text_content_parser.py @@ -51,7 +51,6 @@ def create_source( return SourceMessage( type="text", content=text, - original_part=message, ) return SourceMessage(type="text", content=str(message)) diff --git a/src/memos/mem_reader/read_multi_modal/tool_parser.py b/src/memos/mem_reader/read_multi_modal/tool_parser.py index 09bd9e9d0..e13b684a7 100644 --- a/src/memos/mem_reader/read_multi_modal/tool_parser.py +++ b/src/memos/mem_reader/read_multi_modal/tool_parser.py @@ -79,7 +79,6 @@ def create_source( filename=file_info.get("filename", ""), file_id=file_info.get("file_id", ""), tool_call_id=tool_call_id, - original_part=part, ) ) elif part_type == "image_url": @@ -93,7 +92,6 @@ def create_source( content=file_info.get("url", ""), detail=file_info.get("detail", "auto"), tool_call_id=tool_call_id, - original_part=part, ) ) elif part_type == "input_audio": @@ -107,7 +105,6 @@ def create_source( content=file_info.get("data", ""), format=file_info.get("format", "wav"), tool_call_id=tool_call_id, - original_part=part, ) ) else: diff --git a/src/memos/mem_reader/read_multi_modal/user_parser.py b/src/memos/mem_reader/read_multi_modal/user_parser.py index c7b8ad4e9..359506e13 100644 --- a/src/memos/mem_reader/read_multi_modal/user_parser.py +++ b/src/memos/mem_reader/read_multi_modal/user_parser.py @@ -68,8 +68,6 @@ def create_source( chat_time=chat_time, message_id=message_id, content=part.get("text", ""), - # Save original part for reconstruction - original_part=part, ) ) elif part_type == "file": @@ -82,7 +80,6 @@ def create_source( message_id=message_id, doc_path=file_info.get("filename") or file_info.get("file_id", ""), content=file_info.get("file_data", ""), - original_part=part, ) ) elif part_type == "image_url": @@ -94,7 +91,6 @@ def create_source( chat_time=chat_time, message_id=message_id, image_path=image_info.get("url"), - original_part=part, ) ) else: @@ -106,7 +102,6 @@ def create_source( chat_time=chat_time, message_id=message_id, content=f"[{part_type}]", - original_part=part, ) ) else: