diff --git a/lerobot/common/teleoperators/keyboard/teleop_keyboard.py b/lerobot/common/teleoperators/keyboard/teleop_keyboard.py index df5a1a925d8..65f1dff71d4 100644 --- a/lerobot/common/teleoperators/keyboard/teleop_keyboard.py +++ b/lerobot/common/teleoperators/keyboard/teleop_keyboard.py @@ -18,7 +18,7 @@ import os import sys import time -from threading import Lock +from queue import Queue import numpy as np @@ -29,7 +29,6 @@ PYNPUT_AVAILABLE = True try: - # Only import if there's a valid X server or if we're not on a Pi if ("DISPLAY" not in os.environ) and ("linux" in sys.platform): logging.info("No DISPLAY set. Skipping pynput import.") raise ImportError("pynput blocked intentionally due to no display.") @@ -57,15 +56,14 @@ def __init__(self, config: KeyboardTeleopConfig): self.config = config self.robot_type = config.type - self.pressed_keys_lock = Lock() - self.pressed_keys = {} + self.event_queue = Queue() + self.current_pressed = {} self.listener = None self.is_connected = False self.logs = {} @property def action_feature(self) -> dict: - # TODO(Steven): Set this from config? (when deciding the keys we want to capture) return { "dtype": "float32", "shape": (len(self.arm),), @@ -100,17 +98,20 @@ def calibrate(self) -> None: def on_press(self, key): if hasattr(key, "char"): - with self.pressed_keys_lock: - self.pressed_keys[key.char] = True + self.event_queue.put((key.char, True)) def on_release(self, key): if hasattr(key, "char"): - with self.pressed_keys_lock: - self.pressed_keys[key.char] = False + self.event_queue.put((key.char, False)) if key == keyboard.Key.esc: logging.info("ESC pressed, disconnecting.") self.disconnect() + def _drain_pressed_keys(self): + while not self.event_queue.empty(): + key_char, is_pressed = self.event_queue.get_nowait() + self.current_pressed[key_char] = is_pressed + def get_action(self) -> np.ndarray: before_read_t = time.perf_counter() @@ -119,10 +120,10 @@ def get_action(self) -> np.ndarray: "KeyboardTeleop is not connected. You need to run `connect()` before `get_action()`." ) - with self.pressed_keys_lock: - pressed_keys_snapshot = self.pressed_keys + self._drain_pressed_keys() - action = {key for key, val in pressed_keys_snapshot.items() if val} + # Generate action based on current key states + action = {key for key, val in self.current_pressed.items() if val} self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t return np.array(list(action))