-
-
Notifications
You must be signed in to change notification settings - Fork 16.6k
[Bugfix][Metrics] Fix RayPrometheusMetric.labels() returning shared labeled child #40369
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,6 +28,10 @@ def _get_replica_id() -> str | None: | |
|
|
||
|
|
||
| class RayPrometheusMetric: | ||
| # Set by each concrete subclass to the matching _LabeledRay* class so that | ||
| # labels() returns the correct recording API (inc / set / observe). | ||
| _labeled_cls: type["_LabeledRayMetric"] | ||
|
|
||
| def __init__(self): | ||
| if ray_metrics is None: | ||
| raise ImportError("RayPrometheusMetric requires Ray to be installed.") | ||
|
|
@@ -39,7 +43,7 @@ def _get_tag_keys(labelnames: list[str] | None) -> tuple[str, ...]: | |
| labels.append("ReplicaId") | ||
| return tuple(labels) | ||
|
|
||
| def labels(self, *labels, **labelskwargs): | ||
| def _build_tags(self, *labels, **labelskwargs) -> dict[str, str]: | ||
| if labels: | ||
| # -1 because ReplicaId was added automatically | ||
| expected = len(self.metric._tag_keys) - 1 | ||
|
|
@@ -52,12 +56,18 @@ def labels(self, *labels, **labelskwargs): | |
|
|
||
| labelskwargs["ReplicaId"] = _get_replica_id() or "" | ||
|
|
||
| if labelskwargs: | ||
| for k, v in labelskwargs.items(): | ||
| if not isinstance(v, str): | ||
| labelskwargs[k] = str(v) | ||
| self.metric.set_default_tags(labelskwargs) | ||
| return self | ||
| return {k: v if isinstance(v, str) else str(v) for k, v in labelskwargs.items()} | ||
|
|
||
| def labels(self, *labels, **labelskwargs) -> "_LabeledRayMetric": | ||
| # Each call returns an independent labeled child carrying its own | ||
| # tag set, matching the prometheus_client.Metric.labels() contract | ||
| # that callsites rely on. Earlier versions mutated the underlying | ||
| # Ray metric's default tags in place and returned self, so every | ||
| # labeled "child" shared the last-set label values -- e.g. every | ||
| # vllm:request_success increment was attributed to the last | ||
| # FinishReason iterated (REPETITION). | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm not a fan of the "earlier versions [did this other thing]" comments that coding models tend to spit out - I think they just add clutter |
||
| tags = self._build_tags(*labels, **labelskwargs) | ||
| return self._labeled_cls(self, tags) | ||
|
|
||
| @staticmethod | ||
| def _get_sanitized_opentelemetry_name(name: str) -> str: | ||
|
|
@@ -75,10 +85,54 @@ def _get_sanitized_opentelemetry_name(name: str) -> str: | |
| return re.sub(r"[^a-zA-Z0-9_]", "_", name) | ||
|
|
||
|
|
||
| class _LabeledRayMetric: | ||
| """A per-label-set view of a Ray metric. | ||
|
|
||
| Each instance carries its own tag set and forwards recording operations | ||
| to the underlying Ray metric with ``tags=self._tags`` on every call. Per | ||
| Ray's metric API, per-call tags take precedence over any default tags on | ||
| the wrapped metric, so concurrent labeled children do not clobber each | ||
| other. | ||
| """ | ||
|
|
||
| __slots__ = ("_wrapper", "_tags") | ||
|
|
||
| def __init__(self, wrapper: RayPrometheusMetric, tags: dict[str, str]): | ||
| self._wrapper = wrapper | ||
| self._tags = tags | ||
|
|
||
| def labels(self, *labels, **labelskwargs) -> "_LabeledRayMetric": | ||
| # Re-labeling a labeled child is unusual, but route through the root | ||
| # wrapper so tag-key validation happens against the original schema. | ||
| return self._wrapper.labels(*labels, **labelskwargs) | ||
|
|
||
|
|
||
| class _LabeledRayCounter(_LabeledRayMetric): | ||
| def inc(self, value: int | float = 1.0): | ||
| if value == 0: | ||
| return | ||
| return self._wrapper.metric.inc(value, tags=self._tags) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This duplicates the […] Can we not have […] |
||
|
|
||
|
|
||
| class _LabeledRayGauge(_LabeledRayMetric): | ||
| def set(self, value: int | float): | ||
| return self._wrapper.metric.set(value, tags=self._tags) | ||
|
|
||
| def set_to_current_time(self): | ||
| return self._wrapper.metric.set(time.time(), tags=self._tags) | ||
|
|
||
|
|
||
| class _LabeledRayHistogram(_LabeledRayMetric): | ||
| def observe(self, value: int | float): | ||
| return self._wrapper.metric.observe(value, tags=self._tags) | ||
|
|
||
|
|
||
| class RayGaugeWrapper(RayPrometheusMetric): | ||
| """Wraps around ray.util.metrics.Gauge to provide same API as | ||
| prometheus_client.Gauge""" | ||
|
|
||
| _labeled_cls = _LabeledRayGauge | ||
|
|
||
| def __init__( | ||
| self, | ||
| name: str, | ||
|
|
@@ -112,6 +166,8 @@ class RayCounterWrapper(RayPrometheusMetric): | |
| """Wraps around ray.util.metrics.Counter to provide same API as | ||
| prometheus_client.Counter""" | ||
|
|
||
| _labeled_cls = _LabeledRayCounter | ||
|
|
||
| def __init__( | ||
| self, | ||
| name: str, | ||
|
|
@@ -136,6 +192,8 @@ class RayHistogramWrapper(RayPrometheusMetric): | |
| """Wraps around ray.util.metrics.Histogram to provide same API as | ||
| prometheus_client.Histogram""" | ||
|
|
||
| _labeled_cls = _LabeledRayHistogram | ||
|
|
||
| def __init__( | ||
| self, | ||
| name: str, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto on this "prior to the fix" comment