diff --git a/python/ray/autoscaler/v2/autoscaler.py b/python/ray/autoscaler/v2/autoscaler.py index cdc8620bc2b7..9746ad4aa576 100644 --- a/python/ray/autoscaler/v2/autoscaler.py +++ b/python/ray/autoscaler/v2/autoscaler.py @@ -32,6 +32,9 @@ from ray.autoscaler.v2.instance_manager.subscribers.cloud_instance_updater import ( CloudInstanceUpdater, ) +from ray.autoscaler.v2.instance_manager.subscribers.cloud_resource_monitor import ( + CloudResourceMonitor, +) from ray.autoscaler.v2.instance_manager.subscribers.ray_stopper import RayStopper from ray.autoscaler.v2.instance_manager.subscribers.threaded_ray_installer import ( ThreadedRayInstaller, ) @@ -76,6 +79,7 @@ def __init__( self._metrics_reporter = metrics_reporter self._init_cloud_instance_provider(config, config_reader) + self._cloud_resource_monitor = None self._init_instance_manager( session_name=session_name, config=config, @@ -160,6 +164,8 @@ def _init_instance_manager( max_concurrent_installs=config.get_max_num_worker_nodes() or 50, ) ) + self._cloud_resource_monitor = CloudResourceMonitor() + subscribers.append(self._cloud_resource_monitor) self._instance_manager = InstanceManager( instance_storage=instance_storage, @@ -201,6 +207,7 @@ def update_autoscaling_state( instance_manager=self._instance_manager, scheduler=self._scheduler, cloud_provider=self._cloud_instance_provider, + cloud_resource_monitor=self._cloud_resource_monitor, ray_cluster_resource_state=ray_cluster_resource_state, non_terminated_cloud_instances=( self._cloud_instance_provider.get_non_terminated() diff --git a/python/ray/autoscaler/v2/instance_manager/common.py b/python/ray/autoscaler/v2/instance_manager/common.py index fa792f29e820..92b7b3ed1ae2 100644 --- a/python/ray/autoscaler/v2/instance_manager/common.py +++ b/python/ray/autoscaler/v2/instance_manager/common.py @@ -66,6 +66,7 @@ def is_cloud_instance_allocated(instance_status: Instance.InstanceStatus) -> boo Instance.TERMINATING, Instance.RAY_INSTALL_FAILED, Instance.TERMINATION_FAILED, + Instance.ALLOCATION_TIMEOUT, } @staticmethod @@ -225,6 +226,10 @@ def get_valid_transitions() -> Dict[ # Ray is already installed on the provisioned cloud # instance. It could be any valid ray status. Instance.RAY_RUNNING, + # The cloud provider timed out before the allocated cloud instance reached + # a running state. The CloudResourceMonitor subscriber will lower this + # node type's priority in future scheduling decisions. + Instance.ALLOCATION_TIMEOUT, Instance.RAY_STOPPING, Instance.RAY_STOPPED, # Instance is requested to be stopped, e.g. instance leaked: no matching @@ -284,6 +289,13 @@ def get_valid_transitions() -> Dict[ # cloud instance somehow failed. Instance.TERMINATED, }, + # An instance has been allocated a cloud instance, but the cloud provider + # timed out before the instance reached a running state, e.g. a Kubernetes + # pod remains pending due to insufficient resources. + Instance.ALLOCATION_TIMEOUT: { + # Instance is requested to be stopped + Instance.TERMINATING + }, # When in this status, the ray process is requested to be stopped to the # ray cluster, but not yet present in the dead ray node list reported by # the ray cluster.
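The common.py hunk above adds exactly one entry edge (ALLOCATED -> ALLOCATION_TIMEOUT) and one exit edge (ALLOCATION_TIMEOUT -> TERMINATING). Below is a minimal sketch of what the new transition table implies, assuming the import paths shown in this diff; it is not part of the patch and simply mirrors the assertions added to test_instance_util.py further down.

```python
from ray.autoscaler.v2.instance_manager.common import InstanceUtil
from ray.core.generated.instance_manager_pb2 import Instance

transitions = InstanceUtil.get_valid_transitions()

# An ALLOCATED instance that never reaches a running cloud instance can now
# time out instead of being sent straight to TERMINATING ...
assert Instance.ALLOCATION_TIMEOUT in transitions[Instance.ALLOCATED]
# ... and the only way out of ALLOCATION_TIMEOUT is termination.
assert transitions[Instance.ALLOCATION_TIMEOUT] == {Instance.TERMINATING}
# ALLOCATION_TIMEOUT still counts as holding a cloud instance, so the
# reconciler's terminator will release the underlying cloud resource.
assert InstanceUtil.is_cloud_instance_allocated(Instance.ALLOCATION_TIMEOUT)
```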
diff --git a/python/ray/autoscaler/v2/instance_manager/reconciler.py b/python/ray/autoscaler/v2/instance_manager/reconciler.py index b403803e577b..3ec6e9675267 100644 --- a/python/ray/autoscaler/v2/instance_manager/reconciler.py +++ b/python/ray/autoscaler/v2/instance_manager/reconciler.py @@ -21,6 +21,9 @@ LaunchNodeError, TerminateNodeError, ) +from ray.autoscaler.v2.instance_manager.subscribers.cloud_resource_monitor import ( + CloudResourceMonitor, +) from ray.autoscaler.v2.instance_manager.subscribers.ray_stopper import RayStopError from ray.autoscaler.v2.instance_manager.subscribers.threaded_ray_installer import ( RayInstallError, @@ -62,6 +65,7 @@ def reconcile( instance_manager: InstanceManager, scheduler: IResourceScheduler, cloud_provider: ICloudInstanceProvider, + cloud_resource_monitor: CloudResourceMonitor, ray_cluster_resource_state: ClusterResourceState, non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance], autoscaling_config: AutoscalingConfig, @@ -88,6 +92,8 @@ def reconcile( Args: instance_manager: The instance manager to reconcile. + cloud_resource_monitor: The cloud resource monitor for monitoring resource + availability of all node types. ray_cluster_resource_state: The ray cluster's resource state. non_terminated_cloud_instances: The non-terminated cloud instances from the cloud provider. @@ -122,6 +128,7 @@ def reconcile( instance_manager=instance_manager, scheduler=scheduler, cloud_provider=cloud_provider, + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ray_cluster_resource_state, non_terminated_cloud_instances=non_terminated_cloud_instances, autoscaling_config=autoscaling_config, @@ -161,26 +168,30 @@ def _sync_from( More specifically, we will reconcile status transitions for: 1. QUEUED/REQUESTED -> ALLOCATED: - When a instance with launch request id (indicating a previous launch + When an instance with launch request id (indicating a previous launch request was made) could be assigned to an unassigned cloud instance of the same instance type. 2. REQUESTED -> ALLOCATION_FAILED: When there's an error from the cloud provider for launch failure so that the instance becomes ALLOCATION_FAILED. - 3. * -> RAY_RUNNING: + 3. ALLOCATED -> ALLOCATION_TIMEOUT: + When an instance has been allocated to a cloud instance, but is stuck in + this state. For example, a kubernetes pod remains pending due to + insufficient resources. + 4. * -> RAY_RUNNING: When a ray node on a cloud instance joins the ray cluster, we will transition the instance to RAY_RUNNING. - 4. * -> TERMINATED: + 5. * -> TERMINATED: When the cloud instance is already terminated, we will transition the instance to TERMINATED. - 5. TERMINATING -> TERMINATION_FAILED: + 6. TERMINATING -> TERMINATION_FAILED: When there's an error from the cloud provider for termination failure. - 6. * -> RAY_STOPPED: + 7. * -> RAY_STOPPED: When ray was stopped on the cloud instance, we will transition the instance to RAY_STOPPED. - 7. * -> RAY_INSTALL_FAILED: + 8. * -> RAY_INSTALL_FAILED: When there's an error from RayInstaller. - 8. RAY_STOP_REQUESTED -> RAY_RUNNING: + 9. RAY_STOP_REQUESTED -> RAY_RUNNING: When requested to stop ray, but failed to stop/drain the ray node (e.g. idle termination drain rejected by the node). 
@@ -227,6 +238,7 @@ def _step_next( instance_manager: InstanceManager, scheduler: IResourceScheduler, cloud_provider: ICloudInstanceProvider, + cloud_resource_monitor: CloudResourceMonitor, ray_cluster_resource_state: ClusterResourceState, non_terminated_cloud_instances: Dict[CloudInstanceId, CloudInstance], autoscaling_config: AutoscalingConfig, @@ -258,6 +270,8 @@ def _step_next( Args: instance_manager: The instance manager to reconcile. scheduler: The resource scheduler to make scaling decisions. + cloud_resource_monitor: The cloud resource monitor for monitoring resource + availability of all node types. ray_cluster_resource_state: The ray cluster's resource state. non_terminated_cloud_instances: The non-terminated cloud instances from the cloud provider. @@ -275,6 +289,7 @@ def _step_next( Reconciler._scale_cluster( autoscaling_state=autoscaling_state, instance_manager=instance_manager, + cloud_resource_monitor=cloud_resource_monitor, ray_state=ray_cluster_resource_state, scheduler=scheduler, autoscaling_config=autoscaling_config, @@ -711,6 +726,7 @@ def _handle_ray_status_transition( f"{NodeStatus.Name(ray_node.status)}" ), ray_node_id=ray_node_id, + instance_type=im_instance.instance_type, ) Reconciler._update_instance_manager(instance_manager, version, updates) @@ -932,8 +948,9 @@ def _handle_stuck_instances( update = Reconciler._handle_stuck_instance( instance, reconcile_config.allocate_status_timeout_s, - new_status=IMInstance.TERMINATING, + new_status=IMInstance.ALLOCATION_TIMEOUT, cloud_instance_id=instance.cloud_instance_id, + instance_type=instance.instance_type, ) if update: im_updates[instance.instance_id] = update @@ -1060,6 +1077,7 @@ def _is_head_node_running(instance_manager: InstanceManager) -> bool: def _scale_cluster( autoscaling_state: AutoscalingState, instance_manager: InstanceManager, + cloud_resource_monitor: CloudResourceMonitor, ray_state: ClusterResourceState, scheduler: IResourceScheduler, autoscaling_config: AutoscalingConfig, @@ -1075,6 +1093,8 @@ def _scale_cluster( Args: autoscaling_state: The autoscaling state to reconcile. instance_manager: The instance manager to reconcile. + cloud_resource_monitor: The cloud resource monitor for monitoring resource + availability of all node types. ray_state: The ray cluster's resource state. scheduler: The resource scheduler to make scaling decisions. autoscaling_config: The autoscaling config. @@ -1119,6 +1139,9 @@ def _scale_cluster( disable_launch_config_check=( autoscaling_config.disable_launch_config_check() ), + cloud_resource_availabilities=( + cloud_resource_monitor.get_resource_availabilities() + ), ) # Ask scheduler for updates to the cluster shape. @@ -1195,6 +1218,7 @@ def _terminate_instances(instance_manager: InstanceManager): """ Terminate instances with the below statuses: - RAY_STOPPED: ray was stopped on the cloud instance. + - ALLOCATION_TIMEOUT: the cloud provider timed out while allocating a running cloud instance. - RAY_INSTALL_FAILED: ray installation failed on the cloud instance, we will not retry. - TERMINATION_FAILED: cloud provider failed to terminate the instance @@ -1209,6 +1233,7 @@ def _terminate_instances(instance_manager: InstanceManager): for instance in im_instances: if instance.status not in [ IMInstance.RAY_STOPPED, + IMInstance.ALLOCATION_TIMEOUT, IMInstance.RAY_INSTALL_FAILED, IMInstance.TERMINATION_FAILED, ]: @@ -1391,7 +1416,6 @@ def _handle_stuck_requested_instance( instance, select_instance_status=IMInstance.REQUESTED ) ) - # Fail the allocation if we have tried too many times.
if len(all_request_times_ns) > max_num_retry_request_to_allocate: return IMInstanceUpdateEvent( diff --git a/python/ray/autoscaler/v2/instance_manager/subscribers/cloud_resource_monitor.py b/python/ray/autoscaler/v2/instance_manager/subscribers/cloud_resource_monitor.py new file mode 100644 index 000000000000..12ce294ba6ba --- /dev/null +++ b/python/ray/autoscaler/v2/instance_manager/subscribers/cloud_resource_monitor.py @@ -0,0 +1,73 @@ +import logging +import time +from typing import Dict, List + +from ray.autoscaler.v2.instance_manager.instance_manager import ( + InstanceUpdatedSubscriber, +) +from ray.autoscaler.v2.schema import NodeType +from ray.core.generated.instance_manager_pb2 import Instance, InstanceUpdateEvent + +logger = logging.getLogger(__name__) + + +class CloudResourceMonitor(InstanceUpdatedSubscriber): + """CloudResourceMonitor records the availability of all node types. + + In spot scenarios, the resources available in the cluster change dynamically. + When scaling up, it is necessary to know which node types are most + likely to have resources available, in order to decide which type of node to request. + + During scale-up, if a node of a given type is requested but fails to + allocate, that type is considered unavailable at that timestamp. This class + records the last timestamp at which each node type was unavailable, allowing the + autoscaler to deprioritize such node types in future scaling decisions. + """ + + def __init__( + self, + ) -> None: + self._last_unavailable_timestamp: Dict[NodeType, float] = {} + + def allocation_timeout(self, failed_event: InstanceUpdateEvent): + unavailable_timestamp = time.time() + self._last_unavailable_timestamp[ + failed_event.instance_type + ] = unavailable_timestamp + logger.info( + f"Cloud resource type {failed_event.instance_type} is " + f"unavailable at timestamp={unavailable_timestamp}. " + f"We will lower its priority in future scheduling decisions." + ) + + def allocation_succeeded(self, succeeded_event: InstanceUpdateEvent): + if succeeded_event.instance_type in self._last_unavailable_timestamp: + self._last_unavailable_timestamp.pop(succeeded_event.instance_type) + logger.info( + f"Cloud resource type {succeeded_event.instance_type} is " + f"available at timestamp={time.time()}. We will restore its priority in " + f"future scheduling decisions." + ) + + def notify(self, events: List[InstanceUpdateEvent]) -> None: + for event in events: + if event.new_instance_status == Instance.ALLOCATION_TIMEOUT: + self.allocation_timeout(event) + elif ( + event.new_instance_status == Instance.RAY_RUNNING + and event.instance_type + ): + self.allocation_succeeded(event) + + def get_resource_availabilities(self) -> Dict[NodeType, float]: + """Calculate the availability scores of node types. + Higher values indicate a higher likelihood of successful resource allocation. + """ + resource_availability_scores: Dict[NodeType, float] = {} + if self._last_unavailable_timestamp: + max_ts = max(self._last_unavailable_timestamp.values()) + for node_type in self._last_unavailable_timestamp: + resource_availability_scores[node_type] = ( + 1 - self._last_unavailable_timestamp[node_type] / max_ts + ) + return resource_availability_scores diff --git a/python/ray/autoscaler/v2/scheduler.py b/python/ray/autoscaler/v2/scheduler.py index 924ccb9efa01..17ddbd98c9d2 100644 --- a/python/ray/autoscaler/v2/scheduler.py +++ b/python/ray/autoscaler/v2/scheduler.py @@ -65,6 +65,9 @@ class SchedulingRequest: ) # The current instances.
current_instances: List[AutoscalerInstance] = field(default_factory=list) + # The cloud resource availability scores, keyed by node type. A low score + # indicates that resource allocation for this node type has recently failed. + cloud_resource_availabilities: Dict[NodeType, float] = field(default_factory=dict) @@ -725,11 +728,18 @@ class ScheduleContext: # number of workers in the config. This takes into account any pending/running # nodes. _node_type_available: Dict[NodeType, int] = field(default_factory=dict) + # The availability scores of cloud resources. A low score suggests that + # this node type has recently experienced allocation failures, + # and its weight should be reduced during scheduling. + _cloud_resource_availabilities: Dict[NodeType, float] = field( + default_factory=dict + ) def __init__( self, nodes: List[SchedulingNode], node_type_configs: Dict[NodeType, NodeTypeConfig], + cloud_resource_availabilities: Dict[NodeType, float], disable_launch_config_check: bool, max_num_nodes: Optional[int] = None, idle_timeout_s: Optional[float] = None, @@ -742,6 +752,7 @@ def __init__( self._max_num_nodes = max_num_nodes self._idle_timeout_s = idle_timeout_s self._disable_launch_config_check = disable_launch_config_check + self._cloud_resource_availabilities = cloud_resource_availabilities @classmethod def from_schedule_request( @@ -771,6 +782,7 @@ def from_schedule_request( return cls( nodes=nodes, node_type_configs=node_type_configs, + cloud_resource_availabilities=req.cloud_resource_availabilities, disable_launch_config_check=req.disable_launch_config_check, max_num_nodes=req.max_num_nodes, idle_timeout_s=req.idle_timeout_s, @@ -853,6 +865,9 @@ def get_cluster_resources(self) -> Dict[str, float]: def get_idle_timeout_s(self) -> Optional[float]: return self._idle_timeout_s + def get_cloud_resource_availabilities(self) -> Dict[NodeType, float]: + return copy.deepcopy(self._cloud_resource_availabilities) + def update(self, new_nodes: List[SchedulingNode]) -> None: """ Update the context with the new nodes. @@ -1468,7 +1483,10 @@ def _sort_resource_request(req: ResourceRequest) -> Tuple: requests_to_sched, existing_nodes, ) = ResourceDemandScheduler._sched_best_node( - requests_to_sched, existing_nodes, resource_request_source + requests_to_sched, + existing_nodes, + resource_request_source, + ctx.get_cloud_resource_availabilities(), ) if best_node is None: # No existing nodes can schedule any more requests. @@ -1504,7 +1522,10 @@ def _sort_resource_request(req: ResourceRequest) -> Tuple: requests_to_sched, node_pools, ) = ResourceDemandScheduler._sched_best_node( - requests_to_sched, node_pools, resource_request_source + requests_to_sched, + node_pools, + resource_request_source, + ctx.get_cloud_resource_availabilities(), ) if best_node is None: break @@ -1529,12 +1550,17 @@ def _sched_best_node( requests: List[ResourceRequest], nodes: List[SchedulingNode], resource_request_source: ResourceRequestSource, + cloud_resource_availabilities: Dict[NodeType, float], ) -> Tuple[SchedulingNode, List[ResourceRequest], List[SchedulingNode]]: """ Schedule the requests on the best node. A simple greedy algorithm is used to schedule the requests: 1. Try to schedule the requests on each node. - 2. Sort the nodes by a score + 2. Sort the nodes by a score. The sorting includes: + 2.1. UtilizationScore: to maximize resource utilization. + 2.2. Cloud resource availabilities: prioritize node types with + the most available cloud resources, in order to minimize allocation + failures. 3.
Return the node with the highest score. The highest score node is updated with the scheduled requests, and the node is @@ -1547,6 +1573,8 @@ def _sched_best_node( removed from the list. resource_request_source: The source of the resource request, i.e. pending demands from ray actors/tasks or cluster resource constraints. + cloud_resource_availabilities: The cloud resource availability score. A low + score indicates that allocation for this node type has recently failed. Returns: best_node: The best node to schedule the requests. @@ -1591,9 +1619,16 @@ class ScheduleResult: return None, requests, nodes # Sort the results by score. - results = sorted(results, key=lambda r: r.score, reverse=True) - best_result = results[0] + results = sorted( + results, + key=lambda r: ( + r.score, + cloud_resource_availabilities.get(r.node.node_type, 1), + ), + reverse=True, + ) + best_result = results[0] # Remove the best node from the nodes. nodes.pop(best_result.idx) logger.debug( diff --git a/python/ray/autoscaler/v2/tests/test_instance_util.py b/python/ray/autoscaler/v2/tests/test_instance_util.py index 62e379e40d40..94a72624e833 100644 --- a/python/ray/autoscaler/v2/tests/test_instance_util.py +++ b/python/ray/autoscaler/v2/tests/test_instance_util.py @@ -51,6 +51,7 @@ def test_transition_graph(self): Instance.RAY_STOPPED, Instance.TERMINATING, Instance.TERMINATED, + Instance.ALLOCATION_TIMEOUT, } all_status.remove(Instance.ALLOCATED) @@ -72,6 +73,9 @@ def test_transition_graph(self): } all_status.remove(Instance.RAY_RUNNING) + assert g[Instance.ALLOCATION_TIMEOUT] == {Instance.TERMINATING} + all_status.remove(Instance.ALLOCATION_TIMEOUT) + assert g[Instance.RAY_STOP_REQUESTED] == { Instance.RAY_STOPPING, Instance.RAY_STOPPED, @@ -207,6 +211,7 @@ def test_is_cloud_instance_allocated(self): Instance.RAY_STOPPED, Instance.TERMINATING, Instance.TERMINATION_FAILED, + Instance.ALLOCATION_TIMEOUT, } for s in positive_status: instance.status = s @@ -309,6 +314,7 @@ def add_reachable_from(reachable, src, transitions): add_reachable_from(expected_reachable, Instance.QUEUED, transitions) # Add REQUESTED again since it's also reachable from QUEUED. 
add_reachable_from(expected_reachable, Instance.REQUESTED, transitions) + add_reachable_from(expected_reachable, Instance.ALLOCATION_TIMEOUT, transitions) for s, expected in expected_reachable.items(): assert InstanceUtil.get_reachable_statuses(s) == expected, ( diff --git a/python/ray/autoscaler/v2/tests/test_reconciler.py b/python/ray/autoscaler/v2/tests/test_reconciler.py index 016963dfbb4b..133ff9e33655 100644 --- a/python/ray/autoscaler/v2/tests/test_reconciler.py +++ b/python/ray/autoscaler/v2/tests/test_reconciler.py @@ -21,6 +21,9 @@ ) from ray.autoscaler.v2.instance_manager.reconciler import Reconciler, logger from ray.autoscaler.v2.instance_manager.storage import InMemoryStorage +from ray.autoscaler.v2.instance_manager.subscribers.cloud_resource_monitor import ( + CloudResourceMonitor, +) from ray.autoscaler.v2.instance_manager.subscribers.ray_stopper import RayStopError from ray.autoscaler.v2.instance_manager.subscribers.threaded_ray_installer import ( RayInstallError, @@ -117,7 +120,9 @@ def setup(): instance_status_update_subscribers=[mock_subscriber], ) - yield instance_manager, instance_storage, mock_subscriber + cloud_resource_monitor = CloudResourceMonitor() + + yield instance_manager, instance_storage, mock_subscriber, cloud_resource_monitor class TestReconciler: @@ -129,7 +134,7 @@ def _add_instances(instance_storage, instances): @staticmethod def test_requested_instance_no_op(setup): - instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup # Request no timeout yet. TestReconciler._add_instances( instance_storage, @@ -148,6 +153,7 @@ def test_requested_instance_no_op(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances={}, cloud_provider_errors=[], @@ -163,7 +169,7 @@ def test_requested_instance_no_op(setup): def test_requested_instance_to_allocated(setup): # When there's a matching cloud instance for the requested instance. # The requested instance should be moved to ALLOCATED. - instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup TestReconciler._add_instances( instance_storage, [ @@ -192,6 +198,7 @@ def test_requested_instance_to_allocated(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -212,7 +219,7 @@ def test_requested_instance_to_allocation_failed(setup): Test that the instance should be transitioned to ALLOCATION_FAILED when launch error happens. """ - instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup instances = [ # Should succeed with instance matched. 
@@ -248,6 +255,7 @@ def test_requested_instance_to_allocation_failed(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=launch_errors, @@ -264,7 +272,7 @@ def test_requested_instance_to_allocation_failed(setup): @staticmethod def test_reconcile_terminated_cloud_instances(setup): - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup instances = [ create_instance( @@ -304,6 +312,7 @@ def test_reconcile_terminated_cloud_instances(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=termination_errors, @@ -330,7 +339,7 @@ def test_reconcile_terminated_cloud_instances(setup): @staticmethod def test_ray_reconciler_no_op(setup): - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup im_instances = [ create_instance( @@ -352,6 +361,7 @@ def test_ray_reconciler_no_op(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -372,6 +382,7 @@ def test_ray_reconciler_no_op(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=ray_nodes), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -384,7 +395,7 @@ def test_ray_reconciler_no_op(setup): @staticmethod def test_ray_reconciler_new_ray(setup): - instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup # A newly running ray node with matching cloud instance id node_states = [ @@ -404,6 +415,7 @@ def test_ray_reconciler_new_ray(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=node_states), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -419,7 +431,7 @@ def test_ray_reconciler_new_ray(setup): @staticmethod def test_ray_reconciler_already_ray_running(setup): - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup # A running ray node already reconciled. 
TestReconciler._add_instances( instance_storage, @@ -448,6 +460,7 @@ def test_ray_reconciler_already_ray_running(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=ray_nodes), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -459,7 +472,7 @@ def test_ray_reconciler_already_ray_running(setup): @staticmethod def test_ray_reconciler_stopping_ray(setup): - instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup # draining ray nodes im_instances = [ @@ -491,6 +504,7 @@ def test_ray_reconciler_stopping_ray(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=ray_nodes), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -506,7 +520,7 @@ def test_ray_reconciler_stopping_ray(setup): @staticmethod def test_ray_reconciler_stopped_ray(setup): - instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup # dead ray nodes im_instances = [ @@ -538,6 +552,7 @@ def test_ray_reconciler_stopped_ray(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=ray_nodes), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -553,7 +568,7 @@ def test_ray_reconciler_stopped_ray(setup): @staticmethod def test_reconcile_ray_installer_failures(setup): - instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup # dead ray nodes im_instances = [ @@ -578,6 +593,7 @@ def test_reconcile_ray_installer_failures(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=[]), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -596,7 +612,7 @@ def test_draining_ray_node_also_terminated(setup): transition the instance to TERMINATED. """ - instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup im_instances = [ create_instance( @@ -627,6 +643,7 @@ def test_draining_ray_node_also_terminated(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=ray_nodes), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -664,7 +681,7 @@ def test_max_concurrent_launches( upscaling_speed, setup, ): - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup next_id = 0 # Add some allocated instances. 
@@ -724,6 +741,7 @@ def test_max_concurrent_launches( instance_manager=instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -748,7 +766,7 @@ def test_max_concurrent_launches( @staticmethod @mock.patch("time.time_ns") def test_stuck_instances_requested(mock_time_ns, setup): - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup cur_time_s = 10 mock_time_ns.return_value = cur_time_s * s_to_ns @@ -787,6 +805,7 @@ def test_stuck_instances_requested(mock_time_ns, setup): instance_manager=instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances={}, cloud_provider_errors=[], @@ -807,7 +826,7 @@ def test_stuck_instances_requested(mock_time_ns, setup): @staticmethod @mock.patch("time.time_ns") def test_stuck_instances_ray_stop_requested(mock_time_ns, setup): - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup timeout_s = 5 cur_time_s = 20 mock_time_ns.return_value = cur_time_s * s_to_ns @@ -838,6 +857,7 @@ def test_stuck_instances_ray_stop_requested(mock_time_ns, setup): instance_manager=instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances={}, cloud_provider_errors=[], @@ -860,7 +880,7 @@ def test_ray_stop_requested_fail(mock_time_ns, setup): # Test that the instance should be transitioned to RAY_RUNNING # when the ray stop request fails. 
- instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup mock_time_ns.return_value = 10 * s_to_ns instances = [ @@ -900,6 +920,7 @@ def test_ray_stop_requested_fail(mock_time_ns, setup): instance_manager=instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=ray_nodes), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -924,7 +945,7 @@ def test_ray_stop_requested_fail(mock_time_ns, setup): ], ) def test_stuck_instances(mock_time_ns, cur_status, expect_status, setup): - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup timeout_s = 5 cur_time_s = 20 mock_time_ns.return_value = cur_time_s * s_to_ns @@ -961,6 +982,7 @@ def test_stuck_instances(mock_time_ns, cur_status, expect_status, setup): instance_manager=instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -987,7 +1009,7 @@ def test_stuck_instances(mock_time_ns, cur_status, expect_status, setup): ], ) def test_warn_stuck_transient_instances(mock_time_ns, status, setup): - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup cur_time_s = 10 mock_time_ns.return_value = cur_time_s * s_to_ns timeout_s = 5 @@ -1027,6 +1049,7 @@ def test_warn_stuck_transient_instances(mock_time_ns, status, setup): instance_manager=instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances=cloud_instances if status != Instance.QUEUED @@ -1047,7 +1070,7 @@ def test_warn_stuck_transient_instances(mock_time_ns, status, setup): @staticmethod @mock.patch("time.time_ns") def test_stuck_instances_no_op(mock_time_ns, setup): - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup # Large enough to not trigger any timeouts mock_time_ns.return_value = 999999 * s_to_ns @@ -1068,6 +1091,7 @@ def test_stuck_instances_no_op(mock_time_ns, setup): Instance.RAY_STOPPED, Instance.TERMINATION_FAILED, Instance.QUEUED, + Instance.ALLOCATION_TIMEOUT, } no_op_statuses = all_status - reconciled_stuck_statuses - transient_statuses @@ -1087,6 +1111,7 @@ def test_stuck_instances_no_op(mock_time_ns, setup): instance_manager=instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances={}, cloud_provider_errors=[], @@ -1110,7 +1135,7 @@ def test_stuck_instances_no_op(mock_time_ns, setup): ids=["ray_running", "allocated"], ) def test_is_head_node_running(status, expected_running, setup): - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup instances = [ create_instance( @@ -1130,7 +1155,7 @@ def test_scaling_updates(setup): Tests that new instances should be launched due to autoscaling decisions, and existing instances should be terminated if needed. 
""" - instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup im_instances = [ create_instance( @@ -1185,6 +1210,7 @@ def test_scaling_updates(setup): instance_manager=instance_manager, scheduler=mock_scheduler, cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=ray_nodes), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -1212,7 +1238,7 @@ def test_scaling_updates(setup): @staticmethod def test_terminating_instances(setup): - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup instances = [ create_instance( @@ -1244,6 +1270,7 @@ def test_terminating_instances(setup): instance_manager=instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -1266,7 +1293,7 @@ def test_terminating_instances(setup): [True, False], ) def test_ray_install(disable_node_updaters, cloud_instance_running, setup): - instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup instances = [ create_instance( @@ -1290,6 +1317,7 @@ def test_ray_install(disable_node_updaters, cloud_instance_running, setup): instance_manager=instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -1310,7 +1338,7 @@ def test_ray_install(disable_node_updaters, cloud_instance_running, setup): @staticmethod @mock.patch("time.time_ns") def test_autoscaler_state(mock_time_ns, setup): - instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup mock_time_ns.return_value = 5 instances = [ @@ -1403,6 +1431,7 @@ def test_autoscaler_state(mock_time_ns, setup): instance_manager=instance_manager, scheduler=mock_scheduler, cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState( cluster_resource_state_version=1, ), @@ -1436,7 +1465,7 @@ def test_extra_cloud_instances_cloud_provider(setup): """ Test that extra cloud instances should be terminated. """ - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup im_instances = [ create_instance( @@ -1467,6 +1496,7 @@ def test_extra_cloud_instances_cloud_provider(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=ray_nodes), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -1491,7 +1521,7 @@ def test_cloud_instance_reboot(setup): """ Test that the case of booting up a previous stopped cloud instance. 
""" - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup im_instances = [ create_instance( @@ -1521,6 +1551,7 @@ def test_cloud_instance_reboot(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=ray_nodes), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -1542,7 +1573,7 @@ def test_ray_node_restarted_on_the_same_cloud_instance(setup): """ Test that the case of reusing cloud instances. """ - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup im_instances = [ create_instance( @@ -1578,6 +1609,7 @@ def test_ray_node_restarted_on_the_same_cloud_instance(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=ray_nodes), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -1610,7 +1642,7 @@ def test_ray_head_restarted_on_the_same_cloud_instance(setup): """ Test that the case of restarting Head node with GCS FT. """ - instance_manager, instance_storage, subscriber = setup + instance_manager, instance_storage, subscriber, cloud_resource_monitor = setup ray_nodes = [ NodeState( @@ -1636,6 +1668,7 @@ def test_ray_head_restarted_on_the_same_cloud_instance(setup): instance_manager, scheduler=MockScheduler(), cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState(node_states=ray_nodes), non_terminated_cloud_instances=cloud_instances, cloud_provider_errors=[], @@ -1672,7 +1705,7 @@ def test_ray_head_restarted_on_the_same_cloud_instance(setup): @staticmethod def test_reconcile_max_worker_nodes_limit_triggers_termination(setup): - instance_manager, instance_storage, _ = setup + instance_manager, instance_storage, _, cloud_resource_monitor = setup instances = [ create_instance( @@ -1734,6 +1767,7 @@ def test_reconcile_max_worker_nodes_limit_triggers_termination(setup): instance_manager=instance_manager, scheduler=mock_scheduler, cloud_provider=MagicMock(), + cloud_resource_monitor=cloud_resource_monitor, ray_cluster_resource_state=ClusterResourceState( node_states=ray_nodes, cluster_resource_state_version=1, diff --git a/python/ray/autoscaler/v2/tests/test_scheduler.py b/python/ray/autoscaler/v2/tests/test_scheduler.py index a1ca0a9f0944..4533c7368b11 100644 --- a/python/ray/autoscaler/v2/tests/test_scheduler.py +++ b/python/ray/autoscaler/v2/tests/test_scheduler.py @@ -52,6 +52,7 @@ def sched_request( instances: Optional[List[AutoscalerInstance]] = None, idle_timeout_s: Optional[float] = None, disable_launch_config_check: Optional[bool] = False, + cloud_resource_availabilities: Optional[Dict[NodeType, float]] = None, ) -> SchedulingRequest: if resource_requests is None: @@ -62,6 +63,8 @@ def sched_request( cluster_resource_constraints = [] if instances is None: instances = [] + if cloud_resource_availabilities is None: + cloud_resource_availabilities = {} return SchedulingRequest( resource_requests=ResourceRequestUtil.group_by_count(resource_requests), @@ -84,6 +87,7 @@ def sched_request( max_num_nodes=max_num_nodes, idle_timeout_s=idle_timeout_s, disable_launch_config_check=disable_launch_config_check, + 
cloud_resource_availabilities=cloud_resource_availabilities, ) @@ -104,6 +108,7 @@ def schedule( resource_requests: List[Dict], anti_affinity: bool = False, max_nodes: Optional[int] = None, + cloud_resource_availabilities: Optional[Dict[NodeType, float]] = None, ) -> SchedulingReply: ANTI_AFFINITY = ResourceRequestUtil.PlacementConstraintType.ANTI_AFFINITY @@ -142,6 +147,7 @@ def schedule( gang_resource_requests=gang_requests, max_num_nodes=max_nodes, instances=instances, + cloud_resource_availabilities=cloud_resource_availabilities, ) else: request = sched_request( @@ -149,6 +155,7 @@ def schedule( resource_requests=[ResourceRequestUtil.make(r) for r in resource_requests], instances=instances, max_num_nodes=max_nodes, + cloud_resource_availabilities=cloud_resource_availabilities, ) return ResourceDemandScheduler(event_logger).schedule(request) @@ -177,6 +184,7 @@ def test_is_schedulable(): Instance.ALLOCATION_FAILED, Instance.RAY_INSTALL_FAILED, Instance.TERMINATION_FAILED, + Instance.ALLOCATION_TIMEOUT, } for status in all_im_status: instance = make_autoscaler_instance( @@ -2638,6 +2646,98 @@ def test_pg_with_bundle_infeasible_label_selectors(): assert len(reply.infeasible_gang_resource_requests) == 1 +def test_get_nodes_with_resource_availabilities(): + node_type_configs = { + "type_gpu1": NodeTypeConfig( + name="type_gpu1", + resources={"CPU": 8, "GPU": 1, "gpu1": 1}, + min_worker_nodes=0, + max_worker_nodes=10, + ), + "type_gpu2": NodeTypeConfig( + name="type_gpu2", + resources={"CPU": 8, "GPU": 1, "gpu2": 1}, + min_worker_nodes=0, + max_worker_nodes=10, + ), + "type_gpu3": NodeTypeConfig( + name="type_gpu3", + resources={"CPU": 8, "GPU": 1, "gpu3": 1}, + min_worker_nodes=0, + max_worker_nodes=10, + ), + "type_gpu4": NodeTypeConfig( + name="type_gpu4", + resources={"CPU": 1, "GPU": 1, "gpu4": 1}, + min_worker_nodes=0, + max_worker_nodes=10, + ), + } + + def get_nodes_for( + resource_requests, + anti_affinity=False, + max_nodes: Optional[int] = None, + current_nodes: Optional[Dict] = None, + cloud_resource_availabilities=None, + ): + reply = schedule( + node_type_configs, + current_nodes or {}, + resource_requests, + anti_affinity=anti_affinity, + max_nodes=max_nodes, + cloud_resource_availabilities=cloud_resource_availabilities, + ) + to_launch, _ = _launch_and_terminate(reply) + infeasible = ResourceRequestUtil.to_resource_maps( + reply.infeasible_resource_requests + ) + return to_launch, infeasible + + # Pick the node type with the highest availability score when utilization scores are equal. + assert get_nodes_for( + [{"CPU": 8, "GPU": 1}], + cloud_resource_availabilities={ + "type_gpu1": 0.1, + "type_gpu2": 1, + "type_gpu3": 0.2, + }, + ) == ({"type_gpu2": 1}, []) + + # The availability score is set to 1 by default. + assert get_nodes_for( + [{"CPU": 8, "GPU": 1}], + cloud_resource_availabilities={"type_gpu2": 0.1, "type_gpu3": 0.2}, + ) == ({"type_gpu1": 1}, []) + + assert get_nodes_for( + [{"CPU": 8, "GPU": 1}] * 2, + cloud_resource_availabilities={ + "type_gpu1": 0.1, + "type_gpu2": 0.1, + "type_gpu3": 1, + }, + ) == ({"type_gpu3": 2}, []) + + # The utilization score is the first factor to be considered. + assert get_nodes_for([{"CPU": 1, "GPU": 1}], cloud_resource_availabilities={}) == ( + {"type_gpu4": 1}, + [], + ) + + # The utilization score is the first factor to be considered. 
+ assert get_nodes_for( + [{"CPU": 1, "GPU": 1}], + cloud_resource_availabilities={ + "type_gpu1": 0.1, + "type_gpu2": 0.1, + "type_gpu3": 1, + "type_gpu4": 0.1, + }, + ) == ({"type_gpu4": 1}, []) + + if __name__ == "__main__": if os.environ.get("PARALLEL_CI"): sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) diff --git a/src/ray/protobuf/instance_manager.proto b/src/ray/protobuf/instance_manager.proto index 14dbf09f11ec..aa2c4882c943 100644 --- a/src/ray/protobuf/instance_manager.proto +++ b/src/ray/protobuf/instance_manager.proto @@ -84,6 +84,8 @@ message Instance { RAY_INSTALL_FAILED = 12; // The instance termination failed - follows from the TERMINATING state. TERMINATION_FAILED = 13; + // The instance allocation timed out - follows from the ALLOCATED state. + ALLOCATION_TIMEOUT = 14; } // an unique id for the instance that's generated by the // instance manager. This may be optional if @@ -145,7 +147,7 @@ message InstanceUpdateEvent { /// Additional metadata for the update event. - // The instance type for QUEUED/REQUESTED/ALLOCATED event. + // The instance type. optional string instance_type = 5; // Cloud instance id for ALLOCATED event. optional string cloud_instance_id = 6;
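Taken together, the pieces above wire the monitor into the reconcile loop: the reconciler turns stuck ALLOCATED instances into ALLOCATION_TIMEOUT events, the CloudResourceMonitor subscriber turns those events into per-node-type availability scores, and the scheduler uses the scores as a tie-breaker after the utilization score. The following sketch illustrates that flow in isolation; it is not part of the patch, the node type names are invented for the example, and only the event fields that notify() actually reads are populated.

```python
from ray.autoscaler.v2.instance_manager.subscribers.cloud_resource_monitor import (
    CloudResourceMonitor,
)
from ray.core.generated.instance_manager_pb2 import Instance, InstanceUpdateEvent

monitor = CloudResourceMonitor()
monitor.notify(
    [
        # A spot GPU type the cloud provider could not bring up before the
        # allocate-status timeout; the reconciler now emits ALLOCATION_TIMEOUT.
        InstanceUpdateEvent(
            instance_type="gpu-spot",
            new_instance_status=Instance.ALLOCATION_TIMEOUT,
        ),
        # A node type whose ray node joined the cluster; any stale
        # unavailability record for it is cleared.
        InstanceUpdateEvent(
            instance_type="cpu-ondemand",
            new_instance_status=Instance.RAY_RUNNING,
        ),
    ]
)

scores = monitor.get_resource_availabilities()
# Only node types that timed out are scored; with a single failed type its
# score is 0.0 (1 - ts / max_ts with ts == max_ts).
assert scores == {"gpu-spot": 0.0}
# The scheduler defaults missing node types to 1 via
# cloud_resource_availabilities.get(node_type, 1), so "cpu-ondemand" wins the
# tie-break over "gpu-spot" when utilization scores are equal.
assert scores.get("cpu-ondemand", 1) > scores["gpu-spot"]
```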