Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -580,3 +580,16 @@ def get_meter(
self._measurement_consumer,
)
return self._meters[info]

def add_metric_reader(
self, metric_reader: "opentelemetry.sdk.metrics.export.MetricReader"
) -> None:
with self._lock:
self._measurement_consumer.add_metric_reader(metric_reader)

def remove_metric_reader(
self,
metric_reader: "opentelemetry.sdk.metrics.export.MetricReader",
) -> None:
with self._lock:
self._measurement_consumer.remove_metric_reader(metric_reader)
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# pylint: disable=unused-import

from abc import ABC, abstractmethod
from logging import getLogger
from threading import Lock
from time import time_ns
from typing import Iterable, List, Mapping, Optional
Expand All @@ -31,6 +32,8 @@
)
from opentelemetry.sdk.metrics._internal.point import Metric

_logger = getLogger(__name__)


class MeasurementConsumer(ABC):
@abstractmethod
Expand Down Expand Up @@ -143,3 +146,39 @@ def collect(
result = self._reader_storages[metric_reader].collect()

return result

def add_metric_reader(
self, metric_reader: "opentelemetry.sdk.metrics.MetricReader"
) -> None:
"""Registers a new metric reader."""
with self._lock:
if metric_reader in self._reader_storages:
_logger.warning("'%s' already registered!", metric_reader)
self._sdk_config.metric_readers += type(
self._sdk_config.metric_readers
)((metric_reader,))
self._reader_storages[metric_reader] = MetricReaderStorage(
self._sdk_config,
metric_reader._instrument_class_temporality,
metric_reader._instrument_class_aggregation,
)
metric_reader._set_collect_callback(self.collect)

def remove_metric_reader(
self, metric_reader: "opentelemetry.sdk.metrics.MetricReader"
) -> None:
"""Unregisters the given metric reader."""
with self._lock:
if metric_reader not in self._reader_storages:
_logger.warning("'%s' has not been registered!", metric_reader)
self._reader_storages.pop(metric_reader, None)
metric_reader._set_collect_callback(None)
self._sdk_config.metric_readers = type(
self._sdk_config.metric_readers
)(
(
reader
for reader in self._sdk_config.metric_readers
if reader is not metric_reader
)
)
38 changes: 36 additions & 2 deletions opentelemetry-sdk/tests/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
# limitations under the License.

# pylint: disable=protected-access,no-self-use

import weakref
from logging import WARNING
from logging import DEBUG, WARNING
from time import sleep
from typing import Iterable, Sequence
from unittest.mock import MagicMock, Mock, patch
Expand All @@ -36,6 +35,7 @@
)
from opentelemetry.sdk.metrics._internal import SynchronousMeasurementConsumer
from opentelemetry.sdk.metrics.export import (
InMemoryMetricReader,
Metric,
MetricExporter,
MetricExportResult,
Expand Down Expand Up @@ -426,6 +426,40 @@ def test_consume_measurement_gauge(self, mock_sync_measurement_consumer):

sync_consumer_instance.consume_measurement.assert_called()

def test_addition_of_metric_reader(self):
# Suppress warnings for calling collect on an unregistered metric reader
with self.assertLogs(
"opentelemetry.sdk.metrics._internal.export", DEBUG
):
reader = InMemoryMetricReader()
meter_provider = MeterProvider()
meter = meter_provider.get_meter(__name__)
counter = meter.create_counter("counter")
counter.add(1)
self.assertIsNone(reader.get_metrics_data())

meter_provider.add_metric_reader(reader)
counter.add(1)
self.assertIsNotNone(reader.get_metrics_data())

with self.assertLogs(
"opentelemetry.sdk.metrics._internal.measurement_consumer",
WARNING,
) as logger:
meter_provider.add_metric_reader(reader)
self.assertIn("already registered!", logger.output[0])

meter_provider.remove_metric_reader(reader)
counter.add(1)
self.assertIsNone(reader.get_metrics_data())

with self.assertLogs(
"opentelemetry.sdk.metrics._internal.measurement_consumer",
WARNING,
) as logger:
meter_provider.remove_metric_reader(reader)
self.assertIn("has not been registered!", logger.output[0])


class TestMeter(TestCase):
def setUp(self):
Expand Down