diff --git a/src/memos/mem_reader/read_multi_modal/file_content_parser.py b/src/memos/mem_reader/read_multi_modal/file_content_parser.py index ad862d559..408736d2f 100644 --- a/src/memos/mem_reader/read_multi_modal/file_content_parser.py +++ b/src/memos/mem_reader/read_multi_modal/file_content_parser.py @@ -2,6 +2,7 @@ import concurrent.futures import os +import re import tempfile from typing import Any @@ -13,6 +14,7 @@ from memos.llms.base import BaseLLM from memos.log import get_logger from memos.mem_reader.read_multi_modal.base import BaseMessageParser, _derive_key +from memos.mem_reader.read_multi_modal.image_parser import ImageParser from memos.mem_reader.read_multi_modal.utils import ( detect_lang, get_parser, @@ -129,6 +131,137 @@ def _handle_local(self, data: str) -> str: logger.info("[FileContentParser] Local file paths are not supported in fine mode.") return "" + def _process_single_image( + self, image_url: str, original_ref: str, info: dict[str, Any], **kwargs + ) -> tuple[str, str]: + """ + Process a single image and return (original_ref, replacement_text). + + Args: + image_url: URL of the image to process + original_ref: Original markdown image reference to replace + info: Dictionary containing user_id and session_id + **kwargs: Additional parameters for ImageParser + + Returns: + Tuple of (original_ref, replacement_text) + """ + try: + # Construct image message format for ImageParser + image_message = { + "type": "image_url", + "image_url": { + "url": image_url, + "detail": "auto", + }, + } + + # Process image using ImageParser + logger.debug(f"[FileContentParser] Processing image: {image_url}") + memory_items = self.image_parser.parse_fine(image_message, info, **kwargs) + + # Extract text content from memory items (only strings as requested) + extracted_texts = [] + for item in memory_items: + if hasattr(item, "memory") and item.memory: + extracted_texts.append(str(item.memory)) + + if extracted_texts: + # Combine all extracted texts + extracted_content = "\n".join(extracted_texts) + # Replace image with extracted content + return ( + original_ref, + f"\n[Image Content from {image_url}]:\n{extracted_content}\n", + ) + else: + # If no content extracted, keep original with a note + logger.warning(f"[FileContentParser] No content extracted from image: {image_url}") + return ( + original_ref, + f"\n[Image: {image_url} - No content extracted]\n", + ) + + except Exception as e: + logger.error(f"[FileContentParser] Error processing image {image_url}: {e}") + # On error, keep original image reference + return (original_ref, original_ref) + + def _extract_and_process_images(self, text: str, info: dict[str, Any], **kwargs) -> str: + """ + Extract all images from markdown text and process them using ImageParser in parallel. + Replaces image references with extracted text content. + + Args: + text: Markdown text containing image references + info: Dictionary containing user_id and session_id + **kwargs: Additional parameters for ImageParser + + Returns: + Text with image references replaced by extracted content + """ + if not text or not self.image_parser: + return text + + # Pattern to match markdown images: ![](url) or ![alt](url) + image_pattern = r"!\[([^\]]*)\]\(([^)]+)\)" + + # Find all image matches first + image_matches = list(re.finditer(image_pattern, text)) + if not image_matches: + return text + + logger.info(f"[FileContentParser] Found {len(image_matches)} images to process in parallel") + + # Prepare tasks for parallel processing + tasks = [] + for match in image_matches: + image_url = match.group(2) + original_ref = match.group(0) + tasks.append((image_url, original_ref)) + + # Process images in parallel + replacements = {} + max_workers = min(len(tasks), 10) # Limit concurrent image processing + + with ContextThreadPoolExecutor(max_workers=max_workers) as executor: + futures = { + executor.submit( + self._process_single_image, image_url, original_ref, info, **kwargs + ): (image_url, original_ref) + for image_url, original_ref in tasks + } + + # Collect results with progress tracking + for future in tqdm( + concurrent.futures.as_completed(futures), + total=len(futures), + desc="[FileContentParser] Processing images", + ): + try: + original_ref, replacement = future.result() + replacements[original_ref] = replacement + except Exception as e: + image_url, original_ref = futures[future] + logger.error(f"[FileContentParser] Future failed for image {image_url}: {e}") + # On error, keep original image reference + replacements[original_ref] = original_ref + + # Replace all images in the text + processed_text = text + for original, replacement in replacements.items(): + processed_text = processed_text.replace(original, replacement, 1) + + # Count successfully extracted images + success_count = sum( + 1 for replacement in replacements.values() if "Image Content from" in replacement + ) + logger.info( + f"[FileContentParser] Processed {len(image_matches)} images in parallel, " + f"extracted content for {success_count} images" + ) + return processed_text + def __init__( self, embedder: BaseEmbedder, @@ -149,6 +282,8 @@ def __init__( """ super().__init__(embedder, llm) self.parser = parser + # Initialize ImageParser for processing images in markdown + self.image_parser = ImageParser(embedder, llm) if llm else None # Get inner markdown hostnames from config or environment if direct_markdown_hostnames is not None: @@ -521,6 +656,10 @@ def parse_fine( f"[FileContentParser] Failed to delete temp file {temp_file_path}: {e}" ) + # Extract and process images from parsed_text + if is_markdown and parsed_text and self.image_parser: + parsed_text = self._extract_and_process_images(parsed_text, info, **kwargs) + # Extract info fields if not info: info = {} diff --git a/src/memos/memories/textual/tree_text_memory/organize/manager.py b/src/memos/memories/textual/tree_text_memory/organize/manager.py index 2a3bae944..470d2c483 100644 --- a/src/memos/memories/textual/tree_text_memory/organize/manager.py +++ b/src/memos/memories/textual/tree_text_memory/organize/manager.py @@ -92,7 +92,7 @@ def add( """ added_ids: list[str] = [] - with ContextThreadPoolExecutor(max_workers=50) as executor: + with ContextThreadPoolExecutor(max_workers=10) as executor: futures = {executor.submit(self._process_memory, m, user_name): m for m in memories} for future in as_completed(futures, timeout=500): try: