-
Notifications
You must be signed in to change notification settings - Fork 7k
[Serve] lazily evaluate autoscaling context #58963
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,6 @@ | ||
| import json | ||
| import logging | ||
| import warnings | ||
| from dataclasses import dataclass | ||
| from enum import Enum | ||
| from typing import Any, Callable, Dict, List, Optional, Union | ||
|
|
||
|
|
@@ -39,7 +38,6 @@ | |
|
|
||
|
|
||
| @PublicAPI(stability="alpha") | ||
| @dataclass | ||
| class AutoscalingContext: | ||
| """Rich context provided to custom autoscaling policies. | ||
|
|
||
|
|
@@ -49,49 +47,165 @@ class AutoscalingContext: | |
|
|
||
| The context includes deployment metadata, current replica state, built-in and | ||
| custom metrics, capacity bounds, policy state, and timing information. | ||
|
|
||
| Note: The aggregated_metrics and raw_metrics fields support lazy evaluation. | ||
| You can pass callables that will be evaluated only when accessed, with results | ||
| cached for subsequent accesses. | ||
| """ | ||
|
|
||
| # Deployment information | ||
| deployment_id: DeploymentID #: Unique identifier for the deployment. | ||
| deployment_name: str #: Name of the deployment. | ||
| app_name: Optional[str] #: Name of the application containing this deployment. | ||
|
|
||
| # Current state | ||
| current_num_replicas: int #: Current number of running replicas. | ||
| target_num_replicas: int #: Target number of replicas set by the autoscaler. | ||
| running_replicas: List[ReplicaID] #: List of currently running replica IDs. | ||
|
|
||
| # Built-in metrics | ||
| total_num_requests: float #: Total number of requests across all replicas. | ||
| total_queued_requests: Optional[float] #: Number of requests currently queued. | ||
| total_running_requests: Optional[ | ||
| float | ||
| ] #: Total number of requests currently running. | ||
|
|
||
| # Custom metrics | ||
| aggregated_metrics: Dict[ | ||
| str, Dict[ReplicaID, float] | ||
| ] #: Time-weighted averages of custom metrics per replica. | ||
| raw_metrics: Dict[ | ||
| str, Dict[ReplicaID, TimeSeries] | ||
| ] #: Raw custom metric timeseries per replica. | ||
|
|
||
| # Capacity and bounds | ||
| capacity_adjusted_min_replicas: int #: Minimum replicas adjusted for cluster capacity. | ||
| capacity_adjusted_max_replicas: int #: Maximum replicas adjusted for cluster capacity. | ||
|
|
||
| # Policy state | ||
| policy_state: Dict[ | ||
| str, Any | ||
| ] #: Persistent state dictionary for the autoscaling policy. | ||
|
|
||
| # Timing | ||
| last_scale_up_time: Optional[float] #: Timestamp of last scale-up action. | ||
| last_scale_down_time: Optional[float] #: Timestamp of last scale-down action. | ||
| current_time: Optional[float] #: Current timestamp. | ||
|
|
||
| # Config | ||
| config: Optional[Any] #: Autoscaling configuration for this deployment. | ||
| def __init__( | ||
| self, | ||
| deployment_id: DeploymentID, | ||
| deployment_name: str, | ||
| app_name: Optional[str], | ||
| current_num_replicas: int, | ||
| target_num_replicas: int, | ||
| running_replicas: List[ReplicaID], | ||
| total_num_requests: Union[float, Callable[[], float]], | ||
| total_queued_requests: Optional[Union[float, Callable[[], float]]], | ||
| aggregated_metrics: Optional[ | ||
| Union[ | ||
| Dict[str, Dict[ReplicaID, float]], | ||
| Callable[[], Dict[str, Dict[ReplicaID, float]]], | ||
| ] | ||
| ], | ||
| raw_metrics: Optional[ | ||
| Union[ | ||
| Dict[str, Dict[ReplicaID, TimeSeries]], | ||
| Callable[[], Dict[str, Dict[ReplicaID, TimeSeries]]], | ||
| ] | ||
| ], | ||
| capacity_adjusted_min_replicas: int, | ||
| capacity_adjusted_max_replicas: int, | ||
| policy_state: Dict[str, Any], | ||
| last_scale_up_time: Optional[float], | ||
| last_scale_down_time: Optional[float], | ||
| current_time: Optional[float], | ||
| config: Optional[Any], | ||
| ): | ||
| # Deployment information | ||
| self.deployment_id = deployment_id #: Unique identifier for the deployment. | ||
| self.deployment_name = deployment_name #: Name of the deployment. | ||
| self.app_name = app_name #: Name of the application containing this deployment. | ||
|
|
||
| # Current state | ||
| self.current_num_replicas = ( | ||
| current_num_replicas #: Current number of running replicas. | ||
| ) | ||
| self.target_num_replicas = ( | ||
| target_num_replicas #: Target number of replicas set by the autoscaler. | ||
| ) | ||
| self.running_replicas = ( | ||
| running_replicas #: List of currently running replica IDs. | ||
| ) | ||
|
|
||
| # Built-in metrics | ||
| self._total_num_requests_value = ( | ||
| total_num_requests #: Total number of requests across all replicas. | ||
| ) | ||
| self._total_queued_requests_value = ( | ||
| total_queued_requests #: Number of requests currently queued. | ||
| ) | ||
| self._total_num_requests_cached = None | ||
| self._total_queued_requests_cached = None | ||
| self._total_num_requests_evaluated = False | ||
| self._total_queued_requests_evaluated = False | ||
|
|
||
| # Custom metrics - store potentially lazy callables privately | ||
| self._aggregated_metrics_value = aggregated_metrics | ||
| self._aggregated_metrics_cached = None | ||
| self._aggregated_metrics_evaluated = False | ||
|
|
||
| self._raw_metrics_value = raw_metrics | ||
| self._raw_metrics_cached = None | ||
| self._raw_metrics_evaluated = False | ||
|
|
||
| # Capacity and bounds | ||
| self.capacity_adjusted_min_replicas = capacity_adjusted_min_replicas #: Minimum replicas adjusted for cluster capacity. | ||
| self.capacity_adjusted_max_replicas = capacity_adjusted_max_replicas #: Maximum replicas adjusted for cluster capacity. | ||
|
|
||
| # Policy state | ||
| self.policy_state = ( | ||
| policy_state #: Persistent state dictionary for the autoscaling policy. | ||
| ) | ||
|
|
||
| # Timing | ||
| self.last_scale_up_time = ( | ||
| last_scale_up_time #: Timestamp of last scale-up action. | ||
| ) | ||
| self.last_scale_down_time = ( | ||
| last_scale_down_time #: Timestamp of last scale-down action. | ||
| ) | ||
| self.current_time = current_time #: Current timestamp. | ||
|
|
||
| # Config | ||
| self.config = config #: Autoscaling configuration for this deployment. | ||
|
|
||
| @property | ||
| def aggregated_metrics(self) -> Optional[Dict[str, Dict[ReplicaID, float]]]: | ||
| if callable(self._aggregated_metrics_value): | ||
| if not self._aggregated_metrics_evaluated: | ||
| self._aggregated_metrics_cached = self._aggregated_metrics_value() | ||
| self._aggregated_metrics_evaluated = True | ||
| return self._aggregated_metrics_cached | ||
| return self._aggregated_metrics_value | ||
|
|
||
| @aggregated_metrics.setter | ||
| def aggregated_metrics(self, value: Optional[Dict[str, Dict[ReplicaID, float]]]): | ||
abrarsheikh marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| self._aggregated_metrics_value = value | ||
| self._aggregated_metrics_evaluated = False | ||
| self._aggregated_metrics_cached = None | ||
|
|
||
| @property | ||
| def raw_metrics(self) -> Optional[Dict[str, Dict[ReplicaID, TimeSeries]]]: | ||
| if callable(self._raw_metrics_value): | ||
| if not self._raw_metrics_evaluated: | ||
| self._raw_metrics_cached = self._raw_metrics_value() | ||
| self._raw_metrics_evaluated = True | ||
| return self._raw_metrics_cached | ||
| return self._raw_metrics_value | ||
|
|
||
| @raw_metrics.setter | ||
| def raw_metrics(self, value: Optional[Dict[str, Dict[ReplicaID, TimeSeries]]]): | ||
| self._raw_metrics_value = value | ||
| self._raw_metrics_evaluated = False | ||
| self._raw_metrics_cached = None | ||
|
|
||
| @property | ||
| def total_num_requests(self) -> float: | ||
| if callable(self._total_num_requests_value): | ||
| if not self._total_num_requests_evaluated: | ||
| self._total_num_requests_cached = self._total_num_requests_value() | ||
| self._total_num_requests_evaluated = True | ||
| return self._total_num_requests_cached | ||
| return self._total_num_requests_value | ||
|
|
||
| @total_num_requests.setter | ||
| def total_num_requests(self, value: float): | ||
| self._total_num_requests_value = value | ||
| self._total_num_requests_evaluated = False | ||
| self._total_num_requests_cached = None | ||
|
|
||
| @property | ||
| def total_queued_requests(self) -> float: | ||
| if callable(self._total_queued_requests_value): | ||
| if not self._total_queued_requests_evaluated: | ||
| self._total_queued_requests_cached = self._total_queued_requests_value() | ||
| self._total_queued_requests_evaluated = True | ||
| return self._total_queued_requests_cached | ||
| return self._total_queued_requests_value | ||
|
|
||
| @total_queued_requests.setter | ||
| def total_queued_requests(self, value: float): | ||
| self._total_queued_requests_value = value | ||
| self._total_queued_requests_evaluated = False | ||
| self._total_queued_requests_cached = None | ||
|
|
||
| @property | ||
| def total_running_requests(self) -> float: | ||
| # NOTE: for non additive aggregation functions, total_running_requests is not | ||
| # accurate, consider this is a approximation. | ||
abrarsheikh marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return self.total_num_requests - self.total_queued_requests | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Null handling missing in total_running_requests propertyThe |
||
|
|
||
|
|
||
| @PublicAPI(stability="alpha") | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.