9 changes: 9 additions & 0 deletions python/ray/autoscaler/v2/autoscaler.py
@@ -32,6 +32,9 @@
from ray.autoscaler.v2.instance_manager.subscribers.cloud_instance_updater import (
CloudInstanceUpdater,
)
from ray.autoscaler.v2.instance_manager.subscribers.cloud_resource_monitor import (
CloudResourceMonitor,
)
from ray.autoscaler.v2.instance_manager.subscribers.ray_stopper import RayStopper
from ray.autoscaler.v2.instance_manager.subscribers.threaded_ray_installer import (
ThreadedRayInstaller,
@@ -76,6 +79,7 @@ def __init__(
self._metrics_reporter = metrics_reporter

self._init_cloud_instance_provider(config, config_reader)
self.cloud_resource_monitor = None
self._init_instance_manager(
session_name=session_name,
config=config,
@@ -160,6 +164,10 @@ def _init_instance_manager(
max_concurrent_installs=config.get_max_num_worker_nodes() or 50,
)
)
self.cloud_resource_monitor = CloudResourceMonitor(
instance_storage=instance_storage
)
subscribers.append(self.cloud_resource_monitor)

self._instance_manager = InstanceManager(
instance_storage=instance_storage,
@@ -201,6 +209,7 @@ def update_autoscaling_state(
instance_manager=self._instance_manager,
scheduler=self._scheduler,
cloud_provider=self._cloud_instance_provider,
cloud_resource_monitor=self.cloud_resource_monitor,
ray_cluster_resource_state=ray_cluster_resource_state,
non_terminated_cloud_instances=(
self._cloud_instance_provider.get_non_terminated()
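The wiring above registers CloudResourceMonitor as one more InstanceUpdatedSubscriber: every instance status transition recorded by the InstanceManager is fanned out to each registered subscriber. A minimal sketch of that contract, using a simplified stand-in for the real InstanceManager (the actual class lives in instance_manager.py):

# Simplified sketch, not the real InstanceManager: shows how registered
# subscribers receive every instance status update.
from typing import List


class InstanceManagerSketch:
    def __init__(self, subscribers: List["InstanceUpdatedSubscriber"]) -> None:
        self._subscribers = subscribers

    def _publish(self, events: List["InstanceUpdateEvent"]) -> None:
        # Each status change (e.g. ALLOCATED, ALLOCATION_FAILED) reaches
        # every subscriber; CloudResourceMonitor listens for these two
        # statuses to maintain its per-node-type availability records.
        for subscriber in self._subscribers:
            subscriber.notify(events)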
76 changes: 55 additions & 21 deletions python/ray/autoscaler/v2/instance_manager/reconciler.py
@@ -21,6 +21,9 @@
LaunchNodeError,
TerminateNodeError,
)
from ray.autoscaler.v2.instance_manager.subscribers.cloud_resource_monitor import (
CloudResourceMonitor
)
from ray.autoscaler.v2.instance_manager.subscribers.ray_stopper import RayStopError
from ray.autoscaler.v2.instance_manager.subscribers.threaded_ray_installer import (
RayInstallError,
@@ -62,6 +65,7 @@ def reconcile(
instance_manager: InstanceManager,
scheduler: IResourceScheduler,
cloud_provider: ICloudInstanceProvider,
cloud_resource_monitor: CloudResourceMonitor,
ray_cluster_resource_state: ClusterResourceState,
non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
autoscaling_config: AutoscalingConfig,
@@ -88,6 +92,8 @@

Args:
instance_manager: The instance manager to reconcile.
cloud_resource_monitor: The cloud resource monitor that tracks resource
availability across all node types.
ray_cluster_resource_state: The ray cluster's resource state.
non_terminated_cloud_instances: The non-terminated cloud instances from
the cloud provider.
@@ -122,6 +128,7 @@ def reconcile(
instance_manager=instance_manager,
scheduler=scheduler,
cloud_provider=cloud_provider,
cloud_resource_monitor=cloud_resource_monitor,
ray_cluster_resource_state=ray_cluster_resource_state,
non_terminated_cloud_instances=non_terminated_cloud_instances,
autoscaling_config=autoscaling_config,
@@ -162,8 +169,8 @@ def _sync_from(
More specifically, we will reconcile status transitions for:
1. QUEUED/REQUESTED -> ALLOCATED:
When an instance with a launch request id (indicating a previous launch
-request was made) could be assigned to an unassigned cloud instance
-of the same instance type.
+request was made) could be assigned to an unassigned running cloud
+instance of the same instance type.
2. REQUESTED -> ALLOCATION_FAILED:
When there's an error from the cloud provider for launch failure so
that the instance becomes ALLOCATION_FAILED.
@@ -227,6 +234,7 @@ def _step_next(
instance_manager: InstanceManager,
scheduler: IResourceScheduler,
cloud_provider: ICloudInstanceProvider,
cloud_resource_monitor: CloudResourceMonitor,
ray_cluster_resource_state: ClusterResourceState,
non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
autoscaling_config: AutoscalingConfig,
@@ -258,6 +266,8 @@
Args:
instance_manager: The instance manager to reconcile.
scheduler: The resource scheduler to make scaling decisions.
cloud_resource_monitor: The cloud resource monitor that tracks resource
availability across all node types.
ray_cluster_resource_state: The ray cluster's resource state.
non_terminated_cloud_instances: The non-terminated cloud instances from
the cloud provider.
@@ -275,6 +285,7 @@
Reconciler._scale_cluster(
autoscaling_state=autoscaling_state,
instance_manager=instance_manager,
cloud_resource_monitor=cloud_resource_monitor,
ray_state=ray_cluster_resource_state,
scheduler=scheduler,
autoscaling_config=autoscaling_config,
@@ -337,7 +348,10 @@ def _handle_cloud_instance_allocation(
] = defaultdict(list)

for cloud_instance_id, cloud_instance in non_terminated_cloud_instances.items():
-if cloud_instance_id not in assigned_cloud_instance_ids:
+if (
+cloud_instance_id not in assigned_cloud_instance_ids
+and cloud_instance.is_running
+):
unassigned_cloud_instances_by_type[cloud_instance.node_type].append(
cloud_instance
)
@@ -374,7 +388,8 @@ def _try_resolve_pending_allocation(

Args:
im_instance: The instance to allocate or fail.
-unassigned_cloud_instances_by_type: The unassigned cloud instances by type.
+unassigned_cloud_instances_by_type: The unassigned running cloud
+instances by type.
launch_errors: The launch errors from the cloud provider.

Returns:
@@ -415,6 +430,7 @@ def _try_resolve_pending_allocation(
return IMInstanceUpdateEvent(
instance_id=im_instance.instance_id,
new_instance_status=IMInstance.ALLOCATION_FAILED,
instance_type=im_instance.instance_type,
details=f"launch failed with {str(launch_error)}",
)
# No update.
@@ -1060,6 +1076,7 @@ def _is_head_node_running(instance_manager: InstanceManager) -> bool:
def _scale_cluster(
autoscaling_state: AutoscalingState,
instance_manager: InstanceManager,
cloud_resource_monitor: CloudResourceMonitor,
ray_state: ClusterResourceState,
scheduler: IResourceScheduler,
autoscaling_config: AutoscalingConfig,
@@ -1075,6 +1092,8 @@
Args:
autoscaling_state: The autoscaling state to reconcile.
instance_manager: The instance manager to reconcile.
cloud_resource_monitor: The cloud resource monitor that tracks resource
availability across all node types.
ray_state: The ray cluster's resource state.
scheduler: The resource scheduler to make scaling decisions.
autoscaling_config: The autoscaling config.
@@ -1119,6 +1138,9 @@
disable_launch_config_check=(
autoscaling_config.disable_launch_config_check()
),
cloud_resource_availabilities=(
cloud_resource_monitor.get_resource_availabilities()
)
)

# Ask scheduler for updates to the cluster shape.
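The scheduling request above now carries the monitor's availability scores. This PR does not show how the scheduler consumes them; one plausible use, sketched with a hypothetical rank_node_types helper that is not part of the PR, is to try node types in descending score order:

from typing import Dict, List


def rank_node_types(
    candidates: List[str], availabilities: Dict[str, float]
) -> List[str]:
    # Hypothetical helper, not the scheduler's real API. Scores come from
    # CloudResourceMonitor.get_resource_availabilities(); types with no
    # recorded failure default to a perfect score of 1.0.
    return sorted(candidates, key=lambda t: availabilities.get(t, 1.0), reverse=True)

For example, with scores {"on-demand": 1.0, "spot-cpu": 0.2, "spot-gpu": 0.0}, a scheduler using this ordering would request on-demand nodes first.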
@@ -1195,6 +1217,8 @@ def _terminate_instances(instance_manager: InstanceManager):
"""
Terminate instances with the below statuses:
- RAY_STOPPED: ray was stopped on the cloud instance.
- ALLOCATION_FAILED: resources were requested but the cloud provider
failed to allocate them.
- RAY_INSTALL_FAILED: ray installation failed on the cloud instance,
we will not retry.
- TERMINATION_FAILED: cloud provider failed to terminate the instance
@@ -1209,6 +1233,7 @@
for instance in im_instances:
if instance.status not in [
IMInstance.RAY_STOPPED,
IMInstance.ALLOCATION_FAILED,
IMInstance.RAY_INSTALL_FAILED,
IMInstance.TERMINATION_FAILED,
]:
@@ -1391,12 +1416,12 @@ def _handle_stuck_requested_instance(
instance, select_instance_status=IMInstance.REQUESTED
)
)

# Fail the allocation if we have tried too many times.
if len(all_request_times_ns) > max_num_retry_request_to_allocate:
return IMInstanceUpdateEvent(
instance_id=instance.instance_id,
new_instance_status=IMInstance.ALLOCATION_FAILED,
instance_type=instance.instance_type,
details=(
"failed to allocate cloud instance after "
f"{len(all_request_times_ns)} attempts > "
@@ -1478,7 +1503,7 @@ def _handle_extra_cloud_instances(
ray_nodes: The ray cluster's states of ray nodes.
"""
Reconciler._handle_extra_cloud_instances_from_ray_nodes(
-instance_manager, ray_nodes
+instance_manager, ray_nodes, non_terminated_cloud_instances
)
Reconciler._handle_extra_cloud_instances_from_cloud_provider(
instance_manager, non_terminated_cloud_instances
@@ -1514,6 +1539,8 @@ def _handle_extra_cloud_instances_from_cloud_provider(
for cloud_instance_id, cloud_instance in non_terminated_cloud_instances.items():
if cloud_instance_id in cloud_instance_ids_managed_by_im:
continue
if not cloud_instance.is_running:
continue
updates[cloud_instance_id] = IMInstanceUpdateEvent(
instance_id=InstanceUtil.random_instance_id(), # Assign a new id.
cloud_instance_id=cloud_instance_id,
@@ -1531,7 +1558,9 @@

@staticmethod
def _handle_extra_cloud_instances_from_ray_nodes(
-instance_manager: InstanceManager, ray_nodes: List[NodeState]
+instance_manager: InstanceManager,
+ray_nodes: List[NodeState],
+non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
):
"""
For extra cloud instances reported by Ray but not managed by the instance
@@ -1540,6 +1569,8 @@ def _handle_extra_cloud_instances_from_ray_nodes(
Args:
instance_manager: The instance manager to reconcile.
ray_nodes: The ray cluster's states of ray nodes.
non_terminated_cloud_instances: The non-terminated cloud instances from
the cloud provider.
"""
updates = {}

@@ -1568,20 +1599,23 @@ def _handle_extra_cloud_instances_from_ray_nodes(
if cloud_instance_id in cloud_instance_ids_managed_by_im:
continue

-is_head = is_head_node(ray_node)
-updates[ray_node_id] = IMInstanceUpdateEvent(
-instance_id=InstanceUtil.random_instance_id(), # Assign a new id.
-cloud_instance_id=cloud_instance_id,
-new_instance_status=IMInstance.ALLOCATED,
-node_kind=NodeKind.HEAD if is_head else NodeKind.WORKER,
-ray_node_id=ray_node_id,
-instance_type=ray_node.ray_node_type_name,
-details=(
-"allocated unmanaged worker cloud instance from ray node: "
-f"{ray_node_id}"
-),
-upsert=True,
-)
+if cloud_instance_id in non_terminated_cloud_instances:
+cloud_instance = non_terminated_cloud_instances[cloud_instance_id]
+if cloud_instance.is_running:
+is_head = is_head_node(ray_node)
+updates[ray_node_id] = IMInstanceUpdateEvent(
+instance_id=InstanceUtil.random_instance_id(), # Assign a new id.
+cloud_instance_id=cloud_instance_id,
+new_instance_status=IMInstance.ALLOCATED,
+node_kind=NodeKind.HEAD if is_head else NodeKind.WORKER,
+ray_node_id=ray_node_id,
+instance_type=ray_node.ray_node_type_name,
+details=(
+"allocated unmanaged worker cloud instance from ray node: "
+f"{ray_node_id}"
+),
+upsert=True,
+)

Reconciler._update_instance_manager(instance_manager, version, updates)

116 changes: 116 additions & 0 deletions python/ray/autoscaler/v2/instance_manager/subscribers/cloud_resource_monitor.py
@@ -0,0 +1,116 @@
import logging
import time
from typing import List, Dict

from ray.autoscaler.v2.instance_manager.common import InstanceUtil
from ray.autoscaler.v2.instance_manager.instance_manager import (
    InstanceUpdatedSubscriber,
)
from ray.autoscaler.v2.instance_manager.instance_storage import InstanceStorage
from ray.core.generated.instance_manager_pb2 import Instance, InstanceUpdateEvent
from ray.autoscaler.v2.schema import NodeType

logger = logging.getLogger(__name__)

class CloudResourceAvailability:
    """CloudResourceAvailability indicates the availability of one type of
    cloud resource.

    During scale-up, if a resource of a node type is requested but fails to
    allocate, that type is considered unavailable at that timestamp. This
    class records the last timestamp at which a node type was unavailable,
    allowing the autoscaler to deprioritize such node types when making
    future scaling decisions.
    """

    # The node type of the cloud resource.
    _node_type: NodeType
    # Timestamp of the last failed resource allocation.
    _last_unavailable_timestamp: int

    def __init__(
        self,
        node_type: NodeType,
        last_unavailability_timestamp: int,
    ):
        self._node_type = node_type
        self._last_unavailable_timestamp = last_unavailability_timestamp

    def get_last_unavailability_timestamp(self) -> int:
        return self._last_unavailable_timestamp

    def set_last_unavailability_timestamp(self, timestamp: int):
        self._last_unavailable_timestamp = timestamp


class CloudResourceMonitor(InstanceUpdatedSubscriber):
    """CloudResourceMonitor records the availability of all node types.

    In the spot-instance scenario, cluster resources change dynamically.
    When scaling up, the autoscaler needs to know which node types are most
    likely to have capacity in order to decide which type of node to request.
    """

    def __init__(
        self,
        instance_storage: InstanceStorage,
    ) -> None:
        self._instance_storage = instance_storage
        self._resource_availabilities: Dict[NodeType, CloudResourceAvailability] = {}

    def allocation_failed(self, failed_event: InstanceUpdateEvent):
        instances, _ = self._instance_storage.get_instances(
            instance_ids=[failed_event.instance_id]
        )
        for instance in instances.values():
            last_status = InstanceUtil.get_last_status_transition(instance)
            if last_status:
                last_unavailability_timestamp=(last_status.timestamp_ns) / 1000
Contributor review comment (critical):

There's an inconsistency in the time units used for last_unavailability_timestamp. Here, last_status.timestamp_ns (in nanoseconds) is divided by 1000, resulting in microseconds. However, in the else block on line 69, time.time() is used, which returns seconds.

This will lead to incorrect availability scores because the timestamps are not comparable. All timestamps should be in the same unit. I suggest using seconds consistently.

Suggested change:
-                last_unavailability_timestamp=(last_status.timestamp_ns) / 1000
+                last_unavailability_timestamp = last_status.timestamp_ns / 1e9

            else:
                last_unavailability_timestamp = time.time()
Review comment:

Bug: Timestamp Unit Mismatch Causes Availability Errors

Unit mismatch in timestamp handling. Line 67 converts timestamp_ns (nanoseconds) to microseconds by dividing by 1000, but line 69 uses time.time() which returns seconds. This creates inconsistent units for last_unavailability_timestamp, causing incorrect availability score calculations in get_resource_availabilities(). Both should use the same unit, likely microseconds by changing line 69 to time.time() * 1e6.
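Both review comments flag the same defect and differ only in the unit they propose. A minimal sketch of the branch with consistent units, assuming seconds as the common unit per the first comment:

            # Sketch of the suggested fix, not the PR's code as written:
            # keep both branches in seconds so timestamps stay comparable.
            if last_status:
                # timestamp_ns is nanoseconds since the epoch; 1e9 ns per second.
                last_unavailability_timestamp = last_status.timestamp_ns / 1e9
            else:
                # time.time() already returns seconds since the epoch.
                last_unavailability_timestamp = time.time()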

            self._resource_availabilities[instance.instance_type] = (
                CloudResourceAvailability(
                    node_type=instance.instance_type,
                    last_unavailability_timestamp=last_unavailability_timestamp,
                )
            )
            logger.debug(
                f"Cloud Resource Type {instance.instance_type} is "
                f"unavailable at timestamp={last_unavailability_timestamp}. "
                f"We will lower its priority in future schedules."
            )

    def allocation_succeeded(self, succeeded_event: InstanceUpdateEvent):
        if succeeded_event.instance_type in self._resource_availabilities:
            self._resource_availabilities[
                succeeded_event.instance_type
            ].set_last_unavailability_timestamp(0)
            logger.debug(
                f"Cloud Resource Type {succeeded_event.instance_type} is "
                f"available. We will prioritize scheduling this type "
                f"in future schedules."
            )

    def notify(self, events: List[InstanceUpdateEvent]) -> None:
        for event in events:
            if event.new_instance_status == Instance.ALLOCATION_FAILED:
                self.allocation_failed(event)
            elif event.new_instance_status == Instance.ALLOCATED:
                self.allocation_succeeded(event)

    def get_resource_availabilities(self) -> Dict[NodeType, float]:
        """Calculate the availability scores of node types.

        Higher values indicate a higher likelihood of successful allocation.
        """
        resource_availability_scores: Dict[NodeType, float] = {}
        if self._resource_availabilities:
            max_ts = max(
                r.get_last_unavailability_timestamp()
                for r in self._resource_availabilities.values()
            )
            for node_type, availability in self._resource_availabilities.items():
                ts = availability.get_last_unavailability_timestamp()
                resource_availability_scores[node_type] = (
                    1 - ts / max_ts if max_ts > 0 else 1
                )
        return resource_availability_scores
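To make the scoring concrete, a worked sketch with hypothetical node types and timestamps in seconds: the most recently failed type scores lowest, a type whose timestamp was reset to 0 by a successful allocation scores 1, and if max_ts is 0 (no recorded failures) every type scores 1.

# Hypothetical inputs, not from the PR: last-unavailability timestamps.
timestamps = {"spot-gpu": 1000.0, "spot-cpu": 800.0, "on-demand": 0.0}
max_ts = max(timestamps.values())  # 1000.0
scores = {
    node_type: (1 - ts / max_ts) if max_ts > 0 else 1
    for node_type, ts in timestamps.items()
}
# scores -> {"spot-gpu": 0.0, "spot-cpu": ~0.2, "on-demand": 1.0}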