Skip to content

Feat/redis scheduler#526

Merged
tangg555 merged 73 commits intodevfrom
feat/redis_scheduler
Nov 25, 2025
Merged

Feat/redis scheduler#526
tangg555 merged 73 commits intodevfrom
feat/redis_scheduler

Conversation

@glin93
Copy link
Contributor

@glin93 glin93 commented Nov 25, 2025

Description

Summary: (summary)

Fix: #(issue)

Docs Issue/PR: (docs-issue-or-pr-link)

Reviewer: @(reviewer)

Checklist:

  • I have performed a self-review of my own code | 我已自行检查了自己的代码
  • I have commented my code in hard-to-understand areas | 我已在难以理解的地方对代码进行了注释
  • I have added tests that prove my fix is effective or that my feature works | 我已添加测试以证明我的修复有效或功能正常
  • I have created related documentation issue/PR in MemOS-Docs (if applicable) | 我已在 MemOS-Docs 中创建了相关的文档 issue/PR(如果适用)
  • I have linked the issue to this PR (if applicable) | 我已将 issue 链接到此 PR(如果适用)
  • I have mentioned the person who will review this PR | 我已提及将审查此 PR 的人

- Fix build_kv_cache method in hf.py to handle both old and new DynamicCache structures
  - Support new 'layers' attribute with key_cache/value_cache or keys/values
  - Maintain backward compatibility with direct key_cache/value_cache attributes
  - Add comprehensive error handling and logging for unsupported structures

- Update move_dynamic_cache_htod function in kv.py for cross-version compatibility
  - Handle layers-based structure in newer transformers versions
  - Support alternative attribute names (keys/values vs key_cache/value_cache)
  - Preserve original functionality for older transformers versions

- Add comprehensive tests for DynamicCache compatibility
  - Test activation memory update with mock DynamicCache layers
  - Verify layers attribute access across different transformers versions
  - Fix scheduler logger mock to include memory_manager attribute
This resolves AttributeError issues when using different versions of the
transformers library and ensures robust handling of DynamicCache objects.

debug
- Add APIAnalyzerForScheduler class with search/add operations
- Support requests and http.client with connection reuse
- Include comprehensive error handling and dynamic configuration
- Add English test suite with real-world conversation scenarios
- Add search_ws endpoint in server_router.py for scheduler-enabled search
- Fix missing imports: time module, SearchRequest class, and get_mos_product_instance function
- Implement search_ws method in api_analyzer.py with HTTP client support
- Add _search_ws_with_requests and _search_ws_with_http_client private methods
- Include search_ws usage example in demonstration code
- Enhance scheduler and dispatcher capabilities for improved memory management
- Expand test coverage to ensure functionality stability

This update primarily strengthens the memory scheduling system's search capabilities,
providing users with more flexible API interface options.
- Fix Pydantic serialization warning in test_memos_chen_tang_hello_world
  * Add warnings filter to suppress UserWarning from Pydantic serialization

- Fix KeyError: 'past_key_values' in test_build_kv_cache_and_generation
  * Update mock configuration to properly return forward_output with past_key_values
  * Add DynamicCache version compatibility handling in test mocks
  * Support both old and new transformers versions with layers/key_cache attributes
  * Improve assertion logic to check all model calls for required parameters

- Update base_scheduler.py to use centralized DEFAULT_MAX_INTERNAL_MESSAGE_QUEUE_SIZE constant
  * Add import for DEFAULT_MAX_INTERNAL_MESSAGE_QUEUE_SIZE from general_schemas
  * Replace hardcoded value 100 with configurable constant (1000)

All tests now pass successfully with proper version compatibility handling.
- Add DEFAULT_TOP_K and DEFAULT_CONTEXT_WINDOW_SIZE global constants in general_schemas.py
- Update base_scheduler.py to use global default values instead of hardcoded numbers
- Fix SchedulerConfigFactory initialization issue by using keyword argument expansion
- Resolve UnboundLocalError variable conflict in search_memories_ws function
- Fix indentation and parameter issues in OptimizedScheduler search_for_api method
- Improve code standardization and maintainability
- Add auto_initialize_redis() with config/env/local fallback
- Move Redis logic from dispatcher_monitor to redis_service
- Update base_scheduler to use auto initialization
- Add proper resource cleanup and error handling
- Add MySQL engine loading from environment variables in BaseDBManager
- Add Redis connection loading from environment variables in BaseDBManager
- Enhance database configuration validation and error handling
- Complete database adapter infrastructure for ORM module
- Provide unified database connection management interface

This update provides comprehensive database connection management capabilities
for the mem_scheduler module, supporting dynamic MySQL and Redis configuration
loading from environment variables, establishing reliable data persistence
foundation for scheduling services and API services.
- Add RedisDBManager and RedisLockableORM classes
- Implement atomic locking mechanism for concurrent access
- Add merge functionality for different object types
- Include comprehensive test suite and examples
- Fix Redis key type conflicts in lock operations
…I with reranker

- Fix running_entries to running_task_ids migration across codebase
- Update sync_search_data method to properly handle TaskRunningStatus
- Correct variable naming and logic in API synchronization flow
- Implement search API endpoint with reranker functionality
- Update test files to reflect new running_task_ids convention
- Ensure proper Redis state management for concurrent tasks
    - Add conversation_turn field to APIMemoryHistoryEntryItem schema with default value 0
    - Implement session counter in OptimizedScheduler to track turn count per session_id
    - Update sync_search_data method to accept and store conversation_turn parameter
    - Maintain session history with LRU eviction (max 5 sessions)
    - Rename conversation_id to session_id for consistency with request object
    - Enable direct access to session_id from search requests

    This feature allows tracking conversation turns within the same session,
    providing better context for memory retrieval and search history management.
tangg555 and others added 21 commits November 19, 2025 20:30
…are not recorded yet; waiting to be developed
…al bugs

This commit introduces a robust observability system for the scheduler and resolves several critical bugs identified during code review and testing.

Key Improvements:
- **Task Status Tracking**: Implemented `TaskStatusTracker` using Redis to provide persistent, per-task lifecycle tracking (`waiting`, `in_progress`, `completed`, `failed`).
- **Prometheus Metrics**: Added a new metrics system to expose key performance indicators (QPS, latency, queue length, failure/completion rates) for monitoring.
- **API Refactoring**: Refactored `/scheduler/status` and `/scheduler/wait` APIs to use the new reliable `TaskStatusTracker`, ensuring accurate state reporting.

Bug Fixes:
- **Initialization**: Corrected the `SchedulerDispatcher` initialization order to prevent `NoneType` errors in tests and at runtime.
- **CPU Usage**: Fixed a busy-wait loop in the metrics monitor thread that caused 100% CPU usage when idle.
- **Exception Handling**: Refined API handlers to correctly propagate HTTP error codes (e.g., 404) instead of masking them as 500 errors.
- **Dependencies**: Added missing dependencies (`prometheus-client`) and updated test mocks to ensure the test suite can run correctly.
- **Legacy Code**: Removed the old, buggy `mem_scheduler_wait` method.

All 394 unit tests now pass, and a functional test of the new features has been successfully verified.
Adds functionality to send task status updates (success/failure) to RabbitMQ via , specifically for the cloud service platform. This includes:
- Adding a  field to .
- Passing  to .
- Modifying  to conditionally send  with  or  (along with  and ) based on the  environment variable.
Moves prometheus-client from an optional group to the main project dependencies. This ensures it is always installed in all environments, including the CI/CD deployment pipeline, to resolve the recurring 'ModuleNotFoundError'.
Addresses all linting errors reported by ruff, including undefined names for 'os' and 'timezone' by correcting import statements in 'dispatcher.py' and 'status_tracker.py'. Also resolves various code style violations (line breaks, comment punctuation) to align with ruff standards. This ensures the code adheres to project standards and passes the CI quality checks.
Adds prometheus-client to docker/requirements.txt to ensure it is installed in the Docker build environment, resolving deployment failures related to missing dependencies.
@tangg555 tangg555 requested review from CaralHsi, fridayL and tangg555 and removed request for fridayL and tangg555 November 25, 2025 12:42
Copy link
Contributor

@tangg555 tangg555 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good to me

@tangg555 tangg555 merged commit 9341861 into dev Nov 25, 2025
40 checks passed
tianxing02 pushed a commit to tianxing02/MemOS that referenced this pull request Feb 24, 2026
* debug an error function name

* feat: Add DynamicCache compatibility for different transformers versions

- Fix build_kv_cache method in hf.py to handle both old and new DynamicCache structures
  - Support new 'layers' attribute with key_cache/value_cache or keys/values
  - Maintain backward compatibility with direct key_cache/value_cache attributes
  - Add comprehensive error handling and logging for unsupported structures

- Update move_dynamic_cache_htod function in kv.py for cross-version compatibility
  - Handle layers-based structure in newer transformers versions
  - Support alternative attribute names (keys/values vs key_cache/value_cache)
  - Preserve original functionality for older transformers versions

- Add comprehensive tests for DynamicCache compatibility
  - Test activation memory update with mock DynamicCache layers
  - Verify layers attribute access across different transformers versions
  - Fix scheduler logger mock to include memory_manager attribute
This resolves AttributeError issues when using different versions of the
transformers library and ensures robust handling of DynamicCache objects.

debug

* feat: implement APIAnalyzerForScheduler for memory operations

- Add APIAnalyzerForScheduler class with search/add operations
- Support requests and http.client with connection reuse
- Include comprehensive error handling and dynamic configuration
- Add English test suite with real-world conversation scenarios

* feat: Add search_ws API endpoint and enhance API analyzer functionality

- Add search_ws endpoint in server_router.py for scheduler-enabled search
- Fix missing imports: time module, SearchRequest class, and get_mos_product_instance function
- Implement search_ws method in api_analyzer.py with HTTP client support
- Add _search_ws_with_requests and _search_ws_with_http_client private methods
- Include search_ws usage example in demonstration code
- Enhance scheduler and dispatcher capabilities for improved memory management
- Expand test coverage to ensure functionality stability

This update primarily strengthens the memory scheduling system's search capabilities,
providing users with more flexible API interface options.

* fix: resolve test failures and warnings in test suite

- Fix Pydantic serialization warning in test_memos_chen_tang_hello_world
  * Add warnings filter to suppress UserWarning from Pydantic serialization

- Fix KeyError: 'past_key_values' in test_build_kv_cache_and_generation
  * Update mock configuration to properly return forward_output with past_key_values
  * Add DynamicCache version compatibility handling in test mocks
  * Support both old and new transformers versions with layers/key_cache attributes
  * Improve assertion logic to check all model calls for required parameters

- Update base_scheduler.py to use centralized DEFAULT_MAX_INTERNAL_MESSAGE_QUEUE_SIZE constant
  * Add import for DEFAULT_MAX_INTERNAL_MESSAGE_QUEUE_SIZE from general_schemas
  * Replace hardcoded value 100 with configurable constant (1000)

All tests now pass successfully with proper version compatibility handling.

* feat: add a test_robustness execution to test thread pool execution

* feat: optimize scheduler configuration and API search functionality

- Add DEFAULT_TOP_K and DEFAULT_CONTEXT_WINDOW_SIZE global constants in general_schemas.py
- Update base_scheduler.py to use global default values instead of hardcoded numbers
- Fix SchedulerConfigFactory initialization issue by using keyword argument expansion
- Resolve UnboundLocalError variable conflict in search_memories_ws function
- Fix indentation and parameter issues in OptimizedScheduler search_for_api method
- Improve code standardization and maintainability

* feat: Add Redis auto-initialization with fallback strategies

- Add auto_initialize_redis() with config/env/local fallback
- Move Redis logic from dispatcher_monitor to redis_service
- Update base_scheduler to use auto initialization
- Add proper resource cleanup and error handling

* feat: add database connection management to ORM module

- Add MySQL engine loading from environment variables in BaseDBManager
- Add Redis connection loading from environment variables in BaseDBManager
- Enhance database configuration validation and error handling
- Complete database adapter infrastructure for ORM module
- Provide unified database connection management interface

This update provides comprehensive database connection management capabilities
for the mem_scheduler module, supporting dynamic MySQL and Redis configuration
loading from environment variables, establishing reliable data persistence
foundation for scheduling services and API services.

* remove part of test

* feat: add Redis-based ORM with multiprocess synchronization

- Add RedisDBManager and RedisLockableORM classes
- Implement atomic locking mechanism for concurrent access
- Add merge functionality for different object types
- Include comprehensive test suite and examples
- Fix Redis key type conflicts in lock operations

* fix: resolve scheduler module import and Redis integration issues

* revise naive memcube creation in server router

* remove long-time tests in test_scheduler

* remove redis test which needs .env

* refactor all codes about mixture search with scheduler

* fix: resolve Redis API synchronization issues and implement search API with reranker

- Fix running_entries to running_task_ids migration across codebase
- Update sync_search_data method to properly handle TaskRunningStatus
- Correct variable naming and logic in API synchronization flow
- Implement search API endpoint with reranker functionality
- Update test files to reflect new running_task_ids convention
- Ensure proper Redis state management for concurrent tasks

* remove a test for api module

* revise to pass the test suite

* address some bugs to make mix_search normally running

* modify codes according to evaluation logs

* feat: Optimize mixture search and enhance API client

* feat: Add conversation_turn tracking for session-based memory search

    - Add conversation_turn field to APIMemoryHistoryEntryItem schema with default value 0
    - Implement session counter in OptimizedScheduler to track turn count per session_id
    - Update sync_search_data method to accept and store conversation_turn parameter
    - Maintain session history with LRU eviction (max 5 sessions)
    - Rename conversation_id to session_id for consistency with request object
    - Enable direct access to session_id from search requests

    This feature allows tracking conversation turns within the same session,
    providing better context for memory retrieval and search history management.

* adress time bug in monitor

* revise simple tree

* add mode to evaluation client; rewrite print to logger.info in db files

* feat: 1. add redis queue for scheduler 2. finish the code related to mix search and fine search

* debug the working memory code

* addressed a range of bugs to make scheduler running correctly

* remove test_dispatch_parallel test

* print change to logger.info

* adjucted the core code related to fine and mixture apis

* feat: create task queue to wrap local queue and redis queue. queue now split FIFO to multi queue from different users. addressed a range of bugs

* fix bugs: debug bugs about internet trigger

* debug get searcher mode

* feat: add manual internet

* Fix: fix code format

* feat: add strategy for fine search

* debug redis queue

* debug redis queue

* fix bugs: completely addressed bugs about redis queue

* refactor: add searcher to handler_init; remove info log from task_queue

* refactor: modify analyzer

* refactor: revise locomo_eval to make it support llm other than gpt-4o-mini

* feat: develop advanced searcher with deep search

* feat: finish a complete version of deep search

* refactor: refactor deep search feature, now only allowing one-round deep search

* feat: implement the feature of get_tasks_status, but completed tasks are not recorded yet; waiting to be developed

* debuging merged code; searching memories have bugs

* change logging level

* debug api evaluation

* fix bugs: change top to top_k

* change log

* feat(scheduler): Implement comprehensive observability and fix critical bugs

This commit introduces a robust observability system for the scheduler and resolves several critical bugs identified during code review and testing.

Key Improvements:
- **Task Status Tracking**: Implemented `TaskStatusTracker` using Redis to provide persistent, per-task lifecycle tracking (`waiting`, `in_progress`, `completed`, `failed`).
- **Prometheus Metrics**: Added a new metrics system to expose key performance indicators (QPS, latency, queue length, failure/completion rates) for monitoring.
- **API Refactoring**: Refactored `/scheduler/status` and `/scheduler/wait` APIs to use the new reliable `TaskStatusTracker`, ensuring accurate state reporting.

Bug Fixes:
- **Initialization**: Corrected the `SchedulerDispatcher` initialization order to prevent `NoneType` errors in tests and at runtime.
- **CPU Usage**: Fixed a busy-wait loop in the metrics monitor thread that caused 100% CPU usage when idle.
- **Exception Handling**: Refined API handlers to correctly propagate HTTP error codes (e.g., 404) instead of masking them as 500 errors.
- **Dependencies**: Added missing dependencies (`prometheus-client`) and updated test mocks to ensure the test suite can run correctly.
- **Legacy Code**: Removed the old, buggy `mem_scheduler_wait` method.

All 394 unit tests now pass, and a functional test of the new features has been successfully verified.

* fix(ci): Resolve top-level redis import error in TaskStatusTracker

* feat(scheduler): Implement conditional cloud status updates

Adds functionality to send task status updates (success/failure) to RabbitMQ via , specifically for the cloud service platform. This includes:
- Adding a  field to .
- Passing  to .
- Modifying  to conditionally send  with  or  (along with  and ) based on the  environment variable.

* fix(deps): Promote prometheus-client to core dependency

Moves prometheus-client from an optional group to the main project dependencies. This ensures it is always installed in all environments, including the CI/CD deployment pipeline, to resolve the recurring 'ModuleNotFoundError'.

* fix(ci): Resolve ruff linting and style errors

Addresses all linting errors reported by ruff, including undefined names for 'os' and 'timezone' by correcting import statements in 'dispatcher.py' and 'status_tracker.py'. Also resolves various code style violations (line breaks, comment punctuation) to align with ruff standards. This ensures the code adheres to project standards and passes the CI quality checks.

* fix(ci): Reformat code to comply with ruff standards

* fix(docker): Add prometheus-client to Docker requirements.txt

Adds prometheus-client to docker/requirements.txt to ensure it is installed in the Docker build environment, resolving deployment failures related to missing dependencies.

---------

Co-authored-by: chentang <travistang@foxmail.com>
Co-authored-by: fridayL <lcy081099@gmail.com>
Co-authored-by: glin1993@outlook.com <>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants