diff --git a/CHANGELOG.md b/CHANGELOG.md index b69cbbf7e5..414aa39b4d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4709](https://github.com/open-telemetry/opentelemetry-python/pull/4709)) - Implement experimental TracerConfigurator ([#4861](https://github.com/open-telemetry/opentelemetry-python/pull/4861)) +- `opentelemetry-exporter-otlp-proto-grpc`: make retryable gRPC error codes configurable for gRPC exporters + ([#4917](https://github.com/open-telemetry/opentelemetry-python/pull/4917)) ## Version 1.39.0/0.60b0 (2025-12-03) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py index 63d8ac9cfb..0101cafb97 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py @@ -10,12 +10,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- +from collections.abc import Iterable from os import environ from typing import Dict, Literal, Optional, Sequence, Tuple, Union from typing import Sequence as TypingSequence -from grpc import ChannelCredentials, Compression +from grpc import ChannelCredentials, Compression, StatusCode from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs from opentelemetry.exporter.otlp.proto.grpc.exporter import ( OTLPExporterMixin, @@ -66,6 +66,9 @@ def __init__( timeout: Optional[float] = None, compression: Optional[Compression] = None, channel_options: Optional[Tuple[Tuple[str, str]]] = None, + retryable_error_codes: Optional[ + Union[Iterable[StatusCode], str] + ] = None, ): insecure_logs = environ.get(OTEL_EXPORTER_OTLP_LOGS_INSECURE) if insecure is None and insecure_logs is not None: @@ -105,6 +108,7 @@ def __init__( stub=LogsServiceStub, result=LogRecordExportResult, channel_options=channel_options, + retryable_error_codes=retryable_error_codes, ) def _translate_data( diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index 89c2608c30..a1a789c7d9 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -20,10 +20,11 @@ """ +import os import random import threading from abc import ABC, abstractmethod -from collections.abc import Sequence # noqa: F401 +from collections.abc import Iterable, Sequence # noqa: F401 from logging import getLogger from os import environ from time import time @@ -91,6 +92,7 @@ from opentelemetry.sdk._shared_internal import DuplicateFilter from opentelemetry.sdk.environment_variables import ( _OTEL_PYTHON_EXPORTER_OTLP_GRPC_CREDENTIAL_PROVIDER, + 
_OTEL_PYTHON_EXPORTER_OTLP_GRPC_RETRYABLE_ERROR_CODES, OTEL_EXPORTER_OTLP_CERTIFICATE, OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, OTEL_EXPORTER_OTLP_CLIENT_KEY, @@ -299,6 +301,9 @@ def __init__( timeout: Optional[float] = None, compression: Optional[Compression] = None, channel_options: Optional[Tuple[Tuple[str, str]]] = None, + retryable_error_codes: Optional[ + Union[Iterable[StatusCode], str] + ] = None, ): super().__init__() self._result = result @@ -357,6 +362,22 @@ def __init__( else compression ) or Compression.NoCompression + self._retryable_error_codes = retryable_error_codes or os.environ.get( + _OTEL_PYTHON_EXPORTER_OTLP_GRPC_RETRYABLE_ERROR_CODES + ) + if isinstance(self._retryable_error_codes, str): + self._retryable_error_codes = frozenset( + StatusCode[code.strip().upper()] + for code in self._retryable_error_codes.split(",") + if code.strip() + ) + elif self._retryable_error_codes is not None: + self._retryable_error_codes = frozenset( + self._retryable_error_codes + ) + else: + self._retryable_error_codes = _RETRYABLE_ERROR_CODES + self._channel = None self._client = None @@ -460,7 +481,7 @@ def _export( self._initialize_channel_and_stub() if ( - error.code() not in _RETRYABLE_ERROR_CODES # type: ignore [reportAttributeAccessIssue] + error.code() not in self._retryable_error_codes # type: ignore [reportAttributeAccessIssue] or retry_num + 1 == _MAX_RETRYS or backoff_seconds > (deadline_sec - time()) or self._shutdown diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py index af77f6d123..c3674889fa 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py @@ 
-19,7 +19,7 @@ from typing import Iterable, List, Tuple, Union from typing import Sequence as TypingSequence -from grpc import ChannelCredentials, Compression +from grpc import ChannelCredentials, Compression, StatusCode from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import ( OTLPMetricExporterMixin, ) @@ -109,6 +109,7 @@ def __init__( preferred_aggregation: dict[type, Aggregation] | None = None, max_export_batch_size: int | None = None, channel_options: Tuple[Tuple[str, str]] | None = None, + retryable_error_codes: Union[Iterable[StatusCode], str] | None = None, ): insecure_metrics = environ.get(OTEL_EXPORTER_OTLP_METRICS_INSECURE) if insecure is None and insecure_metrics is not None: @@ -153,6 +154,7 @@ def __init__( timeout=timeout or environ_timeout, compression=compression, channel_options=channel_options, + retryable_error_codes=retryable_error_codes, ) self._max_export_batch_size: int | None = max_export_batch_size diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py index 19b189e5b9..69a1ce9785 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py @@ -14,11 +14,12 @@ """OTLP Span Exporter""" import logging +from collections.abc import Iterable from os import environ from typing import Dict, Optional, Sequence, Tuple, Union from typing import Sequence as TypingSequence -from grpc import ChannelCredentials, Compression +from grpc import ChannelCredentials, Compression, StatusCode from opentelemetry.exporter.otlp.proto.common.trace_encoder import ( encode_spans, ) @@ -95,6 +96,9 @@ def __init__( timeout: Optional[float] = None, compression: 
Optional[Compression] = None, channel_options: Optional[Tuple[Tuple[str, str]]] = None, + retryable_error_codes: Optional[ + Union[Iterable[StatusCode], str] + ] = None, ): insecure_spans = environ.get(OTEL_EXPORTER_OTLP_TRACES_INSECURE) if insecure is None and insecure_spans is not None: @@ -135,6 +139,7 @@ def __init__( timeout=timeout or environ_timeout, compression=compression, channel_options=channel_options, + retryable_error_codes=retryable_error_codes, ) def _translate_data( diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py index de27d0fe79..058ba4584f 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py @@ -37,6 +37,7 @@ encode_spans, ) from opentelemetry.exporter.otlp.proto.grpc.exporter import ( # noqa: F401 + _RETRYABLE_ERROR_CODES, InvalidCompressionValueException, OTLPExporterMixin, environ_to_compression, @@ -154,6 +155,7 @@ def join(self, timeout: Optional[float] = None) -> Any: return self._return +# pylint: disable-next=too-many-public-methods class TestOTLPExporterMixin(TestCase): def setUp(self): self.server = server(ThreadPoolExecutor(max_workers=10)) @@ -570,4 +572,69 @@ def test_unavailable_reconnects(self): # Since the initial channel was created in setUp (unpatched), this call # must be from the reconnection logic. 
self.assertTrue(mock_insecure_channel.called) - # Verify that reconnection enabled flag is set + + def test_retryable_error_codes_initialization(self): + # pylint: disable=protected-access + self.assertEqual( + self.exporter._retryable_error_codes, _RETRYABLE_ERROR_CODES + ) + custom_codes = [StatusCode.INTERNAL, StatusCode.UNKNOWN] + exporter = OTLPSpanExporterForTesting( + insecure=True, retryable_error_codes=custom_codes + ) + self.assertEqual( + exporter._retryable_error_codes, frozenset(custom_codes) + ) + + @patch.dict( + "os.environ", + { + "OTEL_PYTHON_EXPORTER_OTLP_GRPC_RETRYABLE_ERROR_CODES": ",INTERNAL, unknown,,,dEAdline_Exceeded " + }, + ) + def test_retryable_error_codes_initialization_from_env(self): + expected_codes = frozenset( + { + StatusCode.INTERNAL, + StatusCode.UNKNOWN, + StatusCode.DEADLINE_EXCEEDED, + } + ) + exporter = OTLPSpanExporterForTesting() + # pylint: disable=protected-access + self.assertEqual(exporter._retryable_error_codes, expected_codes) + + @unittest.skipIf( + system() == "Windows", + "For gRPC + windows there's some added delay in the RPCs which breaks the assertion over amount of time passed.", + ) + def test_retryable_error_codes_custom(self): + # Test that a custom error code is retried if specified + custom_codes = [StatusCode.INTERNAL] + mock_trace_service = TraceServiceServicerWithExportParams( + StatusCode.INTERNAL, + optional_retry_nanos=200000000, # .2 seconds + ) + add_TraceServiceServicer_to_server( + mock_trace_service, + self.server, + ) + exporter = OTLPSpanExporterForTesting( + insecure=True, retryable_error_codes=custom_codes, timeout=10 + ) + + self.assertEqual( + exporter.export([self.span]), + SpanExportResult.FAILURE, + ) + + self.assertEqual(mock_trace_service.num_requests, 6) + + # Test that a default retryable code is NOT retried if not in custom_codes + mock_trace_service.num_requests = 0 + mock_trace_service.export_result = StatusCode.UNAVAILABLE + self.assertEqual( + exporter.export([self.span]), + 
SpanExportResult.FAILURE, + ) + self.assertEqual(mock_trace_service.num_requests, 1) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py index 420b576c86..6f4946857a 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py @@ -540,6 +540,19 @@ def channel_credential_provider() -> grpc.ChannelCredentials: Note: This environment variable is experimental and subject to change. """ +_OTEL_PYTHON_EXPORTER_OTLP_GRPC_RETRYABLE_ERROR_CODES = ( + "OTEL_PYTHON_EXPORTER_OTLP_GRPC_RETRYABLE_ERROR_CODES" +) +""" +.. envvar:: OTEL_PYTHON_EXPORTER_OTLP_GRPC_RETRYABLE_ERROR_CODES + +The :envvar:`OTEL_PYTHON_EXPORTER_OTLP_GRPC_RETRYABLE_ERROR_CODES` stores a comma-separated list of human-readable +gRPC status code names that are considered retryable by the OTLP gRPC exporters (e.g. ``UNAVAILABLE, DEADLINE_EXCEEDED``). +Each name must be a member of ``grpc.StatusCode``; names are matched case-insensitively, and surrounding whitespace and empty entries are ignored. + +Note: This environment variable is experimental and subject to change. +""" + OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE = "OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE" """ .. envvar:: OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE