Merged
19 changes: 16 additions & 3 deletions release/ray_release/command_runner/anyscale_job_runner.py
@@ -2,6 +2,7 @@
import os
import re
import shlex
+import shutil
import tempfile
from typing import TYPE_CHECKING, Any, Dict, List, Optional

@@ -11,14 +12,15 @@
    upload_working_dir_to_azure,
)
from ray_release.cluster_manager.cluster_manager import ClusterManager
-from ray_release.command_runner.job_runner import JobRunner
+from ray_release.command_runner.command_runner import CommandRunner
from ray_release.exception import (
    FetchResultError,
    JobBrokenError,
    JobNoLogsError,
    JobOutOfRetriesError,
    JobTerminatedBeforeStartError,
    JobTerminatedError,
+    LogsError,
    PrepareCommandError,
    PrepareCommandTimeout,
    TestCommandError,
@@ -60,7 +62,7 @@ def _get_env_str(env: Dict[str, str]) -> str:
    return env_str


-class AnyscaleJobRunner(JobRunner):
+class AnyscaleJobRunner(CommandRunner):
Review comment (Contributor, critical):
Changing the base class from JobRunner to CommandRunner introduces a critical issue. The wait_for_nodes method in this class calls super().wait_for_nodes(). Previously, this resolved to JobRunner.wait_for_nodes(). Now, it will resolve to CommandRunner.wait_for_nodes(), which raises a NotImplementedError.

This will break tests that require waiting for nodes. The implementation of wait_for_nodes from JobRunner should be merged into this class. Specifically, the super() call in AnyscaleJobRunner.wait_for_nodes should be replaced with the logic to schedule the wait_cluster.py script, like this:

def wait_for_nodes(self, num_nodes: int, timeout: float = 900):
    self._wait_for_nodes_timeout = timeout
    self.job_manager.cluster_startup_timeout += timeout
    self.run_prepare_command(
        f"python wait_cluster.py {num_nodes} {timeout}", timeout=timeout + 30
    )

    def __init__(
        self,
        cluster_manager: ClusterManager,
@@ -114,9 +116,14 @@ def __init__(
        self._artifact_path = artifact_path
        self._artifact_uploaded = artifact_path is not None

+    def _copy_script_to_working_dir(self, script_name):
+        script = os.path.join(os.path.dirname(__file__), f"_{script_name}")
+        shutil.copy(script, script_name)

    def prepare_remote_env(self):
        self._copy_script_to_working_dir("anyscale_job_wrapper.py")
        super().prepare_remote_env()
+        self._copy_script_to_working_dir("wait_cluster.py")
+        self._copy_script_to_working_dir("prometheus_metrics.py")
Review comment:
Bug: Cluster Node Waiting Logic Missing

The wait_for_nodes method calls super().wait_for_nodes(), which raises NotImplementedError in the base CommandRunner class. The old JobRunner implementation added a prepare command to run wait_cluster.py, but this logic was lost during the merge. The method should call self.run_prepare_command(f"python wait_cluster.py {num_nodes} {timeout}", timeout=timeout + 30) instead of calling the parent method.

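For illustration, a minimal sketch of the failure mode both comments describe (simplified, hypothetical class names, not the actual ray_release classes):

class CommandRunnerBase:
    def wait_for_nodes(self, num_nodes: int, timeout: float = 900.0):
        # The abstract base declines to implement node waiting.
        raise NotImplementedError

class AnyscaleRunnerExample(CommandRunnerBase):
    def wait_for_nodes(self, num_nodes: int, timeout: float = 900.0):
        # After the base-class swap, delegating upward hits the abstract
        # base instead of the old JobRunner implementation.
        super().wait_for_nodes(num_nodes, timeout)

try:
    AnyscaleRunnerExample().wait_for_nodes(4)
except NotImplementedError:
    print("wait_for_nodes fell through to the abstract base class")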


    def run_prepare_command(
        self, command: str, env: Optional[Dict] = None, timeout: float = 3600.0
@@ -337,6 +344,12 @@ def run_command(

        return time_taken

+    def get_last_logs_ex(self) -> Optional[str]:
+        try:
+            return self.job_manager.get_last_logs()
+        except Exception as e:
+            raise LogsError(f"Could not get last logs: {e}") from e

    def _fetch_json(self, path: str) -> Dict[str, Any]:
        try:
            tmpfile = tempfile.mkstemp(suffix=".json")[1]
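For context, a sketch of how a caller might consume the new get_last_logs_ex. The runner argument and print_last_logs helper are illustrative assumptions, not part of this PR; only get_last_logs_ex and LogsError come from the diff above.

from ray_release.exception import LogsError

def print_last_logs(runner) -> None:
    # get_last_logs_ex wraps job_manager failures in LogsError, so callers
    # can distinguish log-fetch problems from actual test failures.
    try:
        logs = runner.get_last_logs_ex()
    except LogsError as e:
        print(f"Fetching logs failed: {e}")
        return
    if logs:
        print(logs)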
133 changes: 0 additions & 133 deletions release/ray_release/command_runner/job_runner.py

This file was deleted.

4 changes: 1 addition & 3 deletions release/ray_release/glue.py
@@ -22,7 +22,6 @@
from ray_release.cluster_manager.minimal import MinimalClusterManager
from ray_release.command_runner.anyscale_job_runner import AnyscaleJobRunner
from ray_release.command_runner.command_runner import CommandRunner
-from ray_release.command_runner.job_runner import JobRunner
from ray_release.config import (
DEFAULT_AUTOSUSPEND_MINS,
DEFAULT_BUILD_TIMEOUT,
@@ -56,12 +55,11 @@
from ray_release.test import Test

type_str_to_command_runner = {
-    "job": JobRunner,
+    "job": AnyscaleJobRunner,
    "anyscale_job": AnyscaleJobRunner,
Review comment on lines +58 to 59 (Contributor, medium):
This change makes run_type="job" an alias for run_type="anyscale_job". This is a significant behavioral change, as "job" previously used JobRunner with FullClusterManager (which manages the cluster lifecycle), and will now use AnyscaleJobRunner with MinimalClusterManager (where the cluster is managed by the Anyscale Job service). If this is intended, consider removing the "job" run type to avoid confusion, as it is now redundant.

}

command_runner_to_cluster_manager = {
-    JobRunner: FullClusterManager,
    AnyscaleJobRunner: MinimalClusterManager,
}

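For illustration, a sketch of how glue code might resolve these two mappings for a test's run type. The resolve_runner_and_cluster_manager helper is an assumption for demonstration; the actual lookup code in glue.py is not shown in this diff.

def resolve_runner_and_cluster_manager(run_type: str):
    # After this PR, "job" and "anyscale_job" both resolve to
    # AnyscaleJobRunner, which maps to MinimalClusterManager.
    command_runner_cls = type_str_to_command_runner[run_type]
    cluster_manager_cls = command_runner_to_cluster_manager[command_runner_cls]
    return command_runner_cls, cluster_manager_cls

# resolve_runner_and_cluster_manager("job")
# -> (AnyscaleJobRunner, MinimalClusterManager)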