diff --git a/ros2/orchestrator/orchestrator/orchestrator_lib/action.py b/ros2/orchestrator/orchestrator/orchestrator_lib/action.py index a4a4063..dbab73f 100644 --- a/ros2/orchestrator/orchestrator/orchestrator_lib/action.py +++ b/ros2/orchestrator/orchestrator/orchestrator_lib/action.py @@ -4,7 +4,7 @@ from enum import Enum from typing import Any, Optional, Union from typing_extensions import TypeAlias -from orchestrator.orchestrator_lib.node_model import Cause, TopicInput, TimerInput +from orchestrator.orchestrator_lib.node_model import TopicInput, TimerInput from rclpy.time import Time @@ -30,7 +30,6 @@ class _BaseAction: state: ActionState node: str timestamp: Time - cause: Cause @dataclass diff --git a/ros2/orchestrator/orchestrator/orchestrator_lib/orchestrator.py b/ros2/orchestrator/orchestrator/orchestrator_lib/orchestrator.py index 10181f6..3fd8054 100644 --- a/ros2/orchestrator/orchestrator/orchestrator_lib/orchestrator.py +++ b/ros2/orchestrator/orchestrator/orchestrator_lib/orchestrator.py @@ -63,6 +63,7 @@ class FutureInput: class FutureTimestep: time: Time future: Future + allow_jump: bool def _verify_node_models(node_models): @@ -420,7 +421,7 @@ def add_status(parent: GraphNodeId): self.l.info(" Created all callback actions, proceeding as usual.") - def wait_until_time_publish_allowed(self, t: Time) -> Future: + def wait_until_time_publish_allowed(self, t: Time, allow_jump: bool = False) -> Future: """ Get a future that will be complete once the specified timestamp can be published. The caller should spin the executor of the RosNode while waiting, otherwise the future @@ -443,7 +444,7 @@ def wait_until_time_publish_allowed(self, t: Time) -> Future: return f f = Future() - self.next_input = FutureTimestep(t, f) + self.next_input = FutureTimestep(t, f, allow_jump) if self.timing_analysis: self.next_input_timestamp = datetime.datetime.now() @@ -516,7 +517,7 @@ def reconfigure(self, new_node_config: List[NodeModel]) -> None: self.initialize_ros_communication() self.__process() - def __add_pending_timers_until(self, t: Time): + def __add_pending_timers_until(self, t: Time, allow_jump: bool = False): """ Add all remaining timer events. @@ -551,6 +552,7 @@ class Timer: expected_timer_actions: List[TimerCallbackAction] = [] + reinit_timers = False for timer in timers: self.l.debug(f" Considering timer {timer}") nr_fires = last_time // timer.cause.period @@ -560,13 +562,19 @@ class Timer: f" Timer has fired {nr_fires} times, the last invocation was at {last_fire}, next will be at {next_fire}") dt: int = t.nanoseconds - last_time if dt > timer.cause.period: - raise RuntimeError(f"Requested timestep too large! Stepping time from {last_time} to {t} ({dt}) " - "would require firing the timer multiple times within the same timestep. " - "This is probably unintended!") + if not allow_jump: + raise RuntimeError(f"Requested timestep too large! Stepping time from {last_time} to {t} ({dt}) " + "would require firing the timer multiple times within the same timestep. " + "This is probably unintended!") + reinit_timers = True + continue if next_fire <= t: self.l.info(f" Timer of node \"{timer.node}\" with period {timer.cause.period} will fire!") action = TimerCallbackAction(ActionState.WAITING, timer.node, t, timer.cause) expected_timer_actions.append(action) + if reinit_timers: + self.__initialize_sim_time(t) + self.simulator_time = t self.l.info( f" Adding actions for {len(expected_timer_actions)} timer callbacks.") @@ -639,7 +647,7 @@ def __request_next_input(self): if self.next_input_timestamp is not None: delta = datetime.datetime.now() - self.next_input_timestamp self.l.info(f"Clock input was delayed by {delta}") - self.__add_pending_timers_until(next.time) + self.__add_pending_timers_until(next.time, next.allow_jump) self.simulator_time = next.time next.future.set_result(None) elif self.next_input is None: