Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/memos/mem_reader/multi_modal_struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ def _process_one_item(fast_item: TextualMemoryItem) -> list[TextualMemoryItem]:

fine_memory_items: list[TextualMemoryItem] = []

with ContextThreadPoolExecutor(max_workers=8) as executor:
with ContextThreadPoolExecutor(max_workers=30) as executor:
futures = [executor.submit(_process_one_item, item) for item in fast_memory_items]

for future in concurrent.futures.as_completed(futures):
Expand Down
92 changes: 71 additions & 21 deletions src/memos/mem_reader/read_multi_modal/file_content_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,28 +167,38 @@ def create_source(
self,
message: File,
info: dict[str, Any],
chunk_index: int | None = None,
chunk_total: int | None = None,
chunk_content: str | None = None,
) -> SourceMessage:
"""Create SourceMessage from file content part."""
if isinstance(message, dict):
file_info = message.get("file", {})
return SourceMessage(
type="file",
doc_path=file_info.get("filename") or file_info.get("file_id", ""),
content=chunk_content if chunk_content else file_info.get("file_data", ""),
original_part=message,
)
return SourceMessage(type="file", doc_path=str(message))
source_dict = {
"type": "file",
"doc_path": file_info.get("filename") or file_info.get("file_id", ""),
"content": chunk_content if chunk_content else file_info.get("file_data", ""),
}
# Add chunk ordering information if provided
if chunk_index is not None:
source_dict["chunk_index"] = chunk_index
if chunk_total is not None:
source_dict["chunk_total"] = chunk_total
return SourceMessage(**source_dict)
source_dict = {"type": "file", "doc_path": str(message)}
if chunk_index is not None:
source_dict["chunk_index"] = chunk_index
if chunk_total is not None:
source_dict["chunk_total"] = chunk_total
if chunk_content is not None:
source_dict["content"] = chunk_content
return SourceMessage(**source_dict)

def rebuild_from_source(
self,
source: SourceMessage,
) -> File:
"""Rebuild file content part from SourceMessage."""
# Use original_part if available
if hasattr(source, "original_part") and source.original_part:
return source.original_part

# Rebuild from source fields
return {
"type": "file",
Expand Down Expand Up @@ -312,9 +322,6 @@ def parse_fast(
# Split content into chunks
content_chunks = self._split_text(content)

# Create source
source = self.create_source(message, info)

# Extract info fields
info_ = info.copy()
if file_id:
Expand All @@ -326,12 +333,23 @@ def parse_fast(
# (since we don't have role information at this level)
memory_type = "LongTermMemory"
file_ids = [file_id] if file_id else []
total_chunks = len(content_chunks)

# Create memory items for each chunk
memory_items = []
for chunk_idx, chunk_text in enumerate(content_chunks):
if not chunk_text.strip():
continue

# Create source for this specific chunk with its index and content
source = self.create_source(
message,
info,
chunk_index=chunk_idx,
chunk_total=total_chunks,
chunk_content=chunk_text,
)

memory_item = TextualMemoryItem(
memory=chunk_text,
metadata=TreeNodeTextualMemoryMetadata(
Expand All @@ -342,7 +360,7 @@ def parse_fast(
tags=[
"mode:fast",
"multimodal:file",
f"chunk:{chunk_idx + 1}/{len(content_chunks)}",
f"chunk:{chunk_idx + 1}/{total_chunks}",
],
key=_derive_key(chunk_text),
embedding=self.embedder.embed([chunk_text])[0],
Expand All @@ -359,6 +377,14 @@ def parse_fast(

# If no chunks were created, create a placeholder
if not memory_items:
# Create source for placeholder (no chunk index since there are no chunks)
placeholder_source = self.create_source(
message,
info,
chunk_index=None,
chunk_total=0,
chunk_content=content,
)
memory_item = TextualMemoryItem(
memory=content,
metadata=TreeNodeTextualMemoryMetadata(
Expand All @@ -370,7 +396,7 @@ def parse_fast(
key=_derive_key(content),
embedding=self.embedder.embed([content])[0],
usage=[],
sources=[source],
sources=[placeholder_source],
background="",
confidence=0.99,
type="fact",
Expand Down Expand Up @@ -463,7 +489,9 @@ def parse_fine(
parsed_text = self._handle_base64(file_data)

else:
parsed_text = file_data
# TODO: discuss the proper place for processing
# string file-data
return []
# Priority 2: If file_id is provided but no file_data, try to use file_id as path
elif file_id:
logger.warning(f"[FileContentParser] File data not provided for file_id: {file_id}")
Expand Down Expand Up @@ -518,10 +546,26 @@ def _make_memory_item(
mem_type: str = memory_type,
tags: list[str] | None = None,
key: str | None = None,
chunk_idx: int | None = None,
chunk_content: str | None = None,
) -> TextualMemoryItem:
"""Construct memory item with common fields."""
source = self.create_source(message, info, chunk_content)
"""Construct memory item with common fields.

Args:
value: Memory content (chunk text)
mem_type: Memory type
tags: Tags for the memory item
key: Key for the memory item
chunk_idx: Index of the chunk in the document (0-based)
"""
# Create source for this specific chunk with its index and content
chunk_source = self.create_source(
message,
info,
chunk_index=chunk_idx,
chunk_total=total_chunks,
chunk_content=chunk_content,
)
return TextualMemoryItem(
memory=value,
metadata=TreeNodeTextualMemoryMetadata(
Expand All @@ -533,7 +577,7 @@ def _make_memory_item(
key=key if key is not None else _derive_key(value),
embedding=self.embedder.embed([value])[0],
usage=[],
sources=[source],
sources=[chunk_source],
background="",
confidence=0.99,
type="fact",
Expand All @@ -555,6 +599,8 @@ def _make_fallback(
f"fallback:{reason}",
f"chunk:{chunk_idx + 1}/{total_chunks}",
],
chunk_idx=chunk_idx,
chunk_content=chunk_text,
)

# Handle empty chunks case
Expand All @@ -563,6 +609,7 @@ def _make_fallback(
_make_memory_item(
value=parsed_text or "[File: empty content]",
tags=["mode:fine", "multimodal:file"],
chunk_idx=None,
)
]

Expand Down Expand Up @@ -591,6 +638,7 @@ def _process_chunk(chunk_idx: int, chunk_text: str) -> TextualMemoryItem:
mem_type=llm_mem_type,
tags=tags,
key=response_json.get("key"),
chunk_idx=chunk_idx,
chunk_content=chunk_text,
)
except Exception as e:
Expand Down Expand Up @@ -638,6 +686,8 @@ def _process_chunk(chunk_idx: int, chunk_text: str) -> TextualMemoryItem:

return memory_items or [
_make_memory_item(
value=parsed_text or "[File: empty content]", tags=["mode:fine", "multimodal:file"]
value=parsed_text or "[File: empty content]",
tags=["mode:fine", "multimodal:file"],
chunk_idx=None,
)
]
5 changes: 0 additions & 5 deletions src/memos/mem_reader/read_multi_modal/image_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ def create_source(
return SourceMessage(
type="image",
content=url,
original_part=message,
url=url,
detail=detail,
)
Expand All @@ -64,10 +63,6 @@ def rebuild_from_source(
source: SourceMessage,
) -> ChatCompletionContentPartImageParam:
"""Rebuild image_url content part from SourceMessage."""
# Use original_part if available
if hasattr(source, "original_part") and source.original_part:
return source.original_part

# Rebuild from source fields
url = getattr(source, "url", "") or (source.content or "").replace("[image_url]: ", "")
detail = getattr(source, "detail", "auto")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ def create_source(
return SourceMessage(
type="text",
content=text,
original_part=message,
)
return SourceMessage(type="text", content=str(message))

Expand Down
3 changes: 0 additions & 3 deletions src/memos/mem_reader/read_multi_modal/tool_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ def create_source(
filename=file_info.get("filename", ""),
file_id=file_info.get("file_id", ""),
tool_call_id=tool_call_id,
original_part=part,
)
)
elif part_type == "image_url":
Expand All @@ -93,7 +92,6 @@ def create_source(
content=file_info.get("url", ""),
detail=file_info.get("detail", "auto"),
tool_call_id=tool_call_id,
original_part=part,
)
)
elif part_type == "input_audio":
Expand All @@ -107,7 +105,6 @@ def create_source(
content=file_info.get("data", ""),
format=file_info.get("format", "wav"),
tool_call_id=tool_call_id,
original_part=part,
)
)
else:
Expand Down
5 changes: 0 additions & 5 deletions src/memos/mem_reader/read_multi_modal/user_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ def create_source(
chat_time=chat_time,
message_id=message_id,
content=part.get("text", ""),
# Save original part for reconstruction
original_part=part,
)
)
elif part_type == "file":
Expand All @@ -82,7 +80,6 @@ def create_source(
message_id=message_id,
doc_path=file_info.get("filename") or file_info.get("file_id", ""),
content=file_info.get("file_data", ""),
original_part=part,
)
)
elif part_type == "image_url":
Expand All @@ -94,7 +91,6 @@ def create_source(
chat_time=chat_time,
message_id=message_id,
image_path=image_info.get("url"),
original_part=part,
)
)
else:
Expand All @@ -106,7 +102,6 @@ def create_source(
chat_time=chat_time,
message_id=message_id,
content=f"[{part_type}]",
original_part=part,
)
)
else:
Expand Down
Loading