Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions python/ray/serve/_private/metrics_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,9 @@ def merge_instantaneous_total(
if not active_series:
return []

if len(active_series) == 1:
return active_series[0]

# True k-way merge: heap maintains exactly k elements (one per series)
# Each element is (timestamp, replica_id, iterator)
merge_heap = []
Expand Down
39 changes: 25 additions & 14 deletions python/ray/serve/tests/unit/test_metrics_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
import pytest

from ray._common.test_utils import async_wait_for_condition
from ray.serve._private.common import TimeStampedValue
from ray.serve._private.metrics_utils import (
InMemoryMetricsStore,
MetricsPusher,
TimeStampedValue,
aggregate_timeseries,
merge_instantaneous_total,
merge_timeseries_dicts,
Expand Down Expand Up @@ -691,46 +691,57 @@ def test_merge_instantaneous_total_combine_same_timestamp(self):

def test_merge_instantaneous_total_edge_cases_rounding(self):
"""Test edge cases for timestamp rounding and combination."""
# Test rounding edge cases
# Test rounding edge cases with multiple replicas (rounding only happens with 2+ replicas)
series1 = [
TimeStampedValue(1.004999, 5.0), # Should round to 1.0
TimeStampedValue(1.005000, 7.0), # Should round to 1.0 (round half to even)
TimeStampedValue(1.005001, 9.0), # Should round to 1.01
]
series2 = [
TimeStampedValue(1.003, 2.0), # Should round to 1.0
]

result = merge_instantaneous_total([series1])
result = merge_instantaneous_total([series1, series2])

# Should have two distinct rounded timestamps
expected_timestamps = [1.0, 1.01]
actual_timestamps = [point.timestamp for point in result]
assert actual_timestamps == expected_timestamps

# Values should reflect the changes
# Both 1.004999 and 1.005000 round to 1.0, so they get combined
# Order: 1.004999 (0->5), then 1.005000 (5->7) - final value at 1.0 is 7.0
# Then 1.005001 (7->9) rounds to 1.01 - value at 1.01 is 9.0
# At rounded timestamp 1.0:
# - series2 starts at 2 (t=1.003)
# - series1 starts at 5 (t=1.004999), total: 2+5=7
# - series1 changes to 7 (t=1.005000), total: 2+7=9
# At rounded timestamp 1.01:
# - series1 changes to 9 (t=1.005001), total: 2+9=11
expected = [
TimeStampedValue(
1.0, 7.0
), # Final state after all changes that round to 1.0 (1.004999: 0->5, 1.005000: 5->7)
TimeStampedValue(1.01, 9.0), # State after change at 1.005001 (7->9)
TimeStampedValue(1.0, 9.0), # Final state at rounded timestamp 1.0
TimeStampedValue(1.01, 11.0), # State after change at 1.005001
]
assert_timeseries_equal(result, expected)

def test_merge_instantaneous_total_no_changes_filtered(self):
"""Test that zero-change events are filtered even with rounding."""
# Use multiple replicas to trigger the merge logic (single replica returns as-is)
series1 = [
TimeStampedValue(1.001, 5.0), # Rounds to 1.00
TimeStampedValue(1.004, 5.0), # Also rounds to 1.00, no change
TimeStampedValue(2.000, 7.0), # Rounds to 2.00, change
]
series2 = [
TimeStampedValue(1.002, 3.0), # Rounds to 1.00
TimeStampedValue(2.001, 3.0), # Rounds to 2.00, no change
]

result = merge_instantaneous_total([series1])
result = merge_instantaneous_total([series1, series2])

# Should only include points where value actually changed
# Should only include points where total value actually changed
# At 1.00: series2 starts at 3, then series1 starts at 5, total = 8
# At 2.00: series1 changes to 7, total = 3+7 = 10
expected = [
TimeStampedValue(1.00, 5.0), # Initial value
TimeStampedValue(2.00, 7.0), # Value change
TimeStampedValue(1.00, 8.0), # Initial combined value
TimeStampedValue(2.00, 10.0), # Value change
]
assert_timeseries_equal(result, expected)

Expand Down