diff --git a/evaluation/scripts/long_bench-v2/longbench_v2_ingestion.py b/evaluation/scripts/long_bench-v2/longbench_v2_ingestion.py
index fc65e4975..5a5c11968 100644
--- a/evaluation/scripts/long_bench-v2/longbench_v2_ingestion.py
+++ b/evaluation/scripts/long_bench-v2/longbench_v2_ingestion.py
@@ -33,7 +33,7 @@ def ingest_sample(
# Get context and convert to messages
context = sample.get("context", "")
- # For memos, we ingest the context as document content
+    # For memos, we ingest the context as raw document content
messages = [
{
"type": "file",
@@ -185,7 +185,7 @@ def main(frame, version="default", num_workers=10, max_samples=None):
parser.add_argument(
"--workers",
type=int,
- default=3,
+ default=2,
help="Number of parallel workers",
)
parser.add_argument(
diff --git a/evaluation/scripts/long_bench-v2/longbench_v2_metric.py b/evaluation/scripts/long_bench-v2/longbench_v2_metric.py
index 6a4fc2b7f..af324c9c7 100644
--- a/evaluation/scripts/long_bench-v2/longbench_v2_metric.py
+++ b/evaluation/scripts/long_bench-v2/longbench_v2_metric.py
@@ -4,75 +4,80 @@
def calculate_accuracy(responses):
- """Calculate accuracy metrics for LongBench v2."""
+ """Calculate accuracy metrics for LongBench v2.
+
+ Logic is aligned with longbench_stx.print_metrics, but returns a dict
+ and additionally computes by_domain statistics.
+ """
total = len(responses)
if total == 0:
return {}
- # Overall accuracy
- correct = sum(1 for r in responses if r.get("judge", False))
- overall_acc = round(100 * correct / total, 1)
-
- # By difficulty
- easy_items = [r for r in responses if r.get("difficulty") == "easy"]
- hard_items = [r for r in responses if r.get("difficulty") == "hard"]
- easy_acc = (
- round(100 * sum(1 for r in easy_items if r.get("judge", False)) / len(easy_items), 1)
- if easy_items
- else 0.0
- )
- hard_acc = (
- round(100 * sum(1 for r in hard_items if r.get("judge", False)) / len(hard_items), 1)
- if hard_items
- else 0.0
- )
-
- # By length
- short_items = [r for r in responses if r.get("length") == "short"]
- medium_items = [r for r in responses if r.get("length") == "medium"]
- long_items = [r for r in responses if r.get("length") == "long"]
-
- short_acc = (
- round(100 * sum(1 for r in short_items if r.get("judge", False)) / len(short_items), 1)
- if short_items
- else 0.0
- )
- medium_acc = (
- round(100 * sum(1 for r in medium_items if r.get("judge", False)) / len(medium_items), 1)
- if medium_items
- else 0.0
- )
- long_acc = (
- round(100 * sum(1 for r in long_items if r.get("judge", False)) / len(long_items), 1)
- if long_items
- else 0.0
- )
-
- # By domain
+ # Counters (aligned with longbench_stx.print_metrics)
+ easy = hard = short = medium = long = 0
+ easy_acc = hard_acc = short_acc = medium_acc = long_acc = 0
+ total_prompt_tokens = 0
+
+ for pred in responses:
+ acc = int(pred.get("judge", False))
+ diff = pred.get("difficulty", "easy")
+ length = pred.get("length", "short")
+
+ pt = pred.get("prompt_tokens")
+ if isinstance(pt, int | float):
+ total_prompt_tokens += int(pt)
+
+ if diff == "easy":
+ easy += 1
+ easy_acc += acc
+ else:
+ hard += 1
+ hard_acc += acc
+
+ if length == "short":
+ short += 1
+ short_acc += acc
+ elif length == "medium":
+ medium += 1
+ medium_acc += acc
+ else:
+ long += 1
+ long_acc += acc
+
+ o_acc = round(100 * (easy_acc + hard_acc) / total, 2)
+ e_acc = round(100 * easy_acc / easy, 2) if easy > 0 else 0.0
+ h_acc = round(100 * hard_acc / hard, 2) if hard > 0 else 0.0
+ s_acc = round(100 * short_acc / short, 2) if short > 0 else 0.0
+ m_acc = round(100 * medium_acc / medium, 2) if medium > 0 else 0.0
+ l_acc = round(100 * long_acc / long, 2) if long > 0 else 0.0
+
+ # Additional by-domain stats (extra vs. stx)
domain_stats = {}
- for response in responses:
- domain = response.get("domain", "Unknown")
+ for r in responses:
+ domain = r.get("domain", "Unknown")
if domain not in domain_stats:
domain_stats[domain] = {"total": 0, "correct": 0}
domain_stats[domain]["total"] += 1
- if response.get("judge", False):
+ if r.get("judge", False):
domain_stats[domain]["correct"] += 1
domain_acc = {
- domain: round(100 * stats["correct"] / stats["total"], 1)
+ domain: round(100 * stats["correct"] / stats["total"], 2)
for domain, stats in domain_stats.items()
}
return {
- "overall": overall_acc,
- "easy": easy_acc,
- "hard": hard_acc,
- "short": short_acc,
- "medium": medium_acc,
- "long": long_acc,
+ "overall": o_acc,
+ "easy": e_acc,
+ "hard": h_acc,
+ "short": s_acc,
+ "medium": m_acc,
+ "long": l_acc,
"by_domain": domain_acc,
"total_samples": total,
- "correct_samples": correct,
+ "correct_samples": easy_acc + hard_acc,
+ "total_prompt_tokens": total_prompt_tokens,
+ "avg_prompt_tokens": round(total_prompt_tokens / total, 2) if total > 0 else 0.0,
}
@@ -92,11 +97,36 @@ def main(frame, version="default"):
with open(responses_path, encoding="utf-8") as f:
responses = json.load(f)
- # Only keep entries with non-empty context (search_context) to align with response generation
- filtered = [r for r in responses if str(r.get("search_context", "")).strip() != ""]
-
- # Calculate metrics
- metrics = calculate_accuracy(filtered)
+ # Only keep entries that actually have search results:
+ # - For new pipeline: non-empty memories_used list
+ # - For older runs: non-empty search_context string
+ def _has_search_results(r: dict) -> bool:
+ mems = r.get("memories_used")
+ if isinstance(mems, list) and any(str(m).strip() for m in mems):
+ return True
+ ctx = str(r.get("search_context", "")).strip()
+ return ctx != ""
+
+ filtered = [r for r in responses if _has_search_results(r)]
+
+ # Calculate metrics (handle case where no samples have search results)
+ if not filtered:
+ print("⚠️ No responses with valid search results were found. Metrics will be zeroed.")
+ metrics = {
+ "overall": 0.0,
+ "easy": 0.0,
+ "hard": 0.0,
+ "short": 0.0,
+ "medium": 0.0,
+ "long": 0.0,
+ "by_domain": {},
+ "total_samples": 0,
+ "correct_samples": 0,
+ "total_prompt_tokens": 0,
+ "avg_prompt_tokens": 0.0,
+ }
+ else:
+ metrics = calculate_accuracy(filtered)
# Save metrics
output_path = f"results/long_bench_v2/{frame}-{version}/{frame}_longbench_v2_metrics.json"
@@ -112,12 +142,13 @@ def main(frame, version="default"):
# Print summary table
print("\n📊 Summary of Results:")
print("-" * 80)
- print(f"{'Overall Accuracy':<30s}: {metrics['overall']:.1f}%")
- print(f"{'Easy':<30s}: {metrics['easy']:.1f}%")
- print(f"{'Hard':<30s}: {metrics['hard']:.1f}%")
- print(f"{'Short':<30s}: {metrics['short']:.1f}%")
- print(f"{'Medium':<30s}: {metrics['medium']:.1f}%")
- print(f"{'Long':<30s}: {metrics['long']:.1f}%")
+ print(f"{'Overall Accuracy':<30s}: {metrics['overall']:.2f}%")
+ print(f"{'Easy':<30s}: {metrics['easy']:.2f}%")
+ print(f"{'Hard':<30s}: {metrics['hard']:.2f}%")
+ print(f"{'Short':<30s}: {metrics['short']:.2f}%")
+ print(f"{'Medium':<30s}: {metrics['medium']:.2f}%")
+ print(f"{'Long':<30s}: {metrics['long']:.2f}%")
+ print(f"{'Avg Prompt Tokens':<30s}: {metrics.get('avg_prompt_tokens', 0.0):.2f}")
print("\nBy Domain:")
for domain, acc in metrics["by_domain"].items():
print(f" {domain:<28s}: {acc:.1f}%")
diff --git a/evaluation/scripts/long_bench-v2/longbench_v2_responses.py b/evaluation/scripts/long_bench-v2/longbench_v2_responses.py
index cc1586112..686062c5f 100644
--- a/evaluation/scripts/long_bench-v2/longbench_v2_responses.py
+++ b/evaluation/scripts/long_bench-v2/longbench_v2_responses.py
@@ -22,94 +22,134 @@
sys.path.insert(0, EVAL_SCRIPTS_DIR)
-# Prompt template from LongBench v2
-LONGBENCH_V2_PROMPT = """Please read the following text and answer the question below.
+# RAG-style prompt template aligned with longbench_stx.TEMPLATE_RAG
+TEMPLATE_RAG = """Please read the following retrieved text chunks and answer the question below.
-{context}
+$DOC$
-What is the correct answer to this question: {question}
+What is the correct answer to this question: $Q$
Choices:
-(A) {choice_A}
-(B) {choice_B}
-(C) {choice_C}
-(D) {choice_D}
+(A) $C_A$
+(B) $C_B$
+(C) $C_C$
+(D) $C_D$
Format your response as follows: "The correct answer is (insert answer here)"."""
def extract_answer(response):
- """Extract answer from response (A, B, C, or D)."""
+ """Extract answer from response (A, B, C, or D).
+
+ Logic is kept consistent with longbench_stx.extract_answer.
+ """
response = response.replace("*", "")
# Try to find "The correct answer is (X)" pattern
- match = re.search(r"The correct answer is \(([A-D])\)", response, re.IGNORECASE)
+ match = re.search(r"The correct answer is \(([A-D])\)", response)
if match:
- return match.group(1).upper()
+ return match.group(1)
else:
- match = re.search(r"The correct answer is ([A-D])", response, re.IGNORECASE)
+ match = re.search(r"The correct answer is ([A-D])", response)
if match:
- return match.group(1).upper()
- else:
- # Try to find standalone A, B, C, or D
- match = re.search(r"\b([A-D])\b", response)
- if match:
- return match.group(1).upper()
- return None
-
-
-def generate_response(llm_client, context, question, choice_a, choice_b, choice_c, choice_d):
- """Generate response using LLM."""
- prompt = LONGBENCH_V2_PROMPT.format(
- context=context,
- question=question,
- choice_A=choice_a,
- choice_B=choice_b,
- choice_C=choice_c,
- choice_D=choice_d,
+ return match.group(1)
+ return None
+
+
+def llm_answer(llm_client, memories, question, choices):
+ """Generate response using RAG-style prompt, aligned with longbench_stx.llm_answer.
+
+ Returns:
+ tuple[str, int | None]: (response_text, prompt_tokens)
+ """
+ # Join memories to form the retrieved context document
+ doc_content = "\n\n".join([f"Retrieved chunk {idx + 1}: {m}" for idx, m in enumerate(memories)])
+
+ prompt = (
+ TEMPLATE_RAG.replace("$DOC$", doc_content)
+ .replace("$Q$", question)
+ .replace("$C_A$", choices.get("A", ""))
+ .replace("$C_B$", choices.get("B", ""))
+ .replace("$C_C$", choices.get("C", ""))
+ .replace("$C_D$", choices.get("D", ""))
)
try:
response = llm_client.chat.completions.create(
model=os.getenv("CHAT_MODEL"),
- messages=[
- {"role": "system", "content": "You are a helpful assistant."},
- {"role": "user", "content": prompt},
- ],
+ messages=[{"role": "user", "content": prompt}],
temperature=0.1,
- max_tokens=128,
+ max_tokens=12800,
)
- result = response.choices[0].message.content or ""
- return result
+ text = response.choices[0].message.content or ""
+ prompt_tokens = None
+ usage = getattr(response, "usage", None)
+ if usage is not None:
+ # openai>=1.x style: usage.prompt_tokens
+ pt = getattr(usage, "prompt_tokens", None)
+ if isinstance(pt, int):
+ prompt_tokens = pt
+ else:
+ # fallback for dict-like usage
+ try:
+ prompt_tokens = int(usage.get("prompt_tokens")) # type: ignore[call-arg]
+ except Exception:
+ prompt_tokens = None
+ return text, prompt_tokens
except Exception as e:
print(f"Error generating response: {e}")
- return ""
+ return "", None
def process_sample(search_result, llm_client, success_records, record_file, file_lock):
- """Process a single sample: generate answer."""
+ """Process a single sample: generate answer.
+
+ This mirrors longbench_stx.evaluate_sample but consumes precomputed search results
+ produced by longbench_v2_search.py.
+ """
+ # Use sample_idx when available, otherwise fall back to _id so that
+ # we can work with stx-style search results that only have _id.
sample_idx = search_result.get("sample_idx")
+ sample_key = str(sample_idx) if sample_idx is not None else str(search_result.get("_id", ""))
+
# Skip if already processed
- if sample_idx is not None and str(sample_idx) in success_records:
+ if sample_key and sample_key in success_records:
return None
start = time()
- context = search_result.get("context", "")
question = search_result.get("question", "")
- choice_a = search_result.get("choice_A", "")
- choice_b = search_result.get("choice_B", "")
- choice_c = search_result.get("choice_C", "")
- choice_d = search_result.get("choice_D", "")
+ choices = {
+ "A": search_result.get("choice_A", "") or "",
+ "B": search_result.get("choice_B", "") or "",
+ "C": search_result.get("choice_C", "") or "",
+ "D": search_result.get("choice_D", "") or "",
+ }
- # Skip empty/placeholder contexts (e.g., "\n" or whitespace-only)
- if not context or context.strip() == "":
+ # Prefer memories saved by longbench_v2_search; fall back to reconstructing
+    # from raw search_results if needed (for old search JSON files).
+ memories = search_result.get("memories_used")
+ if memories is None:
+ raw = search_result.get("search_results") or {}
+ memories = []
+ if isinstance(raw, dict) and raw.get("text_mem"):
+ text_mem = raw["text_mem"]
+ if text_mem and text_mem[0].get("memories"):
+ memories = [
+ m.get("memory", "") for m in text_mem[0]["memories"] if isinstance(m, dict)
+ ]
+
+ # Ensure we have a list, even if empty
+ memories = memories or []
+
+ # Skip if no retrieved memories and no question
+ if not question:
+ return None
+ if not memories:
return None
# Generate answer
- response = generate_response(
- llm_client, context, question, choice_a, choice_b, choice_c, choice_d
- )
+ response, prompt_tokens = llm_answer(llm_client, memories, str(question), choices)
# Extract answer (A, B, C, or D)
pred = extract_answer(response)
@@ -117,6 +157,7 @@ def process_sample(search_result, llm_client, success_records, record_file, file
response_duration_ms = (time() - start) * 1000
result = {
+ # Preserve sample_idx if present for backward compatibility
"sample_idx": search_result.get("sample_idx"),
"_id": search_result.get("_id"),
"domain": search_result.get("domain"),
@@ -124,15 +165,17 @@ def process_sample(search_result, llm_client, success_records, record_file, file
"difficulty": search_result.get("difficulty"),
"length": search_result.get("length"),
"question": question,
- "choice_A": choice_a,
- "choice_B": choice_b,
- "choice_C": choice_c,
- "choice_D": choice_d,
+ "choice_A": choices["A"],
+ "choice_B": choices["B"],
+ "choice_C": choices["C"],
+ "choice_D": choices["D"],
"answer": search_result.get("answer"),
"pred": pred,
"response": response,
"judge": pred == search_result.get("answer") if pred else False,
- "search_context": context,
+ "prompt_tokens": prompt_tokens,
+ # Keep full retrieved memories list for inspection / debugging
+ "memories_used": memories,
# Preserve full search results payload (e.g., list of memories)
"search_results": search_result.get("search_results"),
"response_duration_ms": response_duration_ms,
@@ -140,9 +183,9 @@ def process_sample(search_result, llm_client, success_records, record_file, file
}
# Record successful processing (thread-safe)
- if sample_idx is not None:
+ if sample_key:
with file_lock, open(record_file, "a") as f:
- f.write(f"{sample_idx}\n")
+ f.write(f"{sample_key}\n")
f.flush()
return result
@@ -175,16 +218,18 @@ def main(frame, version="default", num_workers=10):
search_results = json.load(f)
# Load existing results and success records for resume
- existing_results = {}
- success_records = set()
+ existing_results: dict[str, dict] = {}
+ success_records: set[str] = set()
if os.path.exists(output_path):
with open(output_path, encoding="utf-8") as f:
existing_results_list = json.load(f)
for result in existing_results_list:
+ # Use sample_idx if present, otherwise _id as the unique key
sample_idx = result.get("sample_idx")
- if sample_idx is not None:
- existing_results[sample_idx] = result
- success_records.add(str(sample_idx))
+ key = str(sample_idx) if sample_idx is not None else str(result.get("_id", ""))
+ if key:
+ existing_results[key] = result
+ success_records.add(key)
print(f"📋 Found {len(existing_results)} existing responses (resume mode)")
else:
print("📋 Starting fresh response generation (no checkpoint found)")
@@ -205,7 +250,7 @@ def main(frame, version="default", num_workers=10):
)
print(f"🔌 Using OpenAI client with model: {os.getenv('CHAT_MODEL')}")
- # Process all samples
+ # Process all samples concurrently using ThreadPoolExecutor
new_results = []
file_lock = threading.Lock() # Lock for thread-safe file writing
with ThreadPoolExecutor(max_workers=num_workers) as executor:
@@ -224,15 +269,22 @@ def main(frame, version="default", num_workers=10):
result = future.result()
if result:
new_results.append(result)
- # Update existing results with new result
+ # Update existing results with new result (keyed by sample_idx or _id)
sample_idx = result.get("sample_idx")
- if sample_idx is not None:
- existing_results[sample_idx] = result
+ key = str(sample_idx) if sample_idx is not None else str(result.get("_id", ""))
+ if key:
+ existing_results[key] = result
# Merge and save all results
all_responses = list(existing_results.values())
- # Sort by sample_idx to maintain order
- all_responses.sort(key=lambda x: x.get("sample_idx", 0))
+
+ # Sort by sample_idx when available, otherwise by _id for stability
+ def _sort_key(x: dict):
+ if x.get("sample_idx") is not None:
+ return ("0", int(x.get("sample_idx")))
+ return ("1", str(x.get("_id", "")))
+
+ all_responses.sort(key=_sort_key)
with open(output_path, "w", encoding="utf-8") as f:
json.dump(all_responses, f, ensure_ascii=False, indent=2)
diff --git a/evaluation/scripts/long_bench-v2/longbench_v2_search.py b/evaluation/scripts/long_bench-v2/longbench_v2_search.py
index 9730e937e..2347e5d66 100644
--- a/evaluation/scripts/long_bench-v2/longbench_v2_search.py
+++ b/evaluation/scripts/long_bench-v2/longbench_v2_search.py
@@ -25,63 +25,30 @@ def memos_api_search(client, query, user_id, top_k, frame):
start = time()
search_results = client.search(query=query, user_id=user_id, top_k=top_k)
- def _reorder_memories_by_sources(sr: dict) -> list:
- """
- Reorder text_mem[0].memories using sources' chunk_index (ascending).
- Falls back to original order if no chunk_index is found.
- """
- if not isinstance(sr, dict):
- return []
- text_mem = sr.get("text_mem") or []
- if not text_mem or not text_mem[0].get("memories"):
- return []
- memories = list(text_mem[0]["memories"])
-
- def _first_source(mem: dict):
- if not isinstance(mem, dict):
- return None
- # Prefer top-level sources, else metadata.sources
- return (mem.get("sources") or mem.get("metadata", {}).get("sources") or []) or None
-
- def _chunk_index(mem: dict):
- srcs = _first_source(mem)
- if not srcs or not isinstance(srcs, list):
- return None
- for s in srcs:
- if isinstance(s, dict) and s.get("chunk_index") is not None:
- return s.get("chunk_index")
- return None
-
- # Collect keys
- keyed = []
- for i, mem in enumerate(memories):
- ci = _chunk_index(mem)
- keyed.append((ci, i, mem)) # keep original order as tie-breaker
-
- # If no chunk_index present at all, return original
- if all(ci is None for ci, _, _ in keyed):
- return memories
-
- keyed.sort(key=lambda x: (float("inf") if x[0] is None else x[0], x[1]))
- return [k[2] for k in keyed]
-
- # Format context from search results based on frame type for backward compatibility
- context = ""
+ # Extract raw memory texts in the same way as longbench_stx.memos_search
+ memories_texts: list[str] = []
if (
(frame == "memos-api" or frame == "memos-api-online")
and isinstance(search_results, dict)
and "text_mem" in search_results
):
- ordered_memories = _reorder_memories_by_sources(search_results)
- if not ordered_memories and search_results["text_mem"][0].get("memories"):
- ordered_memories = search_results["text_mem"][0]["memories"]
-
- context = "\n".join([i.get("memory", "") for i in ordered_memories])
- if "pref_string" in search_results:
- context += f"\n{search_results.get('pref_string', '')}"
+ text_mem = search_results.get("text_mem") or []
+ if text_mem and text_mem[0].get("memories"):
+ memories = text_mem[0]["memories"]
+ for m in memories:
+ if not isinstance(m, dict):
+ continue
+ # tags may be at top-level or inside metadata
+ tags = m.get("tags") or m.get("metadata", {}).get("tags") or []
+ # Skip fast-mode memories
+ if any(isinstance(t, str) and "mode:fast" in t for t in tags):
+ continue
+ mem_text = m.get("memory", "")
+ if str(mem_text).strip():
+ memories_texts.append(mem_text)
duration_ms = (time() - start) * 1000
- return context, duration_ms, search_results
+ return memories_texts, duration_ms, search_results
def process_sample(
@@ -98,7 +65,12 @@ def process_sample(
if not query:
return None
- context, duration_ms, search_results = memos_api_search(client, query, user_id, top_k, frame)
+ memories_used, duration_ms, search_results = memos_api_search(
+ client, query, user_id, top_k, frame
+ )
+
+ if not (isinstance(memories_used, list) and any(str(m).strip() for m in memories_used)):
+ return None
result = {
"sample_idx": sample_idx,
@@ -113,8 +85,9 @@ def process_sample(
"choice_C": sample.get("choice_C"),
"choice_D": sample.get("choice_D"),
"answer": sample.get("answer"),
- "context": context,
- # Preserve full search results instead of only the concatenated context
+ # Raw memories used for RAG answering (aligned with longbench_stx)
+ "memories_used": memories_used,
+ # Preserve full search results payload for debugging / analysis
"search_results": search_results,
"search_duration_ms": duration_ms,
}
diff --git a/src/memos/mem_reader/multi_modal_struct.py b/src/memos/mem_reader/multi_modal_struct.py
index 88ef56b7c..10bac319e 100644
--- a/src/memos/mem_reader/multi_modal_struct.py
+++ b/src/memos/mem_reader/multi_modal_struct.py
@@ -304,6 +304,10 @@ def _get_llm_response(
template = PROMPT_DICT["doc"][lang]
examples = "" # doc prompts don't have examples
prompt = template.replace("{chunk_text}", mem_str)
+ elif prompt_type == "general_string":
+ template = PROMPT_DICT["general_string"][lang]
+ examples = ""
+ prompt = template.replace("{chunk_text}", mem_str)
else:
template = PROMPT_DICT["chat"][lang]
examples = PROMPT_DICT["chat"][f"{lang}_example"]
@@ -316,7 +320,7 @@ def _get_llm_response(
)
# Replace custom_tags_prompt placeholder (different for doc vs chat)
- if prompt_type == "doc":
+ if prompt_type in ["doc", "general_string"]:
prompt = prompt.replace("{custom_tags_prompt}", custom_tags_prompt)
else:
prompt = prompt.replace("${custom_tags_prompt}", custom_tags_prompt)
@@ -348,7 +352,7 @@ def _determine_prompt_type(self, sources: list) -> str:
"""
if not sources:
return "chat"
- prompt_type = "doc"
+ prompt_type = "general_string"
for source in sources:
source_role = None
if hasattr(source, "role"):
diff --git a/src/memos/mem_reader/read_multi_modal/file_content_parser.py b/src/memos/mem_reader/read_multi_modal/file_content_parser.py
index 20fc03ec2..8fa0f2454 100644
--- a/src/memos/mem_reader/read_multi_modal/file_content_parser.py
+++ b/src/memos/mem_reader/read_multi_modal/file_content_parser.py
@@ -612,8 +612,6 @@ def parse_fine(
# Use parser from utils
if parser:
parsed_text = parser.parse(temp_file_path)
- else:
- parsed_text = "[File parsing error: Parser not available]"
except Exception as e:
logger.error(
f"[FileContentParser] Error parsing downloaded file: {e}"
@@ -633,18 +631,9 @@ def parse_fine(
# Priority 2: If file_id is provided but no file_data, try to use file_id as path
elif file_id:
logger.warning(f"[FileContentParser] File data not provided for file_id: {file_id}")
- parsed_text = f"[File ID: {file_id}]: File data not provided"
-
- # If no content could be parsed, create a placeholder
- if not parsed_text:
- if filename:
-            parsed_text = f"[File: {filename}] File data not provided"
- else:
- parsed_text = "[File: unknown] File data not provided"
except Exception as e:
logger.error(f"[FileContentParser] Error in parse_fine: {e}")
- parsed_text = f"[File parsing error: {e!s}]"
finally:
# Clean up temporary file
@@ -656,7 +645,8 @@ def parse_fine(
logger.warning(
f"[FileContentParser] Failed to delete temp file {temp_file_path}: {e}"
)
-
+ if not parsed_text:
+ return []
# Extract and process images from parsed_text
if is_markdown and parsed_text and self.image_parser:
parsed_text = self._extract_and_process_images(parsed_text, info, **kwargs)
diff --git a/src/memos/mem_reader/simple_struct.py b/src/memos/mem_reader/simple_struct.py
index 555f1f110..0c3645b49 100644
--- a/src/memos/mem_reader/simple_struct.py
+++ b/src/memos/mem_reader/simple_struct.py
@@ -26,6 +26,8 @@
from memos.templates.mem_reader_prompts import (
CUSTOM_TAGS_INSTRUCTION,
CUSTOM_TAGS_INSTRUCTION_ZH,
+ GENERAL_STRUCT_STRING_READER_PROMPT,
+ GENERAL_STRUCT_STRING_READER_PROMPT_ZH,
PROMPT_MAPPING,
SIMPLE_STRUCT_DOC_READER_PROMPT,
SIMPLE_STRUCT_DOC_READER_PROMPT_ZH,
@@ -79,6 +81,10 @@ def from_config(_config):
"zh_example": SIMPLE_STRUCT_MEM_READER_EXAMPLE_ZH,
},
"doc": {"en": SIMPLE_STRUCT_DOC_READER_PROMPT, "zh": SIMPLE_STRUCT_DOC_READER_PROMPT_ZH},
+ "general_string": {
+ "en": GENERAL_STRUCT_STRING_READER_PROMPT,
+ "zh": GENERAL_STRUCT_STRING_READER_PROMPT_ZH,
+ },
"custom_tags": {"en": CUSTOM_TAGS_INSTRUCTION, "zh": CUSTOM_TAGS_INSTRUCTION_ZH},
}
diff --git a/src/memos/memories/textual/tree_text_memory/organize/manager.py b/src/memos/memories/textual/tree_text_memory/organize/manager.py
index 95f4e780d..c96d5a12a 100644
--- a/src/memos/memories/textual/tree_text_memory/organize/manager.py
+++ b/src/memos/memories/textual/tree_text_memory/organize/manager.py
@@ -135,7 +135,7 @@ def _add_memories_parallel(
return added_ids
def _add_memories_batch(
- self, memories: list[TextualMemoryItem], user_name: str | None = None, batch_size: int = 50
+ self, memories: list[TextualMemoryItem], user_name: str | None = None, batch_size: int = 5
) -> list[str]:
"""
Add memories using batch database operations (more efficient for large batches).
@@ -200,25 +200,31 @@ def _add_memories_batch(
graph_node_ids.append(graph_node_id)
added_ids.append(graph_node_id)
- for i in range(0, len(working_nodes), batch_size):
- batch = working_nodes[i : i + batch_size]
- try:
- self.graph_store.add_nodes_batch(batch, user_name=user_name)
- except Exception as e:
- logger.exception(
- f"Batch add WorkingMemory nodes error (batch {i // batch_size + 1}): ",
- exc_info=e,
- )
-
- for i in range(0, len(graph_nodes), batch_size):
- batch = graph_nodes[i : i + batch_size]
- try:
- self.graph_store.add_nodes_batch(batch, user_name=user_name)
- except Exception as e:
- logger.exception(
- f"Batch add graph memory nodes error (batch {i // batch_size + 1}): ",
- exc_info=e,
- )
+ def _submit_batches(nodes: list[dict], node_kind: str) -> None:
+ if not nodes:
+ return
+
+ max_workers = min(8, max(1, len(nodes) // max(1, batch_size)))
+ with ContextThreadPoolExecutor(max_workers=max_workers) as executor:
+ futures: list[tuple[int, int, object]] = []
+ for batch_index, i in enumerate(range(0, len(nodes), batch_size), start=1):
+ batch = nodes[i : i + batch_size]
+ fut = executor.submit(
+ self.graph_store.add_nodes_batch, batch, user_name=user_name
+ )
+ futures.append((batch_index, len(batch), fut))
+
+ for idx, size, fut in futures:
+ try:
+ fut.result()
+ except Exception as e:
+ logger.exception(
+ f"Batch add {node_kind} nodes error (batch {idx}, size {size}): ",
+ exc_info=e,
+ )
+
+ _submit_batches(working_nodes, "WorkingMemory")
+ _submit_batches(graph_nodes, "graph memory")
if graph_node_ids and self.is_reorganize:
self.reorganizer.add_message(QueueMessage(op="add", after_node=graph_node_ids))
diff --git a/src/memos/templates/mem_reader_prompts.py b/src/memos/templates/mem_reader_prompts.py
index cf8456c80..4ac12eb70 100644
--- a/src/memos/templates/mem_reader_prompts.py
+++ b/src/memos/templates/mem_reader_prompts.py
@@ -223,7 +223,6 @@
Your Output:"""
-
SIMPLE_STRUCT_DOC_READER_PROMPT_ZH = """您是搜索与检索系统的文本分析专家。
您的任务是处理文档片段,并生成一个结构化的 JSON 对象。
@@ -258,11 +257,215 @@
{custom_tags_prompt}
+示例:
+输入的文本片段:
+在Kalamang语中,亲属名词在所有格构式中的行为并不一致。名词 esa“父亲”和 ema“母亲”只能在技术称谓(teknonym)中与第三人称所有格后缀共现,而在非技术称谓用法中,带有所有格后缀是不合语法的。相比之下,大多数其他亲属名词并不允许所有格构式,只有极少数例外。
+语料中还发现一种“双重所有格标记”的现象,即名词同时带有所有格后缀和独立的所有格代词。这种构式在语料中极为罕见,其语用功能尚不明确,且多出现在马来语借词中,但也偶尔见于Kalamang本族词。
+此外,黏着词 =kin 可用于表达多种关联关系,包括目的性关联、空间关联以及泛指的群体所有关系。在此类构式中,被标记的通常是施事或关联方,而非被拥有物本身。这一用法显示出 =kin 可能处于近期语法化阶段。
+
+输出:
+{
+ "memory list": [
+ {
+ "key": "亲属名词在所有格构式中的不一致行为",
+ "memory_type": "LongTermMemory",
+ "value": "Kalamang语中的亲属名词在所有格构式中的行为存在显著差异,其中“父亲”(esa)和“母亲”(ema)仅能在技术称谓用法中与第三人称所有格后缀共现,而在非技术称谓中带所有格后缀是不合语法的。",
+ "tags": ["亲属名词", "所有格", "语法限制"]
+ },
+ {
+ "key": "双重所有格标记现象",
+ "memory_type": "LongTermMemory",
+ "value": "语料中存在名词同时带有所有格后缀和独立所有格代词的双重所有格标记构式,但该现象出现频率极低,其具体语用功能尚不明确。",
+ "tags": ["双重所有格", "罕见构式", "语用功能"]
+ },
+ {
+ "key": "双重所有格与借词的关系",
+ "memory_type": "LongTermMemory",
+ "value": "双重所有格标记多见于马来语借词中,但也偶尔出现在Kalamang本族词中,显示该构式并非完全由语言接触触发。",
+ "tags": ["语言接触", "借词", "构式分布"]
+ },
+ {
+ "key": "=kin 的关联功能与语法地位",
+ "memory_type": "LongTermMemory",
+ "value": "黏着词 =kin 用于表达目的性、空间或群体性的关联关系,其标记对象通常为关联方而非被拥有物,这表明 =kin 可能处于近期语法化过程中。",
+ "tags": ["=kin", "关联关系", "语法化"]
+ }
+ ],
+ "summary": "该文本描述了Kalamang语中所有格构式的多样性与不对称性。亲属名词在所有格标记上的限制显示出语义类别内部的分化,而罕见的双重所有格构式则反映了构式层面的不稳定性。同时,=kin 的多功能关联用法及其分布特征为理解该语言的语法化路径提供了重要线索。"
+}
+
+文档片段:
+{chunk_text}
+
+您的输出:"""
+
+GENERAL_STRUCT_STRING_READER_PROMPT = """You are a text analysis expert for search and retrieval systems.
+Your task is to parse a text chunk into multiple structured memories for long-term storage and precise future retrieval. The text chunk may contain information from various sources, including conversations, plain text, speech-to-text transcripts, tables, tool documentation, and more.
+
+Please perform the following steps:
+
+1. Decompose the text chunk into multiple memories that are mutually independent, minimally redundant, and each fully expresses a single information point. Together, these memories should cover different aspects of the document so that a reader can understand all core content without reading the original text.
+
+2. Memory splitting and deduplication rules (very important):
+2.1 Each memory must express only one primary information point, such as:
+ - A fact
+ - A clear conclusion or judgment
+ - A decision or action
+ - An important background or condition
+ - A notable emotional tone or attitude
+ - A plan, risk, or downstream impact
+
+2.2 Do not force multiple information points into a single memory.
+
+2.3 Do not generate memories that are semantically repetitive or highly overlapping:
+ - If two memories describe the same fact or judgment, retain only the one with more complete information.
+ - Do not create “different” memories solely by rephrasing.
+
+2.4 There is no fixed upper or lower limit on the number of memories; the count should be determined naturally by the information density of the text.
+
+3. Information parsing requirements:
+3.1 Identify and clearly specify all important:
+ - Times (distinguishing event time from document recording time)
+ - People (resolving pronouns and aliases to explicit identities)
+ - Organizations, locations, and events
+
+3.2 Explicitly resolve all references to time, people, locations, and events:
+ - When context allows, convert relative time expressions (e.g., “last year,” “next quarter”) into absolute dates.
+ - If uncertainty exists, explicitly state it (e.g., “around 2024,” “exact date unknown”).
+ - Include specific locations when mentioned.
+ - Resolve all pronouns, aliases, and ambiguous references to full names or clear identities.
+ - Disambiguate entities with the same name when necessary.
+
+4. Writing and perspective rules:
+ - Always write in the third person, clearly referring to subjects or content, and avoid first-person expressions (“I,” “we,” “my”).
+ - Use precise, neutral language and do not infer or introduce information not explicitly stated in the text.
+
+Return a valid JSON object with the following structure:
+
+{
+ "memory list": [
+ {
+ "key": ,
+ "memory_type": "LongTermMemory",
+ "value": ,
+ "tags":
+ },
+ ...
+ ],
+ "summary":
+}
+
+Language rules:
+- The `key`, `value`, `tags`, and `summary` fields must use the same primary language as the input document. **If the input is Chinese, output must be in Chinese.**
+- `memory_type` must remain in English.
+
+{custom_tags_prompt}
+
+Example:
+Text chunk:
+
+In Kalamang, kinship terms show uneven behavior in possessive constructions. The nouns esa ‘father’ and ema ‘mother’ can only co-occur with a third-person possessive suffix when used as teknonyms; outside of such contexts, possessive marking is ungrammatical. Most other kinship terms do not allow possessive constructions, with only a few marginal exceptions.
+
+The corpus also contains rare cases of double possessive marking, in which a noun bears both a possessive suffix and a free possessive pronoun. This construction is infrequent and its discourse function remains unclear. While it appears more often with Malay loanwords, it is not restricted to borrowed vocabulary.
+
+In addition, the clitic =kin encodes a range of associative relations, including purposive, spatial, and collective ownership. In such constructions, the marked element typically corresponds to the possessor or associated entity rather than the possessed item, suggesting that =kin may be undergoing recent grammaticalization.
+
+Output:
+{
+ "memory list": [
+ {
+ "key": "Asymmetric possessive behavior of kinship terms",
+ "memory_type": "LongTermMemory",
+ "value": "In Kalamang, kinship terms do not behave uniformly in possessive constructions: ‘father’ (esa) and ‘mother’ (ema) require a teknonymic context to appear with a third-person possessive suffix, whereas possessive marking is otherwise ungrammatical.",
+ "tags": ["kinship terms", "possessive constructions", "grammatical constraints"]
+ },
+ {
+ "key": "Rare double possessive marking",
+ "memory_type": "LongTermMemory",
+ "value": "The language exhibits a rare construction in which a noun carries both a possessive suffix and a free possessive pronoun, though the pragmatic function of this double marking remains unclear.",
+ "tags": ["double possessive", "rare constructions", "pragmatics"]
+ },
+ {
+ "key": "Distribution of double possessives across lexicon",
+ "memory_type": "LongTermMemory",
+ "value": "Double possessive constructions occur more frequently with Malay loanwords but are also attested with indigenous Kalamang vocabulary, indicating that the pattern is not solely contact-induced.",
+ "tags": ["loanwords", "language contact", "distribution"]
+ },
+ {
+ "key": "Associative clitic =kin",
+ "memory_type": "LongTermMemory",
+ "value": "The clitic =kin marks various associative relations, including purposive, spatial, and collective ownership, typically targeting the possessor or associated entity, and appears to reflect an ongoing process of grammaticalization.",
+ "tags": ["=kin", "associative relations", "grammaticalization"]
+ }
+ ],
+ "summary": "The text outlines key properties of possessive and associative constructions in Kalamang. Kinship terms exhibit asymmetric grammatical behavior, rare double possessive patterns suggest constructional instability, and the multifunctional clitic =kin provides evidence for evolving associative marking within the language’s grammar."
+}
+
+Text chunk:
+{chunk_text}
+
+Your output:
+"""
+
+GENERAL_STRUCT_STRING_READER_PROMPT_ZH = """您是搜索与检索系统的文本分析专家。
+您的任务是将一个文本片段解析为【多条结构化记忆】,用于长期存储和后续精准检索,这里的文本片段可能包含各种对话、纯文本、语音转录的文字、表格、工具说明等等的信息。
+
+请执行以下操作:
+1. 将文档片段拆解为若干条【相互独立、尽量不重复、各自完整表达单一信息点】的记忆。这些记忆应共同覆盖文档的不同方面,使读者无需阅读原文即可理解该文档的全部核心内容。
+2. 记忆拆分与去重规则(非常重要):
+2.1 每一条记忆应只表达【一个主要信息点】:
+ - 一个事实
+ - 一个明确结论或判断
+ - 一个决定或行动
+ - 一个重要背景或条件
+ - 一个显著的情感基调或态度
+ - 一个计划、风险或后续影响
+2.2 不要将多个信息点强行合并到同一条记忆中。
+2.3 不要生成语义重复或高度重叠的记忆:
+ - 如果两条记忆表达的是同一事实或同一判断,只保留信息更完整的一条。
+ - 不允许仅通过措辞变化来制造“不同”的记忆。
+2.4 记忆条数不设固定上限或下限,应由文档信息密度自然决定。
+3. 信息解析要求
+3.1 识别并明确所有重要的:
+ - 时间(区分事件发生时间与文档记录时间)
+ - 人物(解析代词、别名为明确身份)
+ - 组织、地点、事件
+3.2 清晰解析所有时间、人物、地点和事件的指代:
+ - 如果上下文允许,将相对时间表达(如“去年”、“下一季度”)转换为绝对日期。
+ - 如果存在不确定性,需明确说明(例如,“约2024年”,“具体日期不详”)。
+ - 若提及具体地点,请包含在内。
+ - 将所有代词、别名和模糊指代解析为全名或明确身份。
+ - 如有同名实体,需加以区分。
+4. 写作与视角规则
+ - 始终以第三人称视角撰写,清晰指代主题或内容,避免使用第一人称(“我”、“我们”、“我的”)。
+ - 语言应准确、中性,不自行引申文档未明确表达的内容。
+
+返回一个有效的 JSON 对象,结构如下:
+{
+ "memory list": [
+ {
+ "key": <字符串,简洁且唯一的记忆标题>,
+ "memory_type": "LongTermMemory",
+ "value": <一段完整、清晰、可独立理解的记忆描述;若输入为中文则使用中文,若为英文则使用英文>,
+ "tags": <与该记忆高度相关的主题关键词列表>
+ },
+ ...
+ ],
+ "summary": <一段整体性总结,概括这些记忆如何共同反映文档的核心内容与重点,语言与输入文档一致>
+}
+
+语言规则:
+- `key`、`value`、`tags`、`summary` 字段必须与输入文档的主要语言一致。**如果输入是中文,请输出中文**
+- `memory_type` 保持英文。
+
+{custom_tags_prompt}
+
文档片段:
{chunk_text}
您的输出:"""
+
SIMPLE_STRUCT_MEM_READER_EXAMPLE = """Example:
Conversation:
user: [June 26, 2025 at 3:00 PM]: Hi Jerry! Yesterday at 3 PM I had a meeting with my team about the new project.