Skip to content

Commit

Permalink
Adds Views to metrics SDK (#2391)
Browse files Browse the repository at this point in the history
* Adds Views to metrics SDK

* Add view documentation

* Add type for instrument

* Add more restrictions to matching criteria

* Remove regex matching from meter name

* Remove name from matching criteria

* Add some fixes

* Rename attribute

* Update opentelemetry-sdk/src/opentelemetry/sdk/_metrics/view.py

Co-authored-by: Aaron Abbott <[email protected]>

* Update attribute keys type

* More fixes

* Remove regex and default view mechanism

* Raise exceptions instead of logging warnings

* Refactor warning

* Fix issues

* Add support for fnmatch and fix view matching

* Fix test case

* Add some fixes

* WIP

* WIP

* Add final decorator

* Fix lint

* Use f string

* Fix documentation

* Fix documentation

* Remove check

* Remove unnecessary attribute

* Fix documentation

* Fix documentation

* Fix tests

* More fixes

* Update opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py

Co-authored-by: Alex Boten <[email protected]>

* Update opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py

Co-authored-by: Alex Boten <[email protected]>

* More fixes

* WIP fix tests

* WIP

* More fixing

* Fix issue with 3.6

* Add comment for private view attributes

* Fix view matching

* Handle view aggregations in metric reader storage

* Fix docs

* Remove wildcard support for meter attributes

* Fix documentation

* Rename to _default_aggregation

* Use default view argument

* Update opentelemetry-sdk/src/opentelemetry/sdk/_metrics/view.py

Co-authored-by: Aaron Abbott <[email protected]>

Co-authored-by: Aaron Abbott <[email protected]>
Co-authored-by: Alex Boten <[email protected]>
Co-authored-by: Alex Boten <[email protected]>
  • Loading branch information
4 people authored Mar 3, 2022
1 parent 50093f2 commit 24bd23d
Show file tree
Hide file tree
Showing 10 changed files with 515 additions and 76 deletions.
8 changes: 7 additions & 1 deletion opentelemetry-sdk/src/opentelemetry/sdk/_metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
)
from opentelemetry.sdk._metrics.metric_reader import MetricReader
from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration
from opentelemetry.sdk._metrics.view import View
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo
from opentelemetry.util._once import Once
Expand Down Expand Up @@ -154,12 +155,17 @@ def __init__(
metric_readers: Sequence[MetricReader] = (),
resource: Resource = Resource.create({}),
shutdown_on_exit: bool = True,
views: Sequence[View] = (),
enable_default_view: bool = True,
):
self._lock = Lock()
self._meter_lock = Lock()
self._atexit_handler = None
self._sdk_config = SdkConfiguration(
resource=resource, metric_readers=metric_readers, views=()
resource=resource,
metric_readers=metric_readers,
views=views,
enable_default_view=enable_default_view,
)
self._measurement_consumer = SynchronousMeasurementConsumer(
sdk_config=self._sdk_config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from typing import Iterable, Set

from opentelemetry.sdk._metrics.aggregation import (
_Aggregation,
_convert_aggregation_temporality,
)
from opentelemetry.sdk._metrics.measurement import Measurement
Expand All @@ -34,10 +35,10 @@ def __init__(
name: str,
unit: str,
description: str,
aggregation: type,
aggregation: _Aggregation,
instrumentation_info: InstrumentationInfo,
resource: Resource,
attribute_keys: Set[str] = None,
attribute_keys: Set[str],
):
self._name = name
self._unit = unit
Expand All @@ -52,7 +53,7 @@ def __init__(

def consume_measurement(self, measurement: Measurement) -> None:

if self._attribute_keys is not None:
if self._attribute_keys:

attributes = {}

Expand All @@ -68,7 +69,7 @@ def consume_measurement(self, measurement: Measurement) -> None:

if attributes not in self._attributes_aggregation.keys():
with self._lock:
self._attributes_aggregation[attributes] = self._aggregation()
self._attributes_aggregation[attributes] = self._aggregation

self._attributes_aggregation[attributes].aggregate(measurement)

Expand Down
59 changes: 52 additions & 7 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# pylint: disable=too-many-ancestors

import logging
from abc import ABC, abstractmethod
from typing import Dict, Generator, Iterable, Union

from opentelemetry._metrics.instrument import CallbackT
Expand All @@ -30,14 +31,28 @@
ObservableUpDownCounter as APIObservableUpDownCounter,
)
from opentelemetry._metrics.instrument import UpDownCounter as APIUpDownCounter
from opentelemetry.sdk._metrics.aggregation import (
_Aggregation,
_ExplicitBucketHistogramAggregation,
_LastValueAggregation,
_SumAggregation,
)
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.measurement_consumer import MeasurementConsumer
from opentelemetry.sdk._metrics.point import AggregationTemporality
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo

_logger = logging.getLogger(__name__)


class _Synchronous:
class _Instrument(ABC):
@property
@abstractmethod
def _default_aggregation(self) -> _Aggregation:
pass


class _Synchronous(_Instrument):
def __init__(
self,
name: str,
Expand All @@ -49,12 +64,12 @@ def __init__(
self.name = name
self.unit = unit
self.description = description
self._instrumentation_info = instrumentation_info
self.instrumentation_info = instrumentation_info
self._measurement_consumer = measurement_consumer
super().__init__(name, unit=unit, description=description)


class _Asynchronous:
class _Asynchronous(_Instrument):
def __init__(
self,
name: str,
Expand All @@ -67,7 +82,7 @@ def __init__(
self.name = name
self.unit = unit
self.description = description
self._instrumentation_info = instrumentation_info
self.instrumentation_info = instrumentation_info
self._measurement_consumer = measurement_consumer
super().__init__(name, callback, unit=unit, description=description)

Expand All @@ -86,6 +101,13 @@ def callback(self) -> CallbackT:


class Counter(_Synchronous, APICounter):
@property
def _default_aggregation(self) -> _Aggregation:
return _SumAggregation(
instrument_is_monotonic=True,
instrument_temporality=AggregationTemporality.DELTA,
)

def add(
self, amount: Union[int, float], attributes: Dict[str, str] = None
):
Expand All @@ -100,6 +122,13 @@ def add(


class UpDownCounter(_Synchronous, APIUpDownCounter):
@property
def _default_aggregation(self) -> _Aggregation:
return _SumAggregation(
instrument_is_monotonic=False,
instrument_temporality=AggregationTemporality.DELTA,
)

def add(
self, amount: Union[int, float], attributes: Dict[str, str] = None
):
Expand All @@ -109,14 +138,28 @@ def add(


class ObservableCounter(_Asynchronous, APIObservableCounter):
pass
@property
def _default_aggregation(self) -> _Aggregation:
return _SumAggregation(
instrument_is_monotonic=True,
instrument_temporality=AggregationTemporality.CUMULATIVE,
)


class ObservableUpDownCounter(_Asynchronous, APIObservableUpDownCounter):
pass
@property
def _default_aggregation(self) -> _Aggregation:
return _SumAggregation(
instrument_is_monotonic=False,
instrument_temporality=AggregationTemporality.CUMULATIVE,
)


class Histogram(_Synchronous, APIHistogram):
@property
def _default_aggregation(self) -> _Aggregation:
return _ExplicitBucketHistogramAggregation()

def record(
self, amount: Union[int, float], attributes: Dict[str, str] = None
):
Expand All @@ -132,4 +175,6 @@ def record(


class ObservableGauge(_Asynchronous, APIObservableGauge):
pass
@property
def _default_aggregation(self) -> _Aggregation:
return _LastValueAggregation()
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,11 @@
from threading import RLock
from typing import Dict, Iterable, List

from opentelemetry._metrics.instrument import Counter, Histogram, Instrument
from opentelemetry._metrics.instrument import Instrument
from opentelemetry.sdk._metrics._view_instrument_match import (
_ViewInstrumentMatch,
)
from opentelemetry.sdk._metrics.aggregation import (
AggregationTemporality,
_ExplicitBucketHistogramAggregation,
_LastValueAggregation,
_SumAggregation,
)
from opentelemetry.sdk._metrics.aggregation import AggregationTemporality
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.point import Metric
from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration
Expand All @@ -46,6 +41,7 @@ def _get_or_init_view_instrument_match(
) -> List["_ViewInstrumentMatch"]:
# Optimistically get the relevant views for the given instrument. Once set for a given
# instrument, the mapping will never change

if instrument in self._view_instrument_match:
return self._view_instrument_match[instrument]

Expand All @@ -55,50 +51,59 @@ def _get_or_init_view_instrument_match(
return self._view_instrument_match[instrument]

# not present, hold the lock and add a new mapping
matches = []
view_instrument_matches = []
for view in self._sdk_config.views:
if view.match(instrument):
# Note: if a view matches multiple instruments, this will create a separate
# _ViewInstrumentMatch per instrument. If the user's View configuration includes a
# name, this will cause multiple conflicting output streams.
matches.append(
# pylint: disable=protected-access
if view._match(instrument):

if view._aggregation is not None:
aggregation = view._aggregation._create_aggregation(
instrument
)
else:
aggregation = instrument._default_aggregation

view_instrument_matches.append(
_ViewInstrumentMatch(
name=view.name or instrument.name,
resource=self._sdk_config.resource,
instrumentation_info=None,
aggregation=view.aggregation,
name=view._name or instrument.name,
unit=instrument.unit,
description=view.description,
description=(
view._description or instrument.description
),
aggregation=aggregation,
instrumentation_info=(
instrument.instrumentation_info
),
resource=self._sdk_config.resource,
attribute_keys=view._attribute_keys,
)
)

# if no view targeted the instrument, use the default
if not matches:
# TODO: the logic to select aggregation could be moved
if isinstance(instrument, Counter):
agg = _SumAggregation(True, AggregationTemporality.DELTA)
elif isinstance(instrument, Histogram):
agg = _ExplicitBucketHistogramAggregation()
else:
agg = _LastValueAggregation()
matches.append(
if (
not view_instrument_matches
and self._sdk_config.enable_default_view
):
view_instrument_matches.append(
_ViewInstrumentMatch(
resource=self._sdk_config.resource,
instrumentation_info=None,
aggregation=agg,
name=instrument.name,
unit=instrument.unit,
description=instrument.description,
name=instrument.name,
# pylint: disable=protected-access
aggregation=instrument._default_aggregation,
instrumentation_info=instrument.instrumentation_info,
resource=self._sdk_config.resource,
attribute_keys=set(),
)
)
self._view_instrument_match[instrument] = matches
return matches
self._view_instrument_match[instrument] = view_instrument_matches
return view_instrument_matches

def consume_measurement(self, measurement: Measurement) -> None:
for matches in self._get_or_init_view_instrument_match(
for view_instrument_match in self._get_or_init_view_instrument_match(
measurement.instrument
):
matches.consume_measurement(measurement)
view_instrument_match.consume_measurement(measurement)

def collect(self, temporality: AggregationTemporality) -> Iterable[Metric]:
# use a list instead of yielding to prevent a slow reader from holding SDK locks
Expand All @@ -111,9 +116,11 @@ def collect(self, temporality: AggregationTemporality) -> Iterable[Metric]:
# that end times can be slightly skewed among the metric streams produced by the SDK,
# but we still align the output timestamps for a single instrument.
with self._lock:
for matches in self._view_instrument_match.values():
for match in matches:
metrics.extend(match.collect(temporality))
for (
view_instrument_matches
) in self._view_instrument_match.values():
for view_instrument_match in view_instrument_matches:
metrics.extend(view_instrument_match.collect(temporality))

return metrics

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ class SdkConfiguration:
resource: Resource
metric_readers: Sequence[MetricReader]
views: Sequence[View]
enable_default_view: bool
Loading

0 comments on commit 24bd23d

Please sign in to comment.