Skip to content

Commit

Permalink
feat: metric span summaries (#2522)
Browse files Browse the repository at this point in the history
  • Loading branch information
mitsuhiko authored Nov 29, 2023
1 parent bd68a3e commit b250a89
Show file tree
Hide file tree
Showing 4 changed files with 388 additions and 48 deletions.
2 changes: 2 additions & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
"transport_zlib_compression_level": Optional[int],
"transport_num_pools": Optional[int],
"enable_metrics": Optional[bool],
"metrics_summary_sample_rate": Optional[float],
"should_summarize_metric": Optional[Callable[[str, MetricTags], bool]],
"before_emit_metric": Optional[Callable[[str, MetricTags], bool]],
"metric_code_locations": Optional[bool],
},
Expand Down
212 changes: 167 additions & 45 deletions sentry_sdk/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,58 @@ def _encode_locations(timestamp, code_locations):
}


class LocalAggregator(object):
__slots__ = ("_measurements",)

def __init__(self):
# type: (...) -> None
self._measurements = (
{}
) # type: Dict[Tuple[str, MetricTagsInternal], Tuple[float, float, int, float]]

def add(
self,
ty, # type: MetricType
key, # type: str
value, # type: float
unit, # type: MeasurementUnit
tags, # type: MetricTagsInternal
):
# type: (...) -> None
export_key = "%s:%s@%s" % (ty, key, unit)
bucket_key = (export_key, tags)

old = self._measurements.get(bucket_key)
if old is not None:
v_min, v_max, v_count, v_sum = old
v_min = min(v_min, value)
v_max = max(v_max, value)
v_count += 1
v_sum += value
else:
v_min = v_max = v_sum = value
v_count = 1
self._measurements[bucket_key] = (v_min, v_max, v_count, v_sum)

def to_json(self):
# type: (...) -> Dict[str, Any]
rv = {}
for (export_key, tags), (
v_min,
v_max,
v_count,
v_sum,
) in self._measurements.items():
rv[export_key] = {
"tags": _tags_to_dict(tags),
"min": v_min,
"max": v_max,
"count": v_count,
"sum": v_sum,
}
return rv


class MetricsAggregator(object):
ROLLUP_IN_SECONDS = 10.0
MAX_WEIGHT = 100000
Expand Down Expand Up @@ -455,11 +507,12 @@ def add(
unit, # type: MeasurementUnit
tags, # type: Optional[MetricTags]
timestamp=None, # type: Optional[Union[float, datetime]]
local_aggregator=None, # type: Optional[LocalAggregator]
stacklevel=0, # type: int
):
# type: (...) -> None
if not self._ensure_thread() or self._flusher is None:
return
return None

if timestamp is None:
timestamp = time.time()
Expand All @@ -469,11 +522,12 @@ def add(
bucket_timestamp = int(
(timestamp // self.ROLLUP_IN_SECONDS) * self.ROLLUP_IN_SECONDS
)
serialized_tags = _serialize_tags(tags)
bucket_key = (
ty,
key,
unit,
self._serialize_tags(tags),
serialized_tags,
)

with self._lock:
Expand All @@ -486,7 +540,8 @@ def add(
metric = local_buckets[bucket_key] = METRIC_TYPES[ty](value)
previous_weight = 0

self._buckets_total_weight += metric.weight - previous_weight
added = metric.weight - previous_weight
self._buckets_total_weight += added

# Store code location once per metric and per day (of bucket timestamp)
if self._enable_code_locations:
Expand All @@ -509,6 +564,10 @@ def add(
# Given the new weight we consider whether we want to force flush.
self._consider_force_flush()

if local_aggregator is not None:
local_value = float(added if ty == "s" else value)
local_aggregator.add(ty, key, local_value, unit, serialized_tags)

def kill(self):
# type: (...) -> None
if self._flusher is None:
Expand Down Expand Up @@ -554,55 +613,87 @@ def _emit(
return envelope
return None

def _serialize_tags(
self, tags # type: Optional[MetricTags]
):
# type: (...) -> MetricTagsInternal
if not tags:
return ()

rv = []
for key, value in iteritems(tags):
# If the value is a collection, we want to flatten it.
if isinstance(value, (list, tuple)):
for inner_value in value:
if inner_value is not None:
rv.append((key, text_type(inner_value)))
elif value is not None:
rv.append((key, text_type(value)))

# It's very important to sort the tags in order to obtain the
# same bucket key.
return tuple(sorted(rv))
def _serialize_tags(
tags, # type: Optional[MetricTags]
):
# type: (...) -> MetricTagsInternal
if not tags:
return ()

rv = []
for key, value in iteritems(tags):
# If the value is a collection, we want to flatten it.
if isinstance(value, (list, tuple)):
for inner_value in value:
if inner_value is not None:
rv.append((key, text_type(inner_value)))
elif value is not None:
rv.append((key, text_type(value)))

# It's very important to sort the tags in order to obtain the
# same bucket key.
return tuple(sorted(rv))


def _tags_to_dict(tags):
# type: (MetricTagsInternal) -> Dict[str, Any]
rv = {} # type: Dict[str, Any]
for tag_name, tag_value in tags:
old_value = rv.get(tag_name)
if old_value is not None:
if isinstance(old_value, list):
old_value.append(tag_value)
else:
rv[tag_name] = [old_value, tag_value]
else:
rv[tag_name] = tag_value
return rv


def _get_aggregator_and_update_tags(key, tags):
# type: (str, Optional[MetricTags]) -> Tuple[Optional[MetricsAggregator], Optional[MetricTags]]
# type: (str, Optional[MetricTags]) -> Tuple[Optional[MetricsAggregator], Optional[LocalAggregator], Optional[MetricTags]]
"""Returns the current metrics aggregator if there is one."""
hub = sentry_sdk.Hub.current
client = hub.client
if client is None or client.metrics_aggregator is None:
return None, tags
return None, None, tags

experiments = client.options.get("_experiments", {})

updated_tags = dict(tags or ()) # type: Dict[str, MetricTagValue]
updated_tags.setdefault("release", client.options["release"])
updated_tags.setdefault("environment", client.options["environment"])

scope = hub.scope
local_aggregator = None

# We go with the low-level API here to access transaction information as
# this one is the same between just errors and errors + performance
transaction_source = scope._transaction_info.get("source")
if transaction_source in GOOD_TRANSACTION_SOURCES:
transaction = scope._transaction
if transaction:
updated_tags.setdefault("transaction", transaction)
transaction_name = scope._transaction
if transaction_name:
updated_tags.setdefault("transaction", transaction_name)
if scope._span is not None:
sample_rate = experiments.get("metrics_summary_sample_rate") or 0.0
should_summarize_metric_callback = experiments.get(
"should_summarize_metric"
)
if random.random() < sample_rate and (
should_summarize_metric_callback is None
or should_summarize_metric_callback(key, updated_tags)
):
local_aggregator = scope._span._get_local_aggregator()

callback = client.options.get("_experiments", {}).get("before_emit_metric")
if callback is not None:
before_emit_callback = experiments.get("before_emit_metric")
if before_emit_callback is not None:
with recursion_protection() as in_metrics:
if not in_metrics:
if not callback(key, updated_tags):
return None, updated_tags
if not before_emit_callback(key, updated_tags):
return None, None, updated_tags

return client.metrics_aggregator, updated_tags
return client.metrics_aggregator, local_aggregator, updated_tags


def incr(
Expand All @@ -615,9 +706,11 @@ def incr(
):
# type: (...) -> None
"""Increments a counter."""
aggregator, tags = _get_aggregator_and_update_tags(key, tags)
aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(key, tags)
if aggregator is not None:
aggregator.add("c", key, value, unit, tags, timestamp, stacklevel)
aggregator.add(
"c", key, value, unit, tags, timestamp, local_aggregator, stacklevel
)


class _Timing(object):
Expand All @@ -637,6 +730,7 @@ def __init__(
self.value = value
self.unit = unit
self.entered = None # type: Optional[float]
self._span = None # type: Optional[sentry_sdk.tracing.Span]
self.stacklevel = stacklevel

def _validate_invocation(self, context):
Expand All @@ -650,17 +744,37 @@ def __enter__(self):
# type: (...) -> _Timing
self.entered = TIMING_FUNCTIONS[self.unit]()
self._validate_invocation("context-manager")
self._span = sentry_sdk.start_span(op="metric.timing", description=self.key)
if self.tags:
for key, value in self.tags.items():
if isinstance(value, (tuple, list)):
value = ",".join(sorted(map(str, value)))
self._span.set_tag(key, value)
self._span.__enter__()
return self

def __exit__(self, exc_type, exc_value, tb):
# type: (Any, Any, Any) -> None
aggregator, tags = _get_aggregator_and_update_tags(self.key, self.tags)
assert self._span, "did not enter"
aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(
self.key, self.tags
)
if aggregator is not None:
elapsed = TIMING_FUNCTIONS[self.unit]() - self.entered # type: ignore
aggregator.add(
"d", self.key, elapsed, self.unit, tags, self.timestamp, self.stacklevel
"d",
self.key,
elapsed,
self.unit,
tags,
self.timestamp,
local_aggregator,
self.stacklevel,
)

self._span.__exit__(exc_type, exc_value, tb)
self._span = None

def __call__(self, f):
# type: (Any) -> Any
self._validate_invocation("decorator")
Expand Down Expand Up @@ -698,9 +812,11 @@ def timing(
- it can be used as a decorator
"""
if value is not None:
aggregator, tags = _get_aggregator_and_update_tags(key, tags)
aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(key, tags)
if aggregator is not None:
aggregator.add("d", key, value, unit, tags, timestamp, stacklevel)
aggregator.add(
"d", key, value, unit, tags, timestamp, local_aggregator, stacklevel
)
return _Timing(key, tags, timestamp, value, unit, stacklevel)


Expand All @@ -714,9 +830,11 @@ def distribution(
):
# type: (...) -> None
"""Emits a distribution."""
aggregator, tags = _get_aggregator_and_update_tags(key, tags)
aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(key, tags)
if aggregator is not None:
aggregator.add("d", key, value, unit, tags, timestamp, stacklevel)
aggregator.add(
"d", key, value, unit, tags, timestamp, local_aggregator, stacklevel
)


def set(
Expand All @@ -729,21 +847,25 @@ def set(
):
# type: (...) -> None
"""Emits a set."""
aggregator, tags = _get_aggregator_and_update_tags(key, tags)
aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(key, tags)
if aggregator is not None:
aggregator.add("s", key, value, unit, tags, timestamp, stacklevel)
aggregator.add(
"s", key, value, unit, tags, timestamp, local_aggregator, stacklevel
)


def gauge(
key, # type: str
value, # type: float
unit="none", # type: MetricValue
unit="none", # type: MeasurementUnit
tags=None, # type: Optional[MetricTags]
timestamp=None, # type: Optional[Union[float, datetime]]
stacklevel=0, # type: int
):
# type: (...) -> None
"""Emits a gauge."""
aggregator, tags = _get_aggregator_and_update_tags(key, tags)
aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(key, tags)
if aggregator is not None:
aggregator.add("g", key, value, unit, tags, timestamp, stacklevel)
aggregator.add(
"g", key, value, unit, tags, timestamp, local_aggregator, stacklevel
)
Loading

0 comments on commit b250a89

Please sign in to comment.