Merged
73 commits
11b63e6
debug an error function name
tangg555 Oct 20, 2025
72e8f39
feat: Add DynamicCache compatibility for different transformers versions
tangg555 Oct 20, 2025
5702870
feat: implement APIAnalyzerForScheduler for memory operations
tangg555 Oct 21, 2025
4655b41
feat: Add search_ws API endpoint and enhance API analyzer functionality
tangg555 Oct 21, 2025
c20736c
fix: resolve test failures and warnings in test suite
tangg555 Oct 21, 2025
da72e7e
feat: add a test_robustness execution to test thread pool execution
tangg555 Oct 21, 2025
5b9b1e4
feat: optimize scheduler configuration and API search functionality
tangg555 Oct 22, 2025
6dac11e
feat: Add Redis auto-initialization with fallback strategies
tangg555 Oct 22, 2025
a207bf4
feat: add database connection management to ORM module
tangg555 Oct 24, 2025
8c1cc04
remove part of test
tangg555 Oct 24, 2025
f2b0da4
feat: add Redis-based ORM with multiprocess synchronization
tangg555 Oct 24, 2025
f0e8aab
fix: resolve scheduler module import and Redis integration issues
tangg555 Oct 24, 2025
731f00d
revise naive memcube creation in server router
tangg555 Oct 25, 2025
6d442fb
remove long-time tests in test_scheduler
tangg555 Oct 25, 2025
157f858
remove redis test which needs .env
tangg555 Oct 25, 2025
c483011
refactor all codes about mixture search with scheduler
tangg555 Oct 25, 2025
b81b82e
fix: resolve Redis API synchronization issues and implement search AP…
tangg555 Oct 26, 2025
90d1a0b
remove a test for api module
tangg555 Oct 26, 2025
1de72cf
revise to pass the test suite
tangg555 Oct 26, 2025
c72858e
addressed all conflicts
tangg555 Oct 27, 2025
3245376
address some bugs to make mix_search normally running
tangg555 Oct 27, 2025
57482cf
modify codes according to evaluation logs
tangg555 Oct 27, 2025
e4b8313
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Oct 27, 2025
011d248
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Oct 28, 2025
8c8d672
feat: Optimize mixture search and enhance API client
tangg555 Oct 28, 2025
aabad8d
feat: Add conversation_turn tracking for session-based memory search
tangg555 Oct 28, 2025
3faa5c3
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Oct 28, 2025
c6376cd
adress time bug in monitor
tangg555 Oct 29, 2025
bd0b234
revise simple tree
tangg555 Oct 29, 2025
5332d12
add mode to evaluation client; rewrite print to logger.info in db files
tangg555 Oct 29, 2025
aee13ba
feat: 1. add redis queue for scheduler 2. finish the code related to …
tangg555 Nov 5, 2025
f957967
debug the working memory code
tangg555 Nov 5, 2025
f520cca
addressed conflicts to merge
tangg555 Nov 5, 2025
a3f6636
addressed a range of bugs to make scheduler running correctly
tangg555 Nov 5, 2025
47e9851
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Nov 5, 2025
161af12
remove test_dispatch_parallel test
tangg555 Nov 5, 2025
1d8d14b
print change to logger.info
tangg555 Nov 5, 2025
00e3a75
addressed conflicts
tangg555 Nov 6, 2025
2852e56
adjucted the core code related to fine and mixture apis
tangg555 Nov 17, 2025
5d3cf45
addressed conflicts
tangg555 Nov 17, 2025
ab71f17
feat: create task queue to wrap local queue and redis queue. queue no…
tangg555 Nov 18, 2025
7665cda
fix bugs: debug bugs about internet trigger
tangg555 Nov 18, 2025
3559323
debug get searcher mode
tangg555 Nov 18, 2025
7c8e0d0
feat: add manual internet
fridayL Nov 18, 2025
27b0971
Merge branch 'feat/redis_scheduler' of https://github.com/MemTensor/M…
fridayL Nov 18, 2025
94d456b
Fix: fix code format
fridayL Nov 18, 2025
87b5358
feat: add strategy for fine search
tangg555 Nov 18, 2025
127fdc7
debug redis queue
tangg555 Nov 18, 2025
0911ced
debug redis queue
tangg555 Nov 18, 2025
d1a7261
fix bugs: completely addressed bugs about redis queue
tangg555 Nov 18, 2025
232be6f
refactor: add searcher to handler_init; remove info log from task_queue
tangg555 Nov 19, 2025
d16a7c8
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Nov 19, 2025
bc7236f
refactor: modify analyzer
tangg555 Nov 19, 2025
afaf8df
refactor: revise locomo_eval to make it support llm other than gpt-4o…
tangg555 Nov 19, 2025
0b02d3c
feat: develop advanced searcher with deep search
tangg555 Nov 20, 2025
2097eae
feat: finish a complete version of deep search
tangg555 Nov 21, 2025
aff2932
refactor: refactor deep search feature, now only allowing one-round d…
tangg555 Nov 24, 2025
4226a77
feat: implement the feature of get_tasks_status, but completed tasks …
tangg555 Nov 24, 2025
e27483c
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Nov 24, 2025
51964ec
debuging merged code; searching memories have bugs
tangg555 Nov 24, 2025
1e28ee5
change logging level
tangg555 Nov 24, 2025
e0001ea
debug api evaluation
tangg555 Nov 24, 2025
bae7022
fix bugs: change top to top_k
tangg555 Nov 24, 2025
d6cf824
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Nov 24, 2025
742df4e
change log
tangg555 Nov 24, 2025
07a47b7
feat(scheduler): Implement comprehensive observability and fix critic…
Nov 25, 2025
ffb64b7
fix(ci): Resolve top-level redis import error in TaskStatusTracker
Nov 25, 2025
491a207
feat(scheduler): Implement conditional cloud status updates
Nov 25, 2025
e9e71da
fix(deps): Promote prometheus-client to core dependency
Nov 25, 2025
e4390a3
fix(ci): Resolve ruff linting and style errors
Nov 25, 2025
c8d4925
fix(ci): Reformat code to comply with ruff standards
Nov 25, 2025
b28543f
fix(docker): Add prometheus-client to Docker requirements.txt
Nov 25, 2025
867763b
Merge branch 'dev' into feat/redis_scheduler
glin93 Nov 25, 2025
1 change: 1 addition & 0 deletions docker/requirements.txt
@@ -158,3 +158,4 @@ watchfiles==1.1.0
websockets==15.0.1
xlrd==2.0.2
xlsxwriter==3.2.5
prometheus-client==0.23.1
57 changes: 42 additions & 15 deletions evaluation/scripts/locomo/locomo_eval.py
@@ -3,6 +3,7 @@
import json
import logging
import os
import re
import time

import nltk
@@ -47,6 +48,29 @@ class LLMGrade(BaseModel):
llm_reasoning: str = Field(description="Explain why the answer is correct or incorrect.")


def extract_label_json(text: str) -> str | None:
"""
Extracts a JSON object of the form {"label": "VALUE"} from a given text string.
This function is designed to handle cases where the LLM response contains
natural language alongside a final JSON snippet, ensuring robust parsing.

Supports both single and double quotes around the label value.
Ignores surrounding whitespace and formatting.

Returns:
The full matching JSON string (e.g., '{"label": "CORRECT"}') if found.
None if no valid label JSON is found.
"""
# Regex pattern to match: { "label": "value" } with optional whitespace
# Matches both single and double quotes, allows spaces around keys and values
pattern = r'\{\s*"label"\s*:\s*["\']([^"\']*)["\']\s*\}'
match = re.search(pattern, text)
if match:
# Return the complete matched JSON string for safe json.loads()
return match.group(0)
return None


async def locomo_grader(llm_client, question: str, gold_answer: str, response: str) -> bool:
system_prompt = """
You are an expert grader that determines if answers to questions match a gold standard answer
@@ -77,20 +101,23 @@ async def locomo_grader(llm_client, question: str, gold_answer: str, response: s

Just return the label CORRECT or WRONG in a json format with the key as "label".
"""

response = await llm_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": accuracy_prompt},
],
temperature=0,
)
message_content = response.choices[0].message.content
label = json.loads(message_content)["label"]
parsed = LLMGrade(llm_judgment=label, llm_reasoning="")

return parsed.llm_judgment.strip().lower() == "correct"
try:
response = await llm_client.chat.completions.create(
model=os.getenv("EVAL_MODEL", "gpt-4o-mini"),
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": accuracy_prompt},
],
temperature=0,
)
message_content = response.choices[0].message.content
message_content = extract_label_json(text=message_content)
label = json.loads(message_content)["label"]
parsed = LLMGrade(llm_judgment=label, llm_reasoning="")
return parsed.llm_judgment.strip().lower() == "correct"
except Exception as e:
print(f"======== {e}, {response} ===========")
exit()


def calculate_rouge_scores(gold_answer, response):
@@ -284,7 +311,7 @@ async def main(frame, version="default", options=None, num_runs=1, max_workers=4
with open(response_path) as file:
locomo_responses = json.load(file)

num_users = 10
num_users = 2
all_grades = {}

total_responses_count = sum(
4 changes: 1 addition & 3 deletions evaluation/scripts/utils/client.py
@@ -189,9 +189,7 @@ def search(self, query, user_id, top_k):
)
response = requests.request("POST", url, data=payload, headers=self.headers)
assert response.status_code == 200, response.text
assert json.loads(response.text)["message"] == "Search completed successfully", (
response.text
)
assert json.loads(response.text)["message"] == "Memory searched successfully", response.text
return json.loads(response.text)["data"]


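The tightened assertion in `client.py` amounts to a three-step contract on the search response. A hedged restatement as a standalone checker (the function name is illustrative, not part of the client):

```python
import json


def check_search_response(status_code: int, body_text: str):
    """Mirror the client's asserts: HTTP 200, the expected success
    message, then hand back the payload under "data"."""
    assert status_code == 200, body_text
    body = json.loads(body_text)
    # The PR changes the expected message from "Search completed
    # successfully" to "Memory searched successfully".
    assert body["message"] == "Memory searched successfully", body_text
    return body["data"]


ok = json.dumps({"message": "Memory searched successfully", "data": [{"id": 1}]})
print(check_search_response(200, ok))  # [{'id': 1}]
```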
33 changes: 17 additions & 16 deletions examples/mem_scheduler/api_w_scheduler.py
@@ -1,8 +1,10 @@
from time import sleep

from memos.api.handlers.scheduler_handler import (
handle_scheduler_status,
handle_scheduler_wait,
)
from memos.api.routers.server_router import mem_scheduler
from memos.api.routers.server_router import mem_scheduler, status_tracker
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem


@@ -26,26 +28,25 @@ def my_test_handler(messages: list[ScheduleMessageItem]):
for msg in messages:
print(f" my_test_handler - {msg.item_id}: {msg.content}")
user_status_running = handle_scheduler_status(
user_name=USER_MEM_CUBE, mem_scheduler=mem_scheduler, instance_id="api_w_scheduler"
user_id=msg.user_id, status_tracker=status_tracker
)
print(f"[Monitor] Status for {USER_MEM_CUBE} after submit:", user_status_running)
print("[Monitor] Status after submit:", user_status_running)


# 2. Register the handler
TEST_HANDLER_LABEL = "test_handler"
TEST_USER_ID = "test_user"
mem_scheduler.register_handlers({TEST_HANDLER_LABEL: my_test_handler})

# 2.1 Monitor global scheduler status before submitting tasks
global_status_before = handle_scheduler_status(
user_name=None, mem_scheduler=mem_scheduler, instance_id="api_w_scheduler"
)
global_status_before = handle_scheduler_status(user_id=TEST_USER_ID, status_tracker=status_tracker)
print("[Monitor] Global status before submit:", global_status_before)

# 3. Create messages
messages_to_send = [
ScheduleMessageItem(
item_id=f"test_item_{i}",
user_id="test_user",
user_id=TEST_USER_ID,
mem_cube_id="test_mem_cube",
label=TEST_HANDLER_LABEL,
content=f"This is test message {i}",
@@ -56,28 +57,28 @@ def my_test_handler(messages: list[ScheduleMessageItem]):
# 5. Submit messages
for mes in messages_to_send:
print(f"Submitting message {mes.item_id} to the scheduler...")
mem_scheduler.memos_message_queue.submit_messages([mes])
mem_scheduler.submit_messages([mes])
sleep(1)

# 5.1 Monitor status for specific mem_cube while running
USER_MEM_CUBE = "test_mem_cube"

# 6. Wait for messages to be processed (limited to 100 checks)
print("Waiting for messages to be consumed (max 100 checks)...")
mem_scheduler.mem_scheduler_wait()

user_status_running = handle_scheduler_status(user_id=TEST_USER_ID, status_tracker=status_tracker)
print(f"[Monitor] Status for {USER_MEM_CUBE} after submit:", user_status_running)

# 6.1 Wait until idle for specific mem_cube via handler
wait_result = handle_scheduler_wait(
user_name=USER_MEM_CUBE,
user_name=TEST_USER_ID,
status_tracker=status_tracker,
timeout_seconds=120.0,
poll_interval=0.2,
mem_scheduler=mem_scheduler,
poll_interval=0.5,
)
print(f"[Monitor] Wait result for {USER_MEM_CUBE}:", wait_result)

# 6.2 Monitor global scheduler status after processing
global_status_after = handle_scheduler_status(
user_name=None, mem_scheduler=mem_scheduler, instance_id="api_w_scheduler"
)
global_status_after = handle_scheduler_status(user_id=TEST_USER_ID, status_tracker=status_tracker)
print("[Monitor] Global status after processing:", global_status_after)

# 7. Stop the scheduler
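The example now drives `handle_scheduler_wait` with `timeout_seconds` and `poll_interval`. The loop it presumably runs can be sketched generically — `wait_until_idle` and `is_idle` below are illustrative names, not the MemOS API:

```python
import time
from collections.abc import Callable


def wait_until_idle(
    is_idle: Callable[[], bool],
    timeout_seconds: float = 120.0,
    poll_interval: float = 0.5,
) -> dict:
    """Poll `is_idle` until it returns True or the timeout elapses."""
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        if is_idle():
            return {"idle": True, "timed_out": False}
        time.sleep(poll_interval)
    return {"idle": is_idle(), "timed_out": True}


# Simulated status tracker that reports idle after three polls.
calls = {"n": 0}

def fake_tracker_idle() -> bool:
    calls["n"] += 1
    return calls["n"] >= 3

print(wait_until_idle(fake_tracker_idle, timeout_seconds=5.0, poll_interval=0.01))
# {'idle': True, 'timed_out': False}
```

Returning a result dict instead of raising on timeout matches how the example prints `wait_result` rather than aborting.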
19 changes: 17 additions & 2 deletions poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -46,6 +46,7 @@ dependencies = [
"scikit-learn (>=1.7.0,<2.0.0)", # Machine learning
"fastmcp (>=2.10.5,<3.0.0)",
"python-dateutil (>=2.9.0.post0,<3.0.0)",
"prometheus-client (>=0.23.1,<0.24.0)",
]

[project.urls]
4 changes: 2 additions & 2 deletions src/memos/api/handlers/base_handler.py
@@ -9,7 +9,7 @@

from memos.log import get_logger
from memos.mem_scheduler.base_scheduler import BaseScheduler
from memos.memories.textual.tree_text_memory.retrieve.searcher import Searcher
from memos.memories.textual.tree_text_memory.retrieve.advanced_searcher import AdvancedSearcher


logger = get_logger(__name__)
@@ -132,7 +132,7 @@ def mem_scheduler(self) -> BaseScheduler:
return self.deps.mem_scheduler

@property
def searcher(self) -> Searcher:
def searcher(self) -> AdvancedSearcher:
"""Get scheduler instance."""
return self.deps.searcher

19 changes: 19 additions & 0 deletions src/memos/api/handlers/component_init.py
@@ -129,6 +129,21 @@ def init_server() -> dict[str, Any]:
"""
logger.info("Initializing MemOS server components...")

# Initialize Redis client first as it is a core dependency for features like scheduler status tracking
try:
from memos.mem_scheduler.orm_modules.api_redis_model import APIRedisDBManager

redis_client = APIRedisDBManager.load_redis_engine_from_env()
if redis_client:
logger.info("Redis client initialized successfully.")
else:
logger.error(
"Failed to initialize Redis client. Check REDIS_HOST etc. in environment variables."
)
except Exception as e:
logger.error(f"Failed to initialize Redis client: {e}", exc_info=True)
redis_client = None # Ensure redis_client exists even on failure

# Get default cube configuration
default_cube_config = APIConfig.get_default_cube_config()

@@ -272,6 +287,8 @@
tree_mem: TreeTextMemory = naive_mem_cube.text_mem
searcher: Searcher = tree_mem.get_searcher(
manual_close_internet=os.getenv("ENABLE_INTERNET", "true").lower() == "false",
moscube=False,
process_llm=mem_reader.llm,
)
logger.debug("Searcher created")

@@ -286,6 +303,7 @@
process_llm=mem_reader.llm,
db_engine=BaseDBManager.create_default_sqlite_engine(),
mem_reader=mem_reader,
redis_client=redis_client,
)
mem_scheduler.init_mem_cube(mem_cube=naive_mem_cube, searcher=searcher)
logger.debug("Scheduler initialized")
@@ -335,5 +353,6 @@ def init_server() -> dict[str, Any]:
"text_mem": text_mem,
"pref_mem": pref_mem,
"online_bot": online_bot,
"redis_client": redis_client,
"deepsearch_agent": deepsearch_agent,
}