Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 25 additions & 10 deletions lerobot/common/teleoperators/keyboard/teleop_keyboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import os
import sys
import time
from queue import Queue

import numpy as np

Expand All @@ -28,7 +29,6 @@

PYNPUT_AVAILABLE = True
try:
# Only import if there's a valid X server or if we're not on a Pi
if ("DISPLAY" not in os.environ) and ("linux" in sys.platform):
logging.info("No DISPLAY set. Skipping pynput import.")
raise ImportError("pynput blocked intentionally due to no display.")
Expand Down Expand Up @@ -56,7 +56,8 @@ def __init__(self, config: KeyboardTeleopConfig):
self.config = config
self.robot_type = config.type

self.pressed_keys = {}
self.event_queue = Queue()
self.current_pressed = {}
self.listener = None
self.is_connected = False
self.logs = {}
Expand Down Expand Up @@ -97,31 +98,45 @@ def calibrate(self) -> None:

def on_press(self, key):
if hasattr(key, "char"):
self.pressed_keys[key.char] = True
self.event_queue.put((key.char, True))

def on_release(self, key):
if hasattr(key, "char"):
self.pressed_keys[key.char] = False
self.event_queue.put((key.char, False))
if key == keyboard.Key.esc:
logging.info("ESC pressed, disconnecting.")
self.disconnect()

def _drain_pressed_keys(self):
while not self.event_queue.empty():
key_char, is_pressed = self.event_queue.get_nowait()
self.current_pressed[key_char] = is_pressed

def get_action(self) -> np.ndarray:
before_read_t = time.perf_counter()
# pressed_keys.items is wrapped in a list to avoid any RuntimeError due to dictionary changing size
# during iteration
action = {key for key, val in list(self.pressed_keys.items()) if val}

if not self.is_connected:
raise DeviceNotConnectedError(
"KeyboardTeleop is not connected. You need to run `connect()` before `get_action()`."
)

self._drain_pressed_keys()

# Generate action based on current key states
action = {key for key, val in self.current_pressed.items() if val}
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t

return action
return np.array(list(action))

def send_feedback(self, feedback: np.ndarray) -> None:
pass

def disconnect(self) -> None:
if not self.is_connected:
raise DeviceNotConnectedError(
"ManipulatorRobot is not connected. You need to run `robot.connect()` before disconnecting."
"KeyboardTeleop is not connected. You need to run `robot.connect()` before `disconnect()`."
)
self.listener.stop()
if self.listener is not None:
self.listener.stop()

self.is_connected = False
28 changes: 28 additions & 0 deletions lerobot/common/teleoperators/keyboard/teleop_keyboard_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import logging
import time

from lerobot.common.teleoperators.keyboard import KeyboardTeleop, KeyboardTeleopConfig


def main():
logging.info("Configuring Keyboard Teleop")
keyboard_config = KeyboardTeleopConfig()
keyboard = KeyboardTeleop(keyboard_config)

logging.info("Connecting Keyboard Teleop")
keyboard.connect()

logging.info("Starting Keyboard capture")
i = 0
while i < 20:
action = keyboard.get_action()
print("Captured keys: %s", action)
time.sleep(1)
i += 1

keyboard.disconnect()
logging.info("Finished LeKiwiRobot cleanly")


if __name__ == "__main__":
main()
Loading