From fc6015967e5f5ba38377ce38b9101112c1dc3927 Mon Sep 17 00:00:00 2001 From: giulio-leone Date: Sat, 14 Mar 2026 21:08:17 +0100 Subject: [PATCH] fix(sessions): use read-only transactions for get/list operations DatabaseSessionService.get_session() and list_sessions() are pure-read operations but currently open regular read-write transactions via _rollback_on_exception_session(). On Cloud Spanner this causes RetryAborted errors when a concurrent write commits during the read, because Spanner's OCC layer sees the read-write transaction as conflicting. Add _readonly_session() context manager that: - Marks the connection as read-only (postgresql_readonly=True) which also benefits Cloud Spanner's sqlalchemy-spanner dialect - Never commits, avoiding unnecessary write-path overhead - Still rolls back on exception to release the connection cleanly Switch get_session() and list_sessions() to use _readonly_session(). Write operations (create_session, delete_session, append_event) continue to use _rollback_on_exception_session() with explicit commits. Fixes #4771 --- .../adk/sessions/database_session_service.py | 24 +++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/src/google/adk/sessions/database_session_service.py b/src/google/adk/sessions/database_session_service.py index eed1d9eae6..ec38f38056 100644 --- a/src/google/adk/sessions/database_session_service.py +++ b/src/google/adk/sessions/database_session_service.py @@ -213,6 +213,26 @@ async def _rollback_on_exception_session( await sql_session.rollback() raise + @asynccontextmanager + async def _readonly_session( + self, + ) -> AsyncIterator[DatabaseSessionFactory]: + """Yields a read-only database session for pure SELECT operations. + + On Spanner this avoids OCC read-write transactions (which can trigger + RetryAborted when conflicting with concurrent writes). On PostgreSQL + the session is marked read-only. On other dialects the behaviour is + equivalent to a regular session but the transaction is never committed. + """ + async with self.database_session_factory() as sql_session: + conn = await sql_session.connection() + await conn.execution_options(postgresql_readonly=True) + try: + yield sql_session + except BaseException: + await sql_session.rollback() + raise + def _supports_row_level_locking(self) -> bool: return self.db_engine.dialect.name in ( _MARIADB_DIALECT, @@ -403,7 +423,7 @@ async def get_session( # 2. Get all the events based on session id and filtering config # 3. Convert and return the session schema = self._get_schema_classes() - async with self._rollback_on_exception_session() as sql_session: + async with self._readonly_session() as sql_session: storage_session = await sql_session.get( schema.StorageSession, (app_name, user_id, session_id) ) @@ -458,7 +478,7 @@ async def list_sessions( ) -> ListSessionsResponse: await self._prepare_tables() schema = self._get_schema_classes() - async with self._rollback_on_exception_session() as sql_session: + async with self._readonly_session() as sql_session: stmt = select(schema.StorageSession).filter( schema.StorageSession.app_name == app_name )