Skip to content

Commit 021f3f7

Browse files
committed
feat: Migrate redis integration to span first
1 parent d70365e commit 021f3f7

File tree

6 files changed

+182
-78
lines changed

6 files changed

+182
-78
lines changed

sentry_sdk/integrations/redis/_async_common.py

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@
1111
_set_pipeline_data,
1212
)
1313
from sentry_sdk.tracing import Span
14+
from sentry_sdk.tracing_utils import has_span_streaming_enabled
1415
from sentry_sdk.utils import capture_internal_exceptions
1516

1617
from typing import TYPE_CHECKING
1718

1819
if TYPE_CHECKING:
1920
from collections.abc import Callable
20-
from typing import Any, Union
21+
from typing import Any, Optional, Union
22+
from sentry_sdk.traces import StreamedSpan
2123
from redis.asyncio.client import Pipeline, StrictRedis
2224
from redis.asyncio.cluster import ClusterPipeline, RedisCluster
2325

@@ -26,21 +28,32 @@ def patch_redis_async_pipeline(
2628
pipeline_cls: "Union[type[Pipeline[Any]], type[ClusterPipeline[Any]]]",
2729
is_cluster: bool,
2830
get_command_args_fn: "Any",
29-
set_db_data_fn: "Callable[[Span, Any], None]",
31+
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
3032
) -> None:
3133
old_execute = pipeline_cls.execute
3234

3335
from sentry_sdk.integrations.redis import RedisIntegration
3436

3537
async def _sentry_execute(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
36-
if sentry_sdk.get_client().get_integration(RedisIntegration) is None:
38+
client = sentry_sdk.get_client()
39+
if client.get_integration(RedisIntegration) is None:
3740
return await old_execute(self, *args, **kwargs)
3841

39-
with sentry_sdk.start_span(
40-
op=OP.DB_REDIS,
41-
name="redis.pipeline.execute",
42-
origin=SPAN_ORIGIN,
43-
) as span:
42+
span_streaming = has_span_streaming_enabled(client.options)
43+
44+
span: "Union[Span, StreamedSpan]"
45+
if span_streaming:
46+
span = sentry_sdk.traces.start_span(name="redis.pipeline.execute")
47+
span.set_origin(SPAN_ORIGIN)
48+
span.set_op(OP.DB_REDIS)
49+
else:
50+
span = sentry_sdk.start_span(
51+
op=OP.DB_REDIS,
52+
name="redis.pipeline.execute",
53+
origin=SPAN_ORIGIN,
54+
)
55+
56+
with span:
4457
with capture_internal_exceptions():
4558
try:
4659
command_seq = self._execution_strategy._command_queue
@@ -67,7 +80,7 @@ async def _sentry_execute(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
6780
def patch_redis_async_client(
6881
cls: "Union[type[StrictRedis[Any]], type[RedisCluster[Any]]]",
6982
is_cluster: bool,
70-
set_db_data_fn: "Callable[[Span, Any], None]",
83+
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
7184
) -> None:
7285
old_execute_command = cls.execute_command
7386

@@ -76,33 +89,49 @@ def patch_redis_async_client(
7689
async def _sentry_execute_command(
7790
self: "Any", name: str, *args: "Any", **kwargs: "Any"
7891
) -> "Any":
79-
integration = sentry_sdk.get_client().get_integration(RedisIntegration)
92+
client = sentry_sdk.get_client()
93+
integration = client.get_integration(RedisIntegration)
8094
if integration is None:
8195
return await old_execute_command(self, name, *args, **kwargs)
8296

97+
span_streaming = has_span_streaming_enabled(client.options)
98+
8399
cache_properties = _compile_cache_span_properties(
84100
name,
85101
args,
86102
kwargs,
87103
integration,
88104
)
89105

90-
cache_span = None
106+
cache_span: "Optional[Union[Span, StreamedSpan]]" = None
91107
if cache_properties["is_cache_key"] and cache_properties["op"] is not None:
92-
cache_span = sentry_sdk.start_span(
93-
op=cache_properties["op"],
94-
name=cache_properties["description"],
95-
origin=SPAN_ORIGIN,
96-
)
108+
if span_streaming:
109+
cache_span = sentry_sdk.traces.start_span(
110+
name=cache_properties["description"]
111+
)
112+
cache_span.set_op(cache_properties["op"])
113+
cache_span.set_origin(SPAN_ORIGIN)
114+
else:
115+
cache_span = sentry_sdk.start_span(
116+
op=cache_properties["op"],
117+
name=cache_properties["description"],
118+
origin=SPAN_ORIGIN,
119+
)
97120
cache_span.__enter__()
98121

99122
db_properties = _compile_db_span_properties(integration, name, args)
100123

101-
db_span = sentry_sdk.start_span(
102-
op=db_properties["op"],
103-
name=db_properties["description"],
104-
origin=SPAN_ORIGIN,
105-
)
124+
db_span: "Union[Span, StreamedSpan]"
125+
if span_streaming:
126+
db_span = sentry_sdk.traces.start_span(name=db_properties["description"])
127+
db_span.set_op(db_properties["op"])
128+
db_span.set_origin(SPAN_ORIGIN)
129+
else:
130+
db_span = sentry_sdk.start_span(
131+
op=db_properties["op"],
132+
name=db_properties["description"],
133+
origin=SPAN_ORIGIN,
134+
)
106135
db_span.__enter__()
107136

108137
set_db_data_fn(db_span, self)

sentry_sdk/integrations/redis/_sync_common.py

Lines changed: 52 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,34 +11,47 @@
1111
_set_pipeline_data,
1212
)
1313
from sentry_sdk.tracing import Span
14+
from sentry_sdk.tracing_utils import has_span_streaming_enabled
1415
from sentry_sdk.utils import capture_internal_exceptions
1516

1617
from typing import TYPE_CHECKING
1718

1819
if TYPE_CHECKING:
1920
from collections.abc import Callable
20-
from typing import Any
21+
from typing import Any, Optional, Union
22+
from sentry_sdk.traces import StreamedSpan
2123

2224

2325
def patch_redis_pipeline(
2426
pipeline_cls: "Any",
2527
is_cluster: bool,
2628
get_command_args_fn: "Any",
27-
set_db_data_fn: "Callable[[Span, Any], None]",
29+
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
2830
) -> None:
2931
old_execute = pipeline_cls.execute
3032

3133
from sentry_sdk.integrations.redis import RedisIntegration
3234

3335
def sentry_patched_execute(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
34-
if sentry_sdk.get_client().get_integration(RedisIntegration) is None:
36+
client = sentry_sdk.get_client()
37+
if client.get_integration(RedisIntegration) is None:
3538
return old_execute(self, *args, **kwargs)
3639

37-
with sentry_sdk.start_span(
38-
op=OP.DB_REDIS,
39-
name="redis.pipeline.execute",
40-
origin=SPAN_ORIGIN,
41-
) as span:
40+
span_streaming = has_span_streaming_enabled(client.options)
41+
42+
span: "Union[Span, StreamedSpan]"
43+
if span_streaming:
44+
span = sentry_sdk.traces.start_span(name="redis.pipeline.execute")
45+
span.set_origin(SPAN_ORIGIN)
46+
span.set_op(OP.DB_REDIS)
47+
else:
48+
span = sentry_sdk.start_span(
49+
op=OP.DB_REDIS,
50+
name="redis.pipeline.execute",
51+
origin=SPAN_ORIGIN,
52+
)
53+
54+
with span:
4255
with capture_internal_exceptions():
4356
command_seq = None
4457
try:
@@ -61,7 +74,9 @@ def sentry_patched_execute(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
6174

6275

6376
def patch_redis_client(
64-
cls: "Any", is_cluster: bool, set_db_data_fn: "Callable[[Span, Any], None]"
77+
cls: "Any",
78+
is_cluster: bool,
79+
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
6580
) -> None:
6681
"""
6782
This function can be used to instrument custom redis client classes or
@@ -74,33 +89,49 @@ def patch_redis_client(
7489
def sentry_patched_execute_command(
7590
self: "Any", name: str, *args: "Any", **kwargs: "Any"
7691
) -> "Any":
77-
integration = sentry_sdk.get_client().get_integration(RedisIntegration)
92+
client = sentry_sdk.get_client()
93+
integration = client.get_integration(RedisIntegration)
7894
if integration is None:
7995
return old_execute_command(self, name, *args, **kwargs)
8096

97+
span_streaming = has_span_streaming_enabled(client.options)
98+
8199
cache_properties = _compile_cache_span_properties(
82100
name,
83101
args,
84102
kwargs,
85103
integration,
86104
)
87105

88-
cache_span = None
106+
cache_span: "Optional[Union[Span, StreamedSpan]]" = None
89107
if cache_properties["is_cache_key"] and cache_properties["op"] is not None:
90-
cache_span = sentry_sdk.start_span(
91-
op=cache_properties["op"],
92-
name=cache_properties["description"],
93-
origin=SPAN_ORIGIN,
94-
)
108+
if span_streaming:
109+
cache_span = sentry_sdk.traces.start_span(
110+
name=cache_properties["description"]
111+
)
112+
cache_span.set_op(cache_properties["op"])
113+
cache_span.set_origin(SPAN_ORIGIN)
114+
else:
115+
cache_span = sentry_sdk.start_span(
116+
op=cache_properties["op"],
117+
name=cache_properties["description"],
118+
origin=SPAN_ORIGIN,
119+
)
95120
cache_span.__enter__()
96121

97122
db_properties = _compile_db_span_properties(integration, name, args)
98123

99-
db_span = sentry_sdk.start_span(
100-
op=db_properties["op"],
101-
name=db_properties["description"],
102-
origin=SPAN_ORIGIN,
103-
)
124+
db_span: "Union[Span, StreamedSpan]"
125+
if span_streaming:
126+
db_span = sentry_sdk.traces.start_span(name=db_properties["description"])
127+
db_span.set_op(db_properties["op"])
128+
db_span.set_origin(SPAN_ORIGIN)
129+
else:
130+
db_span = sentry_sdk.start_span(
131+
op=db_properties["op"],
132+
name=db_properties["description"],
133+
origin=SPAN_ORIGIN,
134+
)
104135
db_span.__enter__()
105136

106137
set_db_data_fn(db_span, self)

sentry_sdk/integrations/redis/modules/caches.py

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from sentry_sdk.consts import OP, SPANDATA
66
from sentry_sdk.integrations.redis.utils import _get_safe_key, _key_as_string
7+
from sentry_sdk.traces import StreamedSpan
78
from sentry_sdk.utils import capture_internal_exceptions
89

910
GET_COMMANDS = ("get", "mget")
@@ -14,7 +15,7 @@
1415
if TYPE_CHECKING:
1516
from sentry_sdk.integrations.redis import RedisIntegration
1617
from sentry_sdk.tracing import Span
17-
from typing import Any, Optional
18+
from typing import Any, Optional, Union
1819

1920

2021
def _get_op(name: str) -> "Optional[str]":
@@ -80,25 +81,30 @@ def _get_cache_span_description(
8081

8182

8283
def _set_cache_data(
83-
span: "Span",
84+
span: "Union[Span, StreamedSpan]",
8485
redis_client: "Any",
8586
properties: "dict[str, Any]",
8687
return_value: "Optional[Any]",
8788
) -> None:
89+
if isinstance(span, StreamedSpan):
90+
set_on_span = span.set_attribute
91+
else:
92+
set_on_span = span.set_data
93+
8894
with capture_internal_exceptions():
89-
span.set_data(SPANDATA.CACHE_KEY, properties["key"])
95+
set_on_span(SPANDATA.CACHE_KEY, properties["key"])
9096

9197
if properties["redis_command"] in GET_COMMANDS:
9298
if return_value is not None:
93-
span.set_data(SPANDATA.CACHE_HIT, True)
99+
set_on_span(SPANDATA.CACHE_HIT, True)
94100
size = (
95101
len(str(return_value).encode("utf-8"))
96102
if not isinstance(return_value, bytes)
97103
else len(return_value)
98104
)
99-
span.set_data(SPANDATA.CACHE_ITEM_SIZE, size)
105+
set_on_span(SPANDATA.CACHE_ITEM_SIZE, size)
100106
else:
101-
span.set_data(SPANDATA.CACHE_HIT, False)
107+
set_on_span(SPANDATA.CACHE_HIT, False)
102108

103109
elif properties["redis_command"] in SET_COMMANDS:
104110
if properties["value"] is not None:
@@ -107,7 +113,7 @@ def _set_cache_data(
107113
if not isinstance(properties["value"], bytes)
108114
else len(properties["value"])
109115
)
110-
span.set_data(SPANDATA.CACHE_ITEM_SIZE, size)
116+
set_on_span(SPANDATA.CACHE_ITEM_SIZE, size)
111117

112118
try:
113119
connection_params = redis_client.connection_pool.connection_kwargs
@@ -122,8 +128,8 @@ def _set_cache_data(
122128

123129
host = connection_params.get("host")
124130
if host is not None:
125-
span.set_data(SPANDATA.NETWORK_PEER_ADDRESS, host)
131+
set_on_span(SPANDATA.NETWORK_PEER_ADDRESS, host)
126132

127133
port = connection_params.get("port")
128134
if port is not None:
129-
span.set_data(SPANDATA.NETWORK_PEER_PORT, port)
135+
set_on_span(SPANDATA.NETWORK_PEER_PORT, port)

sentry_sdk/integrations/redis/modules/queries.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from sentry_sdk.consts import OP, SPANDATA
66
from sentry_sdk.integrations.redis.utils import _get_safe_command
7+
from sentry_sdk.traces import StreamedSpan
78
from sentry_sdk.utils import capture_internal_exceptions
89

910
from typing import TYPE_CHECKING
@@ -12,7 +13,7 @@
1213
from redis import Redis
1314
from sentry_sdk.integrations.redis import RedisIntegration
1415
from sentry_sdk.tracing import Span
15-
from typing import Any
16+
from typing import Any, Union
1617

1718

1819
def _compile_db_span_properties(
@@ -42,23 +43,32 @@ def _get_db_span_description(
4243
return description
4344

4445

45-
def _set_db_data_on_span(span: "Span", connection_params: "dict[str, Any]") -> None:
46-
span.set_data(SPANDATA.DB_SYSTEM, "redis")
46+
def _set_db_data_on_span(
47+
span: "Union[Span, StreamedSpan]", connection_params: "dict[str, Any]"
48+
) -> None:
49+
if isinstance(span, StreamedSpan):
50+
set_on_span = span.set_attribute
51+
else:
52+
set_on_span = span.set_data
53+
54+
set_on_span(SPANDATA.DB_SYSTEM, "redis")
4755

4856
db = connection_params.get("db")
4957
if db is not None:
50-
span.set_data(SPANDATA.DB_NAME, str(db))
58+
set_on_span(SPANDATA.DB_NAME, str(db))
5159

5260
host = connection_params.get("host")
5361
if host is not None:
54-
span.set_data(SPANDATA.SERVER_ADDRESS, host)
62+
set_on_span(SPANDATA.SERVER_ADDRESS, host)
5563

5664
port = connection_params.get("port")
5765
if port is not None:
58-
span.set_data(SPANDATA.SERVER_PORT, port)
66+
set_on_span(SPANDATA.SERVER_PORT, port)
5967

6068

61-
def _set_db_data(span: "Span", redis_instance: "Redis[Any]") -> None:
69+
def _set_db_data(
70+
span: "Union[Span, StreamedSpan]", redis_instance: "Redis[Any]"
71+
) -> None:
6272
try:
6373
_set_db_data_on_span(span, redis_instance.connection_pool.connection_kwargs)
6474
except AttributeError:

0 commit comments

Comments
 (0)