diff --git a/src/memos/mem_reader/read_multi_modal/file_content_parser.py b/src/memos/mem_reader/read_multi_modal/file_content_parser.py
index c8ca9a400..4ec4f5279 100644
--- a/src/memos/mem_reader/read_multi_modal/file_content_parser.py
+++ b/src/memos/mem_reader/read_multi_modal/file_content_parser.py
@@ -1,31 +1,86 @@
 """Parser for file content parts (RawMessageList)."""
 
+import concurrent.futures
 import os
 import tempfile
+
 from typing import Any
 
+from tqdm import tqdm
+
+from memos.context.context import ContextThreadPoolExecutor
 from memos.embedders.base import BaseEmbedder
 from memos.llms.base import BaseLLM
 from memos.log import get_logger
+from memos.mem_reader.read_multi_modal.base import BaseMessageParser, _derive_key
+from memos.mem_reader.read_multi_modal.utils import (
+    detect_lang,
+    get_parser,
+    parse_json_result,
+)
 from memos.memories.textual.item import (
     SourceMessage,
     TextualMemoryItem,
     TreeNodeTextualMemoryMetadata,
 )
+from memos.templates.mem_reader_prompts import (
+    CUSTOM_TAGS_INSTRUCTION,
+    CUSTOM_TAGS_INSTRUCTION_ZH,
+    SIMPLE_STRUCT_DOC_READER_PROMPT,
+    SIMPLE_STRUCT_DOC_READER_PROMPT_ZH,
+)
 from memos.types.openai_chat_completion_types import File
 
-from .base import BaseMessageParser, _derive_key
-from .utils import get_parser
-
 
 logger = get_logger(__name__)
 
+# Prompt dictionary for doc processing (shared by simple_struct and file_content_parser)
+DOC_PROMPT_DICT = {
+    "doc": {"en": SIMPLE_STRUCT_DOC_READER_PROMPT, "zh": SIMPLE_STRUCT_DOC_READER_PROMPT_ZH},
+    "custom_tags": {"en": CUSTOM_TAGS_INSTRUCTION, "zh": CUSTOM_TAGS_INSTRUCTION_ZH},
+}
+
 
 class FileContentParser(BaseMessageParser):
     """Parser for file content parts."""
 
-    def _handle_url(self, url_str: str, filename: str) -> tuple[str, str | None]:
+    def _get_doc_llm_response(self, chunk_text: str, custom_tags: list[str] | None = None) -> dict:
+        """
+        Call LLM to extract memory from document chunk.
+        Uses doc prompts from DOC_PROMPT_DICT.
+
+        Args:
+            chunk_text: Text chunk to extract memory from
+            custom_tags: Optional list of custom tags for LLM extraction
+
+        Returns:
+            Parsed JSON response from LLM or empty dict if failed
+        """
+        if not self.llm:
+            logger.warning("[FileContentParser] LLM not available for fine mode")
+            return {}
+
+        lang = detect_lang(chunk_text)
+        template = DOC_PROMPT_DICT["doc"][lang]
+        prompt = template.replace("{chunk_text}", chunk_text)
+
+        custom_tags_prompt = (
+            DOC_PROMPT_DICT["custom_tags"][lang].replace("{custom_tags}", str(custom_tags))
+            if custom_tags
+            else ""
+        )
+        prompt = prompt.replace("{custom_tags_prompt}", custom_tags_prompt)
+
+        messages = [{"role": "user", "content": prompt}]
+        try:
+            response_text = self.llm.generate(messages)
+            response_json = parse_json_result(response_text)
+        except Exception as e:
+            logger.error(f"[FileContentParser] LLM generation error: {e}")
+            response_json = {}
+        return response_json
+
+    def _handle_url(self, url_str: str, filename: str) -> tuple[str, str | None, bool]:
         """Download and parse file from URL."""
         try:
             from urllib.parse import urlparse
@@ -42,14 +97,14 @@ def _handle_url(self, url_str: str, filename: str) -> tuple[str, str | None]:
             filename = os.path.basename(parsed_url.path) or "downloaded_file"
 
             if hostname in self.direct_markdown_hostnames:
-                return response.text, None
+                return response.text, None, True
 
             file_ext = os.path.splitext(filename)[1].lower()
             if file_ext in [".md", ".markdown", ".txt"]:
-                return response.text, None
+                return response.text, None, True
 
             with tempfile.NamedTemporaryFile(mode="wb", delete=False, suffix=file_ext) as temp_file:
                 temp_file.write(response.content)
-                return "", temp_file.name
+                return "", temp_file.name, False
         except Exception as e:
             logger.error(f"[FileContentParser] URL processing error: {e}")
-            return f"[File URL download failed: {url_str}]", None
+            # Must match the 3-tuple contract; the 2-tuple here would break callers unpacking 3 values
+            return f"[File URL download failed: {url_str}]", None, False
@@ -261,6 +316,8 @@ def parse_fast(
 
         # Extract info fields
         info_ = info.copy()
+        if file_id:
+            info_.update({"file_id": file_id})
         user_id = info_.pop("user_id", "")
         session_id = info_.pop("session_id", "")
 
@@ -331,10 +388,19 @@ def parse_fine(
         """
         Parse file content part in fine mode.
         Fine mode downloads and parses file content, especially for URLs.
+        Then uses LLM to extract structured memories from each chunk.
+
         Handles various file parameter scenarios:
         - file_data: URL (http://, https://, or @http://), base64 encoded data, or plain text content
         - file_id: ID of an uploaded file
         - filename: name of the file
+
+        Args:
+            message: File content part to parse
+            info: Dictionary containing user_id and session_id
+            **kwargs: Additional parameters including:
+                - custom_tags: Optional list of custom tags for LLM extraction
+                - context_items: Optional list of TextualMemoryItem for context
         """
         if not isinstance(message, dict):
             logger.warning(f"[FileContentParser] Expected dict, got {type(message)}")
@@ -351,6 +417,9 @@ def parse_fine(
         file_id = file_info.get("file_id", "")
         filename = file_info.get("filename", "")
 
+        # Extract custom_tags from kwargs (for LLM extraction)
+        custom_tags = kwargs.get("custom_tags")
+
         # Use parser from utils
         parser = self.parser or get_parser()
         if not parser:
@@ -359,6 +428,7 @@ def parse_fine(
 
         parsed_text = ""
         temp_file_path = None
+        is_markdown = False
 
         try:
             # Priority 1: If file_data is provided, process it
@@ -367,7 +437,9 @@ def parse_fine(
                 url_str = file_data[1:] if file_data.startswith("@") else file_data
 
                 if url_str.startswith(("http://", "https://")):
-                    parsed_text, temp_file_path = self._handle_url(url_str, filename)
+                    parsed_text, temp_file_path, is_markdown = self._handle_url(
+                        url_str, filename
+                    )
                     if temp_file_path:
                         try:
                             # Use parser from utils
@@ -432,26 +504,30 @@ def parse_fine(
         # Split parsed text into chunks
         content_chunks = self._split_text(parsed_text)
 
-        # Create memory items for each chunk
-        memory_items = []
-        for chunk_idx, chunk_text in enumerate(content_chunks):
-            if not chunk_text.strip():
-                continue
-
-            memory_item = TextualMemoryItem(
-                memory=chunk_text,
+        # Filter out empty chunks and create indexed list
+        valid_chunks = [
+            (idx, chunk_text) for idx, chunk_text in enumerate(content_chunks) if chunk_text.strip()
+        ]
+        total_chunks = len(content_chunks)
+
+        # Helper function to create memory item (similar to SimpleStructMemReader._make_memory_item)
+        def _make_memory_item(
+            value: str,
+            mem_type: str = memory_type,
+            tags: list[str] | None = None,
+            key: str | None = None,
+        ) -> TextualMemoryItem:
+            """Construct memory item with common fields."""
+            return TextualMemoryItem(
+                memory=value,
                 metadata=TreeNodeTextualMemoryMetadata(
                     user_id=user_id,
                     session_id=session_id,
-                    memory_type=memory_type,
+                    memory_type=mem_type,
                     status="activated",
-                    tags=[
-                        "mode:fine",
-                        "multimodal:file",
-                        f"chunk:{chunk_idx + 1}/{len(content_chunks)}",
-                    ],
-                    key=_derive_key(chunk_text),
-                    embedding=self.embedder.embed([chunk_text])[0],
+                    tags=tags or [],
+                    key=key if key is not None else _derive_key(value),
+                    embedding=self.embedder.embed([value])[0],
                     usage=[],
                     sources=[source],
                     background="",
@@ -460,28 +536,102 @@ def parse_fine(
                     confidence=0.99,
                     type="fact",
                     info=info_,
                 ),
             )
-            memory_items.append(memory_item)
 
-        # If no chunks were created, create a placeholder
-        if not memory_items:
-            memory_item = TextualMemoryItem(
-                memory=parsed_text,
-                metadata=TreeNodeTextualMemoryMetadata(
-                    user_id=user_id,
-                    session_id=session_id,
-                    memory_type=memory_type,
-                    status="activated",
-                    tags=["mode:fine", "multimodal:file"],
-                    key=_derive_key(parsed_text),
-                    embedding=self.embedder.embed([parsed_text])[0],
-                    usage=[],
-                    sources=[source],
-                    background="",
-                    confidence=0.99,
-                    type="fact",
-                    info=info_,
-                ),
+        # Helper function to create fallback item for a chunk
+        def _make_fallback(
+            chunk_idx: int, chunk_text: str, reason: str = "raw"
+        ) -> TextualMemoryItem:
+            """Create fallback memory item with raw chunk text."""
+            return _make_memory_item(
+                value=chunk_text,
+                tags=[
+                    "mode:fine",
+                    "multimodal:file",
+                    f"fallback:{reason}",
+                    f"chunk:{chunk_idx + 1}/{total_chunks}",
+                ],
             )
-            memory_items.append(memory_item)
 
-        return memory_items
+        # Handle empty chunks case
+        if not valid_chunks:
+            return [
+                _make_memory_item(
+                    value=parsed_text or "[File: empty content]",
+                    tags=["mode:fine", "multimodal:file"],
+                )
+            ]
+
+        # If no LLM available, create memory items directly from chunks
+        if not self.llm:
+            return [_make_fallback(idx, text, "no_llm") for idx, text in valid_chunks]
+
+        # Process single chunk with LLM extraction (worker function)
+        def _process_chunk(chunk_idx: int, chunk_text: str) -> TextualMemoryItem:
+            """Process chunk with LLM, fallback to raw on failure."""
+            try:
+                response_json = self._get_doc_llm_response(chunk_text, custom_tags)
+                if response_json:
+                    value = response_json.get("value", "").strip()
+                    if value:
+                        tags = response_json.get("tags", [])
+                        tags = tags if isinstance(tags, list) else []
+                        tags.extend(["mode:fine", "multimodal:file"])
+
+                        llm_mem_type = response_json.get("memory_type", memory_type)
+                        if llm_mem_type not in ["LongTermMemory", "UserMemory"]:
+                            llm_mem_type = memory_type
+
+                        return _make_memory_item(
+                            value=value,
+                            mem_type=llm_mem_type,
+                            tags=tags,
+                            key=response_json.get("key"),
+                        )
+            except Exception as e:
+                logger.error(f"[FileContentParser] LLM error for chunk {chunk_idx}: {e}")
+
+            # Fallback to raw chunk
+            logger.warning(f"[FileContentParser] Fallback to raw for chunk {chunk_idx}")
+            return _make_fallback(chunk_idx, chunk_text)
+
+        # Process chunks concurrently with progress bar
+        memory_items = []
+        chunk_map = dict(valid_chunks)
+        # NOTE: do not rebind `total_chunks` here — `_make_fallback` closes over it as the
+        # chunk-tag denominator (indexed against content_chunks); use a separate name.
+        num_valid = len(valid_chunks)
+
+        logger.info(f"[FileContentParser] Processing {num_valid} chunks with LLM...")
+
+        with ContextThreadPoolExecutor(max_workers=20) as executor:
+            futures = {
+                executor.submit(_process_chunk, idx, text): idx for idx, text in valid_chunks
+            }
+
+            # Use tqdm for progress bar (similar to simple_struct.py _process_doc_data)
+            for future in tqdm(
+                concurrent.futures.as_completed(futures),
+                total=num_valid,
+                desc="[FileContentParser] Processing chunks",
+            ):
+                chunk_idx = futures[future]
+                try:
+                    node = future.result()
+                    if node:
+                        memory_items.append(node)
+                except Exception as e:
+                    tqdm.write(f"[ERROR] Chunk {chunk_idx} failed: {e}")
+                    logger.error(f"[FileContentParser] Future failed for chunk {chunk_idx}: {e}")
+                    # Create fallback for failed future
+                    if chunk_idx in chunk_map:
+                        memory_items.append(
+                            _make_fallback(chunk_idx, chunk_map[chunk_idx], "error")
+                        )
+
+        logger.info(
+            f"[FileContentParser] Completed processing {len(memory_items)}/{num_valid} chunks"
+        )
+
+        return memory_items or [
+            _make_memory_item(
+                value=parsed_text or "[File: empty content]", tags=["mode:fine", "multimodal:file"]
+            )
+        ]
diff --git a/src/memos/mem_reader/read_multi_modal/utils.py b/src/memos/mem_reader/read_multi_modal/utils.py
index 9582a258c..0c887a9f2 100644
--- a/src/memos/mem_reader/read_multi_modal/utils.py
+++ b/src/memos/mem_reader/read_multi_modal/utils.py
@@ -1,5 +1,6 @@
 """Utility functions for message parsing."""
 
+import json
 import os
 import re
 
@@ -43,6 +44,63 @@
     re.I,
 )
 
+
+def parse_json_result(response_text: str) -> dict:
+    """
+    Parse JSON result from LLM response.
+
+    Handles various formats including:
+    - JSON wrapped in markdown code blocks
+    - Raw JSON
+    - Incomplete JSON (attempts to fix)
+
+    Args:
+        response_text: Raw response text from LLM
+
+    Returns:
+        Parsed dictionary or empty dict if parsing fails
+    """
+    s = (response_text or "").strip()
+
+    m = re.search(r"```(?:json)?\s*([\s\S]*?)```", s, flags=re.I)
+    s = (m.group(1) if m else s.replace("```", "")).strip()
+
+    i = s.find("{")
+    if i == -1:
+        return {}
+    s = s[i:].strip()
+
+    try:
+        return json.loads(s)
+    except json.JSONDecodeError:
+        pass
+
+    j = max(s.rfind("}"), s.rfind("]"))
+    if j != -1:
+        try:
+            return json.loads(s[: j + 1])
+        except json.JSONDecodeError:
+            pass
+
+    def _cheap_close(t: str) -> str:
+        t += "}" * max(0, t.count("{") - t.count("}"))
+        t += "]" * max(0, t.count("[") - t.count("]"))
+        return t
+
+    t = _cheap_close(s)
+    try:
+        return json.loads(t)
+    except json.JSONDecodeError as e:
+        if "Invalid \\escape" in str(e):
+            # Retry on the brace-closed candidate `t`, not the raw `s`, so truncated JSON still parses
+            try:
+                return json.loads(t.replace("\\", "\\\\"))
+            except json.JSONDecodeError:
+                pass
+        logger.error(f"[JSONParse] Failed to decode JSON: {e}\nRaw: {response_text}")
+        return {}
+
+
 # Default configuration for parser and text splitter
 DEFAULT_PARSER_CONFIG = {
     "backend": "markitdown",
@@ -114,7 +172,13 @@ def _simple_split_text(text: str, chunk_size: int, chunk_overlap: int) -> list[s
-        from langchain.text_splitter import RecursiveCharacterTextSplitter
+        # Import MarkdownHeaderTextSplitter here too: the splitter below is used regardless
+        # of which import branch succeeds, so the primary branch must bring it into scope.
+        from langchain.text_splitter import (
+            MarkdownHeaderTextSplitter,
+            RecursiveCharacterTextSplitter,
+        )
     except ImportError:
         try:
-            from langchain_text_splitters import RecursiveCharacterTextSplitter
+            from langchain_text_splitters import (
+                MarkdownHeaderTextSplitter,
+                RecursiveCharacterTextSplitter,
+            )
         except ImportError:
             logger.error(
                 "langchain not available. Install with: pip install langchain or pip install langchain-text-splitters"
@@ -126,6 +187,10 @@ def _simple_split_text(text: str, chunk_size: int, chunk_overlap: int) -> list[s
         length_function=len,
         separators=["\n\n", "\n", "。", "!", "?", ". ", "! ", "? ", " ", ""],
     )
+    markdown_text_splitter = MarkdownHeaderTextSplitter(
+        headers_to_split_on=[("#", "Header 1"), ("##", "Header 2"), ("###", "Header 3")],
+        strip_headers=False,
+    )
     logger.debug(
         f"[FileContentParser] Initialized langchain text splitter with chunk_size={DEFAULT_CHUNK_SIZE}, "
         f"chunk_overlap={DEFAULT_CHUNK_OVERLAP}"