fix: HierarchicalSessionStore message loss and extended field wipe#1745
Conversation
HierarchicalSessionStore.add_message used an unlocked load-modify-save path, so concurrent writers could overwrite each other's messages. Inherited _modify_session_locked also loaded SessionData, which dropped parent_id, snapshots, and children_ids on metadata updates. - Load ExtendedSessionData under the existing file lock - Route add_message through _modify_session_locked - Keep _extended_cache in sync after locked writes - Add regression tests for concurrency and extended fields Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>
|
@coderabbitai review |
|
/review |
Qodo reviews are paused for this user.Troubleshooting steps vary by plan Learn more → On a Teams plan? Using GitHub Enterprise Server, GitLab Self-Managed, or Bitbucket Data Center? |
✅ Actions performedReview triggered.
|
|
Important Review skippedBot user detected. To trigger a single review, invoke the ⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
📝 WalkthroughWalkthrough
ChangesLocked Session Mutation
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Possibly related issues
Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
@copilot Do a thorough review of this PR. Read ALL existing reviewer comments above from Qodo, Coderabbit, and Gemini first — incorporate their findings. Review areas:
|
Thorough review completed against commit I checked all existing reviewer comments first: Qodo is paused (no findings), CodeRabbit has only status/progress comments so far (no actionable issues), and there are currently no Gemini review findings on this PR. My review:
Optional improvement (non-blocking): remove the redundant local import in # remove inside add_message:
from .store import SessionMessageNo further blocking issues found. |
|
@claude You are the FINAL architecture reviewer. If the branch is under MervinPraison/PraisonAI (not a fork), you are able to make modifications to this branch and push directly. SCOPE: Focus ONLY on Python packages (praisonaiagents, praisonai). Do NOT modify praisonai-rust or praisonai-ts. Read ALL comments above from Gemini, Qodo, CodeRabbit, and Copilot carefully before responding. Phase 1: Review per AGENTS.md
Phase 2: FIX Valid Issues Phase 3: Final Verdict |
|
Claude finished @MervinPraison's task —— View job Final Architecture Review ✅Phase 1: Review per AGENTS.md
Phase 2: Fix Valid Issues
Phase 3: Final Verdict
Architecture Review FindingsProtocol-driven Design: ✅ COMPLIANT
Backward Compatibility: ✅ MAINTAINED
Performance Impact: ✅ MINIMAL
Critical Bug Fixes Validated: ✅ CORRECT
Issues Fixed
Final Verdict: ✅ APPROVEDThis PR correctly fixes critical correctness bugs in
The fixes address real production issues where message loss and extended field corruption could occur under concurrent access patterns. The implementation is architecturally sound and maintains all existing guarantees. Recommendation: ✅ MERGE |
Copilot found redundant local import of SessionMessage in add_message method since it's already imported at module scope. This removes the unnecessary import statement. Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/praisonai-agents/praisonaiagents/session/hierarchy.py`:
- Around line 148-175: The loader _load_session_from_disk currently swallows
JSON/IO errors and returns a fresh ExtendedSessionData which causes
_modify_session_locked (and callers like add_message() and
update_session_metadata()) to silently overwrite existing corrupted session
files; change the behavior so that _load_session_from_disk raises a descriptive
exception (including session_id and error_label/context) on
JSONDecodeError/IOError (or returns a clear failure sentinel) and then update
_modify_session_locked to detect that failure and abort the write (return False)
instead of persisting a blank ExtendedSessionData; preserve existing behavior
only when the file truly does not exist, and surface the error via the provided
error_label and any on_error hook so callers can log/handle remediation.
In `@src/praisonai-agents/tests/unit/session/test_hierarchy.py`:
- Around line 106-118: The test test_add_message_preserves_concurrent_writes is
not exercising the stale-writer scenario because both the second write and final
read use writer; change the sequence so reader (the stale instance returned by
HierarchicalSessionStore and warmed by reader._load_session("session-1"))
performs the second add_user_message("session-1", "second"), then instantiate a
fresh HierarchicalSessionStore and call get_chat_history("session-1") on that
fresh store to assert both messages exist and the second message content is
"second"; reference the HierarchicalSessionStore constructor, add_user_message,
_load_session, and get_chat_history when updating the test.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 872b869e-d8a5-48ec-b06a-03320d8f2b75
📒 Files selected for processing (2)
src/praisonai-agents/praisonaiagents/session/hierarchy.pysrc/praisonai-agents/tests/unit/session/test_hierarchy.py
| def _load_session_from_disk(self, session_id: str, filepath: str) -> ExtendedSessionData: | ||
| """Load extended session JSON from disk (caller must hold FileLock).""" | ||
| if os.path.exists(filepath): | ||
| try: | ||
| with open(filepath, "r", encoding="utf-8") as f: | ||
| data = json.load(f) | ||
| return ExtendedSessionData.from_dict(data) | ||
| except (json.JSONDecodeError, IOError): | ||
| pass | ||
| return ExtendedSessionData(session_id=session_id) | ||
|
|
||
| def _modify_session_locked( | ||
| self, | ||
| session_id: str, | ||
| mutator, | ||
| *, | ||
| error_label: str = "modify session", | ||
| ) -> bool: | ||
| """Locked read-modify-write preserving extended session fields.""" | ||
| result = super()._modify_session_locked( | ||
| session_id, mutator, error_label=error_label | ||
| ) | ||
| if result: | ||
| with self._lock: | ||
| cached = self._cache.get(session_id) | ||
| if isinstance(cached, ExtendedSessionData): | ||
| self._extended_cache[session_id] = cached | ||
| return result |
There was a problem hiding this comment.
Abort the locked write when reloading the session fails.
_load_session_from_disk() currently turns JSON/IO failures into a fresh ExtendedSessionData, and _modify_session_locked() will then persist that object. On an unreadable existing file, add_message() or update_session_metadata() can overwrite the session with an almost-empty payload and silently drop prior messages, snapshots, and hierarchy links.
Suggested direction
def _load_session_from_disk(self, session_id: str, filepath: str) -> ExtendedSessionData:
"""Load extended session JSON from disk (caller must hold FileLock)."""
- if os.path.exists(filepath):
- try:
- with open(filepath, "r", encoding="utf-8") as f:
- data = json.load(f)
- return ExtendedSessionData.from_dict(data)
- except (json.JSONDecodeError, IOError):
- pass
- return ExtendedSessionData(session_id=session_id)
+ if not os.path.exists(filepath):
+ return ExtendedSessionData(session_id=session_id)
+ try:
+ with open(filepath, "r", encoding="utf-8") as f:
+ data = json.load(f)
+ return ExtendedSessionData.from_dict(data)
+ except (json.JSONDecodeError, IOError) as e:
+ raise RuntimeError(
+ f"Failed to load session {session_id}; repair or remove the stored session file and retry."
+ ) from e
def _modify_session_locked(
self,
session_id: str,
mutator,
*,
error_label: str = "modify session",
) -> bool:
"""Locked read-modify-write preserving extended session fields."""
- result = super()._modify_session_locked(
- session_id, mutator, error_label=error_label
- )
+ try:
+ result = super()._modify_session_locked(
+ session_id, mutator, error_label=error_label
+ )
+ except RuntimeError:
+ logger.exception("Failed to %s for session %s", error_label, session_id)
+ return FalseAs per coding guidelines, "Error handling: Fail fast with clear error messages; include remediation hints in exceptions; propagate context (agent name, tool name, session ID); provide hook points for error interception (on_error events)".
Also applies to: 198-205
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/praisonai-agents/praisonaiagents/session/hierarchy.py` around lines 148 -
175, The loader _load_session_from_disk currently swallows JSON/IO errors and
returns a fresh ExtendedSessionData which causes _modify_session_locked (and
callers like add_message() and update_session_metadata()) to silently overwrite
existing corrupted session files; change the behavior so that
_load_session_from_disk raises a descriptive exception (including session_id and
error_label/context) on JSONDecodeError/IOError (or returns a clear failure
sentinel) and then update _modify_session_locked to detect that failure and
abort the write (return False) instead of persisting a blank
ExtendedSessionData; preserve existing behavior only when the file truly does
not exist, and surface the error via the provided error_label and any on_error
hook so callers can log/handle remediation.
| def test_add_message_preserves_concurrent_writes(self): | ||
| """Concurrent add_message must not lose messages (same as DefaultSessionStore).""" | ||
| with tempfile.TemporaryDirectory() as tmpdir: | ||
| writer = HierarchicalSessionStore(session_dir=tmpdir) | ||
| reader = HierarchicalSessionStore(session_dir=tmpdir) | ||
|
|
||
| writer.add_user_message("session-1", "first") | ||
| reader._load_session("session-1") | ||
| writer.add_user_message("session-1", "second") | ||
|
|
||
| history = writer.get_chat_history("session-1") | ||
| assert len(history) == 2 | ||
| assert history[1]["content"] == "second" |
There was a problem hiding this comment.
This regression test never drives the stale-writer scenario it describes.
reader._load_session("session-1") only warms state, but the follow-up write and the final read both go through writer. That still passes even if another store instance would later clobber "second" from stale state. Make the stale instance perform the next add_user_message() and verify the persisted history from a fresh store instance.
One way to make the regression meaningful
def test_add_message_preserves_concurrent_writes(self):
"""Concurrent add_message must not lose messages (same as DefaultSessionStore)."""
with tempfile.TemporaryDirectory() as tmpdir:
writer = HierarchicalSessionStore(session_dir=tmpdir)
- reader = HierarchicalSessionStore(session_dir=tmpdir)
+ stale_writer = HierarchicalSessionStore(session_dir=tmpdir)
+ verifier = HierarchicalSessionStore(session_dir=tmpdir)
writer.add_user_message("session-1", "first")
- reader._load_session("session-1")
+ stale_writer.get_extended_session("session-1")
writer.add_user_message("session-1", "second")
+ stale_writer.add_user_message("session-1", "third")
- history = writer.get_chat_history("session-1")
- assert len(history) == 2
- assert history[1]["content"] == "second"
+ history = verifier.get_chat_history("session-1")
+ assert [message["content"] for message in history] == [
+ "first",
+ "second",
+ "third",
+ ]🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/praisonai-agents/tests/unit/session/test_hierarchy.py` around lines 106 -
118, The test test_add_message_preserves_concurrent_writes is not exercising the
stale-writer scenario because both the second write and final read use writer;
change the sequence so reader (the stale instance returned by
HierarchicalSessionStore and warmed by reader._load_session("session-1"))
performs the second add_user_message("session-1", "second"), then instantiate a
fresh HierarchicalSessionStore and call get_chat_history("session-1") on that
fresh store to assert both messages exist and the second message content is
"second"; reference the HierarchicalSessionStore constructor, add_user_message,
_load_session, and get_chat_history when updating the test.
Qodo reviews are paused for this user.Troubleshooting steps vary by plan Learn more → On a Teams plan? Using GitHub Enterprise Server, GitLab Self-Managed, or Bitbucket Data Center? |
Greptile SummaryThis PR fixes two bugs in
Confidence Score: 3/5The disk-level correctness of add_message and metadata updates is sound, but the dual-cache design leaves stale-read windows and the regression test labelled concurrent does not actually run concurrent writers. The core disk write path is correctly fixed: a single FileLock now covers the entire read-modify-write for add_message and all inherited _modify_session_locked callers, and _load_session_from_disk is properly overridden so extended fields survive. However, _extended_cache is synced in a separate lock acquisition after the file lock is released, leaving a brief window where hierarchy reads return stale data. The inherited _load_session still hardcodes SessionData.from_dict, so a cold cache populates _cache with a plain SessionData that silently drops hierarchy fields. And the new concurrency test would have passed against the old buggy implementation because it never runs two writers simultaneously. Both changed files warrant attention: hierarchy.py for the dual-cache gap and the un-overridden _load_session, and test_hierarchy.py for the non-concurrent concurrency test. Important Files Changed
Sequence DiagramsequenceDiagram
participant C as Caller
participant H as HierarchicalSessionStore
participant D as DefaultSessionStore
participant Disk as File (JSON)
participant FC as _cache
participant EC as _extended_cache
Note over C,EC: New add_message flow (this PR)
C->>H: add_message(session_id, role, content)
H->>H: build SessionMessage + _apply mutator
H->>D: super()._modify_session_locked(session_id, _apply)
D->>Disk: acquire FileLock
D->>H: _load_session_from_disk() [overridden]
H->>Disk: json.load ExtendedSessionData.from_dict()
Disk-->>H: ExtendedSessionData (with parent_id, snapshots)
H-->>D: ExtendedSessionData
D->>D: _apply(session) append message
D->>Disk: atomic write (temp + os.replace)
D->>Disk: release FileLock
D->>FC: "_cache[session_id] = ExtendedSessionData"
D-->>H: True
Note over H,EC: brief window - _cache fresh, _extended_cache stale
H->>FC: _cache.get(session_id)
FC-->>H: ExtendedSessionData
H->>EC: "_extended_cache[session_id] = ExtendedSessionData"
H-->>C: True
Reviews (1): Last reviewed commit: "fix: remove redundant SessionMessage imp..." | Re-trigger Greptile |
| def test_add_message_preserves_concurrent_writes(self): | ||
| """Concurrent add_message must not lose messages (same as DefaultSessionStore).""" | ||
| with tempfile.TemporaryDirectory() as tmpdir: | ||
| writer = HierarchicalSessionStore(session_dir=tmpdir) | ||
| reader = HierarchicalSessionStore(session_dir=tmpdir) | ||
|
|
||
| writer.add_user_message("session-1", "first") | ||
| reader._load_session("session-1") | ||
| writer.add_user_message("session-1", "second") | ||
|
|
||
| history = writer.get_chat_history("session-1") | ||
| assert len(history) == 2 | ||
| assert history[1]["content"] == "second" |
There was a problem hiding this comment.
Test doesn't exercise actual concurrency
The test named "concurrent" performs strictly sequential operations. reader._load_session is called between two add_user_message calls on a separate store instance that never writes — so no actual race window is exercised.
With the old code the sequence was: _load_extended_session(force_reload=True) (reads from disk) followed by _save_extended_session (writes to disk) — two separate FileLock acquisitions. The race could only manifest when two writers interleaved their separate lock windows. That scenario requires two threads calling add_message at the same time, which this test never does. As written, this test would have passed with the old (buggy) code too, providing no regression signal for the actual fix.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
| def _modify_session_locked( | ||
| self, | ||
| session_id: str, | ||
| mutator, | ||
| *, | ||
| error_label: str = "modify session", | ||
| ) -> bool: | ||
| """Locked read-modify-write preserving extended session fields.""" | ||
| result = super()._modify_session_locked( | ||
| session_id, mutator, error_label=error_label | ||
| ) | ||
| if result: | ||
| with self._lock: | ||
| cached = self._cache.get(session_id) | ||
| if isinstance(cached, ExtendedSessionData): | ||
| self._extended_cache[session_id] = cached | ||
| return result |
There was a problem hiding this comment.
_extended_cache sync races with _load_extended_session
The _extended_cache is updated in a separate with self._lock: block acquired after super()._modify_session_locked() has already returned and released its FileLock. In the window between those two points, _cache already holds the fresh ExtendedSessionData but _extended_cache still holds the previous version. A concurrent thread calling _load_extended_session (e.g., from fork_session, get_extended_session, or auto_title) in that window returns stale extended-field data (missing the just-appended message). Because _extended_cache is the primary fast path for all hierarchy reads, callers that hit the cache between steps 1 and 2 silently see an old snapshot of the session.
| def _load_session_from_disk(self, session_id: str, filepath: str) -> ExtendedSessionData: | ||
| """Load extended session JSON from disk (caller must hold FileLock).""" | ||
| if os.path.exists(filepath): | ||
| try: | ||
| with open(filepath, "r", encoding="utf-8") as f: | ||
| data = json.load(f) | ||
| return ExtendedSessionData.from_dict(data) | ||
| except (json.JSONDecodeError, IOError): | ||
| pass | ||
| return ExtendedSessionData(session_id=session_id) |
There was a problem hiding this comment.
_load_session is not overridden; inherited reads bypass ExtendedSessionData deserialization
DefaultSessionStore._load_session hardcodes SessionData.from_dict(data) and is not overridden here. Several inherited methods reach _cache through _load_session — most visibly get_chat_history, get_session, get_sessions_by_agent. If _cache is cold for a given session (e.g., after process restart or invalidate_cache), those callers receive a bare SessionData with parent_id, children_ids, snapshots, and title all absent. Code that casts the result of get_session() to ExtendedSessionData would lose the hierarchy fields silently. Overriding _load_session to call self._load_session_from_disk (already added in this PR) would close the gap.
Bug and impact
HierarchicalSessionStore(default for PraisonAI UI viaget_hierarchical_session_store()) had two correctness issues:add_messageloaded the session, mutated in memory, and saved outside a single locked read-modify-write. Two writers (e.g. agentauto_save+ UIadd_message) could overwrite each other, dropping messages.update_session_metadata/clear_sessionusedSessionData.from_dict, which strippedparent_id,children_ids, andsnapshotson save. Fork/snapshot hierarchy data could be silently deleted.Root cause
PR #1724 fixed the race in
DefaultSessionStorebutHierarchicalSessionStoreoverrodeadd_messagewith the old unsafe pattern and did not loadExtendedSessionDatain_load_session_from_disk.Fix and validation
_load_session_from_diskto useExtendedSessionData.from_dictadd_messagethrough_modify_session_locked_extended_cacheafter locked writesadd_message, metadata update preserving extended fieldspytest tests/unit/session/test_hierarchy.py— 22 passedSummary by CodeRabbit
Release Notes
Refactor
Tests