Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,9 @@ ignore_errors = false
module = "lerobot.cameras.*"
ignore_errors = false

# [[tool.mypy.overrides]]
# module = "lerobot.motors.*"
# ignore_errors = false
[[tool.mypy.overrides]]
module = "lerobot.motors.*"
ignore_errors = false

# [[tool.mypy.overrides]]
# module = "lerobot.robots.*"
Expand Down
10 changes: 6 additions & 4 deletions src/lerobot/motors/calibration_gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def __init__(self, bus: MotorsBus, groups: dict[str, list[str]] | None = None):

self.bus = bus
self.groups = groups if groups is not None else {"all": list(bus.motors)}
self.group_names = list(groups)
self.group_names = list(self.groups)
self.current_group = self.group_names[0]

if not bus.is_connected:
Expand All @@ -230,18 +230,20 @@ def __init__(self, bus: MotorsBus, groups: dict[str, list[str]] | None = None):
self.calibration = bus.read_calibration()
self.res_table = bus.model_resolution_table
self.present_cache = {
m: bus.read("Present_Position", m, normalize=False) for motors in groups.values() for m in motors
m: bus.read("Present_Position", m, normalize=False)
for motors in self.groups.values()
for m in motors
}

pygame.init()
self.font = pygame.font.Font(None, FONT_SIZE)

label_pad = max(self.font.size(m)[0] for ms in groups.values() for m in ms)
label_pad = max(self.font.size(m)[0] for ms in self.groups.values() for m in ms)
self.label_pad = label_pad
width = 40 + label_pad + BAR_LEN + 6 + BTN_W + 10 + SAVE_W + 10
self.controls_bottom = 10 + SAVE_H
self.base_y = self.controls_bottom + TOP_GAP
height = self.base_y + PADDING_Y * len(groups[self.current_group]) + 40
height = self.base_y + PADDING_Y * len(self.groups[self.current_group]) + 40

self.screen = pygame.display.set_mode((width, height))
pygame.display.set_caption("Motors range finder")
Expand Down
42 changes: 38 additions & 4 deletions src/lerobot/motors/damiao/damiao.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ def _handshake(self) -> None:
logger.info("Starting handshake with motors...")

# Drain any pending messages
if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")

while self.canbus.recv(timeout=0.01):
pass

Expand Down Expand Up @@ -283,6 +286,10 @@ def _send_simple_command(self, motor: NameOrID, command_byte: int) -> None:
recv_id = self._get_motor_recv_id(motor)
data = [0xFF] * 7 + [command_byte]
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False, is_fd=self.use_can_fd)

if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")

self.canbus.send(msg)
if msg := self._recv_motor_response(expected_recv_id=recv_id):
self._process_response(motor_name, msg)
Expand Down Expand Up @@ -341,6 +348,10 @@ def _refresh_motor(self, motor: NameOrID) -> can.Message | None:
recv_id = self._get_motor_recv_id(motor)
data = [motor_id & 0xFF, (motor_id >> 8) & 0xFF, CAN_CMD_REFRESH, 0, 0, 0, 0, 0]
msg = can.Message(arbitration_id=CAN_PARAM_ID, data=data, is_extended_id=False, is_fd=self.use_can_fd)

if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")

self.canbus.send(msg)
return self._recv_motor_response(expected_recv_id=recv_id)

Expand All @@ -356,6 +367,10 @@ def _recv_motor_response(
Returns:
CAN message if received, None otherwise
"""

if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")

try:
start_time = time.time()
messages_seen = []
Expand Down Expand Up @@ -394,10 +409,13 @@ def _recv_all_responses(
Returns:
Dictionary mapping recv_id to CAN message
"""
responses = {}
responses: dict[int, can.Message] = {}
expected_set = set(expected_recv_ids)
start_time = time.time()

if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")

try:
while len(responses) < len(expected_recv_ids) and (time.time() - start_time) < timeout:
# 100us poll timeout
Expand Down Expand Up @@ -461,6 +479,9 @@ def _mit_control(
motor_name = self._get_motor_name(motor)
motor_type = self._motor_types[motor_name]

if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")

data = self._encode_mit_packet(motor_type, kp, kd, position_degrees, velocity_deg_per_sec, torque)
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False, is_fd=self.use_can_fd)
self.canbus.send(msg)
Expand Down Expand Up @@ -488,6 +509,9 @@ def _mit_control_batch(

recv_id_to_motor: dict[int, str] = {}

if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")

# Step 1: Send all MIT control commands
for motor, (kp, kd, position_degrees, velocity_deg_per_sec, torque) in commands.items():
motor_id = self._get_motor_id(motor)
Expand Down Expand Up @@ -656,6 +680,10 @@ def sync_read_all_states(

def _batch_refresh(self, motors: list[str]) -> None:
"""Internal helper to refresh a list of motors and update cache."""

if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")

# Send refresh commands
for motor in motors:
motor_id = self._get_motor_id(motor)
Expand All @@ -678,10 +706,14 @@ def _batch_refresh(self, motors: list[str]) -> None:
else:
logger.warning(f"Packet drop: {motor} (ID: 0x{recv_id:02X}). Using last known state.")

def sync_write(self, data_name: str, values: Value | dict[str, Value]) -> None:
def sync_write(self, data_name: str, values: dict[str, Value]) -> None:
"""
Write values to multiple motors simultaneously. Positions are always in degrees.
"""

if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")

if data_name in ("Kp", "Kd"):
key = data_name.lower()
for motor, val in values.items():
Expand All @@ -690,6 +722,8 @@ def sync_write(self, data_name: str, values: Value | dict[str, Value]) -> None:
elif data_name == "Goal_Position":
# Step 1: Send all MIT control commands
recv_id_to_motor: dict[int, str] = {}
if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")
for motor, value_degrees in values.items():
motor_id = self._get_motor_id(motor)
motor_name = self._get_motor_name(motor)
Expand Down Expand Up @@ -732,9 +766,9 @@ def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache

def record_ranges_of_motion(
self,
motors: NameOrID | list[NameOrID] | None = None,
motors: str | list[str] | None = None,
display_values: bool = True,
) -> tuple[dict[NameOrID, Value], dict[NameOrID, Value]]:
) -> tuple[dict[str, Value], dict[str, Value]]:
"""
Interactively record the min/max values of each motor in degrees.

Expand Down
16 changes: 8 additions & 8 deletions src/lerobot/motors/dynamixel/dynamixel.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,10 @@ def read_calibration(self) -> dict[str, MotorCalibration]:
for motor, m in self.motors.items():
calibration[motor] = MotorCalibration(
id=m.id,
drive_mode=drive_modes[motor],
homing_offset=offsets[motor],
range_min=mins[motor],
range_max=maxes[motor],
drive_mode=int(drive_modes[motor]),
homing_offset=int(offsets[motor]),
range_min=int(mins[motor]),
range_max=int(maxes[motor]),
)

return calibration
Expand All @@ -198,15 +198,15 @@ def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache
if cache:
self.calibration = calibration_dict

def disable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
def disable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0) -> None:
for motor in self._get_motors_list(motors):
self.write("Torque_Enable", motor, TorqueMode.DISABLED.value, num_retry=num_retry)

def _disable_torque(self, motor: int, model: str, num_retry: int = 0) -> None:
addr, length = get_address(self.model_ctrl_table, model, "Torque_Enable")
self._write(addr, length, motor, TorqueMode.DISABLED.value, num_retry=num_retry)

def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
def enable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0) -> None:
for motor in self._get_motors_list(motors):
self.write("Torque_Enable", motor, TorqueMode.ENABLED.value, num_retry=num_retry)

Expand Down Expand Up @@ -235,7 +235,7 @@ def _get_half_turn_homings(self, positions: dict[NameOrID, Value]) -> dict[NameO
On Dynamixel Motors:
Present_Position = Actual_Position + Homing_Offset
"""
half_turn_homings = {}
half_turn_homings: dict[NameOrID, Value] = {}
for motor, pos in positions.items():
model = self._get_motor_model(motor)
max_res = self.model_resolution_table[model] - 1
Expand All @@ -258,6 +258,6 @@ def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> di
if raise_on_error:
raise ConnectionError(self.packet_handler.getTxRxResult(comm))

return
return None

return {id_: data[0] for id_, data in data_list.items()}
18 changes: 9 additions & 9 deletions src/lerobot/motors/feetech/feetech.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def __init__(

self.port_handler = scs.PortHandler(self.port)
# HACK: monkeypatch
self.port_handler.setPacketTimeout = patch_setPacketTimeout.__get__(
self.port_handler.setPacketTimeout = patch_setPacketTimeout.__get__( # type: ignore[method-assign]
self.port_handler, scs.PortHandler
)
self.packet_handler = scs.PacketHandler(protocol_version)
Expand Down Expand Up @@ -262,9 +262,9 @@ def read_calibration(self) -> dict[str, MotorCalibration]:
calibration[motor] = MotorCalibration(
id=m.id,
drive_mode=0,
homing_offset=offsets[motor],
range_min=mins[motor],
range_max=maxes[motor],
homing_offset=int(offsets[motor]),
range_min=int(mins[motor]),
range_max=int(maxes[motor]),
)

return calibration
Expand All @@ -284,15 +284,15 @@ def _get_half_turn_homings(self, positions: dict[NameOrID, Value]) -> dict[NameO
On Feetech Motors:
Present_Position = Actual_Position - Homing_Offset
"""
half_turn_homings = {}
half_turn_homings: dict[NameOrID, Value] = {}
for motor, pos in positions.items():
model = self._get_motor_model(motor)
max_res = self.model_resolution_table[model] - 1
half_turn_homings[motor] = pos - int(max_res / 2)

return half_turn_homings

def disable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
def disable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0) -> None:
for motor in self._get_motors_list(motors):
self.write("Torque_Enable", motor, TorqueMode.DISABLED.value, num_retry=num_retry)
self.write("Lock", motor, 0, num_retry=num_retry)
Expand All @@ -303,7 +303,7 @@ def _disable_torque(self, motor: int, model: str, num_retry: int = 0) -> None:
addr, length = get_address(self.model_ctrl_table, model, "Lock")
self._write(addr, length, motor, 0, num_retry=num_retry)

def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
def enable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0) -> None:
for motor in self._get_motors_list(motors):
self.write("Torque_Enable", motor, TorqueMode.ENABLED.value, num_retry=num_retry)
self.write("Lock", motor, 1, num_retry=num_retry)
Expand Down Expand Up @@ -334,7 +334,7 @@ def _split_into_byte_chunks(self, value: int, length: int) -> list[int]:
def _broadcast_ping(self) -> tuple[dict[int, int], int]:
import scservo_sdk as scs

data_list = {}
data_list: dict[int, int] = {}

status_length = 6

Expand Down Expand Up @@ -414,7 +414,7 @@ def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> di
if not self._is_comm_success(comm):
if raise_on_error:
raise ConnectionError(self.packet_handler.getTxRxResult(comm))
return
return None

ids_errors = {id_: status for id_, status in ids_status.items() if self._is_error(status)}
if ids_errors:
Expand Down
Loading
Loading