diff --git a/evaluation/scripts/long_bench-v2/longbench_v2_ingestion.py b/evaluation/scripts/long_bench-v2/longbench_v2_ingestion.py index d84a63d93..fc65e4975 100644 --- a/evaluation/scripts/long_bench-v2/longbench_v2_ingestion.py +++ b/evaluation/scripts/long_bench-v2/longbench_v2_ingestion.py @@ -106,7 +106,7 @@ def main(frame, version="default", num_workers=10, max_samples=None): # Initialize checkpoint file for resume functionality checkpoint_dir = os.path.join( - ROOT_DIR, "evaluation", "results", "longbench_v2", f"{frame}-{version}" + ROOT_DIR, "evaluation", "results", "long_bench_v2", f"{frame}-{version}" ) os.makedirs(checkpoint_dir, exist_ok=True) record_file = os.path.join(checkpoint_dir, "success_records.txt") @@ -179,13 +179,13 @@ def main(frame, version="default", num_workers=10, max_samples=None): parser.add_argument( "--version", type=str, - default="long-bench-v2-1208-1556", + default="default", help="Version identifier for saving results", ) parser.add_argument( "--workers", type=int, - default=20, + default=3, help="Number of parallel workers", ) parser.add_argument( diff --git a/evaluation/scripts/long_bench-v2/longbench_v2_ingestion_async.py b/evaluation/scripts/long_bench-v2/longbench_v2_ingestion_async.py deleted file mode 100644 index c23d7885f..000000000 --- a/evaluation/scripts/long_bench-v2/longbench_v2_ingestion_async.py +++ /dev/null @@ -1,158 +0,0 @@ -import argparse -import json -import os -import sys - -from concurrent.futures import ThreadPoolExecutor, as_completed - -from dotenv import load_dotenv -from tqdm import tqdm - - -ROOT_DIR = os.path.dirname( - os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -) -EVAL_SCRIPTS_DIR = os.path.join(ROOT_DIR, "evaluation", "scripts") - -sys.path.insert(0, ROOT_DIR) -sys.path.insert(0, EVAL_SCRIPTS_DIR) - - -def ingest_sample(client, sample, sample_idx, frame, version): - """Ingest a single LongBench v2 sample as memories.""" - user_id = 
f"longbench_v2_{sample_idx}_{version}" - conv_id = f"longbench_v2_{sample_idx}_{version}" - - # Get context and convert to messages - context = sample.get("context", "") - - # For memos, we ingest the context as document content - messages = [ - { - "type": "file", - "file": { - "file_data": context, - "file_id": str(sample_idx), - }, - } - ] - - if "memos-api" in frame: - try: - client.add(messages=messages, user_id=user_id, conv_id=conv_id, batch_size=1) - print(f"✅ [{frame}] Ingested sample {sample_idx}") - return True - except Exception as e: - print(f"❌ [{frame}] Error ingesting sample {sample_idx}: {e}") - return False - - return False - - -def load_dataset_from_local(): - """Load LongBench v2 dataset from local JSON file.""" - data_dir = os.path.join( - os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), - "data", - "long_bench_v2", - ) - - filepath = os.path.join(data_dir, "data.json") - - if not os.path.exists(filepath): - raise FileNotFoundError(f"Dataset file not found: {filepath}") - - # Load JSON file - with open(filepath, encoding="utf-8") as f: - samples = json.load(f) - - return samples - - -def main(frame, version="default", num_workers=10, max_samples=None): - """Main ingestion function.""" - load_dotenv() - - print("\n" + "=" * 80) - print(f"🚀 LONGBENCH V2 INGESTION - {frame.upper()} v{version}".center(80)) - print("=" * 80 + "\n") - - # Load dataset from local file - try: - dataset = load_dataset_from_local() - print(f"Loaded {len(dataset)} samples from LongBench v2") - except FileNotFoundError as e: - print(f"❌ Error loading dataset: {e}") - return - except Exception as e: - print(f"❌ Error loading dataset: {e}") - return - - # Limit samples if specified - if max_samples: - dataset = dataset[:max_samples] - print(f"Limited to {len(dataset)} samples") - - # Initialize client - client = None - if frame == "memos-api": - from utils.client import MemosApiClient - - client = MemosApiClient() - else: - print(f"❌ Unsupported 
frame: {frame}") - return - - # Ingest samples - success_count = 0 - with ThreadPoolExecutor(max_workers=num_workers) as executor: - futures = [] - for idx, sample in enumerate(dataset): - future = executor.submit(ingest_sample, client, sample, idx, frame, version) - futures.append(future) - - for future in tqdm( - as_completed(futures), - total=len(futures), - desc="Ingesting LongBench v2", - ): - try: - if future.result(): - success_count += 1 - except Exception as e: - print(f"Error processing sample: {e}") - - print(f"\n{'=' * 80}") - print(f"✅ INGESTION COMPLETE: {success_count}/{len(dataset)} samples ingested".center(80)) - print(f"{'=' * 80}\n") - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument( - "--lib", - type=str, - choices=["memos-api", "memos-api-online"], - default="memos-api", - ) - parser.add_argument( - "--version", - type=str, - default="long-bench-v2-1208-1556-async", - help="Version identifier for saving results", - ) - parser.add_argument( - "--workers", - type=int, - default=20, - help="Number of parallel workers", - ) - parser.add_argument( - "--max_samples", - type=int, - default=None, - help="Maximum number of samples to process (default: all)", - ) - args = parser.parse_args() - - main(args.lib, args.version, args.workers, args.max_samples) diff --git a/evaluation/scripts/long_bench-v2/longbench_v2_metric.py b/evaluation/scripts/long_bench-v2/longbench_v2_metric.py index 5fee9a3de..6a4fc2b7f 100644 --- a/evaluation/scripts/long_bench-v2/longbench_v2_metric.py +++ b/evaluation/scripts/long_bench-v2/longbench_v2_metric.py @@ -83,7 +83,7 @@ def main(frame, version="default"): print("=" * 80 + "\n") # Load responses - responses_path = f"results/long_bench-v2/{frame}-{version}/{frame}_longbench_v2_responses.json" + responses_path = f"results/long_bench_v2/{frame}-{version}/{frame}_longbench_v2_responses.json" if not os.path.exists(responses_path): print(f"❌ Responses not found: {responses_path}") 
print("Please run longbench_v2_responses.py first") @@ -92,11 +92,14 @@ def main(frame, version="default"): with open(responses_path, encoding="utf-8") as f: responses = json.load(f) + # Only keep entries with non-empty context (search_context) to align with response generation + filtered = [r for r in responses if str(r.get("search_context", "")).strip() != ""] + # Calculate metrics - metrics = calculate_accuracy(responses) + metrics = calculate_accuracy(filtered) # Save metrics - output_path = f"results/long_bench-v2/{frame}-{version}/{frame}_longbench_v2_metrics.json" + output_path = f"results/long_bench_v2/{frame}-{version}/{frame}_longbench_v2_metrics.json" os.makedirs(os.path.dirname(output_path), exist_ok=True) with open(output_path, "w", encoding="utf-8") as f: diff --git a/evaluation/scripts/long_bench-v2/longbench_v2_responses.py b/evaluation/scripts/long_bench-v2/longbench_v2_responses.py index 3e19dc95f..cc1586112 100644 --- a/evaluation/scripts/long_bench-v2/longbench_v2_responses.py +++ b/evaluation/scripts/long_bench-v2/longbench_v2_responses.py @@ -3,6 +3,7 @@ import os import re import sys +import threading from concurrent.futures import ThreadPoolExecutor, as_completed from time import time @@ -85,8 +86,13 @@ def generate_response(llm_client, context, question, choice_a, choice_b, choice_ return "" -def process_sample(search_result, llm_client): +def process_sample(search_result, llm_client, success_records, record_file, file_lock): """Process a single sample: generate answer.""" + sample_idx = search_result.get("sample_idx") + # Skip if already processed + if sample_idx is not None and str(sample_idx) in success_records: + return None + start = time() context = search_result.get("context", "") @@ -96,6 +102,10 @@ def process_sample(search_result, llm_client): choice_c = search_result.get("choice_C", "") choice_d = search_result.get("choice_D", "") + # Skip empty/placeholder contexts (e.g., "\n" or whitespace-only) + if not context or 
context.strip() == "": + return None + # Generate answer response = generate_response( llm_client, context, question, choice_a, choice_b, choice_c, choice_d @@ -106,7 +116,7 @@ def process_sample(search_result, llm_client): response_duration_ms = (time() - start) * 1000 - return { + result = { "sample_idx": search_result.get("sample_idx"), "_id": search_result.get("_id"), "domain": search_result.get("domain"), @@ -123,10 +133,20 @@ def process_sample(search_result, llm_client): "response": response, "judge": pred == search_result.get("answer") if pred else False, "search_context": context, + # Preserve full search results payload (e.g., list of memories) + "search_results": search_result.get("search_results"), "response_duration_ms": response_duration_ms, "search_duration_ms": search_result.get("search_duration_ms", 0), } + # Record successful processing (thread-safe) + if sample_idx is not None: + with file_lock, open(record_file, "a") as f: + f.write(f"{sample_idx}\n") + f.flush() + + return result + def main(frame, version="default", num_workers=10): """Main response generation function.""" @@ -136,10 +156,16 @@ def main(frame, version="default", num_workers=10): print(f"🚀 LONGBENCH V2 RESPONSE GENERATION - {frame.upper()} v{version}".center(80)) print("=" * 80 + "\n") - # Load search results - search_path = ( - f"results/long_bench-v2/{frame}-{version}/{frame}_longbench_v2_search_results.json" + # Initialize checkpoint file for resume functionality + checkpoint_dir = os.path.join( + ROOT_DIR, "evaluation", "results", "long_bench_v2", f"{frame}-{version}" ) + os.makedirs(checkpoint_dir, exist_ok=True) + record_file = os.path.join(checkpoint_dir, "response_success_records.txt") + search_path = os.path.join(checkpoint_dir, f"{frame}_longbench_v2_search_results.json") + output_path = os.path.join(checkpoint_dir, f"{frame}_longbench_v2_responses.json") + + # Load search results if not os.path.exists(search_path): print(f"❌ Search results not found: {search_path}") 
print("Please run longbench_v2_search.py first") @@ -148,6 +174,30 @@ def main(frame, version="default", num_workers=10): with open(search_path, encoding="utf-8") as f: search_results = json.load(f) + # Load existing results and success records for resume + existing_results = {} + success_records = set() + if os.path.exists(output_path): + with open(output_path, encoding="utf-8") as f: + existing_results_list = json.load(f) + for result in existing_results_list: + sample_idx = result.get("sample_idx") + if sample_idx is not None: + existing_results[sample_idx] = result + success_records.add(str(sample_idx)) + print(f"📋 Found {len(existing_results)} existing responses (resume mode)") + else: + print("📋 Starting fresh response generation (no checkpoint found)") + + # Load additional success records from checkpoint file + if os.path.exists(record_file): + with open(record_file) as f: + for line in f: + line = line.strip() + if line and line not in success_records: + success_records.add(line) + print(f"📋 Total {len(success_records)} samples already processed") + # Initialize LLM client llm_client = OpenAI( api_key=os.getenv("CHAT_MODEL_API_KEY"), @@ -156,9 +206,15 @@ def main(frame, version="default", num_workers=10): print(f"🔌 Using OpenAI client with model: {os.getenv('CHAT_MODEL')}") # Process all samples - all_responses = [] + new_results = [] + file_lock = threading.Lock() # Lock for thread-safe file writing with ThreadPoolExecutor(max_workers=num_workers) as executor: - futures = [executor.submit(process_sample, sample, llm_client) for sample in search_results] + futures = [ + executor.submit( + process_sample, sample, llm_client, success_records, record_file, file_lock + ) + for sample in search_results + ] for future in tqdm( as_completed(futures), @@ -167,11 +223,16 @@ def main(frame, version="default", num_workers=10): ): result = future.result() if result: - all_responses.append(result) - - # Save responses - output_path = 
f"results/long_bench-v2/{frame}-{version}/{frame}_longbench_v2_responses.json" - os.makedirs(os.path.dirname(output_path), exist_ok=True) + new_results.append(result) + # Update existing results with new result + sample_idx = result.get("sample_idx") + if sample_idx is not None: + existing_results[sample_idx] = result + + # Merge and save all results + all_responses = list(existing_results.values()) + # Sort by sample_idx to maintain order + all_responses.sort(key=lambda x: x.get("sample_idx", 0)) with open(output_path, "w", encoding="utf-8") as f: json.dump(all_responses, f, ensure_ascii=False, indent=2) diff --git a/evaluation/scripts/long_bench-v2/longbench_v2_search.py b/evaluation/scripts/long_bench-v2/longbench_v2_search.py index f46928498..9730e937e 100644 --- a/evaluation/scripts/long_bench-v2/longbench_v2_search.py +++ b/evaluation/scripts/long_bench-v2/longbench_v2_search.py @@ -2,6 +2,7 @@ import json import os import sys +import threading from concurrent.futures import ThreadPoolExecutor, as_completed from time import time @@ -24,32 +25,82 @@ def memos_api_search(client, query, user_id, top_k, frame): start = time() search_results = client.search(query=query, user_id=user_id, top_k=top_k) - # Format context from search results based on frame type + def _reorder_memories_by_sources(sr: dict) -> list: + """ + Reorder text_mem[0].memories using sources' chunk_index (ascending). + Falls back to original order if no chunk_index is found. 
+ """ + if not isinstance(sr, dict): + return [] + text_mem = sr.get("text_mem") or [] + if not text_mem or not text_mem[0].get("memories"): + return [] + memories = list(text_mem[0]["memories"]) + + def _first_source(mem: dict): + if not isinstance(mem, dict): + return None + # Prefer top-level sources, else metadata.sources + return (mem.get("sources") or mem.get("metadata", {}).get("sources") or []) or None + + def _chunk_index(mem: dict): + srcs = _first_source(mem) + if not srcs or not isinstance(srcs, list): + return None + for s in srcs: + if isinstance(s, dict) and s.get("chunk_index") is not None: + return s.get("chunk_index") + return None + + # Collect keys + keyed = [] + for i, mem in enumerate(memories): + ci = _chunk_index(mem) + keyed.append((ci, i, mem)) # keep original order as tie-breaker + + # If no chunk_index present at all, return original + if all(ci is None for ci, _, _ in keyed): + return memories + + keyed.sort(key=lambda x: (float("inf") if x[0] is None else x[0], x[1])) + return [k[2] for k in keyed] + + # Format context from search results based on frame type for backward compatibility context = "" if ( (frame == "memos-api" or frame == "memos-api-online") and isinstance(search_results, dict) and "text_mem" in search_results ): - context = "\n".join([i["memory"] for i in search_results["text_mem"][0]["memories"]]) + ordered_memories = _reorder_memories_by_sources(search_results) + if not ordered_memories and search_results["text_mem"][0].get("memories"): + ordered_memories = search_results["text_mem"][0]["memories"] + + context = "\n".join([i.get("memory", "") for i in ordered_memories]) if "pref_string" in search_results: context += f"\n{search_results.get('pref_string', '')}" duration_ms = (time() - start) * 1000 - return context, duration_ms + return context, duration_ms, search_results -def process_sample(client, sample, sample_idx, frame, version, top_k): +def process_sample( + client, sample, sample_idx, frame, version, top_k, 
success_records, record_file, file_lock +): """Process a single sample: search for relevant memories.""" + # Skip if already processed + if str(sample_idx) in success_records: + return None + user_id = f"longbench_v2_{sample_idx}_{version}" query = sample.get("question", "") if not query: return None - context, duration_ms = memos_api_search(client, query, user_id, top_k, frame) + context, duration_ms, search_results = memos_api_search(client, query, user_id, top_k, frame) - return { + result = { "sample_idx": sample_idx, "_id": sample.get("_id"), "domain": sample.get("domain"), @@ -63,9 +114,18 @@ def process_sample(client, sample, sample_idx, frame, version, top_k): "choice_D": sample.get("choice_D"), "answer": sample.get("answer"), "context": context, + # Preserve full search results instead of only the concatenated context + "search_results": search_results, "search_duration_ms": duration_ms, } + # Record successful processing (thread-safe) + with file_lock, open(record_file, "a") as f: + f.write(f"{sample_idx}\n") + f.flush() + + return result + def load_dataset_from_local(): """Load LongBench v2 dataset from local JSON file.""" @@ -111,6 +171,38 @@ def main(frame, version="default", num_workers=10, top_k=20, max_samples=None): dataset = dataset[:max_samples] print(f"Limited to {len(dataset)} samples") + # Initialize checkpoint file for resume functionality + checkpoint_dir = os.path.join( + ROOT_DIR, "evaluation", "results", "long_bench_v2", f"{frame}-{version}" + ) + os.makedirs(checkpoint_dir, exist_ok=True) + record_file = os.path.join(checkpoint_dir, "search_success_records.txt") + output_path = os.path.join(checkpoint_dir, f"{frame}_longbench_v2_search_results.json") + + # Load existing results and success records for resume + existing_results = {} + success_records = set() + if os.path.exists(output_path): + with open(output_path, encoding="utf-8") as f: + existing_results_list = json.load(f) + for result in existing_results_list: + sample_idx = 
result.get("sample_idx") + if sample_idx is not None: + existing_results[sample_idx] = result + success_records.add(str(sample_idx)) + print(f"📋 Found {len(existing_results)} existing search results (resume mode)") + else: + print("📋 Starting fresh search (no checkpoint found)") + + # Load additional success records from checkpoint file + if os.path.exists(record_file): + with open(record_file) as f: + for line in f: + line = line.strip() + if line and line not in success_records: + success_records.add(line) + print(f"📋 Total {len(success_records)} samples already processed") + # Initialize client client = None if frame == "memos-api": @@ -126,11 +218,23 @@ def main(frame, version="default", num_workers=10, top_k=20, max_samples=None): return # Process samples - search_results = [] + new_results = [] + file_lock = threading.Lock() # Lock for thread-safe file writing with ThreadPoolExecutor(max_workers=num_workers) as executor: futures = [] for idx, sample in enumerate(dataset): - future = executor.submit(process_sample, client, sample, idx, frame, version, top_k) + future = executor.submit( + process_sample, + client, + sample, + idx, + frame, + version, + top_k, + success_records, + record_file, + file_lock, + ) futures.append(future) for future in tqdm( @@ -140,13 +244,17 @@ def main(frame, version="default", num_workers=10, top_k=20, max_samples=None): ): result = future.result() if result: - search_results.append(result) + new_results.append(result) + # Update existing results with new result + sample_idx = result.get("sample_idx") + if sample_idx is not None: + existing_results[sample_idx] = result + + # Merge and save all results + search_results = list(existing_results.values()) + # Sort by sample_idx to maintain order + search_results.sort(key=lambda x: x.get("sample_idx", 0)) - # Save results - os.makedirs(f"results/long_bench-v2/{frame}-{version}/", exist_ok=True) - output_path = ( - 
f"results/long_bench-v2/{frame}-{version}/{frame}_longbench_v2_search_results.json" - ) with open(output_path, "w", encoding="utf-8") as f: json.dump(search_results, f, ensure_ascii=False, indent=2) @@ -172,7 +280,7 @@ def main(frame, version="default", num_workers=10, top_k=20, max_samples=None): parser.add_argument( "--workers", type=int, - default=10, + default=1, help="Number of parallel workers", ) parser.add_argument( diff --git a/evaluation/scripts/long_bench-v2/wait_scheduler.py b/evaluation/scripts/long_bench-v2/wait_scheduler.py new file mode 100644 index 000000000..716869a11 --- /dev/null +++ b/evaluation/scripts/long_bench-v2/wait_scheduler.py @@ -0,0 +1,67 @@ +import os +import time + +import requests + +from dotenv import load_dotenv + + +def wait_until_completed(params: dict, interval: float = 2.0, timeout: float = 600.0): + """ + Keep polling /product/scheduler/status until status == 'completed' (or terminal). + + params: dict passed as query params, e.g. {"user_id": "xxx"} or {"user_id": "xxx", "task_id": "..."} + interval: seconds between polls + timeout: max seconds to wait before raising TimeoutError + """ + load_dotenv() + base_url = os.getenv("MEMOS_URL") + if not base_url: + raise RuntimeError("MEMOS_URL not set in environment") + + url = f"{base_url}/product/scheduler/status" + start = time.time() + active_states = {"waiting", "pending", "in_progress"} + + while True: + resp = requests.get(url, params=params, timeout=10) + resp.raise_for_status() + data = resp.json() + + items = data.get("data", []) if isinstance(data, dict) else [] + statuses = [item.get("status") for item in items if isinstance(item, dict)] + status_set = set(statuses) + + # Print current status snapshot + print(f"Current status: {status_set or 'empty'}") + + # Completed if no active states remain + if not status_set or status_set.isdisjoint(active_states): + print("Task completed!") + return data + + if (time.time() - start) > timeout: + raise TimeoutError(f"Timeout after 
{timeout}s; last statuses={status_set or 'empty'}") + + time.sleep(interval) + + +if __name__ == "__main__": + import argparse + import json + + parser = argparse.ArgumentParser() + parser.add_argument( + "--user_id", default="longbench_v2_0_long-bench-v2-1208-2119-async", help="User ID to query" + ) + parser.add_argument("--task_id", help="Optional task_id to query") + parser.add_argument("--interval", type=float, default=2.0, help="Poll interval seconds") + parser.add_argument("--timeout", type=float, default=600.0, help="Timeout seconds") + args = parser.parse_args() + + params = {"user_id": args.user_id} + if args.task_id: + params["task_id"] = args.task_id + + result = wait_until_completed(params, interval=args.interval, timeout=args.timeout) + print(json.dumps(result, indent=2, ensure_ascii=False)) diff --git a/evaluation/scripts/run_longbench_v2_eval.sh b/evaluation/scripts/run_longbench_v2_eval.sh new file mode 100755 index 000000000..917c57bfb --- /dev/null +++ b/evaluation/scripts/run_longbench_v2_eval.sh @@ -0,0 +1,110 @@ +#!/bin/bash + +# Common parameters for all scripts +LIB="memos-api" +VERSION="long-bench-v2-1208-1556-async" +WORKERS=10 +TOPK=20 +MAX_SAMPLES="" # Empty means all samples +WAIT_INTERVAL=2 # seconds between polls +WAIT_TIMEOUT=900 # seconds per user + +# Parse command line arguments +while [[ $# -gt 0 ]]; do + case $1 in + --lib) + LIB="$2" + shift 2 + ;; + --version) + VERSION="$2" + shift 2 + ;; + --workers) + WORKERS="$2" + shift 2 + ;; + --top_k) + TOPK="$2" + shift 2 + ;; + --max_samples) + MAX_SAMPLES="$2" + shift 2 + ;; + *) + echo "Unknown option: $1" + exit 1 + ;; + esac +done + +# Build max_samples argument +MAX_SAMPLES_ARG="" +if [ -n "$MAX_SAMPLES" ]; then + MAX_SAMPLES_ARG="--max_samples $MAX_SAMPLES" +fi + +echo "Running LongBench v2 evaluation with:" +echo " LIB: $LIB" +echo " VERSION: $VERSION" +echo " WORKERS: $WORKERS" +echo " TOPK: $TOPK" +echo " MAX_SAMPLES: ${MAX_SAMPLES:-all}" +echo "" + +# Step 2: Search +echo "" 
+echo "==========================================" +echo "Step 2: Running longbench_v2_search.py..." +echo "==========================================" +python scripts/long_bench-v2/longbench_v2_search.py \ + --lib $LIB \ + --version $VERSION \ + --top_k $TOPK \ + --workers $WORKERS \ + $MAX_SAMPLES_ARG + +if [ $? -ne 0 ]; then + echo "Error running longbench_v2_search.py" + exit 1 +fi + +# Step 3: Response Generation +echo "" +echo "==========================================" +echo "Step 3: Running longbench_v2_responses.py..." +echo "==========================================" +python scripts/long_bench-v2/longbench_v2_responses.py \ + --lib $LIB \ + --version $VERSION \ + --workers $WORKERS + +if [ $? -ne 0 ]; then + echo "Error running longbench_v2_responses.py" + exit 1 +fi + +# Step 4: Metrics Calculation +echo "" +echo "==========================================" +echo "Step 4: Running longbench_v2_metric.py..." +echo "==========================================" +python scripts/long_bench-v2/longbench_v2_metric.py \ + --lib $LIB \ + --version $VERSION + +if [ $? -ne 0 ]; then + echo "Error running longbench_v2_metric.py" + exit 1 +fi + +echo "" +echo "==========================================" +echo "All steps completed successfully!" 
+echo "==========================================" +echo "" +echo "Results are saved in: results/long_bench_v2/$LIB-$VERSION/" +echo "  - Search results: ${LIB}_longbench_v2_search_results.json" +echo "  - Responses: ${LIB}_longbench_v2_responses.json" +echo "  - Metrics: ${LIB}_longbench_v2_metrics.json" diff --git a/src/memos/mem_reader/read_multi_modal/file_content_parser.py b/src/memos/mem_reader/read_multi_modal/file_content_parser.py index 408736d2f..20fc03ec2 100644 --- a/src/memos/mem_reader/read_multi_modal/file_content_parser.py +++ b/src/memos/mem_reader/read_multi_modal/file_content_parser.py @@ -471,6 +471,7 @@ def parse_fast( total_chunks = len(content_chunks) # Create memory items for each chunk + content_chunk_embeddings = self.embedder.embed(content_chunks) memory_items = [] for chunk_idx, chunk_text in enumerate(content_chunks): if not chunk_text.strip(): @@ -499,7 +500,7 @@ def parse_fast( f"chunk:{chunk_idx + 1}/{total_chunks}", ], key=_derive_key(chunk_text), - embedding=self.embedder.embed([chunk_text])[0], + embedding=content_chunk_embeddings[chunk_idx], usage=[], sources=[source], background="", diff --git a/src/memos/mem_reader/read_multi_modal/image_parser.py b/src/memos/mem_reader/read_multi_modal/image_parser.py index 5a19393a9..741295089 100644 --- a/src/memos/mem_reader/read_multi_modal/image_parser.py +++ b/src/memos/mem_reader/read_multi_modal/image_parser.py @@ -64,7 +64,11 @@ def rebuild_from_source( ) -> ChatCompletionContentPartImageParam: """Rebuild image_url content part from SourceMessage.""" # Rebuild from source fields - url = getattr(source, "url", "") or (source.content or "").replace("[image_url]: ", "") + url = ( + getattr(source, "url", "") + or getattr(source, "image_path", "") + or (source.content or "").replace("[image_url]: ", "") + ) detail = getattr(source, "detail", "auto") return { "type": "image_url", diff --git a/src/memos/mem_reader/read_multi_modal/tool_parser.py 
b/src/memos/mem_reader/read_multi_modal/tool_parser.py index e13b684a7..705896489 100644 --- a/src/memos/mem_reader/read_multi_modal/tool_parser.py +++ b/src/memos/mem_reader/read_multi_modal/tool_parser.py @@ -79,6 +79,7 @@ def create_source( filename=file_info.get("filename", ""), file_id=file_info.get("file_id", ""), tool_call_id=tool_call_id, + file_info=file_info, ) ) elif part_type == "image_url": diff --git a/src/memos/memories/textual/tree.py b/src/memos/memories/textual/tree.py index 7f022b439..75eae30e8 100644 --- a/src/memos/memories/textual/tree.py +++ b/src/memos/memories/textual/tree.py @@ -210,7 +210,7 @@ def search( def get_relevant_subgraph( self, query: str, - top_k: int = 5, + top_k: int = 20, depth: int = 2, center_status: str = "activated", user_name: str | None = None,