Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/test-integrations-dbs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ jobs:
run: |
set -x # print commands that are executed
./scripts/runtox.sh "py${{ matrix.python-version }}-clickhouse_driver"
- name: Test elasticsearch
run: |
set -x # print commands that are executed
./scripts/runtox.sh "py${{ matrix.python-version }}-elasticsearch"
- name: Test pymongo
run: |
set -x # print commands that are executed
Expand Down
4 changes: 4 additions & 0 deletions scripts/populate_tox/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@
"package": "dramatiq",
"num_versions": 2,
},
"elasticsearch": {
"package": "elasticsearch",
"python": ">=3.7",
},
"falcon": {
"package": "falcon",
"python": "<3.13",
Expand Down
14 changes: 14 additions & 0 deletions scripts/populate_tox/package_dependencies.jsonl

Large diffs are not rendered by default.

49 changes: 29 additions & 20 deletions scripts/populate_tox/releases.jsonl

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions scripts/split_tox_gh_actions/split_tox_gh_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
"DBs": [
"asyncpg",
"clickhouse_driver",
"elasticsearch",
"pymongo",
"redis",
"redis_py_cluster_legacy",
Expand Down
2 changes: 2 additions & 0 deletions sentry_sdk/integrations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def iter_default_integrations(
"sentry_sdk.integrations.clickhouse_driver.ClickhouseDriverIntegration",
"sentry_sdk.integrations.cohere.CohereIntegration",
"sentry_sdk.integrations.django.DjangoIntegration",
"sentry_sdk.integrations.elasticsearch.ElasticsearchIntegration",
"sentry_sdk.integrations.falcon.FalconIntegration",
"sentry_sdk.integrations.fastapi.FastApiIntegration",
"sentry_sdk.integrations.flask.FlaskIntegration",
Expand Down Expand Up @@ -130,6 +131,7 @@ def iter_default_integrations(
"cohere": (5, 4, 0),
"django": (1, 8),
"dramatiq": (1, 9),
"elasticsearch": (7, 0, 0),
"falcon": (1, 4),
"fastapi": (0, 79, 0),
"flask": (1, 1, 4),
Expand Down
238 changes: 238 additions & 0 deletions sentry_sdk/integrations/elasticsearch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
import sentry_sdk
from sentry_sdk.consts import OP, SPANDATA, SPANSTATUS
from sentry_sdk.integrations import Integration, DidNotEnable, _check_minimum_version
from sentry_sdk.scope import should_send_default_pii
from sentry_sdk.utils import capture_internal_exceptions, ensure_integration_enabled

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from typing import Any, Optional, Tuple

try:
from elasticsearch import VERSION # type: ignore[import-not-found]
except ImportError:
raise DidNotEnable("elasticsearch is not installed")


class ElasticsearchIntegration(Integration):
identifier = "elasticsearch"
origin = "auto.db.elasticsearch"

@staticmethod
def setup_once():
# type: () -> None
_check_minimum_version(ElasticsearchIntegration, VERSION)

major_version = VERSION[0]

if major_version >= 8:
# v8/v9: perform_request is on the Elasticsearch client class
from elasticsearch import Elasticsearch

_patch_perform_request(Elasticsearch, major_version)

try:
from elasticsearch._async.client import AsyncElasticsearch

_patch_perform_request_async(AsyncElasticsearch, major_version)
except ImportError:
pass
else:
# v7: perform_request is on the Transport class
from elasticsearch import Transport

_patch_perform_request(Transport, major_version)

try:
from elasticsearch._async.transport import AsyncTransport

_patch_perform_request_async(AsyncTransport, major_version)
except ImportError:
pass


def _parse_url(url):
# type: (str) -> Tuple[Optional[str], Optional[str]]
"""Extract the operation name and index from an Elasticsearch URL path.

Returns a (operation, index) tuple.

Examples:
/my-index/_search -> ("search", "my-index")
/_search -> ("search", None)
/my-index/_doc/1 -> ("doc", "my-index")
/_bulk -> ("bulk", None)
/ -> (None, None)
"""
parts = [p for p in url.split("/") if p]
if not parts:
return None, None

operation = None
index = None

for i, part in enumerate(parts):
if part.startswith("_"):
operation = part.lstrip("_")
if i > 0:
index = parts[0]
break

if operation is None and parts:
index = parts[0]

return operation, index


def _get_connection_info(obj, major_version):
# type: (Any, int) -> Tuple[Optional[str], Optional[int]]
"""Best-effort extraction of server address and port."""
# v7: Transport has a hosts list of dicts
try:
host_info = obj.hosts[0]
return host_info.get("host"), host_info.get("port")
except Exception:
pass

# v8/v9: Elasticsearch client with node pool
try:
node = list(obj.transport.node_pool.all())[0]
return node.config.host, node.config.port
except Exception:
pass

return None, None
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused major_version parameter in _get_connection_info

Low Severity

The major_version parameter in _get_connection_info is accepted but never referenced in the function body. The function tries both the v7 approach (obj.hosts[0]) and the v8/v9 approach (obj.transport.node_pool.all()) sequentially via try/except regardless of the version. This unused parameter is misleading — it suggests version-specific logic that doesn't exist, and could confuse future maintainers into thinking version-dependent handling was intended but accidentally omitted.

Fix in Cursor Fix in Web



def _get_body(args, kwargs):
# type: (Any, Any) -> Any
"""Extract the request body from args/kwargs.

In v7, body can be positional (params=args[0], body=args[1]).
In v8/v9, body is always a keyword argument.
"""
body = kwargs.get("body")
if body is None and len(args) > 1:
body = args[1]
return body


def _patch_perform_request(cls, major_version):
# type: (Any, int) -> None
original = cls.perform_request

@ensure_integration_enabled(ElasticsearchIntegration, original)
def _sentry_perform_request(self, method, path, *args, **kwargs):
# type: (Any, str, str, *Any, **Any) -> Any
operation, index = _parse_url(path)
description = "{} {}".format(method, path)

span = sentry_sdk.start_span(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The new Elasticsearch integration creates a span but doesn't use it as a context manager or manually call span.__enter__(), so the span is not attached to the current scope.
Severity: MEDIUM

Suggested Fix

The span should be managed using a with statement to ensure its lifecycle is handled correctly. Wrap the operation that the span is measuring in a with start_span(...) as span: block. This will automatically handle entering and exiting the span's context, ensuring it is properly attached to the transaction.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: sentry_sdk/integrations/elasticsearch.py#L131

Potential issue: The `elasticsearch` integration creates a new span using
`sentry_sdk.start_span(...)` but fails to properly manage its lifecycle. It does not use
a `with` statement or manually call `span.__enter__()`. This prevents the span from
being pushed onto the current scope's span stack. As a result, the span is not correctly
associated with the active transaction and may not be recorded or sent to Sentry,
leading to incomplete tracing data for Elasticsearch operations. Other integrations like
`boto3` and `pymongo` correctly handle this by explicitly calling `span.__enter__()` and
`span.__exit__()`.

Did we get this right? 👍 / 👎 to inform future reviews.

op=OP.DB,
name=description,
origin=ElasticsearchIntegration.origin,
)

span.set_data(SPANDATA.DB_SYSTEM, "elasticsearch")
if operation:
span.set_data(SPANDATA.DB_OPERATION, operation)
if index:
span.set_data(SPANDATA.DB_NAME, index)

with capture_internal_exceptions():
address, port = _get_connection_info(self, major_version)
if address:
span.set_data(SPANDATA.SERVER_ADDRESS, address)
if port:
span.set_data(SPANDATA.SERVER_PORT, port)

if should_send_default_pii():
body = _get_body(args, kwargs)
if body is not None:
span.set_data("db.statement.body", body)

try:
result = original(self, method, path, *args, **kwargs)
span.set_status(SPANSTATUS.OK)
except Exception:
span.set_status(SPANSTATUS.INTERNAL_ERROR)
raise
finally:
with capture_internal_exceptions():
breadcrumb_data = {SPANDATA.DB_SYSTEM: "elasticsearch"}
if operation:
breadcrumb_data[SPANDATA.DB_OPERATION] = operation
if index:
breadcrumb_data[SPANDATA.DB_NAME] = index
sentry_sdk.add_breadcrumb(
message=description,
category="query",
data=breadcrumb_data,
)
span.finish()

return result

cls.perform_request = _sentry_perform_request


def _patch_perform_request_async(cls, major_version):
# type: (Any, int) -> None
original = cls.perform_request

async def _sentry_perform_request(self, method, path, *args, **kwargs):
# type: (Any, str, str, *Any, **Any) -> Any
if sentry_sdk.get_client().get_integration(ElasticsearchIntegration) is None:
return await original(self, method, path, *args, **kwargs)

operation, index = _parse_url(path)
description = "{} {}".format(method, path)

span = sentry_sdk.start_span(
op=OP.DB,
name=description,
origin=ElasticsearchIntegration.origin,
)

span.set_data(SPANDATA.DB_SYSTEM, "elasticsearch")
if operation:
span.set_data(SPANDATA.DB_OPERATION, operation)
if index:
span.set_data(SPANDATA.DB_NAME, index)

with capture_internal_exceptions():
address, port = _get_connection_info(self, major_version)
if address:
span.set_data(SPANDATA.SERVER_ADDRESS, address)
if port:
span.set_data(SPANDATA.SERVER_PORT, port)

if should_send_default_pii():
body = _get_body(args, kwargs)
if body is not None:
span.set_data("db.statement.body", body)

try:
result = await original(self, method, path, *args, **kwargs)
span.set_status(SPANSTATUS.OK)
except Exception:
span.set_status(SPANSTATUS.INTERNAL_ERROR)
raise
finally:
with capture_internal_exceptions():
breadcrumb_data = {SPANDATA.DB_SYSTEM: "elasticsearch"}
if operation:
breadcrumb_data[SPANDATA.DB_OPERATION] = operation
if index:
breadcrumb_data[SPANDATA.DB_NAME] = index
sentry_sdk.add_breadcrumb(
message=description,
category="query",
data=breadcrumb_data,
)
span.finish()

return result

cls.perform_request = _sentry_perform_request
3 changes: 3 additions & 0 deletions tests/integrations/elasticsearch/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("elasticsearch")
Loading
Loading