Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 1 addition & 15 deletions release/ray_release/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
from ray_release.buildkite.output import buildkite_group, buildkite_open_last
from ray_release.cloud_util import archive_directory
from ray_release.cluster_manager.cluster_manager import ClusterManager
from ray_release.cluster_manager.full import FullClusterManager
from ray_release.cluster_manager.minimal import MinimalClusterManager
from ray_release.command_runner.anyscale_job_runner import AnyscaleJobRunner
from ray_release.command_runner.command_runner import CommandRunner
from ray_release.command_runner.job_runner import JobRunner
from ray_release.config import (
DEFAULT_AUTOSUSPEND_MINS,
DEFAULT_BUILD_TIMEOUT,
Expand All @@ -48,20 +46,17 @@
from ray_release.reporter.reporter import Reporter
from ray_release.result import Result, ResultStatus, handle_exception
from ray_release.signal_handling import (
register_handler,
reset_signal_handling,
setup_signal_handling,
)
from ray_release.template import get_working_dir, load_test_cluster_compute
from ray_release.test import Test

type_str_to_command_runner = {
"job": JobRunner,
"anyscale_job": AnyscaleJobRunner,
}

command_runner_to_cluster_manager = {
JobRunner: FullClusterManager,
AnyscaleJobRunner: MinimalClusterManager,
}
Comment on lines 55 to 61
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Now that JobRunner has been removed, these dictionaries only contain a single entry. This adds a layer of indirection that is no longer necessary. Consider removing these dictionaries and directly using AnyscaleJobRunner and MinimalClusterManager in _load_test_configuration. This would make the control flow more straightforward.

For example, you could refactor the logic in _load_test_configuration to something like this:

run_type = test["run"].get("type", DEFAULT_RUN_TYPE)

if run_type == "anyscale_job":
    command_runner_cls = AnyscaleJobRunner
    cluster_manager_cls = MinimalClusterManager
else:
    raise ReleaseTestConfigError(
        f"Unknown command runner type: {run_type}. Must be one of "
        f"['anyscale_job']"
    )


Expand Down Expand Up @@ -238,12 +233,6 @@ def _local_environment_information(
cluster_id: Optional[str],
cluster_env_id: Optional[str],
) -> None:
if isinstance(cluster_manager, FullClusterManager):
if not no_terminate:
register_handler(
lambda sig, frame: cluster_manager.terminate_cluster(wait=True)
)

# Start cluster
if cluster_id:
buildkite_group(":rocket: Using existing cluster")
Expand All @@ -258,10 +247,7 @@ def _local_environment_information(

cluster_manager.build_configs(timeout=build_timeout)

if isinstance(cluster_manager, FullClusterManager):
buildkite_group(":rocket: Starting up cluster")
cluster_manager.start_cluster(timeout=cluster_timeout)
elif isinstance(command_runner, AnyscaleJobRunner):
if isinstance(command_runner, AnyscaleJobRunner):
command_runner.job_manager.cluster_startup_timeout = cluster_timeout
Comment on lines +250 to 251
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Since AnyscaleJobRunner is now the only command runner used in this path, the isinstance check is redundant. You can simplify the code by removing the conditional statement.

        command_runner.job_manager.cluster_startup_timeout = cluster_timeout


result.cluster_url = cluster_manager.get_cluster_url()
Expand Down
37 changes: 0 additions & 37 deletions release/ray_release/tests/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
from ray_release.cluster_manager.full import FullClusterManager
from ray_release.command_runner.command_runner import CommandRunner
from ray_release.exception import (
ClusterCreationError,
ClusterNodesWaitTimeout,
ClusterStartupError,
ClusterStartupTimeout,
CommandError,
CommandTimeout,
ExitCode,
Expand Down Expand Up @@ -321,40 +318,6 @@ def testInvalidClusterCompute(self):
self._run(result, True)
self.assertEqual(result.return_code, ExitCode.CONFIG_ERROR.value)

def testStartClusterFails(self):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cluster lifecycle management is on anyscale now.

result = Result()

self._succeed_until("cluster_env")

# Fails because API response faulty
with self.assertRaises(ClusterCreationError):
self._run(result)
self.assertEqual(result.return_code, ExitCode.CLUSTER_RESOURCE_ERROR.value)

self.cluster_manager_return["cluster_id"] = "valid"

# Fail for random cluster startup reason
self.cluster_manager_return["start_cluster"] = _fail_on_call(
ClusterStartupError
)
with self.assertRaises(ClusterStartupError):
self._run(result)
self.assertEqual(result.return_code, ExitCode.CLUSTER_STARTUP_ERROR.value)

# Ensure cluster was terminated
self.assertGreaterEqual(self.sdk.call_counter["terminate_cluster"], 1)

# Fail for cluster startup timeout
self.cluster_manager_return["start_cluster"] = _fail_on_call(
ClusterStartupTimeout
)
with self.assertRaises(ClusterStartupTimeout):
self._run(result)
self.assertEqual(result.return_code, ExitCode.CLUSTER_STARTUP_TIMEOUT.value)

# Ensure cluster was terminated
self.assertGreaterEqual(self.sdk.call_counter["terminate_cluster"], 1)

def testPrepareRemoteEnvFails(self):
result = Result()

Expand Down