
Commit 2de1c49

wxwmd and 行筠 authored
[core] Autoscaler with resource availability (#58623)
## Description

When the Autoscaler receives a resource request and decides which type of node to scale up, only the `UtilizationScore` is considered (that is, Ray tries to avoid launching a large node for a small resource request, which would lead to resource waste). If multiple node types in the cluster have the same `UtilizationScore`, Ray always requests the same node type. In Spot scenarios, cloud resources change dynamically, so we want the Autoscaler to be aware of cloud resource availability: if a certain node type becomes unavailable, the Autoscaler should automatically switch to requesting other node types.

In this PR, I added the `CloudResourceMonitor` class, which records node types that have failed resource allocation and, in future scaling events, reduces the weight of these node types.

## Related issues

Related to #49983
Fixes #53636 #39788 #39789

## Implementation details

1. `CloudResourceMonitor`: a subscriber of instance updates. When an instance reaches the `ALLOCATION_FAILED` status, `CloudResourceMonitor` records the node type and lowers its availability score.
2. `ResourceDemandScheduler`: this class determines how to select the best node type to handle a resource request. I modified the node-type selection to sort on both keys:

```python
# Sort the results by score, breaking ties by cloud resource availability.
results = sorted(
    results,
    key=lambda r: (
        r.score,
        cloud_resource_availabilities.get(r.node.node_type, 1),
    ),
    reverse=True,
)
```

The sorting includes:
2.1. `UtilizationScore`: to maximize resource utilization.
2.2. Cloud resource availability: prioritize node types with the most available cloud resources, in order to minimize allocation failures.

---------

Signed-off-by: xiaowen.wxw <[email protected]>
Co-authored-by: 行筠 <[email protected]>
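To make the tie-breaking concrete, here is a minimal, runnable sketch of the two-key sort above. The `Node`/`Result` dataclasses and the node-type names are toy stand-ins for the scheduler's internal types, not the real ones:

```python
from dataclasses import dataclass


@dataclass
class Node:
    node_type: str


@dataclass
class Result:
    node: Node
    score: float  # toy stand-in for UtilizationScore


# Two node types tie on UtilizationScore.
results = [
    Result(Node("spot-gpu-a"), score=0.9),
    Result(Node("spot-gpu-b"), score=0.9),
]

# "spot-gpu-a" recently failed allocation, so its availability is low;
# node types with no recorded failure are absent and default to 1.
cloud_resource_availabilities = {"spot-gpu-a": 0.0}

results = sorted(
    results,
    key=lambda r: (
        r.score,
        cloud_resource_availabilities.get(r.node.node_type, 1),
    ),
    reverse=True,
)
# The available type wins the tie.
assert results[0].node.node_type == "spot-gpu-b"
```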
1 parent 5458c75 commit 2de1c49

File tree

9 files changed: +337 -44 lines changed

python/ray/autoscaler/v2/autoscaler.py

Lines changed: 7 additions & 0 deletions
```diff
@@ -32,6 +32,9 @@
 from ray.autoscaler.v2.instance_manager.subscribers.cloud_instance_updater import (
     CloudInstanceUpdater,
 )
+from ray.autoscaler.v2.instance_manager.subscribers.cloud_resource_monitor import (
+    CloudResourceMonitor,
+)
 from ray.autoscaler.v2.instance_manager.subscribers.ray_stopper import RayStopper
 from ray.autoscaler.v2.instance_manager.subscribers.threaded_ray_installer import (
     ThreadedRayInstaller,
@@ -76,6 +79,7 @@ def __init__(
         self._metrics_reporter = metrics_reporter

         self._init_cloud_instance_provider(config, config_reader)
+        self._cloud_resource_monitor = None
         self._init_instance_manager(
             session_name=session_name,
             config=config,
@@ -160,6 +164,8 @@ def _init_instance_manager(
                 max_concurrent_installs=config.get_max_num_worker_nodes() or 50,
             )
         )
+        self._cloud_resource_monitor = CloudResourceMonitor()
+        subscribers.append(self._cloud_resource_monitor)

         self._instance_manager = InstanceManager(
             instance_storage=instance_storage,
@@ -201,6 +207,7 @@ def update_autoscaling_state(
             instance_manager=self._instance_manager,
             scheduler=self._scheduler,
             cloud_provider=self._cloud_instance_provider,
+            cloud_resource_monitor=self._cloud_resource_monitor,
             ray_cluster_resource_state=ray_cluster_resource_state,
             non_terminated_cloud_instances=(
                 self._cloud_instance_provider.get_non_terminated()
```
python/ray/autoscaler/v2/instance_manager/common.py

Lines changed: 12 additions & 0 deletions
```diff
@@ -66,6 +66,7 @@ def is_cloud_instance_allocated(instance_status: Instance.InstanceStatus) -> boo
         Instance.TERMINATING,
         Instance.RAY_INSTALL_FAILED,
         Instance.TERMINATION_FAILED,
+        Instance.ALLOCATION_TIMEOUT,
     }

     @staticmethod
@@ -225,6 +226,10 @@ def get_valid_transitions() -> Dict[
             # Ray is already installed on the provisioned cloud
             # instance. It could be any valid ray status.
             Instance.RAY_RUNNING,
+            # The cloud provider timed out while allocating a running cloud
+            # instance. The CloudResourceMonitor subscriber will lower this
+            # node type's priority in future schedules.
+            Instance.ALLOCATION_TIMEOUT,
             Instance.RAY_STOPPING,
             Instance.RAY_STOPPED,
             # Instance is requested to be stopped, e.g. instance leaked: no matching
@@ -284,6 +289,13 @@ def get_valid_transitions() -> Dict[
             # cloud instance somehow failed.
             Instance.TERMINATED,
         },
+        # An instance has been allocated to a cloud instance, but the cloud
+        # provider timed out while bringing up a running cloud instance, e.g.
+        # a kubernetes pod remains pending due to insufficient resources.
+        Instance.ALLOCATION_TIMEOUT: {
+            # Instance is requested to be stopped
+            Instance.TERMINATING
+        },
         # When in this status, the ray process is requested to be stopped to the
         # ray cluster, but not yet present in the dead ray node list reported by
         # the ray cluster.
```
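The new state is terminal-by-teardown: `ALLOCATION_TIMEOUT` can only move to `TERMINATING`. Below is a minimal sketch of how a valid-transitions table like `get_valid_transitions()` above gates status updates; the enum and helper are illustrative only, not the real API:

```python
from enum import Enum


class Status(Enum):
    ALLOCATED = 1
    ALLOCATION_TIMEOUT = 2
    TERMINATING = 3
    RAY_RUNNING = 4


# Shape mirrors the get_valid_transitions() table above; this is a small
# illustrative subset, not the full table.
VALID_TRANSITIONS = {
    Status.ALLOCATED: {Status.RAY_RUNNING, Status.ALLOCATION_TIMEOUT},
    # ALLOCATION_TIMEOUT may only move to TERMINATING: the instance is torn
    # down so a later scheduling round can pick a different node type.
    Status.ALLOCATION_TIMEOUT: {Status.TERMINATING},
}


def is_valid_transition(cur: Status, new: Status) -> bool:
    return new in VALID_TRANSITIONS.get(cur, set())


assert is_valid_transition(Status.ALLOCATED, Status.ALLOCATION_TIMEOUT)
assert not is_valid_transition(Status.ALLOCATION_TIMEOUT, Status.RAY_RUNNING)
```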

python/ray/autoscaler/v2/instance_manager/reconciler.py

Lines changed: 33 additions & 9 deletions
```diff
@@ -21,6 +21,9 @@
     LaunchNodeError,
     TerminateNodeError,
 )
+from ray.autoscaler.v2.instance_manager.subscribers.cloud_resource_monitor import (
+    CloudResourceMonitor,
+)
 from ray.autoscaler.v2.instance_manager.subscribers.ray_stopper import RayStopError
 from ray.autoscaler.v2.instance_manager.subscribers.threaded_ray_installer import (
     RayInstallError,
@@ -62,6 +65,7 @@ def reconcile(
         instance_manager: InstanceManager,
         scheduler: IResourceScheduler,
         cloud_provider: ICloudInstanceProvider,
+        cloud_resource_monitor: CloudResourceMonitor,
         ray_cluster_resource_state: ClusterResourceState,
         non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
         autoscaling_config: AutoscalingConfig,
@@ -88,6 +92,8 @@ def reconcile(

         Args:
             instance_manager: The instance manager to reconcile.
+            cloud_resource_monitor: The cloud resource monitor that tracks the
+                resource availability of all node types.
             ray_cluster_resource_state: The ray cluster's resource state.
             non_terminated_cloud_instances: The non-terminated cloud instances from
                 the cloud provider.
@@ -122,6 +128,7 @@ def reconcile(
             instance_manager=instance_manager,
             scheduler=scheduler,
             cloud_provider=cloud_provider,
+            cloud_resource_monitor=cloud_resource_monitor,
             ray_cluster_resource_state=ray_cluster_resource_state,
             non_terminated_cloud_instances=non_terminated_cloud_instances,
             autoscaling_config=autoscaling_config,
@@ -161,26 +168,30 @@ def _sync_from(

         More specifically, we will reconcile status transitions for:
             1. QUEUED/REQUESTED -> ALLOCATED:
-                When a instance with launch request id (indicating a previous launch
+                When an instance with a launch request id (indicating a previous launch
                 request was made) could be assigned to an unassigned cloud instance
                 of the same instance type.
             2. REQUESTED -> ALLOCATION_FAILED:
                 When there's an error from the cloud provider for launch failure so
                 that the instance becomes ALLOCATION_FAILED.
-            3. * -> RAY_RUNNING:
+            3. ALLOCATED -> ALLOCATION_TIMEOUT:
+                When an instance has been allocated to a cloud instance, but is
+                stuck in this state. For example, a kubernetes pod remains
+                pending due to insufficient resources.
+            4. * -> RAY_RUNNING:
                 When a ray node on a cloud instance joins the ray cluster, we will
                 transition the instance to RAY_RUNNING.
-            4. * -> TERMINATED:
+            5. * -> TERMINATED:
                 When the cloud instance is already terminated, we will transition the
                 instance to TERMINATED.
-            5. TERMINATING -> TERMINATION_FAILED:
+            6. TERMINATING -> TERMINATION_FAILED:
                 When there's an error from the cloud provider for termination failure.
-            6. * -> RAY_STOPPED:
+            7. * -> RAY_STOPPED:
                 When ray was stopped on the cloud instance, we will transition the
                 instance to RAY_STOPPED.
-            7. * -> RAY_INSTALL_FAILED:
+            8. * -> RAY_INSTALL_FAILED:
                 When there's an error from RayInstaller.
-            8. RAY_STOP_REQUESTED -> RAY_RUNNING:
+            9. RAY_STOP_REQUESTED -> RAY_RUNNING:
                 When requested to stop ray, but failed to stop/drain the ray node
                 (e.g. idle termination drain rejected by the node).

@@ -227,6 +238,7 @@ def _step_next(
         instance_manager: InstanceManager,
         scheduler: IResourceScheduler,
         cloud_provider: ICloudInstanceProvider,
+        cloud_resource_monitor: CloudResourceMonitor,
         ray_cluster_resource_state: ClusterResourceState,
         non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
         autoscaling_config: AutoscalingConfig,
@@ -258,6 +270,8 @@ def _step_next(
         Args:
             instance_manager: The instance manager to reconcile.
             scheduler: The resource scheduler to make scaling decisions.
+            cloud_resource_monitor: The cloud resource monitor that tracks the
+                resource availability of all node types.
             ray_cluster_resource_state: The ray cluster's resource state.
             non_terminated_cloud_instances: The non-terminated cloud instances from
                 the cloud provider.
@@ -275,6 +289,7 @@ def _step_next(
         Reconciler._scale_cluster(
             autoscaling_state=autoscaling_state,
             instance_manager=instance_manager,
+            cloud_resource_monitor=cloud_resource_monitor,
             ray_state=ray_cluster_resource_state,
             scheduler=scheduler,
             autoscaling_config=autoscaling_config,
@@ -711,6 +726,7 @@ def _handle_ray_status_transition(
                     f"{NodeStatus.Name(ray_node.status)}"
                 ),
                 ray_node_id=ray_node_id,
+                instance_type=im_instance.instance_type,
             )

         Reconciler._update_instance_manager(instance_manager, version, updates)
@@ -932,8 +948,9 @@ def _handle_stuck_instances(
             update = Reconciler._handle_stuck_instance(
                 instance,
                 reconcile_config.allocate_status_timeout_s,
-                new_status=IMInstance.TERMINATING,
+                new_status=IMInstance.ALLOCATION_TIMEOUT,
                 cloud_instance_id=instance.cloud_instance_id,
+                instance_type=instance.instance_type,
             )
             if update:
                 im_updates[instance.instance_id] = update
@@ -1060,6 +1077,7 @@ def _is_head_node_running(instance_manager: InstanceManager) -> bool:
     def _scale_cluster(
         autoscaling_state: AutoscalingState,
         instance_manager: InstanceManager,
+        cloud_resource_monitor: CloudResourceMonitor,
         ray_state: ClusterResourceState,
         scheduler: IResourceScheduler,
         autoscaling_config: AutoscalingConfig,
@@ -1075,6 +1093,8 @@ def _scale_cluster(
         Args:
             autoscaling_state: The autoscaling state to reconcile.
             instance_manager: The instance manager to reconcile.
+            cloud_resource_monitor: The cloud resource monitor that tracks the
+                resource availability of all node types.
             ray_state: The ray cluster's resource state.
             scheduler: The resource scheduler to make scaling decisions.
             autoscaling_config: The autoscaling config.
@@ -1119,6 +1139,9 @@ def _scale_cluster(
                 disable_launch_config_check=(
                     autoscaling_config.disable_launch_config_check()
                 ),
+                cloud_resource_availabilities=(
+                    cloud_resource_monitor.get_resource_availabilities()
+                ),
             )

         # Ask scheduler for updates to the cluster shape.
@@ -1195,6 +1218,7 @@ def _terminate_instances(instance_manager: InstanceManager):
         """
         Terminate instances with the below statuses:
         - RAY_STOPPED: ray was stopped on the cloud instance.
+        - ALLOCATION_TIMEOUT: the cloud provider timed out allocating a running cloud instance.
         - RAY_INSTALL_FAILED: ray installation failed on the cloud instance,
           we will not retry.
         - TERMINATION_FAILED: cloud provider failed to terminate the instance
@@ -1209,6 +1233,7 @@ def _terminate_instances(instance_manager: InstanceManager):
         for instance in im_instances:
             if instance.status not in [
                 IMInstance.RAY_STOPPED,
+                IMInstance.ALLOCATION_TIMEOUT,
                 IMInstance.RAY_INSTALL_FAILED,
                 IMInstance.TERMINATION_FAILED,
             ]:
@@ -1391,7 +1416,6 @@ def _handle_stuck_requested_instance(
                 instance, select_instance_status=IMInstance.REQUESTED
             )
         )
-
         # Fail the allocation if we have tried too many times.
         if len(all_request_times_ns) > max_num_retry_request_to_allocate:
             return IMInstanceUpdateEvent(
```
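The key behavioral change in `_handle_stuck_instances` is that a stuck `ALLOCATED` instance now transitions to `ALLOCATION_TIMEOUT` (observable by `CloudResourceMonitor`) instead of going straight to `TERMINATING`. A simplified sketch of that timeout decision follows, with an assumed timeout value and plain strings in place of the real instance-manager types:

```python
import time
from typing import Optional

# Assumed value for illustration; the real one comes from
# reconcile_config.allocate_status_timeout_s.
ALLOCATE_STATUS_TIMEOUT_S = 120.0


def next_status_for_stuck(
    status: str, status_since_s: float, now_s: float
) -> Optional[str]:
    """Return the follow-up status for an instance stuck in ALLOCATED."""
    if status != "ALLOCATED":
        return None
    if now_s - status_since_s > ALLOCATE_STATUS_TIMEOUT_S:
        # Previously the reconciler moved the instance straight to
        # TERMINATING; routing through ALLOCATION_TIMEOUT lets the
        # CloudResourceMonitor subscriber observe the failure and lower the
        # node type's priority before the instance is torn down.
        return "ALLOCATION_TIMEOUT"
    return None


now = time.time()
assert next_status_for_stuck("ALLOCATED", now - 300, now) == "ALLOCATION_TIMEOUT"
assert next_status_for_stuck("ALLOCATED", now, now) is None
```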
python/ray/autoscaler/v2/instance_manager/subscribers/cloud_resource_monitor.py

Lines changed: 73 additions & 0 deletions

```diff
@@ -0,0 +1,73 @@
+import logging
+import time
+from typing import Dict, List
+
+from ray.autoscaler.v2.instance_manager.instance_manager import (
+    InstanceUpdatedSubscriber,
+)
+from ray.autoscaler.v2.schema import NodeType
+from ray.core.generated.instance_manager_pb2 import Instance, InstanceUpdateEvent
+
+logger = logging.getLogger(__name__)
+
+
+class CloudResourceMonitor(InstanceUpdatedSubscriber):
+    """CloudResourceMonitor records the availability of all node types.
+
+    In the Spot scenario, the resources in the cluster change dynamically.
+    When scaling up, it is necessary to know which node types are most
+    likely to have resources, in order to decide which type of node to request.
+
+    During scaling up, if resources of a node type are requested but fail to
+    allocate, that type is considered unavailable at that timestamp. This class
+    records the last timestamp at which a node type was unavailable, allowing
+    the autoscaler to skip such node types when making future scaling decisions.
+    """
+
+    def __init__(
+        self,
+    ) -> None:
+        self._last_unavailable_timestamp: Dict[NodeType, float] = {}
+
+    def allocation_timeout(self, failed_event: InstanceUpdateEvent):
+        unavailable_timestamp = time.time()
+        self._last_unavailable_timestamp[
+            failed_event.instance_type
+        ] = unavailable_timestamp
+        logger.info(
+            f"Cloud Resource Type {failed_event.instance_type} is "
+            f"unavailable at timestamp={unavailable_timestamp}. "
+            f"We will lower its priority in future schedules."
+        )
+
+    def allocation_succeeded(self, succeeded_event: InstanceUpdateEvent):
+        if succeeded_event.instance_type in self._last_unavailable_timestamp:
+            self._last_unavailable_timestamp.pop(succeeded_event.instance_type)
+            logger.info(
+                f"Cloud Resource Type {succeeded_event.instance_type} is "
+                f"available at timestamp={time.time()}. We will raise its "
+                f"priority in future schedules."
+            )
+
+    def notify(self, events: List[InstanceUpdateEvent]) -> None:
+        for event in events:
+            if event.new_instance_status == Instance.ALLOCATION_TIMEOUT:
+                self.allocation_timeout(event)
+            elif (
+                event.new_instance_status == Instance.RAY_RUNNING
+                and event.instance_type
+            ):
+                self.allocation_succeeded(event)
+
+    def get_resource_availabilities(self) -> Dict[NodeType, float]:
+        """Calculate the availability scores of node types.
+        Higher values indicate a higher likelihood of resource allocation.
+        """
+        resource_availability_scores: Dict[NodeType, float] = {}
+        if self._last_unavailable_timestamp:
+            max_ts = max(self._last_unavailable_timestamp.values())
+            for node_type in self._last_unavailable_timestamp:
+                resource_availability_scores[node_type] = (
+                    1 - self._last_unavailable_timestamp[node_type] / max_ts
+                )
+        return resource_availability_scores
```
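A short worked example of the `get_resource_availabilities` formula, with fabricated timestamps: the most recently failed node type scores 0, older failures score higher, and node types with no recorded failure are absent from the result, so the scheduler's `availabilities.get(node_type, 1)` treats them as fully available:

```python
# Fabricated last-failure timestamps (seconds). For each failed type with
# last-failure time t, the score is 1 - t / max_ts, where max_ts is the most
# recent failure time across all failed types.
last_unavailable = {
    "spot-a": 1_000_000.0,  # failed long ago
    "spot-b": 2_000_000.0,  # failed most recently
}
max_ts = max(last_unavailable.values())
scores = {k: 1 - v / max_ts for k, v in last_unavailable.items()}

assert scores["spot-b"] == 0.0  # most recent failure -> lowest priority
assert scores["spot-a"] == 0.5  # older failure -> partially recovered
# A type that never failed (e.g. "on-demand-c") is absent from the dict;
# the scheduler's .get(node_type, 1) default treats it as fully available.
assert scores.get("on-demand-c", 1) == 1
```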
