Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ def __init__(
) = divmod(time_ns(), NANOS_PER_SECOND)
self._prefix = prefix

def _batch_write(self, series: List[TimeSeries]) -> None:
def _batch_write(
self, series: List[TimeSeries], timeout_millis: float
) -> None:
"""Cloud Monitoring allows writing up to 200 time series at once

:param series: ProtoBuf TimeSeries
Expand All @@ -162,11 +164,12 @@ def _batch_write(self, series: List[TimeSeries]) -> None:
write_ind : write_ind + MAX_BATCH_WRITE
],
),
timeout=timeout_millis / 1000,
)
write_ind += MAX_BATCH_WRITE

def _get_metric_descriptor(
self, metric: Metric
self, metric: Metric, timeout_millis: float
) -> Optional[MetricDescriptor]:
"""We can map Metric to MetricDescriptor using Metric.name or
MetricDescriptor.type. We create the MetricDescriptor if it doesn't
Expand Down Expand Up @@ -248,7 +251,8 @@ def _get_metric_descriptor(
response_descriptor = self.client.create_metric_descriptor(
CreateMetricDescriptorRequest(
name=self.project_name, metric_descriptor=descriptor
)
),
timeout=timeout_millis / 1000,
)
# pylint: disable=broad-except
except Exception as ex:
Expand Down Expand Up @@ -362,7 +366,6 @@ def _to_point(
def export(
self,
metrics_data: MetricsData,
# TODO(aabmass): pass timeout to api calls
timeout_millis: float = 10_000,
**kwargs,
) -> MetricExportResult:
Expand Down Expand Up @@ -395,7 +398,9 @@ def export(
),
)

descriptor = self._get_metric_descriptor(metric)
descriptor = self._get_metric_descriptor(
metric, timeout_millis
)
if not descriptor:
continue

Expand Down Expand Up @@ -426,7 +431,7 @@ def export(
all_series.append(series)

try:
self._batch_write(all_series)
self._batch_write(all_series, timeout_millis)
# pylint: disable=broad-except
except Exception as ex:
logger.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"""

from typing import List, Union
from unittest.mock import Mock

import pytest
from fixtures.gcmfake import GcmFake, GcmFakeMeterProvider
Expand All @@ -38,12 +39,23 @@
CloudMonitoringMetricsExporter,
)
from opentelemetry.metrics import CallbackOptions, Observation
from opentelemetry.sdk.metrics.export import (
AggregationTemporality,
Metric,
MetricExportResult,
MetricsData,
NumberDataPoint,
ResourceMetrics,
ScopeMetrics,
Sum,
)
from opentelemetry.sdk.metrics.view import (
ExplicitBucketHistogramAggregation,
ExponentialBucketHistogramAggregation,
View,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
from opentelemetry.util.types import Attributes

PROJECT_ID = "fakeproject"
Expand All @@ -64,6 +76,63 @@ def test_create_monitoring_exporter() -> None:
)


def test_export_passes_timeout_to_cloud_monitoring_client() -> None:
client = Mock()
client.common_project_path.return_value = "projects/fakeproject"
client.create_metric_descriptor.side_effect = (
lambda request, timeout: request.metric_descriptor
)

exporter = CloudMonitoringMetricsExporter(
project_id=PROJECT_ID,
client=client,
)

result = exporter.export(_metrics_data(), timeout_millis=2500)

assert result is MetricExportResult.SUCCESS
assert client.create_metric_descriptor.call_args.kwargs["timeout"] == 2.5
assert client.create_time_series.call_args.kwargs["timeout"] == 2.5


def _metrics_data() -> MetricsData:
return MetricsData(
resource_metrics=[
ResourceMetrics(
resource=Resource.create({}),
scope_metrics=[
ScopeMetrics(
scope=InstrumentationScope(__name__),
metrics=[
Metric(
name="timeout_counter",
description="",
unit="1",
data=Sum(
data_points=[
NumberDataPoint(
attributes={},
start_time_unix_nano=1,
time_unix_nano=2,
value=1,
)
],
aggregation_temporality=(
AggregationTemporality.CUMULATIVE
),
is_monotonic=True,
),
)
],
schema_url="",
)
],
schema_url="",
)
]
)


@pytest.mark.parametrize(
"value", [pytest.param(123, id="int"), pytest.param(45.6, id="float")]
)
Expand Down