diff --git a/CHANGELOG.md b/CHANGELOG.md index f3b24cc636..eeb57da086 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3959](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3959)) - `opentelemetry-instrumentation-httpx`: add ability to capture custom headers ([#4047](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4047)) +- `opentelemetry-instrumentation-tornado`: Implement new semantic convention opt-in migration + ([#3993](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3993)) ### Fixed diff --git a/instrumentation/README.md b/instrumentation/README.md index 1597be99af..02e9ef4d50 100644 --- a/instrumentation/README.md +++ b/instrumentation/README.md @@ -47,7 +47,7 @@ | [opentelemetry-instrumentation-starlette](./opentelemetry-instrumentation-starlette) | starlette >= 0.13 | Yes | development | [opentelemetry-instrumentation-system-metrics](./opentelemetry-instrumentation-system-metrics) | psutil >= 5 | No | development | [opentelemetry-instrumentation-threading](./opentelemetry-instrumentation-threading) | threading | No | development -| [opentelemetry-instrumentation-tornado](./opentelemetry-instrumentation-tornado) | tornado >= 5.1.1 | Yes | development +| [opentelemetry-instrumentation-tornado](./opentelemetry-instrumentation-tornado) | tornado >= 5.1.1 | Yes | migration | [opentelemetry-instrumentation-tortoiseorm](./opentelemetry-instrumentation-tortoiseorm) | tortoise-orm >= 0.17.0 | No | development | [opentelemetry-instrumentation-urllib](./opentelemetry-instrumentation-urllib) | urllib | Yes | migration | [opentelemetry-instrumentation-urllib3](./opentelemetry-instrumentation-urllib3) | urllib3 >= 1.0.0, < 3.0.0 | Yes | migration diff --git a/instrumentation/opentelemetry-instrumentation-tornado/src/opentelemetry/instrumentation/tornado/__init__.py b/instrumentation/opentelemetry-instrumentation-tornado/src/opentelemetry/instrumentation/tornado/__init__.py index e8a2477af8..b2f4ccb45a 100644 --- a/instrumentation/opentelemetry-instrumentation-tornado/src/opentelemetry/instrumentation/tornado/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-tornado/src/opentelemetry/instrumentation/tornado/__init__.py @@ -154,6 +154,7 @@ def client_response_hook(span, future): --- """ +import urllib from collections import namedtuple from functools import partial from logging import getLogger @@ -167,6 +168,21 @@ def client_response_hook(span, future): from wrapt import wrap_function_wrapper from opentelemetry import context, trace +from opentelemetry.instrumentation._semconv import ( + HTTP_DURATION_HISTOGRAM_BUCKETS_NEW, + _get_schema_url, + _OpenTelemetrySemanticConventionStability, + _OpenTelemetryStabilitySignalType, + _report_new, + _report_old, + _set_http_flavor_version, + _set_http_host_server, + _set_http_method, + _set_http_scheme, + _set_http_target, + _set_status, + _StabilityMode, +) from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.propagators import ( FuncSetter, @@ -177,7 +193,6 @@ def client_response_hook(span, future): from opentelemetry.instrumentation.utils import ( _start_internal_or_server_span, extract_attributes_from_object, - http_status_to_status_code, unwrap, ) from opentelemetry.metrics import get_meter @@ -195,8 +210,23 @@ def client_response_hook(span, future): from opentelemetry.semconv._incubating.attributes.net_attributes import ( NET_PEER_IP, ) +from opentelemetry.semconv.attributes.client_attributes import ( + CLIENT_ADDRESS, +) +from opentelemetry.semconv.attributes.http_attributes import ( + HTTP_REQUEST_METHOD, + HTTP_RESPONSE_STATUS_CODE, +) +from opentelemetry.semconv.attributes.network_attributes import ( + NETWORK_PEER_ADDRESS, + NETWORK_PROTOCOL_VERSION, +) +from opentelemetry.semconv.attributes.url_attributes import ( + URL_PATH, + URL_QUERY, + URL_SCHEME, +) from opentelemetry.semconv.metrics import MetricInstruments -from opentelemetry.trace.status import Status, StatusCode from opentelemetry.util.http import ( OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST, OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE, @@ -205,6 +235,7 @@ def client_response_hook(span, future): get_traced_request_attrs, normalise_request_header_name, normalise_response_header_name, + sanitize_method, ) from .client import fetch_async # pylint: disable=E0401 @@ -226,10 +257,14 @@ class TornadoInstrumentor(BaseInstrumentor): patched_handlers = [] original_handler_new = None + def __init__(self): + super().__init__() + self._sem_conv_opt_in_mode = _StabilityMode.DEFAULT + def instrumentation_dependencies(self) -> Collection[str]: return _instruments - def _instrument(self, **kwargs): + def _instrument(self, **kwargs): # pylint: disable=too-many-locals """ _instrument patches tornado.web.RequestHandler and tornado.httpclient.AsyncHTTPClient classes to automatically instrument requests both received and sent by Tornado. @@ -249,24 +284,49 @@ def _instrument(self, **kwargs): Note that the patch does not apply on every single __init__ call, only the first one for the entire process lifetime. """ + # Initialize semantic conventions opt-in mode + _OpenTelemetrySemanticConventionStability._initialize() + sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( + _OpenTelemetryStabilitySignalType.HTTP, + ) + self._sem_conv_opt_in_mode = sem_conv_opt_in_mode + tracer_provider = kwargs.get("tracer_provider") tracer = trace.get_tracer( __name__, __version__, tracer_provider, - schema_url="https://opentelemetry.io/schemas/1.11.0", + schema_url=_get_schema_url(sem_conv_opt_in_mode), ) meter_provider = kwargs.get("meter_provider") - meter = get_meter( - __name__, - __version__, - meter_provider, - schema_url="https://opentelemetry.io/schemas/1.11.0", - ) - client_histograms = _create_client_histograms(meter) - server_histograms = _create_server_histograms(meter) + # Create meters for old and new semconv based on opt-in mode + meter_old = None + meter_new = None + + if _report_old(sem_conv_opt_in_mode): + meter_old = get_meter( + __name__, + __version__, + meter_provider, + schema_url=_get_schema_url(_StabilityMode.DEFAULT), + ) + + if _report_new(sem_conv_opt_in_mode): + meter_new = get_meter( + __name__, + __version__, + meter_provider, + schema_url=_get_schema_url(_StabilityMode.HTTP), + ) + + client_histograms = _create_client_histograms( + meter_old, meter_new, sem_conv_opt_in_mode + ) + server_histograms = _create_server_histograms( + meter_old, meter_new, sem_conv_opt_in_mode + ) client_request_hook = kwargs.get("client_request_hook", None) client_response_hook = kwargs.get("client_response_hook", None) @@ -275,7 +335,11 @@ def _instrument(self, **kwargs): def handler_init(init, handler, args, kwargs): cls = handler.__class__ if patch_handler_class( - tracer, server_histograms, cls, server_request_hook + tracer, + server_histograms, + cls, + server_request_hook, + sem_conv_opt_in_mode, ): self.patched_handlers.append(cls) return init(*args, **kwargs) @@ -283,6 +347,14 @@ def handler_init(init, handler, args, kwargs): wrap_function_wrapper( "tornado.web", "RequestHandler.__init__", handler_init ) + + duration_old = client_histograms.get("old_duration") + duration_new = client_histograms.get("new_duration") + request_size_old = client_histograms.get("old_request_size") + request_size_new = client_histograms.get("new_request_size") + response_size_old = client_histograms.get("old_response_size") + response_size_new = client_histograms.get("new_response_size") + wrap_function_wrapper( "tornado.httpclient", "AsyncHTTPClient.fetch", @@ -291,13 +363,19 @@ def handler_init(init, handler, args, kwargs): tracer, client_request_hook, client_response_hook, - client_histograms[MetricInstruments.HTTP_CLIENT_DURATION], - client_histograms[MetricInstruments.HTTP_CLIENT_REQUEST_SIZE], - client_histograms[MetricInstruments.HTTP_CLIENT_RESPONSE_SIZE], + duration_old, + duration_new, + request_size_old, + request_size_new, + response_size_old, + response_size_new, + sem_conv_opt_in_mode, ), ) def _uninstrument(self, **kwargs): + self._sem_conv_opt_in_mode = _StabilityMode.DEFAULT + unwrap(tornado.web.RequestHandler, "__init__") unwrap(tornado.httpclient.AsyncHTTPClient, "fetch") for handler in self.patched_handlers: @@ -305,56 +383,114 @@ def _uninstrument(self, **kwargs): self.patched_handlers = [] -def _create_server_histograms(meter) -> Dict[str, Histogram]: - histograms = { - MetricInstruments.HTTP_SERVER_DURATION: meter.create_histogram( +def _create_server_histograms( + meter_old, meter_new, sem_conv_opt_in_mode +) -> Dict[str, Histogram]: + histograms = {} + + # Create old semconv metrics + if _report_old(sem_conv_opt_in_mode): + histograms["old_duration"] = meter_old.create_histogram( name=MetricInstruments.HTTP_SERVER_DURATION, unit="ms", - description="Measures the duration of inbound HTTP requests.", - ), - MetricInstruments.HTTP_SERVER_REQUEST_SIZE: meter.create_histogram( + description="measures the duration of inbound HTTP requests", + ) + histograms["old_request_size"] = meter_old.create_histogram( name=MetricInstruments.HTTP_SERVER_REQUEST_SIZE, unit="By", description="measures the size of HTTP request messages (compressed)", - ), - MetricInstruments.HTTP_SERVER_RESPONSE_SIZE: meter.create_histogram( + ) + histograms["old_response_size"] = meter_old.create_histogram( name=MetricInstruments.HTTP_SERVER_RESPONSE_SIZE, unit="By", description="measures the size of HTTP response messages (compressed)", - ), - MetricInstruments.HTTP_SERVER_ACTIVE_REQUESTS: meter.create_up_down_counter( + ) + + # Create new semconv metrics + if _report_new(sem_conv_opt_in_mode): + histograms["new_duration"] = meter_new.create_histogram( + name="http.server.request.duration", + unit="s", + description="Duration of HTTP server requests.", + explicit_bucket_boundaries_advisory=HTTP_DURATION_HISTOGRAM_BUCKETS_NEW, + ) + histograms["new_request_size"] = meter_new.create_histogram( + name="http.server.request.body.size", + unit="By", + description="Size of HTTP server request bodies.", + ) + histograms["new_response_size"] = meter_new.create_histogram( + name="http.server.response.body.size", + unit="By", + description="Size of HTTP server response bodies.", + ) + + # Active request counter for old/new semantic conventions same + # because the attributes are the same for both + # Use meter_old if available, otherwise meter_new + active_meter = meter_old if meter_old is not None else meter_new + if active_meter is not None: + histograms["active_requests"] = active_meter.create_up_down_counter( name=MetricInstruments.HTTP_SERVER_ACTIVE_REQUESTS, unit="requests", description="measures the number of concurrent HTTP requests that are currently in-flight", - ), - } + ) return histograms -def _create_client_histograms(meter) -> Dict[str, Histogram]: - histograms = { - MetricInstruments.HTTP_CLIENT_DURATION: meter.create_histogram( +def _create_client_histograms( + meter_old, meter_new, sem_conv_opt_in_mode +) -> Dict[str, Histogram]: + histograms = {} + + # Create old semconv metrics + if _report_old(sem_conv_opt_in_mode): + histograms["old_duration"] = meter_old.create_histogram( name=MetricInstruments.HTTP_CLIENT_DURATION, unit="ms", description="measures the duration outbound HTTP requests", - ), - MetricInstruments.HTTP_CLIENT_REQUEST_SIZE: meter.create_histogram( + ) + histograms["old_request_size"] = meter_old.create_histogram( name=MetricInstruments.HTTP_CLIENT_REQUEST_SIZE, unit="By", description="measures the size of HTTP request messages (compressed)", - ), - MetricInstruments.HTTP_CLIENT_RESPONSE_SIZE: meter.create_histogram( + ) + histograms["old_response_size"] = meter_old.create_histogram( name=MetricInstruments.HTTP_CLIENT_RESPONSE_SIZE, unit="By", description="measures the size of HTTP response messages (compressed)", - ), - } + ) + + # Create new semconv metrics + if _report_new(sem_conv_opt_in_mode): + histograms["new_duration"] = meter_new.create_histogram( + name="http.client.request.duration", + unit="s", + description="Duration of HTTP client requests.", + explicit_bucket_boundaries_advisory=HTTP_DURATION_HISTOGRAM_BUCKETS_NEW, + ) + histograms["new_request_size"] = meter_new.create_histogram( + name="http.client.request.body.size", + unit="By", + description="Size of HTTP client request bodies.", + ) + histograms["new_response_size"] = meter_new.create_histogram( + name="http.client.response.body.size", + unit="By", + description="Size of HTTP client response bodies.", + ) return histograms -def patch_handler_class(tracer, server_histograms, cls, request_hook=None): +def patch_handler_class( + tracer, + server_histograms, + cls, + request_hook=None, + sem_conv_opt_in_mode=_StabilityMode.DEFAULT, +): if getattr(cls, _OTEL_PATCHED_KEY, False): return False @@ -362,22 +498,41 @@ def patch_handler_class(tracer, server_histograms, cls, request_hook=None): _wrap( cls, "prepare", - partial(_prepare, tracer, server_histograms, request_hook), + partial( + _prepare, + tracer, + server_histograms, + request_hook, + sem_conv_opt_in_mode, + ), ) _wrap( cls, "log_exception", - partial(_log_exception, tracer, server_histograms), + partial( + _log_exception, tracer, server_histograms, sem_conv_opt_in_mode + ), ) if issubclass(cls, tornado.websocket.WebSocketHandler): _wrap( cls, "on_close", - partial(_websockethandler_on_close, tracer, server_histograms), + partial( + _websockethandler_on_close, + tracer, + server_histograms, + sem_conv_opt_in_mode, + ), ) else: - _wrap(cls, "on_finish", partial(_on_finish, tracer, server_histograms)) + _wrap( + cls, + "on_finish", + partial( + _on_finish, tracer, server_histograms, sem_conv_opt_in_mode + ), + ) return True @@ -401,7 +556,14 @@ def _wrap(cls, method_name, wrapper): def _prepare( - tracer, server_histograms, request_hook, func, handler, args, kwargs + tracer, + server_histograms, + request_hook, + sem_conv_opt_in_mode, + func, + handler, + args, + kwargs, ): request = handler.request otel_handler_state = { @@ -413,40 +575,68 @@ def _prepare( if otel_handler_state["exclude_request"]: return func(*args, **kwargs) - _record_prepare_metrics(server_histograms, handler) + _record_prepare_metrics(server_histograms, handler, sem_conv_opt_in_mode) - ctx = _start_span(tracer, handler) + ctx = _start_span(tracer, handler, sem_conv_opt_in_mode) if request_hook: request_hook(ctx.span, handler) return func(*args, **kwargs) -def _on_finish(tracer, server_histograms, func, handler, args, kwargs): +def _on_finish( + tracer, + server_histograms, + sem_conv_opt_in_mode, + func, + handler, + args, + kwargs, +): try: return func(*args, **kwargs) finally: - _record_on_finish_metrics(server_histograms, handler) - _finish_span(tracer, handler) + _record_on_finish_metrics( + server_histograms, handler, None, sem_conv_opt_in_mode + ) + _finish_span(tracer, handler, None, sem_conv_opt_in_mode) def _websockethandler_on_close( - tracer, server_histograms, func, handler, args, kwargs + tracer, + server_histograms, + sem_conv_opt_in_mode, + func, + handler, + args, + kwargs, ): try: func() finally: - _record_on_finish_metrics(server_histograms, handler) - _finish_span(tracer, handler) + _record_on_finish_metrics( + server_histograms, handler, None, sem_conv_opt_in_mode + ) + _finish_span(tracer, handler, None, sem_conv_opt_in_mode) -def _log_exception(tracer, server_histograms, func, handler, args, kwargs): +def _log_exception( + tracer, + server_histograms, + sem_conv_opt_in_mode, + func, + handler, + args, + kwargs, +): error = None if len(args) == 3: error = args[1] - _record_on_finish_metrics(server_histograms, handler, error) + _record_on_finish_metrics( + server_histograms, handler, error, sem_conv_opt_in_mode + ) - _finish_span(tracer, handler, error) + _finish_span(tracer, handler, error, sem_conv_opt_in_mode) return func(*args, **kwargs) @@ -476,23 +666,42 @@ def _collect_custom_response_headers_attributes(response_headers): return attributes -def _get_attributes_from_request(request): - attrs = { - HTTP_METHOD: request.method, - HTTP_SCHEME: request.protocol, - HTTP_HOST: request.host, - HTTP_TARGET: request.path, - } +def _get_attributes_from_request(request, sem_conv_opt_in_mode): + attrs = {} + + # Set attributes based on semconv mode + _set_http_method( + attrs, + request.method, + sanitize_method(request.method), + sem_conv_opt_in_mode, + ) + _set_http_scheme(attrs, request.protocol, sem_conv_opt_in_mode) + _set_http_host_server(attrs, request.host, sem_conv_opt_in_mode) + _set_http_target(attrs, request.path, None, None, sem_conv_opt_in_mode) + + # HTTP version + if request.version: + _set_http_flavor_version(attrs, request.version, sem_conv_opt_in_mode) if request.remote_ip: - # NET_PEER_IP is the address of the network peer - # HTTP_CLIENT_IP is the address of the client, which might be different - # if Tornado is set to trust X-Forwarded-For headers (xheaders=True) - attrs[HTTP_CLIENT_IP] = request.remote_ip + # Client IP address + # e.g. if Tornado is set to trust X-Forwarded-For headers (xheaders=True) + if _report_old(sem_conv_opt_in_mode): + attrs[HTTP_CLIENT_IP] = request.remote_ip + if _report_new(sem_conv_opt_in_mode): + attrs[CLIENT_ADDRESS] = request.remote_ip + + # Network peer IP if different from remote_ip if hasattr(request.connection, "context") and getattr( request.connection.context, "_orig_remote_ip", None ): - attrs[NET_PEER_IP] = request.connection.context._orig_remote_ip + if _report_old(sem_conv_opt_in_mode): + attrs[NET_PEER_IP] = request.connection.context._orig_remote_ip + if _report_new(sem_conv_opt_in_mode): + attrs[NETWORK_PEER_ADDRESS] = ( + request.connection.context._orig_remote_ip + ) return extract_attributes_from_object( request, _traced_request_attrs, attrs @@ -523,7 +732,7 @@ def _get_full_handler_name(handler): return f"{klass.__module__}.{klass.__qualname__}" -def _start_span(tracer, handler) -> _TraceContext: +def _start_span(tracer, handler, sem_conv_opt_in_mode) -> _TraceContext: span, token = _start_internal_or_server_span( tracer=tracer, span_name=_get_default_span_name(handler.request), @@ -533,7 +742,9 @@ def _start_span(tracer, handler) -> _TraceContext: ) if span.is_recording(): - attributes = _get_attributes_from_request(handler.request) + attributes = _get_attributes_from_request( + handler.request, sem_conv_opt_in_mode + ) for key, value in attributes.items(): span.set_attribute(key, value) span.set_attribute("tornado.handler", _get_full_handler_name(handler)) @@ -559,9 +770,8 @@ def _start_span(tracer, handler) -> _TraceContext: return ctx -def _finish_span(tracer, handler, error=None): +def _finish_span(tracer, handler, error, sem_conv_opt_in_mode): status_code = handler.get_status() - reason = getattr(handler, "_reason") finish_args = (None, None, None) ctx = getattr(handler, _HANDLER_CONTEXT_KEY, None) @@ -569,10 +779,9 @@ def _finish_span(tracer, handler, error=None): if isinstance(error, tornado.web.HTTPError): status_code = error.status_code if not ctx and status_code == 404: - ctx = _start_span(tracer, handler) + ctx = _start_span(tracer, handler, sem_conv_opt_in_mode) else: status_code = 500 - reason = None if status_code >= 500: finish_args = ( type(error), @@ -584,18 +793,14 @@ def _finish_span(tracer, handler, error=None): return if ctx.span.is_recording(): - ctx.span.set_attribute(HTTP_STATUS_CODE, status_code) - otel_status_code = http_status_to_status_code( - status_code, server_span=True - ) - otel_status_description = None - if otel_status_code is StatusCode.ERROR: - otel_status_description = reason - ctx.span.set_status( - Status( - status_code=otel_status_code, - description=otel_status_description, - ) + metric_attributes = {} + _set_status( + ctx.span, + metric_attributes, + status_code, + str(status_code) if status_code else None, + server_span=True, + sem_conv_opt_in_mode=sem_conv_opt_in_mode, ) if ctx.span.is_recording() and ctx.span.kind == trace.SpanKind.SERVER: custom_attributes = _collect_custom_response_headers_attributes( @@ -610,65 +815,139 @@ def _finish_span(tracer, handler, error=None): delattr(handler, _HANDLER_CONTEXT_KEY) -def _record_prepare_metrics(server_histograms, handler): +def _record_prepare_metrics(server_histograms, handler, sem_conv_opt_in_mode): request_size = int(handler.request.headers.get("Content-Length", 0)) - metric_attributes = _create_metric_attributes(handler) - server_histograms[MetricInstruments.HTTP_SERVER_REQUEST_SIZE].record( - request_size, attributes=metric_attributes - ) + # Record old semconv metrics + if _report_old(sem_conv_opt_in_mode): + metric_attributes_old = _create_metric_attributes_old(handler) + server_histograms["old_request_size"].record( + request_size, attributes=metric_attributes_old + ) + active_requests_attributes_old = ( + _create_active_requests_attributes_old(handler.request) + ) + server_histograms["active_requests"].add( + 1, attributes=active_requests_attributes_old + ) - active_requests_attributes = _create_active_requests_attributes( - handler.request - ) - server_histograms[MetricInstruments.HTTP_SERVER_ACTIVE_REQUESTS].add( - 1, attributes=active_requests_attributes - ) + # Record new semconv metrics + if _report_new(sem_conv_opt_in_mode): + metric_attributes_new = _create_metric_attributes_new(handler) + server_histograms["new_request_size"].record( + request_size, attributes=metric_attributes_new + ) + # Don't add to active_requests again if already added in old mode + if not _report_old(sem_conv_opt_in_mode): + active_requests_attributes_new = ( + _create_active_requests_attributes_new(handler.request) + ) + server_histograms["active_requests"].add( + 1, attributes=active_requests_attributes_new + ) -def _record_on_finish_metrics(server_histograms, handler, error=None): +def _record_on_finish_metrics( + server_histograms, handler, error, sem_conv_opt_in_mode +): otel_handler_state = getattr(handler, _HANDLER_STATE_KEY, None) or {} if otel_handler_state.get("exclude_request"): return start_time = otel_handler_state.get(_START_TIME, None) or default_timer() - elapsed_time = round((default_timer() - start_time) * 1000) + elapsed_time_s = default_timer() - start_time + elapsed_time_ms = round(elapsed_time_s * 1000) response_size = int(handler._headers.get("Content-Length", 0)) - metric_attributes = _create_metric_attributes(handler) + status_code = handler.get_status() if isinstance(error, tornado.web.HTTPError): - metric_attributes[HTTP_STATUS_CODE] = error.status_code + status_code = error.status_code - server_histograms[MetricInstruments.HTTP_SERVER_RESPONSE_SIZE].record( - response_size, attributes=metric_attributes - ) + # Record old semconv metrics + if _report_old(sem_conv_opt_in_mode): + metric_attributes_old = _create_metric_attributes_old(handler) + if isinstance(error, tornado.web.HTTPError): + metric_attributes_old[HTTP_STATUS_CODE] = status_code - server_histograms[MetricInstruments.HTTP_SERVER_DURATION].record( - elapsed_time, attributes=metric_attributes - ) + server_histograms["old_response_size"].record( + response_size, attributes=metric_attributes_old + ) + server_histograms["old_duration"].record( + elapsed_time_ms, attributes=metric_attributes_old + ) - active_requests_attributes = _create_active_requests_attributes( - handler.request - ) - server_histograms[MetricInstruments.HTTP_SERVER_ACTIVE_REQUESTS].add( - -1, attributes=active_requests_attributes - ) + active_requests_attributes_old = ( + _create_active_requests_attributes_old(handler.request) + ) + server_histograms["active_requests"].add( + -1, attributes=active_requests_attributes_old + ) + + # Record new semconv metrics + if _report_new(sem_conv_opt_in_mode): + metric_attributes_new = _create_metric_attributes_new(handler) + if isinstance(error, tornado.web.HTTPError): + metric_attributes_new[HTTP_RESPONSE_STATUS_CODE] = status_code + server_histograms["new_response_size"].record( + response_size, attributes=metric_attributes_new + ) + server_histograms["new_duration"].record( + elapsed_time_s, attributes=metric_attributes_new + ) -def _create_active_requests_attributes(request): + # Don't subtract from active_requests again if already done in old mode + if not _report_old(sem_conv_opt_in_mode): + active_requests_attributes_new = ( + _create_active_requests_attributes_new(handler.request) + ) + server_histograms["active_requests"].add( + -1, attributes=active_requests_attributes_new + ) + + +def _create_active_requests_attributes_old(request): + """Create metric attributes for active requests using old semconv.""" metric_attributes = { HTTP_METHOD: request.method, HTTP_SCHEME: request.protocol, HTTP_FLAVOR: request.version, HTTP_HOST: request.host, - HTTP_TARGET: request.path, } + metric_attributes[HTTP_TARGET] = request.path + return metric_attributes + +def _create_active_requests_attributes_new(request): + """Create metric attributes for active requests using new semconv.""" + metric_attributes = { + HTTP_REQUEST_METHOD: request.method, + URL_SCHEME: request.protocol, + } + if request.version: + metric_attributes[NETWORK_PROTOCOL_VERSION] = request.version return metric_attributes -def _create_metric_attributes(handler): - metric_attributes = _create_active_requests_attributes(handler.request) +def _create_metric_attributes_old(handler): + """Create metric attributes using old semconv.""" + metric_attributes = _create_active_requests_attributes_old(handler.request) metric_attributes[HTTP_STATUS_CODE] = handler.get_status() + return metric_attributes + + +def _create_metric_attributes_new(handler): + """Create metric attributes using new semconv.""" + metric_attributes = _create_active_requests_attributes_new(handler.request) + metric_attributes[HTTP_RESPONSE_STATUS_CODE] = handler.get_status() + + # Add URL path if available + if handler.request.path: + # Parse query from path if present + parsed = urllib.parse.urlparse(handler.request.path) + if parsed.path: + metric_attributes[URL_PATH] = parsed.path + if parsed.query: + metric_attributes[URL_QUERY] = parsed.query return metric_attributes diff --git a/instrumentation/opentelemetry-instrumentation-tornado/src/opentelemetry/instrumentation/tornado/client.py b/instrumentation/opentelemetry-instrumentation-tornado/src/opentelemetry/instrumentation/tornado/client.py index 8660181c87..e53a363bf6 100644 --- a/instrumentation/opentelemetry-instrumentation-tornado/src/opentelemetry/instrumentation/tornado/client.py +++ b/instrumentation/opentelemetry-instrumentation-tornado/src/opentelemetry/instrumentation/tornado/client.py @@ -18,15 +18,29 @@ from tornado.httpclient import HTTPError, HTTPRequest from opentelemetry import trace -from opentelemetry.instrumentation.utils import http_status_to_status_code +from opentelemetry.instrumentation._semconv import ( + _report_new, + _report_old, + _set_http_method, + _set_http_url, + _set_status, +) from opentelemetry.propagate import inject from opentelemetry.semconv._incubating.attributes.http_attributes import ( HTTP_METHOD, HTTP_STATUS_CODE, HTTP_URL, ) -from opentelemetry.trace.status import Status, StatusCode -from opentelemetry.util.http import redact_url +from opentelemetry.semconv.attributes.http_attributes import ( + HTTP_REQUEST_METHOD, + HTTP_RESPONSE_STATUS_CODE, +) +from opentelemetry.semconv.attributes.server_attributes import ( + SERVER_ADDRESS, + SERVER_PORT, +) +from opentelemetry.semconv.attributes.url_attributes import URL_FULL +from opentelemetry.util.http import redact_url, sanitize_method def _normalize_request(args, kwargs): @@ -45,13 +59,17 @@ def _normalize_request(args, kwargs): return (new_args, new_kwargs) -def fetch_async( +def fetch_async( # pylint: disable=too-many-locals tracer, request_hook, response_hook, - duration_histogram, - request_size_histogram, - response_size_histogram, + duration_histogram_old, + duration_histogram_new, + request_size_histogram_old, + request_size_histogram_new, + response_size_histogram_old, + response_size_histogram_new, + sem_conv_opt_in_mode, func, _, args, @@ -78,10 +96,17 @@ def fetch_async( request_hook(span, request) if span.is_recording(): - attributes = { - HTTP_URL: redact_url(request.url), - HTTP_METHOD: request.method, - } + attributes = {} + _set_http_url( + attributes, redact_url(request.url), sem_conv_opt_in_mode + ) + _set_http_method( + attributes, + request.method, + sanitize_method(request.method), + sem_conv_opt_in_mode, + ) + for key, value in attributes.items(): span.set_attribute(key, value) @@ -93,80 +118,132 @@ def fetch_async( _finish_tracing_callback, span=span, response_hook=response_hook, - duration_histogram=duration_histogram, - request_size_histogram=request_size_histogram, - response_size_histogram=response_size_histogram, + duration_histogram_old=duration_histogram_old, + duration_histogram_new=duration_histogram_new, + request_size_histogram_old=request_size_histogram_old, + request_size_histogram_new=request_size_histogram_new, + response_size_histogram_old=response_size_histogram_old, + response_size_histogram_new=response_size_histogram_new, + sem_conv_opt_in_mode=sem_conv_opt_in_mode, ) ) return future -def _finish_tracing_callback( +def _finish_tracing_callback( # pylint: disable=too-many-locals,too-many-branches future, span, response_hook, - duration_histogram, - request_size_histogram, - response_size_histogram, + duration_histogram_old, + duration_histogram_new, + request_size_histogram_old, + request_size_histogram_new, + response_size_histogram_old, + response_size_histogram_new, + sem_conv_opt_in_mode, ): response = None status_code = None - status = None - description = None exc = future.exception() if exc: - description = f"{type(exc).__qualname__}: {exc}" if isinstance(exc, HTTPError): response = exc.response status_code = exc.code - status = Status( - status_code=http_status_to_status_code(status_code), - description=description, - ) else: - status = Status( - status_code=StatusCode.ERROR, - description=description, - ) span.record_exception(exc) else: response = future.result() status_code = response.code - status = Status( - status_code=http_status_to_status_code(status_code), - description=description, - ) - if status_code is not None: - span.set_attribute(HTTP_STATUS_CODE, status_code) - span.set_status(status) + # Set status using semconv helper + metric_attributes = {} + _set_status( + span, + metric_attributes, + status_code if status_code is not None else -1, + str(status_code) if status_code is not None else "Exception", + server_span=False, + sem_conv_opt_in_mode=sem_conv_opt_in_mode, + ) if response is not None: - metric_attributes = _create_metric_attributes(response) request_size = int(response.request.headers.get("Content-Length", 0)) response_size = int(response.headers.get("Content-Length", 0)) - duration_histogram.record( - response.request_time, attributes=metric_attributes - ) - request_size_histogram.record( - request_size, attributes=metric_attributes - ) - response_size_histogram.record( - response_size, attributes=metric_attributes - ) + # Record old semconv metrics + if ( + _report_old(sem_conv_opt_in_mode) + and duration_histogram_old is not None + ): + metric_attributes_old = _create_metric_attributes_old(response) + if duration_histogram_old: + duration_histogram_old.record( + response.request_time, attributes=metric_attributes_old + ) + if request_size_histogram_old: + request_size_histogram_old.record( + request_size, attributes=metric_attributes_old + ) + if response_size_histogram_old: + response_size_histogram_old.record( + response_size, attributes=metric_attributes_old + ) + + # Record new semconv metrics (duration in seconds) + if ( + _report_new(sem_conv_opt_in_mode) + and duration_histogram_new is not None + ): + metric_attributes_new = _create_metric_attributes_new(response) + # Convert request_time from seconds to seconds (it's already in seconds) + if duration_histogram_new: + duration_histogram_new.record( + response.request_time, attributes=metric_attributes_new + ) + if request_size_histogram_new: + request_size_histogram_new.record( + request_size, attributes=metric_attributes_new + ) + if response_size_histogram_new: + response_size_histogram_new.record( + response_size, attributes=metric_attributes_new + ) if response_hook: response_hook(span, future) span.end() -def _create_metric_attributes(response): +def _create_metric_attributes_old(response): + """Create metric attributes using old semconv.""" metric_attributes = { HTTP_STATUS_CODE: response.code, HTTP_URL: redact_url(response.request.url), HTTP_METHOD: response.request.method, } + return metric_attributes + + +def _create_metric_attributes_new(response): + """Create metric attributes using new semconv.""" + metric_attributes = { + HTTP_RESPONSE_STATUS_CODE: response.code, + URL_FULL: redact_url(response.request.url), + HTTP_REQUEST_METHOD: response.request.method, + } + + # Add server address and port if available + if hasattr(response.request, "host") and response.request.host: + host = response.request.host + if ":" in host: + server_address, port_str = host.rsplit(":", 1) + metric_attributes[SERVER_ADDRESS] = server_address + try: + metric_attributes[SERVER_PORT] = int(port_str) + except ValueError: + pass + else: + metric_attributes[SERVER_ADDRESS] = host return metric_attributes diff --git a/instrumentation/opentelemetry-instrumentation-tornado/src/opentelemetry/instrumentation/tornado/package.py b/instrumentation/opentelemetry-instrumentation-tornado/src/opentelemetry/instrumentation/tornado/package.py index 734587b752..cb4f6e0750 100644 --- a/instrumentation/opentelemetry-instrumentation-tornado/src/opentelemetry/instrumentation/tornado/package.py +++ b/instrumentation/opentelemetry-instrumentation-tornado/src/opentelemetry/instrumentation/tornado/package.py @@ -16,3 +16,5 @@ _instruments = ("tornado >= 5.1.1",) _supports_metrics = True + +_semconv_status = "migration" diff --git a/instrumentation/opentelemetry-instrumentation-tornado/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-tornado/tests/test_instrumentation.py index 148e520ebc..07ccd923cb 100644 --- a/instrumentation/opentelemetry-instrumentation-tornado/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-tornado/tests/test_instrumentation.py @@ -71,6 +71,13 @@ def get_app(self): def setUp(self): super().setUp() + # Reset semconv initialization to ensure clean state + # pylint: disable=import-outside-toplevel + from opentelemetry.instrumentation._semconv import ( # noqa: PLC0415 + _OpenTelemetrySemanticConventionStability, + ) + + _OpenTelemetrySemanticConventionStability._initialized = False # pylint: disable=protected-access TornadoInstrumentor().instrument( server_request_hook=getattr(self, "server_request_hook", None), client_request_hook=getattr(self, "client_request_hook", None), diff --git a/instrumentation/opentelemetry-instrumentation-tornado/tests/test_metrics_instrumentation.py b/instrumentation/opentelemetry-instrumentation-tornado/tests/test_metrics_instrumentation.py index 859c38a05e..37531f25f4 100644 --- a/instrumentation/opentelemetry-instrumentation-tornado/tests/test_metrics_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-tornado/tests/test_metrics_instrumentation.py @@ -15,15 +15,42 @@ import asyncio from timeit import default_timer +from unittest.mock import patch import tornado.testing +from tornado.testing import AsyncHTTPTestCase +from opentelemetry import trace +from opentelemetry.instrumentation._semconv import ( + OTEL_SEMCONV_STABILITY_OPT_IN, + _OpenTelemetrySemanticConventionStability, +) from opentelemetry.instrumentation.tornado import TornadoInstrumentor from opentelemetry.sdk.metrics.export import HistogramDataPoint +from opentelemetry.semconv._incubating.attributes.http_attributes import ( + HTTP_HOST, + HTTP_METHOD, + HTTP_SCHEME, + HTTP_STATUS_CODE, + HTTP_TARGET, + HTTP_URL, +) +from opentelemetry.semconv.attributes.http_attributes import ( + HTTP_REQUEST_METHOD, + HTTP_RESPONSE_STATUS_CODE, +) +from opentelemetry.semconv.attributes.url_attributes import ( + URL_FULL, + URL_PATH, + URL_SCHEME, +) +from opentelemetry.test.test_base import TestBase +from opentelemetry.trace import SpanKind from .test_instrumentation import ( # pylint: disable=no-name-in-module,import-error TornadoTest, ) +from .tornado_test_app import make_app class TestTornadoMetricsInstrumentation(TornadoTest): @@ -265,3 +292,287 @@ def test_excluded(path): test_excluded("/healthz") test_excluded("/ping") + + +class TornadoSemconvTestBase(AsyncHTTPTestCase, TestBase): + def get_app(self): + tracer = trace.get_tracer(__name__) + app = make_app(tracer) + return app + + def tearDown(self): + TornadoInstrumentor().uninstrument() + super().tearDown() + + @staticmethod + def _get_server_span(spans): + for span in spans: + if span.kind == SpanKind.SERVER: + return span + return None + + @staticmethod + def _get_client_span(spans): + for span in spans: + if span.kind == SpanKind.CLIENT: + return span + return None + + +class TestTornadoSemconvDefault(TornadoSemconvTestBase): + def setUp(self): + super().setUp() + _OpenTelemetrySemanticConventionStability._initialized = False + TornadoInstrumentor().instrument() + + def test_server_span_attributes_old_semconv(self): + response = self.fetch("/") + self.assertEqual(response.code, 201) + spans = self.memory_exporter.get_finished_spans() + server_span = self._get_server_span(spans) + self.assertIsNotNone(server_span) + + # Verify old semconv attributes are present + self.assertIn(HTTP_METHOD, server_span.attributes) + self.assertIn(HTTP_SCHEME, server_span.attributes) + self.assertIn(HTTP_HOST, server_span.attributes) + self.assertIn(HTTP_TARGET, server_span.attributes) + self.assertIn(HTTP_STATUS_CODE, server_span.attributes) + # Verify new semconv attributes are NOT present + self.assertNotIn(HTTP_REQUEST_METHOD, server_span.attributes) + self.assertNotIn(URL_SCHEME, server_span.attributes) + self.assertNotIn(URL_PATH, server_span.attributes) + self.assertNotIn(HTTP_RESPONSE_STATUS_CODE, server_span.attributes) + # Verify schema URL + self.assertEqual( + server_span.instrumentation_scope.schema_url, + "https://opentelemetry.io/schemas/1.11.0", + ) + + def test_client_span_attributes_old_semconv(self): + response = self.fetch("/") + self.assertEqual(response.code, 201) + spans = self.memory_exporter.get_finished_spans() + client_span = self._get_client_span(spans) + self.assertIsNotNone(client_span) + + # Verify old semconv attributes are present + self.assertIn(HTTP_METHOD, client_span.attributes) + self.assertIn(HTTP_URL, client_span.attributes) + self.assertIn(HTTP_STATUS_CODE, client_span.attributes) + # Verify new semconv attributes are NOT present + self.assertNotIn(HTTP_REQUEST_METHOD, client_span.attributes) + self.assertNotIn(URL_FULL, client_span.attributes) + self.assertNotIn(HTTP_RESPONSE_STATUS_CODE, client_span.attributes) + + def test_server_metrics_old_semconv(self): + """Test that server metrics use old semantic conventions by default.""" + response = self.fetch("/") + self.assertEqual(response.code, 201) + metrics = self.memory_metrics_reader.get_metrics_data() + resource_metrics = metrics.resource_metrics + + # Find old semconv metrics + old_duration_found = False + new_duration_found = False + for rm in resource_metrics: + for sm in rm.scope_metrics: + for metric in sm.metrics: + if metric.name == "http.server.duration": + old_duration_found = True + # Verify unit is milliseconds for old semconv + self.assertEqual(metric.unit, "ms") + elif metric.name == "http.server.request.duration": + new_duration_found = True + self.assertTrue(old_duration_found, "Old semconv metric not found") + self.assertFalse( + new_duration_found, "New semconv metric should not be present" + ) + + +class TestTornadoSemconvHttpNew(TornadoSemconvTestBase): + def setUp(self): + super().setUp() + _OpenTelemetrySemanticConventionStability._initialized = False + with patch.dict("os.environ", {OTEL_SEMCONV_STABILITY_OPT_IN: "http"}): + TornadoInstrumentor().instrument() + + def test_server_span_attributes_new_semconv(self): + response = self.fetch("/") + self.assertEqual(response.code, 201) + spans = self.memory_exporter.get_finished_spans() + server_span = self._get_server_span(spans) + self.assertIsNotNone(server_span) + + # Verify new semconv attributes are present + self.assertIn(HTTP_REQUEST_METHOD, server_span.attributes) + self.assertIn(URL_SCHEME, server_span.attributes) + self.assertIn(HTTP_RESPONSE_STATUS_CODE, server_span.attributes) + # Verify old semconv attributes are NOT present + self.assertNotIn(HTTP_METHOD, server_span.attributes) + self.assertNotIn(HTTP_SCHEME, server_span.attributes) + self.assertNotIn(HTTP_TARGET, server_span.attributes) + self.assertNotIn(HTTP_STATUS_CODE, server_span.attributes) + # Verify schema URL + self.assertEqual( + server_span.instrumentation_scope.schema_url, + "https://opentelemetry.io/schemas/1.21.0", + ) + + def test_client_span_attributes_new_semconv(self): + """Test that client spans use new semantic conventions in http mode.""" + response = self.fetch("/") + self.assertEqual(response.code, 201) + spans = self.memory_exporter.get_finished_spans() + client_span = self._get_client_span(spans) + self.assertIsNotNone(client_span) + + # Verify new semconv attributes are present + self.assertIn(HTTP_REQUEST_METHOD, client_span.attributes) + self.assertIn(URL_FULL, client_span.attributes) + self.assertIn(HTTP_RESPONSE_STATUS_CODE, client_span.attributes) + # Verify old semconv attributes are NOT present + self.assertNotIn(HTTP_METHOD, client_span.attributes) + self.assertNotIn(HTTP_URL, client_span.attributes) + self.assertNotIn(HTTP_STATUS_CODE, client_span.attributes) + + def test_server_metrics_new_semconv(self): + """Test that server metrics use new semantic conventions in http mode.""" + response = self.fetch("/") + self.assertEqual(response.code, 201) + metrics = self.memory_metrics_reader.get_metrics_data() + resource_metrics = metrics.resource_metrics + + # Find new semconv metrics + old_duration_found = False + new_duration_found = False + for rm in resource_metrics: + for sm in rm.scope_metrics: + for metric in sm.metrics: + if metric.name == "http.server.duration": + old_duration_found = True + elif metric.name == "http.server.request.duration": + new_duration_found = True + # Verify unit is seconds for new semconv + self.assertEqual(metric.unit, "s") + self.assertFalse( + old_duration_found, "Old semconv metric should not be present" + ) + self.assertTrue(new_duration_found, "New semconv metric not found") + + +class TestTornadoSemconvHttpDup(TornadoSemconvTestBase): + def setUp(self): + super().setUp() + _OpenTelemetrySemanticConventionStability._initialized = False + with patch.dict( + "os.environ", {OTEL_SEMCONV_STABILITY_OPT_IN: "http/dup"} + ): + TornadoInstrumentor().instrument() + + def test_server_span_attributes_both_semconv(self): + response = self.fetch("/") + self.assertEqual(response.code, 201) + spans = self.memory_exporter.get_finished_spans() + server_span = self._get_server_span(spans) + self.assertIsNotNone(server_span) + + # Verify old semconv attributes are present + self.assertIn(HTTP_METHOD, server_span.attributes) + self.assertIn(HTTP_SCHEME, server_span.attributes) + self.assertIn(HTTP_HOST, server_span.attributes) + self.assertIn(HTTP_TARGET, server_span.attributes) + self.assertIn(HTTP_STATUS_CODE, server_span.attributes) + # Verify new semconv attributes are also present + self.assertIn(HTTP_REQUEST_METHOD, server_span.attributes) + self.assertIn(URL_SCHEME, server_span.attributes) + self.assertIn(HTTP_RESPONSE_STATUS_CODE, server_span.attributes) + # Verify values match between old and new + self.assertEqual( + server_span.attributes[HTTP_METHOD], + server_span.attributes[HTTP_REQUEST_METHOD], + ) + self.assertEqual( + server_span.attributes[HTTP_STATUS_CODE], + server_span.attributes[HTTP_RESPONSE_STATUS_CODE], + ) + self.assertEqual( + server_span.attributes[HTTP_SCHEME], + server_span.attributes[URL_SCHEME], + ) + # Verify schema URL (in dup mode, schema_url should be the new one) + self.assertEqual( + server_span.instrumentation_scope.schema_url, + "https://opentelemetry.io/schemas/1.21.0", + ) + + def test_client_span_attributes_both_semconv(self): + response = self.fetch("/") + self.assertEqual(response.code, 201) + spans = self.memory_exporter.get_finished_spans() + client_span = self._get_client_span(spans) + self.assertIsNotNone(client_span) + + # Verify old semconv attributes are present + self.assertIn(HTTP_METHOD, client_span.attributes) + self.assertIn(HTTP_URL, client_span.attributes) + self.assertIn(HTTP_STATUS_CODE, client_span.attributes) + # Verify new semconv attributes are also present + self.assertIn(HTTP_REQUEST_METHOD, client_span.attributes) + self.assertIn(URL_FULL, client_span.attributes) + self.assertIn(HTTP_RESPONSE_STATUS_CODE, client_span.attributes) + # Verify values match between old and new + self.assertEqual( + client_span.attributes[HTTP_METHOD], + client_span.attributes[HTTP_REQUEST_METHOD], + ) + self.assertEqual( + client_span.attributes[HTTP_STATUS_CODE], + client_span.attributes[HTTP_RESPONSE_STATUS_CODE], + ) + + def test_server_metrics_both_semconv(self): + response = self.fetch("/") + self.assertEqual(response.code, 201) + metrics = self.memory_metrics_reader.get_metrics_data() + resource_metrics = metrics.resource_metrics + + # Find both old and new semconv metrics + old_duration_found = False + new_duration_found = False + for rm in resource_metrics: + for sm in rm.scope_metrics: + for metric in sm.metrics: + if metric.name == "http.server.duration": + old_duration_found = True + self.assertEqual(metric.unit, "ms") + elif metric.name == "http.server.request.duration": + new_duration_found = True + self.assertEqual(metric.unit, "s") + self.assertTrue(old_duration_found, "Old semconv metric not found") + self.assertTrue(new_duration_found, "New semconv metric not found") + + def test_client_metrics_both_semconv(self): + response = self.fetch("/") + self.assertEqual(response.code, 201) + metrics = self.memory_metrics_reader.get_metrics_data() + resource_metrics = metrics.resource_metrics + + # Find both old and new semconv metrics + old_duration_found = False + new_duration_found = False + for rm in resource_metrics: + for sm in rm.scope_metrics: + for metric in sm.metrics: + if metric.name == "http.client.duration": + old_duration_found = True + self.assertEqual(metric.unit, "ms") + elif metric.name == "http.client.request.duration": + new_duration_found = True + self.assertEqual(metric.unit, "s") + self.assertTrue( + old_duration_found, "Old semconv client metric not found" + ) + self.assertTrue( + new_duration_found, "New semconv client metric not found" + )