diff --git a/python/ray/serve/_private/autoscaling_state.py b/python/ray/serve/_private/autoscaling_state.py
index 8f3a59354404..0992c7be8af1 100644
--- a/python/ray/serve/_private/autoscaling_state.py
+++ b/python/ray/serve/_private/autoscaling_state.py
@@ -234,21 +234,15 @@ def get_decision_num_replicas(

         return self.apply_bounds(decision_num_replicas)

-    def get_autoscaling_context(self, curr_target_num_replicas):
-        total_num_requests = self.get_total_num_requests()
-        total_queued_requests = self._get_queued_requests()
-        # NOTE: for non additive aggregation functions, total_running_requests is not
-        # accurate, consider this is a approximation.
-        total_running_requests = total_num_requests - total_queued_requests
-
-        autoscaling_context: AutoscalingContext = AutoscalingContext(
+    def get_autoscaling_context(self, curr_target_num_replicas) -> AutoscalingContext:
+        return AutoscalingContext(
             deployment_id=self._deployment_id,
             deployment_name=self._deployment_id.name,
             app_name=self._deployment_id.app_name,
             current_num_replicas=len(self._running_replicas),
             target_num_replicas=curr_target_num_replicas,
             running_replicas=self._running_replicas,
-            total_num_requests=total_num_requests,
+            total_num_requests=self.get_total_num_requests,
             capacity_adjusted_min_replicas=self.get_num_replicas_lower_bound(),
             capacity_adjusted_max_replicas=self.get_num_replicas_upper_bound(),
             policy_state=(
@@ -256,16 +250,13 @@ def get_autoscaling_context(self, curr_target_num_replicas):
             ),
             current_time=time.time(),
             config=self._config,
-            total_queued_requests=total_queued_requests,
-            total_running_requests=total_running_requests,
-            aggregated_metrics=self._get_aggregated_custom_metrics(),
-            raw_metrics=self._get_raw_custom_metrics(),
+            total_queued_requests=self._get_queued_requests,
+            aggregated_metrics=self._get_aggregated_custom_metrics,
+            raw_metrics=self._get_raw_custom_metrics,
             last_scale_up_time=None,
             last_scale_down_time=None,
         )

-        return autoscaling_context
-
     def _collect_replica_running_requests(self) -> List[TimeSeries]:
         """Collect running requests timeseries from replicas for aggregation.

diff --git a/python/ray/serve/config.py b/python/ray/serve/config.py
index 6aa7ac54317b..05b343acc24c 100644
--- a/python/ray/serve/config.py
+++ b/python/ray/serve/config.py
@@ -1,8 +1,8 @@
 import json
 import logging
 import warnings
-from dataclasses import dataclass
 from enum import Enum
+from functools import cached_property
 from typing import Any, Callable, Dict, List, Optional, Union

 from ray import cloudpickle
@@ -39,7 +39,6 @@


 @PublicAPI(stability="alpha")
-@dataclass
 class AutoscalingContext:
     """Rich context provided to custom autoscaling policies.

@@ -49,49 +48,120 @@ class AutoscalingContext:

     The context includes deployment metadata, current replica state, built-in and
     custom metrics, capacity bounds, policy state, and timing information.
+
+    Note: The total_num_requests, total_queued_requests, aggregated_metrics, and
+    raw_metrics fields support lazy evaluation. You can pass callables that will be
+    evaluated only when accessed, with results cached for subsequent accesses.
     """

-    # Deployment information
-    deployment_id: DeploymentID  #: Unique identifier for the deployment.
-    deployment_name: str  #: Name of the deployment.
-    app_name: Optional[str]  #: Name of the application containing this deployment.
-
-    # Current state
-    current_num_replicas: int  #: Current number of running replicas.
-    target_num_replicas: int  #: Target number of replicas set by the autoscaler.
-    running_replicas: List[ReplicaID]  #: List of currently running replica IDs.
-
-    # Built-in metrics
-    total_num_requests: float  #: Total number of requests across all replicas.
-    total_queued_requests: Optional[float]  #: Number of requests currently queued.
-    total_running_requests: Optional[
-        float
-    ]  #: Total number of requests currently running.
-
-    # Custom metrics
-    aggregated_metrics: Dict[
-        str, Dict[ReplicaID, float]
-    ]  #: Time-weighted averages of custom metrics per replica.
-    raw_metrics: Dict[
-        str, Dict[ReplicaID, TimeSeries]
-    ]  #: Raw custom metric timeseries per replica.
-
-    # Capacity and bounds
-    capacity_adjusted_min_replicas: int  #: Minimum replicas adjusted for cluster capacity.
-    capacity_adjusted_max_replicas: int  #: Maximum replicas adjusted for cluster capacity.
-
-    # Policy state
-    policy_state: Dict[
-        str, Any
-    ]  #: Persistent state dictionary for the autoscaling policy.
-
-    # Timing
-    last_scale_up_time: Optional[float]  #: Timestamp of last scale-up action.
-    last_scale_down_time: Optional[float]  #: Timestamp of last scale-down action.
-    current_time: Optional[float]  #: Current timestamp.
-
-    # Config
-    config: Optional[Any]  #: Autoscaling configuration for this deployment.
+    def __init__(
+        self,
+        deployment_id: DeploymentID,
+        deployment_name: str,
+        app_name: Optional[str],
+        current_num_replicas: int,
+        target_num_replicas: int,
+        running_replicas: List[ReplicaID],
+        total_num_requests: Union[float, Callable[[], float]],
+        total_queued_requests: Optional[Union[float, Callable[[], float]]],
+        aggregated_metrics: Optional[
+            Union[
+                Dict[str, Dict[ReplicaID, float]],
+                Callable[[], Dict[str, Dict[ReplicaID, float]]],
+            ]
+        ],
+        raw_metrics: Optional[
+            Union[
+                Dict[str, Dict[ReplicaID, TimeSeries]],
+                Callable[[], Dict[str, Dict[ReplicaID, TimeSeries]]],
+            ]
+        ],
+        capacity_adjusted_min_replicas: int,
+        capacity_adjusted_max_replicas: int,
+        policy_state: Dict[str, Any],
+        last_scale_up_time: Optional[float],
+        last_scale_down_time: Optional[float],
+        current_time: Optional[float],
+        config: Optional[Any],
+    ):
+        # Deployment information
+        self.deployment_id = deployment_id  #: Unique identifier for the deployment.
+        self.deployment_name = deployment_name  #: Name of the deployment.
+        self.app_name = app_name  #: Name of the application containing this deployment.
+
+        # Current state
+        self.current_num_replicas = (
+            current_num_replicas  #: Current number of running replicas.
+        )
+        self.target_num_replicas = (
+            target_num_replicas  #: Target number of replicas set by the autoscaler.
+        )
+        self.running_replicas = (
+            running_replicas  #: List of currently running replica IDs.
+        )
+
+        # Built-in metrics
+        self._total_num_requests_value = (
+            total_num_requests  #: Total number of requests across all replicas.
+        )
+        self._total_queued_requests_value = (
+            total_queued_requests  #: Number of requests currently queued.
+        )
+
+        # Custom metrics - store potentially lazy callables privately
+        self._aggregated_metrics_value = aggregated_metrics
+        self._raw_metrics_value = raw_metrics
+
+        # Capacity and bounds
+        self.capacity_adjusted_min_replicas = capacity_adjusted_min_replicas  #: Minimum replicas adjusted for cluster capacity.
+        self.capacity_adjusted_max_replicas = capacity_adjusted_max_replicas  #: Maximum replicas adjusted for cluster capacity.
+
+        # Policy state
+        self.policy_state = (
+            policy_state  #: Persistent state dictionary for the autoscaling policy.
+        )
+
+        # Timing
+        self.last_scale_up_time = (
+            last_scale_up_time  #: Timestamp of last scale-up action.
+        )
+        self.last_scale_down_time = (
+            last_scale_down_time  #: Timestamp of last scale-down action.
+        )
+        self.current_time = current_time  #: Current timestamp.
+
+        # Config
+        self.config = config  #: Autoscaling configuration for this deployment.
+
+    @cached_property
+    def aggregated_metrics(self) -> Optional[Dict[str, Dict[ReplicaID, float]]]:
+        if callable(self._aggregated_metrics_value):
+            return self._aggregated_metrics_value()
+        return self._aggregated_metrics_value
+
+    @cached_property
+    def raw_metrics(self) -> Optional[Dict[str, Dict[ReplicaID, TimeSeries]]]:
+        if callable(self._raw_metrics_value):
+            return self._raw_metrics_value()
+        return self._raw_metrics_value
+
+    @cached_property
+    def total_num_requests(self) -> float:
+        if callable(self._total_num_requests_value):
+            return self._total_num_requests_value()
+        return self._total_num_requests_value
+
+    @cached_property
+    def total_queued_requests(self) -> float:
+        if callable(self._total_queued_requests_value):
+            return self._total_queued_requests_value()
+        return self._total_queued_requests_value
+
+    @property
+    def total_running_requests(self) -> float:
+        # NOTE: for non-additive aggregation functions, total_running_requests is not
+        # accurate; treat it as an approximation.
+        return self.total_num_requests - self.total_queued_requests


 @PublicAPI(stability="alpha")
diff --git a/python/ray/serve/tests/unit/test_autoscaling_policy.py b/python/ray/serve/tests/unit/test_autoscaling_policy.py
index 3a4787a551aa..337507b52584 100644
--- a/python/ray/serve/tests/unit/test_autoscaling_policy.py
+++ b/python/ray/serve/tests/unit/test_autoscaling_policy.py
@@ -2,6 +2,7 @@

 import pytest

+from ray.serve._private.common import DeploymentID, ReplicaID, TimeStampedValue
 from ray.serve._private.constants import CONTROL_LOOP_INTERVAL_S
 from ray.serve.autoscaling_policy import (
     _calculate_desired_num_replicas,
@@ -10,6 +11,45 @@
 from ray.serve.config import AutoscalingConfig, AutoscalingContext


+def create_context_with_overrides(
+    base_ctx: AutoscalingContext, **kwargs
+) -> AutoscalingContext:
+    """Helper to create a new AutoscalingContext with specified attributes overridden.
+
+    Args:
+        base_ctx: The base AutoscalingContext to copy values from.
+        **kwargs: Attributes to override in the new context.
+
+    Returns:
+        A new AutoscalingContext with overridden values.
+ """ + # Get all constructor parameters with defaults from base context + params = { + "config": base_ctx.config, + "deployment_id": base_ctx.deployment_id, + "deployment_name": base_ctx.deployment_name, + "app_name": base_ctx.app_name, + "current_num_replicas": base_ctx.current_num_replicas, + "target_num_replicas": base_ctx.target_num_replicas, + "running_replicas": base_ctx.running_replicas, + "total_num_requests": base_ctx.total_num_requests, + "total_queued_requests": base_ctx.total_queued_requests, + "aggregated_metrics": base_ctx.aggregated_metrics, + "raw_metrics": base_ctx.raw_metrics, + "capacity_adjusted_min_replicas": base_ctx.capacity_adjusted_min_replicas, + "capacity_adjusted_max_replicas": base_ctx.capacity_adjusted_max_replicas, + "policy_state": base_ctx.policy_state, + "last_scale_up_time": base_ctx.last_scale_up_time, + "last_scale_down_time": base_ctx.last_scale_down_time, + "current_time": base_ctx.current_time, + } + + # Override with provided kwargs + params.update(kwargs) + + return AutoscalingContext(**params) + + class TestCalculateDesiredNumReplicas: def test_bounds_checking(self): num_replicas = 10 @@ -232,7 +272,6 @@ def test_scaling_factor_scale_up_from_0_replicas( running_replicas=None, current_time=None, total_queued_requests=None, - total_running_requests=None, aggregated_metrics=None, raw_metrics=None, last_scale_up_time=None, @@ -289,7 +328,6 @@ def test_scaling_factor_scale_down_to_0_replicas( running_replicas=None, current_time=None, total_queued_requests=None, - total_running_requests=None, aggregated_metrics=None, raw_metrics=None, last_scale_up_time=None, @@ -313,11 +351,14 @@ def test_scaling_factor_scale_down_to_0_replicas( config.downscaling_factor = 0.2 # policy_manager = AutoscalingPolicyManager(config) - ctx.total_num_requests = 0 num_replicas = 5 for _ in range(5): - ctx.current_num_replicas = num_replicas - ctx.target_num_replicas = num_replicas + ctx = create_context_with_overrides( + ctx, + total_num_requests=0, + current_num_replicas=num_replicas, + target_num_replicas=num_replicas, + ) num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) assert num_replicas == 0 @@ -369,7 +410,6 @@ def test_upscale_downscale_delay(self, downscale_to_zero_delay_s): running_replicas=None, current_time=None, total_queued_requests=None, - total_running_requests=None, aggregated_metrics=None, raw_metrics=None, last_scale_up_time=None, @@ -380,129 +420,192 @@ def test_upscale_downscale_delay(self, downscale_to_zero_delay_s): new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) assert new_num_replicas == 1 - ctx.total_num_requests = overload_requests - ctx.current_num_replicas = 1 - ctx.target_num_replicas = 1 - # We should scale up only after enough consecutive scale-up decisions. for i in range(upscale_wait_periods): + ctx = create_context_with_overrides( + ctx, + total_num_requests=overload_requests, + current_num_replicas=1, + target_num_replicas=1, + ) new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) assert new_num_replicas == 1, i + ctx = create_context_with_overrides( + ctx, + total_num_requests=overload_requests, + current_num_replicas=1, + target_num_replicas=1, + ) new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx) assert new_num_replicas == 2 no_requests = 0 - ctx.total_num_requests = no_requests - ctx.current_num_replicas = 2 - ctx.target_num_replicas = 2 - # We should scale down only after enough consecutive scale-down decisions. 
         # Downscaling to zero follows current_num_replicas->1->0
         for i in range(downscale_wait_periods):
+            ctx = create_context_with_overrides(
+                ctx,
+                total_num_requests=no_requests,
+                current_num_replicas=2,
+                target_num_replicas=2,
+            )
             new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx)
             assert new_num_replicas == 2, i

+        ctx = create_context_with_overrides(
+            ctx,
+            total_num_requests=no_requests,
+            current_num_replicas=2,
+            target_num_replicas=2,
+        )
         new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx)
         assert new_num_replicas == 1

-        ctx.current_num_replicas = 1
-        ctx.target_num_replicas = 1
         # We should scale down to zero only after enough consecutive downscale-to-zero decisions.
         for i in range(downscale_to_zero_wait_periods):
+            ctx = create_context_with_overrides(
+                ctx,
+                total_num_requests=no_requests,
+                current_num_replicas=1,
+                target_num_replicas=1,
+            )
             new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx)
             assert new_num_replicas == 1, i

+        ctx = create_context_with_overrides(
+            ctx,
+            total_num_requests=no_requests,
+            current_num_replicas=1,
+            target_num_replicas=1,
+        )
         new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx)
         assert new_num_replicas == 0

         # Get some scale-up decisions, but not enough to trigger a scale up.
-        ctx.total_num_requests = overload_requests
-        ctx.current_num_replicas = 1
-        ctx.target_num_replicas = 1
-
         for i in range(int(upscale_wait_periods / 2)):
+            ctx = create_context_with_overrides(
+                ctx,
+                total_num_requests=overload_requests,
+                current_num_replicas=1,
+                target_num_replicas=1,
+            )
             new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx)
             assert new_num_replicas == 1, i

-        ctx.total_num_requests = 0
-        ctx.current_num_replicas = 1
-        ctx.target_num_replicas = 1
-
         # Interrupt with a scale-down decision.
+        ctx = create_context_with_overrides(
+            ctx,
+            total_num_requests=0,
+            current_num_replicas=1,
+            target_num_replicas=1,
+        )
         replica_queue_length_autoscaling_policy(ctx=ctx)

         # The counter should be reset, so it should require `upscale_wait_periods`
         # more periods before we actually scale up.
-
-        ctx.total_num_requests = overload_requests
-        ctx.current_num_replicas = 1
-        ctx.target_num_replicas = 1
-
         for i in range(upscale_wait_periods):
+            ctx = create_context_with_overrides(
+                ctx,
+                total_num_requests=overload_requests,
+                current_num_replicas=1,
+                target_num_replicas=1,
+            )
             new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx)
             assert new_num_replicas == 1, i

+        ctx = create_context_with_overrides(
+            ctx,
+            total_num_requests=overload_requests,
+            current_num_replicas=1,
+            target_num_replicas=1,
+        )
         new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx)
         assert new_num_replicas == 2

-        ctx.total_num_requests = no_requests
-        ctx.current_num_replicas = 2
-        ctx.target_num_replicas = 2
-
         # Get some scale-down decisions, but not enough to trigger a scale down.
         for i in range(int(downscale_wait_periods / 2)):
+            ctx = create_context_with_overrides(
+                ctx,
+                total_num_requests=no_requests,
+                current_num_replicas=2,
+                target_num_replicas=2,
+            )
             new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx)
             assert new_num_replicas == 2, i

-        ctx.total_num_requests = 200
-        ctx.current_num_replicas = 2
-        ctx.target_num_replicas = 2
-
         # Interrupt with a scale-up decision.
+        ctx = create_context_with_overrides(
+            ctx,
+            total_num_requests=200,
+            current_num_replicas=2,
+            target_num_replicas=2,
+        )
         replica_queue_length_autoscaling_policy(ctx=ctx)

         # The counter should be reset so it should require `downscale_wait_periods`
         # more periods before we actually scale down.
-        ctx.total_num_requests = no_requests
-        ctx.current_num_replicas = 2
-        ctx.target_num_replicas = 2
-
         # We should scale down only after enough consecutive scale-down decisions.
         for i in range(downscale_wait_periods):
+            ctx = create_context_with_overrides(
+                ctx,
+                total_num_requests=no_requests,
+                current_num_replicas=2,
+                target_num_replicas=2,
+            )
             new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx)
             assert new_num_replicas == 2, i

         # First scale down to 1 replica
+        ctx = create_context_with_overrides(
+            ctx,
+            total_num_requests=no_requests,
+            current_num_replicas=2,
+            target_num_replicas=2,
+        )
         new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx)
         assert new_num_replicas == 1

-        ctx.current_num_replicas = 1
-        ctx.target_num_replicas = 1
-
         # Scale down to 0, but not enough to trigger a complete scale down to zero.
         for i in range(int(downscale_to_zero_wait_periods / 2)):
+            ctx = create_context_with_overrides(
+                ctx,
+                total_num_requests=no_requests,
+                current_num_replicas=1,
+                target_num_replicas=1,
+            )
             new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx)
             assert new_num_replicas == 1, i

-        ctx.total_num_requests = 100
-        ctx.current_num_replicas = 1
-        ctx.target_num_replicas = 1
         # Interrupt with a scale-up decision.
+        ctx = create_context_with_overrides(
+            ctx,
+            total_num_requests=100,
+            current_num_replicas=1,
+            target_num_replicas=1,
+        )
         replica_queue_length_autoscaling_policy(ctx=ctx)

-        ctx.total_num_requests = no_requests
-        ctx.current_num_replicas = 1
-        ctx.target_num_replicas = 1
-
         # The counter should be reset so it should require `downscale_to_zero_wait_periods`
         # more periods before we actually scale down.
         for i in range(downscale_to_zero_wait_periods):
+            ctx = create_context_with_overrides(
+                ctx,
+                total_num_requests=no_requests,
+                current_num_replicas=1,
+                target_num_replicas=1,
+            )
             new_num_replicas, v = replica_queue_length_autoscaling_policy(ctx=ctx)
             # print(new_num_replicas, v)
             assert new_num_replicas == 1, i

+        ctx = create_context_with_overrides(
+            ctx,
+            total_num_requests=no_requests,
+            current_num_replicas=1,
+            target_num_replicas=1,
+        )
         new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx)
         assert new_num_replicas == 0

@@ -534,7 +637,6 @@ def test_replicas_delayed_startup(self):
             running_replicas=None,
             current_time=None,
             total_queued_requests=None,
-            total_running_requests=None,
             aggregated_metrics=None,
             raw_metrics=None,
             last_scale_up_time=None,
@@ -547,25 +649,33 @@ def test_replicas_delayed_startup(self):

         # New target is 100, but no new replicas finished spinning up during this
         # timestep.
-        ctx.total_num_requests = 100
-        ctx.current_num_replicas = 1
-        ctx.target_num_replicas = 100
+        ctx = create_context_with_overrides(
+            ctx,
+            total_num_requests=100,
+            current_num_replicas=1,
+            target_num_replicas=100,
+        )
         new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx)
         assert new_num_replicas == 100

         # Two new replicas spun up during this timestep.
-        ctx.total_num_requests = 123
-        ctx.current_num_replicas = 3
-        ctx.target_num_replicas = 100
-
+        ctx = create_context_with_overrides(
+            ctx,
+            total_num_requests=123,
+            current_num_replicas=3,
+            target_num_replicas=100,
+        )
         new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx)
         assert new_num_replicas == 123

         # A lot of queries got drained and a lot of replicas started up, but
         # new_num_replicas should not decrease, because of the downscale delay.
-        ctx.total_num_requests = 10
-        ctx.current_num_replicas = 4
-        ctx.target_num_replicas = 123
+        ctx = create_context_with_overrides(
+            ctx,
+            total_num_requests=10,
+            current_num_replicas=4,
+            target_num_replicas=123,
+        )
         new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx)
         assert new_num_replicas == 123

@@ -609,7 +719,6 @@ def test_fluctuating_ongoing_requests(self, delay_s):
             running_replicas=None,
             current_time=None,
             total_queued_requests=None,
-            total_running_requests=None,
             aggregated_metrics=None,
             raw_metrics=None,
             last_scale_up_time=None,
@@ -619,18 +728,24 @@ def test_fluctuating_ongoing_requests(self, delay_s):
         new_num_replicas = None
         for trial in range(trials):
             if trial % 2 == 0:
-                ctx.target_num_replicas = 1
-                ctx.total_num_requests = overload_requests
-                ctx.current_num_replicas = 1
+                ctx = create_context_with_overrides(
+                    ctx,
+                    total_num_requests=overload_requests,
+                    current_num_replicas=1,
+                    target_num_replicas=1,
+                )
                 new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx)
                 if delay_s > 0:
                     assert new_num_replicas == 1, trial
                 else:
                     assert new_num_replicas == 2, trial
             else:
-                ctx.target_num_replicas = 2
-                ctx.total_num_requests = underload_requests
-                ctx.current_num_replicas = 2
+                ctx = create_context_with_overrides(
+                    ctx,
+                    total_num_requests=underload_requests,
+                    current_num_replicas=2,
+                    target_num_replicas=2,
+                )
                 new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx)
                 if delay_s > 0:
                     assert new_num_replicas == 2, trial
@@ -666,7 +781,6 @@ def test_single_replica_receives_all_requests(self, ongoing_requests):
             running_replicas=None,
             current_time=None,
             total_queued_requests=None,
-            total_running_requests=None,
             aggregated_metrics=None,
             raw_metrics=None,
             last_scale_up_time=None,
@@ -676,6 +790,89 @@ def test_single_replica_receives_all_requests(self, ongoing_requests):
         new_num_replicas, _ = replica_queue_length_autoscaling_policy(ctx=ctx)
         assert new_num_replicas == ongoing_requests / target_requests

+    def test_callable_and_direct_values(self):
+        config = AutoscalingConfig(min_replicas=1, max_replicas=10)
+        deployment_id = DeploymentID(name="test", app_name="test_app")
+        replica_id = ReplicaID(unique_id="r1", deployment_id=deployment_id)
+
+        # Test callables with lazy evaluation and caching
+        call_counts = {"requests": 0, "queued": 0, "agg": 0, "raw": 0}
+
+        ctx = AutoscalingContext(
+            config=config,
+            deployment_id=None,
+            deployment_name="test",
+            app_name=None,
+            current_num_replicas=5,
+            target_num_replicas=5,
+            running_replicas=[],
+            total_num_requests=lambda: (
+                call_counts.update({"requests": call_counts["requests"] + 1}),
+                42.0,
+            )[1],
+            total_queued_requests=lambda: (
+                call_counts.update({"queued": call_counts["queued"] + 1}),
+                10.0,
+            )[1],
+            aggregated_metrics=lambda: (
+                call_counts.update({"agg": call_counts["agg"] + 1}),
+                {"m": {replica_id: 5.0}},
+            )[1],
+            raw_metrics=lambda: (
+                call_counts.update({"raw": call_counts["raw"] + 1}),
+                {"m": {replica_id: [TimeStampedValue(1.0, 5.0)]}},
+            )[1],
+            capacity_adjusted_min_replicas=1,
+            capacity_adjusted_max_replicas=10,
+            policy_state={},
+            last_scale_up_time=None,
+            last_scale_down_time=None,
+            current_time=None,
+        )
+
+        # Callables not executed until accessed
+        assert all(c == 0 for c in call_counts.values())
+
+        # First access executes callables
+        assert ctx.total_num_requests == 42.0
+        assert ctx.total_queued_requests == 10.0
+        assert ctx.aggregated_metrics == {"m": {replica_id: 5.0}}
+        assert ctx.raw_metrics["m"][replica_id][0].value == 5.0
+        assert all(c == 1 for c in call_counts.values())
+
+        # Second access uses cached values
+        _ = ctx.total_num_requests
+        _ = ctx.total_queued_requests
+        _ = ctx.aggregated_metrics
+        _ = ctx.raw_metrics
+        assert all(c == 1 for c in call_counts.values())
+
+        # Test direct values (non-callable)
+        ctx2 = AutoscalingContext(
+            config=config,
+            deployment_id=None,
+            deployment_name="test",
+            app_name=None,
+            current_num_replicas=5,
+            target_num_replicas=5,
+            running_replicas=[],
+            total_num_requests=100.0,
+            total_queued_requests=20.0,
+            aggregated_metrics={"m2": {replica_id: 15.0}},
+            raw_metrics={"m2": {replica_id: [TimeStampedValue(2.0, 25.0)]}},
+            capacity_adjusted_min_replicas=1,
+            capacity_adjusted_max_replicas=10,
+            policy_state={},
+            last_scale_up_time=None,
+            last_scale_down_time=None,
+            current_time=None,
+        )
+
+        assert ctx2.total_num_requests == 100.0
+        assert ctx2.total_queued_requests == 20.0
+        assert ctx2.aggregated_metrics == {"m2": {replica_id: 15.0}}
+        assert ctx2.raw_metrics["m2"][replica_id][0].value == 25.0
+

 if __name__ == "__main__":
     sys.exit(pytest.main(["-v", "-s", __file__]))
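
Reviewer note (not part of the diff): the sketch below illustrates how a custom autoscaling policy would consume the lazily evaluated AutoscalingContext fields introduced above. The policy name and the one-replica-per-10-requests rule are hypothetical, and it assumes a custom policy returns the same (num_replicas, policy_state) pair that replica_queue_length_autoscaling_policy returns in the tests.

from typing import Any, Dict, Tuple

from ray.serve.config import AutoscalingContext


def my_queue_based_policy(ctx: AutoscalingContext) -> Tuple[int, Dict[str, Any]]:
    # Reading these properties is what triggers the callables passed by
    # get_autoscaling_context; the results are cached for later reads.
    total = ctx.total_num_requests
    queued = ctx.total_queued_requests or 0.0

    # Hypothetical rule: roughly one replica per 10 running requests,
    # clamped to the capacity-adjusted bounds carried in the context.
    desired = int((total - queued) // 10) + 1
    desired = max(
        ctx.capacity_adjusted_min_replicas,
        min(ctx.capacity_adjusted_max_replicas, desired),
    )
    return desired, ctx.policy_state

Because the properties are backed by cached_property, repeated reads within one policy invocation do not re-trigger metric collection, matching the call-count assertions in test_callable_and_direct_values.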