Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions src/lerobot/scripts/lerobot_dataset_viz.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def visualize_dataset(
ws_port: int = 9087,
save: bool = False,
output_dir: Path | None = None,
display_compressed_images: bool = False,
) -> Path | None:
if save:
assert output_dir is not None, (
Expand Down Expand Up @@ -137,8 +138,9 @@ def visualize_dataset(

# display each camera image
for key in dataset.meta.camera_keys:
# TODO(rcadene): add `.compress()`? is it lossless?
rr.log(key, rr.Image(to_hwc_uint8_numpy(batch[key][i])))
img = to_hwc_uint8_numpy(batch[key][i])
img_entity = rr.Image(img).compress() if display_compressed_images else rr.Image(img)
rr.log(key, entity=img_entity)

# display each dimension of action space (e.g. actuators command)
if ACTION in batch:
Expand Down Expand Up @@ -261,6 +263,14 @@ def main():
),
)

parser.add_argument(
"--display-compressed-images",
type=bool,
required=True,
default=False,
help="If set, display compressed images in Rerun instead of uncompressed ones.",
)

args = parser.parse_args()
kwargs = vars(args)
repo_id = kwargs.pop("repo_id")
Expand Down
19 changes: 17 additions & 2 deletions src/lerobot/scripts/lerobot_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ class RecordConfig:
policy: PreTrainedConfig | None = None
# Display all cameras on screen
display_data: bool = False
# Display data on a remote Rerun server
display_ip: str | None = None
# Port of the remote Rerun server
display_port: int | None = None
# Whether to display compressed images in Rerun
display_compressed_images: bool = False
# Use vocal synthesis to read events.
play_sounds: bool = True
# Resume recording on an existing dataset.
Expand Down Expand Up @@ -261,6 +267,7 @@ def record_loop(
control_time_s: int | None = None,
single_task: str | None = None,
display_data: bool = False,
display_compressed_images: bool = False,
):
if dataset is not None and dataset.fps != fps:
raise ValueError(f"The dataset fps should be equal to requested fps ({dataset.fps} != {fps}).")
Expand Down Expand Up @@ -371,7 +378,9 @@ def record_loop(
dataset.add_frame(frame)

if display_data:
log_rerun_data(observation=obs_processed, action=action_values)
log_rerun_data(
observation=obs_processed, action=action_values, compress_images=display_compressed_images
)

dt_s = time.perf_counter() - start_loop_t
precise_sleep(1 / fps - dt_s)
Expand All @@ -384,7 +393,12 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
init_logging()
logging.info(pformat(asdict(cfg)))
if cfg.display_data:
init_rerun(session_name="recording")
init_rerun(session_name="recording", ip=cfg.display_ip, port=cfg.display_port)
display_compressed_images = (
True
if (cfg.display_data and cfg.display_ip is not None and cfg.display_port is not None)
else cfg.display_compressed_images
)

robot = make_robot_from_config(cfg.robot)
teleop = make_teleoperator_from_config(cfg.teleop) if cfg.teleop is not None else None
Expand Down Expand Up @@ -478,6 +492,7 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
control_time_s=cfg.dataset.episode_time_s,
single_task=cfg.dataset.single_task,
display_data=cfg.display_data,
display_compressed_images=display_compressed_images,
)

# Execute a few seconds without recording to give time to manually reset the environment
Expand Down
17 changes: 16 additions & 1 deletion src/lerobot/scripts/lerobot_teleoperate.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ class TeleoperateConfig:
teleop_time_s: float | None = None
# Display all cameras on screen
display_data: bool = False
# Display data on a remote Rerun server
display_ip: str | None = None
# Port of the remote Rerun server
display_port: int | None = None
# Whether to display compressed images in Rerun
display_compressed_images: bool = False


def teleop_loop(
Expand All @@ -119,6 +125,7 @@ def teleop_loop(
robot_observation_processor: RobotProcessorPipeline[RobotObservation, RobotObservation],
display_data: bool = False,
duration: float | None = None,
display_compressed_images: bool = False,
):
"""
This function continuously reads actions from a teleoperation device, processes them through optional
Expand All @@ -130,6 +137,7 @@ def teleop_loop(
robot: The robot instance being controlled.
fps: The target frequency for the control loop in frames per second.
display_data: If True, fetches robot observations and displays them in the console and Rerun.
display_compressed_images: If True, compresses images before sending them to Rerun for display.
duration: The maximum duration of the teleoperation loop in seconds. If None, the loop runs indefinitely.
teleop_action_processor: An optional pipeline to process raw actions from the teleoperator.
robot_action_processor: An optional pipeline to process actions before they are sent to the robot.
Expand Down Expand Up @@ -167,6 +175,7 @@ def teleop_loop(
log_rerun_data(
observation=obs_transition,
action=teleop_action,
compress_images=display_compressed_images,
)

print("\n" + "-" * (display_len + 10))
Expand All @@ -191,7 +200,12 @@ def teleoperate(cfg: TeleoperateConfig):
init_logging()
logging.info(pformat(asdict(cfg)))
if cfg.display_data:
init_rerun(session_name="teleoperation")
init_rerun(session_name="teleoperation", ip=cfg.display_ip, port=cfg.display_port)
display_compressed_images = (
True
if (cfg.display_data and cfg.display_ip is not None and cfg.display_port is not None)
else cfg.display_compressed_images
)

teleop = make_teleoperator_from_config(cfg.teleop)
robot = make_robot_from_config(cfg.robot)
Expand All @@ -210,6 +224,7 @@ def teleoperate(cfg: TeleoperateConfig):
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
display_compressed_images=display_compressed_images,
)
except KeyboardInterrupt:
pass
Expand Down
25 changes: 20 additions & 5 deletions src/lerobot/utils/visualization_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,25 @@
from .constants import OBS_PREFIX, OBS_STR


def init_rerun(session_name: str = "lerobot_control_loop") -> None:
"""Initializes the Rerun SDK for visualizing the control loop."""
def init_rerun(
session_name: str = "lerobot_control_loop", ip: str | None = None, port: int | None = None
) -> None:
"""
Initializes the Rerun SDK for visualizing the control loop.

Args:
session_name: Name of the Rerun session.
ip: Optional IP for connecting to a Rerun server.
port: Optional port for connecting to a Rerun server.
"""
batch_size = os.getenv("RERUN_FLUSH_NUM_BYTES", "8000")
os.environ["RERUN_FLUSH_NUM_BYTES"] = batch_size
rr.init(session_name)
memory_limit = os.getenv("LEROBOT_RERUN_MEMORY_LIMIT", "10%")
rr.spawn(memory_limit=memory_limit)
if ip and port:
rr.connect_grpc(url=f"rerun+http://{ip}:{port}/proxy")
else:
rr.spawn(memory_limit=memory_limit)


def _is_scalar(x):
Expand All @@ -40,6 +52,7 @@ def _is_scalar(x):
def log_rerun_data(
observation: dict[str, Any] | None = None,
action: dict[str, Any] | None = None,
compress_images: bool = False,
) -> None:
"""
Logs observation and action data to Rerun for real-time visualization.
Expand All @@ -48,7 +61,7 @@ def log_rerun_data(
to the Rerun viewer. It handles different data types appropriately:
- Scalars values (floats, ints) are logged as `rr.Scalars`.
- 3D NumPy arrays that resemble images (e.g., with 1, 3, or 4 channels first) are transposed
from CHW to HWC format and logged as `rr.Image`.
from CHW to HWC format, (optionally) compressed to JPEG and logged as `rr.Image` or `rr.EncodedImage`.
- 1D NumPy arrays are logged as a series of individual scalars, with each element indexed.
- Other multi-dimensional arrays are flattened and logged as individual scalars.

Expand All @@ -57,6 +70,7 @@ def log_rerun_data(
Args:
observation: An optional dictionary containing observation data to log.
action: An optional dictionary containing action data to log.
compress_images: Whether to compress images before logging to save bandwidth & memory in exchange for cpu and quality.
"""
if observation:
for k, v in observation.items():
Expand All @@ -75,7 +89,8 @@ def log_rerun_data(
for i, vi in enumerate(arr):
rr.log(f"{key}_{i}", rr.Scalars(float(vi)))
else:
rr.log(key, rr.Image(arr), static=True)
img_entity = rr.Image(arr).compress() if compress_images else rr.Image(arr)
rr.log(key, entity=img_entity, static=True)

if action:
for k, v in action.items():
Expand Down
5 changes: 4 additions & 1 deletion tests/utils/test_visualization_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ class DummyImage:
def __init__(self, arr):
self.arr = arr

def dummy_log(key, obj, **kwargs):
def dummy_log(key, obj=None, **kwargs):
# Accept either positional `obj` or keyword `entity` and record remaining kwargs.
if obj is None and "entity" in kwargs:
obj = kwargs.pop("entity")
calls.append((key, obj, kwargs))

dummy_rr = SimpleNamespace(
Expand Down