diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6932826806..faf4360cee 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -41,6 +41,12 @@ Copy and pasting the git commit messages is __NOT__ enough.
- Modified naming of benchmark used in NeurIPS 2022 from driving-smarts-competition-env to driving-smarts-v2022.
- Sstudio generated scenario vehicle traffic ids are now shortened.
- ChaseViaPoints zoo agent uses unconstrained path change command, instead of being constrained to [-1, 0, +1] path change commands used previously.
+- Made the metrics module configurable by supplying parameters through a `Params` class.
+- Neighborhood vehicles which should be excluded from the `dist_to_obstacles` cost function can be specified through `Params`. This is useful in certain tasks, such as the vehicle-following task, where the distance to the lead vehicle should not be included in the computation of the `dist_to_obstacles` cost function.
+- Unified the computation of `dist_to_destination` (previously known as `completion`) and `steps` (i.e., time taken) as functions inside the cost functions module, instead of computing them separately in a different module.
+- In the metrics module, the records (i.e., the raw metrics data) and the scoring (i.e., the formula which computes the final results) are now separated, to provide greater flexibility for applying metrics to different environments.
+- Benchmark listing may specify a specialised metric formula for each benchmark.
+- Changed `benchmark_runner_v0.py` to only average records across scenarios that share the same environment. Records are not averaged across different environments, because the scoring formula may differ across environments.
### Deprecated
### Fixed
- Fixed issues related to waypoints in junctions on Argoverse maps. Waypoints will now be generated for all paths leading through the lane(s) the vehicle is on.
@@ -49,6 +55,7 @@ Copy and pasting the git commit messages is __NOT__ enough.
- Fixed an issue where building sumo scenarios would sometimes stall.
- `VehicleIndex` no longer segfaults when attempting to `repr()` it.
- Fixed issues related to waypoints in SUMO maps. Waypoints in junctions should now return all possible paths through the junction.
+- Fixed CI tests for metrics.
### Removed
- Removed the deprecated `waymo_browser` utility.
- Removed camera observation `created_at` attribute from metadata to make observation completely reproducible.
diff --git a/smarts/benchmark/driving_smarts/v2023/config_3.yaml b/smarts/benchmark/driving_smarts/v2023/config_3.yaml
index 067564dc6b..fd11954a73 100644
--- a/smarts/benchmark/driving_smarts/v2023/config_3.yaml
+++ b/smarts/benchmark/driving_smarts/v2023/config_3.yaml
@@ -20,4 +20,4 @@ benchmark:
      - scenarios/sumo/platoon/merge_exit_sumo_t_agents_1
    kwargs:
      seed: 42
-    # metric_formula: smarts/benchmark/driving_smarts/v2023/metric_formula_platoon.py
+    metric_formula: smarts/benchmark/driving_smarts/v2023/metric_formula_platoon.py
diff --git a/smarts/benchmark/driving_smarts/v2023/metric_formula_platoon.py b/smarts/benchmark/driving_smarts/v2023/metric_formula_platoon.py
new file mode 100644
index 0000000000..8582c2da1d
--- /dev/null
+++ b/smarts/benchmark/driving_smarts/v2023/metric_formula_platoon.py
@@ -0,0 +1,156 @@
+# MIT License
+
+# Copyright (C) 2023. Huawei Technologies Co., Ltd. All rights reserved.
+
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+import functools
+from typing import Dict
+
+import numpy as np
+
+from smarts.env.gymnasium.wrappers.metric.costs import Costs
+from smarts.env.gymnasium.wrappers.metric.formula import FormulaBase, Score
+from smarts.env.gymnasium.wrappers.metric.params import (
+    Comfort,
+    DistToObstacles,
+    GapBetweenVehicles,
+    Params,
+    Steps,
+)
+from smarts.env.gymnasium.wrappers.metric.types import Record
+from smarts.env.gymnasium.wrappers.metric.utils import (
+    add_dataclass,
+    divide,
+    op_dataclass,
+)
+
+
+class Formula(FormulaBase):
+    """Sets the (i) cost function parameters, and (ii) score computation formula,
+    for an environment.
+    """
+
+    def __init__(self):
+        pass
+
+    def params(self) -> Params:
+        """Return parameters to configure and initialize cost functions.
+
+        Returns:
+            Params: Cost function parameters.
+        """
+        params = Params(
+            comfort=Comfort(
+                active=False,
+            ),  # TODO: Activate after implementing comfort cost function.
+            dist_to_obstacles=DistToObstacles(
+                active=False,
+            ),
+            gap_between_vehicles=GapBetweenVehicles(
+                active=False,
+                interest="Leader-007",
+            ),  # TODO: Activate after implementing gap_between_vehicles cost function.
+            steps=Steps(
+                active=False,
+            ),
+        )
+        return params
+
+    def score(self, records_sum: Dict[str, Dict[str, Record]]) -> Score:
+        """
+        Computes several sub-component scores and one total combined score named
+        "Overall" on the wrapped environment.
+
+        +-------------------+--------+-----------------------------------------------------------+
+        |                   | Range  | Remarks                                                   |
+        +===================+========+===========================================================+
+        | Overall           | [0, 1] | Total score. The higher, the better.                      |
+        +-------------------+--------+-----------------------------------------------------------+
+        | DistToDestination | [0, 1] | Remaining distance to destination. The lower, the better. |
+        +-------------------+--------+-----------------------------------------------------------+
+        | GapBetweenVehicles| [0, 1] | Gap between vehicles in a platoon. The higher, the better.|
+        +-------------------+--------+-----------------------------------------------------------+
+        | Humanness         | [0, 1] | Humanness indicator. The higher, the better.              |
+        +-------------------+--------+-----------------------------------------------------------+
+        | Rules             | [0, 1] | Traffic rules compliance. The higher, the better.
| + +-------------------+--------+-----------------------------------------------------------+ + + Returns: + Score: Contains "Overall", "DistToDestination", "GapBetweenVehicles", + "Humanness", and "Rules" scores. + """ + + costs_total = Costs() + episodes = 0 + for scen, val in records_sum.items(): + # Number of agents in scenario. + agents_in_scenario = len(val.keys()) + costs_list, counts_list = zip( + *[(record.costs, record.counts) for agent, record in val.items()] + ) + # Sum costs over all agents in scenario. + costs_sum_agent: Costs = functools.reduce( + lambda a, b: add_dataclass(a, b), costs_list + ) + # Average costs over number of agents in scenario. + costs_mean_agent = op_dataclass(costs_sum_agent, agents_in_scenario, divide) + # Sum costs over all scenarios. + costs_total = add_dataclass(costs_total, costs_mean_agent) + # Increment total number of episodes. + episodes += counts_list[0].episodes + + # Average costs over total number of episodes. + costs_final = op_dataclass(costs_total, episodes, divide) + + # Compute sub-components of score. + dist_to_destination = costs_final.dist_to_destination + humanness = _humanness(costs=costs_final) + rules = _rules(costs=costs_final) + gap_between_vehicles = costs_final.gap_between_vehicles + overall = ( + 0.50 * (1 - dist_to_destination) + + 0.25 * gap_between_vehicles + + 0.20 * humanness + + 0.05 * rules + ) + + return Score( + { + "overall": overall, + "dist_to_destination": dist_to_destination, + "gap_between_vehicles": gap_between_vehicles, + "humanness": humanness, + "rules": rules, + } + ) + + +def _humanness(costs: Costs) -> float: + humanness = np.array( + [costs.jerk_linear, costs.lane_center_offset] + ) + humanness = np.mean(humanness, dtype=float) + return 1 - humanness + + +def _rules(costs: Costs) -> float: + rules = np.array([costs.speed_limit, costs.wrong_way]) + rules = np.mean(rules, dtype=float) + return 1 - rules diff --git a/smarts/benchmark/entrypoints/benchmark_runner_v0.py b/smarts/benchmark/entrypoints/benchmark_runner_v0.py index 9938cf511b..0916f5f8d4 100644 --- a/smarts/benchmark/entrypoints/benchmark_runner_v0.py +++ b/smarts/benchmark/entrypoints/benchmark_runner_v0.py @@ -21,16 +21,21 @@ # THE SOFTWARE. 
import logging import os +import pprint from pathlib import Path -from typing import List, Tuple +from typing import Dict import gymnasium as gym import psutil import ray from smarts.benchmark.driving_smarts import load_config +from smarts.core.utils.import_utils import import_module_from_file from smarts.core.utils.logging import suppress_output -from smarts.env.gymnasium.wrappers.metrics import Metrics, Score +from smarts.env.gymnasium.wrappers.metric.formula import Score +from smarts.env.gymnasium.wrappers.metric.metrics import Metrics +from smarts.env.gymnasium.wrappers.metric.types import Record +from smarts.env.gymnasium.wrappers.metric.utils import multiply, op_dataclass from smarts.zoo import registry as agent_registry LOG_WORKERS = False @@ -52,20 +57,20 @@ def _eval_worker_local(name, env_config, episodes, agent_locator, error_tolerant agent_interface=agent_registry.make(locator=agent_locator).interface, **env_config["kwargs"], ) - env = Metrics(env) + env = Metrics(env, formula_path=env_config["metric_formula"]) agents = { agent_id: agent_registry.make_agent(locator=agent_locator) for agent_id in env.agent_ids } - observation, info = env.reset() + obs, info = env.reset() current_resets = 0 try: while current_resets < episodes: try: action = { - agent_id: agents[agent_id].act(obs) - for agent_id, obs in observation.items() + agent_id: agents[agent_id].act(agent_obs) + for agent_id, agent_obs in obs.items() } # assert env.action_space.contains(action) except Exception: @@ -76,14 +81,14 @@ def _eval_worker_local(name, env_config, episodes, agent_locator, error_tolerant raise terminated, truncated = False, True else: - observation, reward, terminated, truncated, info = env.step(action) + obs, reward, terminated, truncated, info = env.step(action) if terminated["__all__"] or truncated["__all__"]: current_resets += 1 - observation, info = env.reset() + obs, info = env.reset() finally: - score = env.score() + records = env.records() env.close() - return name, score + return name, records def _parallel_task_iterator(env_args, benchmark_args, agent_locator, log_workers): @@ -97,9 +102,9 @@ def _parallel_task_iterator(env_args, benchmark_args, agent_locator, log_workers for name, env_config in env_args.items(): if len(unfinished_refs) >= max_queued_tasks: ready_refs, unfinished_refs = ray.wait(unfinished_refs, num_returns=1) - for name, score in ray.get(ready_refs): - yield name, score - print(f"Evaluating {name}...") + for name, records in ray.get(ready_refs): + yield name, records + print(f"\nEvaluating {name}...") unfinished_refs.append( _eval_worker.remote( name=name, @@ -109,23 +114,23 @@ def _parallel_task_iterator(env_args, benchmark_args, agent_locator, log_workers error_tolerant=ERROR_TOLERANT, ) ) - for name, score in ray.get(unfinished_refs): - yield name, score + for name, records in ray.get(unfinished_refs): + yield name, records finally: ray.shutdown() def _serial_task_iterator(env_args, benchmark_args, agent_locator, *args, **_): for name, env_config in env_args.items(): - print(f"Evaluating {name}...") - name, score = _eval_worker_local( + print(f"\nEvaluating {name}...") + name, records = _eval_worker_local( name=name, env_config=env_config, episodes=benchmark_args["eval_episodes"], agent_locator=agent_locator, error_tolerant=ERROR_TOLERANT, ) - yield name, score + yield name, records def benchmark(benchmark_args, agent_locator, log_workers=False): @@ -135,59 +140,69 @@ def benchmark(benchmark_args, agent_locator, log_workers=False): agent_loctor(str): Locator string for 
the registered agent. debug_log(bool): Whether the benchmark should log to stdout. """ - print(f"Starting `{benchmark_args['name']}` benchmark.") - debug = benchmark_args.get("debug", {}) + print(f"\n\n<-- Starting `{benchmark_args['name']}` benchmark -->\n") message = benchmark_args.get("message") if message is not None: print(message) - env_args = {} + + debug = benchmark_args.get("debug", {}) + iterator = _serial_task_iterator if debug.get("serial") else _parallel_task_iterator + + root_dir = Path(__file__).resolve().parents[3] for env_name, env_config in benchmark_args["envs"].items(): + metric_formula = ( + root_dir / x + if (x := env_config.get("metric_formula", None)) != None + else None + ) + + env_args = {} for scenario in env_config["scenarios"]: - scenario_path = str(Path(__file__).resolve().parents[3] / scenario) kwargs = dict(benchmark_args.get("shared_env_kwargs", {})) kwargs.update(env_config.get("kwargs", {})) env_args[f"{env_name}-{scenario}"] = dict( env=env_config["loc"], - scenario=scenario_path, + scenario=str(root_dir / scenario), kwargs=kwargs, + metric_formula=metric_formula, ) - named_scores = [] - iterator = _serial_task_iterator if debug.get("serial") else _parallel_task_iterator - - for name, score in iterator( - env_args=env_args, - benchmark_args=benchmark_args, - agent_locator=agent_locator, - log_workers=log_workers, - ): - named_scores.append((name, score)) - print(f"Scoring {name}...") - - def format_one_line_scores(named_scores: List[Tuple[str, Score]]): - name_just = 30 - headers = "SCENARIO".ljust(name_just) + "SCORE" - return ( - headers - + "\n" - + "\n".join( - f"- {name}:".ljust(name_just) + f"{score}" - for name, score in named_scores + records_cumulative: Dict[str, Dict[str, Record]] = {} + for name, records in iterator( + env_args=env_args, + benchmark_args=benchmark_args, + agent_locator=agent_locator, + log_workers=log_workers, + ): + records_cumulative.update(records) + print(f"\nScoring {name} ...") + + score = _get_score(records=records_cumulative, metric_formula=metric_formula) + print("\nSCORE") + pprint.pprint(score) + + print("\n<-- Evaluation complete -->\n") + + +def _get_score(records: Dict[str, Dict[str, Record]], metric_formula: Path) -> Score: + # Convert averaged records into sum of records. 
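+    # Note: `Metrics.records()` returns per-scenario costs that are already
+    # averaged over the number of episodes. Multiplying each cost by its
+    # episode count recovers the per-scenario sums, so the scoring formula
+    # can re-average over the combined episode total of all its scenarios.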
+ records_sum = {} + for scen, agents in records.items(): + records_sum[scen] = {} + for agent, data in agents.items(): + records_sum[scen][agent] = Record( + costs=op_dataclass(data.costs, data.counts.episodes, multiply), + counts=data.counts, ) - ) - def format_scores_total(named_scores: List[Tuple[str, Score]], scenario_count): - score_sum = Score(*[sum(f) for f in zip(*[score for _, score in named_scores])]) - return "\n".join( - f"- {k}: {v/scenario_count}" for k, v in score_sum._asdict().items() - ) + # Import scoring formula + import_module_from_file("custom_formula", metric_formula) + from custom_formula import Formula + + formula = Formula() - print("Evaluation complete...") - print() - print(format_one_line_scores(named_scores)) - print() - print("`Driving SMARTS` averaged result:") - print(format_scores_total(named_scores, len(env_args))) + score = formula.score(records_sum=records_sum) + return score def benchmark_from_configs(benchmark_config, agent_locator, debug_log=False): diff --git a/smarts/env/gymnasium/hiway_env_v1.py b/smarts/env/gymnasium/hiway_env_v1.py index 26c35f1eef..4c92649adf 100644 --- a/smarts/env/gymnasium/hiway_env_v1.py +++ b/smarts/env/gymnasium/hiway_env_v1.py @@ -489,15 +489,6 @@ def scenario_log(self) -> Dict[str, Union[float, str]]: "mission_hash": str(hash(frozenset(scenario.missions.items()))), } - @property - def scenario(self) -> Scenario: - """Returns underlying scenario. - - Returns: - scenario.Scenario: Current simulated scenario. - """ - return self._smarts.scenario - @property def smarts(self): """Gives access to the underlying simulator. Use this carefully. diff --git a/smarts/env/gymnasium/platoon_env.py b/smarts/env/gymnasium/platoon_env.py index 30a96c6138..8fa70623f2 100644 --- a/smarts/env/gymnasium/platoon_env.py +++ b/smarts/env/gymnasium/platoon_env.py @@ -163,7 +163,7 @@ def resolve_agent_interface(agent_interface: AgentInterface): wrong_way=False, not_moving=False, actors_alive=ActorsAliveDoneCriteria( - actors_of_interest=(".*Leader-007",), + actors_of_interest=("Leader-007",), strict=True, ), ) diff --git a/smarts/env/gymnasium/wrappers/metric/completion.py b/smarts/env/gymnasium/wrappers/metric/completion.py deleted file mode 100644 index 2ef1b98855..0000000000 --- a/smarts/env/gymnasium/wrappers/metric/completion.py +++ /dev/null @@ -1,168 +0,0 @@ -# Copyright (C) 2022. Huawei Technologies Co., Ltd. All rights reserved. -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. 
- -import logging -import warnings -from dataclasses import dataclass -from typing import Any, Callable, Dict - -from smarts.core.coordinates import Heading, Point -from smarts.core.observations import Observation -from smarts.core.plan import Mission, Plan, PlanningError, PositionalGoal, Start -from smarts.core.road_map import RoadMap -from smarts.core.utils.math import running_mean - -logger = logging.getLogger(__file__) - -MAXIMUM_OFFROAD_DIST = 300 - - -@dataclass(frozen=True) -class Completion: - """Parameters to compute the percentage of scenario tasks completed.""" - - dist_remainder: float = 0 - """Shortest road distance between current position and goal position. - """ - dist_tot: float = 0 - """Shortest route distance between start position and goal position. - """ - - -class CompletionError(Exception): - """Raised when computation of `Completion` metric fails.""" - - pass - - -def get_dist(road_map: RoadMap, point_a: Point, point_b: Point) -> float: - """ - Computes the shortest route distance from point_a to point_b in the road - map. If no routes are available from point_a to point_b, then distance of - the shortest road length from point_b to point_a is instead computed. Both - points should lie on a road in the road map. - - Args: - road_map: Scenario road map. - point_a: A point, in world-map coordinates, which lies on a road. - point_b: A point, in world-map coordinates, which lies on a road. - - Returns: - float: Shortest road distance between two points in the road map. - """ - - def _get_dist(start: Point, end: Point) -> float: - mission = Mission( - start=Start( - position=start.as_np_array, - heading=Heading(0), - from_front_bumper=False, - ), - goal=PositionalGoal( - position=end, - radius=3, - ), - ) - plan = Plan(road_map=road_map, mission=mission, find_route=False) - plan.create_route(mission=mission, radius=MAXIMUM_OFFROAD_DIST) - from_route_point = RoadMap.Route.RoutePoint(pt=start) - to_route_point = RoadMap.Route.RoutePoint(pt=end) - - dist_tot = plan.route.distance_between( - start=from_route_point, end=to_route_point - ) - if dist_tot == None: - raise CompletionError("Unable to find road on route near given points.") - elif dist_tot < 0: - raise CompletionError( - "Path from start point to end point flows in " - "the opposite direction of the generated route." - ) - - return dist_tot - - try: - dist_tot = _get_dist(point_a, point_b) - except PlanningError as err: - if err.args[0].startswith("Unable to find a route"): - # Vehicle might end (i) in a one-way road, or (ii) in a road without - # u-turn, causing the route planner to fail. When there is no legal - # route, the walkable road distance in the reverse direction is - # returned as the distance between point_a and point_b. - dist_tot = _get_dist(point_b, point_a) - logger.info( - "completion.get dist(): Did not find a route from " - "%s to %s, instead found a reversed route from %s to %s.", - point_a, - point_b, - point_b, - point_a, - ) - else: - raise - except CompletionError: - dist_tot = 1e10 - warnings.warn( - "completion.get dist(): Did not find a route from " - f"{point_a} to {point_b}, because too far off road. " - f"Agent vehicle is more than {MAXIMUM_OFFROAD_DIST}m off road." 
- "This will cause a large penalty in completion score.", - ) - return dist_tot - - -def _dist_remainder(): - mean: float = 0 - step: int = 0 - - def func( - road_map: RoadMap, obs: Observation, initial_compl: Completion - ) -> Completion: - nonlocal mean, step - - if obs.events.reached_goal: - dist = 0 - else: - cur_pos = Point(*obs.ego_vehicle_state.position) - goal_position = getattr(obs.ego_vehicle_state.mission.goal, "position") - assert ( - goal_position is not None - ), f"Ego `{obs.ego_vehicle_state.id}` cannot use cost func without a positional goal." - goal_point = Point(*goal_position) - dist = get_dist(road_map=road_map, point_a=cur_pos, point_b=goal_point) - - # Cap remainder distance - c_dist = min(dist, initial_compl.dist_tot) - - mean, step = running_mean(prev_mean=mean, prev_step=step, new_val=c_dist) - return Completion(dist_remainder=mean) - - return func - - -@dataclass(frozen=True) -class CompletionFuncs: - """Functions to compute scenario completion metrics. Each function computes - the running mean completion value over number of episodes, for a given - scenario.""" - - # fmt: off - dist_remainder: Callable[[RoadMap, Observation, Completion], Completion] = _dist_remainder() - # fmt: on diff --git a/smarts/env/gymnasium/wrappers/metric/costs.py b/smarts/env/gymnasium/wrappers/metric/costs.py index bfc9ab5992..76d2991867 100644 --- a/smarts/env/gymnasium/wrappers/metric/costs.py +++ b/smarts/env/gymnasium/wrappers/metric/costs.py @@ -19,14 +19,17 @@ # THE SOFTWARE. from dataclasses import dataclass -from typing import Any, Callable, Dict +from typing import Callable, Dict, List, NewType import numpy as np from smarts.core.coordinates import Heading, Point from smarts.core.observations import Observation +from smarts.core.plan import Mission, Plan, PositionalGoal, Start from smarts.core.road_map import RoadMap from smarts.core.utils.math import running_mean +from smarts.core.vehicle_index import VehicleIndex +from smarts.env.gymnasium.wrappers.metric.params import Params @dataclass(frozen=True) @@ -34,28 +37,91 @@ class Costs: """Performance cost values.""" collisions: int = 0 + comfort: float = 0 + dist_to_destination: float = 0 dist_to_obstacles: float = 0 - jerk_angular: float = 0 + gap_between_vehicles: float = 0 jerk_linear: float = 0 lane_center_offset: float = 0 off_road: int = 0 speed_limit: float = 0 + steps: float = 0 wrong_way: float = 0 -def _collisions(road_map: RoadMap, obs: Observation) -> Costs: - return Costs(collisions=len(obs.events.collisions)) +Done = NewType("Done", bool) -def _dist_to_obstacles() -> Callable[[RoadMap, Observation], Costs]: +def _collisions() -> Callable[[RoadMap, Done, Observation], Costs]: + sum = 0 + + def func(road_map: RoadMap, done: Done, obs: Observation) -> Costs: + nonlocal sum + + sum = sum + len(obs.events.collisions) + j_coll = sum + return Costs(collisions=j_coll) + + return func + + +def _comfort() -> Callable[[RoadMap, Done, Observation], Costs]: + mean = 0 + step = 0 + + def func(road_map: RoadMap, done: Done, obs: Observation) -> Costs: + nonlocal mean, step + + # TODO: Cost function is to be designed. 
+ + j_comfort = 0 + mean, step = running_mean(prev_mean=mean, prev_step=step, new_val=j_comfort) + return Costs(comfort=0) + + return func + + +def _dist_to_destination( + end_pos: Point = Point(0, 0, 0), dist_tot: float = 0 +) -> Callable[[RoadMap, Done, Observation], Costs]: + mean = 0 + step = 0 + end_pos = end_pos + dist_tot = dist_tot + + def func(road_map: RoadMap, done: Done, obs: Observation) -> Costs: + nonlocal mean, step, end_pos, dist_tot + + if not done: + return Costs(dist_to_destination=-1) + elif obs.events.reached_goal: + return Costs(dist_to_destination=0) + else: + cur_pos = Point(*obs.ego_vehicle_state.position) + dist_remainder = get_dist( + road_map=road_map, point_a=cur_pos, point_b=end_pos + ) + dist_remainder_capped = min( + dist_remainder, dist_tot + ) # Cap remainder distance + return Costs(dist_to_destination=dist_remainder_capped / dist_tot) + + return func + + +def _dist_to_obstacles( + ignore: List[str], +) -> Callable[[RoadMap, Done, Observation], Costs]: mean = 0 step = 0 rel_angle_th = np.pi * 40 / 180 rel_heading_th = np.pi * 179 / 180 w_dist = 0.05 + safe_time = 3 # Safe driving distance expressed in time. Units:seconds. + ignore = ignore - def func(road_map: RoadMap, obs: Observation) -> Costs: - nonlocal mean, step, rel_angle_th, rel_heading_th, w_dist + def func(road_map: RoadMap, done: Done, obs: Observation) -> Costs: + nonlocal mean, step, rel_angle_th, rel_heading_th, w_dist, safe_time, ignore # Ego's position and heading with respect to the map's coordinate system. # Note: All angles returned by smarts is with respect to the map's coordinate system. @@ -65,7 +131,7 @@ def func(road_map: RoadMap, obs: Observation) -> Costs: ego_pos = ego.position # Set obstacle distance threshold using 3-second rule - obstacle_dist_th = ego.speed * 3 + obstacle_dist_th = ego.speed * safe_time if obstacle_dist_th == 0: return Costs(dist_to_obstacles=0) @@ -84,6 +150,13 @@ def func(road_map: RoadMap, obs: Observation) -> Costs: if len(nghbs_state) == 0: return Costs(dist_to_obstacles=0) + # Filter neighbors to be ignored. + nghbs_state = [ + nghb_state for nghb_state in nghbs_state if nghb_state[0].id not in ignore + ] + if len(nghbs_state) == 0: + return Costs(dist_to_obstacles=0) + # Filter neighbors within ego's visual field. obstacles = [] for nghb_state in nghbs_state: @@ -111,32 +184,38 @@ def func(road_map: RoadMap, obs: Observation) -> Costs: if len(nghbs_state) == 0: return Costs(dist_to_obstacles=0) - # J_D : Distance to obstacles cost + # j_dist_to_obstacles : Distance to obstacles cost di = np.array([nghb_state[1] for nghb_state in nghbs_state]) - j_d = np.amax(np.exp(-w_dist * di)) + j_dist_to_obstacles = np.amax(np.exp(-w_dist * di)) - mean, step = running_mean(prev_mean=mean, prev_step=step, new_val=j_d) + mean, step = running_mean( + prev_mean=mean, prev_step=step, new_val=j_dist_to_obstacles + ) return Costs(dist_to_obstacles=mean) return func -def _jerk_angular() -> Callable[[RoadMap, Observation], Costs]: +def _gap_between_vehicles( + interest: str, +) -> Callable[[RoadMap, Done, Observation], Costs]: mean = 0 step = 0 + interest = interest - # TODO: The output of this cost function should be normalised and bounded to [0,1]. - def func(road_map: RoadMap, obs: Observation) -> Costs: - nonlocal mean, step + def func(road_map: RoadMap, done: Done, obs: Observation) -> Costs: + nonlocal mean, step, interest + + # TODO: Cost function is to be designed. 
- j_a = np.linalg.norm(obs.ego_vehicle_state.angular_jerk) - mean, step = running_mean(prev_mean=mean, prev_step=step, new_val=j_a) - return Costs(jerk_angular=mean) + j_gap = 0 + mean, step = running_mean(prev_mean=mean, prev_step=step, new_val=j_gap) + return Costs(gap_between_vehicles=0) return func -def _jerk_linear() -> Callable[[RoadMap, Observation], Costs]: +def _jerk_linear() -> Callable[[RoadMap, Done, Observation], Costs]: mean = 0 step = 0 jerk_linear_max = np.linalg.norm(np.array([0.9, 0.9, 0])) # Units: m/s^3 @@ -149,7 +228,7 @@ def _jerk_linear() -> Callable[[RoadMap, Observation], Costs]: Neural Information Processing Systems, NeurIPS 2019, Vancouver, Canada. """ - def func(road_map: RoadMap, obs: Observation) -> Costs: + def func(road_map: RoadMap, done: Done, obs: Observation) -> Costs: nonlocal mean, step, jerk_linear_max jerk_linear = np.linalg.norm(obs.ego_vehicle_state.linear_jerk) @@ -160,17 +239,17 @@ def func(road_map: RoadMap, obs: Observation) -> Costs: return func -def _lane_center_offset() -> Callable[[RoadMap, Observation], Costs]: +def _lane_center_offset() -> Callable[[RoadMap, Done, Observation], Costs]: mean = 0 step = 0 - def func(road_map: RoadMap, obs: Observation) -> Costs: + def func(road_map: RoadMap, done: Done, obs: Observation) -> Costs: nonlocal mean, step if obs.events.off_road: # When vehicle is off road, the lane_center_offset cost - # (i.e., j_lc) is set as zero. - j_lc = 0 + # (i.e., j_lco) is set as zero. + j_lco = 0 else: # Vehicle's offset along the lane ego_pos = obs.ego_vehicle_state.position @@ -185,30 +264,39 @@ def func(road_map: RoadMap, obs: Observation) -> Costs: # reflinepoint.t = signed distance from lane center norm_dist_from_center = reflinepoint.t / lane_hwidth - # J_LC : Lane center offset - j_lc = norm_dist_from_center**2 + # j_lco : Lane center offset + j_lco = norm_dist_from_center**2 - mean, step = running_mean(prev_mean=mean, prev_step=step, new_val=j_lc) + mean, step = running_mean(prev_mean=mean, prev_step=step, new_val=j_lco) return Costs(lane_center_offset=mean) return func -def _off_road(road_map: RoadMap, obs: Observation) -> Costs: - return Costs(off_road=obs.events.off_road) +def _off_road() -> Callable[[RoadMap, Done, Observation], Costs]: + sum = 0 + def func(road_map: RoadMap, done: Done, obs: Observation) -> Costs: + nonlocal sum -def _speed_limit() -> Callable[[RoadMap, Observation], Costs]: + sum = sum + obs.events.off_road + j_off_road = sum + return Costs(off_road=j_off_road) + + return func + + +def _speed_limit() -> Callable[[RoadMap, Done, Observation], Costs]: mean = 0 step = 0 - def func(road_map: RoadMap, obs: Observation) -> Costs: + def func(road_map: RoadMap, done: Done, obs: Observation) -> Costs: nonlocal mean, step if obs.events.off_road: # When vehicle is off road, the speed_limit cost (i.e., j_v) is # set as zero. - j_v = 0 + j_speed_limit = 0 else: # Nearest lane's speed limit. ego_speed = obs.ego_vehicle_state.speed @@ -222,19 +310,49 @@ def func(road_map: RoadMap, obs: Observation) -> Costs: # Excess speed beyond speed limit. 
overspeed = ego_speed - speed_limit if ego_speed > speed_limit else 0 overspeed_norm = min(overspeed / (0.5 * speed_limit), 1) - j_v = overspeed_norm**2 + j_speed_limit = overspeed_norm**2 - mean, step = running_mean(prev_mean=mean, prev_step=step, new_val=j_v) + mean, step = running_mean(prev_mean=mean, prev_step=step, new_val=j_speed_limit) return Costs(speed_limit=mean) return func -def _wrong_way() -> Callable[[RoadMap, Observation], Costs]: +def _steps(max_episode_steps: int) -> Callable[[RoadMap, Done, Observation], Costs]: + step = 0 + max_episode_steps = max_episode_steps + + def func(road_map: RoadMap, done: Done, obs: Observation) -> Costs: + nonlocal step, max_episode_steps + + step = step + 1 + + if not done: + return Costs(steps=-1) + + if obs.events.reached_goal or obs.events.actors_alive_done: + return Costs(steps=step / max_episode_steps) + elif ( + len(obs.events.collisions) > 0 + or obs.events.off_road + or obs.events.reached_max_episode_steps + ): + return Costs(steps=1) + else: + raise CostError( + "Expected reached_goal, collisions, off_road, " + "max_episode_steps, or actors_alive_done, to be true " + f"on agent done, but got events: {obs.events}." + ) + + return func + + +def _wrong_way() -> Callable[[RoadMap, Done, Observation], Costs]: mean = 0 step = 0 - def func(road_map: RoadMap, obs: Observation) -> Costs: + def func(road_map: RoadMap, done: Done, obs: Observation) -> Costs: nonlocal mean, step j_wrong_way = 0 if obs.events.wrong_way: @@ -246,16 +364,104 @@ def func(road_map: RoadMap, obs: Observation) -> Costs: return func -@dataclass(frozen=True) -class CostFuncs: +@dataclass +class CostFuncsBase: """Functions to compute performance costs. Each cost function computes the - running mean cost over number of time steps, for a given scenario.""" - - collisions: Callable[[RoadMap, Observation], Costs] = _collisions - dist_to_obstacles: Callable[[RoadMap, Observation], Costs] = _dist_to_obstacles() - # jerk_angular: Callable[[RoadMap, Observation], Costs] = _jerk_angular() # Currently not used. 
-    jerk_linear: Callable[[RoadMap, Observation], Costs] = _jerk_linear()
-    lane_center_offset: Callable[[RoadMap, Observation], Costs] = _lane_center_offset()
-    off_road: Callable[[RoadMap, Observation], Costs] = _off_road
-    speed_limit: Callable[[RoadMap, Observation], Costs] = _speed_limit()
-    wrong_way: Callable[[RoadMap, Observation], Costs] = _wrong_way()
+    running cost over time steps, for a given scenario."""
+
+    # fmt: off
+    collisions: Callable[[], Callable[[RoadMap, Done, Observation], Costs]] = _collisions
+    comfort: Callable[[], Callable[[RoadMap, Done, Observation], Costs]] = _comfort
+    dist_to_destination: Callable[[Point,float], Callable[[RoadMap, Done, Observation], Costs]] = _dist_to_destination
+    dist_to_obstacles: Callable[[List[str]], Callable[[RoadMap, Done, Observation], Costs]] = _dist_to_obstacles
+    gap_between_vehicles: Callable[[str], Callable[[RoadMap, Done, Observation], Costs]] = _gap_between_vehicles
+    jerk_linear: Callable[[], Callable[[RoadMap, Done, Observation], Costs]] = _jerk_linear
+    lane_center_offset: Callable[[], Callable[[RoadMap, Done, Observation], Costs]] = _lane_center_offset
+    off_road: Callable[[], Callable[[RoadMap, Done, Observation], Costs]] = _off_road
+    speed_limit: Callable[[], Callable[[RoadMap, Done, Observation], Costs]] = _speed_limit
+    steps: Callable[[int], Callable[[RoadMap, Done, Observation], Costs]] = _steps
+    wrong_way: Callable[[], Callable[[RoadMap, Done, Observation], Costs]] = _wrong_way
+    # fmt: on
+
+
+CostFuncs = NewType(
+    "CostFuncs", Dict[str, Callable[[RoadMap, Done, Observation], Costs]]
+)
+
+
+def make_cost_funcs(params: Params, **kwargs) -> CostFuncs:
+    """
+    Returns a dictionary of active cost functions to be computed as specified
+    by the corresponding `active` field in `params`. Cost functions are
+    initialized using `kwargs`, if any are provided.
+
+    Args:
+        params (Params): Parameters to configure individual cost functions.
+        kwargs (Dict[str, Dict[str,Any]]): If any, used to initialize
+            the appropriate cost functions.
+
+    Returns:
+        CostFuncs: Dictionary of active cost functions to be computed.
+    """
+    cost_funcs = CostFuncs({})
+    for field in CostFuncsBase.__dataclass_fields__:
+        if getattr(params, field).active:
+            func = getattr(CostFuncsBase, field)
+            args = kwargs.get(field, {})
+            cost_funcs[field] = func(**args)
+
+    return cost_funcs
+
+
+class CostError(Exception):
+    """Raised when computation of a cost function fails."""
+
+    pass
+
+
+def get_dist(road_map: RoadMap, point_a: Point, point_b: Point) -> float:
+    """
+    Computes the shortest route distance from point_a to point_b in the road
+    map. Both points should lie on a road in the road map. Key assumption about
+    the road map: Any two given points on the road map have valid routes in
+    both directions.
+
+    Args:
+        road_map: Scenario road map.
+        point_a: A point, in world-map coordinates, which lies on a road.
+        point_b: A point, in world-map coordinates, which lies on a road.
+
+    Returns:
+        float: Shortest road distance between two points in the road map.
+ """ + + def _get_dist(start: Point, end: Point) -> float: + mission = Mission( + start=Start( + position=start.as_np_array, + heading=Heading(0), + from_front_bumper=False, + ), + goal=PositionalGoal( + position=end, + radius=3, + ), + ) + plan = Plan(road_map=road_map, mission=mission, find_route=False) + plan.create_route(mission=mission, radius=20) + from_route_point = RoadMap.Route.RoutePoint(pt=start) + to_route_point = RoadMap.Route.RoutePoint(pt=end) + + dist_tot = plan.route.distance_between( + start=from_route_point, end=to_route_point + ) + if dist_tot == None: + raise CostError("Unable to find road on route near given points.") + elif dist_tot < 0: + raise CostError( + "Path from start point to end point flows in " + "the opposite direction of the generated route." + ) + return dist_tot + + return _get_dist(point_a, point_b) diff --git a/smarts/env/gymnasium/wrappers/metric/counts.py b/smarts/env/gymnasium/wrappers/metric/counts.py index 49a40712ce..ad2ca452df 100644 --- a/smarts/env/gymnasium/wrappers/metric/counts.py +++ b/smarts/env/gymnasium/wrappers/metric/counts.py @@ -34,11 +34,3 @@ class Counts: steps: int = 0 """ Sum of steps taken over all episodes. """ - steps_adjusted: int = 0 - """ Sum of steps taken over all episodes. The number of steps, in an episode - where the agent did not achieve the goal, is replaced with the agent's - max_episode_steps value. - """ - max_steps: int = 0 - """ Sum of maximum number of steps over all episodes. - """ diff --git a/smarts/env/gymnasium/wrappers/metric/formula.py b/smarts/env/gymnasium/wrappers/metric/formula.py new file mode 100644 index 0000000000..048592c363 --- /dev/null +++ b/smarts/env/gymnasium/wrappers/metric/formula.py @@ -0,0 +1,163 @@ +# MIT License + +# Copyright (C) 2023. Huawei Technologies Co., Ltd. All rights reserved. + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import functools +from typing import Dict, NewType + +import numpy as np + +from smarts.env.gymnasium.wrappers.metric.costs import Costs +from smarts.env.gymnasium.wrappers.metric.params import Params +from smarts.env.gymnasium.wrappers.metric.types import Record +from smarts.env.gymnasium.wrappers.metric.utils import ( + add_dataclass, + divide, + op_dataclass, +) + +Score = NewType("Score", Dict[str, float]) + + +class FormulaBase: + """Interface, for cost function parameters and score computation formula, + to be implemented by other formula classes. 
+ """ + + def __init__(self): + pass + + def params(self) -> Params: + """Return parameters to configure and initialize cost functions. + + Returns: + Params: Cost function parameters. + """ + raise NotImplementedError + + def score(self, records_sum: Dict[str, Dict[str, Record]]) -> Score: + """Computes sub-component scores and one total combined score named + "Overall" on the wrapped environment. + + Returns: + Score: Contains "Overall" score and other sub-component scores. + """ + raise NotImplementedError + + +class Formula(FormulaBase): + """Sets the (i) cost function parameters, and (ii) score computation formula, + for an environment. + """ + + def __init__(self): + pass + + def params(self) -> Params: + """Return parameters to configure and initialize cost functions. + + Returns: + Params: Cost function parameters. + """ + return Params() + + def score(self, records_sum: Dict[str, Dict[str, Record]]) -> Score: + """ + Computes four sub-component scores, namely, "Distance to Destination", + "Time", "Humanness", "Rules", and one total combined score named + "Overall" on the wrapped environment. + + +-------------------+--------+-----------------------------------------------------------+ + | | Range | Remarks | + +===================+========+===========================================================+ + | Overall | [0, 1] | Total score. The higher, the better. | + +-------------------+--------+-----------------------------------------------------------+ + | DistToDestination | [0, 1] | Remaining distance to destination. The lower, the better. | + +-------------------+--------+-----------------------------------------------------------+ + | Time | [0, 1] | Time taken to complete scenario. The lower, the better. | + +-------------------+--------+-----------------------------------------------------------+ + | Humanness | [0, 1] | Humanness indicator. The higher, the better. | + +-------------------+--------+-----------------------------------------------------------+ + | Rules | [0, 1] | Traffic rules compliance. The higher, the better. | + +-------------------+--------+-----------------------------------------------------------+ + + Returns: + Score: Contains "Overall", "DistToDestination", "Time", + "Humanness", and "Rules" scores. + """ + + costs_total = Costs() + episodes = 0 + for scen, val in records_sum.items(): + # Number of agents in scenario. + agents_in_scenario = len(val.keys()) + costs_list, counts_list = zip( + *[(record.costs, record.counts) for agent, record in val.items()] + ) + # Sum costs over all agents in scenario. + costs_sum_agent: Costs = functools.reduce( + lambda a, b: add_dataclass(a, b), costs_list + ) + # Average costs over number of agents in scenario. + costs_mean_agent = op_dataclass(costs_sum_agent, agents_in_scenario, divide) + # Sum costs over all scenarios. + costs_total = add_dataclass(costs_total, costs_mean_agent) + # Increment total number of episodes. + episodes += counts_list[0].episodes + + # Average costs over total number of episodes. + costs_final = op_dataclass(costs_total, episodes, divide) + + # Compute sub-components of score. 
+ dist_to_destination = costs_final.dist_to_destination + humanness = _humanness(costs=costs_final) + rules = _rules(costs=costs_final) + time = costs_final.steps + overall = ( + 0.50 * (1 - dist_to_destination) + + 0.25 * (1 - time) + + 0.20 * humanness + + 0.05 * rules + ) + + return Score( + { + "overall": overall, + "dist_to_destination": dist_to_destination, + "time": time, + "humanness": humanness, + "rules": rules, + } + ) + + +def _humanness(costs: Costs) -> float: + humanness = np.array( + [costs.dist_to_obstacles, costs.jerk_linear, costs.lane_center_offset] + ) + humanness = np.mean(humanness, dtype=float) + return 1 - humanness + + +def _rules(costs: Costs) -> float: + rules = np.array([costs.speed_limit, costs.wrong_way]) + rules = np.mean(rules, dtype=float) + return 1 - rules diff --git a/smarts/env/gymnasium/wrappers/metric/metrics.py b/smarts/env/gymnasium/wrappers/metric/metrics.py new file mode 100644 index 0000000000..82f04b28cb --- /dev/null +++ b/smarts/env/gymnasium/wrappers/metric/metrics.py @@ -0,0 +1,435 @@ +# Copyright (C) 2022. Huawei Technologies Co., Ltd. All rights reserved. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. 
+
+import copy
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Set, Tuple
+
+import gymnasium as gym
+
+from smarts.core.agent_interface import ActorsAliveDoneCriteria, AgentInterface
+from smarts.core.coordinates import Point, RefLinePoint
+from smarts.core.observations import Observation
+from smarts.core.plan import EndlessGoal, PositionalGoal
+from smarts.core.road_map import RoadMap
+from smarts.core.scenario import Scenario
+from smarts.core.traffic_provider import TrafficProvider
+from smarts.core.utils.import_utils import import_module_from_file
+from smarts.core.vehicle_index import VehicleIndex
+from smarts.env.gymnasium.wrappers.metric.costs import (
+    CostFuncs,
+    Costs,
+    Done,
+    get_dist,
+    make_cost_funcs,
+)
+from smarts.env.gymnasium.wrappers.metric.counts import Counts
+from smarts.env.gymnasium.wrappers.metric.formula import Score
+from smarts.env.gymnasium.wrappers.metric.params import Params
+from smarts.env.gymnasium.wrappers.metric.types import Record
+from smarts.env.gymnasium.wrappers.metric.utils import (
+    add_dataclass,
+    divide,
+    op_dataclass,
+)
+
+
+class MetricsError(Exception):
+    """Raised when Metrics env wrapper fails."""
+
+    pass
+
+
+class MetricsBase(gym.Wrapper):
+    """Computes agents' performance metrics in a SMARTS environment."""
+
+    def __init__(self, env: gym.Env, formula_path: Optional[Path] = None):
+        super().__init__(env)
+
+        # Import scoring formula.
+        if formula_path:
+            import_module_from_file("custom_formula", formula_path)
+            from custom_formula import Formula
+        else:
+            from smarts.env.gymnasium.wrappers.metric.formula import Formula
+
+        self._formula = Formula()
+        self._params = self._formula.params()
+
+        _check_env(agent_interfaces=self.env.agent_interfaces, params=self._params)
+
+        self._scen: Scenario
+        self._scen_name: str
+        self._road_map: RoadMap
+        self._cur_agents: Set[str]
+        self._steps: Dict[str, int]
+        self._done_agents: Set[str]
+        self._vehicle_index: VehicleIndex
+        self._cost_funcs: Dict[str, CostFuncs]
+        self._records_sum: Dict[str, Dict[str, Record]] = {}
+
+    def step(self, action: Dict[str, Any]):
+        """Steps the environment by one step."""
+        result = super().step(action)
+
+        obs, _, terminated, truncated, info = result
+
+        # Only count steps in which an ego agent is present.
+        if len(obs) == 0:
+            return result
+
+        dones = {}
+        if isinstance(terminated, dict):
+            # Caters to environments which use (i) ObservationOptions.multi_agent,
+            # (ii) ObservationOptions.unformated, and (iii) ObservationOptions.default .
+            dones = {k: v or truncated[k] for k, v in terminated.items()}
+        elif isinstance(terminated, bool):
+            # Caters to environments which use (i) ObservationOptions.full .
+            if terminated or truncated:
+                dones["__all__"] = True
+            else:
+                dones["__all__"] = False
+            dones.update({a: d["done"] for a, d in info.items()})
+
+        if isinstance(next(iter(obs.values())), dict):
+            # Caters to environments which use (i) ObservationOptions.multi_agent,
+            # (ii) ObservationOptions.full, and (iii) ObservationOptions.default .
+            active_agents = [
+                agent_id for agent_id, agent_obs in obs.items() if agent_obs["active"]
+            ]
+        else:
+            # Caters to environments which use (i) ObservationOptions.unformated .
+            active_agents = list(obs.keys())
+
+        for agent_name in active_agents:
+            base_obs: Observation = info[agent_name]["env_obs"]
+            self._steps[agent_name] += 1
+
+            # Compute all cost functions.
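+            # Each active cost function is invoked on every step so that its
+            # internal running statistics stay up to date, but the returned
+            # Costs are only folded into the agent's record on the step in
+            # which the agent is done (see the accumulation just below).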
+ costs = Costs() + for _, cost_func in self._cost_funcs[agent_name].items(): + new_costs = cost_func(self._road_map, Done(dones[agent_name]), base_obs) + if dones[agent_name]: + costs = add_dataclass(new_costs, costs) + + if dones[agent_name] == False: + # Skip the rest, if agent is not done yet. + continue + + self._done_agents.add(agent_name) + # Only these termination reasons are considered by the current metrics. + if not ( + base_obs.events.reached_goal + or len(base_obs.events.collisions) + or base_obs.events.off_road + or base_obs.events.reached_max_episode_steps + or base_obs.events.actors_alive_done + ): + raise MetricsError( + "Expected reached_goal, collisions, off_road, " + "max_episode_steps, or actors_alive_done, to be true " + f"on agent done, but got events: {base_obs.events}." + ) + + # Update stored counts and costs. + counts = Counts( + episodes=1, + steps=self._steps[agent_name], + goals=base_obs.events.reached_goal, + ) + self._records_sum[self._scen_name][agent_name].counts = add_dataclass( + counts, self._records_sum[self._scen_name][agent_name].counts + ) + self._records_sum[self._scen_name][agent_name].costs = add_dataclass( + costs, self._records_sum[self._scen_name][agent_name].costs + ) + + if dones["__all__"] is True: + assert ( + self._done_agents == self._cur_agents + ), f'done["__all__"]==True but not all agents are done. Current agents = {self._cur_agents}. Agents done = {self._done_agents}.' + + return result + + def reset(self, **kwargs): + """Resets the environment.""" + result = super().reset(**kwargs) + self._cur_agents = set(self.env.agent_interfaces.keys()) + self._steps = dict.fromkeys(self._cur_agents, 0) + self._done_agents = set() + self._scen = self.env.smarts.scenario + self._scen_name = self.env.smarts.scenario.name + self._road_map = self.env.smarts.scenario.road_map + self._vehicle_index = self.env.smarts.vehicle_index + self._cost_funcs = {} + + _check_scen(scenario=self._scen, agent_interfaces=self.env.agent_interfaces) + + # Refresh the cost functions for every episode. + for agent_name in self._cur_agents: + end_pos = Point(0, 0, 0) + dist_tot = 0 + if self._params.dist_to_destination.active: + actors_alive = self.env.agent_interfaces[ + agent_name + ].done_criteria.actors_alive + if isinstance(actors_alive, ActorsAliveDoneCriteria): + end_pos, dist_tot = _get_sumo_smarts_dist( + vehicle_name=actors_alive.actors_of_interest[0], + traffic_sims=self.env.smarts.traffic_sims, + road_map=self._road_map, + ) + elif actors_alive == None: + end_pos = self._scen.missions[agent_name].goal.position + dist_tot = get_dist( + road_map=self._road_map, + point_a=Point(*self._scen.missions[agent_name].start.position), + point_b=end_pos, + ) + + self._cost_funcs[agent_name] = make_cost_funcs( + params=self._params, + dist_to_destination={ + "end_pos": end_pos, + "dist_tot": dist_tot, + }, + dist_to_obstacles={ + "ignore": self._params.dist_to_obstacles.ignore, + }, + gap_between_vehicles={ + "interest": self._params.gap_between_vehicles.interest, + }, + steps={ + "max_episode_steps": self.env.agent_interfaces[ + agent_name + ].max_episode_steps, + }, + ) + + # Create new entry in records_sum for new scenarios. + if self._scen_name not in self._records_sum.keys(): + self._records_sum[self._scen_name] = { + agent_name: Record( + costs=Costs(), + counts=Counts(), + ) + for agent_name in self._cur_agents + } + + return result + + def records(self) -> Dict[str, Dict[str, Record]]: + """ + Fine grained performance metric for each agent in each scenario. + + .. 
code-block:: bash + + $ env.records() + $ { + scen1: { + agent1: Record(costs, counts), + agent2: Record(costs, counts), + }, + scen2: { + agent1: Record(costs, counts), + }, + } + + Returns: + Dict[str, Dict[str, Record]]: Performance record in a nested + dictionary for each agent in each scenario. + """ + + records = {} + for scen, agents in self._records_sum.items(): + records[scen] = {} + for agent, data in agents.items(): + data_copy = copy.deepcopy(data) + records[scen][agent] = Record( + costs=op_dataclass( + data_copy.costs, data_copy.counts.episodes, divide + ), + counts=data_copy.counts, + ) + + return records + + def score(self) -> Score: + """ + Computes score according to environment specific formula from the + Formula class. + + Returns: + Dict[str, float]: Contains key-value pairs denoting score + components. + """ + records_sum_copy = copy.deepcopy(self._records_sum) + return self._formula.score(records_sum=records_sum_copy) + + +def _get_sumo_smarts_dist( + vehicle_name: str, traffic_sims: List[TrafficProvider], road_map: RoadMap +) -> Tuple[Point, float]: + """Computes the end point and route distance of a SUMO or a SMARTS vehicle + specified by `vehicle_name`. + + Args: + vehicle_name (str): Name of vehicle. + traffic_sims (List[TrafficProvider]): Traffic providers. + road_map (RoadMap): Underlying road map. + + Returns: + Tuple[Point, float]: End point and route distance. + """ + traffic_sim = [ + traffic_sim + for traffic_sim in traffic_sims + if traffic_sim.manages_actor(vehicle_name) + ] + assert ( + len(traffic_sim) == 1 + ), "None or multiple, traffic sims contain the vehicle of interest." + traffic_sim = traffic_sim[0] + dest_road = traffic_sim.vehicle_dest_road(vehicle_name) + end_pos = ( + road_map.road_by_id(dest_road) + .lane_at_index(0) + .from_lane_coord(RefLinePoint(s=1e10)) + ) + route = traffic_sim.route_for_vehicle(vehicle_name) + dist_tot = route.road_length + return end_pos, dist_tot + + +class Metrics(gym.Wrapper): + """Metrics class wraps an underlying MetricsBase class. The underlying + MetricsBase class computes agents' performance metrics in a SMARTS + environment. Whereas, this Metrics class is a basic gym.Wrapper class + which prevents external users from accessing or modifying (i) protected + attributes or (ii) attributes beginning with an underscore, to ensure + security of the metrics computed. + + Args: + env (gym.Env): A gym.Env to be wrapped. + + Raises: + AttributeError: Upon accessing (i) a protected attribute or (ii) an + attribute beginning with an underscore. + + Returns: + gym.Env: A wrapped gym.Env which computes agents' performance metrics. + """ + + def __init__(self, env: gym.Env, formula_path: Optional[Path] = None): + env = MetricsBase(env, formula_path) + super().__init__(env) + + def __getattr__(self, name: str): + """Returns an attribute with ``name``, unless ``name`` is a restricted + attribute or starts with an underscore.""" + if name == "_np_random": + raise AttributeError( + "Can't access `_np_random` of a wrapper, use `self.unwrapped._np_random` or `self.np_random`." + ) + elif name.startswith("_") or name in [ + "smarts", + ]: + raise AttributeError(f"accessing private attribute '{name}' is prohibited") + + return getattr(self.env, name) + + +def _check_env(agent_interfaces: Dict[str, AgentInterface], params: Params): + """Checks environment suitability to compute performance metrics. + Args: + agent_interfaces (Dict[str,AgentInterface]): Agent interfaces. + params (Params): Metric parameters. 
+
+    Raises:
+        AttributeError: If any required agent interface is disabled or
+            is ill-defined.
+    """
+
+    def check_intrfc(agent_intrfc: AgentInterface):
+        intrfc = {
+            "accelerometer": bool(agent_intrfc.accelerometer),
+            "max_episode_steps": bool(agent_intrfc.max_episode_steps),
+            "neighborhood_vehicle_states": bool(agent_intrfc.neighborhood_vehicles),
+            "waypoint_paths": bool(agent_intrfc.waypoints),
+            "done_criteria.collision": agent_intrfc.done_criteria.collision,
+            "done_criteria.off_road": agent_intrfc.done_criteria.off_road,
+        }
+        return intrfc
+
+    for agent_name, agent_interface in agent_interfaces.items():
+        intrfc = check_intrfc(agent_interface)
+        if not all(intrfc.values()):
+            raise AttributeError(
+                (
+                    "Enable {0}'s disabled interface to "
+                    "compute its metrics. Current interface is "
+                    "{1}."
+                ).format(agent_name, intrfc)
+            )
+
+        actors_alive = agent_interface.done_criteria.actors_alive
+        if (
+            params.dist_to_destination.active
+            and isinstance(actors_alive, ActorsAliveDoneCriteria)
+            and len(actors_alive.actors_of_interest) != 1
+        ):
+            raise AttributeError(
+                (
+                    "ActorsAliveDoneCriteria with none or multiple actors of "
+                    "interest is currently not supported when "
+                    "dist_to_destination cost function is enabled. Current "
+                    "interface is {0}:{1}."
+                ).format(agent_name, actors_alive)
+            )
+
+
+def _check_scen(scenario: Scenario, agent_interfaces: Dict[str, AgentInterface]):
+    """Checks scenario suitability to compute performance metrics.
+
+    Args:
+        scenario (Scenario): A ``smarts.core.scenario.Scenario`` class.
+        agent_interfaces (Dict[str,AgentInterface]): Agent interfaces.
+
+    Raises:
+        AttributeError: If any agent has an unsupported combination of goal
+            type and actors-alive done criteria.
+    """
+    goal_types = {
+        agent_name: type(agent_mission.goal)
+        for agent_name, agent_mission in scenario.missions.items()
+    }
+
+    for agent_name, agent_interface in agent_interfaces.items():
+        actors_alive = agent_interface.done_criteria.actors_alive
+        if not (
+            (goal_types[agent_name] == PositionalGoal and actors_alive == None)
+            or (
+                goal_types[agent_name] == EndlessGoal
+                and isinstance(actors_alive, ActorsAliveDoneCriteria)
+            )
+        ):
+            raise AttributeError(
+                "{0} has an unsupported goal type {1} and actors alive done criteria {2} "
+                "combination.".format(agent_name, goal_types[agent_name], actors_alive)
+            )
diff --git a/smarts/env/gymnasium/wrappers/metric/params.py b/smarts/env/gymnasium/wrappers/metric/params.py
new file mode 100644
index 0000000000..b61874b65a
--- /dev/null
+++ b/smarts/env/gymnasium/wrappers/metric/params.py
@@ -0,0 +1,156 @@
+# MIT License
+
+# Copyright (C) 2023. Huawei Technologies Co., Ltd. All rights reserved.
+
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT.
diff --git a/smarts/env/gymnasium/wrappers/metric/params.py b/smarts/env/gymnasium/wrappers/metric/params.py
new file mode 100644
index 0000000000..b61874b65a
--- /dev/null
+++ b/smarts/env/gymnasium/wrappers/metric/params.py
@@ -0,0 +1,156 @@
+# MIT License
+
+# Copyright (C) 2023. Huawei Technologies Co., Ltd. All rights reserved.
+
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+from dataclasses import dataclass, field
+from typing import List
+
+
+@dataclass(frozen=True)
+class Collisions:
+    """Parameters for the collisions cost function."""
+
+    active: bool = False
+    """If True, enables computation of the corresponding cost function. Else,
+    disabled.
+    """
+
+
+@dataclass(frozen=True)
+class Comfort:
+    """Parameters for the comfort cost function."""
+
+    active: bool = False
+    """If True, enables computation of the corresponding cost function. Else,
+    disabled.
+    """
+
+
+@dataclass(frozen=True)
+class DistToDestination:
+    """Parameters for the distance to destination cost function."""
+
+    active: bool = True
+    """If True, enables computation of the corresponding cost function. Else,
+    disabled.
+    """
+
+
+@dataclass(frozen=True)
+class DistToObstacles:
+    """Parameters for the distance to obstacles cost function."""
+
+    active: bool = True
+    """If True, enables computation of the corresponding cost function. Else,
+    disabled.
+    """
+    ignore: List[str] = field(default_factory=list)
+    """Vehicle ids of neighbours to be excluded from being considered as
+    obstacles.
+    """
+
+
+@dataclass(frozen=True)
+class GapBetweenVehicles:
+    """Parameters for the gap between vehicles cost function."""
+
+    active: bool = False
+    """If True, enables computation of the corresponding cost function. Else,
+    disabled.
+    """
+    interest: str = "Leader-007"
+    """Name of the vehicle of interest with respect to which the gap is
+    computed.
+    """
+
+
+@dataclass(frozen=True)
+class JerkLinear:
+    """Parameters for the linear jerk cost function."""
+
+    active: bool = True
+    """If True, enables computation of the corresponding cost function. Else,
+    disabled.
+    """
+
+
+@dataclass(frozen=True)
+class LaneCenterOffset:
+    """Parameters for the lane center offset cost function."""
+
+    active: bool = True
+    """If True, enables computation of the corresponding cost function. Else,
+    disabled.
+    """
+
+
+@dataclass(frozen=True)
+class OffRoad:
+    """Parameters for the off road cost function."""
+
+    active: bool = False
+    """If True, enables computation of the corresponding cost function. Else,
+    disabled.
+    """
+
+
+@dataclass(frozen=True)
+class SpeedLimit:
+    """Parameters for the speed limit cost function."""
+
+    active: bool = True
+    """If True, enables computation of the corresponding cost function. Else,
+    disabled.
+    """
+
+
+@dataclass(frozen=True)
+class Steps:
+    """Parameters for the steps cost function."""
+
+    active: bool = True
+    """If True, enables computation of the corresponding cost function. Else,
+    disabled.
+    """
+
+
+@dataclass(frozen=True)
+class WrongWay:
+    """Parameters for the wrong way cost function."""
+
+    active: bool = True
+    """If True, enables computation of the corresponding cost function. Else,
+    disabled.
+ """ + + +@dataclass(frozen=True) +class Params: + """Parameters for cost functions.""" + + collisions: Collisions = Collisions() + comfort: Comfort = Comfort() + dist_to_destination: DistToDestination = DistToDestination() + dist_to_obstacles: DistToObstacles = DistToObstacles() + gap_between_vehicles: GapBetweenVehicles = GapBetweenVehicles() + jerk_linear: JerkLinear = JerkLinear() + lane_center_offset: LaneCenterOffset = LaneCenterOffset() + off_road: OffRoad = OffRoad() + speed_limit: SpeedLimit = SpeedLimit() + steps: Steps = Steps() + wrong_way: WrongWay = WrongWay() diff --git a/smarts/env/gymnasium/wrappers/metric/types.py b/smarts/env/gymnasium/wrappers/metric/types.py new file mode 100644 index 0000000000..23cd417669 --- /dev/null +++ b/smarts/env/gymnasium/wrappers/metric/types.py @@ -0,0 +1,35 @@ +# MIT License + +# Copyright (C) 2023. Huawei Technologies Co., Ltd. All rights reserved. + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +from dataclasses import dataclass + +from smarts.env.gymnasium.wrappers.metric.costs import Costs +from smarts.env.gymnasium.wrappers.metric.counts import Counts + + +@dataclass +class Record: + """Stores an agent's scenario-completion, performance-count, and + performance-cost values.""" + + costs: Costs + counts: Counts diff --git a/smarts/env/gymnasium/wrappers/metric/utils.py b/smarts/env/gymnasium/wrappers/metric/utils.py new file mode 100644 index 0000000000..473e61a977 --- /dev/null +++ b/smarts/env/gymnasium/wrappers/metric/utils.py @@ -0,0 +1,97 @@ +# MIT License + +# Copyright (C) 2023. Huawei Technologies Co., Ltd. All rights reserved. + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. 
diff --git a/smarts/env/gymnasium/wrappers/metric/types.py b/smarts/env/gymnasium/wrappers/metric/types.py
new file mode 100644
index 0000000000..23cd417669
--- /dev/null
+++ b/smarts/env/gymnasium/wrappers/metric/types.py
@@ -0,0 +1,35 @@
+# MIT License
+
+# Copyright (C) 2023. Huawei Technologies Co., Ltd. All rights reserved.
+
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+from dataclasses import dataclass
+
+from smarts.env.gymnasium.wrappers.metric.costs import Costs
+from smarts.env.gymnasium.wrappers.metric.counts import Counts
+
+
+@dataclass
+class Record:
+    """Stores an agent's performance-cost and performance-count values."""
+
+    costs: Costs
+    counts: Counts
diff --git a/smarts/env/gymnasium/wrappers/metric/utils.py b/smarts/env/gymnasium/wrappers/metric/utils.py
new file mode 100644
index 0000000000..473e61a977
--- /dev/null
+++ b/smarts/env/gymnasium/wrappers/metric/utils.py
@@ -0,0 +1,97 @@
+# MIT License
+
+# Copyright (C) 2023. Huawei Technologies Co., Ltd. All rights reserved.
+
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+from dataclasses import fields
+from typing import Callable, TypeVar, Union
+
+from smarts.env.gymnasium.wrappers.metric.costs import Costs
+from smarts.env.gymnasium.wrappers.metric.counts import Counts
+
+T = TypeVar("T", Costs, Counts)
+
+
+def add_dataclass(first: T, second: T) -> T:
+    """Sums the fields of two dataclass objects.
+
+    Args:
+        first (T): First dataclass object.
+        second (T): Second dataclass object.
+
+    Returns:
+        T: New summed dataclass object.
+    """
+    assert type(first) is type(second)
+    new = {}
+    for field in fields(first):
+        new[field.name] = getattr(first, field.name) + getattr(second, field.name)
+    output = first.__class__(**new)
+
+    return output
+
+
+def op_dataclass(
+    first: T,
+    second: Union[int, float],
+    op: Callable[[Union[int, float], Union[int, float]], float],
+) -> T:
+    """Performs operation `op` on the fields of a dataclass object.
+
+    Args:
+        first (T): Dataclass object.
+        second (Union[int, float]): Value input for the operator.
+        op (Callable[[Union[int, float], Union[int, float]], float]): Operation to be performed.
+
+    Returns:
+        T: New dataclass object with operation performed on all of its fields.
+    """
+    new = {}
+    for field in fields(first):
+        new[field.name] = op(getattr(first, field.name), second)
+    output = first.__class__(**new)
+
+    return output
+
+
+def divide(value: Union[int, float], divider: Union[int, float]) -> float:
+    """Division operation.
+
+    Args:
+        value (Union[int, float]): Numerator.
+        divider (Union[int, float]): Denominator.
+
+    Returns:
+        float: Numerator / Denominator.
+    """
+    return float(value / divider)
+
+
+def multiply(value: Union[int, float], multiplier: Union[int, float]) -> float:
+    """Multiplication operation.
+
+    Args:
+        value (Union[int, float]): Value.
+        multiplier (Union[int, float]): Multiplier.
+
+    Returns:
+        float: Value x Multiplier.
+    """
+    return float(value * multiplier)
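To illustrate the helpers above: `MetricsBase.records()` sums `Costs` across episodes with `add_dataclass` and then averages them with `op_dataclass` and `divide`. A small worked sketch, assuming `Costs` is a dataclass of float fields with zero defaults and the field names below:

.. code-block:: python

    from smarts.env.gymnasium.wrappers.metric.costs import Costs
    from smarts.env.gymnasium.wrappers.metric.utils import (
        add_dataclass,
        divide,
        op_dataclass,
    )

    ep1 = Costs(speed_limit=0.2, wrong_way=0.0)  # assumed field names
    ep2 = Costs(speed_limit=0.4, wrong_way=1.0)

    total = add_dataclass(ep1, ep2)        # field-wise sum over two episodes
    mean = op_dataclass(total, 2, divide)  # field-wise average
    # mean.speed_limit == 0.3, mean.wrong_way == 0.5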
diff --git a/smarts/env/gymnasium/wrappers/metrics.py b/smarts/env/gymnasium/wrappers/metrics.py
deleted file mode 100644
index 4333b9e179..0000000000
--- a/smarts/env/gymnasium/wrappers/metrics.py
+++ /dev/null
@@ -1,454 +0,0 @@
-# Copyright (C) 2022. Huawei Technologies Co., Ltd. All rights reserved.
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-import copy
-import functools
-from dataclasses import dataclass, fields
-from typing import Any, Dict, NamedTuple, Set, TypeVar
-
-import gymnasium as gym
-import numpy as np
-
-from smarts.core.agent_interface import AgentInterface
-from smarts.core.coordinates import Point
-from smarts.core.observations import Observation
-from smarts.core.plan import PositionalGoal
-from smarts.core.road_map import RoadMap
-from smarts.core.scenario import Scenario
-from smarts.env.gymnasium.wrappers.metric.completion import (
-    Completion,
-    CompletionFuncs,
-    get_dist,
-)
-from smarts.env.gymnasium.wrappers.metric.costs import CostFuncs, Costs
-from smarts.env.gymnasium.wrappers.metric.counts import Counts
-
-
-@dataclass
-class Record:
-    """Stores an agent's scenario-completion, performance-count, and
-    performance-cost values."""
-
-    completion: Completion
-    costs: Costs
-    counts: Counts
-
-
-@dataclass
-class Data:
-    """Stores an agent's performance-record, completion-functions, and
-    cost-functions."""
-
-    record: Record
-    completion_funcs: CompletionFuncs
-    cost_funcs: CostFuncs
-
-
-class Score(NamedTuple):
-    """This describes the final score given by processing observations through the metrics."""
-
-    completion: float
-    humanness: float
-    rules: float
-    time: float
-    overall: float
-
-
-class MetricsError(Exception):
-    """Raised when Metrics env wrapper fails."""
-
-    pass
-
-
-class MetricsBase(gym.Wrapper):
-    """Computes agents' performance metrics in a SMARTS environment."""
-
-    def __init__(self, env: gym.Env):
-        super().__init__(env)
-        _check_env(env)
-        self._scen: Scenario
-        self._scen_name: str
-        self._road_map: RoadMap
-        self._cur_agents: Set[str]
-        self._steps: Dict[str, int]
-        self._done_agents: Set[str]
-        self._records = {}
-
-    def step(self, action: Dict[str, Any]):
-        """Steps the environment by one step."""
-        result = super().step(action)
-
-        obs, _, terminated, truncated, info = result
-
-        # Only count steps in which an ego agent is present.
-        if len(obs) == 0:
-            return result
-
-        dones = {"__all__": False}
-        if isinstance(terminated, dict):
-            dones = {k: v or truncated[k] for k, v in terminated.items()}
-        elif isinstance(terminated, bool):
-            if terminated or truncated:
-                dones["__all__"] = True
-            dones.update({a: d["done"] for a, d in info.items()})
-
-        if isinstance(next(iter(obs.values())), dict):
-            obs = {agent_id: o for agent_id, o in obs.items() if o["active"]}
-        # fmt: off
-        for agent_name in obs:
-            base_obs: Observation = info[agent_name]["env_obs"]
-            self._steps[agent_name] += 1
-
-            # Compute all cost functions.
-            costs = Costs()
-            for field in fields(self._records[self._scen_name][agent_name].cost_funcs):
-                cost_func = getattr(self._records[self._scen_name][agent_name].cost_funcs, field.name)
-                new_costs = cost_func(road_map=self._road_map, obs=base_obs)
-                costs = _add_dataclass(new_costs, costs)
-
-            # Update stored costs.
-            self._records[self._scen_name][agent_name].record.costs = costs
-
-            if dones[agent_name]:
-                self._done_agents.add(agent_name)
-                if not (
-                    base_obs.events.reached_goal
-                    or len(base_obs.events.collisions)
-                    or base_obs.events.off_road
-                    or base_obs.events.reached_max_episode_steps
-                ):
-                    raise MetricsError(
-                        "Expected reached_goal, collisions, off_road, or "
-                        "max_episode_steps to be true on agent done, but got "
-                        f"events: {base_obs.events}."
-                    )
-
-                # Update stored counts.
-                counts = Counts(
-                    episodes=1,
-                    steps=self._steps[agent_name],
-                    steps_adjusted=min(
-                        self._steps[agent_name],
-                        self.env.agent_interfaces[agent_name].max_episode_steps
-                    ),
-                    goals=base_obs.events.reached_goal,
-                    max_steps=self.env.agent_interfaces[agent_name].max_episode_steps
-                )
-                self._records[self._scen_name][agent_name].record.counts = _add_dataclass(
-                    counts,
-                    self._records[self._scen_name][agent_name].record.counts
-                )
-
-                # Update percentage of scenario tasks completed.
-                completion = Completion(dist_tot=self._records[self._scen_name][agent_name].record.completion.dist_tot)
-                for field in fields(self._records[self._scen_name][agent_name].completion_funcs):
-                    completion_func = getattr(
-                        self._records[self._scen_name][agent_name].completion_funcs,
-                        field.name,
-                    )
-                    new_completion = completion_func(
-                        road_map=self._road_map,
-                        obs=base_obs,
-                        initial_compl=completion,
-                    )
-                    completion = _add_dataclass(new_completion, completion)
-                self._records[self._scen_name][agent_name].record.completion = completion
-
-        # fmt: on
-        if dones["__all__"] is True:
-            assert (
-                self._done_agents == self._cur_agents
-            ), f'done["__all__"]==True but not all agents are done. Current agents = {self._cur_agents}. Agents done = {self._done_agents}.'
-
-        return result
-
-    def reset(self, **kwargs):
-        """Resets the environment."""
-        result = super().reset(**kwargs)
-        self._cur_agents = set(self.env.agent_interfaces.keys())
-        self._steps = dict.fromkeys(self._cur_agents, 0)
-        self._done_agents = set()
-        self._scen = self.env.scenario
-        self._scen_name = self.env.scenario.name
-        self._road_map = self.env.scenario.road_map
-
-        # fmt: off
-        if self._scen_name not in self._records:
-            _check_scen(self._scen)
-            self._records[self._scen_name] = {
-                agent_name: Data(
-                    record=Record(
-                        completion=Completion(
-                            dist_tot=get_dist(
-                                road_map=self._road_map,
-                                point_a=Point(*self._scen.missions[agent_name].start.position),
-                                point_b=self._scen.missions[agent_name].goal.position,
-                            )
-                        ),
-                        costs=Costs(),
-                        counts=Counts(),
-                    ),
-                    cost_funcs=CostFuncs(),
-                    completion_funcs=CompletionFuncs(),
-                )
-                for agent_name in self._cur_agents
-            }
-        # fmt: on
-
-        return result
-
-    def records(self) -> Dict[str, Dict[str, Record]]:
-        """
-        Fine grained performance metric for each agent in each scenario.
-
-        .. code-block:: bash
-
-            $ env.records()
-            $ {
-                  scen1: {
-                      agent1: Record(completion, costs, counts),
-                      agent2: Record(completion, costs, counts),
-                  },
-                  scen2: {
-                      agent1: Record(completion, costs, counts),
-                  },
-              }
-
-        Returns:
-            Dict[str, Dict[str, Record]]: Performance record in a nested
-            dictionary for each agent in each scenario.
-        """
-
-        records = {}
-        for scen, agents in self._records.items():
-            records[scen] = {}
-            for agent, data in agents.items():
-                records[scen][agent] = copy.deepcopy(data.record)
-
-        return records
-
-    def score(self) -> Score:
-        """
-        Computes four sub-component scores, namely, "Completion", "Time",
-        "Humanness", "Rules", and one total combined score named "Overall"
-        on the wrapped environment.
-
-        +-------------+--------+-----------------------------------------------------------------------------------------------------+
-        |             | Range  | Remarks                                                                                             |
-        +=============+========+=====================================================================================================+
-        | Overall     | [0, 1] | Total score which combines "Completion", "Time", "Humanness", and "Rules". The higher, the better. |
-        +-------------+--------+-----------------------------------------------------------------------------------------------------+
-        | Completion  | [0, 1] | Proportion of scenarios tasks completed. The higher, the better.                                    |
-        +-------------+--------+-----------------------------------------------------------------------------------------------------+
-        | Time        | [0, 1] | Time taken to complete scenario. The lower, the better.                                             |
-        +-------------+--------+-----------------------------------------------------------------------------------------------------+
-        | Humanness   | [0, 1] | Humanness indicator. The higher, the better.                                                        |
-        +-------------+--------+-----------------------------------------------------------------------------------------------------+
-        | Rules       | [0, 1] | Traffic rules compliance. The higher, the better.                                                   |
-        +-------------+--------+-----------------------------------------------------------------------------------------------------+
-
-        Returns:
-            Dict[str, float]: Contains "Overall", "Completion", "Time",
-            "Humanness", and "Rules" scores.
-        """
-
-        counts_list, costs_list, completion_list = zip(
-            *[
-                (data.record.counts, data.record.costs, data.record.completion)
-                for agents in self._records.values()
-                for data in agents.values()
-            ]
-        )
-        agents_tot: int = len(counts_list)  # Total number of agents over all scenarios
-        counts_tot: Counts = functools.reduce(
-            lambda a, b: _add_dataclass(a, b), counts_list
-        )
-        costs_tot: Costs = functools.reduce(
-            lambda a, b: _add_dataclass(a, b), costs_list
-        )
-        completion_tot: Completion = functools.reduce(
-            lambda a, b: _add_dataclass(a, b), completion_list
-        )
-
-        completion = _completion(completion=completion_tot)
-        humanness = _humanness(costs=costs_tot, agents_tot=agents_tot)
-        rules = _rules(costs=costs_tot, agents_tot=agents_tot)
-        time = _time(counts=counts_tot)
-        overall = completion * (1 - time) * humanness * rules
-
-        return Score(
-            completion=completion,
-            humanness=humanness,
-            rules=rules,
-            time=time,
-            overall=overall,
-        )
-
-
-class Metrics(gym.Wrapper):
-    """Metrics class wraps an underlying _Metrics class. The underlying
-    _Metrics class computes agents' performance metrics in a SMARTS
-    environment. Whereas, this Metrics class is a basic gym.Wrapper class
-    which prevents external users from accessing or modifying attributes
-    beginning with an underscore, to ensure security of the metrics computed.
-
-    Args:
-        env (gym.Env): A gym.Env to be wrapped.
-
-    Raises:
-        AttributeError: Upon accessing an attribute beginning with an underscore.
-
-    Returns:
-        gym.Env: A wrapped gym.Env which computes agents' performance metrics.
-    """
-
-    def __init__(self, env: gym.Env):
-        env = MetricsBase(env)
-        super().__init__(env)
-
-
-def _check_env(env: gym.Env):
-    """Checks environment suitability to compute performance metrics.
-    Args:
-        env (gym.Env): A gym environment
-    Raises:
-        AttributeError: If any required agent interface is disabled.
- """ - - def check_intrfc(agent_intrfc: AgentInterface): - intrfc = { - "accelerometer": bool(agent_intrfc.accelerometer), - "max_episode_steps": bool(agent_intrfc.max_episode_steps), - "neighborhood_vehicle_states": bool(agent_intrfc.neighborhood_vehicles), - "road_waypoints": bool(agent_intrfc.road_waypoints), - "waypoint_paths": bool(agent_intrfc.waypoints), - "done_criteria.collision": agent_intrfc.done_criteria.collision, - "done_criteria.off_road": agent_intrfc.done_criteria.off_road, - } - return intrfc - - for agent_name, agent_interface in env.agent_interfaces.items(): - intrfc = check_intrfc(agent_interface) - if not all(intrfc.values()): - raise AttributeError( - ( - "Enable {0}'s disabled interface to " - "compute its metrics. Current interface is " - "{1}." - ).format(agent_name, intrfc) - ) - - -def _check_scen(scen: Scenario): - """Checks scenario suitability to compute performance metrics. - - Args: - scen (Scenario): A ``smarts.core.scenario.Scenario`` class. - - Raises: - AttributeError: If any agent's mission is not of type PositionGoal. - """ - goal_types = { - agent_name: type(agent_mission.goal) - for agent_name, agent_mission in scen.missions.items() - } - if not all([goal_type == PositionalGoal for goal_type in goal_types.values()]): - raise AttributeError( - "Expected all agents to have PositionalGoal, but agents have goal type " - "{0}".format(goal_types) - ) - - -T = TypeVar("T", Completion, Costs, Counts) - - -def _add_dataclass(first: T, second: T) -> T: - assert type(first) is type(second) - new = {} - for field in fields(first): - sum = getattr(first, field.name) + getattr(second, field.name) - new[field.name] = sum - output = first.__class__(**new) - - return output - - -def _completion(completion: Completion) -> float: - """ - Proportion of scenarios tasks completed. - - Args: - completion (Completion): Scenario tasks completed. - - Returns: - float: Normalized completion value = [0, 1]. Completion value should be - maximized. The higher the value, the better it is. - """ - return (completion.dist_tot - completion.dist_remainder) / completion.dist_tot - - -def _humanness(costs: Costs, agents_tot: int) -> float: - """ - Humanness indicator. - - Args: - costs (Costs): Performance cost values. - agents_tot (int): Number of agents simulated. - - Returns: - float: Normalized humanness value = [0, 1]. Humanness value should be - maximized. The higher the value, the better it is. - """ - humanness_to_minimize = np.array( - [costs.dist_to_obstacles, costs.jerk_linear, costs.lane_center_offset] - ) - humanness_to_minimize = np.mean(humanness_to_minimize, dtype=float) / agents_tot - return 1 - humanness_to_minimize - - -def _rules(costs: Costs, agents_tot: int) -> float: - """ - Traffic rules compliance. - - Args: - costs (Costs): Performance cost values. - agents_tot (int): Number of agents simulated. - - Returns: - float: Normalized rules value = [0, 1]. Rules value should be maximized. - The higher the value, the better it is. - """ - rules_to_minimize = np.array([costs.speed_limit, costs.wrong_way]) - rules_to_minimize = np.mean(rules_to_minimize, dtype=float) / agents_tot - return 1 - rules_to_minimize - - -def _time(counts: Counts) -> float: - """ - Time taken to complete scenario. - - Args: - counts (Counts): Performance count values. - - Returns: - float: Normalized time value = (0, 1]. Time value should be minimized. - The lower the value, the better it is. 
- """ - return counts.steps_adjusted / counts.max_steps diff --git a/smarts/env/tests/test_metrics.py b/smarts/env/tests/test_metrics.py index a0fc37b839..7893573d3f 100644 --- a/smarts/env/tests/test_metrics.py +++ b/smarts/env/tests/test_metrics.py @@ -31,7 +31,7 @@ from smarts.core.controllers import ActionSpaceType from smarts.core.coordinates import Heading, Point from smarts.core.plan import EndlessGoal, Goal, Mission, PositionalGoal, Start -from smarts.env.gymnasium.wrappers.metrics import Metrics +from smarts.env.gymnasium.wrappers.metric.metrics import Metrics from smarts.zoo.agent_spec import AgentSpec @@ -40,7 +40,6 @@ def _intrfc_improper(): {"accelerometer": False}, {"max_episode_steps": None}, {"neighborhood_vehicle_states": False}, - {"road_waypoints": False}, {"waypoint_paths": False}, { "done_criteria": DoneCriteria( @@ -74,7 +73,6 @@ def get_agent_spec(request): max_episode_steps=5, neighborhood_vehicle_states=True, waypoint_paths=True, - road_waypoints=True, ) return AgentSpec(interface=dataclasses.replace(base_intrfc, **request.param)) @@ -141,7 +139,7 @@ def test_init(make_env): env = Metrics(env=make_env) # Verify blocked access to underlying private variables. - for elem in ["_scen", "_road_map", "_records"]: + for elem in ["_scen", "_road_map", "_records", "smarts"]: with pytest.raises(AttributeError): getattr(env, elem) @@ -202,7 +200,6 @@ def test_end_in_off_road(make_env): assert counts.goals == 0 assert counts.episodes == 1 assert counts.steps == 3 - assert counts.max_steps == env.agent_interfaces[agent_name].max_episode_steps @pytest.mark.parametrize( @@ -297,14 +294,14 @@ def test_records_and_scores(make_env): # env.score() is only callable after >=1 episode. Hence step through 1 episode. env = Metrics(env=make_env) obs, _ = env.reset() - agent_name = next(iter(obs.keys())) - for _ in range(env.agent_interfaces[agent_name].max_episode_steps): + terminated = {"__all__": False} + while not terminated["__all__"]: actions = { agent_name: np.append( agent_obs["ego_vehicle_state"]["position"][:2], [0, 0.1] ) for agent_name, agent_obs in obs.items() } - obs, _, _, _, _ = env.step(actions) + obs, _, terminated, _, _ = env.step(actions) env.records() env.score() diff --git a/zoo/policies/__init__.py b/zoo/policies/__init__.py index 22dc625241..055014c14c 100644 --- a/zoo/policies/__init__.py +++ b/zoo/policies/__init__.py @@ -2,7 +2,6 @@ from pathlib import Path from smarts.core.agent_interface import ( - RGB, AgentInterface, AgentType, DoneCriteria,