diff --git a/python/ray/autoscaler/v2/autoscaler.py b/python/ray/autoscaler/v2/autoscaler.py index cdc8620bc2b7..bbf7b1144ff0 100644 --- a/python/ray/autoscaler/v2/autoscaler.py +++ b/python/ray/autoscaler/v2/autoscaler.py @@ -32,6 +32,9 @@ from ray.autoscaler.v2.instance_manager.subscribers.cloud_instance_updater import ( CloudInstanceUpdater, ) +from ray.autoscaler.v2.instance_manager.subscribers.cloud_resource_monitor import ( + CloudResourceMonitor, +) from ray.autoscaler.v2.instance_manager.subscribers.ray_stopper import RayStopper from ray.autoscaler.v2.instance_manager.subscribers.threaded_ray_installer import ( ThreadedRayInstaller, @@ -76,6 +79,7 @@ def __init__( self._metrics_reporter = metrics_reporter self._init_cloud_instance_provider(config, config_reader) + self.cloud_resource_monitor = None self._init_instance_manager( session_name=session_name, config=config, @@ -160,6 +164,10 @@ def _init_instance_manager( max_concurrent_installs=config.get_max_num_worker_nodes() or 50, ) ) + self.cloud_resource_monitor = CloudResourceMonitor( + instance_storage=instance_storage + ) + subscribers.append(self.cloud_resource_monitor) self._instance_manager = InstanceManager( instance_storage=instance_storage, @@ -201,6 +209,7 @@ def update_autoscaling_state( instance_manager=self._instance_manager, scheduler=self._scheduler, cloud_provider=self._cloud_instance_provider, + cloud_resource_monitor=self.cloud_resource_monitor, ray_cluster_resource_state=ray_cluster_resource_state, non_terminated_cloud_instances=( self._cloud_instance_provider.get_non_terminated() diff --git a/python/ray/autoscaler/v2/instance_manager/reconciler.py b/python/ray/autoscaler/v2/instance_manager/reconciler.py index b403803e577b..ca6539be96e0 100644 --- a/python/ray/autoscaler/v2/instance_manager/reconciler.py +++ b/python/ray/autoscaler/v2/instance_manager/reconciler.py @@ -21,6 +21,9 @@ LaunchNodeError, TerminateNodeError, ) +from ray.autoscaler.v2.instance_manager.subscribers.cloud_resource_monitor import ( + CloudResourceMonitor +) from ray.autoscaler.v2.instance_manager.subscribers.ray_stopper import RayStopError from ray.autoscaler.v2.instance_manager.subscribers.threaded_ray_installer import ( RayInstallError, @@ -62,6 +65,7 @@ def reconcile( instance_manager: InstanceManager, scheduler: IResourceScheduler, cloud_provider: ICloudInstanceProvider, + cloud_resource_monitor: CloudResourceMonitor, ray_cluster_resource_state: ClusterResourceState, non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance], autoscaling_config: AutoscalingConfig, @@ -88,6 +92,8 @@ def reconcile( Args: instance_manager: The instance manager to reconcile. + cloud_resource_monitor: The cloud resource monitor for monitoring resource + availability of all node types. ray_cluster_resource_state: The ray cluster's resource state. non_terminated_cloud_instances: The non-terminated cloud instances from the cloud provider. @@ -122,6 +128,7 @@ def reconcile( instance_manager=instance_manager, scheduler=scheduler, cloud_provider=cloud_provider, + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ray_cluster_resource_state, non_terminated_cloud_instances=non_terminated_cloud_instances, autoscaling_config=autoscaling_config, @@ -162,8 +169,8 @@ def _sync_from( More specifically, we will reconcile status transitions for: 1. 
QUEUED/REQUESTED -> ALLOCATED: When a instance with launch request id (indicating a previous launch - request was made) could be assigned to an unassigned cloud instance - of the same instance type. + request was made) could be assigned to an unassigned running cloud + instance of the same instance type. 2. REQUESTED -> ALLOCATION_FAILED: When there's an error from the cloud provider for launch failure so that the instance becomes ALLOCATION_FAILED. @@ -227,6 +234,7 @@ def _step_next( instance_manager: InstanceManager, scheduler: IResourceScheduler, cloud_provider: ICloudInstanceProvider, + cloud_resource_monitor: CloudResourceMonitor, ray_cluster_resource_state: ClusterResourceState, non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance], autoscaling_config: AutoscalingConfig, @@ -258,6 +266,8 @@ def _step_next( Args: instance_manager: The instance manager to reconcile. scheduler: The resource scheduler to make scaling decisions. + cloud_resource_monitor: The cloud resource monitor for monitoring resource + availability of all node types. ray_cluster_resource_state: The ray cluster's resource state. non_terminated_cloud_instances: The non-terminated cloud instances from the cloud provider. @@ -275,6 +285,7 @@ def _step_next( Reconciler._scale_cluster( autoscaling_state=autoscaling_state, instance_manager=instance_manager, + cloud_resource_monitor=cloud_resource_monitor, ray_state=ray_cluster_resource_state, scheduler=scheduler, autoscaling_config=autoscaling_config, @@ -337,7 +348,10 @@ def _handle_cloud_instance_allocation( ] = defaultdict(list) for cloud_instance_id, cloud_instance in non_terminated_cloud_instances.items(): - if cloud_instance_id not in assigned_cloud_instance_ids: + if ( + cloud_instance_id not in assigned_cloud_instance_ids + and cloud_instance.is_running + ): unassigned_cloud_instances_by_type[cloud_instance.node_type].append( cloud_instance ) @@ -374,7 +388,8 @@ def _try_resolve_pending_allocation( Args: im_instance: The instance to allocate or fail. - unassigned_cloud_instances_by_type: The unassigned cloud instances by type. + unassigned_cloud_instances_by_type: The unassigned running cloud + instances by type. launch_errors: The launch errors from the cloud provider. Returns: @@ -415,6 +430,7 @@ def _try_resolve_pending_allocation( return IMInstanceUpdateEvent( instance_id=im_instance.instance_id, new_instance_status=IMInstance.ALLOCATION_FAILED, + instance_type=im_instance.instance_type, details=f"launch failed with {str(launch_error)}", ) # No update. @@ -1060,6 +1076,7 @@ def _is_head_node_running(instance_manager: InstanceManager) -> bool: def _scale_cluster( autoscaling_state: AutoscalingState, instance_manager: InstanceManager, + cloud_resource_monitor: CloudResourceMonitor, ray_state: ClusterResourceState, scheduler: IResourceScheduler, autoscaling_config: AutoscalingConfig, @@ -1075,6 +1092,8 @@ def _scale_cluster( Args: autoscaling_state: The autoscaling state to reconcile. instance_manager: The instance manager to reconcile. + cloud_resource_monitor: The cloud resource monitor for monitoring resource + availability of all node types. ray_state: The ray cluster's resource state. scheduler: The resource scheduler to make scaling decisions. autoscaling_config: The autoscaling config. 
@@ -1119,6 +1138,9 @@ def _scale_cluster(
             disable_launch_config_check=(
                 autoscaling_config.disable_launch_config_check()
             ),
+            cloud_resource_availabilities=(
+                cloud_resource_monitor.get_resource_availabilities()
+            ),
         )
 
         # Ask scheduler for updates to the cluster shape.
@@ -1195,6 +1217,8 @@ def _terminate_instances(instance_manager: InstanceManager):
         """
         Terminate instances with the below statuses:
             - RAY_STOPPED: ray was stopped on the cloud instance.
+            - ALLOCATION_FAILED: resources were requested but the cloud provider
+              failed to allocate a cloud instance.
             - RAY_INSTALL_FAILED: ray installation failed on the cloud instance,
               we will not retry.
             - TERMINATION_FAILED: cloud provider failed to terminate the instance
@@ -1209,6 +1233,7 @@ def _terminate_instances(instance_manager: InstanceManager):
         for instance in im_instances:
             if instance.status not in [
                 IMInstance.RAY_STOPPED,
+                IMInstance.ALLOCATION_FAILED,
                 IMInstance.RAY_INSTALL_FAILED,
                 IMInstance.TERMINATION_FAILED,
             ]:
@@ -1391,12 +1416,12 @@ def _handle_stuck_requested_instance(
                 instance, select_instance_status=IMInstance.REQUESTED
             )
         )
-
         # Fail the allocation if we have tried too many times.
         if len(all_request_times_ns) > max_num_retry_request_to_allocate:
             return IMInstanceUpdateEvent(
                 instance_id=instance.instance_id,
                 new_instance_status=IMInstance.ALLOCATION_FAILED,
+                instance_type=instance.instance_type,
                 details=(
                     "failed to allocate cloud instance after "
                     f"{len(all_request_times_ns)} attempts > "
@@ -1478,7 +1503,7 @@ def _handle_extra_cloud_instances(
             ray_nodes: The ray cluster's states of ray nodes.
         """
         Reconciler._handle_extra_cloud_instances_from_ray_nodes(
-            instance_manager, ray_nodes
+            instance_manager, ray_nodes, non_terminated_cloud_instances
         )
         Reconciler._handle_extra_cloud_instances_from_cloud_provider(
             instance_manager, non_terminated_cloud_instances
@@ -1514,6 +1539,8 @@ def _handle_extra_cloud_instances_from_cloud_provider(
         for cloud_instance_id, cloud_instance in non_terminated_cloud_instances.items():
             if cloud_instance_id in cloud_instance_ids_managed_by_im:
                 continue
+            if not cloud_instance.is_running:
+                continue
             updates[cloud_instance_id] = IMInstanceUpdateEvent(
                 instance_id=InstanceUtil.random_instance_id(),  # Assign a new id.
                 cloud_instance_id=cloud_instance_id,
@@ -1531,7 +1558,9 @@ def _handle_extra_cloud_instances_from_cloud_provider(
 
     @staticmethod
     def _handle_extra_cloud_instances_from_ray_nodes(
-        instance_manager: InstanceManager, ray_nodes: List[NodeState]
+        instance_manager: InstanceManager,
+        ray_nodes: List[NodeState],
+        non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance],
     ):
         """
         For extra cloud instances reported by Ray but not managed by the instance
@@ -1540,6 +1569,8 @@ def _handle_extra_cloud_instances_from_ray_nodes(
         Args:
             instance_manager: The instance manager to reconcile.
             ray_nodes: The ray cluster's states of ray nodes.
+            non_terminated_cloud_instances: The non-terminated cloud instances from
+                the cloud provider.
         """
 
         updates = {}
@@ -1568,20 +1599,23 @@ def _handle_extra_cloud_instances_from_ray_nodes(
             if cloud_instance_id in cloud_instance_ids_managed_by_im:
                 continue
 
-            is_head = is_head_node(ray_node)
-            updates[ray_node_id] = IMInstanceUpdateEvent(
-                instance_id=InstanceUtil.random_instance_id(),  # Assign a new id.
-                cloud_instance_id=cloud_instance_id,
-                new_instance_status=IMInstance.ALLOCATED,
-                node_kind=NodeKind.HEAD if is_head else NodeKind.WORKER,
-                ray_node_id=ray_node_id,
-                instance_type=ray_node.ray_node_type_name,
-                details=(
-                    "allocated unmanaged worker cloud instance from ray node: "
-                    f"{ray_node_id}"
-                ),
-                upsert=True,
-            )
+            if cloud_instance_id in non_terminated_cloud_instances:
+                cloud_instance = non_terminated_cloud_instances[cloud_instance_id]
+                if cloud_instance.is_running:
+                    is_head = is_head_node(ray_node)
+                    updates[ray_node_id] = IMInstanceUpdateEvent(
+                        instance_id=InstanceUtil.random_instance_id(),  # A new id.
+                        cloud_instance_id=cloud_instance_id,
+                        new_instance_status=IMInstance.ALLOCATED,
+                        node_kind=NodeKind.HEAD if is_head else NodeKind.WORKER,
+                        ray_node_id=ray_node_id,
+                        instance_type=ray_node.ray_node_type_name,
+                        details=(
+                            "allocated unmanaged worker cloud instance from "
+                            f"ray node: {ray_node_id}"
+                        ),
+                        upsert=True,
+                    )
 
         Reconciler._update_instance_manager(instance_manager, version, updates)
diff --git a/python/ray/autoscaler/v2/instance_manager/subscribers/cloud_resource_monitor.py b/python/ray/autoscaler/v2/instance_manager/subscribers/cloud_resource_monitor.py
new file mode 100644
index 000000000000..02b85caeab7f
--- /dev/null
+++ b/python/ray/autoscaler/v2/instance_manager/subscribers/cloud_resource_monitor.py
@@ -0,0 +1,116 @@
+import logging
+import time
+from typing import Dict, List
+
+from ray.autoscaler.v2.instance_manager.common import InstanceUtil
+from ray.autoscaler.v2.instance_manager.instance_manager import (
+    InstanceUpdatedSubscriber,
+)
+from ray.autoscaler.v2.instance_manager.instance_storage import InstanceStorage
+from ray.autoscaler.v2.schema import NodeType
+from ray.core.generated.instance_manager_pb2 import Instance, InstanceUpdateEvent
+
+logger = logging.getLogger(__name__)
+
+class CloudResourceAvailability:
+    """CloudResourceAvailability indicates the availability of a type of
+    cloud resource.
+
+    During a scale-up, if a node of a given type is requested but the cloud
+    provider fails to allocate it, that type is considered unavailable at
+    that timestamp. This class records the last timestamp at which a node
+    type was unavailable, allowing the autoscaler to deprioritize such node
+    types when making future scaling decisions.
+    """
+
+    # The node type of the cloud resource.
+    _node_type: NodeType
+    # Timestamp (in seconds) of the last failed resource allocation.
+    _last_unavailable_timestamp: float
+
+    def __init__(
+        self,
+        node_type: NodeType,
+        last_unavailability_timestamp: float,
+    ):
+        self._node_type = node_type
+        self._last_unavailable_timestamp = last_unavailability_timestamp
+
+    def get_last_unavailability_timestamp(self) -> float:
+        return self._last_unavailable_timestamp
+
+    def set_last_unavailability_timestamp(self, timestamp: float):
+        self._last_unavailable_timestamp = timestamp
+
+
+class CloudResourceMonitor(InstanceUpdatedSubscriber):
+    """CloudResourceMonitor records the availability of all node types.
+
+    In spot scenarios, the resources available to the cluster change
+    dynamically. When scaling up, the autoscaler needs to know which node
+    types are most likely to have capacity when deciding which type to request.
+ """ + def __init__( + self, + instance_storage: InstanceStorage, + ) -> None: + self._instance_storage = instance_storage + self._resource_availabilities: Dict[NodeType, CloudResourceAvailability] = {} + + def allocation_failed(self, failed_event: InstanceUpdateEvent): + instances, _ = self._instance_storage.get_instances( + instance_ids=[failed_event.instance_id] + ) + for instance in instances.values(): + last_status = InstanceUtil.get_last_status_transition(instance) + if last_status: + last_unavailability_timestamp=(last_status.timestamp_ns) / 1000 + else: + last_unavailability_timestamp = time.time() + self._resource_availabilities[instance.instance_type] = ( + CloudResourceAvailability( + node_type=instance.instance_type, + last_unavailability_timestamp=last_unavailability_timestamp + ) + ) + logger.debug(f"Cloud Resource Type {instance.instance_type} is " + f"unavailable at timestamp={last_unavailability_timestamp}. " + f"We will lower its priority in feature schedules." + ) + + def allocation_succeeded(self, succeeded_event: InstanceUpdateEvent): + if succeeded_event.instance_type in self._resource_availabilities: + self._resource_availabilities[ + succeeded_event.instance_type + ].set_last_unavailability_timestamp(0) + logger.debug(f"Cloud Resource Type {succeeded_event.instance_type} is " + f"available. We will prioritize scheduling this type " + f"in feature schedules." + ) + + def notify(self, events: List[InstanceUpdateEvent]) -> None: + for event in events: + if event.new_instance_status == Instance.ALLOCATION_FAILED: + self.allocation_failed(event) + elif event.new_instance_status == Instance.ALLOCATED: + self.allocation_succeeded(event) + + def get_resource_availabilities(self) -> Dict[NodeType, float]: + """Calculate the availability scores of node types. + Higher values indicate a higher likelihood of resource allocation. + """ + resource_availability_scores: Dict[NodeType, float] = {} + if self._resource_availabilities: + max_ts = max( + [ + r.get_last_unavailability_timestamp() + for r in self._resource_availabilities.values() + ] + ) + for node_type in self._resource_availabilities: + resource_availability_scores[node_type] = ( + 1 - + self._resource_availabilities[node_type].get_last_unavailability_timestamp() + / max_ts + ) if max_ts > 0 else 1 + return resource_availability_scores \ No newline at end of file diff --git a/python/ray/autoscaler/v2/scheduler.py b/python/ray/autoscaler/v2/scheduler.py index 924ccb9efa01..1fb51d261407 100644 --- a/python/ray/autoscaler/v2/scheduler.py +++ b/python/ray/autoscaler/v2/scheduler.py @@ -1,5 +1,6 @@ import copy import logging +import random import time import uuid from abc import ABC, abstractmethod @@ -65,6 +66,9 @@ class SchedulingRequest: ) # The current instances. current_instances: List[AutoscalerInstance] = field(default_factory=list) + # The cloud resource availability score. A low score indicates that resource + # allocation for this node type has recently failed. + cloud_resource_availabilities: Dict[NodeType, float] = field(default_factory=dict) @dataclass @@ -725,11 +729,16 @@ class ScheduleContext: # number of workers in the config. This takes into account any pending/running # nodes. _node_type_available: Dict[NodeType, int] = field(default_factory=dict) + # The availability scores of cloud resource. A low score suggests that + # this type of resource has historically experienced allocation failures, + # and the weight of this type should be reduced during scheduling. 
+    _cloud_resource_availabilities: Dict[NodeType, float] = field(default_factory=dict)
 
     def __init__(
         self,
         nodes: List[SchedulingNode],
         node_type_configs: Dict[NodeType, NodeTypeConfig],
+        cloud_resource_availabilities: Dict[NodeType, float],
         disable_launch_config_check: bool,
         max_num_nodes: Optional[int] = None,
         idle_timeout_s: Optional[float] = None,
@@ -742,6 +751,7 @@ def __init__(
         self._max_num_nodes = max_num_nodes
         self._idle_timeout_s = idle_timeout_s
         self._disable_launch_config_check = disable_launch_config_check
+        self._cloud_resource_availabilities = cloud_resource_availabilities
 
     @classmethod
     def from_schedule_request(
@@ -771,6 +781,7 @@ def from_schedule_request(
         return cls(
             nodes=nodes,
             node_type_configs=node_type_configs,
+            cloud_resource_availabilities=req.cloud_resource_availabilities,
             disable_launch_config_check=req.disable_launch_config_check,
             max_num_nodes=req.max_num_nodes,
             idle_timeout_s=req.idle_timeout_s,
@@ -853,6 +864,9 @@ def get_cluster_resources(self) -> Dict[str, float]:
     def get_idle_timeout_s(self) -> Optional[float]:
         return self._idle_timeout_s
 
+    def get_cloud_resource_availabilities(self) -> Dict[NodeType, float]:
+        return copy.deepcopy(self._cloud_resource_availabilities)
+
     def update(self, new_nodes: List[SchedulingNode]) -> None:
         """
         Update the context with the new nodes.
@@ -1468,7 +1482,10 @@ def _sort_resource_request(req: ResourceRequest) -> Tuple:
                 requests_to_sched,
                 existing_nodes,
             ) = ResourceDemandScheduler._sched_best_node(
-                requests_to_sched, existing_nodes, resource_request_source
+                requests_to_sched,
+                existing_nodes,
+                resource_request_source,
+                ctx.get_cloud_resource_availabilities(),
             )
             if best_node is None:
                 # No existing nodes can schedule any more requests.
@@ -1504,7 +1521,10 @@ def _sort_resource_request(req: ResourceRequest) -> Tuple:
                 requests_to_sched,
                 node_pools,
             ) = ResourceDemandScheduler._sched_best_node(
-                requests_to_sched, node_pools, resource_request_source
+                requests_to_sched,
+                node_pools,
+                resource_request_source,
+                ctx.get_cloud_resource_availabilities(),
             )
             if best_node is None:
                 break
@@ -1529,12 +1549,19 @@ def _sched_best_node(
         requests: List[ResourceRequest],
         nodes: List[SchedulingNode],
         resource_request_source: ResourceRequestSource,
+        cloud_resource_availabilities: Dict[NodeType, float],
     ) -> Tuple[SchedulingNode, List[ResourceRequest], List[SchedulingNode]]:
         """
         Schedule the requests on the best node.
         A simple greedy algorithm is used to schedule the requests:
             1. Try to schedule the requests on each node.
-            2. Sort the nodes by a score
+            2. Sort the nodes by a score. The sort key includes:
+                2.1. UtilizationScore: to maximize resource utilization.
+                2.2. Cloud resource availability: prefer node types whose cloud
+                    resources are most likely to be available, to minimize
+                    allocation failures.
+                2.3. Random number: to diversify resource requests rather than
+                    always requesting the same type.
             3. Return the node with the highest score.
 
         The highest score node is updated with the scheduled requests, and the node is
@@ -1547,6 +1574,8 @@ def _sched_best_node(
                 removed from the list.
             resource_request_source: The source of the resource request, i.e. pending
                 demands from ray actors/tasks or cluster resource constraints.
+            cloud_resource_availabilities: Availability score per node type. A low
+                score indicates that allocation for this node type has recently failed.
 
         Returns:
             best_node: The best node to schedule the requests.
@@ -1591,9 +1620,17 @@ class ScheduleResult: return None, requests, nodes # Sort the results by score. - results = sorted(results, key=lambda r: r.score, reverse=True) - best_result = results[0] + results = sorted( + results, + key=lambda r: ( + r.score, + cloud_resource_availabilities.get(r.node.node_type, 1), + random.random() + ), + reverse=True + ) + best_result = results[0] # Remove the best node from the nodes. nodes.pop(best_result.idx) logger.debug( diff --git a/python/ray/autoscaler/v2/tests/test_reconciler.py b/python/ray/autoscaler/v2/tests/test_reconciler.py index 016963dfbb4b..49342b812f09 100644 --- a/python/ray/autoscaler/v2/tests/test_reconciler.py +++ b/python/ray/autoscaler/v2/tests/test_reconciler.py @@ -21,6 +21,9 @@ ) from ray.autoscaler.v2.instance_manager.reconciler import Reconciler, logger from ray.autoscaler.v2.instance_manager.storage import InMemoryStorage +from ray.autoscaler.v2.instance_manager.subscribers.cloud_resource_monitor import ( + CloudResourceMonitor +) from ray.autoscaler.v2.instance_manager.subscribers.ray_stopper import RayStopError from ray.autoscaler.v2.instance_manager.subscribers.threaded_ray_installer import ( RayInstallError, @@ -116,8 +119,12 @@ def setup(): instance_storage=instance_storage, instance_status_update_subscribers=[mock_subscriber], ) + + cloud_resource_monitor = CloudResourceMonitor( + instance_storage + ) - yield instance_manager, instance_storage, mock_subscriber + yield instance_manager, instance_storage, mock_subscriber, cloud_resource_monitor class TestReconciler: @@ -129,7 +136,7 @@ def _add_instances(instance_storage, instances): @staticmethod def test_requested_instance_no_op(setup): - instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup # Request no timeout yet. TestReconciler._add_instances( instance_storage, @@ -148,6 +155,7 @@ def test_requested_instance_no_op(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances={}, cloud_provider_errors=[], @@ -163,7 +171,7 @@ def test_requested_instance_no_op(setup): def test_requested_instance_to_allocated(setup): # When there's a matching cloud instance for the requested instance. # The requested instance should be moved to ALLOCATED. - instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup TestReconciler._add_instances( instance_storage, [ @@ -192,6 +200,7 @@ def test_requested_instance_to_allocated(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -212,7 +221,7 @@ def test_requested_instance_to_allocation_failed(setup): Test that the instance should be transitioned to ALLOCATION_FAILED when launch error happens. """ - instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup instances = [ # Should succeed with instance matched. 
@@ -248,6 +257,7 @@ def test_requested_instance_to_allocation_failed(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=launch_errors, @@ -264,7 +274,7 @@ def test_requested_instance_to_allocation_failed(setup): @staticmethod def test_reconcile_terminated_cloud_instances(setup): - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup instances = [ create_instance( @@ -304,6 +314,7 @@ def test_reconcile_terminated_cloud_instances(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=termination_errors, @@ -330,7 +341,7 @@ def test_reconcile_terminated_cloud_instances(setup): @staticmethod def test_ray_reconciler_no_op(setup): - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup im_instances = [ create_instance( @@ -352,6 +363,7 @@ def test_ray_reconciler_no_op(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -372,6 +384,7 @@ def test_ray_reconciler_no_op(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=ray_nodes), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -384,7 +397,7 @@ def test_ray_reconciler_no_op(setup): @staticmethod def test_ray_reconciler_new_ray(setup): - instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup # A newly running ray node with matching cloud instance id node_states = [ @@ -404,6 +417,7 @@ def test_ray_reconciler_new_ray(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=node_states), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -419,7 +433,7 @@ def test_ray_reconciler_new_ray(setup): @staticmethod def test_ray_reconciler_already_ray_running(setup): - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup # A running ray node already reconciled. 
TestReconciler._add_instances( instance_storage, @@ -448,6 +462,7 @@ def test_ray_reconciler_already_ray_running(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=ray_nodes), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -459,7 +474,7 @@ def test_ray_reconciler_already_ray_running(setup): @staticmethod def test_ray_reconciler_stopping_ray(setup): - instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup # draining ray nodes im_instances = [ @@ -491,6 +506,7 @@ def test_ray_reconciler_stopping_ray(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=ray_nodes), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -506,7 +522,7 @@ def test_ray_reconciler_stopping_ray(setup): @staticmethod def test_ray_reconciler_stopped_ray(setup): - instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup # dead ray nodes im_instances = [ @@ -538,6 +554,7 @@ def test_ray_reconciler_stopped_ray(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=ray_nodes), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -553,7 +570,7 @@ def test_ray_reconciler_stopped_ray(setup): @staticmethod def test_reconcile_ray_installer_failures(setup): - instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup # dead ray nodes im_instances = [ @@ -578,6 +595,7 @@ def test_reconcile_ray_installer_failures(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=[]), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -596,7 +614,7 @@ def test_draining_ray_node_also_terminated(setup): transition the instance to TERMINATED. """ - instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup im_instances = [ create_instance( @@ -627,6 +645,7 @@ def test_draining_ray_node_also_terminated(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=ray_nodes), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -664,7 +683,7 @@ def test_max_concurrent_launches( upscaling_speed, setup, ): - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup next_id = 0 # Add some allocated instances. 
@@ -724,6 +743,7 @@ def test_max_concurrent_launches( instance_manager=instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -748,7 +768,7 @@ def test_max_concurrent_launches( @staticmethod @mock.patch("time.time_ns") def test_stuck_instances_requested(mock_time_ns, setup): - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup cur_time_s = 10 mock_time_ns.return_value = cur_time_s * s_to_ns @@ -787,6 +807,7 @@ def test_stuck_instances_requested(mock_time_ns, setup): instance_manager=instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances={}, cloud_provider_errors=[], @@ -807,7 +828,7 @@ def test_stuck_instances_requested(mock_time_ns, setup): @staticmethod @mock.patch("time.time_ns") def test_stuck_instances_ray_stop_requested(mock_time_ns, setup): - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup timeout_s = 5 cur_time_s = 20 mock_time_ns.return_value = cur_time_s * s_to_ns @@ -838,6 +859,7 @@ def test_stuck_instances_ray_stop_requested(mock_time_ns, setup): instance_manager=instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances={}, cloud_provider_errors=[], @@ -860,7 +882,7 @@ def test_ray_stop_requested_fail(mock_time_ns, setup): # Test that the instance should be transitioned to RAY_RUNNING # when the ray stop request fails. 
- instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup mock_time_ns.return_value = 10 * s_to_ns instances = [ @@ -900,6 +922,7 @@ def test_ray_stop_requested_fail(mock_time_ns, setup): instance_manager=instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=ray_nodes), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -924,7 +947,7 @@ def test_ray_stop_requested_fail(mock_time_ns, setup): ], ) def test_stuck_instances(mock_time_ns, cur_status, expect_status, setup): - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup timeout_s = 5 cur_time_s = 20 mock_time_ns.return_value = cur_time_s * s_to_ns @@ -961,6 +984,7 @@ def test_stuck_instances(mock_time_ns, cur_status, expect_status, setup): instance_manager=instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -987,7 +1011,7 @@ def test_stuck_instances(mock_time_ns, cur_status, expect_status, setup): ], ) def test_warn_stuck_transient_instances(mock_time_ns, status, setup): - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup cur_time_s = 10 mock_time_ns.return_value = cur_time_s * s_to_ns timeout_s = 5 @@ -1027,6 +1051,7 @@ def test_warn_stuck_transient_instances(mock_time_ns, status, setup): instance_manager=instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances=cloud_instances if status != Instance.QUEUED @@ -1047,7 +1072,7 @@ def test_warn_stuck_transient_instances(mock_time_ns, status, setup): @staticmethod @mock.patch("time.time_ns") def test_stuck_instances_no_op(mock_time_ns, setup): - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup # Large enough to not trigger any timeouts mock_time_ns.return_value = 999999 * s_to_ns @@ -1087,6 +1112,7 @@ def test_stuck_instances_no_op(mock_time_ns, setup): instance_manager=instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances={}, cloud_provider_errors=[], @@ -1110,7 +1136,7 @@ def test_stuck_instances_no_op(mock_time_ns, setup): ids=["ray_running", "allocated"], ) def test_is_head_node_running(status, expected_running, setup): - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup instances = [ create_instance( @@ -1130,7 +1156,7 @@ def test_scaling_updates(setup): Tests that new instances should be launched due to autoscaling decisions, and existing instances should be terminated if needed. 
""" - instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup im_instances = [ create_instance( @@ -1185,6 +1211,7 @@ def test_scaling_updates(setup): instance_manager=instance_manager, scheduler=mock_scheduler, cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=ray_nodes), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -1212,7 +1239,7 @@ def test_scaling_updates(setup): @staticmethod def test_terminating_instances(setup): - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup instances = [ create_instance( @@ -1244,6 +1271,7 @@ def test_terminating_instances(setup): instance_manager=instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -1266,7 +1294,7 @@ def test_terminating_instances(setup): [True, False], ) def test_ray_install(disable_node_updaters, cloud_instance_running, setup): - instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup instances = [ create_instance( @@ -1290,6 +1318,7 @@ def test_ray_install(disable_node_updaters, cloud_instance_running, setup): instance_manager=instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -1310,7 +1339,7 @@ def test_ray_install(disable_node_updaters, cloud_instance_running, setup): @staticmethod @mock.patch("time.time_ns") def test_autoscaler_state(mock_time_ns, setup): - instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup mock_time_ns.return_value = 5 instances = [ @@ -1403,6 +1432,7 @@ def test_autoscaler_state(mock_time_ns, setup): instance_manager=instance_manager, scheduler=mock_scheduler, cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState( cluster_resource_state_version=1, ), @@ -1436,7 +1466,7 @@ def test_extra_cloud_instances_cloud_provider(setup): """ Test that extra cloud instances should be terminated. """ - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup im_instances = [ create_instance( @@ -1467,6 +1497,7 @@ def test_extra_cloud_instances_cloud_provider(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=ray_nodes), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -1491,7 +1522,7 @@ def test_cloud_instance_reboot(setup): """ Test that the case of booting up a previous stopped cloud instance. 
""" - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup im_instances = [ create_instance( @@ -1521,6 +1552,7 @@ def test_cloud_instance_reboot(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=ray_nodes), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -1542,7 +1574,7 @@ def test_ray_node_restarted_on_the_same_cloud_instance(setup): """ Test that the case of reusing cloud instances. """ - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup im_instances = [ create_instance( @@ -1578,6 +1610,7 @@ def test_ray_node_restarted_on_the_same_cloud_instance(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=ray_nodes), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -1610,7 +1643,7 @@ def test_ray_head_restarted_on_the_same_cloud_instance(setup): """ Test that the case of restarting Head node with GCS FT. """ - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup ray_nodes = [ NodeState( @@ -1636,6 +1669,7 @@ def test_ray_head_restarted_on_the_same_cloud_instance(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=ray_nodes), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -1672,7 +1706,7 @@ def test_ray_head_restarted_on_the_same_cloud_instance(setup): @staticmethod def test_reconcile_max_worker_nodes_limit_triggers_termination(setup): - instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup instances = [ create_instance( @@ -1734,6 +1768,7 @@ def test_reconcile_max_worker_nodes_limit_triggers_termination(setup): instance_manager=instance_manager, scheduler=mock_scheduler, cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState( node_states=ray_nodes, cluster_resource_state_version=1, diff --git a/python/ray/autoscaler/v2/tests/test_scheduler.py b/python/ray/autoscaler/v2/tests/test_scheduler.py index a1ca0a9f0944..5c1dc293b89b 100644 --- a/python/ray/autoscaler/v2/tests/test_scheduler.py +++ b/python/ray/autoscaler/v2/tests/test_scheduler.py @@ -52,6 +52,7 @@ def sched_request( instances: Optional[List[AutoscalerInstance]] = None, idle_timeout_s: Optional[float] = None, disable_launch_config_check: Optional[bool] = False, + cloud_resource_availabilities: Optional[Dict[NodeType, float]] = None, ) -> SchedulingRequest: if resource_requests is None: @@ -62,6 +63,8 @@ def sched_request( cluster_resource_constraints = [] if instances is None: instances = [] + if cloud_resource_availabilities is None: + cloud_resource_availabilities = {} return SchedulingRequest( resource_requests=ResourceRequestUtil.group_by_count(resource_requests), @@ -84,6 +87,7 @@ def sched_request( max_num_nodes=max_num_nodes, idle_timeout_s=idle_timeout_s, disable_launch_config_check=disable_launch_config_check, + 
cloud_resource_availabilities=cloud_resource_availabilities,
     )
 
 
@@ -104,6 +108,7 @@ def schedule(
     resource_requests: List[Dict],
     anti_affinity: bool = False,
     max_nodes: Optional[int] = None,
+    cloud_resource_availabilities: Optional[Dict[NodeType, float]] = None,
 ) -> SchedulingReply:
 
     ANTI_AFFINITY = ResourceRequestUtil.PlacementConstraintType.ANTI_AFFINITY
@@ -142,6 +147,7 @@ def schedule(
             gang_resource_requests=gang_requests,
             max_num_nodes=max_nodes,
             instances=instances,
+            cloud_resource_availabilities=cloud_resource_availabilities,
         )
     else:
         request = sched_request(
@@ -149,6 +155,7 @@ def schedule(
             resource_requests=[ResourceRequestUtil.make(r) for r in resource_requests],
             instances=instances,
             max_num_nodes=max_nodes,
+            cloud_resource_availabilities=cloud_resource_availabilities,
         )
 
     return ResourceDemandScheduler(event_logger).schedule(request)
 
@@ -2637,6 +2644,73 @@ def test_pg_with_bundle_infeasible_label_selectors():
     assert to_launch == {}
     assert len(reply.infeasible_gang_resource_requests) == 1
 
+
+def test_get_nodes_with_resource_availabilities():
+    node_type_configs = {
+        "type_gpu1": NodeTypeConfig(
+            name="type_gpu1",
+            resources={"CPU": 8, "GPU": 1, "gpu1": 1},
+            min_worker_nodes=0,
+            max_worker_nodes=10,
+        ),
+        "type_gpu2": NodeTypeConfig(
+            name="type_gpu2",
+            resources={"CPU": 8, "GPU": 1, "gpu2": 1},
+            min_worker_nodes=0,
+            max_worker_nodes=10,
+        ),
+        "type_gpu3": NodeTypeConfig(
+            name="type_gpu3",
+            resources={"CPU": 8, "GPU": 1, "gpu3": 1},
+            min_worker_nodes=0,
+            max_worker_nodes=10,
+        ),
+        "type_cpu": NodeTypeConfig(
+            name="type_cpu",
+            resources={"CPU": 8},
+            min_worker_nodes=0,
+            max_worker_nodes=10,
+        ),
+    }
+
+    def get_nodes_for(
+        resource_requests,
+        anti_affinity=False,
+        max_nodes: Optional[int] = None,
+        current_nodes: Optional[Dict] = None,
+        cloud_resource_availabilities=None,
+    ):
+        reply = schedule(
+            node_type_configs,
+            current_nodes or {},
+            resource_requests,
+            anti_affinity=anti_affinity,
+            max_nodes=max_nodes,
+            cloud_resource_availabilities=cloud_resource_availabilities,
+        )
+        # Aggregate the scheduler's launch decisions into {node_type: count}.
+        to_launch = {}
+        for launch_request in reply.to_launch:
+            node_type = launch_request.instance_type
+            to_launch[node_type] = to_launch.get(node_type, 0) + launch_request.count
+        return to_launch
+
+    assert get_nodes_for([{"GPU": 1}], cloud_resource_availabilities={
+        "type_gpu1": 0.1, "type_gpu2": 1, "type_gpu3": 0.2
+    }) == {"type_gpu2": 1}
+
+    assert get_nodes_for([{"GPU": 1}], cloud_resource_availabilities={
+        "type_gpu2": 0.1, "type_gpu3": 0.2
+    }) == {"type_gpu1": 1}
+
+    assert get_nodes_for([{"GPU": 1}, {"GPU": 1}], cloud_resource_availabilities={
+        "type_gpu1": 0.1, "type_gpu2": 0.1, "type_gpu3": 1
+    }) == {"type_gpu3": 2}
+
+    assert (get_nodes_for([{"CPU": 4}], cloud_resource_availabilities={})
+        == {"type_cpu": 1})
+
+
 if __name__ == "__main__":
     if os.environ.get("PARALLEL_CI"):
diff --git a/src/ray/protobuf/instance_manager.proto b/src/ray/protobuf/instance_manager.proto
index 14dbf09f11ec..cee8a6407562 100644
--- a/src/ray/protobuf/instance_manager.proto
+++ b/src/ray/protobuf/instance_manager.proto
@@ -145,7 +145,7 @@ message InstanceUpdateEvent {
   /// Additional metadata for the update event.
 
-  // The instance type for QUEUED/REQUESTED/ALLOCATED event.
+  // The instance type for QUEUED/REQUESTED/ALLOCATED/ALLOCATION_FAILED event.
   optional string instance_type = 5;
   // Cloud instance id for ALLOCATED event.
   optional string cloud_instance_id = 6;
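
A minimal, self-contained sketch (not part of the patch) of how the availability
scores returned by CloudResourceMonitor.get_resource_availabilities() are intended
to steer node selection in _sched_best_node: candidates are ranked by utilization
score first, then by cloud resource availability, then by a random jitter. The
helper below (pick_node_type) and its inputs are hypothetical names used only for
illustration, not the real scheduler API.

    import random
    from typing import Dict, List, Tuple

    def pick_node_type(
        candidates: List[Tuple[str, float]],
        availabilities: Dict[str, float],
    ) -> str:
        """candidates: (node_type, utilization_score) pairs that can fit the request."""
        ranked = sorted(
            candidates,
            key=lambda c: (
                c[1],                         # 1) utilization score
                availabilities.get(c[0], 1),  # 2) availability (default 1: no failures seen)
                random.random(),              # 3) jitter to diversify across tied types
            ),
            reverse=True,
        )
        return ranked[0][0]

    # Equal utilization; "type_gpu2" has the best availability score and wins the tie-break.
    print(pick_node_type(
        [("type_gpu1", 0.5), ("type_gpu2", 0.5), ("type_gpu3", 0.5)],
        {"type_gpu1": 0.1, "type_gpu2": 1.0, "type_gpu3": 0.2},
    ))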