Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds Views to metrics SDK #2391

Merged
merged 49 commits into from
Mar 3, 2022
Merged
Show file tree
Hide file tree
Changes from 47 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
8d96a69
Adds Views to metrics SDK
ocelotl Sep 24, 2021
d4d78ab
Add view documentation
ocelotl Feb 2, 2022
853230c
Add type for instrument
ocelotl Feb 3, 2022
d7af951
Add more restrictions to matching criteria
ocelotl Feb 3, 2022
7750436
Remove regex matching from meter name
ocelotl Feb 3, 2022
c9f1852
Remove name from matching criteria
ocelotl Feb 3, 2022
064b1bc
Add some fixes
ocelotl Feb 3, 2022
b1c8afb
Rename attribute
ocelotl Feb 4, 2022
9213606
Update opentelemetry-sdk/src/opentelemetry/sdk/_metrics/view.py
ocelotl Feb 4, 2022
d449dfc
Update attribute keys type
ocelotl Feb 4, 2022
a6b77ee
More fixes
ocelotl Feb 4, 2022
13fdb59
Remove regex and default view mechanism
ocelotl Feb 10, 2022
5b83370
Raise exceptions instead of logging warnings
ocelotl Feb 10, 2022
c559fdf
Refactor warning
ocelotl Feb 11, 2022
b41dfdf
Fix issues
ocelotl Feb 14, 2022
1f50862
Add support for fnmatch and fix view matching
ocelotl Feb 16, 2022
2bd6c3f
Fix test case
ocelotl Feb 16, 2022
5cdf32c
Add some fixes
ocelotl Feb 23, 2022
842b98e
WIP
ocelotl Feb 23, 2022
5b92fc2
WIP
ocelotl Feb 24, 2022
98bb5a6
Add final decorator
ocelotl Feb 24, 2022
e5b7f09
Fix lint
ocelotl Feb 24, 2022
3728b73
Use f string
ocelotl Feb 24, 2022
f982cad
Fix documentation
ocelotl Feb 24, 2022
fe68f0c
Fix documentation
ocelotl Feb 24, 2022
3036928
Remove check
ocelotl Feb 24, 2022
d357545
Remove unnecessary attribute
ocelotl Feb 24, 2022
83131bd
Fix documentation
ocelotl Feb 24, 2022
86c155d
Fix documentation
ocelotl Feb 24, 2022
2336033
Fix tests
ocelotl Feb 24, 2022
f6a318f
More fixes
ocelotl Feb 24, 2022
06596b0
Update opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader…
ocelotl Feb 24, 2022
1f55bd9
Update opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader…
ocelotl Feb 24, 2022
4e0a3c6
More fixes
ocelotl Feb 24, 2022
53dc33e
WIP fix tests
ocelotl Feb 25, 2022
32c8c84
WIP
ocelotl Feb 25, 2022
be91b64
More fixing
ocelotl Feb 25, 2022
d922207
Fix issue with 3.6
ocelotl Feb 25, 2022
1dcabd7
Add comment for private view attributes
ocelotl Feb 25, 2022
1f5a454
Fix view matching
ocelotl Feb 25, 2022
5edf8f5
Handle view aggregations in metric reader storage
ocelotl Feb 26, 2022
6f59aca
Fix docs
ocelotl Feb 28, 2022
eb93968
Remove wildcard support for meter attributes
ocelotl Feb 28, 2022
6134aee
Fix documentation
ocelotl Feb 28, 2022
cae1572
Rename to _default_aggregation
ocelotl Feb 28, 2022
1693b71
Use default view argument
ocelotl Feb 28, 2022
041b2e8
Merge branch 'main' into issue_2247
Mar 2, 2022
41331cf
Merge branch 'main' into issue_2247
ocelotl Mar 3, 2022
a19a011
Update opentelemetry-sdk/src/opentelemetry/sdk/_metrics/view.py
ocelotl Mar 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
)
from opentelemetry.sdk._metrics.metric_reader import MetricReader
from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration
from opentelemetry.sdk._metrics.view import View
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo
from opentelemetry.util._once import Once
Expand Down Expand Up @@ -154,12 +155,17 @@ def __init__(
metric_readers: Sequence[MetricReader] = (),
resource: Resource = Resource.create({}),
shutdown_on_exit: bool = True,
views: Sequence[View] = (),
enable_default_view: bool = True,
):
self._lock = Lock()
self._meter_lock = Lock()
self._atexit_handler = None
self._sdk_config = SdkConfiguration(
resource=resource, metric_readers=metric_readers, views=()
resource=resource,
metric_readers=metric_readers,
views=views,
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
enable_default_view=enable_default_view,
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
)
self._measurement_consumer = SynchronousMeasurementConsumer(
sdk_config=self._sdk_config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from typing import Iterable, Set

from opentelemetry.sdk._metrics.aggregation import (
_Aggregation,
_convert_aggregation_temporality,
)
from opentelemetry.sdk._metrics.measurement import Measurement
Expand All @@ -34,10 +35,10 @@ def __init__(
name: str,
unit: str,
description: str,
aggregation: type,
aggregation: _Aggregation,
instrumentation_info: InstrumentationInfo,
resource: Resource,
attribute_keys: Set[str] = None,
attribute_keys: Set[str],
):
self._name = name
self._unit = unit
Expand All @@ -52,7 +53,7 @@ def __init__(

def consume_measurement(self, measurement: Measurement) -> None:

if self._attribute_keys is not None:
if self._attribute_keys:

attributes = {}

Expand All @@ -68,7 +69,7 @@ def consume_measurement(self, measurement: Measurement) -> None:

if attributes not in self._attributes_aggregation.keys():
with self._lock:
self._attributes_aggregation[attributes] = self._aggregation()
self._attributes_aggregation[attributes] = self._aggregation
ocelotl marked this conversation as resolved.
Show resolved Hide resolved

self._attributes_aggregation[attributes].aggregate(measurement)

Expand Down
59 changes: 52 additions & 7 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# pylint: disable=too-many-ancestors

import logging
from abc import ABC, abstractmethod
from typing import Dict, Generator, Iterable, Union

from opentelemetry._metrics.instrument import CallbackT
Expand All @@ -30,14 +31,28 @@
ObservableUpDownCounter as APIObservableUpDownCounter,
)
from opentelemetry._metrics.instrument import UpDownCounter as APIUpDownCounter
from opentelemetry.sdk._metrics.aggregation import (
_Aggregation,
_ExplicitBucketHistogramAggregation,
_LastValueAggregation,
_SumAggregation,
)
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.measurement_consumer import MeasurementConsumer
from opentelemetry.sdk._metrics.point import AggregationTemporality
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo

_logger = logging.getLogger(__name__)


class _Synchronous:
class _Instrument(ABC):
@property
@abstractmethod
def _default_aggregation(self) -> _Aggregation:
pass


class _Synchronous(_Instrument):
def __init__(
self,
name: str,
Expand All @@ -49,12 +64,12 @@ def __init__(
self.name = name
self.unit = unit
self.description = description
self._instrumentation_info = instrumentation_info
self.instrumentation_info = instrumentation_info
self._measurement_consumer = measurement_consumer
super().__init__(name, unit=unit, description=description)


class _Asynchronous:
class _Asynchronous(_Instrument):
def __init__(
self,
name: str,
Expand All @@ -67,7 +82,7 @@ def __init__(
self.name = name
self.unit = unit
self.description = description
self._instrumentation_info = instrumentation_info
self.instrumentation_info = instrumentation_info
self._measurement_consumer = measurement_consumer
super().__init__(name, callback, unit=unit, description=description)

Expand All @@ -86,6 +101,13 @@ def callback(self) -> CallbackT:


class Counter(_Synchronous, APICounter):
@property
def _default_aggregation(self) -> _Aggregation:
return _SumAggregation(
instrument_is_monotonic=True,
instrument_temporality=AggregationTemporality.DELTA,
)

def add(
self, amount: Union[int, float], attributes: Dict[str, str] = None
):
Expand All @@ -100,6 +122,13 @@ def add(


class UpDownCounter(_Synchronous, APIUpDownCounter):
@property
def _default_aggregation(self) -> _Aggregation:
return _SumAggregation(
instrument_is_monotonic=False,
instrument_temporality=AggregationTemporality.DELTA,
)

def add(
self, amount: Union[int, float], attributes: Dict[str, str] = None
):
Expand All @@ -109,14 +138,28 @@ def add(


class ObservableCounter(_Asynchronous, APIObservableCounter):
pass
@property
def _default_aggregation(self) -> _Aggregation:
return _SumAggregation(
instrument_is_monotonic=True,
instrument_temporality=AggregationTemporality.CUMULATIVE,
)


class ObservableUpDownCounter(_Asynchronous, APIObservableUpDownCounter):
pass
@property
def _default_aggregation(self) -> _Aggregation:
return _SumAggregation(
instrument_is_monotonic=False,
instrument_temporality=AggregationTemporality.CUMULATIVE,
)


class Histogram(_Synchronous, APIHistogram):
@property
def _default_aggregation(self) -> _Aggregation:
return _ExplicitBucketHistogramAggregation()

def record(
self, amount: Union[int, float], attributes: Dict[str, str] = None
):
Expand All @@ -132,4 +175,6 @@ def record(


class ObservableGauge(_Asynchronous, APIObservableGauge):
pass
@property
def _default_aggregation(self) -> _Aggregation:
return _LastValueAggregation()
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,11 @@
from threading import RLock
from typing import Dict, Iterable, List

from opentelemetry._metrics.instrument import Counter, Histogram, Instrument
from opentelemetry._metrics.instrument import Instrument
from opentelemetry.sdk._metrics._view_instrument_match import (
_ViewInstrumentMatch,
)
from opentelemetry.sdk._metrics.aggregation import (
AggregationTemporality,
_ExplicitBucketHistogramAggregation,
_LastValueAggregation,
_SumAggregation,
)
from opentelemetry.sdk._metrics.aggregation import AggregationTemporality
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.point import Metric
from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration
Expand All @@ -46,6 +41,7 @@ def _get_or_init_view_instrument_match(
) -> List["_ViewInstrumentMatch"]:
# Optimistically get the relevant views for the given instrument. Once set for a given
# instrument, the mapping will never change

if instrument in self._view_instrument_match:
return self._view_instrument_match[instrument]

Expand All @@ -55,50 +51,59 @@ def _get_or_init_view_instrument_match(
return self._view_instrument_match[instrument]

# not present, hold the lock and add a new mapping
matches = []
view_instrument_matches = []
for view in self._sdk_config.views:
if view.match(instrument):
# Note: if a view matches multiple instruments, this will create a separate
# _ViewInstrumentMatch per instrument. If the user's View configuration includes a
# name, this will cause multiple conflicting output streams.
matches.append(
# pylint: disable=protected-access
if view._match(instrument):

if view._aggregation is not None:
aggregation = view._aggregation._create_aggregation(
instrument
)
else:
aggregation = instrument._default_aggregation

view_instrument_matches.append(
_ViewInstrumentMatch(
name=view.name or instrument.name,
resource=self._sdk_config.resource,
instrumentation_info=None,
aggregation=view.aggregation,
name=view._name or instrument.name,
unit=instrument.unit,
description=view.description,
description=(
view._description or instrument.description
),
aggregation=aggregation,
instrumentation_info=(
instrument.instrumentation_info
),
resource=self._sdk_config.resource,
attribute_keys=view._attribute_keys,
)
)

# if no view targeted the instrument, use the default
if not matches:
# TODO: the logic to select aggregation could be moved
if isinstance(instrument, Counter):
agg = _SumAggregation(True, AggregationTemporality.DELTA)
elif isinstance(instrument, Histogram):
agg = _ExplicitBucketHistogramAggregation()
else:
agg = _LastValueAggregation()
matches.append(
if (
not view_instrument_matches
and self._sdk_config.enable_default_view
):
view_instrument_matches.append(
_ViewInstrumentMatch(
resource=self._sdk_config.resource,
instrumentation_info=None,
aggregation=agg,
name=instrument.name,
unit=instrument.unit,
description=instrument.description,
name=instrument.name,
# pylint: disable=protected-access
aggregation=instrument._default_aggregation,
instrumentation_info=instrument.instrumentation_info,
resource=self._sdk_config.resource,
attribute_keys=set(),
)
)
self._view_instrument_match[instrument] = matches
return matches
self._view_instrument_match[instrument] = view_instrument_matches
return view_instrument_matches

def consume_measurement(self, measurement: Measurement) -> None:
for matches in self._get_or_init_view_instrument_match(
for view_instrument_match in self._get_or_init_view_instrument_match(
measurement.instrument
):
matches.consume_measurement(measurement)
view_instrument_match.consume_measurement(measurement)

def collect(self, temporality: AggregationTemporality) -> Iterable[Metric]:
# use a list instead of yielding to prevent a slow reader from holding SDK locks
Expand All @@ -111,9 +116,11 @@ def collect(self, temporality: AggregationTemporality) -> Iterable[Metric]:
# that end times can be slightly skewed among the metric streams produced by the SDK,
# but we still align the output timestamps for a single instrument.
with self._lock:
for matches in self._view_instrument_match.values():
for match in matches:
metrics.extend(match.collect(temporality))
for (
view_instrument_matches
) in self._view_instrument_match.values():
for view_instrument_match in view_instrument_matches:
metrics.extend(view_instrument_match.collect(temporality))

return metrics

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ class SdkConfiguration:
resource: Resource
metric_readers: Sequence[MetricReader]
views: Sequence[View]
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
enable_default_view: bool
Loading