diff --git a/docs/source/hilserl.mdx b/docs/source/hilserl.mdx index 7c63c41a83c..c70745d50c3 100644 --- a/docs/source/hilserl.mdx +++ b/docs/source/hilserl.mdx @@ -127,11 +127,11 @@ class RewardClassifierConfig: # Dataset configuration class DatasetConfig: repo_id: str # LeRobot dataset repository ID - dataset_root: str # Local dataset root directory task: str # Task identifier - num_episodes: int # Number of episodes for recording - episode: int # Episode index for replay - push_to_hub: bool # Whether to push datasets to Hub + root: str | None = None # Local dataset root directory + num_episodes_to_record: int = 5 # Number of episodes for recording + replay_episode: int | None = None # Episode index for replay + push_to_hub: bool = False # Whether to push datasets to Hub ``` @@ -351,7 +351,7 @@ Create a configuration file for recording demonstrations (or edit an existing on 1. Set `mode` to `"record"` at the root level 2. Specify a unique `repo_id` for your dataset in the `dataset` section (e.g., "username/task_name") -3. Set `num_episodes` in the `dataset` section to the number of demonstrations you want to collect +3. Set `num_episodes_to_record` in the `dataset` section to the number of demonstrations you want to collect 4. Set `env.processor.image_preprocessing.crop_params_dict` to `{}` initially (we'll determine crops later) 5. 
Configure `env.robot`, `env.teleop`, and other hardware settings in the `env` section @@ -390,10 +390,10 @@ Example configuration section: }, "dataset": { "repo_id": "username/pick_lift_cube", - "dataset_root": null, + "root": null, "task": "pick_and_lift", - "num_episodes": 15, - "episode": 0, + "num_episodes_to_record": 15, + "replay_episode": 0, "push_to_hub": true }, "mode": "record", @@ -626,7 +626,7 @@ python -m lerobot.scripts.rl.gym_manipulator --config_path src/lerobot/configs/r - **mode**: set it to `"record"` to collect a dataset (at root level) - **dataset.repo_id**: `"hf_username/dataset_name"`, name of the dataset and repo on the hub -- **dataset.num_episodes**: Number of episodes to record +- **dataset.num_episodes_to_record**: Number of episodes to record - **env.processor.reset.terminate_on_success**: Whether to automatically terminate episodes when success is detected (default: `true`) - **env.fps**: Number of frames per second to record - **dataset.push_to_hub**: Whether to push the dataset to the hub @@ -664,8 +664,8 @@ Example configuration section for data collection: "repo_id": "hf_username/dataset_name", - "dataset_root": "data/your_dataset", + "root": "data/your_dataset", "task": "reward_classifier_task", - "num_episodes": 20, - "episode": 0, + "num_episodes_to_record": 20, + "replay_episode": null, "push_to_hub": true }, "mode": "record", diff --git a/docs/source/hilserl_sim.mdx b/docs/source/hilserl_sim.mdx index bbb0cc6f93c..656e650a090 100644 --- a/docs/source/hilserl_sim.mdx +++ b/docs/source/hilserl_sim.mdx @@ -107,10 +107,10 @@ To collect a dataset, set the mode to `record` whilst defining the repo_id and n }, "dataset": { "repo_id": "username/sim_dataset", - "dataset_root": null, + "root": null, "task": "pick_cube", - "num_episodes": 10, - "episode": 0, + "num_episodes_to_record": 10, + "replay_episode": null, "push_to_hub": true }, "mode": "record" diff --git a/docs/source/il_sim.mdx b/docs/source/il_sim.mdx index 6047bf88472..70de48f8834 100644 ---
a/docs/source/il_sim.mdx +++ b/docs/source/il_sim.mdx @@ -36,10 +36,10 @@ To teleoperate and collect a dataset, we need to modify this config file. Here's }, "dataset": { "repo_id": "your_username/il_gym", - "dataset_root": null, + "root": null, "task": "pick_cube", - "num_episodes": 30, - "episode": 0, + "num_episodes_to_record": 30, + "replay_episode": null, "push_to_hub": true }, "mode": "record", @@ -50,7 +50,7 @@ To teleoperate and collect a dataset, we need to modify this config file. Here's Key configuration points: - Set your `repo_id` in the `dataset` section: `"repo_id": "your_username/il_gym"` -- Set `num_episodes: 30` to collect 30 demonstration episodes +- Set `num_episodes_to_record: 30` to collect 30 demonstration episodes - Ensure `mode` is set to `"record"` - If you don't have an NVIDIA GPU, change `"device": "cuda"` to `"mps"` for macOS or `"cpu"` - To use keyboard instead of gamepad, change `"task"` to `"PandaPickCubeKeyboard-v0"` diff --git a/src/lerobot/processor/__init__.py b/src/lerobot/processor/__init__.py index 979f7ebc4e0..813756490f4 100644 --- a/src/lerobot/processor/__init__.py +++ b/src/lerobot/processor/__init__.py @@ -15,18 +15,17 @@ # limitations under the License. 
from .batch_processor import ToBatchProcessor -from .delta_action_processor import MapDeltaActionToRobotAction +from .delta_action_processor import MapDeltaActionToRobotAction, MapTensorToDeltaActionDict from .device_processor import DeviceProcessor +from .gym_action_processor import Numpy2TorchActionProcessor, Torch2NumpyActionProcessor from .hil_processor import ( AddTeleopActionAsComplimentaryData, AddTeleopEventsAsInfo, GripperPenaltyProcessor, ImageCropResizeProcessor, InterventionActionProcessor, - Numpy2TorchActionProcessor, RewardClassifierProcessor, TimeLimitProcessor, - Torch2NumpyActionProcessor, ) from .joint_observations_processor import JointVelocityProcessor, MotorCurrentProcessor from .normalize_processor import NormalizerProcessor, UnnormalizerProcessor, hotswap_stats @@ -55,6 +54,7 @@ "DeviceProcessor", "DoneProcessor", "MapDeltaActionToRobotAction", + "MapTensorToDeltaActionDict", "EnvTransition", "GripperPenaltyProcessor", "IdentityProcessor", diff --git a/src/lerobot/processor/delta_action_processor.py b/src/lerobot/processor/delta_action_processor.py index a575bb07ab0..1b644acde71 100644 --- a/src/lerobot/processor/delta_action_processor.py +++ b/src/lerobot/processor/delta_action_processor.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from dataclasses import dataclass, field +from dataclasses import dataclass from torch import Tensor @@ -22,6 +22,30 @@ from lerobot.processor.pipeline import ActionProcessor, ProcessorStepRegistry +@ProcessorStepRegistry.register("map_tensor_to_delta_action_dict") +@dataclass +class MapTensorToDeltaActionDict(ActionProcessor): + """ + Map a tensor to a delta action dictionary. 
+ """ + + def action(self, action: Tensor) -> dict: + if isinstance(action, dict): + return action + if action.dim() > 1: + action = action.squeeze(0) + + # TODO (maractingi): add rotation + delta_action = { + "action.delta_x": action[0], + "action.delta_y": action[1], + "action.delta_z": action[2], + } + if action.shape[0] > 3: + delta_action["action.gripper"] = action[3] + return delta_action + + @ProcessorStepRegistry.register("map_delta_action_to_robot_action") @dataclass class MapDeltaActionToRobotAction(ActionProcessor): @@ -53,25 +77,17 @@ class MapDeltaActionToRobotAction(ActionProcessor): # Scale factors for delta movements position_scale: float = 1.0 rotation_scale: float = 0.0 # No rotation deltas for gamepad/keyboard - gripper_deadzone: float = 0.1 # Threshold for gripper activation - _prev_enabled: bool = field(default=False, init=False, repr=False) - def action(self, action: dict | Tensor | None) -> dict: + def action(self, action: dict | None) -> dict: if action is None: return {} # NOTE (maractingi): Action can be a dict from the teleop_devices or a tensor from the policy # TODO (maractingi): changing this target_xyz naming convention from the teleop_devices - if isinstance(action, dict): - delta_x = action.pop("action.delta_x", 0.0) - delta_y = action.pop("action.delta_y", 0.0) - delta_z = action.pop("action.delta_z", 0.0) - gripper = action.pop("action.gripper", 1.0) # Default to "stay" (1.0) - else: - delta_x = action[0].item() - delta_y = action[1].item() - delta_z = action[2].item() - gripper = action[3].item() + delta_x = action.pop("action.delta_x", 0.0) + delta_y = action.pop("action.delta_y", 0.0) + delta_z = action.pop("action.delta_z", 0.0) + gripper = action.pop("action.gripper", 1.0) # Default to "stay" (1.0) # Determine if the teleoperator is actively providing input # Consider enabled if any significant movement delta is detected @@ -101,7 +117,6 @@ def action(self, action: dict | Tensor | None) -> dict: "action.gripper": 
float(gripper), } - self._prev_enabled = enabled return action def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: @@ -120,6 +135,3 @@ def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, Po } ) return features - - def reset(self): - self._prev_enabled = False diff --git a/src/lerobot/processor/gym_action_processor.py b/src/lerobot/processor/gym_action_processor.py new file mode 100644 index 00000000000..7b3816750d4 --- /dev/null +++ b/src/lerobot/processor/gym_action_processor.py @@ -0,0 +1,68 @@ +#! /usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, + +from dataclasses import dataclass + +import numpy as np +import torch + +from lerobot.processor.pipeline import ActionProcessor, ProcessorStepRegistry + + +@ProcessorStepRegistry.register("torch2numpy_action_processor") +@dataclass +class Torch2NumpyActionProcessor(ActionProcessor): + """Convert PyTorch tensor actions to NumPy arrays.""" + + squeeze_batch_dim: bool = True + + def action(self, action: torch.Tensor | None) -> np.ndarray | None: + if action is None: + return None + + if not isinstance(action, torch.Tensor): + raise TypeError( + f"Expected torch.Tensor or None, got {type(action).__name__}. " + "Use appropriate processor for non-tensor actions." 
+ ) + + numpy_action = action.detach().cpu().numpy() + + # Remove batch dimensions but preserve action dimensions + # Only squeeze if there's a batch dimension (first dim == 1) + if ( + self.squeeze_batch_dim + and numpy_action.shape + and len(numpy_action.shape) > 1 + and numpy_action.shape[0] == 1 + ): + numpy_action = numpy_action.squeeze(0) + + return numpy_action + + +@ProcessorStepRegistry.register("numpy2torch_action_processor") +@dataclass +class Numpy2TorchActionProcessor(ActionProcessor): + """Convert NumPy array action to PyTorch tensor.""" + + def action(self, action: np.ndarray | None) -> torch.Tensor | None: + if action is None: + return None + if not isinstance(action, np.ndarray): + raise TypeError( + f"Expected np.ndarray or None, got {type(action).__name__}. " + "Use appropriate processor for non-tensor actions." + ) + torch_action = torch.from_numpy(action) + return torch_action diff --git a/src/lerobot/processor/hil_processor.py b/src/lerobot/processor/hil_processor.py index 9e31548b222..074976e45ba 100644 --- a/src/lerobot/processor/hil_processor.py +++ b/src/lerobot/processor/hil_processor.py @@ -8,7 +8,6 @@ from lerobot.configs.types import PolicyFeature from lerobot.processor.pipeline import ( - ActionProcessor, ComplementaryDataProcessor, EnvTransition, InfoProcessor, @@ -49,55 +48,6 @@ def info(self, info: dict | None) -> dict: return info -@ProcessorStepRegistry.register("torch2numpy_action_processor") -@dataclass -class Torch2NumpyActionProcessor(ActionProcessor): - """Convert PyTorch tensor actions to NumPy arrays.""" - - squeeze_batch_dim: bool = True - - def action(self, action: torch.Tensor | None) -> np.ndarray | None: - if action is None: - return None - - if not isinstance(action, torch.Tensor): - raise TypeError( - f"Expected torch.Tensor or None, got {type(action).__name__}. " - "Use appropriate processor for non-tensor actions." 
- ) - - numpy_action = action.detach().cpu().numpy() - - # Remove batch dimensions but preserve action dimensions - # Only squeeze if there's a batch dimension (first dim == 1) - if ( - self.squeeze_batch_dim - and numpy_action.shape - and len(numpy_action.shape) > 1 - and numpy_action.shape[0] == 1 - ): - numpy_action = numpy_action.squeeze(0) - - return numpy_action - - -@ProcessorStepRegistry.register("numpy2torch_action_processor") -@dataclass -class Numpy2TorchActionProcessor(ActionProcessor): - """Convert NumPy array action to PyTorch tensor.""" - - def action(self, action: np.ndarray | None) -> torch.Tensor | None: - if action is None: - return None - if not isinstance(action, np.ndarray): - raise TypeError( - f"Expected np.ndarray or None, got {type(action).__name__}. " - "Use appropriate processor for non-tensor actions." - ) - torch_action = torch.from_numpy(action) - return torch_action - - @ProcessorStepRegistry.register("image_crop_resize_processor") @dataclass class ImageCropResizeProcessor(ObservationProcessor): @@ -271,7 +221,8 @@ def __call__(self, transition: EnvTransition) -> EnvTransition: # Get intervention signals from complementary data info = transition.get(TransitionKey.INFO, {}) - teleop_action = info.get("teleop_action", {}) + complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA, {}) + teleop_action = complementary_data.get("teleop_action", {}) is_intervention = info.get(TeleopEvents.IS_INTERVENTION, False) terminate_episode = info.get(TeleopEvents.TERMINATE_EPISODE, False) success = info.get(TeleopEvents.SUCCESS, False) diff --git a/src/lerobot/processor/joint_observations_processor.py b/src/lerobot/processor/joint_observations_processor.py index 185db404848..012e6180a51 100644 --- a/src/lerobot/processor/joint_observations_processor.py +++ b/src/lerobot/processor/joint_observations_processor.py @@ -13,7 +13,7 @@ @dataclass @ProcessorStepRegistry.register("joint_velocity_processor") -class JointVelocityProcessor: +class 
JointVelocityProcessor(ObservationProcessor): """Add joint velocity information to observations.""" joint_velocity_limits: float = 100.0 diff --git a/src/lerobot/replay.py b/src/lerobot/replay.py index a9dceb741d5..dbf908019e0 100644 --- a/src/lerobot/replay.py +++ b/src/lerobot/replay.py @@ -45,9 +45,11 @@ from pathlib import Path from pprint import pformat -import draccus - +from lerobot.configs import parser from lerobot.datasets.lerobot_dataset import LeRobotDataset +from lerobot.processor import RobotProcessor +from lerobot.processor.converters import to_output_robot_action, to_transition_teleop_action +from lerobot.processor.pipeline import IdentityProcessor from lerobot.robots import ( # noqa: F401 Robot, RobotConfig, @@ -83,13 +85,25 @@ class ReplayConfig: dataset: DatasetReplayConfig # Use vocal synthesis to read events. play_sounds: bool = True + # Optional processor for actions before sending to robot + robot_action_processor: RobotProcessor | None = None -@draccus.wrap() +@parser.wrap() def replay(cfg: ReplayConfig): init_logging() logging.info(pformat(asdict(cfg))) + # Initialize robot action processor with default if not provided + robot_action_processor = cfg.robot_action_processor or RobotProcessor( + steps=[IdentityProcessor()], + to_transition=lambda tr: tr, + to_output=to_output_robot_action, # type: ignore[arg-type] + ) + + # Reset processor + robot_action_processor.reset() + robot = make_robot_from_config(cfg.robot) dataset = LeRobotDataset(cfg.dataset.repo_id, root=cfg.dataset.root, episodes=[cfg.dataset.episode]) actions = dataset.hf_dataset.select_columns("action") @@ -104,7 +118,12 @@ def replay(cfg: ReplayConfig): for i, name in enumerate(dataset.features["action"]["names"]): action[name] = action_array[i] - robot.send_action(action) + # Process action through robot action processor + # Note: We need to convert the action dict to a transition format first + action_transition = to_transition_teleop_action(action) + processed_action = 
robot_action_processor(action_transition) + + robot.send_action(processed_action) # type: ignore[arg-type] dt_s = time.perf_counter() - start_episode_t busy_wait(1 / dataset.fps - dt_s) diff --git a/src/lerobot/robots/so100_follower/robot_kinematic_processor.py b/src/lerobot/robots/so100_follower/robot_kinematic_processor.py index 6d85507bbcc..9ab2d763423 100644 --- a/src/lerobot/robots/so100_follower/robot_kinematic_processor.py +++ b/src/lerobot/robots/so100_follower/robot_kinematic_processor.py @@ -140,6 +140,11 @@ def __call__(self, transition: EnvTransition) -> EnvTransition: def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: return features + def reset(self): + self._prev_enabled = False + self.reference_ee_pose = None + self._command_when_disabled = None + @ProcessorStepRegistry.register("ee_bounds_and_safety") @dataclass @@ -162,6 +167,7 @@ class EEBoundsAndSafety(ActionProcessor): max_ee_step_m: float = 0.05 max_ee_twist_step_rad: float = 0.20 _last_pos: np.ndarray | None = field(default=None, init=False, repr=False) + _last_twist: np.ndarray | None = field(default=None, init=False, repr=False) def action(self, act: dict | None) -> dict: x = act.pop("action.ee.x", None) @@ -205,6 +211,7 @@ def action(self, act: dict | None) -> dict: def reset(self): self._last_pos = None + self._last_twist = None def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: # Because this is last step we specify the dataset features of this step that we want to be stored in the dataset diff --git a/src/lerobot/scripts/rl/actor.py b/src/lerobot/scripts/rl/actor.py index 5f44d3c5f77..997bc620b8a 100644 --- a/src/lerobot/scripts/rl/actor.py +++ b/src/lerobot/scripts/rl/actor.py @@ -98,7 +98,6 @@ ACTOR_SHUTDOWN_TIMEOUT = 30 - ################################################# # Main entry point # ################################################# @@ -288,7 +287,9 @@ def act_with_policy( 
logging.info("[ACTOR] Shutting down act_with_policy") return - observation = transition[TransitionKey.OBSERVATION] + observation = { + k: v for k, v in transition[TransitionKey.OBSERVATION].items() if k in cfg.policy.input_features + } # Time policy inference and check if it meets FPS requirement with policy_timer: @@ -308,8 +309,16 @@ def act_with_policy( ) # Extract values from processed transition - next_observation = new_transition[TransitionKey.OBSERVATION] - executed_action = new_transition[TransitionKey.ACTION] + next_observation = { + k: v + for k, v in new_transition[TransitionKey.OBSERVATION].items() + if k in cfg.policy.input_features + } + + # Teleop action is the action that was executed in the environment + # It is either the action from the teleop device or the action from the policy + executed_action = new_transition[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"] + reward = new_transition[TransitionKey.REWARD] done = new_transition.get(TransitionKey.DONE, False) truncated = new_transition.get(TransitionKey.TRUNCATED, False) diff --git a/src/lerobot/scripts/rl/gym_manipulator.py b/src/lerobot/scripts/rl/gym_manipulator.py index 37ff1cc7e09..a5f2c304c10 100644 --- a/src/lerobot/scripts/rl/gym_manipulator.py +++ b/src/lerobot/scripts/rl/gym_manipulator.py @@ -37,6 +37,7 @@ InterventionActionProcessor, JointVelocityProcessor, MapDeltaActionToRobotAction, + MapTensorToDeltaActionDict, MotorCurrentProcessor, Numpy2TorchActionProcessor, RewardClassifierProcessor, @@ -80,11 +81,11 @@ class DatasetConfig: """Configuration for dataset creation and management.""" repo_id: str - dataset_root: str task: str - num_episodes: int - episode: int - push_to_hub: bool + root: str | None = None + num_episodes_to_record: int = 5 + replay_episode: int | None = None + push_to_hub: bool = False @dataclass @@ -401,9 +402,7 @@ def make_processors( joint_names=motor_names, ) - env_pipeline_steps = [ - VanillaObservationProcessor(), - ] + env_pipeline_steps = 
[VanillaObservationProcessor()] if cfg.processor.observation is not None: if cfg.processor.observation.add_joint_velocity_to_observation: @@ -473,6 +472,7 @@ def make_processors( if cfg.processor.inverse_kinematics is not None and kinematics_solver is not None: # Add EE bounds and safety processor inverse_kinematics_steps = [ + MapTensorToDeltaActionDict(), MapDeltaActionToRobotAction(), EEReferenceAndDelta( kinematics=kinematics_solver, @@ -625,7 +625,7 @@ def control_loop( dataset = LeRobotDataset.create( cfg.dataset.repo_id, cfg.env.fps, - root=cfg.dataset.dataset_root, + root=cfg.dataset.root, use_videos=True, image_writer_threads=4, image_writer_processes=0, @@ -636,7 +636,7 @@ def control_loop( episode_step = 0 episode_start_time = time.perf_counter() - while episode_idx < cfg.dataset.num_episodes: + while episode_idx < cfg.dataset.num_episodes_to_record: step_start_time = time.perf_counter() # Create a neutral action (no movement) @@ -711,10 +711,12 @@ def control_loop( def replay_trajectory(env: gym.Env, action_processor: RobotProcessor, cfg: GymManipulatorConfig) -> None: """Replay recorded trajectory on robot environment.""" + assert cfg.dataset.replay_episode is not None, "Replay episode must be provided for replay" + dataset = LeRobotDataset( cfg.dataset.repo_id, - root=cfg.dataset.dataset_root, - episodes=[cfg.dataset.episode], + root=cfg.dataset.root, + episodes=[cfg.dataset.replay_episode], download_videos=False, ) dataset_actions = dataset.hf_dataset.select_columns(["action"]) diff --git a/src/lerobot/teleoperate.py b/src/lerobot/teleoperate.py index 320140bdbd5..cbaae79a625 100644 --- a/src/lerobot/teleoperate.py +++ b/src/lerobot/teleoperate.py @@ -56,11 +56,18 @@ from dataclasses import asdict, dataclass from pprint import pformat -import draccus import rerun as rr from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401 from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig # noqa: F401 
+from lerobot.configs import parser +from lerobot.processor import RobotProcessor +from lerobot.processor.converters import ( + to_output_robot_action, + to_transition_robot_observation, + to_transition_teleop_action, +) +from lerobot.processor.pipeline import IdentityProcessor from lerobot.robots import ( # noqa: F401 Robot, RobotConfig, @@ -97,21 +104,67 @@ class TeleoperateConfig: teleop_time_s: float | None = None # Display all cameras on screen display_data: bool = False + # Optional processors for data transformation + teleop_action_processor: RobotProcessor | None = None # runs after teleop + robot_action_processor: RobotProcessor | None = None # runs before robot + robot_observation_processor: RobotProcessor | None = None # runs after robot def teleop_loop( - teleop: Teleoperator, robot: Robot, fps: int, display_data: bool = False, duration: float | None = None + teleop: Teleoperator, + robot: Robot, + fps: int, + display_data: bool = False, + duration: float | None = None, + teleop_action_processor: RobotProcessor | None = None, + robot_action_processor: RobotProcessor | None = None, + robot_observation_processor: RobotProcessor | None = None, ): + # Initialize processors with defaults if not provided + teleop_action_processor = teleop_action_processor or RobotProcessor( + steps=[IdentityProcessor()], to_transition=to_transition_teleop_action, to_output=lambda tr: tr + ) + robot_action_processor = robot_action_processor or RobotProcessor( + steps=[IdentityProcessor()], + to_transition=lambda tr: tr, + to_output=to_output_robot_action, # type: ignore[arg-type] + ) + robot_observation_processor = robot_observation_processor or RobotProcessor( + steps=[IdentityProcessor()], to_transition=to_transition_robot_observation, to_output=lambda tr: tr + ) + + # Reset processors + teleop_action_processor.reset() + robot_action_processor.reset() + robot_observation_processor.reset() + display_len = max(len(key) for key in robot.action_features) start = 
time.perf_counter() + while True: loop_start = time.perf_counter() - action = teleop.get_action() + + # Get robot observation + obs = robot.get_observation() + + # Process robot observation through pipeline + obs_transition = robot_observation_processor(obs) + + # Get teleop action + raw_action = teleop.get_action() + + # Process teleop action through pipeline + teleop_transition = teleop_action_processor(raw_action) + + # Process action for robot through pipeline + robot_action_to_send = robot_action_processor(teleop_transition) + + # Send processed action to robot (robot_action_processor.to_output should return dict[str, Any]) + robot.send_action(robot_action_to_send) # type: ignore[arg-type] + if display_data: - observation = robot.get_observation() - log_rerun_data(observation=observation, action=action) + log_rerun_data([obs_transition, teleop_transition]) - robot.send_action(action) dt_s = time.perf_counter() - loop_start busy_wait(1 / fps - dt_s) @@ -119,17 +172,18 @@ def teleop_loop( print("\n" + "-" * (display_len + 10)) print(f"{'NAME':<{display_len}} | {'NORM':>7}") - for motor, value in action.items(): + # Display the final robot action that was sent + for motor, value in robot_action_to_send.items(): print(f"{motor:<{display_len}} | {value:>7.2f}") print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)") if duration is not None and time.perf_counter() - start >= duration: return - move_cursor_up(len(action) + 5) + move_cursor_up(len(robot_action_to_send) + 5) -@draccus.wrap() +@parser.wrap() def teleoperate(cfg: TeleoperateConfig): init_logging() logging.info(pformat(asdict(cfg))) @@ -143,7 +197,16 @@ def teleoperate(cfg: TeleoperateConfig): robot.connect() try: - teleop_loop(teleop, robot, cfg.fps, display_data=cfg.display_data, duration=cfg.teleop_time_s) + teleop_loop( + teleop=teleop, + robot=robot, + fps=cfg.fps, + display_data=cfg.display_data, + duration=cfg.teleop_time_s, + teleop_action_processor=cfg.teleop_action_processor, + 
robot_action_processor=cfg.robot_action_processor, + robot_observation_processor=cfg.robot_observation_processor, + ) except KeyboardInterrupt: pass finally: