Skip to content

Commit 3d4ce06

Browse files
abrarsheikh authored and SheldonTsen committed
[Serve] lazily evaluate autoscaling context (ray-project#58963)
The autoscaling context requires expensive function evaluations, but not all autoscaling policies need that data. Lazily evaluate the context fields to save controller CPU. --------- Signed-off-by: abrar <[email protected]>
1 parent cec4dd7 commit 3d4ce06

File tree

3 files changed

+386
-128
lines changed

3 files changed

+386
-128
lines changed

python/ray/serve/_private/autoscaling_state.py

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -234,38 +234,29 @@ def get_decision_num_replicas(
234234

235235
return self.apply_bounds(decision_num_replicas)
236236

237-
def get_autoscaling_context(self, curr_target_num_replicas):
238-
total_num_requests = self.get_total_num_requests()
239-
total_queued_requests = self._get_queued_requests()
240-
# NOTE: for non additive aggregation functions, total_running_requests is not
241-
# accurate, consider this is a approximation.
242-
total_running_requests = total_num_requests - total_queued_requests
243-
244-
autoscaling_context: AutoscalingContext = AutoscalingContext(
237+
def get_autoscaling_context(self, curr_target_num_replicas) -> AutoscalingContext:
238+
return AutoscalingContext(
245239
deployment_id=self._deployment_id,
246240
deployment_name=self._deployment_id.name,
247241
app_name=self._deployment_id.app_name,
248242
current_num_replicas=len(self._running_replicas),
249243
target_num_replicas=curr_target_num_replicas,
250244
running_replicas=self._running_replicas,
251-
total_num_requests=total_num_requests,
245+
total_num_requests=self.get_total_num_requests,
252246
capacity_adjusted_min_replicas=self.get_num_replicas_lower_bound(),
253247
capacity_adjusted_max_replicas=self.get_num_replicas_upper_bound(),
254248
policy_state=(
255249
self._policy_state.copy() if self._policy_state is not None else {}
256250
),
257251
current_time=time.time(),
258252
config=self._config,
259-
total_queued_requests=total_queued_requests,
260-
total_running_requests=total_running_requests,
261-
aggregated_metrics=self._get_aggregated_custom_metrics(),
262-
raw_metrics=self._get_raw_custom_metrics(),
253+
total_queued_requests=self._get_queued_requests,
254+
aggregated_metrics=self._get_aggregated_custom_metrics,
255+
raw_metrics=self._get_raw_custom_metrics,
263256
last_scale_up_time=None,
264257
last_scale_down_time=None,
265258
)
266259

267-
return autoscaling_context
268-
269260
def _collect_replica_running_requests(self) -> List[TimeSeries]:
270261
"""Collect running requests timeseries from replicas for aggregation.
271262

python/ray/serve/config.py

Lines changed: 113 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import json
22
import logging
33
import warnings
4-
from dataclasses import dataclass
54
from enum import Enum
5+
from functools import cached_property
66
from typing import Any, Callable, Dict, List, Optional, Union
77

88
from ray import cloudpickle
@@ -39,7 +39,6 @@
3939

4040

4141
@PublicAPI(stability="alpha")
42-
@dataclass
4342
class AutoscalingContext:
4443
"""Rich context provided to custom autoscaling policies.
4544
@@ -49,49 +48,120 @@ class AutoscalingContext:
4948
5049
The context includes deployment metadata, current replica state, built-in and
5150
custom metrics, capacity bounds, policy state, and timing information.
51+
52+
Note: The aggregated_metrics and raw_metrics fields support lazy evaluation.
53+
You can pass callables that will be evaluated only when accessed, with results
54+
cached for subsequent accesses.
5255
"""
5356

54-
# Deployment information
55-
deployment_id: DeploymentID #: Unique identifier for the deployment.
56-
deployment_name: str #: Name of the deployment.
57-
app_name: Optional[str] #: Name of the application containing this deployment.
58-
59-
# Current state
60-
current_num_replicas: int #: Current number of running replicas.
61-
target_num_replicas: int #: Target number of replicas set by the autoscaler.
62-
running_replicas: List[ReplicaID] #: List of currently running replica IDs.
63-
64-
# Built-in metrics
65-
total_num_requests: float #: Total number of requests across all replicas.
66-
total_queued_requests: Optional[float] #: Number of requests currently queued.
67-
total_running_requests: Optional[
68-
float
69-
] #: Total number of requests currently running.
70-
71-
# Custom metrics
72-
aggregated_metrics: Dict[
73-
str, Dict[ReplicaID, float]
74-
] #: Time-weighted averages of custom metrics per replica.
75-
raw_metrics: Dict[
76-
str, Dict[ReplicaID, TimeSeries]
77-
] #: Raw custom metric timeseries per replica.
78-
79-
# Capacity and bounds
80-
capacity_adjusted_min_replicas: int #: Minimum replicas adjusted for cluster capacity.
81-
capacity_adjusted_max_replicas: int #: Maximum replicas adjusted for cluster capacity.
82-
83-
# Policy state
84-
policy_state: Dict[
85-
str, Any
86-
] #: Persistent state dictionary for the autoscaling policy.
87-
88-
# Timing
89-
last_scale_up_time: Optional[float] #: Timestamp of last scale-up action.
90-
last_scale_down_time: Optional[float] #: Timestamp of last scale-down action.
91-
current_time: Optional[float] #: Current timestamp.
92-
93-
# Config
94-
config: Optional[Any] #: Autoscaling configuration for this deployment.
57+
def __init__(
58+
self,
59+
deployment_id: DeploymentID,
60+
deployment_name: str,
61+
app_name: Optional[str],
62+
current_num_replicas: int,
63+
target_num_replicas: int,
64+
running_replicas: List[ReplicaID],
65+
total_num_requests: Union[float, Callable[[], float]],
66+
total_queued_requests: Optional[Union[float, Callable[[], float]]],
67+
aggregated_metrics: Optional[
68+
Union[
69+
Dict[str, Dict[ReplicaID, float]],
70+
Callable[[], Dict[str, Dict[ReplicaID, float]]],
71+
]
72+
],
73+
raw_metrics: Optional[
74+
Union[
75+
Dict[str, Dict[ReplicaID, TimeSeries]],
76+
Callable[[], Dict[str, Dict[ReplicaID, TimeSeries]]],
77+
]
78+
],
79+
capacity_adjusted_min_replicas: int,
80+
capacity_adjusted_max_replicas: int,
81+
policy_state: Dict[str, Any],
82+
last_scale_up_time: Optional[float],
83+
last_scale_down_time: Optional[float],
84+
current_time: Optional[float],
85+
config: Optional[Any],
86+
):
87+
# Deployment information
88+
self.deployment_id = deployment_id #: Unique identifier for the deployment.
89+
self.deployment_name = deployment_name #: Name of the deployment.
90+
self.app_name = app_name #: Name of the application containing this deployment.
91+
92+
# Current state
93+
self.current_num_replicas = (
94+
current_num_replicas #: Current number of running replicas.
95+
)
96+
self.target_num_replicas = (
97+
target_num_replicas #: Target number of replicas set by the autoscaler.
98+
)
99+
self.running_replicas = (
100+
running_replicas #: List of currently running replica IDs.
101+
)
102+
103+
# Built-in metrics
104+
self._total_num_requests_value = (
105+
total_num_requests #: Total number of requests across all replicas.
106+
)
107+
self._total_queued_requests_value = (
108+
total_queued_requests #: Number of requests currently queued.
109+
)
110+
111+
# Custom metrics - store potentially lazy callables privately
112+
self._aggregated_metrics_value = aggregated_metrics
113+
self._raw_metrics_value = raw_metrics
114+
115+
# Capacity and bounds
116+
self.capacity_adjusted_min_replicas = capacity_adjusted_min_replicas #: Minimum replicas adjusted for cluster capacity.
117+
self.capacity_adjusted_max_replicas = capacity_adjusted_max_replicas #: Maximum replicas adjusted for cluster capacity.
118+
119+
# Policy state
120+
self.policy_state = (
121+
policy_state #: Persistent state dictionary for the autoscaling policy.
122+
)
123+
124+
# Timing
125+
self.last_scale_up_time = (
126+
last_scale_up_time #: Timestamp of last scale-up action.
127+
)
128+
self.last_scale_down_time = (
129+
last_scale_down_time #: Timestamp of last scale-down action.
130+
)
131+
self.current_time = current_time #: Current timestamp.
132+
133+
# Config
134+
self.config = config #: Autoscaling configuration for this deployment.
135+
136+
@cached_property
137+
def aggregated_metrics(self) -> Optional[Dict[str, Dict[ReplicaID, float]]]:
138+
if callable(self._aggregated_metrics_value):
139+
return self._aggregated_metrics_value()
140+
return self._aggregated_metrics_value
141+
142+
@cached_property
143+
def raw_metrics(self) -> Optional[Dict[str, Dict[ReplicaID, TimeSeries]]]:
144+
if callable(self._raw_metrics_value):
145+
return self._raw_metrics_value()
146+
return self._raw_metrics_value
147+
148+
@cached_property
149+
def total_num_requests(self) -> float:
150+
if callable(self._total_num_requests_value):
151+
return self._total_num_requests_value()
152+
return self._total_num_requests_value
153+
154+
@cached_property
155+
def total_queued_requests(self) -> float:
156+
if callable(self._total_queued_requests_value):
157+
return self._total_queued_requests_value()
158+
return self._total_queued_requests_value
159+
160+
@property
161+
def total_running_requests(self) -> float:
162+
# NOTE: for non-additive aggregation functions, total_running_requests is not
163+
# accurate, consider this is an approximation.
164+
return self.total_num_requests - self.total_queued_requests
95165

96166

97167
@PublicAPI(stability="alpha")

0 commit comments

Comments
 (0)