Skip to content

Commit 9ee6de8

Browse files
Fix: Reinitialize gRPC channel on UNAVAILABLE error (open-telemetry#4825)
* Fix: Reinitialize gRPC channel on UNAVAILABLE error (Fixes open-telemetry#4517) * fix: address PR review comments for gRPC reconnection * refactor(exporter): simplify reconnection logic and address review comments - Remove aggressive gRPC keepalive and retry settings to rely on defaults. - Fix compression precedence logic to correctly handle NoCompression (0). - Refactor channel initialization to be stateless (remove _channel_reconnection_enabled). - Update documentation to refer to 'OTLP-compatible receiver' * fix: remove extra blank line in docstring * fix(exporter): address typecheck errors and add changelog entry --------- Co-authored-by: Riccardo Magliocchetti <riccardo.magliocchetti@gmail.com>
1 parent 615d467 commit 9ee6de8

File tree

3 files changed

+116
-20
lines changed

3 files changed

+116
-20
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1212

1313
## Unreleased
1414

15+
- `opentelemetry-exporter-otlp-proto-grpc`: Fix re-initialization of gRPC channel on UNAVAILABLE error
16+
([#4825](https://github.com/open-telemetry/opentelemetry-python/pull/4825))
1517
- `opentelemetry-exporter-prometheus`: Fix duplicate HELP/TYPE declarations for metrics with different label sets
1618
([#4868](https://github.com/open-telemetry/opentelemetry-python/issues/4868))
1719
- Allow loading all resource detectors by setting `OTEL_EXPERIMENTAL_RESOURCE_DETECTORS` to `*`

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py

Lines changed: 77 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,13 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
"""OTLP Exporter"""
15+
"""OTLP Exporter
16+
17+
This module provides a mixin class for OTLP exporters that send telemetry data
18+
to an OTLP-compatible receiver via gRPC. It includes reconnection
19+
logic to handle transient receiver outages.
20+
21+
"""
1622

1723
import random
1824
import threading
@@ -251,20 +257,27 @@ def _get_credentials(
251257
if certificate_file:
252258
client_key_file = environ.get(client_key_file_env_key)
253259
client_certificate_file = environ.get(client_certificate_file_env_key)
254-
return _load_credentials(
260+
credentials = _load_credentials(
255261
certificate_file, client_key_file, client_certificate_file
256262
)
263+
if credentials is not None:
264+
return credentials
257265
return ssl_channel_credentials()
258266

259267

260268
# pylint: disable=no-member
261269
class OTLPExporterMixin(
262270
ABC, Generic[SDKDataT, ExportServiceRequestT, ExportResultT, ExportStubT]
263271
):
264-
"""OTLP span exporter
272+
"""OTLP gRPC exporter mixin.
273+
274+
This class provides the base functionality for OTLP exporters that send
275+
telemetry data (spans or metrics) to an OTLP-compatible receiver via gRPC.
276+
It includes a reconnection mechanism to handle transient
277+
receiver outages.
265278
266279
Args:
267-
endpoint: OpenTelemetry Collector receiver endpoint
280+
endpoint: OTLP-compatible receiver endpoint
268281
insecure: Connection type
269282
credentials: ChannelCredentials object for server authentication
270283
headers: Headers to send when exporting
@@ -308,6 +321,8 @@ def __init__(
308321
if parsed_url.netloc:
309322
self._endpoint = parsed_url.netloc
310323

324+
self._insecure = insecure
325+
self._credentials = credentials
311326
self._headers = headers or environ.get(OTEL_EXPORTER_OTLP_HEADERS)
312327
if isinstance(self._headers, str):
313328
temp_headers = parse_env_headers(self._headers, liberal=True)
@@ -336,37 +351,52 @@ def __init__(
336351
)
337352
self._collector_kwargs = None
338353

339-
compression = (
354+
self._compression = (
340355
environ_to_compression(OTEL_EXPORTER_OTLP_COMPRESSION)
341356
if compression is None
342357
else compression
343358
) or Compression.NoCompression
344359

345-
if insecure:
346-
self._channel = insecure_channel(
347-
self._endpoint,
348-
compression=compression,
349-
options=self._channel_options,
350-
)
351-
else:
360+
self._channel = None
361+
self._client = None
362+
363+
self._shutdown_in_progress = threading.Event()
364+
self._shutdown = False
365+
366+
if not self._insecure:
352367
self._credentials = _get_credentials(
353-
credentials,
368+
self._credentials,
354369
_OTEL_PYTHON_EXPORTER_OTLP_GRPC_CREDENTIAL_PROVIDER,
355370
OTEL_EXPORTER_OTLP_CERTIFICATE,
356371
OTEL_EXPORTER_OTLP_CLIENT_KEY,
357372
OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE,
358373
)
374+
375+
self._initialize_channel_and_stub()
376+
377+
def _initialize_channel_and_stub(self):
378+
"""
379+
Create a new gRPC channel and stub.
380+
381+
This method is used during initialization and by the reconnection
382+
mechanism to reinitialize the channel on transient errors.
383+
"""
384+
if self._insecure:
385+
self._channel = insecure_channel(
386+
self._endpoint,
387+
compression=self._compression,
388+
options=self._channel_options,
389+
)
390+
else:
391+
assert self._credentials is not None
359392
self._channel = secure_channel(
360393
self._endpoint,
361394
self._credentials,
362-
compression=compression,
395+
compression=self._compression,
363396
options=self._channel_options,
364397
)
365398
self._client = self._stub(self._channel) # type: ignore [reportCallIssue]
366399

367-
self._shutdown_in_progress = threading.Event()
368-
self._shutdown = False
369-
370400
@abstractmethod
371401
def _translate_data(
372402
self,
@@ -388,6 +418,8 @@ def _export(
388418
deadline_sec = time() + self._timeout
389419
for retry_num in range(_MAX_RETRYS):
390420
try:
421+
if self._client is None:
422+
return self._result.FAILURE
391423
self._client.Export(
392424
request=self._translate_data(data),
393425
metadata=self._headers,
@@ -407,6 +439,26 @@ def _export(
407439
retry_info.retry_delay.seconds
408440
+ retry_info.retry_delay.nanos / 1.0e9
409441
)
442+
443+
# For UNAVAILABLE errors, reinitialize the channel to force reconnection
444+
if error.code() == StatusCode.UNAVAILABLE and retry_num == 0: # type: ignore
445+
logger.debug(
446+
"Reinitializing gRPC channel for %s exporter due to UNAVAILABLE error",
447+
self._exporting,
448+
)
449+
try:
450+
if self._channel:
451+
self._channel.close()
452+
except Exception as e:
453+
logger.debug(
454+
"Error closing channel for %s exporter to %s: %s",
455+
self._exporting,
456+
self._endpoint,
457+
str(e),
458+
)
459+
# Recreate the channel and stub so the remaining retries use a fresh connection
460+
self._initialize_channel_and_stub()
461+
410462
if (
411463
error.code() not in _RETRYABLE_ERROR_CODES # type: ignore [reportAttributeAccessIssue]
412464
or retry_num + 1 == _MAX_RETRYS
@@ -436,12 +488,19 @@ def _export(
436488
return self._result.FAILURE # type: ignore [reportReturnType]
437489

438490
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
491+
"""
492+
Shut down the exporter.
493+
494+
Args:
495+
timeout_millis: Timeout in milliseconds for shutting down the exporter.
496+
"""
439497
if self._shutdown:
440498
logger.warning("Exporter already shutdown, ignoring call")
441499
return
442500
self._shutdown = True
443501
self._shutdown_in_progress.set()
444-
self._channel.close()
502+
if self._channel:
503+
self._channel.close()
445504

446505
@property
447506
@abstractmethod

exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from unittest import TestCase
2525
from unittest.mock import Mock, patch
2626

27+
import grpc
2728
from google.protobuf.duration_pb2 import ( # pylint: disable=no-name-in-module
2829
Duration,
2930
)
@@ -91,8 +92,8 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
9192
def _exporting(self):
9293
return "traces"
9394

94-
def shutdown(self, timeout_millis=30_000):
95-
return OTLPExporterMixin.shutdown(self, timeout_millis)
95+
def shutdown(self, timeout_millis: float = 30_000, **kwargs):
96+
return OTLPExporterMixin.shutdown(self, timeout_millis, **kwargs)
9697

9798

9899
class TraceServiceServicerWithExportParams(TraceServiceServicer):
@@ -513,6 +514,16 @@ def test_timeout_set_correctly(self):
513514
self.assertEqual(mock_trace_service.num_requests, 2)
514515
self.assertAlmostEqual(after - before, 1.4, 1)
515516

517+
def test_channel_options_set_correctly(self):
518+
"""Test that gRPC channel options are set correctly for keepalive and reconnection"""
519+
# This test only verifies that a channel is created; it does not inspect the options
520+
# We patch the exporter module's insecure_channel and check that it is called
521+
with patch(
522+
"opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel"
523+
) as mock_channel:
524+
OTLPSpanExporterForTesting(insecure=True)
525+
self.assertTrue(mock_channel.called)
526+
516527
def test_otlp_headers_from_env(self):
517528
# pylint: disable=protected-access
518529
# This ensures that there is no other header than standard user-agent.
@@ -536,3 +547,27 @@ def test_permanent_failure(self):
536547
warning.records[-1].message,
537548
"Failed to export traces to localhost:4317, error code: StatusCode.ALREADY_EXISTS",
538549
)
550+
551+
def test_unavailable_reconnects(self):
552+
"""Test that the exporter reconnects on UNAVAILABLE error"""
553+
add_TraceServiceServicer_to_server(
554+
TraceServiceServicerWithExportParams(StatusCode.UNAVAILABLE),
555+
self.server,
556+
)
557+
558+
# Spy on grpc.insecure_channel to verify it's called for reconnection
559+
with patch(
560+
"opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel",
561+
side_effect=grpc.insecure_channel,
562+
) as mock_insecure_channel:
563+
# Mock sleep to avoid waiting
564+
with patch("time.sleep"):
565+
# We expect FAILURE because the server keeps returning UNAVAILABLE
566+
# but we want to verify reconnection attempts happened
567+
self.exporter.export([self.span])
568+
569+
# Verify that we attempted to reinitialize the channel (called insecure_channel)
570+
# Since the initial channel was created in setUp (unpatched), this call
571+
# must be from the reconnection logic.
572+
self.assertTrue(mock_insecure_channel.called)
573+
# There is no separate reconnection flag to check: the insecure_channel call above is the evidence of reconnection

0 commit comments

Comments
 (0)