diff --git a/sentry_sdk/integrations/asyncpg.py b/sentry_sdk/integrations/asyncpg.py index 29f3bad152..9b37119564 100644 --- a/sentry_sdk/integrations/asyncpg.py +++ b/sentry_sdk/integrations/asyncpg.py @@ -1,17 +1,23 @@ from __future__ import annotations + import contextlib import re -from typing import Any, TypeVar, Callable, Awaitable, Iterator +from typing import Any, Awaitable, Callable, Iterator, TypeVar, Union import sentry_sdk from sentry_sdk.consts import OP, SPANDATA -from sentry_sdk.integrations import _check_minimum_version, Integration, DidNotEnable +from sentry_sdk.integrations import DidNotEnable, Integration, _check_minimum_version +from sentry_sdk.traces import StreamedSpan from sentry_sdk.tracing import Span -from sentry_sdk.tracing_utils import add_query_source, record_sql_queries +from sentry_sdk.tracing_utils import ( + add_query_source, + has_span_streaming_enabled, + record_sql_queries_supporting_streaming, +) from sentry_sdk.utils import ( + capture_internal_exceptions, ensure_integration_enabled, parse_version, - capture_internal_exceptions, ) try: @@ -62,7 +68,8 @@ def _normalize_query(query: str) -> str: def _wrap_execute(f: "Callable[..., Awaitable[T]]") -> "Callable[..., Awaitable[T]]": async def _inner(*args: "Any", **kwargs: "Any") -> "T": - if sentry_sdk.get_client().get_integration(AsyncPGIntegration) is None: + client = sentry_sdk.get_client() + if client.get_integration(AsyncPGIntegration) is None: return await f(*args, **kwargs) # Avoid recording calls to _execute twice. @@ -73,7 +80,7 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T": return await f(*args, **kwargs) query = _normalize_query(args[1]) - with record_sql_queries( + with record_sql_queries_supporting_streaming( cursor=None, query=query, params_list=None, @@ -82,9 +89,13 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T": span_origin=AsyncPGIntegration.origin, ) as span: res = await f(*args, **kwargs) + if isinstance(span, StreamedSpan): + with capture_internal_exceptions(): + add_query_source(span) - with capture_internal_exceptions(): - add_query_source(span) + if not isinstance(span, StreamedSpan): + with capture_internal_exceptions(): + add_query_source(span) return res @@ -101,15 +112,16 @@ def _record( params_list: "tuple[Any, ...] | None", *, executemany: bool = False, -) -> "Iterator[Span]": - integration = sentry_sdk.get_client().get_integration(AsyncPGIntegration) +) -> "Iterator[Union[Span, StreamedSpan]]": + client = sentry_sdk.get_client() + integration = client.get_integration(AsyncPGIntegration) if integration is not None and not integration._record_params: params_list = None param_style = "pyformat" if params_list else None query = _normalize_query(query) - with record_sql_queries( + with record_sql_queries_supporting_streaming( cursor=cursor, query=query, params_list=params_list, @@ -152,7 +164,6 @@ def _inner(*args: "Any", **kwargs: "Any") -> "T": # noqa: N807 ) as span: _set_db_data(span, args[0]) res = f(*args, **kwargs) - span.set_data("db.cursor", res) return res @@ -163,56 +174,85 @@ def _wrap_connect_addr( f: "Callable[..., Awaitable[T]]", ) -> "Callable[..., Awaitable[T]]": async def _inner(*args: "Any", **kwargs: "Any") -> "T": - if sentry_sdk.get_client().get_integration(AsyncPGIntegration) is None: + client = sentry_sdk.get_client() + if client.get_integration(AsyncPGIntegration) is None: return await f(*args, **kwargs) user = kwargs["params"].user database = kwargs["params"].database - - with sentry_sdk.start_span( - op=OP.DB, - name="connect", - origin=AsyncPGIntegration.origin, - ) as span: - span.set_data(SPANDATA.DB_SYSTEM, "postgresql") - addr = kwargs.get("addr") + addr = kwargs.get("addr") + + if has_span_streaming_enabled(client.options): + span_attributes = { + "sentry.op": OP.DB, + "sentry.origin": AsyncPGIntegration.origin, + SPANDATA.DB_SYSTEM: "postgresql", + SPANDATA.DB_USER: user, + SPANDATA.DB_NAME: database, + SPANDATA.DB_DRIVER_NAME: "asyncpg", + } if addr: try: - span.set_data(SPANDATA.SERVER_ADDRESS, addr[0]) - span.set_data(SPANDATA.SERVER_PORT, addr[1]) + span_attributes[SPANDATA.SERVER_ADDRESS] = addr[0] + span_attributes[SPANDATA.SERVER_PORT] = addr[1] except IndexError: pass - span.set_data(SPANDATA.DB_NAME, database) - span.set_data(SPANDATA.DB_USER, user) - span.set_data(SPANDATA.DB_DRIVER_NAME, "asyncpg") - with capture_internal_exceptions(): - sentry_sdk.add_breadcrumb( - message="connect", category="query", data=span._data - ) - res = await f(*args, **kwargs) + with sentry_sdk.traces.start_span( + name="connect", attributes=span_attributes + ) as span: + with capture_internal_exceptions(): + sentry_sdk.add_breadcrumb( + message="connect", category="query", data=span_attributes + ) + res = await f(*args, **kwargs) + + else: + with sentry_sdk.start_span( + op=OP.DB, + name="connect", + origin=AsyncPGIntegration.origin, + ) as span: + span.set_data(SPANDATA.DB_SYSTEM, "postgresql") + if addr: + try: + span.set_data(SPANDATA.SERVER_ADDRESS, addr[0]) + span.set_data(SPANDATA.SERVER_PORT, addr[1]) + except IndexError: + pass + span.set_data(SPANDATA.DB_NAME, database) + span.set_data(SPANDATA.DB_USER, user) + span.set_data(SPANDATA.DB_DRIVER_NAME, "asyncpg") + + with capture_internal_exceptions(): + sentry_sdk.add_breadcrumb( + message="connect", category="query", data=span._data + ) + res = await f(*args, **kwargs) return res return _inner -def _set_db_data(span: "Span", conn: "Any") -> None: - span.set_data(SPANDATA.DB_SYSTEM, "postgresql") - span.set_data(SPANDATA.DB_DRIVER_NAME, "asyncpg") +def _set_db_data(span: "Union[Span, StreamedSpan]", conn: "Any") -> None: + set_value = span.set_attribute if isinstance(span, StreamedSpan) else span.set_data + + set_value(SPANDATA.DB_SYSTEM, "postgresql") + set_value(SPANDATA.DB_DRIVER_NAME, "asyncpg") addr = conn._addr if addr: try: - span.set_data(SPANDATA.SERVER_ADDRESS, addr[0]) - span.set_data(SPANDATA.SERVER_PORT, addr[1]) + set_value(SPANDATA.SERVER_ADDRESS, addr[0]) + set_value(SPANDATA.SERVER_PORT, addr[1]) except IndexError: pass database = conn._params.database if database: - span.set_data(SPANDATA.DB_NAME, database) + set_value(SPANDATA.DB_NAME, database) user = conn._params.user if user: - span.set_data(SPANDATA.DB_USER, user) + set_value(SPANDATA.DB_USER, user) diff --git a/tests/integrations/asyncpg/test_asyncpg.py b/tests/integrations/asyncpg/test_asyncpg.py index 2dcce52070..35195e757b 100644 --- a/tests/integrations/asyncpg/test_asyncpg.py +++ b/tests/integrations/asyncpg/test_asyncpg.py @@ -9,20 +9,21 @@ The tests use the following credentials to establish a database connection. """ -import os import datetime +import os from contextlib import contextmanager from unittest import mock import asyncpg import pytest import pytest_asyncio -from asyncpg import connect, Connection +from asyncpg import Connection, connect +import sentry_sdk from sentry_sdk import capture_message, start_transaction -from sentry_sdk.integrations.asyncpg import AsyncPGIntegration from sentry_sdk.consts import SPANDATA -from sentry_sdk.tracing_utils import record_sql_queries +from sentry_sdk.integrations.asyncpg import AsyncPGIntegration +from sentry_sdk.tracing_utils import record_sql_queries_supporting_streaming from tests.conftest import ApproxDict PG_HOST = os.getenv("SENTRY_PYTHON_TEST_POSTGRES_HOST", "localhost") @@ -92,12 +93,22 @@ async def _clean_pg(): @pytest.mark.asyncio -async def test_connect(sentry_init, capture_events) -> None: +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_connect( + sentry_init, capture_events, capture_items, span_streaming +) -> None: sentry_init( integrations=[AsyncPGIntegration()], - _experiments={"record_sql_params": True}, + _experiments={ + "record_sql_params": True, + "trace_lifecycle": "stream" if span_streaming else "static", + }, ) - events = capture_events() + + if span_streaming: + items = capture_items("event") + else: + events = capture_events() conn: Connection = await connect(PG_CONNECTION_URI) @@ -105,7 +116,10 @@ async def test_connect(sentry_init, capture_events) -> None: capture_message("hi") - (event,) = events + if span_streaming: + event = items[0].payload + else: + (event,) = events for crumb in event["breadcrumbs"]["values"]: del crumb["timestamp"] @@ -114,12 +128,22 @@ async def test_connect(sentry_init, capture_events) -> None: @pytest.mark.asyncio -async def test_execute(sentry_init, capture_events) -> None: +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_execute( + sentry_init, capture_events, capture_items, span_streaming +) -> None: sentry_init( integrations=[AsyncPGIntegration()], - _experiments={"record_sql_params": True}, + _experiments={ + "record_sql_params": True, + "trace_lifecycle": "stream" if span_streaming else "static", + }, ) - events = capture_events() + + if span_streaming: + items = capture_items("event") + else: + events = capture_events() conn: Connection = await connect(PG_CONNECTION_URI) @@ -144,7 +168,10 @@ async def test_execute(sentry_init, capture_events) -> None: capture_message("hi") - (event,) = events + if span_streaming: + event = items[0].payload + else: + (event,) = events for crumb in event["breadcrumbs"]["values"]: del crumb["timestamp"] @@ -179,12 +206,22 @@ async def test_execute(sentry_init, capture_events) -> None: @pytest.mark.asyncio -async def test_execute_many(sentry_init, capture_events) -> None: +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_execute_many( + sentry_init, capture_events, capture_items, span_streaming +) -> None: sentry_init( integrations=[AsyncPGIntegration()], - _experiments={"record_sql_params": True}, + _experiments={ + "record_sql_params": True, + "trace_lifecycle": "stream" if span_streaming else "static", + }, ) - events = capture_events() + + if span_streaming: + items = capture_items("event") + else: + events = capture_events() conn: Connection = await connect(PG_CONNECTION_URI) @@ -200,7 +237,10 @@ async def test_execute_many(sentry_init, capture_events) -> None: capture_message("hi") - (event,) = events + if span_streaming: + event = items[0].payload + else: + (event,) = events for crumb in event["breadcrumbs"]["values"]: del crumb["timestamp"] @@ -483,33 +523,62 @@ async def test_connection_pool(sentry_init, capture_events) -> None: @pytest.mark.asyncio -async def test_query_source_disabled(sentry_init, capture_events): +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_query_source_disabled( + sentry_init, capture_events, capture_items, span_streaming +): sentry_options = { "integrations": [AsyncPGIntegration()], "traces_sample_rate": 1.0, "enable_db_query_source": False, "db_query_source_threshold_ms": 0, + "_experiments": { + "trace_lifecycle": "stream" if span_streaming else "static", + }, } sentry_init(**sentry_options) - events = capture_events() + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="test_transaction"): + conn: Connection = await connect(PG_CONNECTION_URI) - with start_transaction(name="test_transaction", sampled=True): - conn: Connection = await connect(PG_CONNECTION_URI) + await conn.execute( + "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", + ) - await conn.execute( - "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", - ) + await conn.close() + sentry_sdk.flush() - await conn.close() + spans = [item.payload for item in items] - (event,) = events + assert len(spans) == 3 - span = event["spans"][-1] - assert span["description"].startswith("INSERT INTO") + connect_span = spans[0] + insert_span = spans[1] + segment = spans[2] - data = span.get("data", {}) + assert segment["name"] == "test_transaction" + assert insert_span["name"].startswith("INSERT INTO") + assert connect_span["name"] == "connect" + data = insert_span.get("attributes", {}) + else: + events = capture_events() + + with start_transaction(name="test_transaction", sampled=True): + conn: Connection = await connect(PG_CONNECTION_URI) + + await conn.execute( + "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", + ) + + await conn.close() + + (event,) = events + span = event["spans"][-1] + assert span["description"].startswith("INSERT INTO") + data = span.get("data", {}) assert SPANDATA.CODE_LINENO not in data assert SPANDATA.CODE_NAMESPACE not in data @@ -519,85 +588,143 @@ async def test_query_source_disabled(sentry_init, capture_events): @pytest.mark.asyncio @pytest.mark.parametrize("enable_db_query_source", [None, True]) +@pytest.mark.parametrize("span_streaming", [True, False]) async def test_query_source_enabled( - sentry_init, capture_events, enable_db_query_source + sentry_init, capture_events, capture_items, enable_db_query_source, span_streaming ): sentry_options = { "integrations": [AsyncPGIntegration()], "traces_sample_rate": 1.0, "db_query_source_threshold_ms": 0, + "_experiments": { + "trace_lifecycle": "stream" if span_streaming else "static", + }, } if enable_db_query_source is not None: sentry_options["enable_db_query_source"] = enable_db_query_source sentry_init(**sentry_options) - events = capture_events() + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="test_transaction"): + conn: Connection = await connect(PG_CONNECTION_URI) - with start_transaction(name="test_transaction", sampled=True): - conn: Connection = await connect(PG_CONNECTION_URI) + await conn.execute( + "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", + ) - await conn.execute( - "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", - ) + await conn.close() + sentry_sdk.flush() - await conn.close() + spans = [item.payload for item in items] - (event,) = events + assert len(spans) == 3 - span = event["spans"][-1] - assert span["description"].startswith("INSERT INTO") + connect_span = spans[0] + insert_span = spans[1] + segment = spans[2] - data = span.get("data", {}) + assert segment["name"] == "test_transaction" + assert insert_span["name"].startswith("INSERT INTO") + assert connect_span["name"] == "connect" + data = insert_span.get("attributes", {}) + else: + events = capture_events() - assert SPANDATA.CODE_LINENO in data + with start_transaction(name="test_transaction", sampled=True): + conn: Connection = await connect(PG_CONNECTION_URI) + + await conn.execute( + "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", + ) + + await conn.close() + + (event,) = events + span = event["spans"][-1] + assert span["description"].startswith("INSERT INTO") + data = span.get("data", {}) + + lineno_key = "code.line.number" if span_streaming else SPANDATA.CODE_LINENO + filepath_key = "code.file.path" if span_streaming else SPANDATA.CODE_FILEPATH + + assert lineno_key in data + assert filepath_key in data assert SPANDATA.CODE_NAMESPACE in data - assert SPANDATA.CODE_FILEPATH in data assert SPANDATA.CODE_FUNCTION in data @pytest.mark.asyncio -async def test_query_source(sentry_init, capture_events): +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_query_source(sentry_init, capture_events, capture_items, span_streaming): sentry_init( integrations=[AsyncPGIntegration()], traces_sample_rate=1.0, enable_db_query_source=True, db_query_source_threshold_ms=0, + _experiments={ + "trace_lifecycle": "stream" if span_streaming else "static", + }, ) - events = capture_events() + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="test_transaction"): + conn: Connection = await connect(PG_CONNECTION_URI) - with start_transaction(name="test_transaction", sampled=True): - conn: Connection = await connect(PG_CONNECTION_URI) + await conn.execute( + "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", + ) - await conn.execute( - "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", - ) + await conn.close() + sentry_sdk.flush() - await conn.close() + spans = [item.payload for item in items] - (event,) = events + assert len(spans) == 3 - span = event["spans"][-1] - assert span["description"].startswith("INSERT INTO") + connect_span = spans[0] + insert_span = spans[1] + segment = spans[2] - data = span.get("data", {}) + assert segment["name"] == "test_transaction" + assert insert_span["name"].startswith("INSERT INTO") + assert connect_span["name"] == "connect" + data = insert_span.get("attributes", {}) + else: + events = capture_events() - assert SPANDATA.CODE_LINENO in data + with start_transaction(name="test_transaction", sampled=True): + conn: Connection = await connect(PG_CONNECTION_URI) + + await conn.execute( + "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", + ) + + await conn.close() + + (event,) = events + span = event["spans"][-1] + assert span["description"].startswith("INSERT INTO") + data = span.get("data", {}) + + lineno_key = "code.line.number" if span_streaming else SPANDATA.CODE_LINENO + filepath_key = "code.file.path" if span_streaming else SPANDATA.CODE_FILEPATH + + assert lineno_key in data + assert filepath_key in data assert SPANDATA.CODE_NAMESPACE in data - assert SPANDATA.CODE_FILEPATH in data assert SPANDATA.CODE_FUNCTION in data - assert type(data.get(SPANDATA.CODE_LINENO)) == int - assert data.get(SPANDATA.CODE_LINENO) > 0 + assert type(data.get(lineno_key)) == int + assert data.get(lineno_key) > 0 assert ( data.get(SPANDATA.CODE_NAMESPACE) == "tests.integrations.asyncpg.test_asyncpg" ) - assert data.get(SPANDATA.CODE_FILEPATH).endswith( - "tests/integrations/asyncpg/test_asyncpg.py" - ) + assert data.get(filepath_key).endswith("tests/integrations/asyncpg/test_asyncpg.py") - is_relative_path = data.get(SPANDATA.CODE_FILEPATH)[0] != os.sep + is_relative_path = data.get(filepath_key)[0] != os.sep assert is_relative_path assert data.get(SPANDATA.CODE_FUNCTION) == "test_query_source" @@ -653,43 +780,89 @@ async def test_query_source_with_module_in_search_path(sentry_init, capture_even @pytest.mark.asyncio -async def test_no_query_source_if_duration_too_short(sentry_init, capture_events): +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_no_query_source_if_duration_too_short( + sentry_init, capture_events, capture_items, span_streaming +): sentry_init( integrations=[AsyncPGIntegration()], traces_sample_rate=1.0, enable_db_query_source=True, db_query_source_threshold_ms=100, + _experiments={ + "trace_lifecycle": "stream" if span_streaming else "static", + }, ) - events = capture_events() - - with start_transaction(name="test_transaction", sampled=True): - conn: Connection = await connect(PG_CONNECTION_URI) + if span_streaming: + items = capture_items("span") @contextmanager - def fake_record_sql_queries(*args, **kwargs): - with record_sql_queries(*args, **kwargs) as span: + def fake_record_sql_queries_streaming(*args, **kwargs): + with record_sql_queries_supporting_streaming(*args, **kwargs) as span: pass - span.start_timestamp = datetime.datetime(2024, 1, 1, microsecond=0) - span.timestamp = datetime.datetime(2024, 1, 1, microsecond=99999) + span._start_timestamp = datetime.datetime(2024, 1, 1, microsecond=0) + if span_streaming: + span._end_timestamp = datetime.datetime(2024, 1, 1, microsecond=99999) + else: + span._timestamp = datetime.datetime(2024, 1, 1, microsecond=99999) yield span - with mock.patch( - "sentry_sdk.integrations.asyncpg.record_sql_queries", - fake_record_sql_queries, - ): - await conn.execute( - "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", - ) + with sentry_sdk.traces.start_span(name="test_transaction"): + conn: Connection = await connect(PG_CONNECTION_URI) - await conn.close() + with mock.patch( + "sentry_sdk.integrations.asyncpg.record_sql_queries_supporting_streaming", + fake_record_sql_queries_streaming, + ): + await conn.execute( + "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", + ) - (event,) = events + await conn.close() + sentry_sdk.flush() - span = event["spans"][-1] - assert span["description"].startswith("INSERT INTO") + spans = [item.payload for item in items] - data = span.get("data", {}) + assert len(spans) == 3 + + connect_span = spans[0] + insert_span = spans[1] + segment = spans[2] + + assert segment["name"] == "test_transaction" + assert insert_span["name"].startswith("INSERT INTO") + assert connect_span["name"] == "connect" + data = insert_span.get("attributes", {}) + else: + events = capture_events() + + with start_transaction(name="test_transaction", sampled=True): + conn: Connection = await connect(PG_CONNECTION_URI) + + @contextmanager + def fake_record_sql_queries(*args, **kwargs): + with record_sql_queries_supporting_streaming(*args, **kwargs) as span: + pass + span.start_timestamp = datetime.datetime(2024, 1, 1, microsecond=0) + span.timestamp = datetime.datetime(2024, 1, 1, microsecond=99999) + yield span + + with mock.patch( + "sentry_sdk.integrations.asyncpg.record_sql_queries_supporting_streaming", + fake_record_sql_queries, + ): + await conn.execute( + "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", + ) + + await conn.close() + + (event,) = events + + span = event["spans"][-1] + assert span["description"].startswith("INSERT INTO") + data = span.get("data", {}) assert SPANDATA.CODE_LINENO not in data assert SPANDATA.CODE_NAMESPACE not in data @@ -713,14 +886,14 @@ async def test_query_source_if_duration_over_threshold(sentry_init, capture_even @contextmanager def fake_record_sql_queries(*args, **kwargs): - with record_sql_queries(*args, **kwargs) as span: + with record_sql_queries_supporting_streaming(*args, **kwargs) as span: pass span.start_timestamp = datetime.datetime(2024, 1, 1, microsecond=0) span.timestamp = datetime.datetime(2024, 1, 1, microsecond=100001) yield span with mock.patch( - "sentry_sdk.integrations.asyncpg.record_sql_queries", + "sentry_sdk.integrations.asyncpg.record_sql_queries_supporting_streaming", fake_record_sql_queries, ): await conn.execute( @@ -760,61 +933,133 @@ def fake_record_sql_queries(*args, **kwargs): @pytest.mark.asyncio -async def test_span_origin(sentry_init, capture_events): +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_span_origin(sentry_init, capture_events, capture_items, span_streaming): sentry_init( integrations=[AsyncPGIntegration()], traces_sample_rate=1.0, + _experiments={ + "trace_lifecycle": "stream" if span_streaming else "static", + }, ) - events = capture_events() + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="test_transaction"): + conn: Connection = await connect(PG_CONNECTION_URI) - with start_transaction(name="test_transaction"): - conn: Connection = await connect(PG_CONNECTION_URI) + await conn.execute("SELECT 1") + await conn.fetchrow("SELECT 2") + await conn.close() + sentry_sdk.flush() - await conn.execute("SELECT 1") - await conn.fetchrow("SELECT 2") - await conn.close() + spans = [item.payload for item in items] - (event,) = events + assert len(spans) == 4 + + connect_span = spans[0] + select1_span = spans[1] + select2_span = spans[2] + segment = spans[3] + + assert segment["name"] == "test_transaction" + assert connect_span["name"] == "connect" + assert select1_span["name"] == "SELECT 1" + assert select2_span["name"] == "SELECT 2" + + assert segment["attributes"]["sentry.origin"] == "manual" + assert connect_span["attributes"]["sentry.origin"] == "auto.db.asyncpg" + assert select1_span["attributes"]["sentry.origin"] == "auto.db.asyncpg" + assert select2_span["attributes"]["sentry.origin"] == "auto.db.asyncpg" + else: + events = capture_events() + + with start_transaction(name="test_transaction"): + conn: Connection = await connect(PG_CONNECTION_URI) + + await conn.execute("SELECT 1") + await conn.fetchrow("SELECT 2") + await conn.close() - assert event["contexts"]["trace"]["origin"] == "manual" + (event,) = events - for span in event["spans"]: - assert span["origin"] == "auto.db.asyncpg" + assert event["contexts"]["trace"]["origin"] == "manual" + + for span in event["spans"]: + assert span["origin"] == "auto.db.asyncpg" @pytest.mark.asyncio -async def test_multiline_query_description_normalized(sentry_init, capture_events): +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_multiline_query_description_normalized( + sentry_init, capture_events, capture_items, span_streaming +): sentry_init( integrations=[AsyncPGIntegration()], traces_sample_rate=1.0, + _experiments={ + "trace_lifecycle": "stream" if span_streaming else "static", + }, ) - events = capture_events() - with start_transaction(name="test_transaction"): - conn: Connection = await connect(PG_CONNECTION_URI) - await conn.execute( - """ - SELECT - id, - name - FROM - users - WHERE - name = 'Alice' - """ - ) - await conn.close() + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="test_transaction"): + conn: Connection = await connect(PG_CONNECTION_URI) + await conn.execute( + """ + SELECT + id, + name + FROM + users + WHERE + name = 'Alice' + """ + ) + await conn.close() + sentry_sdk.flush() - (event,) = events + spans = [item.payload for item in items] - spans = [ - s - for s in event["spans"] - if s["op"] == "db" and "SELECT" in s.get("description", "") - ] - assert len(spans) == 1 - assert spans[0]["description"] == "SELECT id, name FROM users WHERE name = 'Alice'" + assert len(spans) == 3 + + connect_span = spans[0] + select_span = spans[1] + segment = spans[2] + + assert segment["name"] == "test_transaction" + assert connect_span["name"] == "connect" + assert select_span["name"] == "SELECT id, name FROM users WHERE name = 'Alice'" + else: + events = capture_events() + + with start_transaction(name="test_transaction"): + conn: Connection = await connect(PG_CONNECTION_URI) + await conn.execute( + """ + SELECT + id, + name + FROM + users + WHERE + name = 'Alice' + """ + ) + await conn.close() + + (event,) = events + + spans = [ + s + for s in event["spans"] + if s["op"] == "db" and "SELECT" in s.get("description", "") + ] + assert len(spans) == 1 + assert ( + spans[0]["description"] == "SELECT id, name FROM users WHERE name = 'Alice'" + ) @pytest.mark.asyncio