From 5e288f702d1b77d229e8350a1d3c81f773ba84b8 Mon Sep 17 00:00:00 2001 From: JY Tan Date: Tue, 27 Jan 2026 21:56:30 -0800 Subject: [PATCH] Commit --- .../psycopg/instrumentation.py | 790 +++++++++++++++++- drift/instrumentation/psycopg/mocks.py | 228 +++++ .../psycopg2/instrumentation.py | 92 +- .../django-postgres/src/test_requests.py | 52 +- .../django-redis/.tusk/config.yaml | 3 + .../django-redis/src/test_requests.py | 9 +- .../fastapi-postgres/requirements.txt | 1 + drift/stack-tests/fastapi-postgres/src/app.py | 3 +- .../fastapi-postgres/src/test_requests.py | 52 +- 9 files changed, 1146 insertions(+), 84 deletions(-) diff --git a/drift/instrumentation/psycopg/instrumentation.py b/drift/instrumentation/psycopg/instrumentation.py index 5170703..f1ede88 100644 --- a/drift/instrumentation/psycopg/instrumentation.py +++ b/drift/instrumentation/psycopg/instrumentation.py @@ -112,6 +112,9 @@ def patched_connect(*args, **kwargs): module.connect = patched_connect # type: ignore[attr-defined] logger.debug("psycopg.connect instrumented") + # Patch async connection (AsyncConnection.connect) + self._patch_async_connection(module) + # Patch Pipeline class for pipeline mode support self._patch_pipeline_class(module) @@ -159,6 +162,183 @@ def patched_exit(pipeline_self, exc_type, exc_val, exc_tb): Pipeline.__exit__ = patched_exit logger.debug("psycopg.Pipeline.__exit__ instrumented") + # Also patch AsyncConnection.pipeline for async pipeline support + self._patch_async_pipeline(module) + + def _patch_async_pipeline(self, module: ModuleType) -> None: + """Patch AsyncConnection.pipeline to finalize spans on exit.""" + try: + from psycopg import AsyncConnection + except ImportError: + logger.debug("psycopg.AsyncConnection not available, skipping async pipeline instrumentation") + return + + import contextlib + + instrumentation = self + original_pipeline = AsyncConnection.pipeline + + @contextlib.asynccontextmanager + async def patched_async_pipeline(conn_self): + """Patched AsyncConnection.pipeline that finalizes pending spans on exit.""" + async with original_pipeline(conn_self): + yield + # After the pipeline context exits, finalize pending spans + instrumentation._finalize_pending_pipeline_spans(conn_self) + + AsyncConnection.pipeline = patched_async_pipeline + logger.debug("psycopg.AsyncConnection.pipeline instrumented") + + def _patch_async_connection(self, module: ModuleType) -> None: + """Patch async psycopg connection (AsyncConnection.connect) and AsyncConnectionPool.""" + try: + from psycopg import AsyncConnection + except ImportError: + logger.debug("psycopg.AsyncConnection not available, skipping async instrumentation") + return + + instrumentation = self + original_async_connect = AsyncConnection.connect + + async def patched_async_connect(conninfo="", **kwargs): + """Patched AsyncConnection.connect method.""" + sdk = TuskDrift.get_instance() + logger.debug(f"[PATCHED_ASYNC_CONNECT] Called with mode={sdk.mode}") + + if sdk.mode == TuskDriftMode.DISABLED: + return await original_async_connect(conninfo, **kwargs) + + user_cursor_factory = kwargs.pop("cursor_factory", None) + user_row_factory = kwargs.pop("row_factory", None) + cursor_factory = instrumentation._create_async_cursor_factory(sdk, user_cursor_factory) + + # In REPLAY mode, try to connect but fall back to mock connection if DB is unavailable + if sdk.mode == TuskDriftMode.REPLAY: + try: + kwargs["cursor_factory"] = cursor_factory + if user_row_factory is not None: + kwargs["row_factory"] = user_row_factory + connection = await 
original_async_connect(conninfo, **kwargs) + logger.info( + "[PATCHED_ASYNC_CONNECT] REPLAY mode: Successfully connected to database (psycopg3 async)" + ) + return connection + except Exception as e: + logger.info( + f"[PATCHED_ASYNC_CONNECT] REPLAY mode: Database connection failed ({e}), using mock async connection" + ) + # Import mock async connection + from .mocks import MockAsyncConnection + + return MockAsyncConnection(sdk, instrumentation, cursor_factory, row_factory=user_row_factory) + + # In RECORD mode, always require real connection + kwargs["cursor_factory"] = cursor_factory + if user_row_factory is not None: + kwargs["row_factory"] = user_row_factory + connection = await original_async_connect(conninfo, **kwargs) + logger.debug("[PATCHED_ASYNC_CONNECT] RECORD mode: Connected to database (psycopg3 async)") + return connection + + # Replace the classmethod with our patched version + AsyncConnection.connect = classmethod( + lambda cls, conninfo="", **kwargs: patched_async_connect(conninfo, **kwargs) + ) # type: ignore[method-assign] + logger.info("psycopg.AsyncConnection.connect instrumented") + + # Also patch AsyncConnectionPool to inject cursor_factory + self._patch_async_connection_pool(module) + + def _patch_async_connection_pool(self, module: ModuleType) -> None: + """Patch AsyncConnectionPool to inject cursor_factory into connections.""" + try: + from psycopg_pool import AsyncConnectionPool + except ImportError: + logger.debug("psycopg_pool.AsyncConnectionPool not available, skipping pool instrumentation") + return + + instrumentation = self + original_init = AsyncConnectionPool.__init__ + + def patched_init(pool_self, conninfo="", **kwargs): + """Patched AsyncConnectionPool.__init__ to inject cursor_factory. + + Note: AsyncConnectionPool has a 'kwargs' PARAMETER (a dict) that gets passed + to connections when they're created. We need to inject cursor_factory into + that dict, not into the function call's **kwargs. + """ + sdk = TuskDrift.get_instance() + + if sdk.mode != TuskDriftMode.DISABLED: + # Get or create the 'kwargs' parameter (dict passed to connections) + conn_kwargs = kwargs.get("kwargs") or {} + if "cursor_factory" not in conn_kwargs: + user_cursor_factory = None + cursor_factory = instrumentation._create_async_cursor_factory(sdk, user_cursor_factory) + conn_kwargs["cursor_factory"] = cursor_factory + kwargs["kwargs"] = conn_kwargs + logger.debug("[PATCHED_ASYNC_POOL] Injected cursor_factory into AsyncConnectionPool.kwargs") + + return original_init(pool_self, conninfo, **kwargs) + + AsyncConnectionPool.__init__ = patched_init # type: ignore[method-assign] + logger.info("psycopg_pool.AsyncConnectionPool.__init__ instrumented") + + def _create_async_cursor_factory(self, sdk: TuskDrift, base_factory=None): + """Create an async cursor factory that wraps async cursors with instrumentation. + + Returns a cursor CLASS (psycopg3 expects a class, not a function). 
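+
+        Example (illustrative; ``dsn`` is a placeholder) -- psycopg3 takes the class directly:
+
+            factory = instrumentation._create_async_cursor_factory(sdk)
+            conn = await psycopg.AsyncConnection.connect(dsn, cursor_factory=factory)
+            cur = conn.cursor()  # instance of InstrumentedAsyncCursor
+            await cur.execute("SELECT 1")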
+ """ + instrumentation = self + logger.debug(f"[ASYNC_CURSOR_FACTORY] Creating async cursor factory, sdk.mode={sdk.mode}") + + try: + from psycopg import AsyncCursor as BaseAsyncCursor + except ImportError: + logger.warning("[ASYNC_CURSOR_FACTORY] Could not import psycopg.AsyncCursor") + BaseAsyncCursor = object # type: ignore + + base = base_factory or BaseAsyncCursor + + class InstrumentedAsyncCursor(base): + """Instrumented async cursor with tracing support.""" + + _tusk_description = None # Store mock description for replay mode + + @property + def description(self): + # In replay mode, return mock description if set; otherwise use base + if self._tusk_description is not None: + return self._tusk_description + return super().description + + @property + def rownumber(self): + # In captured mode, return tracked index + if hasattr(self, "_tusk_rows") and self._tusk_rows is not None: + return self._tusk_index + # In replay mode with mock data, return mock index + if hasattr(self, "_mock_rows") and self._mock_rows is not None: + return self._mock_index + return super().rownumber + + @property + def statusmessage(self): + # In replay mode with mock data, return mock statusmessage + if hasattr(self, "_mock_statusmessage"): + return self._mock_statusmessage + return super().statusmessage + + async def execute(self, query, params=None, **kwargs): + return await instrumentation._traced_async_execute(self, super().execute, sdk, query, params, **kwargs) + + async def executemany(self, query, params_seq, **kwargs): + return await instrumentation._traced_async_executemany( + self, super().executemany, sdk, query, params_seq, **kwargs + ) + + return InstrumentedAsyncCursor + def _create_cursor_factory(self, sdk: TuskDrift, base_factory=None): """Create a cursor factory that wraps cursors with instrumentation. @@ -375,8 +555,12 @@ def _noop_execute(self, cursor: Any) -> Any: cursor._mock_index = 0 # pyright: ignore return cursor - def _replay_execute(self, cursor: Any, sdk: TuskDrift, query_str: str, params: Any) -> Any: - """Handle REPLAY mode for execute - fetch mock from CLI.""" + def _replay_execute(self, cursor: Any, sdk: TuskDrift, query_str: str, params: Any, is_async: bool = False) -> Any: + """Handle REPLAY mode for execute - fetch mock from CLI. + + Args: + is_async: If True, wrap fetch methods as async functions for async cursors. + """ span_info = self._create_query_span(sdk, "query") if not span_info: @@ -393,7 +577,7 @@ def _replay_execute(self, cursor: Any, sdk: TuskDrift, query_str: str, params: A f"Query: {query_str[:100]}..." 
) - self._mock_execute_with_data(cursor, mock_result) + self._mock_execute_with_data(cursor, mock_result, is_async=is_async) span_info.span.end() return cursor @@ -601,6 +785,156 @@ def _record_executemany( except Exception: pass + async def _traced_async_execute( + self, cursor: Any, original_execute: Any, sdk: TuskDrift, query: str, params=None, **kwargs + ) -> Any: + """Traced async cursor.execute method.""" + if sdk.mode == TuskDriftMode.DISABLED: + return await original_execute(query, params, **kwargs) + + query_str = self._query_to_string(query, cursor) + + if sdk.mode == TuskDriftMode.REPLAY: + # For async replay, pass is_async=True to wrap fetch methods as async + return self._replay_execute(cursor, sdk, query_str, params, is_async=True) + + # RECORD mode - use async record handler + return await self._record_async_execute(cursor, original_execute, sdk, query, query_str, params, kwargs) + + async def _record_async_execute( + self, + cursor: Any, + original_execute: Any, + sdk: TuskDrift, + query: str, + query_str: str, + params: Any, + kwargs: dict, + ) -> Any: + """Handle RECORD mode for async execute - create span and execute query.""" + is_pre_app_start = not sdk.app_ready + + # Reset cursor state from any previous execute() on this cursor + if hasattr(cursor, "_tusk_patched"): + for attr in ("fetchone", "fetchmany", "fetchall", "scroll"): + if attr in cursor.__dict__: + delattr(cursor, attr) + cursor._tusk_rows = None + cursor._tusk_index = 0 + del cursor._tusk_patched + + span_info = self._create_query_span(sdk, "query", is_pre_app_start) + + if not span_info: + return await original_execute(query, params, **kwargs) + + error = None + + # Check if we're in pipeline mode BEFORE executing + in_pipeline_mode = self._is_in_pipeline_mode(cursor) + + with SpanUtils.with_span(span_info): + try: + result = await original_execute(query, params, **kwargs) + return result + except Exception as e: + error = e + raise + finally: + try: + if error is not None: + self._finalize_query_span(span_info.span, cursor, query_str, params, error) + span_info.span.end() + elif in_pipeline_mode: + # For pipeline mode, set up fetch interception to capture data when user fetches + # We can't defer to pipeline exit because user fetches inside the pipeline + self._setup_async_pipeline_fetch_interception(cursor, span_info, query_str, params) + else: + span_finalized = self._finalize_query_span(span_info.span, cursor, query_str, params, None) + if span_finalized: + span_info.span.end() + except Exception as e: + logger.error(f"Error in async span finalization: {e}") + try: + span_info.span.end() + except Exception: + pass + + async def _traced_async_executemany( + self, cursor: Any, original_executemany: Any, sdk: TuskDrift, query: str, params_seq, **kwargs + ) -> Any: + """Traced async cursor.executemany method.""" + if sdk.mode == TuskDriftMode.DISABLED: + return await original_executemany(query, params_seq, **kwargs) + + query_str = self._query_to_string(query, cursor) + params_list = list(params_seq) + returning = kwargs.get("returning", False) + + if sdk.mode == TuskDriftMode.REPLAY: + # For async replay, use sync replay handler + return self._replay_executemany(cursor, sdk, query_str, params_list, returning) + + # RECORD mode + return await self._record_async_executemany( + cursor, original_executemany, sdk, query, query_str, params_list, kwargs, returning + ) + + async def _record_async_executemany( + self, + cursor: Any, + original_executemany: Any, + sdk: TuskDrift, + query: str, + query_str: str, + 
params_list: list, + kwargs: dict, + returning: bool = False, + ) -> Any: + """Handle RECORD mode for async executemany - create span and execute query.""" + is_pre_app_start = not sdk.app_ready + span_info = self._create_query_span(sdk, "query", is_pre_app_start) + + if not span_info: + return await original_executemany(query, params_list, **kwargs) + + error = None + + with SpanUtils.with_span(span_info): + try: + result = await original_executemany(query, params_list, **kwargs) + return result + except Exception as e: + error = e + raise + finally: + try: + if returning and error is None: + self._finalize_executemany_returning_span( + span_info.span, + cursor, + query_str, + {"_batch": params_list, "_returning": True}, + error, + ) + span_info.span.end() + else: + span_finalized = self._finalize_query_span( + span_info.span, + cursor, + query_str, + {"_batch": params_list}, + error, + ) + if span_finalized: + span_info.span.end() + except Exception as e: + logger.error(f"Error in async span finalization: {e}") + try: + span_info.span.end() + except Exception: + pass + def _traced_stream( self, cursor: Any, original_stream: Any, sdk: TuskDrift, query: str, params=None, **kwargs ) -> Any: @@ -1293,8 +1627,12 @@ def _try_get_mock( logger.error(f"Error getting mock for psycopg query: {e}") return None - def _mock_execute_with_data(self, cursor: Any, mock_data: dict[str, Any]) -> None: - """Mock cursor execute by setting internal state.""" + def _mock_execute_with_data(self, cursor: Any, mock_data: dict[str, Any], is_async: bool = False) -> None: + """Mock cursor execute by setting internal state. + + Args: + is_async: If True, wrap fetch methods as async functions for async cursors. + """ # The SDK communicator already extracts response.body from the CLI's MockInteraction structure # So mock_data should already contain: {"rowcount": ..., "description": [...], "rows": [...]} actual_data = mock_data @@ -1330,9 +1668,31 @@ def _mock_execute_with_data(self, cursor: Any, mock_data: dict[str, Any]) -> Non # Use helper methods to create fetch and scroll methods fetchone, fetchmany, fetchall = self._create_fetch_methods(cursor, "_mock_rows", "_mock_index", transform_row) - cursor.fetchone = fetchone # pyright: ignore[reportAttributeAccessIssue] - cursor.fetchmany = fetchmany # pyright: ignore[reportAttributeAccessIssue] - cursor.fetchall = fetchall # pyright: ignore[reportAttributeAccessIssue] + + if is_async: + # Wrap sync fetch methods in async for async cursors + sync_fetchone = fetchone + sync_fetchmany = fetchmany + sync_fetchall = fetchall + + async def async_fetchone(): + return sync_fetchone() + + async def async_fetchmany(size=None): + if size is None: + size = cursor.arraysize + return sync_fetchmany(size) + + async def async_fetchall(): + return sync_fetchall() + + cursor.fetchone = async_fetchone # pyright: ignore[reportAttributeAccessIssue] + cursor.fetchmany = async_fetchmany # pyright: ignore[reportAttributeAccessIssue] + cursor.fetchall = async_fetchall # pyright: ignore[reportAttributeAccessIssue] + else: + cursor.fetchone = fetchone # pyright: ignore[reportAttributeAccessIssue] + cursor.fetchmany = fetchmany # pyright: ignore[reportAttributeAccessIssue] + cursor.fetchall = fetchall # pyright: ignore[reportAttributeAccessIssue] cursor.scroll = self._create_scroll_method(cursor, "_mock_rows", "_mock_index") # pyright: ignore[reportAttributeAccessIssue] @@ -1710,12 +2070,22 @@ def _setup_lazy_capture(self, cursor: Any) -> None: requests results. 
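+        A sketch of the wrapper shape (mirroring the sync wrappers installed below):
+
+            def lazy_fetchall():
+                do_lazy_capture()  # runs the real fetchall and finalizes the span, once
+                rows = cursor._tusk_rows[cursor._tusk_index:]
+                cursor._tusk_index = len(cursor._tusk_rows)
+                return rows
+
+            cursor.fetchall = lazy_fetchall
+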
This avoids issues with binary format and other cases where calling fetchall() immediately after execute() can hang. """ + import inspect + # Get references to original fetch methods from the cursor's class # (not instance methods which might already be patched) cursor_class = type(cursor) original_fetchall = cursor_class.fetchall original_scroll = cursor_class.scroll if hasattr(cursor_class, "scroll") else None + # Detect if this is an async cursor + is_async_cursor = inspect.iscoroutinefunction(original_fetchall) + + if is_async_cursor: + # For async cursors, set up async lazy capture + self._setup_async_lazy_capture(cursor, original_fetchall, original_scroll) + return + def do_lazy_capture(): """Perform the actual capture - called on first fetch.""" if hasattr(cursor, "_tusk_rows") and cursor._tusk_rows is not None: @@ -1856,6 +2226,410 @@ def lazy_scroll(value: int, mode: str = "relative") -> None: cursor.scroll = lazy_scroll # pyright: ignore[reportAttributeAccessIssue] cursor._tusk_patched = True # pyright: ignore[reportAttributeAccessIssue] + def _setup_async_lazy_capture(self, cursor: Any, original_fetchall: Any, original_scroll: Any) -> None: + """Set up async lazy capture wrappers for async cursors. + + Similar to _setup_lazy_capture but handles async cursors where fetchall() + returns a coroutine that must be awaited. + """ + + async def do_async_lazy_capture(): + """Perform the actual capture for async cursor - called on first fetch.""" + if hasattr(cursor, "_tusk_rows") and cursor._tusk_rows is not None: + return # Already captured + + try: + # Get the actual rows from psycopg - AWAIT for async cursor + all_rows = await original_fetchall(cursor) + + # Store for subsequent fetch calls + cursor._tusk_rows = all_rows # pyright: ignore[reportAttributeAccessIssue] + cursor._tusk_index = 0 # pyright: ignore[reportAttributeAccessIssue] + + # Process rows for trace capture + description = cursor._tusk_lazy_description # pyright: ignore[reportAttributeAccessIssue] + row_factory_type = cursor._tusk_lazy_row_factory_type # pyright: ignore[reportAttributeAccessIssue] + column_names = cursor._tusk_lazy_column_names # pyright: ignore[reportAttributeAccessIssue] + + rows = [] + for row in all_rows: + if row_factory_type == "kwargs": + rows.append(row) + elif row_factory_type == "scalar": + rows.append([row]) + elif row_factory_type == "class" or hasattr(row, "__dataclass_fields__"): + rows.append([getattr(row, col, None) for col in column_names]) + elif isinstance(row, dict): + rows.append([row.get(col) for col in column_names]) + elif hasattr(row, "_fields"): + rows.append([getattr(row, col, None) for col in column_names]) + else: + rows.append(list(row)) + + # Finalize the span with captured data + span = cursor._tusk_lazy_span # pyright: ignore[reportAttributeAccessIssue] + input_value = cursor._tusk_lazy_input_value # pyright: ignore[reportAttributeAccessIssue] + instrumentation = cursor._tusk_lazy_instrumentation # pyright: ignore[reportAttributeAccessIssue] + + output_value = { + "rowcount": cursor.rowcount if hasattr(cursor, "rowcount") else -1, + } + + if description: + output_value["description"] = description + + if rows: + if row_factory_type == "kwargs": + serialized_rows = [serialize_value(row) for row in rows] + else: + serialized_rows = [[serialize_value(col) for col in row] for row in rows] + output_value["rows"] = serialized_rows + + if hasattr(cursor, "statusmessage") and cursor.statusmessage is not None: + output_value["statusmessage"] = cursor.statusmessage + + 
instrumentation._set_span_attributes(span, input_value, output_value) + + span.set_status(Status(OTelStatusCode.OK)) + span.end() + + logger.debug("[PSYCOPG] Async lazy capture completed, span finalized") + + except Exception as e: + logger.error(f"Error in async lazy capture: {e}") + try: + span = cursor._tusk_lazy_span + span.set_status(Status(OTelStatusCode.ERROR, str(e))) + span.end() + except Exception: + pass + raise + + finally: + # Clean up lazy capture attributes + for attr in ( + "_tusk_lazy_span", + "_tusk_lazy_input_value", + "_tusk_lazy_description", + "_tusk_lazy_row_factory_type", + "_tusk_lazy_column_names", + "_tusk_lazy_instrumentation", + ): + if hasattr(cursor, attr): + try: + delattr(cursor, attr) + except AttributeError: + pass + + async def async_lazy_fetchone(): + await do_async_lazy_capture() + if cursor._tusk_index < len(cursor._tusk_rows): # pyright: ignore[reportAttributeAccessIssue] + row = cursor._tusk_rows[cursor._tusk_index] # pyright: ignore[reportAttributeAccessIssue] + cursor._tusk_index += 1 # pyright: ignore[reportAttributeAccessIssue] + return row + return None + + async def async_lazy_fetchmany(size=None): + await do_async_lazy_capture() + if size is None: + size = cursor.arraysize + result = cursor._tusk_rows[cursor._tusk_index : cursor._tusk_index + size] # pyright: ignore[reportAttributeAccessIssue] + cursor._tusk_index += len(result) # pyright: ignore[reportAttributeAccessIssue] + return result + + async def async_lazy_fetchall(): + await do_async_lazy_capture() + result = cursor._tusk_rows[cursor._tusk_index :] # pyright: ignore[reportAttributeAccessIssue] + cursor._tusk_index = len(cursor._tusk_rows) # pyright: ignore[reportAttributeAccessIssue] + return result + + async def async_lazy_scroll(value: int, mode: str = "relative") -> None: + await do_async_lazy_capture() + if mode == "relative": + newpos = cursor._tusk_index + value # pyright: ignore[reportAttributeAccessIssue] + elif mode == "absolute": + newpos = value + else: + raise ValueError(f"bad mode: {mode}. It should be 'relative' or 'absolute'") + + num_rows = len(cursor._tusk_rows) # pyright: ignore[reportAttributeAccessIssue] + if num_rows > 0: + if not (0 <= newpos < num_rows): + raise IndexError("cursor position out of range") + elif newpos != 0: + raise IndexError("cursor position out of range") + + cursor._tusk_index = newpos # pyright: ignore[reportAttributeAccessIssue] + + # Patch the cursor with async lazy wrappers + cursor.fetchone = async_lazy_fetchone # pyright: ignore[reportAttributeAccessIssue] + cursor.fetchmany = async_lazy_fetchmany # pyright: ignore[reportAttributeAccessIssue] + cursor.fetchall = async_lazy_fetchall # pyright: ignore[reportAttributeAccessIssue] + if original_scroll: + cursor.scroll = async_lazy_scroll # pyright: ignore[reportAttributeAccessIssue] + cursor._tusk_patched = True # pyright: ignore[reportAttributeAccessIssue] + + logger.debug("[PSYCOPG] Async lazy capture wrappers installed") + + def _setup_async_pipeline_fetch_interception( + self, + cursor: Any, + span_info: Any, + query: str, + params: Any, + ) -> None: + """Set up fetch interception for async pipeline mode. + + In pipeline mode, we can't wait for pipeline exit to capture data because + users fetch data inside the pipeline. This method wraps fetch methods to + capture data when the user calls them. 
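+
+        The usage pattern this covers (illustrative):
+
+            async with conn.pipeline():
+                cur = conn.cursor()
+                await cur.execute("SELECT ...")  # span is left open at this point
+                rows = await cur.fetchall()      # wrapper captures rows and ends the span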
+ """ + # Get references to original fetch methods + cursor_class = type(cursor) + original_fetchone = cursor_class.fetchone + original_fetchmany = cursor_class.fetchmany + original_fetchall = cursor_class.fetchall + + instrumentation = self + captured = {"done": False, "rows": None} + + async def capturing_fetchall(): + """Intercept fetchall to capture data for the span.""" + # Call original fetchall + rows = await original_fetchall(cursor) + + # Capture on first fetch + if not captured["done"]: + captured["done"] = True + captured["rows"] = rows + + # Finalize the span with captured data + try: + description = None + if cursor.description: + description = [ + { + "name": desc[0] if hasattr(desc, "__getitem__") else desc.name, + "type_code": desc[1] + if hasattr(desc, "__getitem__") and len(desc) > 1 + else getattr(desc, "type_code", None), + } + for desc in cursor.description + ] + + row_factory = getattr(cursor, "row_factory", None) + if row_factory is None: + conn = getattr(cursor, "connection", None) + if conn: + row_factory = getattr(conn, "row_factory", None) + row_factory_type = instrumentation._detect_row_factory_type(row_factory) + column_names = [d["name"] for d in description] if description else [] + + processed_rows = [] + for row in rows: + if row_factory_type == "kwargs": + processed_rows.append(row) + elif row_factory_type == "scalar": + processed_rows.append([row]) + elif row_factory_type == "class" or hasattr(row, "__dataclass_fields__"): + processed_rows.append([getattr(row, str(col), None) for col in column_names]) + elif isinstance(row, dict): + processed_rows.append([row.get(col) for col in column_names]) + elif hasattr(row, "_fields"): + processed_rows.append([getattr(row, str(col), None) for col in column_names]) + else: + processed_rows.append(list(row)) + + input_value = {"query": query.strip()} + if params is not None: + input_value["parameters"] = serialize_value(params) + + output_value = { + "rowcount": cursor.rowcount if hasattr(cursor, "rowcount") else -1, + } + if description: + output_value["description"] = description + if processed_rows: + if row_factory_type == "kwargs": + output_value["rows"] = [serialize_value(row) for row in processed_rows] + else: + output_value["rows"] = [[serialize_value(col) for col in row] for row in processed_rows] + + instrumentation._set_span_attributes(span_info.span, input_value, output_value) + span_info.span.set_status(Status(OTelStatusCode.OK)) + span_info.span.end() + logger.debug("[PSYCOPG] Pipeline fetch interception: span finalized") + + except Exception as e: + logger.error(f"[PSYCOPG] Error in pipeline fetch interception: {e}") + try: + span_info.span.set_status(Status(OTelStatusCode.ERROR, str(e))) + span_info.span.end() + except Exception: + pass + + return rows + + async def capturing_fetchone(): + """Intercept fetchone to capture data for the span.""" + row = await original_fetchone(cursor) + + if not captured["done"]: + captured["done"] = True + # For fetchone, we capture just this row + rows = [row] if row is not None else [] + captured["rows"] = rows + + try: + description = None + if cursor.description: + description = [ + { + "name": desc[0] if hasattr(desc, "__getitem__") else desc.name, + "type_code": desc[1] + if hasattr(desc, "__getitem__") and len(desc) > 1 + else getattr(desc, "type_code", None), + } + for desc in cursor.description + ] + + row_factory = getattr(cursor, "row_factory", None) + if row_factory is None: + conn = getattr(cursor, "connection", None) + if conn: + row_factory = 
getattr(conn, "row_factory", None) + row_factory_type = instrumentation._detect_row_factory_type(row_factory) + column_names = [d["name"] for d in description] if description else [] + + processed_rows = [] + for r in rows: + if row_factory_type == "kwargs": + processed_rows.append(r) + elif row_factory_type == "scalar": + processed_rows.append([r]) + elif row_factory_type == "class" or hasattr(r, "__dataclass_fields__"): + processed_rows.append([getattr(r, str(col), None) for col in column_names]) + elif isinstance(r, dict): + processed_rows.append([r.get(col) for col in column_names]) + elif hasattr(r, "_fields"): + processed_rows.append([getattr(r, str(col), None) for col in column_names]) + else: + processed_rows.append(list(r)) + + input_value = {"query": query.strip()} + if params is not None: + input_value["parameters"] = serialize_value(params) + + output_value = { + "rowcount": cursor.rowcount if hasattr(cursor, "rowcount") else -1, + } + if description: + output_value["description"] = description + if processed_rows: + if row_factory_type == "kwargs": + output_value["rows"] = [serialize_value(r) for r in processed_rows] + else: + output_value["rows"] = [[serialize_value(col) for col in r] for r in processed_rows] + + instrumentation._set_span_attributes(span_info.span, input_value, output_value) + span_info.span.set_status(Status(OTelStatusCode.OK)) + span_info.span.end() + logger.debug("[PSYCOPG] Pipeline fetchone interception: span finalized") + + except Exception as e: + logger.error(f"[PSYCOPG] Error in pipeline fetchone interception: {e}") + try: + span_info.span.set_status(Status(OTelStatusCode.ERROR, str(e))) + span_info.span.end() + except Exception: + pass + + return row + + async def capturing_fetchmany(size=None): + """Intercept fetchmany to capture data for the span.""" + if size is None: + size = cursor.arraysize + rows = await original_fetchmany(cursor, size) + + if not captured["done"]: + captured["done"] = True + captured["rows"] = rows + + try: + description = None + if cursor.description: + description = [ + { + "name": desc[0] if hasattr(desc, "__getitem__") else desc.name, + "type_code": desc[1] + if hasattr(desc, "__getitem__") and len(desc) > 1 + else getattr(desc, "type_code", None), + } + for desc in cursor.description + ] + + row_factory = getattr(cursor, "row_factory", None) + if row_factory is None: + conn = getattr(cursor, "connection", None) + if conn: + row_factory = getattr(conn, "row_factory", None) + row_factory_type = instrumentation._detect_row_factory_type(row_factory) + column_names = [d["name"] for d in description] if description else [] + + processed_rows = [] + for r in rows: + if row_factory_type == "kwargs": + processed_rows.append(r) + elif row_factory_type == "scalar": + processed_rows.append([r]) + elif row_factory_type == "class" or hasattr(r, "__dataclass_fields__"): + processed_rows.append([getattr(r, str(col), None) for col in column_names]) + elif isinstance(r, dict): + processed_rows.append([r.get(col) for col in column_names]) + elif hasattr(r, "_fields"): + processed_rows.append([getattr(r, str(col), None) for col in column_names]) + else: + processed_rows.append(list(r)) + + input_value = {"query": query.strip()} + if params is not None: + input_value["parameters"] = serialize_value(params) + + output_value = { + "rowcount": cursor.rowcount if hasattr(cursor, "rowcount") else -1, + } + if description: + output_value["description"] = description + if processed_rows: + if row_factory_type == "kwargs": + output_value["rows"] = 
[serialize_value(r) for r in processed_rows] + else: + output_value["rows"] = [[serialize_value(col) for col in r] for r in processed_rows] + + instrumentation._set_span_attributes(span_info.span, input_value, output_value) + span_info.span.set_status(Status(OTelStatusCode.OK)) + span_info.span.end() + logger.debug("[PSYCOPG] Pipeline fetchmany interception: span finalized") + + except Exception as e: + logger.error(f"[PSYCOPG] Error in pipeline fetchmany interception: {e}") + try: + span_info.span.set_status(Status(OTelStatusCode.ERROR, str(e))) + span_info.span.end() + except Exception: + pass + + return rows + + # Patch the cursor instance with our capturing wrappers + cursor.fetchone = capturing_fetchone # pyright: ignore[reportAttributeAccessIssue] + cursor.fetchmany = capturing_fetchmany # pyright: ignore[reportAttributeAccessIssue] + cursor.fetchall = capturing_fetchall # pyright: ignore[reportAttributeAccessIssue] + cursor._tusk_pipeline_patched = True # pyright: ignore[reportAttributeAccessIssue] + + logger.debug("[PSYCOPG] Pipeline fetch interception wrappers installed") + def _finalize_executemany_returning_span( self, span: trace.Span, diff --git a/drift/instrumentation/psycopg/mocks.py b/drift/instrumentation/psycopg/mocks.py index 0709aed..a449b64 100644 --- a/drift/instrumentation/psycopg/mocks.py +++ b/drift/instrumentation/psycopg/mocks.py @@ -383,3 +383,231 @@ def __exit__(self, exc_type, exc_val, exc_tb): def sync(self): """No-op sync for mock pipeline.""" pass + + +# ==================== ASYNC MOCKS ==================== + + +class MockAsyncConnection: + """Mock async database connection for REPLAY mode when postgres is not available. + + Provides minimal async interface for FastAPI/asyncio apps to work without a real database. + All queries are mocked at the cursor.execute() level. 
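+
+    Illustrative REPLAY flow when the database is unreachable (``dsn`` is a placeholder):
+
+        conn = await psycopg.AsyncConnection.connect(dsn)  # falls back to MockAsyncConnection
+        cur = conn.cursor()
+        await cur.execute("SELECT id, name FROM users")    # answered from recorded mocks
+        rows = await cur.fetchall()
+        await conn.commit()                                # no-op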
+ """ + + def __init__( + self, + sdk: TuskDrift, + instrumentation: PsycopgInstrumentation, + cursor_factory, + row_factory=None, + ): + self.sdk = sdk + self.instrumentation = instrumentation + self.cursor_factory = cursor_factory + self.row_factory = row_factory + self.closed = False + self.autocommit = False + + # psycopg3 async connection attributes + self.isolation_level = None + self.encoding = "UTF8" + self.adapters = MockAdapters() + self.pgconn = None + + class MockInfo: + vendor = "postgresql" + server_version = 150000 + encoding = "UTF8" + + def parameter_status(self, param): + if param == "TimeZone": + return "UTC" + elif param == "server_version": + return "15.0" + return None + + self.info = MockInfo() + + logger.debug("[MOCK_ASYNC_CONNECTION] Created mock async connection for REPLAY mode (psycopg3)") + + def cursor(self, name=None, *, cursor_factory=None, **kwargs): + """Create an async cursor using the instrumented cursor factory.""" + cursor = MockAsyncCursor(self) + + instrumentation = self.instrumentation + sdk = self.sdk + + async def mock_execute(query, params=None, **kwargs): + # Use async execute handler + async def noop_execute(q, p, **kw): + return cursor + + return await instrumentation._traced_async_execute(cursor, noop_execute, sdk, query, params, **kwargs) + + async def mock_executemany(query, params_seq, **kwargs): + async def noop_executemany(q, ps, **kw): + return cursor + + return await instrumentation._traced_async_executemany( + cursor, noop_executemany, sdk, query, params_seq, **kwargs + ) + + cursor.execute = mock_execute # type: ignore[method-assign] + cursor.executemany = mock_executemany # type: ignore[method-assign] + + logger.debug("[MOCK_ASYNC_CONNECTION] Created async cursor (psycopg3)") + return cursor + + async def commit(self): + """Mock async commit - no-op in REPLAY mode.""" + logger.debug("[MOCK_ASYNC_CONNECTION] commit() called (no-op)") + pass + + async def rollback(self): + """Mock async rollback - no-op in REPLAY mode.""" + logger.debug("[MOCK_ASYNC_CONNECTION] rollback() called (no-op)") + pass + + async def close(self): + """Mock async close - no-op in REPLAY mode.""" + logger.debug("[MOCK_ASYNC_CONNECTION] close() called (no-op)") + self.closed = True + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + if exc_type is not None: + await self.rollback() + else: + await self.commit() + return False + + def transaction(self): + """Return a mock async transaction context manager for REPLAY mode.""" + return MockAsyncTransaction(self) + + def pipeline(self): + """Return a mock async pipeline context manager for REPLAY mode.""" + return MockAsyncPipeline(self) + + +class MockAsyncCursor: + """Mock async cursor for when we can't create a real async cursor. + + This is a fallback when the async connection is completely mocked. 
+ """ + + def __init__(self, connection): + self.connection = connection + self.rowcount = -1 + self._tusk_description = None + self.arraysize = 1 + self._mock_rows = [] + self._mock_index = 0 + self._mock_result_sets = [] + self._mock_result_set_index = 0 + self.adapters = MockAdapters() + logger.debug("[MOCK_ASYNC_CURSOR] Created fallback mock async cursor (psycopg3)") + + @property + def description(self): + return self._tusk_description + + @property + def rownumber(self): + if self._mock_rows: + return self._mock_index + return None + + @property + def statusmessage(self): + return getattr(self, "_mock_statusmessage", None) + + async def execute(self, query, params=None, **kwargs): + """Will be replaced by instrumentation.""" + logger.debug(f"[MOCK_ASYNC_CURSOR] execute() called: {query[:100]}") + return self + + async def executemany(self, query, params_seq, **kwargs): + """Will be replaced by instrumentation.""" + logger.debug(f"[MOCK_ASYNC_CURSOR] executemany() called: {query[:100]}") + return self + + async def fetchone(self): + if self._mock_index < len(self._mock_rows): + row = self._mock_rows[self._mock_index] + self._mock_index += 1 + return tuple(row) if isinstance(row, list) else row + return None + + async def fetchmany(self, size=None): + if size is None: + size = self.arraysize + rows = [] + for _ in range(size): + row = await self.fetchone() + if row is None: + break + rows.append(row) + return rows + + async def fetchall(self): + rows = self._mock_rows[self._mock_index :] + self._mock_index = len(self._mock_rows) + return [tuple(row) if isinstance(row, list) else row for row in rows] + + def __aiter__(self): + return self + + async def __anext__(self): + if self._mock_index < len(self._mock_rows): + row = self._mock_rows[self._mock_index] + self._mock_index += 1 + return tuple(row) if isinstance(row, list) else row + raise StopAsyncIteration + + async def close(self): + pass + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close() + return False + + +class MockAsyncTransaction: + """Mock async transaction context manager for REPLAY mode.""" + + def __init__(self, connection: MockAsyncConnection): + self._conn = connection + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + if exc_type is not None: + await self._conn.rollback() + else: + await self._conn.commit() + return False + + +class MockAsyncPipeline: + """Mock async Pipeline for REPLAY mode.""" + + def __init__(self, connection: MockAsyncConnection): + self._conn = connection + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + return False + + async def sync(self): + """No-op async sync for mock pipeline.""" + pass diff --git a/drift/instrumentation/psycopg2/instrumentation.py b/drift/instrumentation/psycopg2/instrumentation.py index 061893e..6758eb6 100644 --- a/drift/instrumentation/psycopg2/instrumentation.py +++ b/drift/instrumentation/psycopg2/instrumentation.py @@ -150,6 +150,8 @@ class MockCursor: """Mock cursor for when we can't create a real cursor from base class. This is a fallback when the connection is completely mocked. + Provides all attributes that psycopg2 cursors have to ensure compatibility + with frameworks like Django that access cursor properties. 
""" def __init__(self, connection): @@ -159,6 +161,14 @@ def __init__(self, connection): self.arraysize = 1 self._mock_rows = [] self._mock_index = 0 + # psycopg2 cursor attributes that Django/Flask may access + self.query = None # Last executed query string + self.statusmessage = None # Status message from last command + self.lastrowid = None # OID of last inserted row (if applicable) + self.closed = False + self.name = None # Server-side cursor name (None for client-side) + self.scrollable = None + self.withhold = False logger.debug("[MOCK_CURSOR] Created fallback mock cursor") def execute(self, query: Any, vars: Any = None) -> None: @@ -210,6 +220,58 @@ def __init__(self, connection: Any, instrumentation: Psycopg2Instrumentation, sd def cursor(self, name: str | None = None, cursor_factory: Any = None, *args: Any, **kwargs: Any) -> Any: """Intercept cursor creation to wrap user-provided cursor_factory.""" + # In REPLAY mode, use MockCursor to have full control over cursor state + # This is necessary because psycopg2's cursor.description is a read-only + # C-level property that cannot be set after the cursor is created + if self._sdk.mode == TuskDriftMode.REPLAY: + cursor = MockCursor(self) + instrumentation = self._instrumentation + sdk = self._sdk + + # Detect if user wants dict-style cursors (RealDictCursor, DictCursor) + is_dict_cursor = False + effective_cursor_factory = cursor_factory if cursor_factory is not None else self._default_cursor_factory + if effective_cursor_factory is not None: + try: + import psycopg2.extras + + if effective_cursor_factory in ( + psycopg2.extras.RealDictCursor, + psycopg2.extras.DictCursor, + ) or ( + isinstance(effective_cursor_factory, type) + and issubclass( + effective_cursor_factory, (psycopg2.extras.RealDictCursor, psycopg2.extras.DictCursor) + ) + ): + is_dict_cursor = True + except (ImportError, AttributeError): + pass + + # Store cursor type info on the cursor for _mock_execute_with_data + cursor._is_dict_cursor = is_dict_cursor # type: ignore[attr-defined] + + def mock_execute(query, vars=None): + def noop_execute(q, v): + return None + + return instrumentation._traced_execute(cursor, noop_execute, sdk, query, vars) + + def mock_executemany(query, vars_list): + def noop_executemany(q, vl): + return None + + return instrumentation._traced_executemany(cursor, noop_executemany, sdk, query, vars_list) + + cursor.execute = mock_execute # type: ignore[method-assign] + cursor.executemany = mock_executemany # type: ignore[method-assign] + + logger.debug( + f"[INSTRUMENTED_CONNECTION] Created MockCursor for REPLAY mode (is_dict_cursor={is_dict_cursor})" + ) + return cursor + + # In RECORD mode, use real cursor with instrumentation # Use cursor_factory from cursor() call, or fall back to connection's default base_factory = cursor_factory if cursor_factory is not None else self._default_cursor_factory # Create instrumented cursor factory (wrapping the base factory) @@ -493,6 +555,9 @@ def _replay_execute(self, cursor: Any, sdk: TuskDrift, query_str: str, params: A logger.warning("[PSYCOPG2_REPLAY] No mock found for pre-app-start query, returning empty result") empty_mock = {"rowcount": 0, "rows": [], "description": None} self._mock_execute_with_data(cursor, empty_mock) + # Set cursor.query to the executed query (psycopg2 cursor attribute) + if hasattr(cursor, "query"): + cursor.query = query_str.encode("utf-8") if isinstance(query_str, str) else query_str span_info.span.end() return None @@ -503,6 +568,9 @@ def _replay_execute(self, cursor: Any, sdk: 
TuskDrift, query_str: str, params: A ) self._mock_execute_with_data(cursor, mock_result) + # Set cursor.query to the executed query (psycopg2 cursor attribute) + if hasattr(cursor, "query"): + cursor.query = query_str.encode("utf-8") if isinstance(query_str, str) else query_str span_info.span.end() return None @@ -621,6 +689,9 @@ def _replay_executemany(self, cursor: Any, sdk: TuskDrift, query_str: str, param ) empty_mock = {"rowcount": 0, "rows": [], "description": None} self._mock_execute_with_data(cursor, empty_mock) + # Set cursor.query to the executed query (psycopg2 cursor attribute) + if hasattr(cursor, "query"): + cursor.query = query_str.encode("utf-8") if isinstance(query_str, str) else query_str span_info.span.end() return None @@ -631,6 +702,9 @@ def _replay_executemany(self, cursor: Any, sdk: TuskDrift, query_str: str, param ) self._mock_execute_with_data(cursor, mock_result) + # Set cursor.query to the executed query (psycopg2 cursor attribute) + if hasattr(cursor, "query"): + cursor.query = query_str.encode("utf-8") if isinstance(query_str, str) else query_str span_info.span.end() return None @@ -777,15 +851,17 @@ def _mock_execute_with_data(self, cursor: Any, mock_data: dict[str, Any]) -> Non # Deserialize datetime strings back to datetime objects for consistent Flask/Django serialization mock_rows = [deserialize_db_value(row) for row in mock_rows] - # Check if this is a dict-cursor (like RealDictCursor) by checking if cursor class - # inherits from a dict-returning cursor type - is_dict_cursor = False - try: - import psycopg2.extras + # Check if this is a dict-cursor (like RealDictCursor) + # First check if cursor has _is_dict_cursor attribute (set by InstrumentedConnection.cursor()) + # Then fall back to isinstance check for real dict cursors + is_dict_cursor = getattr(cursor, "_is_dict_cursor", False) + if not is_dict_cursor: + try: + import psycopg2.extras - is_dict_cursor = isinstance(cursor, (psycopg2.extras.RealDictCursor, psycopg2.extras.DictCursor)) - except (ImportError, AttributeError): - pass + is_dict_cursor = isinstance(cursor, (psycopg2.extras.RealDictCursor, psycopg2.extras.DictCursor)) + except (ImportError, AttributeError): + pass # If it's a dict cursor and we have description, convert rows to dicts if is_dict_cursor and description_data: diff --git a/drift/stack-tests/django-postgres/src/test_requests.py b/drift/stack-tests/django-postgres/src/test_requests.py index b5480b0..41b0f64 100644 --- a/drift/stack-tests/django-postgres/src/test_requests.py +++ b/drift/stack-tests/django-postgres/src/test_requests.py @@ -15,33 +15,29 @@ # Transaction test (rollback, doesn't return data) make_request("POST", "/db/transaction") - # TODO: Re-enable these tests once cursor.description REPLAY bug is fixed - # The issue is that cursor.description is None in REPLAY mode when using - # Django's cursor wrapper with INSERT/UPDATE RETURNING queries. 
- # - # # Query operations using Django's connection - # make_request("GET", "/db/query") - # - # # Insert operations - # resp1 = make_request("POST", "/db/insert", json={"name": "Alice", "email": "alice@example.com"}) - # resp2 = make_request("POST", "/db/insert", json={"name": "Bob", "email": "bob@example.com"}) - # - # # Update operation - # if resp1.status_code == 201: - # user_id = resp1.json().get("id") - # if user_id: - # make_request("PUT", f"/db/update/{user_id}", json={"name": "Alice Updated"}) - # - # # Raw connection test - # make_request("GET", "/db/raw-connection") - # - # # Query again to see all users - # make_request("GET", "/db/query") - # - # # Delete operation - # if resp2.status_code == 201: - # user_id = resp2.json().get("id") - # if user_id: - # make_request("DELETE", f"/db/delete/{user_id}") + # Query operations using Django's connection + make_request("GET", "/db/query") + + # Insert operations + resp1 = make_request("POST", "/db/insert", json={"name": "Alice", "email": "alice@example.com"}) + resp2 = make_request("POST", "/db/insert", json={"name": "Bob", "email": "bob@example.com"}) + + # Update operation + if resp1.status_code == 201: + user_id = resp1.json().get("id") + if user_id: + make_request("PUT", f"/db/update/{user_id}", json={"name": "Alice Updated"}) + + # Raw connection test + make_request("GET", "/db/raw-connection") + + # Query again to see all users + make_request("GET", "/db/query") + + # Delete operation + if resp2.status_code == 201: + user_id = resp2.json().get("id") + if user_id: + make_request("DELETE", f"/db/delete/{user_id}") print_request_summary() diff --git a/drift/stack-tests/django-redis/.tusk/config.yaml b/drift/stack-tests/django-redis/.tusk/config.yaml index d8fe930..2fef147 100644 --- a/drift/stack-tests/django-redis/.tusk/config.yaml +++ b/drift/stack-tests/django-redis/.tusk/config.yaml @@ -15,6 +15,9 @@ tusk_api: test_execution: timeout: 30s +comparison: + ignore_fields: ["session_key", "cleared_session_key"] + recording: sampling_rate: 1.0 export_spans: false diff --git a/drift/stack-tests/django-redis/src/test_requests.py b/drift/stack-tests/django-redis/src/test_requests.py index 30571a7..953f34e 100644 --- a/drift/stack-tests/django-redis/src/test_requests.py +++ b/drift/stack-tests/django-redis/src/test_requests.py @@ -27,11 +27,10 @@ make_request("DELETE", "/cache/delete/test_key") make_request("DELETE", "/cache/delete/counter") - # TODO: Session tests commented out - session_key is dynamic and differs between - # RECORD and REPLAY, causing false test failures. 
# Session operations (Redis-backed) - # resp = make_request("POST", "/session/set", json={"user_name": "Alice", "logged_in": True}) - # make_request("GET", "/session/get") - # make_request("POST", "/session/clear") + # Note: Session endpoints return dynamic session_key values, handled via comparison.ignore_fields in config + make_request("POST", "/session/set", json={"user_name": "Alice", "logged_in": True}) + make_request("GET", "/session/get") + make_request("POST", "/session/clear") print_request_summary() diff --git a/drift/stack-tests/fastapi-postgres/requirements.txt b/drift/stack-tests/fastapi-postgres/requirements.txt index 7e38c55..6f8d254 100644 --- a/drift/stack-tests/fastapi-postgres/requirements.txt +++ b/drift/stack-tests/fastapi-postgres/requirements.txt @@ -2,5 +2,6 @@ fastapi>=0.109.0 uvicorn>=0.27.0 psycopg[binary]>=3.1.0 +psycopg-pool>=3.2.0 httpx>=0.26.0 requests>=2.32.5 diff --git a/drift/stack-tests/fastapi-postgres/src/app.py b/drift/stack-tests/fastapi-postgres/src/app.py index 3c5cfb8..8b02698 100644 --- a/drift/stack-tests/fastapi-postgres/src/app.py +++ b/drift/stack-tests/fastapi-postgres/src/app.py @@ -7,6 +7,7 @@ import psycopg from fastapi import FastAPI, HTTPException +from psycopg_pool import AsyncConnectionPool from pydantic import BaseModel from drift import TuskDrift @@ -38,7 +39,7 @@ async def get_async_connection(): """Get an async connection from pool.""" global _async_pool if _async_pool is None: - _async_pool = psycopg.AsyncConnectionPool(get_conn_string(), min_size=1, max_size=5) + _async_pool = AsyncConnectionPool(get_conn_string(), min_size=1, max_size=5) await _async_pool.open() async with _async_pool.connection() as conn: yield conn diff --git a/drift/stack-tests/fastapi-postgres/src/test_requests.py b/drift/stack-tests/fastapi-postgres/src/test_requests.py index 29a17de..3d2aae8 100644 --- a/drift/stack-tests/fastapi-postgres/src/test_requests.py +++ b/drift/stack-tests/fastapi-postgres/src/test_requests.py @@ -8,41 +8,25 @@ # Execute test sequence make_request("GET", "/health") - # Transaction test (rollback, doesn't depend on data) + # Sync fallback test - uses sync psycopg.connect() which is fully instrumented + make_request("GET", "/db/sync-fallback") + + # Async psycopg tests using AsyncConnectionPool make_request("GET", "/db/transaction") + make_request("GET", "/db/query") + + # Insert/Update/Delete operations + resp1 = make_request("POST", "/db/insert", json={"name": "Alice", "email": "alice@example.com"}) + if resp1.status_code == 201: + user_id = resp1.json().get("id") + if user_id: + make_request("PUT", f"/db/update/{user_id}", json={"name": "Alice Updated"}) + make_request("DELETE", f"/db/delete/{user_id}") + + # Async context propagation test (3 concurrent queries) + make_request("GET", "/db/async-context") - # TODO: Re-enable these tests once psycopg (async) REPLAY mode is verified - # Currently only 3 traces recorded vs 9 requests - some endpoints not recording properly - # - # # Query operations using async psycopg3 - # make_request("GET", "/db/query") - # - # # Insert operations - # resp1 = make_request("POST", "/db/insert", json={"name": "Alice", "email": "alice@example.com"}) - # resp2 = make_request("POST", "/db/insert", json={"name": "Bob", "email": "bob@example.com"}) - # - # # Update operation - # if resp1.status_code == 201: - # user_id = resp1.json().get("id") - # if user_id: - # make_request("PUT", f"/db/update/{user_id}", json={"name": "Alice Updated"}) - # - # # Async context propagation test - # make_request("GET", 
"/db/async-context") - # - # # Sync fallback test - # make_request("GET", "/db/sync-fallback") - # - # # Pipeline test - # make_request("GET", "/db/pipeline") - # - # # Query again to see all users - # make_request("GET", "/db/query") - # - # # Delete operation - # if resp2.status_code == 201: - # user_id = resp2.json().get("id") - # if user_id: - # make_request("DELETE", f"/db/delete/{user_id}") + # Pipeline test (multiple cursors) + make_request("GET", "/db/pipeline") print_request_summary()