Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
cbed950
fix: doc fine mode bug
CaralHsi Dec 7, 2025
20e0839
fix: doc fine mode bug
CaralHsi Dec 7, 2025
fff0fb2
feat: init longbench_v2
CaralHsi Dec 7, 2025
15562c4
Merge branch 'dev' into feat/evaluation_doc_qa
CaralHsi Dec 7, 2025
9beabba
feat: more strict embedder truncation
CaralHsi Dec 7, 2025
fc54da8
Merge branch 'feat/evaluation_doc_qa' of github.com:CaralHsi/MemOSRea…
CaralHsi Dec 7, 2025
8f368bb
feat: parallel processing fine mode in multi-modal-fine
CaralHsi Dec 7, 2025
54897a9
Merge branch 'dev' into feat/evaluation_doc_qa
CaralHsi Dec 7, 2025
be293bc
feat: update parsers; add chunk info into source; remove origin_part
CaralHsi Dec 8, 2025
ba1c161
fix: conflict
CaralHsi Dec 8, 2025
8e8b91b
Merge branch 'dev' into feat/evaluation_doc_qa
CaralHsi Dec 8, 2025
2edd0a3
feat: modify chunk_content in file-fine-parser
CaralHsi Dec 8, 2025
6991ed7
Merge branch 'feat/evaluation_doc_qa' of github.com:CaralHsi/MemOSRea…
CaralHsi Dec 8, 2025
45609ab
Merge branch 'dev' into feat/evaluation_doc_qa
CaralHsi Dec 8, 2025
f80896e
fix: token counter bug
CaralHsi Dec 8, 2025
a3f2b32
Merge branch 'feat/evaluation_doc_qa' of github.com:CaralHsi/MemOSRea…
CaralHsi Dec 8, 2025
b375d51
feat: enlarge polardb
CaralHsi Dec 8, 2025
0bfcaa9
Merge branch 'dev' into feat/evaluation_doc_qa
CaralHsi Dec 8, 2025
69dd3a8
feat: decrease parallel
CaralHsi Dec 8, 2025
7fa7b77
Merge branch 'feat/evaluation_doc_qa' of github.com:CaralHsi/MemOSRea…
CaralHsi Dec 8, 2025
ac38046
feat: add image parser in file
CaralHsi Dec 8, 2025
ef02140
feat: add image parser in file
CaralHsi Dec 8, 2025
37bcc90
feat: update file_content_parser
CaralHsi Dec 8, 2025
7e2adb4
Merge branch 'dev' into feat/evaluation_doc_qa
CaralHsi Dec 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions src/memos/mem_reader/read_multi_modal/file_content_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import concurrent.futures
import os
import re
import tempfile

from typing import Any
Expand All @@ -13,6 +14,7 @@
from memos.llms.base import BaseLLM
from memos.log import get_logger
from memos.mem_reader.read_multi_modal.base import BaseMessageParser, _derive_key
from memos.mem_reader.read_multi_modal.image_parser import ImageParser
from memos.mem_reader.read_multi_modal.utils import (
detect_lang,
get_parser,
Expand Down Expand Up @@ -129,6 +131,137 @@ def _handle_local(self, data: str) -> str:
logger.info("[FileContentParser] Local file paths are not supported in fine mode.")
return ""

def _process_single_image(
    self, image_url: str, original_ref: str, info: dict[str, Any], **kwargs
) -> tuple[str, str]:
    """
    Run ImageParser over a single image and build its markdown substitution.

    Args:
        image_url: URL of the image to process.
        original_ref: Original markdown image reference to replace.
        info: Dictionary containing user_id and session_id.
        **kwargs: Additional parameters for ImageParser.

    Returns:
        Tuple of (original_ref, replacement_text). On any failure the
        original reference is returned unchanged as the replacement.
    """
    try:
        # Wrap the bare URL in the message shape ImageParser.parse_fine expects.
        message = {
            "type": "image_url",
            "image_url": {
                "url": image_url,
                "detail": "auto",
            },
        }

        logger.debug(f"[FileContentParser] Processing image: {image_url}")
        items = self.image_parser.parse_fine(message, info, **kwargs)

        # Keep only non-empty string memories from the parsed items.
        texts = [str(item.memory) for item in items if hasattr(item, "memory") and item.memory]

        if not texts:
            # Nothing extracted: keep a placeholder noting the empty result.
            logger.warning(f"[FileContentParser] No content extracted from image: {image_url}")
            return (
                original_ref,
                f"\n[Image: {image_url} - No content extracted]\n",
            )

        combined = "\n".join(texts)
        return (
            original_ref,
            f"\n[Image Content from {image_url}]:\n{combined}\n",
        )

    except Exception as e:
        logger.error(f"[FileContentParser] Error processing image {image_url}: {e}")
        # Best-effort: leave the original image reference in place on error.
        return (original_ref, original_ref)

def _extract_and_process_images(self, text: str, info: dict[str, Any], **kwargs) -> str:
"""
Extract all images from markdown text and process them using ImageParser in parallel.
Replaces image references with extracted text content.

Args:
text: Markdown text containing image references
info: Dictionary containing user_id and session_id
**kwargs: Additional parameters for ImageParser

Returns:
Text with image references replaced by extracted content
"""
if not text or not self.image_parser:
return text

# Pattern to match markdown images: ![](url) or ![alt](url)
image_pattern = r"!\[([^\]]*)\]\(([^)]+)\)"

# Find all image matches first
image_matches = list(re.finditer(image_pattern, text))
if not image_matches:
return text

logger.info(f"[FileContentParser] Found {len(image_matches)} images to process in parallel")

# Prepare tasks for parallel processing
tasks = []
for match in image_matches:
image_url = match.group(2)
original_ref = match.group(0)
tasks.append((image_url, original_ref))

# Process images in parallel
replacements = {}
max_workers = min(len(tasks), 10) # Limit concurrent image processing

with ContextThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(
self._process_single_image, image_url, original_ref, info, **kwargs
): (image_url, original_ref)
for image_url, original_ref in tasks
}

# Collect results with progress tracking
for future in tqdm(
concurrent.futures.as_completed(futures),
total=len(futures),
desc="[FileContentParser] Processing images",
):
try:
original_ref, replacement = future.result()
replacements[original_ref] = replacement
except Exception as e:
image_url, original_ref = futures[future]
logger.error(f"[FileContentParser] Future failed for image {image_url}: {e}")
# On error, keep original image reference
replacements[original_ref] = original_ref

# Replace all images in the text
processed_text = text
for original, replacement in replacements.items():
processed_text = processed_text.replace(original, replacement, 1)

# Count successfully extracted images
success_count = sum(
1 for replacement in replacements.values() if "Image Content from" in replacement
)
logger.info(
f"[FileContentParser] Processed {len(image_matches)} images in parallel, "
f"extracted content for {success_count} images"
)
return processed_text

def __init__(
self,
embedder: BaseEmbedder,
Expand All @@ -149,6 +282,8 @@ def __init__(
"""
super().__init__(embedder, llm)
self.parser = parser
# Initialize ImageParser for processing images in markdown
self.image_parser = ImageParser(embedder, llm) if llm else None

# Get inner markdown hostnames from config or environment
if direct_markdown_hostnames is not None:
Expand Down Expand Up @@ -521,6 +656,10 @@ def parse_fine(
f"[FileContentParser] Failed to delete temp file {temp_file_path}: {e}"
)

# Extract and process images from parsed_text
if is_markdown and parsed_text and self.image_parser:
parsed_text = self._extract_and_process_images(parsed_text, info, **kwargs)

# Extract info fields
if not info:
info = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def add(
"""
added_ids: list[str] = []

with ContextThreadPoolExecutor(max_workers=50) as executor:
with ContextThreadPoolExecutor(max_workers=10) as executor:
futures = {executor.submit(self._process_memory, m, user_name): m for m in memories}
for future in as_completed(futures, timeout=500):
try:
Expand Down
Loading