Skip to content

Conversation

@wxwmd
Copy link
Contributor

@wxwmd wxwmd commented Nov 14, 2025

Description

When the Autoscaler receives a resource request and decides which type of node to scale up,, only the UtilizationScore is considered (that is, Ray tries to avoid launching a large node for a small resource request, which would lead to resource waste). If multiple node types in the cluster have the same UtilizationScore, Ray always request for the same node type.

In Spot scenarios, cloud resources are dynamically changing. Therefore, we want the Autoscaler to be aware of cloud resource availability — if a certain node type becomes unavailable, the Autoscaler should be able to automatically switch to requesting other node types.

In this PR, I added the CloudResourceMonitor class, which records node types that have failed resource allocation, and in future scaling events, reduces the weight of these node types.

Related issues

Related to #49983
Fixes #53636 #39788 #39789

implementation details

  1. CloudResourceMonitor
    This is a subscriber of Instances. When a Instance get status of ALLOCATION_FAILED, CloudResourceMonitor record the node_type and set a lower its availability score.
  2. ResourceDemandScheduler
    This class determines how to select the best node_type to handle resource request. I modify the part of selecting the best node type:
# Sort the results by score.
results = sorted(
    results,
    key=lambda r: (
        r.score,
        cloud_resource_availabilities.get(r.node.node_type, 1),
        random.random()
    ),
    reverse=True
)

The sorting includes:
2.1. UtilizationScore: to maximize resource utilization.
2.2. Cloud resource availabilities: prioritize node types with the most available cloud resources, in order to minimize allocation failures.
2.3. Random number: to diversify resource requests rather than always requesting for the same type.

@wxwmd wxwmd requested a review from a team as a code owner November 14, 2025 07:19
cloud_resource_availabilities=cloud_resource_availabilities,
)

assert get_nodes_for([{"GPU": 1}], cloud_resource_availabilities={
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The Case of the Missing Return

The get_nodes_for helper function calls schedule() but doesn't return the result, causing it to return None. The test assertions then compare None with expected dictionaries, which will always fail. The function needs to return _launch_and_terminate(reply)[0] to extract and return the launch dictionary.

Fix in Cursor Fix in Web

if last_status:
last_unavailability_timestamp=(last_status.timestamp_ns) / 1000
else:
last_unavailability_timestamp = time.time()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Timestamp Unit Mismatch Causes Availability Errors

Unit mismatch in timestamp handling. Line 67 converts timestamp_ns (nanoseconds) to microseconds by dividing by 1000, but line 69 uses time.time() which returns seconds. This creates inconsistent units for last_unavailability_timestamp, causing incorrect availability score calculations in get_resource_availabilities(). Both should use the same unit, likely microseconds by changing line 69 to time.time() * 1e6.

Fix in Cursor Fix in Web

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a CloudResourceMonitor to make the autoscaler aware of cloud resource availability, which is a great improvement for environments with dynamic resource allocation like spot instances. The implementation correctly integrates this monitor with the instance manager and scheduler.

I've found a critical issue in the CloudResourceMonitor related to inconsistent time units, which would lead to incorrect availability scoring. I've also identified some issues in the new test case for the scheduler, where the test helper function is broken and some assertions are incorrect.

Once these issues are addressed, this PR should be in good shape.

for instance in instances.values():
last_status = InstanceUtil.get_last_status_transition(instance)
if last_status:
last_unavailability_timestamp=(last_status.timestamp_ns) / 1000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There's an inconsistency in the time units used for last_unavailability_timestamp. Here, last_status.timestamp_ns (in nanoseconds) is divided by 1000, resulting in microseconds. However, in the else block on line 69, time.time() is used, which returns seconds.

This will lead to incorrect availability scores because the timestamps are not comparable. All timestamps should be in the same unit. I suggest using seconds consistently.

Suggested change
last_unavailability_timestamp=(last_status.timestamp_ns) / 1000
last_unavailability_timestamp = last_status.timestamp_ns / 1e9

Comment on lines +2647 to +2705
def test_get_nodes_with_resource_availabilities():
node_type_configs = {
"type_gpu1": NodeTypeConfig(
name="type_gpu1",
resources={"CPU": 8, "GPU": 1, "gpu1": 1},
min_worker_nodes=0,
max_worker_nodes=10,
),
"type_gpu2": NodeTypeConfig(
name="type_gpu2",
resources={"CPU": 8, "GPU": 1, "gpu2": 1},
min_worker_nodes=0,
max_worker_nodes=10,
),
"type_gpu3": NodeTypeConfig(
name="type_gpu3",
resources={"CPU": 8, "GPU": 1, "gpu3": 1},
min_worker_nodes=0,
max_worker_nodes=10,
),
"type_cpu": NodeTypeConfig(
name="type_cpu",
resources={"CPU": 8},
min_worker_nodes=0,
max_worker_nodes=10,
),
}

def get_nodes_for(
resource_requests,
anti_affinity=False,
max_nodes: Optional[int] = None,
current_nodes: Optional[Dict] = None,
cloud_resource_availabilities=None,
):
reply = schedule(
node_type_configs,
current_nodes or {},
resource_requests,
anti_affinity=anti_affinity,
max_nodes=max_nodes,
cloud_resource_availabilities=cloud_resource_availabilities,
)

assert get_nodes_for([{"GPU": 1}], cloud_resource_availabilities={
"type_gpu1": 0.1, "type_gpu2": 1, "type_gpu3": 0.2
}) == {"type_gpu2": 1}

assert get_nodes_for([{"GPU": 1}], cloud_resource_availabilities={
"type_gpu2": 0.1, "type_gpu3": 0.2
}) == {"type_gpu1": 1}

assert get_nodes_for([{"GPU": 2}], cloud_resource_availabilities={
"type_gpu1": 0.1, "type_gpu2": 0.1, "type_gpu3": 1
}) == {"type_gpu2": 2}

assert (get_nodes_for([{"CPU": 4}], cloud_resource_availabilities={})
== {"type_cpu": 1})

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This new test function test_get_nodes_with_resource_availabilities has a few issues:

  1. The helper function get_nodes_for does not have a return statement, so it returns None. The assertions on its return value will fail. It should probably return the nodes to be launched, like return _launch_and_terminate(reply)[0].

  2. The assertion on line 2699, assert get_nodes_for([{"GPU": 2}], ...) == {"type_gpu2": 2}, seems incorrect. The request is for a single bundle with 2 GPUs ([{"GPU": 2}]). None of the defined type_gpu* node types can satisfy this, as they each have only 1 GPU. This request should be infeasible, and no nodes should be launched. To request two nodes with one GPU each, the request should be [{"GPU": 1}, {"GPU": 1}].

  3. Even if the request was [{"GPU": 1}, {"GPU": 1}], the expected result {"type_gpu2": 2} seems wrong. Given the availability scores {"type_gpu1": 0.1, "type_gpu2": 0.1, "type_gpu3": 1}, the scheduler should prioritize type_gpu3 because it has the highest availability score. Therefore, the expected result should be {"type_gpu3": 2}.

Please review and correct the test logic. A corrected version of the get_nodes_for helper and the problematic assertion might look like this:

    def get_nodes_for(
        resource_requests,
        anti_affinity=False,
        max_nodes: Optional[int] = None,
        current_nodes: Optional[Dict] = None,
        cloud_resource_availabilities=None,
    ):
        reply = schedule(
            node_type_configs,
            current_nodes or {},
            resource_requests,
            anti_affinity=anti_affinity,
            max_nodes=max_nodes,
            cloud_resource_availabilities=cloud_resource_availabilities,
        )
        to_launch, _ = _launch_and_terminate(reply)
        return to_launch

    assert get_nodes_for([{"GPU": 1}], cloud_resource_availabilities={
        "type_gpu1": 0.1, "type_gpu2": 1, "type_gpu3": 0.2
    }) == {"type_gpu2": 1}

    assert get_nodes_for([{"GPU": 1}], cloud_resource_availabilities={
        "type_gpu2": 0.1, "type_gpu3": 0.2
    }) == {"type_gpu1": 1}

    # Corrected assertion
    assert get_nodes_for([{"GPU": 1}, {"GPU": 1}], cloud_resource_availabilities={
        "type_gpu1": 0.1, "type_gpu2": 0.1, "type_gpu3": 1
    }) == {"type_gpu3": 2}

    assert (get_nodes_for([{"CPU": 4}], cloud_resource_availabilities={})
            == {"type_cpu": 1})

@wxwmd wxwmd marked this pull request as draft November 14, 2025 07:26
@wxwmd wxwmd closed this Nov 14, 2025
@wxwmd wxwmd deleted the feature/autoscale_with_resource_availability branch November 14, 2025 07:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[core][autoscaler] Select different node types when a node type is unavailable

1 participant