Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions lerobot/common/teleoperators/keyboard/teleop_keyboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import os
import sys
import time
from threading import Lock
from queue import Queue

import numpy as np

Expand All @@ -29,7 +29,6 @@

PYNPUT_AVAILABLE = True
try:
# Only import if there's a valid X server or if we're not on a Pi
if ("DISPLAY" not in os.environ) and ("linux" in sys.platform):
logging.info("No DISPLAY set. Skipping pynput import.")
raise ImportError("pynput blocked intentionally due to no display.")
Expand Down Expand Up @@ -57,15 +56,14 @@ def __init__(self, config: KeyboardTeleopConfig):
self.config = config
self.robot_type = config.type

self.pressed_keys_lock = Lock()
self.pressed_keys = {}
self.event_queue = Queue()
self.current_pressed = {}
self.listener = None
self.is_connected = False
self.logs = {}

@property
def action_feature(self) -> dict:
# TODO(Steven): Set this from config? (when deciding the keys we want to capture)
return {
"dtype": "float32",
"shape": (len(self.arm),),
Expand Down Expand Up @@ -100,17 +98,20 @@ def calibrate(self) -> None:

def on_press(self, key):
if hasattr(key, "char"):
with self.pressed_keys_lock:
self.pressed_keys[key.char] = True
self.event_queue.put((key.char, True))

def on_release(self, key):
if hasattr(key, "char"):
with self.pressed_keys_lock:
self.pressed_keys[key.char] = False
self.event_queue.put((key.char, False))
if key == keyboard.Key.esc:
logging.info("ESC pressed, disconnecting.")
self.disconnect()

def _drain_pressed_keys(self):
while not self.event_queue.empty():
key_char, is_pressed = self.event_queue.get_nowait()
self.current_pressed[key_char] = is_pressed

def get_action(self) -> np.ndarray:
before_read_t = time.perf_counter()

Expand All @@ -119,10 +120,10 @@ def get_action(self) -> np.ndarray:
"KeyboardTeleop is not connected. You need to run `connect()` before `get_action()`."
)

with self.pressed_keys_lock:
pressed_keys_snapshot = self.pressed_keys
self._drain_pressed_keys()

action = {key for key, val in pressed_keys_snapshot.items() if val}
# Generate action based on current key states
action = {key for key, val in self.current_pressed.items() if val}
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t

return np.array(list(action))
Expand Down
Loading