diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index 588011d51..8dff5824a 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -201,53 +201,28 @@ def _get_connection_old(self): return conn def _get_connection(self): - """ - Get a connection from the pool. - - This function: - 1. Gets a connection from ThreadedConnectionPool - 2. Checks if connection is closed or unhealthy - 3. Returns healthy connection or retries (max 3 times) - 4. Handles connection pool exhaustion gracefully - - Returns: - psycopg2 connection object - - Raises: - RuntimeError: If connection pool is closed or exhausted after retries - """ + """Get a connection from the pool.""" if self._pool_closed: raise RuntimeError("Connection pool has been closed") - max_retries = 5 - import psycopg2.pool - + max_retries = 3 for attempt in range(max_retries): conn = None try: - # Try to get connection from pool - # This may raise PoolError if pool is exhausted conn = self.connection_pool.getconn() # Check if connection is closed if conn.closed != 0: # Connection is closed, return it to pool with close flag and try again - logger.warning( - f"[_get_connection] Got closed connection, attempt {attempt + 1}/{max_retries}" - ) try: self.connection_pool.putconn(conn, close=True) except Exception as e: - logger.warning( - f"[_get_connection] Failed to return closed connection to pool: {e}" - ) + logger.warning(f"Failed to return closed connection to pool: {e}") with suppress(Exception): conn.close() conn = None if attempt < max_retries - 1: - # Exponential backoff: 0.1s, 0.2s, 0.4s - time.sleep(0.1 * (2**attempt)) continue else: raise RuntimeError("Pool returned a closed connection after all retries") @@ -264,21 +239,19 @@ def _get_connection(self): except Exception as health_check_error: # Connection is not usable, return it to pool with close flag and try again logger.warning( - f"[_get_connection] Connection health check failed (attempt {attempt + 1}/{max_retries}): {health_check_error}" + f"Connection health check failed: {health_check_error}, returning connection to pool and retrying..." ) try: self.connection_pool.putconn(conn, close=True) except Exception as putconn_error: logger.warning( - f"[_get_connection] Failed to return unhealthy connection to pool: {putconn_error}" + f"Failed to return unhealthy connection to pool: {putconn_error}" ) with suppress(Exception): conn.close() conn = None if attempt < max_retries - 1: - # Exponential backoff: 0.1s, 0.2s, 0.4s - time.sleep(0.1 * (2**attempt)) continue else: raise RuntimeError( @@ -287,132 +260,62 @@ def _get_connection(self): # Connection is healthy, return it return conn - - except psycopg2.pool.PoolError as pool_error: - # Pool exhausted or other pool-related error - # Don't retry immediately for pool exhaustion - it's unlikely to resolve quickly - error_msg = str(pool_error).lower() - if "exhausted" in error_msg or "pool" in error_msg: - # Log pool status for debugging - try: - # Try to get pool stats if available - pool_info = f"Pool config: minconn={self.connection_pool.minconn}, maxconn={self.connection_pool.maxconn}" - logger.error( - f"[_get_connection] Connection pool exhausted (attempt {attempt + 1}/{max_retries}). {pool_info}" - ) - except Exception: - logger.error( - f"[_get_connection] Connection pool exhausted (attempt {attempt + 1}/{max_retries})" - ) - - # For pool exhaustion, wait longer before retry (connections may be returned) - if attempt < max_retries - 1: - # Longer backoff for pool exhaustion: 0.5s, 1.0s, 2.0s - wait_time = 0.5 * (2**attempt) - logger.info(f"[_get_connection] Waiting {wait_time}s before retry...") - time.sleep(wait_time) - continue - else: - raise RuntimeError( - f"Connection pool exhausted after {max_retries} attempts. " - f"This usually means connections are not being returned to the pool. " - f"Check for connection leaks in your code." - ) from pool_error - else: - # Other pool errors - retry with normal backoff - if attempt < max_retries - 1: - time.sleep(0.1 * (2**attempt)) - continue - else: - raise RuntimeError( - f"Failed to get connection from pool: {pool_error}" - ) from pool_error - except Exception as e: - # Other exceptions (not pool-related) # Only try to return connection if we actually got one # If getconn() failed (e.g., pool exhausted), conn will be None if conn is not None: try: - # Return connection to pool if it's valid - self.connection_pool.putconn(conn, close=True) + # If it's a PoolError or similar, close the connection instead of returning + if "pool" in str(e).lower() or "exhausted" in str(e).lower(): + with suppress(Exception): + conn.close() + else: + self.connection_pool.putconn(conn, close=True) except Exception as putconn_error: - logger.warning( - f"[_get_connection] Failed to return connection after error: {putconn_error}" - ) + logger.warning(f"Failed to handle connection after error: {putconn_error}") with suppress(Exception): conn.close() if attempt >= max_retries - 1: raise RuntimeError(f"Failed to get a valid connection from pool: {e}") from e else: - # Exponential backoff: 0.1s, 0.2s, 0.4s - time.sleep(0.1 * (2**attempt)) + time.sleep(0.1) continue - # Should never reach here, but just in case - raise RuntimeError("Failed to get connection after all retries") - def _return_connection(self, connection): - """ - Return a connection to the pool. - - This function safely returns a connection to the pool, handling: - - Closed connections (close them instead of returning) - - Pool closed state (close connection directly) - - None connections (no-op) - - putconn() failures (close connection as fallback) - - Args: - connection: psycopg2 connection object or None - """ + """Return a connection to the pool.""" if self._pool_closed: # Pool is closed, just close the connection if it exists if connection: try: connection.close() - logger.debug("[_return_connection] Closed connection (pool is closed)") except Exception as e: - logger.warning( - f"[_return_connection] Failed to close connection after pool closed: {e}" - ) + logger.warning(f"Failed to close connection after pool closed: {e}") return if not connection: - # No connection to return - this is normal if _get_connection() failed + # No connection to return return try: # Check if connection is closed if hasattr(connection, "closed") and connection.closed != 0: # Connection is closed, just close it explicitly and don't return to pool - logger.debug( - "[_return_connection] Connection is closed, closing it instead of returning to pool" - ) try: connection.close() except Exception as e: - logger.warning(f"[_return_connection] Failed to close closed connection: {e}") + logger.warning(f"Failed to close closed connection: {e}") return # Connection is valid, return to pool self.connection_pool.putconn(connection) - logger.debug("[_return_connection] Successfully returned connection to pool") except Exception as e: # If putconn fails, try to close the connection - # This prevents connection leaks if putconn() fails - logger.error( - f"[_return_connection] Failed to return connection to pool: {e}", exc_info=True - ) + logger.warning(f"Failed to return connection to pool: {e}") try: connection.close() - logger.debug( - "[_return_connection] Closed connection as fallback after putconn failure" - ) except Exception as close_error: - logger.warning( - f"[_return_connection] Failed to close connection after putconn error: {close_error}" - ) + logger.warning(f"Failed to close connection after putconn error: {close_error}") def _return_connection_old(self, connection): """Return a connection to the pool."""