diff --git a/examples/parallel_environment.py b/examples/parallel_environment.py index 30bd87ddc6..e81f2166ff 100644 --- a/examples/parallel_environment.py +++ b/examples/parallel_environment.py @@ -225,6 +225,7 @@ def parallel_env_sync( str( pathlib.Path(__file__).absolute().parents[1] / "scenarios" + / "sumo" / "figure_eight" ) ] diff --git a/examples/traffic_histories_to_observations.py b/examples/traffic_histories_to_observations.py index ed643c07b3..a7259422a0 100644 --- a/examples/traffic_histories_to_observations.py +++ b/examples/traffic_histories_to_observations.py @@ -1,11 +1,13 @@ import argparse import logging +import math import os import pickle from dataclasses import replace from typing import Optional, Sequence -from PIL import Image, ImageDraw +import numpy as np +from PIL import Image from envision.client import Client as Envision from smarts import sstudio @@ -19,6 +21,7 @@ RoadWaypoints, Waypoints, ) +from smarts.core.colors import Colors from smarts.core.controllers import ActionSpaceType, ControllerOutOfLaneException from smarts.core.local_traffic_provider import LocalTrafficProvider from smarts.core.plan import PositionalGoal @@ -259,7 +262,7 @@ def collect( for car, data in collected_data.items(): outfile = os.path.join( self._output_dir, - f"{self._scenario.name}_{self._scenario.traffic_history.name}_{car}.pkl", + f"{car}.pkl", ) with open(outfile, "wb") as of: pickle.dump(data, of) @@ -274,11 +277,13 @@ def _record_data( selected_vehicles, max_sim_time, ): + # Record only within specified time window. + t = self._smarts.elapsed_sim_time end_time = self._end_time if self._end_time is not None else max_sim_time - if not (self._start_time <= self._smarts.elapsed_sim_time <= end_time): + if not (self._start_time <= t <= end_time): return - # Attach sensors to each vehicle + # Attach sensors to each vehicle. valid_vehicles = (current_vehicles - off_road_vehicles) & selected_vehicles for veh_id in valid_vehicles: try: @@ -287,53 +292,49 @@ def _record_data( self._logger.warning(f"{veh_id} out of lane, skipped attaching sensors") off_road_vehicles.add(veh_id) - # Get observations from each vehicle and record them + # Get observations from each vehicle and record them. obs = dict() obs, _, _, _ = self._smarts.observe_from(list(valid_vehicles)) - resolutions = {} - self._logger.info( - f"t={self._smarts.elapsed_sim_time}, active_vehicles={len(valid_vehicles)}" - ) + self._logger.info(f"t={t}, active_vehicles={len(valid_vehicles)}") for id_ in list(obs): - if obs[id_].top_down_rgb: - resolutions[id_] = obs[id_].top_down_rgb.metadata.resolution ego_state = obs[id_].ego_vehicle_state if ego_state.lane_index is None: del obs[id_] - else: - mission = self._missions[ego_state.id] - if mission: - # doh! ego_state is immutable! - new_ego_state = ego_state._replace(mission=mission) - obs[id_] = replace(obs[id_], ego_vehicle_state=new_ego_state) - # TODO: handle case where neighboring vehicle has lane_index of None too - t = self._smarts.elapsed_sim_time - for car, car_obs in obs.items(): - collected_data.setdefault(car, {}).setdefault(t, {}) - collected_data[car][t] = car_obs + continue - if not self._output_dir: - return + mission = self._missions[ego_state.id] + if mission: + new_ego_state = ego_state._replace(mission=mission) + obs[id_] = replace(obs[id_], ego_vehicle_state=new_ego_state) - # Write top-down RGB image to a file for each vehicle if we have one - for agent_id, agent_obs in obs.items(): - if agent_obs.top_down_rgb is not None: - rgb_data = agent_obs.top_down_rgb.data - h, w, _ = rgb_data.shape + top_down_rgb = obs[id_].top_down_rgb + if top_down_rgb: + res = top_down_rgb.metadata.resolution + rgb = top_down_rgb.data.copy() + h, w, _ = rgb.shape shape = ( ( - h / 2 - 1.47 / 2 / resolutions[agent_id], - w / 2 - 3.68 / 2 / resolutions[agent_id], + math.floor(w / 2 - 3.68 / 2 / res), + math.ceil(w / 2 + 3.68 / 2 / res), ), ( - h / 2 + 1.47 / 2 / resolutions[agent_id], - w / 2 + 3.68 / 2 / resolutions[agent_id], + math.floor(h / 2 - 1.47 / 2 / res), + math.ceil(h / 2 + 1.47 / 2 / res), ), ) - img = Image.fromarray(rgb_data, "RGB") - rect_image = ImageDraw.Draw(img) - rect_image.rectangle(shape, fill="red") - img.save(os.path.join(self._output_dir, f"{t}_{agent_id}.png")) + color = np.array(Colors.Red.value[0:3], ndmin=3) * 255 + rgb[shape[0][0] : shape[0][1], shape[1][0] : shape[1][1], :] = color + top_down_rgb_edited = top_down_rgb._replace(data=rgb) + obs[id_] = replace(obs[id_], top_down_rgb=top_down_rgb_edited) + + if self._output_dir: + img = Image.fromarray(rgb, "RGB") + img.save(os.path.join(self._output_dir, f"{t}_{id_}.png")) + + # TODO: handle case where neighboring vehicle has lane_index of None too + for car, car_obs in obs.items(): + collected_data.setdefault(car, {}).setdefault(t, {}) + collected_data[car][t] = car_obs if __name__ == "__main__": diff --git a/examples/traffic_histories_vehicle_replacement.py b/examples/traffic_histories_vehicle_replacement.py index aaa79e14da..b97d8ba932 100644 --- a/examples/traffic_histories_vehicle_replacement.py +++ b/examples/traffic_histories_vehicle_replacement.py @@ -40,7 +40,7 @@ def load_data_for_vehicle( self._vehicle_id = vehicle_id # for debugging self._time_offset = time_offset - datafile = f"collected_observations/{scenario.name}_{scenario.traffic_history.name}_Agent-history-vehicle-{vehicle_id}.pkl" + datafile = f"collected_observations/Agent-history-vehicle-{vehicle_id}.pkl" # We read actions from a datafile previously-generated by the # traffic_histories_to_observations.py script. # This allows us to test the action space to ensure that it diff --git a/smarts/env/multi_scenario_env.py b/smarts/env/multi_scenario_env.py index 66dfc7a526..09bbe61360 100644 --- a/smarts/env/multi_scenario_env.py +++ b/smarts/env/multi_scenario_env.py @@ -18,11 +18,15 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. +import copy +import logging +import math import os import pathlib from typing import Any, Dict, Optional, Tuple import gym +import numpy as np from smarts import sstudio from smarts.core.agent_interface import ( @@ -38,6 +42,9 @@ from smarts.env.hiway_env import HiWayEnv from smarts.zoo.agent_spec import AgentSpec +logger = logging.getLogger(__file__) +logger.setLevel(logging.WARNING) + def multi_scenario_v0_env( scenario: str, @@ -145,6 +152,7 @@ def multi_scenario_v0_env( sumo_headless=sumo_headless, envision_record_data_replay_path=envision_record_data_replay_path, ) + env = _LimitTargetPose(env=env) env = _InfoScore(env=env) return env @@ -248,7 +256,7 @@ def _get_env_specs(scenario: str): matches_agent = regexp_agent.search(scenario) if not matches_agent: raise Exception( - f"Scenario path should match regexp of 'agent_\d+', but got {scenario}" + f"Scenario path should match regexp of 'agents_\d+', but got {scenario}" ) num_agent = regexp_num.search(matches_agent.group(0)) @@ -304,6 +312,117 @@ def resolve_agent_interface( ) +class _LimitTargetPose(gym.Wrapper): + """Uses previous observation to limit the next TargetPose action range.""" + + def __init__(self, env: gym.Env): + """ + Args: + env (gym.Env): Environment to be wrapped. + """ + super().__init__(env) + self._prev_obs: Dict[str, Dict[str, Any]] + + def step( + self, action: Dict[str, np.ndarray] + ) -> Tuple[ + Dict[str, Any], + Dict[str, float], + Dict[str, bool], + Dict[str, Dict[str, Any]], + ]: + """Steps the environment. + + Args: + action (Dict[str, Any]): Action for each agent. + + Returns: + Tuple[ Dict[str, Any], Dict[str, float], Dict[str, bool], Dict[str, Dict[str, Any]] ]: + Observation, reward, done, and info, for each agent is returned. + """ + + limited_actions: Dict[str, np.ndarray] = {} + for agent_name, agent_action in action.items(): + limited_actions[agent_name] = self._limit( + name=agent_name, + action=agent_action, + prev_coord=self._prev_obs[agent_name]["pos"], + ) + + out = self.env.step(limited_actions) + self._prev_obs = self._store(obs=out[0]) + return out + + def reset(self, **kwargs) -> Dict[str, Any]: + """Resets the environment. + + Returns: + Dict[str, Any]: A dictionary of observation for each agent. + """ + obs = self.env.reset(**kwargs) + self._prev_obs = self._store(obs=obs) + return obs + + def _store(self, obs: Dict[str, Any]) -> Dict[str, Dict[str, Any]]: + filtered_obs: Dict[str, Dict[str, Any]] = {} + for agent_name, agent_obs in obs.items(): + filtered_obs[agent_name] = { + "pos": copy.deepcopy(agent_obs.ego_vehicle_state.position[:2]) + } + return filtered_obs + + def _limit( + self, name: str, action: np.ndarray, prev_coord: np.ndarray + ) -> np.ndarray: + """Set time delta and limit Euclidean distance travelled in TargetPose action space. + + Args: + name (str): Agent's name. + action (np.ndarray): Agent's action. + prev_coord (np.ndarray): Agent's previous xy coordinate on the map. + + Returns: + np.ndarray: Agent's TargetPose action which has fixed time-delta and constrained next xy coordinate. + """ + + time_delta = 0.1 + limited_action = np.array( + [action[0], action[1], action[2], time_delta], dtype=np.float32 + ) + speed_max = 28 # 28m/s = 100.8 km/h. Maximum speed should be >0. + dist_max = speed_max * time_delta + + # Set time-delta + if not math.isclose(action[3], time_delta, abs_tol=1e-3): + logger.warning( + "%s: Expected time-delta=%s, but got time-delta=%s. " + "Action time-delta automatically changed to %s.", + name, + time_delta, + action[3], + time_delta, + ) + + # Limit Euclidean distance travelled + next_coord = action[:2] + vector = next_coord - prev_coord + dist = np.linalg.norm(vector) + if dist > dist_max: + unit_vector = vector / dist + limited_action[0], limited_action[1] = prev_coord + dist_max * unit_vector + logger.warning( + "%s: Allowed max speed=%s, but got speed=%s. Next x-coordinate " + "and y-coordinate automatically changed from %s to %s.", + name, + speed_max, + dist / time_delta, + next_coord, + limited_action[:2], + ) + + return limited_action + + class _InfoScore(gym.Wrapper): def __init__(self, env: gym.Env): super(_InfoScore, self).__init__(env)