diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f5073e7d0..49ec54ff1d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,9 +10,17 @@ Copy and pasting the git commit messages is __NOT__ enough. ## [Unreleased] ### Added +- Added `rllib/pg_example.py` to demonstrate a simple integration with `RLlib` and `tensorflow` for policy training. +- Added `rllib/pg_pbt_example.py` to demonstrate integration with `ray.RLlib`, `tensorflow`, and `ray.tune` for scheduled policy training. ### Changed +- Updated `smarts[ray]` (`ray==2.2`) and `smarts[rllib]` (`ray[rllib]==1.4`) to use `ray~=2.5`. +- Introduced `tensorflow-probability` to `smarts[rllib]`. +- Updated `RLlibHiWayEnv` to use the `gymnasium` interface. +- Renamed `rllib/rllib.py` to `rllib/pg_pbt_example.py`. +- Loosened constraint of `gymnasium` from `==0.27.0` to `>=0.26.3`. ### Deprecated ### Fixed +- Missing neighborhood vehicle state `'lane_id'` is now added to the `hiway-v1` formatted observations. - Fixed a regression where `pybullet` build time messages returned. ### Removed ### Security diff --git a/README.md b/README.md index 3857ca5090..6067a0eabb 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,8 @@ Several agent control policies and agent [action types](smarts/core/controllers/ ### RL Model 1. [Drive](examples/rl/drive). See [Driving SMARTS 2023.1 & 2023.2](https://smarts.readthedocs.io/en/latest/benchmarks/driving_smarts_2023_1.html) for more info. 1. [VehicleFollowing](examples/rl/platoon). See [Driving SMARTS 2023.3](https://smarts.readthedocs.io/en/latest/benchmarks/driving_smarts_2023_3.html) for more info. +1. [PG](examples/rl/rllib/pg_example.py). See [RLlib](https://smarts.readthedocs.io/en/latest/docs/ecosystem/rllib.html) for more info. +1. [PG Population Based Training](examples/rl/rllib/pg_pbt_example.py). See [RLlib](https://smarts.readthedocs.io/en/latest/docs/ecosystem/rllib.html) for more info. ### RL Environment 1. [ULTRA](https://github.com/smarts-project/smarts-project.rl/blob/master/ultra) provides a gym-based environment built upon SMARTS to tackle intersection navigation, specifically the unprotected left turn. diff --git a/docs/ecosystem/rllib.rst b/docs/ecosystem/rllib.rst index 2a49341ae8..0b09585d4a 100644 --- a/docs/ecosystem/rllib.rst +++ b/docs/ecosystem/rllib.rst @@ -8,6 +8,13 @@ RLlib of applications. ``RLlib`` natively supports ``TensorFlow``, ``TensorFlow Eager``, and ``PyTorch``. Most of its internals are agnostic to such deep learning frameworks. +SMARTS contains two examples using `Policy Gradients (PG) `_. + +1. ``rllib/pg_example.py`` +This example shows the basics of using RLlib with SMARTS through :class:`~smarts.env.rllib_hiway_env.RLlibHiWayEnv`. +1. ``rllib/pg_pbt_example.py`` +This example combines Policy Gradients with `Population Based Training (PBT) `_ scheduling. + Recommended reads ----------------- @@ -28,7 +35,7 @@ many docs about ``Ray`` and ``RLlib``. We recommend to read the following pages Resume training --------------- -With respect to ``SMARTS/examples/rl/rllib`` example, if you want to continue an aborted experiment, you can set ``resume=True`` in ``tune.run``. But note that ``resume=True`` will continue to use the same configuration as was set in the original experiment. +With respect to ``SMARTS/examples/rl/rllib`` examples, if you want to continue an aborted experiment, you can set ``resume_training=True``. But note that ``resume_training=True`` will continue to use the same configuration as was set in the original experiment. 
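In the examples this is wired up through the ``--resume_training`` flag; a minimal sketch of how ``pg_pbt_example.py`` maps it onto ``tune.Tuner`` is shown below (``experiment_dir``, ``trainable``, ``algo_config``, ``tune_config``, and ``run_config`` are the values the script builds beforehand).

.. code-block:: python

   from ray import tune

   if resume_training:
       # Re-attach to the experiment directory written by the aborted run;
       # the original configuration is reused.
       tuner = tune.Tuner.restore(
           str(experiment_dir), trainable=trainable, param_space=algo_config
       )
   else:
       tuner = tune.Tuner(
           trainable=trainable,
           param_space=algo_config,
           tune_config=tune_config,
           run_config=run_config,
       )
   results = tuner.fit()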
To make changes to a started experiment, you can edit the latest experiment file in ``./results``. -Or if you want to start a new experiment but train from an existing checkpoint, you can set ``restore=checkpoint_path`` in ``tune.run``. +Or if you want to start a new experiment but train from an existing checkpoint, you will need to look into `How to Save and Load Trial Checkpoints `_. diff --git a/docs/sim/env.rst b/docs/sim/env.rst index 5d6571830a..a3e4ac9cd1 100644 --- a/docs/sim/env.rst +++ b/docs/sim/env.rst @@ -9,7 +9,7 @@ Base environments SMARTS environment module is defined in :mod:`~smarts.env` package. Currently SMARTS provides two kinds of training environments, namely: -+ ``HiWayEnv`` utilizing ``gym.env`` style interface ++ ``HiWayEnv`` utilizing a ``gymnasium.Env`` interface + ``RLlibHiwayEnv`` customized for `RLlib `_ training .. image:: ../_static/env.png @@ -17,11 +17,12 @@ environments, namely: HiWayEnv ^^^^^^^^ -``HiWayEnv`` inherits class ``gym.Env`` and supports gym APIs like ``reset``, ``step``, ``close``. An usage example is shown below. +``HiWayEnv`` inherits class ``gymnasium.Env`` and supports gym APIs like ``reset``, ``step``, ``close``. An usage example is shown below. Refer to :class:`~smarts.env.hiway_env.HiWayEnv` for more details. .. code-block:: python + import gymnasium as gym # Make env env = gym.make( "smarts.env:hiway-v0", # Env entry name. @@ -53,6 +54,7 @@ exactly matches the `env.observation_space`, and `ObservationOptions.multi_agent .. code-block:: python + import gymnasium as gym # Make env env = gym.make( "smarts.env:hiway-v1", # Env entry name. @@ -81,6 +83,7 @@ This can be done with :class:`~smarts.env.gymnasium.wrappers.api_reversion.Api02 .. code-block:: python + import gymnasium as gym # Make env env = gym.make( "smarts.env:hiway-v1", # Env entry name. @@ -91,7 +94,7 @@ This can be done with :class:`~smarts.env.gymnasium.wrappers.api_reversion.Api02 RLlibHiwayEnv ^^^^^^^^^^^^^ -``RLlibHiwayEnv`` inherits class ``MultiAgentEnv``, which is defined in `RLlib `_. It also supports common env APIs like ``reset``, +``RLlibHiwayEnv`` inherits class ``MultiAgentEnv``, which is defined in `RLlib `_. It also supports common environment APIs like ``reset``, ``step``, ``close``. An usage example is shown below. Refer to :class:`~smarts.env.rllib_hiway_env.RLlibHiWayEnv` for more details. .. code-block:: python diff --git a/examples/rl/rllib/configs.py b/examples/rl/rllib/configs.py new file mode 100644 index 0000000000..fb7f1076a3 --- /dev/null +++ b/examples/rl/rllib/configs.py @@ -0,0 +1,69 @@ +import argparse +import multiprocessing +from pathlib import Path + + +def gen_parser(prog: str, default_result_dir: str) -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(prog) + parser.add_argument( + "scenarios", + help="A list of scenarios. Each element can be either the scenario to" + "run or a directory of scenarios to sample from. See `scenarios/`" + "folder for some samples you can use.", + type=str, + nargs="*", + ) + parser.add_argument( + "--envision", + action="store_true", + help="Run simulation with Envision display.", + ) + parser.add_argument( + "--train_batch_size", + type=int, + default=2000, + help="The training batch size. This value must be > 0.", + ) + parser.add_argument( + "--time_total_s", + type=int, + default=1 * 60 * 60, # 1 hour + help="Total time in seconds to run the simulation for. 
This is a rough end time as it will be checked per training batch.",
+    )
+    parser.add_argument(
+        "--seed",
+        type=int,
+        default=42,
+        help="The base random seed to use, intended to be mixed with --num_samples",
+    )
+    parser.add_argument(
+        "--num_agents", type=int, default=2, help="Number of agents (one per policy)"
+    )
+    parser.add_argument(
+        "--num_workers",
+        type=int,
+        default=(multiprocessing.cpu_count() // 2 + 1),
+        help="Number of rollout workers (defaults to half the system cores plus one)",
+    )
+    parser.add_argument(
+        "--resume_training",
+        default=False,
+        action="store_true",
+        help="Resume an errored or 'ctrl+c'-cancelled training run. This does not extend a fully completed original experiment.",
+    )
+    parser.add_argument(
+        "--result_dir",
+        type=str,
+        default=default_result_dir,
+        help="Directory containing results",
+    )
+    parser.add_argument(
+        "--log_level",
+        type=str,
+        default="ERROR",
+        help="Log level (DEBUG|INFO|WARN|ERROR)",
+    )
+    parser.add_argument(
+        "--checkpoint_freq", type=int, default=3, help="Checkpoint frequency"
+    )
+    return parser
diff --git a/examples/rl/rllib/model/README.md b/examples/rl/rllib/model/README.md
index 646c9015c8..9afe8800ce 100644
--- a/examples/rl/rllib/model/README.md
+++ b/examples/rl/rllib/model/README.md
@@ -1,3 +1,3 @@
 ## Model Binaries
-The binaries located in this directory are the components of a trained rllib model. These are related to the `examples/rl/rllib/rllib.py` example script. Results from `examples/rl/rllib/rllib.py` are loaded and written to this directory.
\ No newline at end of file
+The binaries located in this directory are the components of a trained RLlib model. They relate to the `examples/rl/rllib/pg_pbt_example.py` example script, whose results are loaded from and written to this directory.
\ No newline at end of file diff --git a/examples/rl/rllib/model/saved_model.pb b/examples/rl/rllib/model/saved_model.pb deleted file mode 100644 index 70bd2f2238..0000000000 Binary files a/examples/rl/rllib/model/saved_model.pb and /dev/null differ diff --git a/examples/rl/rllib/model/variables/variables.data-00000-of-00001 b/examples/rl/rllib/model/variables/variables.data-00000-of-00001 deleted file mode 100644 index 3c6df02c29..0000000000 Binary files a/examples/rl/rllib/model/variables/variables.data-00000-of-00001 and /dev/null differ diff --git a/examples/rl/rllib/model/variables/variables.index b/examples/rl/rllib/model/variables/variables.index deleted file mode 100644 index f46fdc3b9f..0000000000 Binary files a/examples/rl/rllib/model/variables/variables.index and /dev/null differ diff --git a/examples/rl/rllib/pg_example.py b/examples/rl/rllib/pg_example.py new file mode 100644 index 0000000000..81265321ff --- /dev/null +++ b/examples/rl/rllib/pg_example.py @@ -0,0 +1,218 @@ +from pathlib import Path +from pprint import pprint as print +from typing import Dict, Literal, Optional, Union + +import numpy as np + +try: + from ray.rllib.algorithms.algorithm import Algorithm, AlgorithmConfig + from ray.rllib.algorithms.callbacks import DefaultCallbacks + from ray.rllib.algorithms.pg import PGConfig + from ray.rllib.env.base_env import BaseEnv + from ray.rllib.evaluation.episode import Episode + from ray.rllib.evaluation.episode_v2 import EpisodeV2 + from ray.rllib.evaluation.rollout_worker import RolloutWorker + from ray.rllib.policy.policy import Policy + from ray.rllib.utils.typing import PolicyID +except Exception as e: + from smarts.core.utils.custom_exceptions import RayException + + raise RayException.required_to("rllib.py") + +import smarts +from smarts.env.rllib_hiway_env import RLlibHiWayEnv +from smarts.sstudio.scenario_construction import build_scenarios + +if __name__ == "__main__": + from configs import gen_parser + from rllib_agent import TrainingModel, rllib_agent +else: + from .configs import gen_parser + from .rllib_agent import TrainingModel, rllib_agent + +# Add custom metrics to your tensorboard using these callbacks +# See: https://ray.readthedocs.io/en/latest/rllib-training.html#callbacks-and-custom-metrics +class Callbacks(DefaultCallbacks): + @staticmethod + def on_episode_start( + worker: RolloutWorker, + base_env: BaseEnv, + policies: Dict[PolicyID, Policy], + episode: Union[Episode, EpisodeV2], + env_index: int, + **kwargs, + ): + + episode.user_data["ego_reward"] = [] + + @staticmethod + def on_episode_step( + worker: RolloutWorker, + base_env: BaseEnv, + episode: Union[Episode, EpisodeV2], + env_index: int, + **kwargs, + ): + single_agent_id = list(episode.get_agents())[0] + infos = episode._last_infos.get(single_agent_id) + if infos is not None: + episode.user_data["ego_reward"].append(infos["reward"]) + + @staticmethod + def on_episode_end( + worker: RolloutWorker, + base_env: BaseEnv, + policies: Dict[PolicyID, Policy], + episode: Union[Episode, EpisodeV2], + env_index: int, + **kwargs, + ): + + mean_ego_speed = np.mean(episode.user_data["ego_reward"]) + print( + f"ep. 
{episode.episode_id:<12} ended;" + f" length={episode.length:<6}" + f" mean_ego_reward={mean_ego_speed:.2f}" + ) + episode.custom_metrics["mean_ego_reward"] = mean_ego_speed + + +def main( + scenarios, + envision, + time_total_s, + rollout_fragment_length, + train_batch_size, + seed, + num_agents, + num_workers, + resume_training, + result_dir, + checkpoint_freq: int, + checkpoint_num: Optional[int], + log_level: Literal["DEBUG", "INFO", "WARN", "ERROR"], +): + rllib_policies = { + f"AGENT-{i}": ( + None, + rllib_agent["observation_space"], + rllib_agent["action_space"], + {"model": {"custom_model": TrainingModel.NAME}}, + ) + for i in range(num_agents) + } + agent_specs = {f"AGENT-{i}": rllib_agent["agent_spec"] for i in range(num_agents)} + + smarts.core.seed(seed) + assert len(set(rllib_policies.keys()).difference(agent_specs)) == 0 + algo_config: AlgorithmConfig = ( + PGConfig() + .environment( + env=RLlibHiWayEnv, + env_config={ + "seed": seed, + "scenarios": [ + str(Path(scenario).expanduser().resolve().absolute()) + for scenario in scenarios + ], + "headless": not envision, + "agent_specs": agent_specs, + "observation_options": "multi_agent", + }, + disable_env_checking=True, + ) + .framework(framework="tf2", eager_tracing=True) + .rollouts( + rollout_fragment_length=rollout_fragment_length, + num_rollout_workers=num_workers, + num_envs_per_worker=1, + enable_tf1_exec_eagerly=True, + ) + .training( + lr_schedule=[(0, 1e-3), (1e3, 5e-4), (1e5, 1e-4), (1e7, 5e-5), (1e8, 1e-5)], + train_batch_size=train_batch_size, + ) + .multi_agent( + policies=rllib_policies, + policy_mapping_fn=lambda agent_id, episode, worker, **kwargs: f"{agent_id}", + ) + .callbacks(callbacks_class=Callbacks) + .debugging(log_level=log_level) + ) + + def get_checkpoint_dir(num): + checkpoint_dir = Path(result_dir) / f"checkpoint_{num}" / f"checkpoint-{num}" + checkpoint_dir.mkdir(parents=True, exist_ok=True) + return checkpoint_dir + + if resume_training: + checkpoint = str(get_checkpoint_dir("latest")) + if checkpoint_num: + checkpoint = str(get_checkpoint_dir(checkpoint_num)) + else: + checkpoint = None + + print(f"======= Checkpointing at {str(result_dir)} =======") + + algo = algo_config.build() + if checkpoint is not None: + algo.load_checkpoint(checkpoint=checkpoint) + result = {} + current_iteration = 0 + checkpoint_iteration = checkpoint_num or 0 + + try: + while result.get("time_total_s", 0) < time_total_s: + result = algo.train() + print(f"======== Iteration {result['training_iteration']} ========") + print(result, depth=1) + + if current_iteration % checkpoint_freq == 0: + checkpoint_dir = get_checkpoint_dir(checkpoint_iteration) + print(f"======= Saving checkpoint {checkpoint_iteration} =======") + algo.save_checkpoint(checkpoint_dir) + checkpoint_iteration += 1 + current_iteration += 1 + algo.save_checkpoint(get_checkpoint_dir(checkpoint_iteration)) + finally: + algo.save_checkpoint(get_checkpoint_dir("latest")) + algo.stop() + + +if __name__ == "__main__": + default_result_dir = str(Path(__file__).resolve().parent / "results" / "pg_results") + parser = gen_parser("rllib-example", default_result_dir) + parser.add_argument( + "--checkpoint_num", + type=int, + default=None, + help="The checkpoint number to restart from.", + ) + parser.add_argument( + "--rollout_fragment_length", + type=str, + default="auto", + help="Episodes are divided into fragments of this many steps for each rollout. In this example this will be ensured to be `1= 0, f"{train_batch_size.__name__} cannot be less than 1." 
+ if isinstance(rollout_fragment_length, str) and rollout_fragment_length != "auto": + rollout_fragment_length = int(rollout_fragment_length) + if ( + isinstance(rollout_fragment_length, int) + and rollout_fragment_length > train_batch_size + ): + rollout_fragment_length = train_batch_size + + rllib_policies = { + f"AGENT-{i}": ( + None, + rllib_agent["observation_space"], + rllib_agent["action_space"], + {"model": {"custom_model": TrainingModel.NAME}}, + ) + for i in range(num_agents) + } + agent_specs = {f"AGENT-{i}": rllib_agent["agent_spec"] for i in range(num_agents)} + + smarts.core.seed(seed) + assert len(set(rllib_policies.keys()).difference(agent_specs)) == 0 + algo_config: AlgorithmConfig = ( + PGConfig() + .environment( + env=RLlibHiWayEnv, + env_config={ + "seed": seed, + "scenarios": [ + str(Path(scenario).expanduser().resolve().absolute()) + for scenario in scenarios + ], + "headless": not envision, + "agent_specs": agent_specs, + "observation_options": "multi_agent", + }, + disable_env_checking=True, + ) + .framework(framework="tf2", eager_tracing=True) + .rollouts( + rollout_fragment_length=rollout_fragment_length, + num_rollout_workers=num_workers, + num_envs_per_worker=1, + enable_tf1_exec_eagerly=True, + ) + .training( + lr_schedule=[(0, 1e-3), (1e3, 5e-4), (1e5, 1e-4), (1e7, 5e-5), (1e8, 1e-5)], + train_batch_size=train_batch_size, + ) + .multi_agent( + policies=rllib_policies, + policy_mapping_fn=lambda agent_id, episode, worker, **kwargs: f"{agent_id}", + ) + .callbacks(callbacks_class=AlgorithmCallbacks) + .debugging(log_level=log_level) + ) + + experiment_name = "rllib_example_multi" + result_dir = Path(result_dir).expanduser().resolve().absolute() + experiment_dir = result_dir / experiment_name + + print(f"======= Checkpointing at {str(result_dir)} =======") + # Note that PBT modifies the hyperparameters during the run. This perturbation can be applied + # to nearly anything passed to `Tuner(param_space=)`. + pbt = PopulationBasedTraining( + time_attr="time_total_s", + metric="episode_reward_mean", + mode="max", + perturbation_interval=20, + resample_probability=0.25, + # Specifies the mutations of these hyperparams + hyperparam_mutations={ + "lr": [1e-3, 5e-4, 1e-4, 5e-5, 1e-5], + "train_batch_size": lambda: train_batch_size, + }, + # Specifies additional mutations after hyperparam_mutations is applied + custom_explore_fn=explore, + ) + + from ray import air + + run_config = air.RunConfig( + name=experiment_name, + stop={"time_total_s": time_total_s}, + callbacks=[ExperimentCallback()], + storage_path=str(result_dir), + checkpoint_config=air.CheckpointConfig( + num_to_keep=3, + checkpoint_frequency=checkpoint_freq, + checkpoint_at_end=True, + ), + failure_config=air.FailureConfig( + max_failures=3, + fail_fast=False, + ), + ) + tune_config = tune.TuneConfig( + num_samples=num_samples, + scheduler=pbt, + max_concurrent_trials=4, + ) + trainable = "PG" + if resume_training: + tuner = tune.Tuner.restore( + str(experiment_dir), + trainable=trainable, + param_space=algo_config, + ) + else: + tuner = tune.Tuner( + trainable=trainable, + param_space=algo_config, + tune_config=tune_config, + run_config=run_config, + ) + + results = tuner.fit() + # Get the best result based on a particular metric. + best_result = results.get_best_result(metric="episode_reward_mean", mode="max") + + # Get the best checkpoint corresponding to the best result. 
+ best_checkpoint = best_result.checkpoint + + best_logdir = Path(best_result.log_dir) + model_path = best_logdir + + copy_tree(str(model_path), save_model_path, overwrite=True) + print(f"Wrote model to: {save_model_path}") + + +if __name__ == "__main__": + default_result_dir = str( + Path(__file__).resolve().parent / "results" / "tune_pg_results" + ) + parser = gen_parser("rllib-example", default_result_dir) + parser.add_argument( + "--num_samples", + type=int, + default=2, + help="Number of times to sample from hyperparameter space.", + ) + parser.add_argument( + "--save_model_path", + type=str, + default=str(Path(__file__).expanduser().resolve().parent / "model"), + help="Destination path of where to copy the model when training is over", + ) + parser.add_argument( + "--rollout_fragment_length", + type=str, + default="auto", + help="Episodes are divided into fragments of this many steps for each rollout. In this example this will be ensured to be `1= 0, f"{train_batch_size.__name__} cannot be less than 1." - if rollout_fragment_length > train_batch_size: - rollout_fragment_length = train_batch_size - - pbt = PopulationBasedTraining( - time_attr="time_total_s", - metric="episode_reward_mean", - mode="max", - perturbation_interval=300, - resample_probability=0.25, - # Specifies the mutations of these hyperparams - # See: `ray.rllib.agents.trainer.COMMON_CONFIG` for common hyperparams - hyperparam_mutations={ - "lr": [1e-3, 5e-4, 1e-4, 5e-5, 1e-5], - "rollout_fragment_length": lambda: rollout_fragment_length, - "train_batch_size": lambda: train_batch_size, - }, - # Specifies additional mutations after hyperparam_mutations is applied - custom_explore_fn=explore, - ) - - # XXX: There is a bug in Ray where we can only export a trained model if - # the policy it's attached to is named 'default_policy'. 
- # See: https://github.com/ray-project/ray/issues/5339 - rllib_policies = { - "default_policy": ( - None, - rllib_agent["observation_space"], - rllib_agent["action_space"], - {"model": {"custom_model": TrainingModel.NAME}}, - ) - } - - smarts.core.seed(seed) - tune_config = { - "env": RLlibHiWayEnv, - "log_level": "WARN", - "num_workers": num_workers, - "env_config": { - "seed": tune.sample_from(lambda spec: random.randint(0, 300)), - "scenarios": [str(Path(scenario).expanduser().resolve().absolute())], - "headless": not envision, - "agent_specs": { - f"AGENT-{i}": rllib_agent["agent_spec"] for i in range(num_agents) - }, - }, - "multiagent": {"policies": rllib_policies}, - "callbacks": Callbacks, - } - - experiment_name = "rllib_example_multi" - result_dir = Path(result_dir).expanduser().resolve().absolute() - if checkpoint_num: - checkpoint = str( - result_dir / f"checkpoint_{checkpoint_num}" / f"checkpoint-{checkpoint_num}" - ) - else: - checkpoint = None - - print(f"Checkpointing at {str(result_dir)}") - analysis = tune.run( - "PG", - name=experiment_name, - stop={"time_total_s": time_total_s}, - checkpoint_freq=1, - checkpoint_at_end=True, - local_dir=str(result_dir), - resume=resume_training, - restore=checkpoint, - max_failures=3, - num_samples=num_samples, - export_formats=["model", "checkpoint"], - config=tune_config, - scheduler=pbt, - ) - - print(analysis.dataframe().head()) - - best_logdir = Path(analysis.get_best_logdir("episode_reward_max", mode="max")) - model_path = best_logdir / "model" - - copy_tree(str(model_path), save_model_path, overwrite=True) - print(f"Wrote model to: {save_model_path}") - - -if __name__ == "__main__": - parser = argparse.ArgumentParser("rllib-example") - parser.add_argument( - "--scenario", - type=str, - default=str(Path(__file__).resolve().parents[3] / "scenarios/sumo/loop"), - help="Scenario to run (see scenarios/ for some samples you can use)", - ) - parser.add_argument( - "--envision", - action="store_true", - help="Run simulation with Envision display.", - ) - parser.add_argument( - "--num_samples", - type=int, - default=1, - help="Number of times to sample from hyperparameter space", - ) - parser.add_argument( - "--rollout_fragment_length", - type=int, - default=200, - help="Episodes are divided into fragments of this many steps for each rollout. In this example this will be ensured to be `1==21.7.0 # for scenario requirements.txt files # The following are planned to be made optional - gymnasium==0.27.0 + gymnasium>=0.26.3 pybullet>=3,<4.0 # The following are planned for removal @@ -95,9 +95,10 @@ remote_agent = rllib = opencv-python==4.1.2.30 opencv-python-headless==4.1.2.30 - ray[rllib]==1.4.0 + ray[rllib]~=2.5.0 + tensorflow-probability ray = - ray~=2.2.0 + ray~=2.5.0 ros = catkin_pkg rospkg diff --git a/smarts/env/custom_observations.py b/smarts/env/custom_observations.py index 484997003a..f22b95fe15 100644 --- a/smarts/env/custom_observations.py +++ b/smarts/env/custom_observations.py @@ -18,13 +18,14 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
from dataclasses import dataclass -from typing import Callable, Dict +from typing import Callable, Dict, List, Union import gymnasium as gym import numpy as np from smarts.core.coordinates import Heading -from smarts.core.observations import Observation +from smarts.core.observations import EgoVehicleObservation, Observation +from smarts.core.road_map import Waypoint from smarts.core.utils.math import squared_dist, vec_2d, vec_to_radians @@ -143,7 +144,17 @@ def scan_for_vehicles( ) -def lane_ttc(obs: Observation) -> Dict[str, np.ndarray]: +@dataclass +class _VehicleState: + speed: np.ndarray + position: np.ndarray + lane_index: int + heading: float + lane_id: str + steering: float + + +def lane_ttc(obs: Union[Observation, Dict]) -> Dict[str, np.ndarray]: """Computes time-to-collision (TTC) and distance-to-collision (DTC) using the given agent's observation. TTC and DTC are numpy arrays of shape (3,) with values for the right lane (at index [0]), current lane (at index [1]), @@ -162,18 +173,77 @@ def lane_ttc(obs: Observation) -> Dict[str, np.ndarray]: + :spelling:ignore:`ego_ttc`: Time to collision in each lane. Shape=(3,). + ego_lane_dist: Closest cars’ distance to ego in each lane. Shape=(3,). """ - ego = obs.ego_vehicle_state - waypoint_paths = obs.waypoint_paths - wps = [path[0] for path in waypoint_paths] + + if isinstance(obs, Observation): + ego = obs.ego_vehicle_state + waypoint_paths = obs.waypoint_paths + wps = [path[0] for path in waypoint_paths] + neighborhood_vehicle_states = obs.neighborhood_vehicle_states + + elif isinstance(obs, dict): + waypoint_paths = obs["waypoint_paths"] + waypoint_paths = [ + [ + Waypoint( + position[:2], + Heading(heading), + lane_id, + lane_width, + speed_limit, + lane_index, + np.nan, + ) + for position, heading, lane_id, lane_width, speed_limit, lane_index in zip( + waypoint_paths["position"][i], + waypoint_paths["heading"][i], + waypoint_paths["lane_id"][i], + waypoint_paths["lane_width"][i], + waypoint_paths["speed_limit"][i], + waypoint_paths["lane_index"][i], + ) + ] + for i in range(4) + ] + wps = [path[0] for path in waypoint_paths] + + evs = obs["ego_vehicle_state"] + ego = _VehicleState( + speed=evs["speed"], + position=evs["position"], + lane_index=evs["lane_index"], + heading=Heading(evs["heading"]), + lane_id=evs["lane_id"], + steering=evs["steering"], + ) + neighborhood_vehicle_states_dict = obs["neighborhood_vehicle_states"] + neighborhood_vehicle_states = [ + _VehicleState( + position=position, + heading=heading, + speed=speed, + lane_id=lane_id, + lane_index=lane_index, + steering=np.nan, + ) + for position, heading, speed, lane_id, lane_index in zip( + neighborhood_vehicle_states_dict["position"], + neighborhood_vehicle_states_dict["heading"], + neighborhood_vehicle_states_dict["speed"], + neighborhood_vehicle_states_dict["lane_id"], + neighborhood_vehicle_states_dict["lane_index"], + ) + ] + else: + raise NotImplementedError("Cannot generate using given observations") # distance of vehicle from center of lane closest_wp = min(wps, key=lambda wp: wp.dist_to(ego.position)) signed_dist_from_center = closest_wp.signed_lateral_error(ego.position) - lane_hwidth = closest_wp.lane_width * 0.5 - norm_dist_from_center = signed_dist_from_center / lane_hwidth - - ego_ttc, ego_lane_dist = _ego_ttc_lane_dist(obs, closest_wp.lane_index) - + lane_half_width = closest_wp.lane_width * 0.5 + norm_dist_from_center = signed_dist_from_center / lane_half_width + ego_ttc, ego_lane_dist = _ego_ttc_lane_dist( + ego, neighborhood_vehicle_states, 
waypoint_paths, closest_wp.lane_index + ) return { "distance_from_center": np.array([norm_dist_from_center]), "angle_error": np.array([closest_wp.relative_heading(ego.heading)]), @@ -189,16 +259,25 @@ def lane_ttc(obs: Observation) -> Dict[str, np.ndarray]: ) -def _ego_ttc_lane_dist(obs: Observation, ego_lane_index: int): - ttc_by_p, lane_dist_by_p = _ttc_by_path(obs) +def _ego_ttc_lane_dist( + ego_state: _VehicleState, + neighborhood_vehicle_states, + waypoint_paths, + ego_lane_index: int, +): + ttc_by_p, lane_dist_by_p = _ttc_by_path( + ego_state, neighborhood_vehicle_states, waypoint_paths + ) return _ego_ttc_calc(ego_lane_index, ttc_by_p, lane_dist_by_p) -def _ttc_by_path(obs: Observation): - ego = obs.ego_vehicle_state - waypoint_paths = obs.waypoint_paths - neighborhood_vehicle_states = obs.neighborhood_vehicle_states +def _ttc_by_path( + ego: _VehicleState, + neighborhood_vehicle_states, + waypoint_paths: List[List[Waypoint]], +): + neighborhood_vehicle_states = neighborhood_vehicle_states # first sum up the distance between waypoints along a path # ie. [(wp1, path1, 0), diff --git a/smarts/env/rllib_hiway_env.py b/smarts/env/rllib_hiway_env.py index 08795ee422..20f886a7be 100644 --- a/smarts/env/rllib_hiway_env.py +++ b/smarts/env/rllib_hiway_env.py @@ -20,6 +20,7 @@ import logging import warnings from pathlib import Path +from typing import Dict from ray.rllib.env.multi_agent_env import MultiAgentEnv @@ -29,6 +30,12 @@ from smarts.core.local_traffic_provider import LocalTrafficProvider from smarts.core.scenario import Scenario from smarts.core.smarts import SMARTS +from smarts.env.utils.action_conversion import ActionOptions, ActionSpacesFormatter +from smarts.env.utils.observation_conversion import ( + ObservationOptions, + ObservationSpacesFormatter, +) +from smarts.zoo.agent_spec import AgentSpec class RLlibHiWayEnv(MultiAgentEnv): @@ -64,7 +71,41 @@ class RLlibHiWayEnv(MultiAgentEnv): """ def __init__(self, config): - self._log = logging.getLogger(self.__class__.__name__) + super().__init__() + + self._agent_specs: Dict[str, AgentSpec] = config["agent_specs"] + agent_interfaces = { + a_id: spec.interface for a_id, spec in self._agent_specs.items() + } + + ## ---- Required environment attributes ---- + ## See ray/rllib/env/multi_agent_env.py + self._agent_ids.update(id_ for id_ in self._agent_specs) + + action_options = ActionOptions.multi_agent + self._action_formatter = ActionSpacesFormatter( + agent_interfaces=agent_interfaces, action_options=action_options + ) + self.action_space = self._action_formatter.space + assert self.action_space is not None + + observation_options = ObservationOptions.multi_agent + self._observations_formatter = ObservationSpacesFormatter( + agent_interfaces=agent_interfaces, observation_options=observation_options + ) + self.observation_space = self._observations_formatter.space + assert self.observation_space is not None + + self._action_space_in_preferred_format = ( + self._check_if_action_space_maps_agent_id_to_sub_space() + ) + self._obs_space_in_preferred_format = ( + self._check_if_obs_space_maps_agent_id_to_sub_space() + ) + assert self._action_space_in_preferred_format is True + ## ---- /Required environment attributes ---- + + self._log = logging.getLogger(name=self.__class__.__name__) seed = int(config.get("seed", 42)) # See https://docs.ray.io/en/latest/rllib-env.html#configuring-environments @@ -75,9 +116,9 @@ def __init__(self, config): a = config.worker_index b = config.vector_index c = (a + b) * (a + b + 1) // 2 + b - 
smarts.core.seed(seed + c) + self._seed = seed + c + smarts.core.seed(self._seed + c) - self._agent_specs = config["agent_specs"] self._scenarios = [ str(Path(scenario).resolve()) for scenario in config["scenarios"] ] @@ -124,7 +165,9 @@ def step(self, agent_actions): isinstance(key, str) for key in agent_actions.keys() ), "Expected Dict[str, any]" - observations, rewards, dones, extras = self._smarts.step(agent_actions) + formatted_actions = self._action_formatter.format(agent_actions) + env_observations, rewards, dones, extras = self._smarts.step(formatted_actions) + env_observations = self._observations_formatter.format(env_observations) # Agent termination: RLlib expects that we return a "last observation" # on the step that an agent transitions to "done". All subsequent calls @@ -135,21 +178,28 @@ def step(self, agent_actions): # to only agents who are actively sending in actions. observations = { agent_id: obs - for agent_id, obs in observations.items() - if agent_id in agent_actions + for agent_id, obs in env_observations.items() + if agent_id in formatted_actions } rewards = { agent_id: reward for agent_id, reward in rewards.items() - if agent_id in agent_actions + if agent_id in formatted_actions } scores = { agent_id: score for agent_id, score in extras["scores"].items() - if agent_id in agent_actions + if agent_id in formatted_actions } - infos = {key: {"score": value} for key, value in scores.items()} + infos = { + agent_id: { + "score": value, + "reward": rewards[agent_id], + "speed": observations[agent_id]["ego_vehicle_state"]["speed"], + } + for agent_id, value in scores.items() + } # Ensure all contain the same agent_ids as keys assert ( @@ -160,7 +210,7 @@ def step(self, agent_actions): ) for agent_id in agent_actions: agent_spec = self._agent_specs[agent_id] - observation = observations[agent_id] + observation = env_observations[agent_id] reward = rewards[agent_id] info = infos[agent_id] @@ -172,25 +222,47 @@ def step(self, agent_actions): self._dones_registered += 1 if done else 0 dones["__all__"] = self._dones_registered >= len(self._agent_specs) - return observations, rewards, dones, infos + return ( + observations, + rewards, + dones, + dones, + infos, + ) - def reset(self): + def reset(self, *, seed=None, options=None): """Environment reset.""" + if seed not in (None, 0): + smarts.core.seed(self._seed + (seed or 0)) + scenario = next(self._scenarios_iterator) self._dones_registered = 0 if self._smarts is None: self._smarts = self._build_smarts() - self._smarts.setup(scenario) + self._smarts.setup(scenario=scenario) - env_observations = self._smarts.reset(scenario) + env_observations = self._smarts.reset(scenario=scenario) + env_observations = self._observations_formatter.format( + observations=env_observations + ) observations = { agent_id: self._agent_specs[agent_id].observation_adapter(obs) for agent_id, obs in env_observations.items() } + info = { + agent_id: { + "score": 0, + "reward": 0, + "env_obs": agent_obs, + "done": False, + "map_source": self._smarts.scenario.road_map.source, + } + for agent_id, agent_obs in observations.items() + } - return observations + return observations, info def close(self): """Environment close.""" diff --git a/smarts/env/tests/test_rllib_hiway_env.py b/smarts/env/tests/test_rllib_hiway_env.py index 12373c6761..13496ca3f4 100644 --- a/smarts/env/tests/test_rllib_hiway_env.py +++ b/smarts/env/tests/test_rllib_hiway_env.py @@ -67,12 +67,15 @@ class atdict(dict): __delattr__ = dict.__delitem__ env = 
RLlibHiWayEnv(config=atdict(**env_config, worker_index=0, vector_index=1)) - agent_ids = list(env_config["agent_specs"].keys()) + agent_ids = env.get_agent_ids() + assert isinstance(agent_ids, set) + assert AGENT_ID in agent_ids + print(env.observation_space) - dones = {"__all__": False} + term = {"__all__": False} env.reset() - while not dones["__all__"]: - _, _, dones, _ = env.step( + while not term["__all__"]: + _, _, term, _, _ = env.step( {aid: rllib_agent["action_space"].sample() for aid in agent_ids} ) env.close() diff --git a/smarts/env/utils/action_conversion.py b/smarts/env/utils/action_conversion.py index 0cbafe6fcd..7e286f3a03 100644 --- a/smarts/env/utils/action_conversion.py +++ b/smarts/env/utils/action_conversion.py @@ -246,9 +246,10 @@ def format(self, actions: Dict[str, Any]): format_ = formatting_groups[agent_interface.action] space: gym.Space = self.space[agent_id] assert space is format_.space + dtype = action.dtype if isinstance(action, np.ndarray) else None assert space.contains( action - ), f"Action {action} does not match space {space}!" + ), f"Action {action} of type `{type(action)}` & {dtype} does not match space {space}!" formatted_action = format_.formatting_func(action) out_actions[agent_id] = formatted_action diff --git a/smarts/env/utils/observation_conversion.py b/smarts/env/utils/observation_conversion.py index af7ee7bb6a..22301a1f4c 100644 --- a/smarts/env/utils/observation_conversion.py +++ b/smarts/env/utils/observation_conversion.py @@ -74,7 +74,8 @@ def _format_id(lane_id: str, max_length, type_): warnings.warn( f"`{type_}` named `{lane_name}` is more than " f"`{max_length}` characters long. It will be truncated " - "and may cause unintended issues with navigation and lane identification." + "and may cause unintended issues with navigation and lane identification.", + category=UserWarning, ) return lane_name[:max_length] @@ -193,10 +194,11 @@ def _format_neighborhood_vehicle_states( "box": np.zeros((des_shp, 3), dtype=np.float32), "heading": np.zeros((des_shp,), dtype=np.float32), "id": [""] * des_shp, + "interest": np.zeros((des_shp,), dtype=np.int8), + "lane_id": [""] * des_shp, "lane_index": np.zeros((des_shp,), dtype=np.int8), "position": np.zeros((des_shp, 3), dtype=np.float64), "speed": np.zeros((des_shp,), dtype=np.float32), - "interest": np.zeros((des_shp,), dtype=np.int8), } neighborhood_vehicle_states = [ @@ -207,11 +209,12 @@ def _format_neighborhood_vehicle_states( nghb.lane_index, nghb.position, nghb.speed, + _format_id(nghb.lane_id, _WAYPOINT_NAME_LIMIT, "lane id"), nghb.interest, ) for nghb in neighborhood_vehicle_states[:des_shp] ] - box, heading, vehicle_id, lane_index, pos, speed, interest = zip( + box, heading, vehicle_id, lane_index, pos, speed, lane_id, interest = zip( *neighborhood_vehicle_states ) @@ -229,6 +232,7 @@ def _format_neighborhood_vehicle_states( lane_index = np.pad(lane_index, ((0,pad_shp)), mode='constant', constant_values=0) pos = np.pad(pos, ((0,pad_shp),(0,0)), mode='constant', constant_values=0) speed = np.pad(speed, ((0,pad_shp)), mode='constant', constant_values=0) + lane_id = tuple(lane_id + ("",) * pad_shp) interest = np.pad(interest, ((0,pad_shp)), mode="constant", constant_values=False) # fmt: on @@ -236,10 +240,11 @@ def _format_neighborhood_vehicle_states( "box": box, "heading": heading, "id": vehicle_id, + "interest": interest, + "lane_id": lane_id, "lane_index": lane_index, "position": pos, "speed": speed, - "interest": interest, } @@ -1022,6 +1027,12 @@ class ObservationSpacesFormatter: "id": The 
vehicle ids of neighbor vehicles. Defaults to str("") per vehicle. shape=(10,Text(50)) + "interest": + If the vehicles are of interest. Defaults to np.array([False]) per vehicle. + shape=(10,). dtype=np.int8 + "lane_id": + The ID of the lane that the vehicle is on. Defaults to str("") per vehicle. + shape=(10,Text(50)) "lane_index": Lane number of neighbor vehicles. Defaults to np.array([0]) per vehicle. shape=(10,). dtype=np.int8. diff --git a/zoo/policies/cross-rl-agent/cross_rl_agent/train/run_test.py b/zoo/policies/cross-rl-agent/cross_rl_agent/train/run_test.py index 1b20ad607d..d194912ef2 100644 --- a/zoo/policies/cross-rl-agent/cross_rl_agent/train/run_test.py +++ b/zoo/policies/cross-rl-agent/cross_rl_agent/train/run_test.py @@ -217,7 +217,7 @@ def default_argument_parser(program: str): nargs="+", ) parser.add_argument( - "--sim-name", + "--sim_name", help="a string that gives this simulation a name.", type=str, default=None, @@ -227,7 +227,7 @@ def default_argument_parser(program: str): ) parser.add_argument("--seed", type=int, default=42) parser.add_argument( - "--sumo-port", help="Run SUMO with a specified port.", type=int, default=None + "--sumo_port", help="Run SUMO with a specified port.", type=int, default=None ) parser.add_argument( "--episodes", diff --git a/zoo/policies/cross-rl-agent/cross_rl_agent/train/run_train.py b/zoo/policies/cross-rl-agent/cross_rl_agent/train/run_train.py index 92f7d57c80..1bd85a8f3c 100644 --- a/zoo/policies/cross-rl-agent/cross_rl_agent/train/run_train.py +++ b/zoo/policies/cross-rl-agent/cross_rl_agent/train/run_train.py @@ -335,7 +335,7 @@ def default_argument_parser(program: str): nargs="+", ) parser.add_argument( - "--sim-name", + "--sim_name", help="a string that gives this simulation a name.", type=str, default=None, @@ -345,7 +345,7 @@ def default_argument_parser(program: str): ) parser.add_argument("--seed", type=int, default=42) parser.add_argument( - "--sumo-port", help="Run SUMO with a specified port.", type=int, default=None + "--sumo_port", help="Run SUMO with a specified port.", type=int, default=None ) parser.add_argument( "--episodes",
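With the interface changes above, ``RLlibHiWayEnv`` now follows the gymnasium-style API exercised in ``smarts/env/tests/test_rllib_hiway_env.py``: ``reset`` returns ``(observations, infos)`` and ``step`` returns a five-tuple. A minimal sketch of driving the environment directly is shown below; the scenario path and agent id are placeholders, the attribute-dict shim stands in for the ``EnvContext`` that RLlib normally supplies (the env reads ``worker_index``/``vector_index`` as attributes), and ``rllib_agent`` is the helper defined in ``examples/rl/rllib/rllib_agent.py``. The scenario is assumed to have been built beforehand (see ``build_scenarios`` in the examples).

.. code-block:: python

   from pathlib import Path

   from rllib_agent import rllib_agent  # helper from examples/rl/rllib/
   from smarts.env.rllib_hiway_env import RLlibHiWayEnv


   class _AttrDict(dict):
       """Stand-in for RLlib's EnvContext: worker_index/vector_index are read as attributes."""

       __getattr__ = dict.__getitem__


   config = _AttrDict(
       scenarios=[str(Path("scenarios/sumo/loop").resolve())],  # placeholder scenario
       agent_specs={"AGENT-0": rllib_agent["agent_spec"]},
       headless=True,
       seed=42,
       worker_index=0,
       vector_index=0,
   )
   env = RLlibHiWayEnv(config=config)
   agent_ids = env.get_agent_ids()

   observations, infos = env.reset()  # reset now returns (observations, infos)
   terminateds = {"__all__": False}
   while not terminateds["__all__"]:
       actions = {aid: rllib_agent["action_space"].sample() for aid in agent_ids}
       # step now returns a five-tuple; terminateds and truncateds carry the same flags here
       observations, rewards, terminateds, truncateds, infos = env.step(actions)
   env.close()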