diff --git a/python/ray/serve/_private/metrics_utils.py b/python/ray/serve/_private/metrics_utils.py index 8ddb1275be7a..508bb43f61a0 100644 --- a/python/ray/serve/_private/metrics_utils.py +++ b/python/ray/serve/_private/metrics_utils.py @@ -411,6 +411,9 @@ def merge_instantaneous_total( if not active_series: return [] + if len(active_series) == 1: + return active_series[0] + # True k-way merge: heap maintains exactly k elements (one per series) # Each element is (timestamp, replica_id, iterator) merge_heap = [] diff --git a/python/ray/serve/tests/unit/test_metrics_utils.py b/python/ray/serve/tests/unit/test_metrics_utils.py index bcb1e9bd8a8f..ec3f5b9d7ffe 100644 --- a/python/ray/serve/tests/unit/test_metrics_utils.py +++ b/python/ray/serve/tests/unit/test_metrics_utils.py @@ -4,10 +4,10 @@ import pytest from ray._common.test_utils import async_wait_for_condition +from ray.serve._private.common import TimeStampedValue from ray.serve._private.metrics_utils import ( InMemoryMetricsStore, MetricsPusher, - TimeStampedValue, aggregate_timeseries, merge_instantaneous_total, merge_timeseries_dicts, @@ -691,14 +691,17 @@ def test_merge_instantaneous_total_combine_same_timestamp(self): def test_merge_instantaneous_total_edge_cases_rounding(self): """Test edge cases for timestamp rounding and combination.""" - # Test rounding edge cases + # Test rounding edge cases with multiple replicas (rounding only happens with 2+ replicas) series1 = [ TimeStampedValue(1.004999, 5.0), # Should round to 1.0 TimeStampedValue(1.005000, 7.0), # Should round to 1.0 (round half to even) TimeStampedValue(1.005001, 9.0), # Should round to 1.01 ] + series2 = [ + TimeStampedValue(1.003, 2.0), # Should round to 1.0 + ] - result = merge_instantaneous_total([series1]) + result = merge_instantaneous_total([series1, series2]) # Should have two distinct rounded timestamps expected_timestamps = [1.0, 1.01] @@ -706,31 +709,39 @@ def test_merge_instantaneous_total_edge_cases_rounding(self): assert actual_timestamps == expected_timestamps # Values should reflect the changes - # Both 1.004999 and 1.005000 round to 1.0, so they get combined - # Order: 1.004999 (0->5), then 1.005000 (5->7) - final value at 1.0 is 7.0 - # Then 1.005001 (7->9) rounds to 1.01 - value at 1.01 is 9.0 + # At rounded timestamp 1.0: + # - series2 starts at 2 (t=1.003) + # - series1 starts at 5 (t=1.004999), total: 2+5=7 + # - series1 changes to 7 (t=1.005000), total: 2+7=9 + # At rounded timestamp 1.01: + # - series1 changes to 9 (t=1.005001), total: 2+9=11 expected = [ - TimeStampedValue( - 1.0, 7.0 - ), # Final state after all changes that round to 1.0 (1.004999: 0->5, 1.005000: 5->7) - TimeStampedValue(1.01, 9.0), # State after change at 1.005001 (7->9) + TimeStampedValue(1.0, 9.0), # Final state at rounded timestamp 1.0 + TimeStampedValue(1.01, 11.0), # State after change at 1.005001 ] assert_timeseries_equal(result, expected) def test_merge_instantaneous_total_no_changes_filtered(self): """Test that zero-change events are filtered even with rounding.""" + # Use multiple replicas to trigger the merge logic (single replica returns as-is) series1 = [ TimeStampedValue(1.001, 5.0), # Rounds to 1.00 TimeStampedValue(1.004, 5.0), # Also rounds to 1.00, no change TimeStampedValue(2.000, 7.0), # Rounds to 2.00, change ] + series2 = [ + TimeStampedValue(1.002, 3.0), # Rounds to 1.00 + TimeStampedValue(2.001, 3.0), # Rounds to 2.00, no change + ] - result = merge_instantaneous_total([series1]) + result = merge_instantaneous_total([series1, series2]) - # Should only include points where value actually changed + # Should only include points where total value actually changed + # At 1.00: series2 starts at 3, then series1 starts at 5, total = 8 + # At 2.00: series1 changes to 7, total = 3+7 = 10 expected = [ - TimeStampedValue(1.00, 5.0), # Initial value - TimeStampedValue(2.00, 7.0), # Value change + TimeStampedValue(1.00, 8.0), # Initial combined value + TimeStampedValue(2.00, 10.0), # Value change ] assert_timeseries_equal(result, expected)