diff --git a/rllib/env/env_runner.py b/rllib/env/env_runner.py
index 6da4fcaf68e0..04191cf28900 100644
--- a/rllib/env/env_runner.py
+++ b/rllib/env/env_runner.py
@@ -228,7 +228,7 @@ def _try_env_reset(
             raise e
 
     def _try_env_step(self, actions):
-        """Tries stepping the env and - if an error orrurs - handles it gracefully."""
+        """Tries stepping the env and - if an error occurs - handles it gracefully."""
         try:
             with self.metrics.log_time(ENV_STEP_TIMER):
                 results = self.env.step(actions)
@@ -236,9 +236,13 @@ def _try_env_step(self, actions):
         except Exception as e:
             self.metrics.log_value(NUM_ENV_STEP_FAILURES_LIFETIME, 1, reduce="sum")
 
+            # @OldAPIStack (config.restart_failed_sub_environments)
             if self.config.restart_failed_sub_environments:
                 if not isinstance(e, StepFailedRecreateEnvError):
-                    logger.exception("Stepping the env resulted in an error!")
+                    logger.exception(
+                        "Stepping the env resulted in an error! The original error "
+                        f"is: {e}"
+                    )
                 # Recreate the env.
                 self.make_env()
                 # And return that the stepping failed. The caller will then handle
diff --git a/rllib/env/multi_agent_env.py b/rllib/env/multi_agent_env.py
index a09f2cd93f97..327abe50779e 100644
--- a/rllib/env/multi_agent_env.py
+++ b/rllib/env/multi_agent_env.py
@@ -444,7 +444,7 @@ def step(self, action_dict):
         # an additional episode_done bool that covers cases where all agents are
         # either terminated or truncated, but not all are truncated and not all are
         # terminated. We can then get rid of the aweful `__all__` special keys!
-        terminated["__all__"] = len(self.terminateds) + len(self.truncateds) == len(
+        terminated["__all__"] = len(self.terminateds | self.truncateds) == len(
             self.envs
         )
         truncated["__all__"] = len(self.truncateds) == len(self.envs)
diff --git a/rllib/env/multi_agent_env_runner.py b/rllib/env/multi_agent_env_runner.py
index 48d7e1a6a9af..f1c5922eab8c 100644
--- a/rllib/env/multi_agent_env_runner.py
+++ b/rllib/env/multi_agent_env_runner.py
@@ -158,9 +158,9 @@ def sample(
 
         Args:
             num_timesteps: The number of timesteps to sample during this call.
-                Note that only one of `num_timetseps` or `num_episodes` may be provided.
+                Note that only one of `num_timesteps` or `num_episodes` may be provided.
             num_episodes: The number of episodes to sample during this call.
-                Note that only one of `num_timetseps` or `num_episodes` may be provided.
+                Note that only one of `num_timesteps` or `num_episodes` may be provided.
             explore: If True, will use the RLModule's `forward_exploration()`
                 method to compute actions. If False, will use the RLModule's
                 `forward_inference()` method. If None (default), will use the `explore`
@@ -183,8 +183,11 @@ def sample(
                 f"{self} doesn't have an env! Can't call `sample()` on it."
             )
 
-        assert not (num_timesteps is not None and num_episodes is not None)
-
+        assert not (num_timesteps is not None and num_episodes is not None), (
+            "Provide either `num_timesteps` or `num_episodes`, but not both. "
+            "Got both here: "
+            f"{num_timesteps=}, {num_episodes=}"
+        )
         # Log time between `sample()` requests.
         if self._time_after_sampling is not None:
             self.metrics.log_value(
@@ -214,7 +217,7 @@ def sample(
                 * self.num_envs
             )
 
-        # Sample n timesteps.
+        # Sample "num_timesteps" timesteps.
         if num_timesteps is not None:
             samples = self._sample(
                 num_timesteps=num_timesteps,
@@ -222,15 +225,14 @@ def sample(
                 explore=explore,
                 random_actions=random_actions,
                 force_reset=force_reset,
             )
-        # Sample m episodes.
+        # Sample "num_episodes" episodes.
         elif num_episodes is not None:
             samples = self._sample(
                 num_episodes=num_episodes,
                 explore=explore,
                 random_actions=random_actions,
             )
-        # For complete episodes mode, sample as long as the number of timesteps
-        # done is smaller than the `train_batch_size`.
+        # For batch_mode="complete_episodes" (env_runners configuration), continue sampling as long as the number of timesteps done is smaller than the `train_batch_size`.
         else:
             samples = self._sample(
                 num_episodes=self.num_envs,
@@ -346,14 +348,14 @@ def _sample(
                     metrics_prefix_key=(MODULE_TO_ENV_CONNECTOR,),
                 )
             # In case all environments had been terminated `to_module` will be
-            # empty and no actions are needed b/c we reset all environemnts.
+            # empty and no actions are needed b/c we reset all environments.
             else:
                 to_env = {}
                 shared_data["vector_env_episodes_map"] = {}
 
             # Extract the (vectorized) actions (to be sent to the env) from the
             # module/connector output. Note that these actions are fully ready (e.g.
-            # already unsquashed/clipped) to be sent to the environment) and might not
+            # already unsquashed/clipped) to be sent to the environment and might not
             # be identical to the actions produced by the RLModule/distribution, which
             # are the ones stored permanently in the episode objects.
             actions = to_env.pop(Columns.ACTIONS, [{} for _ in episodes])
@@ -361,6 +363,9 @@ def _sample(
             # Try stepping the environment.
             results = self._try_env_step(actions_for_env)
             if results == ENV_STEP_FAILURE:
+                logging.warning(
+                    f"RLlib {self.__class__.__name__}: Environment step failed. Will force reset env(s) in this EnvRunner."
+                )
                 return self._sample(
                     num_timesteps=num_timesteps,
                     num_episodes=num_episodes,
@@ -372,7 +377,7 @@ def _sample(
             call_on_episode_start = set()
 
             # Store the data from the last environment step into the
-            # episodes for all sub-envrironments.
+            # episodes for all sub-environments.
             for env_index in range(self.num_envs):
                 extra_model_outputs = defaultdict(dict)
                 # `to_env` returns a dictionary with column keys and
@@ -710,7 +715,7 @@ def set_state(self, state: StateDict) -> None:
         # update.
         weights_seq_no = state.get(WEIGHTS_SEQ_NO, 0)
 
-        # Only update the weigths, if this is the first synchronization or
+        # Only update the weights, if this is the first synchronization or
         # if the weights of this `EnvRunner` lacks behind the actual ones.
         if weights_seq_no == 0 or self._weights_seq_no < weights_seq_no:
             rl_module_state = state[COMPONENT_RL_MODULE]
diff --git a/rllib/env/single_agent_env_runner.py b/rllib/env/single_agent_env_runner.py
index d032f3a8d245..ec30195bec44 100644
--- a/rllib/env/single_agent_env_runner.py
+++ b/rllib/env/single_agent_env_runner.py
@@ -157,9 +157,9 @@ def sample(
 
         Args:
             num_timesteps: The number of timesteps to sample during this call.
-                Note that only one of `num_timetseps` or `num_episodes` may be provided.
+                Note that only one of `num_timesteps` or `num_episodes` may be provided.
             num_episodes: The number of episodes to sample during this call.
-                Note that only one of `num_timetseps` or `num_episodes` may be provided.
+                Note that only one of `num_timesteps` or `num_episodes` may be provided.
             explore: If True, will use the RLModule's `forward_exploration()`
                 method to compute actions. If False, will use the RLModule's
                 `forward_inference()` method. If None (default), will use the `explore`
@@ -328,7 +328,7 @@ def _sample(
 
             # Extract the (vectorized) actions (to be sent to the env) from the
             # module/connector output. Note that these actions are fully ready (e.g.
-            # already unsquashed/clipped) to be sent to the environment) and might not
+            # already unsquashed/clipped) to be sent to the environment and might not
             # be identical to the actions produced by the RLModule/distribution, which
             # are the ones stored permanently in the episode objects.
             actions = to_env.pop(Columns.ACTIONS)
@@ -362,7 +362,7 @@ def _sample(
 
                 # Call `add_env_step()` method on episode.
                 else:
-                    # Only increase ts when we actually stepped (not reset'd as a reset
+                    # Only increase ts when we actually stepped (not reset as a reset
                     # does not count as a timestep).
                     ts += 1
                     episodes[env_index].add_env_step(
@@ -375,7 +375,7 @@ def _sample(
                         extra_model_outputs=extra_model_output,
                     )
 
-            # Env-to-module connector pass (cache results as we will do the RLModule
+            # Env-to-module connector pass. Cache results, as we will do the RLModule
             # forward pass only in the next `while`-iteration.
             if self.module is not None:
                 self._cached_to_module = self._env_to_module(
@@ -442,7 +442,7 @@ def _sample(
             ]
 
             for eps in self._episodes:
-                # Just started Episodes do not have to be returned. There is no data
+                # Just started episodes do not have to be returned. There is no data
                 # in them anyway.
                 if eps.t == 0:
                     continue
@@ -554,8 +554,8 @@ def set_state(self, state: StateDict) -> None:
         # update.
         weights_seq_no = state.get(WEIGHTS_SEQ_NO, 0)
 
-        # Only update the weigths, if this is the first synchronization or
-        # if the weights of this `EnvRunner` lacks behind the actual ones.
+        # Only update the weights, if this is the first synchronization or
+        # if the weights of this `EnvRunner` lag behind the actual ones.
         if weights_seq_no == 0 or self._weights_seq_no < weights_seq_no:
             rl_module_state = state[COMPONENT_RL_MODULE]
             if isinstance(rl_module_state, ray.ObjectRef):
@@ -609,13 +609,13 @@ def get_checkpointable_components(self):
     def assert_healthy(self):
         """Checks that self.__init__() has been completed properly.
 
-        Ensures that the instances has a `MultiRLModule` and an
+        Ensures that the instance has a `MultiRLModule` and an
         environment defined.
 
         Raises:
             AssertionError: If the EnvRunner Actor has NOT been properly initialized.
         """
-        # Make sure, we have built our gym.vector.Env and RLModule properly.
+        # Make sure we have built our gym.vector.Env and RLModule properly.
         assert self.env and hasattr(self, "module")
 
     @override(EnvRunner)
@@ -626,8 +626,8 @@ def make_env(self) -> None:
         `self.config.env_config`) and then call this method to create new
         environments with the updated configuration.
         """
-        # If an env already exists, try closing it first (to allow it to properly
-        # cleanup).
+        # If an env already exists, try closing it first
+        # to allow it to properly clean up.
         if self.env is not None:
             try:
                 self.env.close()
@@ -854,7 +854,7 @@ def _log_episode_metrics(self, length, ret, sec):
         # Log general episode metrics.
         # Use the configured window, but factor in the parallelism of the EnvRunners.
         # As a result, we only log the last `window / num_env_runners` steps here,
-        # b/c everything gets parallel-merged in the Algorithm process.
+        # because everything gets parallel-merged in the Algorithm process.
         win = max(
             1,
             int(
diff --git a/rllib/env/tests/test_single_agent_env_runner.py b/rllib/env/tests/test_single_agent_env_runner.py
index 0aac37bb3f83..2f3df1864bf4 100644
--- a/rllib/env/tests/test_single_agent_env_runner.py
+++ b/rllib/env/tests/test_single_agent_env_runner.py
@@ -1,14 +1,14 @@
+import unittest
 from functools import partial
 from unittest.mock import patch
-import unittest
 
 import gymnasium as gym
 
 import ray
 from ray import tune
 from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
-from ray.rllib.env.single_agent_env_runner import SingleAgentEnvRunner
 from ray.rllib.env.env_runner import StepFailedRecreateEnvError
+from ray.rllib.env.single_agent_env_runner import SingleAgentEnvRunner
 from ray.rllib.env.utils import _gym_env_creator
 from ray.rllib.examples.envs.classes.simple_corridor import SimpleCorridor
 from ray.rllib.utils.test_utils import check
@@ -37,71 +37,6 @@ def setUpClass(cls) -> None:
     def tearDownClass(cls) -> None:
         ray.shutdown()
 
-    def test_sample(self):
-        config = (
-            AlgorithmConfig().environment("CartPole-v1")
-            # Vectorize x2 and by default, rollout 64 timesteps per individual env.
-            .env_runners(num_envs_per_env_runner=2, rollout_fragment_length=64)
-        )
-        env_runner = SingleAgentEnvRunner(config=config)
-
-        # Expect error if both num_timesteps and num_episodes given.
-        self.assertRaises(
-            AssertionError,
-            lambda: env_runner.sample(
-                num_timesteps=10, num_episodes=10, random_actions=True
-            ),
-        )
-
-        # Sample 10 episodes (5 per env) 100 times.
-        for _ in range(100):
-            episodes = env_runner.sample(num_episodes=10, random_actions=True)
-            check(len(episodes), 10)
-            # Since we sampled complete episodes, there should be no ongoing episodes
-            # being returned.
-            self.assertTrue(all(e.is_done for e in episodes))
-
-        # Sample 10 timesteps (5 per env) 100 times.
-        for _ in range(100):
-            episodes = env_runner.sample(num_timesteps=10, random_actions=True)
-            # Check the sum of lengths of all episodes returned.
-            sum_ = sum(map(len, episodes))
-            self.assertTrue(sum_ in [10, 11])
-
-        # Sample (by default setting: rollout_fragment_length=64) 10 times.
-        for _ in range(100):
-            episodes = env_runner.sample(random_actions=True)
-            # Check, whether the sum of lengths of all episodes returned is 128
-            # 2 (num_env_per_worker) * 64 (rollout_fragment_length).
-            sum_ = sum(map(len, episodes))
-            self.assertTrue(sum_ in [128, 129])
-
-    def test_async_vector_env(self):
-        """Tests, whether SingleAgentEnvRunner can run with vector envs."""
-
-        for env in ["CartPole-v1", SimpleCorridor, "tune-registered"]:
-            config = (
-                AlgorithmConfig().environment(env)
-                # Vectorize x5 and by default, rollout 64 timesteps per individual env.
-                .env_runners(
-                    num_env_runners=0,
-                    num_envs_per_env_runner=5,
-                    rollout_fragment_length=10,
-                    remote_worker_envs=True,
-                )
-            )
-
-            env_runner = SingleAgentEnvRunner(config=config)
-
-            # Sample with the async-vectorized env.
-            episodes = env_runner.sample(random_actions=True)
-            # Assert length of all fragments is `rollout_fragment_length`.
-            self.assertEqual(
-                sum(len(e) for e in episodes),
-                config.num_envs_per_env_runner * config.rollout_fragment_length,
-            )
-            env_runner.stop()
-
     def test_distributed_env_runner(self):
         """Tests, whether SingleAgentEnvRunner can be distributed."""
 
@@ -144,11 +79,56 @@ def test_distributed_env_runner(self):
             ],
         )
 
-    @patch("ray.rllib.env.env_runner.logger")
+    def test_sample(self):
+        config = (
+            AlgorithmConfig()
+            .environment("CartPole-v1")
+            .env_runners(
+                num_envs_per_env_runner=2,
+                rollout_fragment_length=64,
+            )
+        )
+        env_runner = SingleAgentEnvRunner(config=config)
+
+        # Expect error if both num_timesteps and num_episodes given.
+        self.assertRaises(
+            AssertionError,
+            lambda: env_runner.sample(
+                num_timesteps=10, num_episodes=10, random_actions=True
+            ),
+        )
+
+        # Sample 10 episodes (5 per env, because num_envs_per_env_runner=2).
+        # Repeat 100 times.
+        for _ in range(100):
+            episodes = env_runner.sample(num_episodes=10, random_actions=True)
+            check(len(episodes), 10)
+            # Since we sampled complete episodes, there should be no ongoing episodes
+            # being returned.
+            self.assertTrue(all(e.is_done for e in episodes))
+
+        # Sample 10 timesteps (5 per env).
+        # Repeat 100 times.
+        for _ in range(100):
+            episodes = env_runner.sample(num_timesteps=10, random_actions=True)
+            # Check the sum of lengths of all episodes returned.
+            sum_ = sum(map(len, episodes))
+            self.assertTrue(sum_ in [10, 11])
+
+        # Sample the default rollout_fragment_length=64 per env.
+        # Repeat 100 times.
+        for _ in range(100):
+            episodes = env_runner.sample(random_actions=True)
+            # Check, whether the sum of lengths of all episodes returned is 128
+            # 2 (num_env_per_worker) * 64 (rollout_fragment_length).
+            sum_ = sum(map(len, episodes))
+            self.assertTrue(sum_ in [128, 129])
+
+    @patch(target="ray.rllib.env.env_runner.logger")
     def test_step_failed_reset_required(self, mock_logger):
         """Tests, whether SingleAgentEnvRunner can handle StepFailedResetRequired."""
 
-        # Define an env that raises StepFailedResetRequired
+        # Define an env that raises StepFailedResetRequired.
         class ErrorRaisingEnv(gym.Env):
             def __init__(self, config=None):
                 # As per gymnasium standard, provide observation and action spaces in your
@@ -192,6 +172,29 @@ def step(self, action):
 
         assert mock_logger.exception.call_count == 1
 
+    def test_vector_env(self):
+        """Tests, whether SingleAgentEnvRunner can run various vectorized envs."""
+
+        for env in ["CartPole-v1", SimpleCorridor, "tune-registered"]:
+            config = (
+                AlgorithmConfig()
+                .environment(env)
+                .env_runners(
+                    num_envs_per_env_runner=5,
+                    rollout_fragment_length=10,
+                )
+            )
+
+            env_runner = SingleAgentEnvRunner(config=config)
+
+            # Sample with the async-vectorized env.
+            episodes = env_runner.sample(random_actions=True)
+            self.assertEqual(
+                sum(len(e) for e in episodes),
+                config.num_envs_per_env_runner * config.rollout_fragment_length,
+            )
+            env_runner.stop()
+
 
 if __name__ == "__main__":
     import pytest
diff --git a/rllib/examples/envs/classes/simple_corridor.py b/rllib/examples/envs/classes/simple_corridor.py
index 9088f73dbd37..5ab5b976bc5f 100644
--- a/rllib/examples/envs/classes/simple_corridor.py
+++ b/rllib/examples/envs/classes/simple_corridor.py
@@ -1,6 +1,10 @@
+import logging
+
 import gymnasium as gym
-from gymnasium.spaces import Box, Discrete
 import numpy as np
+from gymnasium.spaces import Box, Discrete
+
+logger = logging.getLogger("ray.rllib")
 
 
 class SimpleCorridor(gym.Env):
@@ -20,7 +24,7 @@ def __init__(self, config=None):
 
     def set_corridor_length(self, length):
         self.end_pos = length
-        print(f"Set corridor length to {self.end_pos}")
+        logger.info(f"Set corridor length to {self.end_pos}")
         assert self.end_pos <= 999, "The maximum `corridor_length` allowed is 999!"
 
     def reset(self, *, seed=None, options=None):