diff --git a/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py b/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py index c5dff791..08635925 100644 --- a/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py +++ b/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py @@ -147,7 +147,9 @@ def __init__( ) = divmod(time_ns(), NANOS_PER_SECOND) self._prefix = prefix - def _batch_write(self, series: List[TimeSeries]) -> None: + def _batch_write( + self, series: List[TimeSeries], timeout_millis: float + ) -> None: """Cloud Monitoring allows writing up to 200 time series at once :param series: ProtoBuf TimeSeries @@ -162,11 +164,12 @@ def _batch_write(self, series: List[TimeSeries]) -> None: write_ind : write_ind + MAX_BATCH_WRITE ], ), + timeout=timeout_millis / 1000, ) write_ind += MAX_BATCH_WRITE def _get_metric_descriptor( - self, metric: Metric + self, metric: Metric, timeout_millis: float ) -> Optional[MetricDescriptor]: """We can map Metric to MetricDescriptor using Metric.name or MetricDescriptor.type. We create the MetricDescriptor if it doesn't @@ -248,7 +251,8 @@ def _get_metric_descriptor( response_descriptor = self.client.create_metric_descriptor( CreateMetricDescriptorRequest( name=self.project_name, metric_descriptor=descriptor - ) + ), + timeout=timeout_millis / 1000, ) # pylint: disable=broad-except except Exception as ex: @@ -362,7 +366,6 @@ def _to_point( def export( self, metrics_data: MetricsData, - # TODO(aabmass): pass timeout to api calls timeout_millis: float = 10_000, **kwargs, ) -> MetricExportResult: @@ -395,7 +398,9 @@ def export( ), ) - descriptor = self._get_metric_descriptor(metric) + descriptor = self._get_metric_descriptor( + metric, timeout_millis + ) if not descriptor: continue @@ -426,7 +431,7 @@ def export( all_series.append(series) try: - self._batch_write(all_series) + self._batch_write(all_series, timeout_millis) # pylint: disable=broad-except except Exception as ex: logger.error( diff --git a/opentelemetry-exporter-gcp-monitoring/tests/test_cloud_monitoring.py b/opentelemetry-exporter-gcp-monitoring/tests/test_cloud_monitoring.py index aeec456b..6e190752 100644 --- a/opentelemetry-exporter-gcp-monitoring/tests/test_cloud_monitoring.py +++ b/opentelemetry-exporter-gcp-monitoring/tests/test_cloud_monitoring.py @@ -29,6 +29,7 @@ """ from typing import List, Union +from unittest.mock import Mock import pytest from fixtures.gcmfake import GcmFake, GcmFakeMeterProvider @@ -38,12 +39,23 @@ CloudMonitoringMetricsExporter, ) from opentelemetry.metrics import CallbackOptions, Observation +from opentelemetry.sdk.metrics.export import ( + AggregationTemporality, + Metric, + MetricExportResult, + MetricsData, + NumberDataPoint, + ResourceMetrics, + ScopeMetrics, + Sum, +) from opentelemetry.sdk.metrics.view import ( ExplicitBucketHistogramAggregation, ExponentialBucketHistogramAggregation, View, ) from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util.instrumentation import InstrumentationScope from opentelemetry.util.types import Attributes PROJECT_ID = "fakeproject" @@ -64,6 +76,63 @@ def test_create_monitoring_exporter() -> None: ) +def test_export_passes_timeout_to_cloud_monitoring_client() -> None: + client = Mock() + client.common_project_path.return_value = "projects/fakeproject" + client.create_metric_descriptor.side_effect = ( + lambda request, timeout: request.metric_descriptor + ) + + exporter = CloudMonitoringMetricsExporter( + project_id=PROJECT_ID, + client=client, + ) + + result = exporter.export(_metrics_data(), timeout_millis=2500) + + assert result is MetricExportResult.SUCCESS + assert client.create_metric_descriptor.call_args.kwargs["timeout"] == 2.5 + assert client.create_time_series.call_args.kwargs["timeout"] == 2.5 + + +def _metrics_data() -> MetricsData: + return MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=Resource.create({}), + scope_metrics=[ + ScopeMetrics( + scope=InstrumentationScope(__name__), + metrics=[ + Metric( + name="timeout_counter", + description="", + unit="1", + data=Sum( + data_points=[ + NumberDataPoint( + attributes={}, + start_time_unix_nano=1, + time_unix_nano=2, + value=1, + ) + ], + aggregation_temporality=( + AggregationTemporality.CUMULATIVE + ), + is_monotonic=True, + ), + ) + ], + schema_url="", + ) + ], + schema_url="", + ) + ] + ) + + @pytest.mark.parametrize( "value", [pytest.param(123, id="int"), pytest.param(45.6, id="float")] )