7 changes: 7 additions & 0 deletions python/ray/autoscaler/v2/autoscaler.py
@@ -32,6 +32,9 @@
from ray.autoscaler.v2.instance_manager.subscribers.cloud_instance_updater import (
CloudInstanceUpdater,
)
from ray.autoscaler.v2.instance_manager.subscribers.cloud_resource_monitor import (
CloudResourceMonitor,
)
from ray.autoscaler.v2.instance_manager.subscribers.ray_stopper import RayStopper
from ray.autoscaler.v2.instance_manager.subscribers.threaded_ray_installer import (
ThreadedRayInstaller,
@@ -76,6 +79,7 @@ def __init__(
self._metrics_reporter = metrics_reporter

self._init_cloud_instance_provider(config, config_reader)
self._cloud_resource_monitor = None
self._init_instance_manager(
session_name=session_name,
config=config,
@@ -160,6 +164,8 @@ def _init_instance_manager(
max_concurrent_installs=config.get_max_num_worker_nodes() or 50,
)
)
self._cloud_resource_monitor = CloudResourceMonitor()
subscribers.append(self._cloud_resource_monitor)

self._instance_manager = InstanceManager(
instance_storage=instance_storage,
@@ -201,6 +207,7 @@ def update_autoscaling_state(
instance_manager=self._instance_manager,
scheduler=self._scheduler,
cloud_provider=self._cloud_instance_provider,
cloud_resource_monitor=self._cloud_resource_monitor,
ray_cluster_resource_state=ray_cluster_resource_state,
non_terminated_cloud_instances=(
self._cloud_instance_provider.get_non_terminated()
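For context, the monitor above is registered like any other instance-manager subscriber; below is a minimal sketch of that contract, assuming only the InstanceUpdatedSubscriber interface this PR relies on (the toy class is hypothetical, not part of the diff):

from typing import List

from ray.autoscaler.v2.instance_manager.instance_manager import (
    InstanceUpdatedSubscriber,
)
from ray.core.generated.instance_manager_pb2 import InstanceUpdateEvent


class LoggingSubscriber(InstanceUpdatedSubscriber):
    """Toy subscriber: the InstanceManager fans out each batch of
    instance updates to every registered subscriber's notify()."""

    def notify(self, events: List[InstanceUpdateEvent]) -> None:
        for event in events:
            print(event.instance_type, event.new_instance_status)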
12 changes: 12 additions & 0 deletions python/ray/autoscaler/v2/instance_manager/common.py
@@ -66,6 +66,7 @@ def is_cloud_instance_allocated(instance_status: Instance.InstanceStatus) -> bool:
Instance.TERMINATING,
Instance.RAY_INSTALL_FAILED,
Instance.TERMINATION_FAILED,
Instance.ALLOCATION_TIMEOUT,
}

@staticmethod
@@ -225,6 +226,10 @@ def get_valid_transitions() -> Dict[
# Ray is already installed on the provisioned cloud
# instance. It could be any valid ray status.
Instance.RAY_RUNNING,
# The cloud provider timed out while allocating a running cloud instance.
# The CloudResourceMonitor subscriber will lower this node type's priority
# in future scheduling decisions.
Instance.ALLOCATION_TIMEOUT,
Instance.RAY_STOPPING,
Instance.RAY_STOPPED,
# Instance is requested to be stopped, e.g. instance leaked: no matching
@@ -284,6 +289,13 @@ def get_valid_transitions() -> Dict[
# cloud instance somehow failed.
Instance.TERMINATED,
},
# An instance has been allocated a cloud instance, but the cloud
# provider timed out while bringing it up to a running state, e.g. a
# Kubernetes pod remains pending due to insufficient resources.
Instance.ALLOCATION_TIMEOUT: {
# Instance is requested to be stopped
Instance.TERMINATING
},
# When in this status, the ray process has been requested to stop, but
# the node is not yet present in the dead ray node list reported by
# the ray cluster.
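A quick illustration of what the new entries above permit, written as hedged assertions (assuming these static helpers live on the InstanceUtil class in this module):

from ray.autoscaler.v2.instance_manager.common import InstanceUtil
from ray.core.generated.instance_manager_pb2 import Instance

# ALLOCATION_TIMEOUT still counts as holding a cloud instance...
assert InstanceUtil.is_cloud_instance_allocated(Instance.ALLOCATION_TIMEOUT)

# ...and per the transition table, the only way out of it is termination.
assert InstanceUtil.get_valid_transitions()[Instance.ALLOCATION_TIMEOUT] == {
    Instance.TERMINATING
}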
42 changes: 33 additions & 9 deletions python/ray/autoscaler/v2/instance_manager/reconciler.py
@@ -21,6 +21,9 @@
LaunchNodeError,
TerminateNodeError,
)
from ray.autoscaler.v2.instance_manager.subscribers.cloud_resource_monitor import (
CloudResourceMonitor,
)
from ray.autoscaler.v2.instance_manager.subscribers.ray_stopper import RayStopError
from ray.autoscaler.v2.instance_manager.subscribers.threaded_ray_installer import (
RayInstallError,
@@ -62,6 +65,7 @@ def reconcile(
instance_manager: InstanceManager,
scheduler: IResourceScheduler,
cloud_provider: ICloudInstanceProvider,
cloud_resource_monitor: CloudResourceMonitor,
ray_cluster_resource_state: ClusterResourceState,
non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
autoscaling_config: AutoscalingConfig,
@@ -88,6 +92,8 @@

Args:
instance_manager: The instance manager to reconcile.
cloud_resource_monitor: The cloud resource monitor for monitoring resource
availability of all node types.
ray_cluster_resource_state: The ray cluster's resource state.
non_terminated_cloud_instances: The non-terminated cloud instances from
the cloud provider.
@@ -122,6 +128,7 @@
instance_manager=instance_manager,
scheduler=scheduler,
cloud_provider=cloud_provider,
cloud_resource_monitor=cloud_resource_monitor,
ray_cluster_resource_state=ray_cluster_resource_state,
non_terminated_cloud_instances=non_terminated_cloud_instances,
autoscaling_config=autoscaling_config,
@@ -161,26 +168,30 @@ def _sync_from(

More specifically, we will reconcile status transitions for:
1. QUEUED/REQUESTED -> ALLOCATED:
When an instance with launch request id (indicating a previous launch
request was made) could be assigned to an unassigned cloud instance
of the same instance type.
2. REQUESTED -> ALLOCATION_FAILED:
When there's an error from the cloud provider for launch failure so
that the instance becomes ALLOCATION_FAILED.
3. ALLOCATED -> ALLOCATION_TIMEOUT:
When an instance has been allocated to a cloud instance, but is stuck in
this state. For example, a Kubernetes pod remains pending due to
insufficient resources.
4. * -> RAY_RUNNING:
When a ray node on a cloud instance joins the ray cluster, we will
transition the instance to RAY_RUNNING.
5. * -> TERMINATED:
When the cloud instance is already terminated, we will transition the
instance to TERMINATED.
6. TERMINATING -> TERMINATION_FAILED:
When there's an error from the cloud provider for termination failure.
7. * -> RAY_STOPPED:
When ray was stopped on the cloud instance, we will transition the
instance to RAY_STOPPED.
8. * -> RAY_INSTALL_FAILED:
When there's an error from RayInstaller.
9. RAY_STOP_REQUESTED -> RAY_RUNNING:
When requested to stop ray, but failed to stop/drain the ray node
(e.g. idle termination drain rejected by the node).

@@ -227,6 +238,7 @@ def _step_next(
instance_manager: InstanceManager,
scheduler: IResourceScheduler,
cloud_provider: ICloudInstanceProvider,
cloud_resource_monitor: CloudResourceMonitor,
ray_cluster_resource_state: ClusterResourceState,
non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
autoscaling_config: AutoscalingConfig,
@@ -258,6 +270,8 @@
Args:
instance_manager: The instance manager to reconcile.
scheduler: The resource scheduler to make scaling decisions.
cloud_resource_monitor: The cloud resource monitor for monitoring resource
availability of all node types.
ray_cluster_resource_state: The ray cluster's resource state.
non_terminated_cloud_instances: The non-terminated cloud instances from
the cloud provider.
@@ -275,6 +289,7 @@
Reconciler._scale_cluster(
autoscaling_state=autoscaling_state,
instance_manager=instance_manager,
cloud_resource_monitor=cloud_resource_monitor,
ray_state=ray_cluster_resource_state,
scheduler=scheduler,
autoscaling_config=autoscaling_config,
@@ -711,6 +726,7 @@ def _handle_ray_status_transition(
f"{NodeStatus.Name(ray_node.status)}"
),
ray_node_id=ray_node_id,
instance_type=im_instance.instance_type,
)

Reconciler._update_instance_manager(instance_manager, version, updates)
@@ -932,8 +948,9 @@ def _handle_stuck_instances(
update = Reconciler._handle_stuck_instance(
instance,
reconcile_config.allocate_status_timeout_s,
new_status=IMInstance.ALLOCATION_TIMEOUT,
cloud_instance_id=instance.cloud_instance_id,
instance_type=instance.instance_type,
)
if update:
im_updates[instance.instance_id] = update
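Distilled, the change above reroutes a stuck allocation through the new status instead of terminating it directly. A hedged sketch (the helper name below is hypothetical):

import time

from ray.core.generated.instance_manager_pb2 import Instance


def next_status_for_stuck_allocation(
    allocated_at_s: float, allocate_status_timeout_s: float
) -> int:
    # An instance stuck in ALLOCATED past the timeout now moves to
    # ALLOCATION_TIMEOUT, which CloudResourceMonitor records before
    # _terminate_instances reaps it.
    if time.time() - allocated_at_s > allocate_status_timeout_s:
        return Instance.ALLOCATION_TIMEOUT
    return Instance.ALLOCATED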
@@ -1060,6 +1077,7 @@ def _is_head_node_running(instance_manager: InstanceManager) -> bool:
def _scale_cluster(
autoscaling_state: AutoscalingState,
instance_manager: InstanceManager,
cloud_resource_monitor: CloudResourceMonitor,
ray_state: ClusterResourceState,
scheduler: IResourceScheduler,
autoscaling_config: AutoscalingConfig,
@@ -1075,6 +1093,8 @@
Args:
autoscaling_state: The autoscaling state to reconcile.
instance_manager: The instance manager to reconcile.
cloud_resource_monitor: The cloud resource monitor for monitoring resource
availability of all node types.
ray_state: The ray cluster's resource state.
scheduler: The resource scheduler to make scaling decisions.
autoscaling_config: The autoscaling config.
@@ -1119,6 +1139,9 @@
disable_launch_config_check=(
autoscaling_config.disable_launch_config_check()
),
cloud_resource_availabilities=(
cloud_resource_monitor.get_resource_availabilities()
),
)

# Ask scheduler for updates to the cluster shape.
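The scheduler side is outside this diff; below is a hedged sketch of one way the availability scores could be consumed (the function is hypothetical, not the scheduler's actual API):

from typing import Dict, List

from ray.autoscaler.v2.schema import NodeType


def order_candidates(
    candidates: List[NodeType],
    availabilities: Dict[NodeType, float],
) -> List[NodeType]:
    # Node types with no recorded allocation failure are absent from the
    # map, so treat them as fully available (1.0) and try them first.
    return sorted(
        candidates, key=lambda t: availabilities.get(t, 1.0), reverse=True
    )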
@@ -1195,6 +1218,7 @@ def _terminate_instances(instance_manager: InstanceManager):
"""
Terminate instances with the below statuses:
- RAY_STOPPED: ray was stopped on the cloud instance.
- ALLOCATION_TIMEOUT: the cloud provider timed out while allocating a running cloud instance.
- RAY_INSTALL_FAILED: ray installation failed on the cloud instance,
we will not retry.
- TERMINATION_FAILED: cloud provider failed to terminate the instance
@@ -1209,6 +1233,7 @@
for instance in im_instances:
if instance.status not in [
IMInstance.RAY_STOPPED,
IMInstance.ALLOCATION_TIMEOUT,
IMInstance.RAY_INSTALL_FAILED,
IMInstance.TERMINATION_FAILED,
]:
@@ -1391,7 +1416,6 @@ def _handle_stuck_requested_instance(
instance, select_instance_status=IMInstance.REQUESTED
)
)
# Fail the allocation if we have tried too many times.
if len(all_request_times_ns) > max_num_retry_request_to_allocate:
return IMInstanceUpdateEvent(
73 changes: 73 additions & 0 deletions python/ray/autoscaler/v2/instance_manager/subscribers/cloud_resource_monitor.py
@@ -0,0 +1,73 @@
import logging
import time
from typing import Dict, List

from ray.autoscaler.v2.instance_manager.instance_manager import (
InstanceUpdatedSubscriber,
)
from ray.autoscaler.v2.schema import NodeType
from ray.core.generated.instance_manager_pb2 import Instance, InstanceUpdateEvent

logger = logging.getLogger(__name__)


class CloudResourceMonitor(InstanceUpdatedSubscriber):
    """CloudResourceMonitor records the availability of all node types.

    In the Spot scenario, the resources in the cluster change dynamically.
    When scaling up, it is necessary to know which node types are most
    likely to have resources, in order to decide which type of node to
    request.

    During scaling up, if a node type is requested but fails to allocate,
    that type is considered unavailable at that timestamp. This class
    records the last timestamp at which each node type was unavailable,
    allowing the autoscaler to deprioritize such node types in future
    scaling decisions.
    """

    def __init__(
        self,
    ) -> None:
        self._last_unavailable_timestamp: Dict[NodeType, float] = {}

    def allocation_timeout(self, failed_event: InstanceUpdateEvent):
        unavailable_timestamp = time.time()
        self._last_unavailable_timestamp[
            failed_event.instance_type
        ] = unavailable_timestamp
        logger.info(
            f"Cloud resource type {failed_event.instance_type} is "
            f"unavailable at timestamp={unavailable_timestamp}. "
            f"We will lower its priority in future schedules."
        )

    def allocation_succeeded(self, succeeded_event: InstanceUpdateEvent):
        if succeeded_event.instance_type in self._last_unavailable_timestamp:
            self._last_unavailable_timestamp.pop(succeeded_event.instance_type)
            logger.info(
                f"Cloud resource type {succeeded_event.instance_type} is "
                f"available at timestamp={time.time()}. We will raise its "
                f"priority in future schedules."
            )

    def notify(self, events: List[InstanceUpdateEvent]) -> None:
        for event in events:
            if event.new_instance_status == Instance.ALLOCATION_TIMEOUT:
                self.allocation_timeout(event)
            elif (
                event.new_instance_status == Instance.RAY_RUNNING
                and event.instance_type
            ):
                self.allocation_succeeded(event)

    def get_resource_availabilities(self) -> Dict[NodeType, float]:
        """Calculate the availability scores of node types.

        Higher values indicate a higher likelihood of resource allocation.
        """
        resource_availability_scores: Dict[NodeType, float] = {}
        if self._last_unavailable_timestamp:
            max_ts = max(self._last_unavailable_timestamp.values())
            for node_type in self._last_unavailable_timestamp:
                resource_availability_scores[node_type] = (
                    1 - self._last_unavailable_timestamp[node_type] / max_ts
                )
        return resource_availability_scores
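A minimal usage sketch of the subscriber above (not part of the PR; it assumes only the InstanceUpdateEvent fields this diff touches, instance_type and new_instance_status):

from ray.autoscaler.v2.instance_manager.subscribers.cloud_resource_monitor import (
    CloudResourceMonitor,
)
from ray.core.generated.instance_manager_pb2 import Instance, InstanceUpdateEvent

monitor = CloudResourceMonitor()

# A spot node type times out; an on-demand type joins the cluster.
monitor.notify(
    [
        InstanceUpdateEvent(
            instance_type="spot-worker",
            new_instance_status=Instance.ALLOCATION_TIMEOUT,
        ),
        InstanceUpdateEvent(
            instance_type="ondemand-worker",
            new_instance_status=Instance.RAY_RUNNING,
        ),
    ]
)

# Only node types with a recorded failure get a score, computed as
# 1 - last_failure_ts / newest_failure_ts, so more recent failures
# score lower. Here: {"spot-worker": 0.0}.
print(monitor.get_resource_availabilities())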