Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 19 additions & 116 deletions src/memos/graph_dbs/polardb.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,53 +201,28 @@ def _get_connection_old(self):
return conn

def _get_connection(self):
"""
Get a connection from the pool.

This function:
1. Gets a connection from ThreadedConnectionPool
2. Checks if connection is closed or unhealthy
3. Returns healthy connection or retries (max 3 times)
4. Handles connection pool exhaustion gracefully

Returns:
psycopg2 connection object

Raises:
RuntimeError: If connection pool is closed or exhausted after retries
"""
"""Get a connection from the pool."""
if self._pool_closed:
raise RuntimeError("Connection pool has been closed")

max_retries = 5
import psycopg2.pool

max_retries = 3
for attempt in range(max_retries):
conn = None
try:
# Try to get connection from pool
# This may raise PoolError if pool is exhausted
conn = self.connection_pool.getconn()

# Check if connection is closed
if conn.closed != 0:
# Connection is closed, return it to pool with close flag and try again
logger.warning(
f"[_get_connection] Got closed connection, attempt {attempt + 1}/{max_retries}"
)
try:
self.connection_pool.putconn(conn, close=True)
except Exception as e:
logger.warning(
f"[_get_connection] Failed to return closed connection to pool: {e}"
)
logger.warning(f"Failed to return closed connection to pool: {e}")
with suppress(Exception):
conn.close()

conn = None
if attempt < max_retries - 1:
# Exponential backoff: 0.1s, 0.2s, 0.4s
time.sleep(0.1 * (2**attempt))
continue
else:
raise RuntimeError("Pool returned a closed connection after all retries")
Expand All @@ -264,21 +239,19 @@ def _get_connection(self):
except Exception as health_check_error:
# Connection is not usable, return it to pool with close flag and try again
logger.warning(
f"[_get_connection] Connection health check failed (attempt {attempt + 1}/{max_retries}): {health_check_error}"
f"Connection health check failed: {health_check_error}, returning connection to pool and retrying..."
)
try:
self.connection_pool.putconn(conn, close=True)
except Exception as putconn_error:
logger.warning(
f"[_get_connection] Failed to return unhealthy connection to pool: {putconn_error}"
f"Failed to return unhealthy connection to pool: {putconn_error}"
)
with suppress(Exception):
conn.close()

conn = None
if attempt < max_retries - 1:
# Exponential backoff: 0.1s, 0.2s, 0.4s
time.sleep(0.1 * (2**attempt))
continue
else:
raise RuntimeError(
Expand All @@ -287,132 +260,62 @@ def _get_connection(self):

# Connection is healthy, return it
return conn

except psycopg2.pool.PoolError as pool_error:
# Pool exhausted or other pool-related error
# Don't retry immediately for pool exhaustion - it's unlikely to resolve quickly
error_msg = str(pool_error).lower()
if "exhausted" in error_msg or "pool" in error_msg:
# Log pool status for debugging
try:
# Try to get pool stats if available
pool_info = f"Pool config: minconn={self.connection_pool.minconn}, maxconn={self.connection_pool.maxconn}"
logger.error(
f"[_get_connection] Connection pool exhausted (attempt {attempt + 1}/{max_retries}). {pool_info}"
)
except Exception:
logger.error(
f"[_get_connection] Connection pool exhausted (attempt {attempt + 1}/{max_retries})"
)

# For pool exhaustion, wait longer before retry (connections may be returned)
if attempt < max_retries - 1:
# Longer backoff for pool exhaustion: 0.5s, 1.0s, 2.0s
wait_time = 0.5 * (2**attempt)
logger.info(f"[_get_connection] Waiting {wait_time}s before retry...")
time.sleep(wait_time)
continue
else:
raise RuntimeError(
f"Connection pool exhausted after {max_retries} attempts. "
f"This usually means connections are not being returned to the pool. "
f"Check for connection leaks in your code."
) from pool_error
else:
# Other pool errors - retry with normal backoff
if attempt < max_retries - 1:
time.sleep(0.1 * (2**attempt))
continue
else:
raise RuntimeError(
f"Failed to get connection from pool: {pool_error}"
) from pool_error

except Exception as e:
# Other exceptions (not pool-related)
# Only try to return connection if we actually got one
# If getconn() failed (e.g., pool exhausted), conn will be None
if conn is not None:
try:
# Return connection to pool if it's valid
self.connection_pool.putconn(conn, close=True)
# If it's a PoolError or similar, close the connection instead of returning
if "pool" in str(e).lower() or "exhausted" in str(e).lower():
with suppress(Exception):
conn.close()
else:
self.connection_pool.putconn(conn, close=True)
except Exception as putconn_error:
logger.warning(
f"[_get_connection] Failed to return connection after error: {putconn_error}"
)
logger.warning(f"Failed to handle connection after error: {putconn_error}")
with suppress(Exception):
conn.close()

if attempt >= max_retries - 1:
raise RuntimeError(f"Failed to get a valid connection from pool: {e}") from e
else:
# Exponential backoff: 0.1s, 0.2s, 0.4s
time.sleep(0.1 * (2**attempt))
time.sleep(0.1)
continue

# Should never reach here, but just in case
raise RuntimeError("Failed to get connection after all retries")

def _return_connection(self, connection):
"""
Return a connection to the pool.

This function safely returns a connection to the pool, handling:
- Closed connections (close them instead of returning)
- Pool closed state (close connection directly)
- None connections (no-op)
- putconn() failures (close connection as fallback)

Args:
connection: psycopg2 connection object or None
"""
"""Return a connection to the pool."""
if self._pool_closed:
# Pool is closed, just close the connection if it exists
if connection:
try:
connection.close()
logger.debug("[_return_connection] Closed connection (pool is closed)")
except Exception as e:
logger.warning(
f"[_return_connection] Failed to close connection after pool closed: {e}"
)
logger.warning(f"Failed to close connection after pool closed: {e}")
return

if not connection:
# No connection to return - this is normal if _get_connection() failed
# No connection to return
return

try:
# Check if connection is closed
if hasattr(connection, "closed") and connection.closed != 0:
# Connection is closed, just close it explicitly and don't return to pool
logger.debug(
"[_return_connection] Connection is closed, closing it instead of returning to pool"
)
try:
connection.close()
except Exception as e:
logger.warning(f"[_return_connection] Failed to close closed connection: {e}")
logger.warning(f"Failed to close closed connection: {e}")
return

# Connection is valid, return to pool
self.connection_pool.putconn(connection)
logger.debug("[_return_connection] Successfully returned connection to pool")
except Exception as e:
# If putconn fails, try to close the connection
# This prevents connection leaks if putconn() fails
logger.error(
f"[_return_connection] Failed to return connection to pool: {e}", exc_info=True
)
logger.warning(f"Failed to return connection to pool: {e}")
try:
connection.close()
logger.debug(
"[_return_connection] Closed connection as fallback after putconn failure"
)
except Exception as close_error:
logger.warning(
f"[_return_connection] Failed to close connection after putconn error: {close_error}"
)
logger.warning(f"Failed to close connection after putconn error: {close_error}")

def _return_connection_old(self, connection):
"""Return a connection to the pool."""
Expand Down