diff --git a/scripts/reinforcement_learning/ray/hyperparameter_tuning/vision_cartpole_cfg.py b/scripts/reinforcement_learning/ray/hyperparameter_tuning/vision_cartpole_cfg.py
index 0a6889d075b..b8a9b9cc433 100644
--- a/scripts/reinforcement_learning/ray/hyperparameter_tuning/vision_cartpole_cfg.py
+++ b/scripts/reinforcement_learning/ray/hyperparameter_tuning/vision_cartpole_cfg.py
@@ -4,6 +4,7 @@
 # SPDX-License-Identifier: BSD-3-Clause
 import pathlib
 import sys
+from typing import Any
 
 # Allow for import of items from the ray workflow.
 CUR_DIR = pathlib.Path(__file__).parent
@@ -12,6 +13,7 @@
 import util
 import vision_cfg
 from ray import tune
+from ray.tune.stopper import Stopper
 
 
 class CartpoleRGBNoTuneJobCfg(vision_cfg.CameraJobCfg):
@@ -47,3 +49,21 @@ def __init__(self, cfg: dict = {}):
         cfg = util.populate_isaac_ray_cfg_args(cfg)
         cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-TheiaTiny-v0"])
         super().__init__(cfg)
+
+
+class CartpoleEarlyStopper(Stopper):
+    def __init__(self):
+        self._bad_trials = set()
+
+    def __call__(self, trial_id: str, result: dict[str, Any]) -> bool:
+        iteration = result.get("training_iteration", 0)
+        out_of_bounds = result.get("Episode/Episode_Termination/cart_out_of_bounds")
+
+        # Mark the trial for stopping once it has trained long enough yet mostly terminates out of bounds
+        if iteration >= 20 and out_of_bounds is not None and out_of_bounds > 0.85:
+            self._bad_trials.add(trial_id)
+
+        return trial_id in self._bad_trials
+
+    def stop_all(self) -> bool:
+        return False  # only stop individual trials
diff --git a/scripts/reinforcement_learning/ray/tuner.py b/scripts/reinforcement_learning/ray/tuner.py
index cc08fba1bfa..b1180948207 100644
--- a/scripts/reinforcement_learning/ray/tuner.py
+++ b/scripts/reinforcement_learning/ray/tuner.py
@@ -5,6 +5,7 @@
 import argparse
 import importlib.util
 import os
+import random
 import subprocess
 import sys
 from time import sleep, time
@@ -12,8 +13,10 @@
 import ray
 import util
 from ray import air, tune
+from ray.tune import Callback
 from ray.tune.search.optuna import OptunaSearch
 from ray.tune.search.repeater import Repeater
+from ray.tune.stopper import CombinedStopper
 
 """
 This script breaks down an aggregate tuning job, as defined by a hyperparameter sweep configuration,
@@ -60,7 +63,7 @@
 NUM_WORKERS_PER_NODE = 1  # needed for local parallelism
 PROCESS_RESPONSE_TIMEOUT = 200.0  # seconds to wait before killing the process when it stops responding
 MAX_LINES_TO_SEARCH_EXPERIMENT_LOGS = 1000  # maximum number of lines to read from the training process logs
-MAX_LOG_EXTRACTION_ERRORS = 2  # maximum allowed LogExtractionErrors before we abort the whole training
+MAX_LOG_EXTRACTION_ERRORS = 10  # maximum allowed LogExtractionErrors before we abort the whole training
 
 
 class IsaacLabTuneTrainable(tune.Trainable):
@@ -203,13 +206,38 @@ def stop_all(self):
         return False
 
 
-def invoke_tuning_run(cfg: dict, args: argparse.Namespace) -> None:
+class ProcessCleanupCallback(Callback):
+    """Callback to clean up processes when trials are stopped."""
+
+    def on_trial_error(self, iteration, trials, trial, error, **info):
+        """Called when a trial encounters an error."""
+        self._cleanup_trial(trial)
+
+    def on_trial_complete(self, iteration, trials, trial, **info):
+        """Called when a trial completes."""
+        self._cleanup_trial(trial)
+
+    def _cleanup_trial(self, trial):
+        """Clean up processes for a trial using SIGKILL."""
+        try:
+            subprocess.run(["pkill", "-9", "-f", f"rid {trial.config['runner_args']['-rid']}"], check=False)
+            sleep(5)
+        except Exception as e:
+            print(f"[ERROR]: Failed to clean up trial {trial.trial_id}: {e}")
+
+
+def invoke_tuning_run(
+    cfg: dict,
+    args: argparse.Namespace,
+    stopper: tune.Stopper | None = None,
+) -> None:
     """Invoke an Isaac-Ray tuning run.
 
     Log either to a local directory or to MLFlow.
     Args:
         cfg: Configuration dictionary extracted from job setup
         args: Command-line arguments related to tuning.
+        stopper: Optional custom stopper, combined with the default LogExtractionErrorStopper.
     """
     # Allow for early exit
     os.environ["TUNE_DISABLE_STRICT_METRIC_CHECKING"] = "1"
@@ -237,16 +265,23 @@ def invoke_tuning_run(cfg: dict, args: argparse.Namespace) -> None:
     )
     repeat_search = Repeater(searcher, repeat=args.repeat_run_count)
 
+    # Configure the stoppers
+    stoppers: CombinedStopper = CombinedStopper(*[
+        LogExtractionErrorStopper(max_errors=MAX_LOG_EXTRACTION_ERRORS),
+        *([stopper] if stopper is not None else []),
+    ])
+
     if args.run_mode == "local":  # Standard config, to file
         run_config = air.RunConfig(
             storage_path="/tmp/ray",
             name=f"IsaacRay-{args.cfg_class}-tune",
+            callbacks=[ProcessCleanupCallback()],
             verbose=1,
             checkpoint_config=air.CheckpointConfig(
                 checkpoint_frequency=0,  # Disable periodic checkpointing
                 checkpoint_at_end=False,  # Disable final checkpoint
             ),
-            stop=LogExtractionErrorStopper(max_errors=MAX_LOG_EXTRACTION_ERRORS),
+            stop=stoppers,
         )
     elif args.run_mode == "remote":  # MLFlow, to MLFlow server
@@ -260,13 +295,14 @@ def invoke_tuning_run(cfg: dict, args: argparse.Namespace) -> None:
         run_config = ray.train.RunConfig(
             name="mlflow",
             storage_path="/tmp/ray",
-            callbacks=[mlflow_callback],
+            callbacks=[ProcessCleanupCallback(), mlflow_callback],
             checkpoint_config=ray.train.CheckpointConfig(checkpoint_frequency=0, checkpoint_at_end=False),
-            stop=LogExtractionErrorStopper(max_errors=MAX_LOG_EXTRACTION_ERRORS),
+            stop=stoppers,
         )
     else:
         raise ValueError("Unrecognized run mode.")
-
+    # The RID is not a tuned hyperparameter (it is only sampled), but it lets the cleanup callback find each trial's process later
+    cfg["runner_args"]["-rid"] = tune.sample_from(lambda _: str(random.randint(int(1e9), int(1e10) - 1)))
     # Configure the tuning job
     tuner = tune.Tuner(
         IsaacLabTuneTrainable,
@@ -399,6 +435,12 @@ def __init__(self, cfg: dict):
         default=MAX_LOG_EXTRACTION_ERRORS,
         help="Max number number of LogExtractionError failures before we abort the whole tuning run.",
     )
+    parser.add_argument(
+        "--stopper",
+        type=str,
+        default=None,
+        help="Name of a tune.Stopper subclass defined in the cfg_file, used as an additional stopping criterion.",
+    )
     args = parser.parse_args()
 
     PROCESS_RESPONSE_TIMEOUT = args.process_response_timeout
@@ -457,7 +499,16 @@ def __init__(self, cfg: dict):
         print(f"[INFO]: Successfully instantiated class '{class_name}' from {file_path}")
         cfg = instance.cfg
         print(f"[INFO]: Grabbed the following hyperparameter sweep config: \n {cfg}")
-        invoke_tuning_run(cfg, args)
+        # Load the optional custom stopper from the cfg_file module
+        stopper = None
+        if args.stopper and hasattr(module, args.stopper):
+            stopper = getattr(module, args.stopper)
+            if isinstance(stopper, type) and issubclass(stopper, tune.Stopper):
+                stopper = stopper()
+            else:
+                raise TypeError(f"[ERROR]: Unsupported stopper type: {type(stopper)}")
+            print(f"[INFO]: Loaded custom stopping criterion '{args.stopper}'")
+        invoke_tuning_run(cfg, args, stopper=stopper)
 
     else:
         raise AttributeError(f"[ERROR]:Class '{class_name}' not found in {file_path}")
diff --git a/scripts/reinforcement_learning/ray/util.py b/scripts/reinforcement_learning/ray/util.py
index 26a52a90aba..31a5bfff26c 100644
--- a/scripts/reinforcement_learning/ray/util.py
+++ b/scripts/reinforcement_learning/ray/util.py
@@ -71,7 +71,7 @@ def process_args(args, target_list, is_hydra=False):
         if not is_hydra:
             if key.endswith("_singleton"):
                 target_list.append(value)
-            elif key.startswith("--"):
+            elif key.startswith("--") or key.startswith("-"):
                 target_list.append(f"{key} {value}")  # Space instead of = for runner args
             else:
                 target_list.append(f"{value}")
diff --git a/scripts/reinforcement_learning/rl_games/train.py b/scripts/reinforcement_learning/rl_games/train.py
index 634e5975676..d864bc5db11 100644
--- a/scripts/reinforcement_learning/rl_games/train.py
+++ b/scripts/reinforcement_learning/rl_games/train.py
@@ -42,6 +42,9 @@
     help="if toggled, this experiment will be tracked with Weights and Biases",
 )
 parser.add_argument("--export_io_descriptors", action="store_true", default=False, help="Export IO descriptors.")
+parser.add_argument(
+    "--ray-proc-id", "-rid", type=int, default=None, help="Automatically configured by Ray integration, otherwise None."
+)
 # append AppLauncher cli args
 AppLauncher.add_app_launcher_args(parser)
 # parse the arguments
diff --git a/scripts/reinforcement_learning/rsl_rl/train.py b/scripts/reinforcement_learning/rsl_rl/train.py
index 8b66feb28aa..0b739f0e497 100644
--- a/scripts/reinforcement_learning/rsl_rl/train.py
+++ b/scripts/reinforcement_learning/rsl_rl/train.py
@@ -31,6 +31,9 @@
     "--distributed", action="store_true", default=False, help="Run training with multiple GPUs or nodes."
 )
 parser.add_argument("--export_io_descriptors", action="store_true", default=False, help="Export IO descriptors.")
+parser.add_argument(
+    "--ray-proc-id", "-rid", type=int, default=None, help="Automatically configured by Ray integration, otherwise None."
+)
 # append RSL-RL cli arguments
 cli_args.add_rsl_rl_args(parser)
 # append AppLauncher cli args
diff --git a/scripts/reinforcement_learning/sb3/train.py b/scripts/reinforcement_learning/sb3/train.py
index be43b3b8ac8..419fd202766 100644
--- a/scripts/reinforcement_learning/sb3/train.py
+++ b/scripts/reinforcement_learning/sb3/train.py
@@ -37,6 +37,9 @@
     default=False,
     help="Use a slower SB3 wrapper but keep all the extra training info.",
 )
+parser.add_argument(
+    "--ray-proc-id", "-rid", type=int, default=None, help="Automatically configured by Ray integration, otherwise None."
+)
 # append AppLauncher cli args
 AppLauncher.add_app_launcher_args(parser)
 # parse the arguments
diff --git a/scripts/reinforcement_learning/skrl/train.py b/scripts/reinforcement_learning/skrl/train.py
index d73a2a40262..ec0131c261b 100644
--- a/scripts/reinforcement_learning/skrl/train.py
+++ b/scripts/reinforcement_learning/skrl/train.py
@@ -54,7 +54,9 @@
     choices=["AMP", "PPO", "IPPO", "MAPPO"],
     help="The RL algorithm used for training the skrl agent.",
 )
-
+parser.add_argument(
+    "--ray-proc-id", "-rid", type=int, default=None, help="Automatically configured by Ray integration, otherwise None."
+)
 # append AppLauncher cli args
 AppLauncher.add_app_launcher_args(parser)
 # parse the arguments
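
Usage sketch (not part of the diff above): with the new --stopper flag, any tune.Stopper subclass defined in the cfg_file module can be selected by name, and tuner.py will combine it with the default LogExtractionErrorStopper. The snippet below shows one way such a stopper might look; the class name RewardPlateauStopper, the "rewards/time" metric key, and the patience value are illustrative assumptions, not part of this change.

# Hypothetical stopper, defined alongside the job cfg classes in the cfg_file module
# and selected at launch time with `--stopper RewardPlateauStopper`.
from typing import Any

from ray.tune.stopper import Stopper


class RewardPlateauStopper(Stopper):
    """Stop a trial once its reported reward has not improved for `patience` iterations."""

    def __init__(self, patience: int = 10):
        self._patience = patience
        self._best: dict[str, tuple[float, int]] = {}  # trial_id -> (best reward, iteration it was seen)

    def __call__(self, trial_id: str, result: dict[str, Any]) -> bool:
        reward = result.get("rewards/time", float("-inf"))  # metric key is an assumption
        iteration = result.get("training_iteration", 0)
        best_reward, best_iteration = self._best.get(trial_id, (float("-inf"), iteration))
        if reward > best_reward:
            self._best[trial_id] = (reward, iteration)
            return False
        return iteration - best_iteration >= self._patience

    def stop_all(self) -> bool:
        return False  # only stop individual trials, never the whole tuning run

Passing --stopper RewardPlateauStopper alongside the existing cfg_file/cfg_class arguments would then instantiate the class and hand it to invoke_tuning_run via its new stopper parameter.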