From ef92fde4d686f2da273afa14a6af4d4646d88b60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=A7=E6=B4=8B?= <714403855@qq.com> Date: Thu, 11 Dec 2025 12:15:38 +0800 Subject: [PATCH 1/2] optimize pool --- src/memos/graph_dbs/polardb.py | 152 +++++++++++++++++++++++++++------ 1 file changed, 125 insertions(+), 27 deletions(-) diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index 84e6bf19f..9d90ca235 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -153,7 +153,7 @@ def __init__(self, config: PolarDBGraphDBConfig): # Create connection pool self.connection_pool = psycopg2.pool.ThreadedConnectionPool( minconn=5, - maxconn=100, + maxconn=5, host=host, port=port, user=user, @@ -201,28 +201,53 @@ def _get_connection_old(self): return conn def _get_connection(self): - """Get a connection from the pool.""" + """ + Get a connection from the pool. + + This function: + 1. Gets a connection from ThreadedConnectionPool + 2. Checks if connection is closed or unhealthy + 3. Returns healthy connection or retries (max 5 times, per max_retries) + 4. 
Handles connection pool exhaustion gracefully + + Returns: + psycopg2 connection object + + Raises: + RuntimeError: If connection pool is closed or exhausted after retries + """ if self._pool_closed: raise RuntimeError("Connection pool has been closed") - max_retries = 3 + max_retries = 5 + import psycopg2.pool + for attempt in range(max_retries): conn = None try: + # Try to get connection from pool + # This may raise PoolError if pool is exhausted conn = self.connection_pool.getconn() # Check if connection is closed if conn.closed != 0: # Connection is closed, return it to pool with close flag and try again + logger.warning( + f"[_get_connection] Got closed connection, attempt {attempt + 1}/{max_retries}" + ) try: self.connection_pool.putconn(conn, close=True) except Exception as e: - logger.warning(f"Failed to return closed connection to pool: {e}") + logger.warning( + f"[_get_connection] Failed to return closed connection to pool: {e}" + ) with suppress(Exception): conn.close() conn = None if attempt < max_retries - 1: + # Exponential backoff: 0.1s, 0.2s, 0.4s, 0.8s + time.sleep(0.1 * (2**attempt)) continue else: raise RuntimeError("Pool returned a closed connection after all retries") @@ -239,19 +264,21 @@ def _get_connection(self): except Exception as health_check_error: # Connection is not usable, return it to pool with close flag and try again logger.warning( - f"Connection health check failed: {health_check_error}, returning connection to pool and retrying..." 
+ f"[_get_connection] Connection health check failed (attempt {attempt + 1}/{max_retries}): {health_check_error}" ) try: self.connection_pool.putconn(conn, close=True) except Exception as putconn_error: logger.warning( - f"Failed to return unhealthy connection to pool: {putconn_error}" + f"[_get_connection] Failed to return unhealthy connection to pool: {putconn_error}" ) with suppress(Exception): conn.close() conn = None if attempt < max_retries - 1: + # Exponential backoff: 0.1s, 0.2s, 0.4s, 0.8s + time.sleep(0.1 * (2**attempt)) continue else: raise RuntimeError( @@ -260,62 +287,132 @@ def _get_connection(self): # Connection is healthy, return it return conn + + except psycopg2.pool.PoolError as pool_error: + # Pool exhausted or other pool-related error + # Don't retry immediately for pool exhaustion - it's unlikely to resolve quickly + error_msg = str(pool_error).lower() + if "exhausted" in error_msg or "pool" in error_msg: + # Log pool status for debugging + try: + # Try to get pool stats if available + pool_info = f"Pool config: minconn={self.connection_pool.minconn}, maxconn={self.connection_pool.maxconn}" + logger.error( + f"[_get_connection] Connection pool exhausted (attempt {attempt + 1}/{max_retries}). {pool_info}" + ) + except Exception: + logger.error( + f"[_get_connection] Connection pool exhausted (attempt {attempt + 1}/{max_retries})" + ) + + # For pool exhaustion, wait longer before retry (connections may be returned) + if attempt < max_retries - 1: + # Longer backoff for pool exhaustion: 0.5s, 1.0s, 2.0s, 4.0s + wait_time = 0.5 * (2**attempt) + logger.info(f"[_get_connection] Waiting {wait_time}s before retry...") + time.sleep(wait_time) + continue + else: + raise RuntimeError( + f"Connection pool exhausted after {max_retries} attempts. " + f"This usually means connections are not being returned to the pool. " + f"Check for connection leaks in your code." 
+ ) from pool_error + else: + # Other pool errors - retry with normal backoff + if attempt < max_retries - 1: + time.sleep(0.1 * (2**attempt)) + continue + else: + raise RuntimeError( + f"Failed to get connection from pool: {pool_error}" + ) from pool_error + except Exception as e: + # Other exceptions (not pool-related) # Only try to return connection if we actually got one # If getconn() failed (e.g., pool exhausted), conn will be None if conn is not None: try: - # If it's a PoolError or similar, close the connection instead of returning - if "pool" in str(e).lower() or "exhausted" in str(e).lower(): - with suppress(Exception): - conn.close() - else: - self.connection_pool.putconn(conn, close=True) + # Return connection to pool if it's valid + self.connection_pool.putconn(conn, close=True) except Exception as putconn_error: - logger.warning(f"Failed to handle connection after error: {putconn_error}") + logger.warning( + f"[_get_connection] Failed to return connection after error: {putconn_error}" + ) with suppress(Exception): conn.close() if attempt >= max_retries - 1: raise RuntimeError(f"Failed to get a valid connection from pool: {e}") from e else: - time.sleep(0.1) + # Exponential backoff: 0.1s, 0.2s, 0.4s, 0.8s + time.sleep(0.1 * (2**attempt)) continue + # Should never reach here, but just in case + raise RuntimeError("Failed to get connection after all retries") + def _return_connection(self, connection): - """Return a connection to the pool.""" + """ + Return a connection to the pool. 
+ + This function safely returns a connection to the pool, handling: + - Closed connections (close them instead of returning) + - Pool closed state (close connection directly) + - None connections (no-op) + - putconn() failures (close connection as fallback) + + Args: + connection: psycopg2 connection object or None + """ if self._pool_closed: # Pool is closed, just close the connection if it exists if connection: try: connection.close() + logger.debug("[_return_connection] Closed connection (pool is closed)") except Exception as e: - logger.warning(f"Failed to close connection after pool closed: {e}") + logger.warning( + f"[_return_connection] Failed to close connection after pool closed: {e}" + ) return if not connection: - # No connection to return + # No connection to return - this is normal if _get_connection() failed return try: # Check if connection is closed if hasattr(connection, "closed") and connection.closed != 0: # Connection is closed, just close it explicitly and don't return to pool + logger.debug( + "[_return_connection] Connection is closed, closing it instead of returning to pool" + ) try: connection.close() except Exception as e: - logger.warning(f"Failed to close closed connection: {e}") + logger.warning(f"[_return_connection] Failed to close closed connection: {e}") return # Connection is valid, return to pool self.connection_pool.putconn(connection) + logger.debug("[_return_connection] Successfully returned connection to pool") except Exception as e: # If putconn fails, try to close the connection - logger.warning(f"Failed to return connection to pool: {e}") + # This prevents connection leaks if putconn() fails + logger.error( + f"[_return_connection] Failed to return connection to pool: {e}", exc_info=True + ) try: connection.close() + logger.debug( + "[_return_connection] Closed connection as fallback after putconn failure" + ) except Exception as close_error: - logger.warning(f"Failed to close connection after putconn error: 
{close_error}") + logger.warning( + f"[_return_connection] Failed to close connection after putconn error: {close_error}" + ) def _return_connection_old(self, connection): """Return a connection to the pool.""" @@ -1639,9 +1736,12 @@ def seach_by_keywords_like( """ params = (query_word,) - logger.info(f"[seach_by_keywords_LIKE start:] user_name: {user_name}, params: {params}") - conn = self._get_connection() + logger.info( + f"[seach_by_keywords_LIKE start:] user_name: {user_name}, query: {query}, params: {params}" + ) + conn = None try: + conn = self._get_connection() with conn.cursor() as cursor: cursor.execute(query, params) results = cursor.fetchall() @@ -1651,7 +1751,7 @@ def seach_by_keywords_like( id_val = str(oldid) output.append({"id": id_val}) logger.info( - f"[seach_by_keywords_LIKE end:] user_name: {user_name}, params: {params} recalled: {output}" + f"[seach_by_keywords_LIKE end:] user_name: {user_name}, query: {query}, params: {params} recalled: {output}" ) return output finally: @@ -1736,8 +1836,9 @@ def seach_by_keywords_tfidf( logger.info( f"[seach_by_keywords_TFIDF start:] user_name: {user_name}, query: {query}, params: {params}" ) - conn = self._get_connection() + conn = None try: + conn = self._get_connection() with conn.cursor() as cursor: cursor.execute(query, params) results = cursor.fetchall() @@ -1747,9 +1848,6 @@ def seach_by_keywords_tfidf( id_val = str(oldid) output.append({"id": id_val}) - logger.info( - f"[seach_by_keywords_TFIDF end:] user_name: {user_name}, query: {query}, params: {params} recalled: {output}" - ) logger.info( f"[seach_by_keywords_TFIDF end:] user_name: {user_name}, query: {query}, params: {params} recalled: {output}" ) From 7f2c0ddb3ecd0b93fcd95c5ffca353d1f8322055 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=A7=E6=B4=8B?= <714403855@qq.com> Date: Thu, 11 Dec 2025 12:20:48 +0800 Subject: [PATCH 2/2] fix --- src/memos/graph_dbs/polardb.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index 9d90ca235..588011d51 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -153,7 +153,7 @@ def __init__(self, config: PolarDBGraphDBConfig): # Create connection pool self.connection_pool = psycopg2.pool.ThreadedConnectionPool( minconn=5, - maxconn=5, + maxconn=100, host=host, port=port, user=user,