From d54b3eb2c473f2c22e06b0b208e5e12835be1419 Mon Sep 17 00:00:00 2001 From: Eugene Mironov Date: Fri, 11 Jul 2025 13:02:35 +0700 Subject: [PATCH 1/5] Minor fixes for Async Inference --- src/lerobot/scripts/rl/actor.py | 31 ++-------------- src/lerobot/scripts/rl/learner.py | 5 +-- src/lerobot/scripts/rl/learner_service.py | 1 - src/lerobot/scripts/server/policy_server.py | 3 +- src/lerobot/scripts/server/robot_client.py | 3 +- src/lerobot/transport/utils.py | 39 +++++++++++++++++++++ 6 files changed, 47 insertions(+), 35 deletions(-) diff --git a/src/lerobot/scripts/rl/actor.py b/src/lerobot/scripts/rl/actor.py index cd5e286c0c..1c8f9286bf 100644 --- a/src/lerobot/scripts/rl/actor.py +++ b/src/lerobot/scripts/rl/actor.py @@ -63,12 +63,12 @@ from lerobot.policies.factory import make_policy from lerobot.policies.sac.modeling_sac import SACPolicy from lerobot.robots import so100_follower # noqa: F401 -from lerobot.scripts.rl import learner_service from lerobot.scripts.rl.gym_manipulator import make_robot_env from lerobot.teleoperators import gamepad, so101_leader # noqa: F401 from lerobot.transport import services_pb2, services_pb2_grpc from lerobot.transport.utils import ( bytes_to_state_dict, + grpc_channel_options, python_object_to_bytes, receive_bytes_in_chunks, send_bytes_in_chunks, @@ -399,8 +399,6 @@ def learner_service_client( host: str = "127.0.0.1", port: int = 50051, ) -> tuple[services_pb2_grpc.LearnerServiceStub, grpc.Channel]: - import json - """ Returns a client for the learner service. @@ -408,34 +406,9 @@ def learner_service_client( So we need to create only one client and reuse it. """ - service_config = { - "methodConfig": [ - { - "name": [{}], # Applies to ALL methods in ALL services - "retryPolicy": { - "maxAttempts": 5, # Max retries (total attempts = 5) - "initialBackoff": "0.1s", # First retry after 0.1s - "maxBackoff": "2s", # Max wait time between retries - "backoffMultiplier": 2, # Exponential backoff factor - "retryableStatusCodes": [ - "UNAVAILABLE", - "DEADLINE_EXCEEDED", - ], # Retries on network failures - }, - } - ] - } - - service_config_json = json.dumps(service_config) - channel = grpc.insecure_channel( f"{host}:{port}", - options=[ - ("grpc.max_receive_message_length", learner_service.MAX_MESSAGE_SIZE), - ("grpc.max_send_message_length", learner_service.MAX_MESSAGE_SIZE), - ("grpc.enable_retries", 1), - ("grpc.service_config", service_config_json), - ], + grpc_channel_options(), ) stub = services_pb2_grpc.LearnerServiceStub(channel) logging.info("[ACTOR] Learner service client created") diff --git a/src/lerobot/scripts/rl/learner.py b/src/lerobot/scripts/rl/learner.py index edd2363b12..cb88895cfa 100644 --- a/src/lerobot/scripts/rl/learner.py +++ b/src/lerobot/scripts/rl/learner.py @@ -77,6 +77,7 @@ from lerobot.teleoperators import gamepad, so101_leader # noqa: F401 from lerobot.transport import services_pb2_grpc from lerobot.transport.utils import ( + MAX_MESSAGE_SIZE, bytes_to_python_object, bytes_to_transitions, state_to_bytes, @@ -658,8 +659,8 @@ def start_learner( server = grpc.server( ThreadPoolExecutor(max_workers=learner_service.MAX_WORKERS), options=[ - ("grpc.max_receive_message_length", learner_service.MAX_MESSAGE_SIZE), - ("grpc.max_send_message_length", learner_service.MAX_MESSAGE_SIZE), + ("grpc.max_receive_message_length", MAX_MESSAGE_SIZE), + ("grpc.max_send_message_length", MAX_MESSAGE_SIZE), ], ) diff --git a/src/lerobot/scripts/rl/learner_service.py b/src/lerobot/scripts/rl/learner_service.py index 198e52945b..b07c296e6e 100644 --- 
a/src/lerobot/scripts/rl/learner_service.py +++ b/src/lerobot/scripts/rl/learner_service.py @@ -23,7 +23,6 @@ from lerobot.transport.utils import receive_bytes_in_chunks, send_bytes_in_chunks from lerobot.utils.queue import get_last_item_from_queue -MAX_MESSAGE_SIZE = 4 * 1024 * 1024 # 4 MB MAX_WORKERS = 3 # Stream parameters, send transitions and interactions SHUTDOWN_TIMEOUT = 10 diff --git a/src/lerobot/scripts/server/policy_server.py b/src/lerobot/scripts/server/policy_server.py index 669ccc58ee..79d2be8c00 100644 --- a/src/lerobot/scripts/server/policy_server.py +++ b/src/lerobot/scripts/server/policy_server.py @@ -143,8 +143,7 @@ def SendPolicyInstructions(self, request, context): # noqa: N802 policy_class = get_policy_class(self.policy_type) start = time.perf_counter() - self.policy = policy_class.from_pretrained(policy_specs.pretrained_name_or_path) - self.policy.to(self.device) + self.policy = policy_class.from_pretrained(policy_specs.pretrained_name_or_path, device=self.device) end = time.perf_counter() self.logger.info(f"Time taken to put policy on {self.device}: {end - start:.4f} seconds") diff --git a/src/lerobot/scripts/server/robot_client.py b/src/lerobot/scripts/server/robot_client.py index a6d7b72427..f32366efe4 100644 --- a/src/lerobot/scripts/server/robot_client.py +++ b/src/lerobot/scripts/server/robot_client.py @@ -76,6 +76,7 @@ async_inference_pb2, # type: ignore async_inference_pb2_grpc, # type: ignore ) +from lerobot.transport.utils import grpc_channel_options class RobotClient: @@ -113,7 +114,7 @@ def __init__(self, config: RobotClientConfig): config.actions_per_chunk, config.policy_device, ) - self.channel = grpc.insecure_channel(self.server_address) + self.channel = grpc.insecure_channel(self.server_address, grpc_channel_options()) self.stub = async_inference_pb2_grpc.AsyncInferenceStub(self.channel) self.logger.info(f"Initializing client to connect to server at {self.server_address}") diff --git a/src/lerobot/transport/utils.py b/src/lerobot/transport/utils.py index 1c66832624..ac61a5217b 100644 --- a/src/lerobot/transport/utils.py +++ b/src/lerobot/transport/utils.py @@ -16,6 +16,7 @@ # limitations under the License. 
import io +import json import logging import pickle # nosec B403: Safe usage for internal serialization only from multiprocessing import Event, Queue @@ -27,6 +28,7 @@ from lerobot.utils.transition import Transition CHUNK_SIZE = 2 * 1024 * 1024 # 2 MB +MAX_MESSAGE_SIZE = 4 * 1024 * 1024 # 4 MB def bytes_buffer_size(buffer: io.BytesIO) -> int: @@ -139,3 +141,40 @@ def transitions_to_bytes(transitions: list[Transition]) -> bytes: buffer = io.BytesIO() torch.save(transitions, buffer) return buffer.getvalue() + + +def grpc_channel_options( + max_receive_message_length: int = MAX_MESSAGE_SIZE, + max_send_message_length: int = MAX_MESSAGE_SIZE, + enable_retries: bool = True, + service_config: str | None = None, +): + if service_config is None: + service_config = { + "methodConfig": [ + { + "name": [{}], # Applies to ALL methods in ALL services + "retryPolicy": { + "maxAttempts": 5, # Max retries (total attempts = 5) + "initialBackoff": "0.1s", # First retry after 0.1s + "maxBackoff": "2s", # Max wait time between retries + "backoffMultiplier": 2, # Exponential backoff factor + "retryableStatusCodes": [ + "UNAVAILABLE", + "DEADLINE_EXCEEDED", + ], # Retries on network failures + }, + } + ] + } + + service_config_json = json.dumps(service_config) + + retries_option = 1 if enable_retries else 0 + + return [ + ("grpc.max_receive_message_length", max_receive_message_length), + ("grpc.max_send_message_length", max_send_message_length), + ("grpc.enable_retries", retries_option), + ("grpc.service_config", service_config_json), + ] From 703d49901b39d0a70dd57e00874dedc7a88231e3 Mon Sep 17 00:00:00 2001 From: Eugene Mironov Date: Fri, 11 Jul 2025 16:05:55 +0700 Subject: [PATCH 2/5] Fixup --- src/lerobot/utils/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lerobot/utils/utils.py b/src/lerobot/utils/utils.py index 2e94a9c93e..7a9717dcec 100644 --- a/src/lerobot/utils/utils.py +++ b/src/lerobot/utils/utils.py @@ -48,7 +48,7 @@ def auto_select_torch_device() -> torch.device: logging.info("Cuda backend detected, using cuda.") return torch.device("cuda") elif torch.backends.mps.is_available(): - logging.info("Metal backend detected, using cuda.") + logging.info("Metal backend detected, using mps.") return torch.device("mps") else: logging.warning("No accelerated backend detected. 
Using default cpu, this will be slow.") From 9ced4aacf43ef8f5d23bcb0844591fe4f2a7f2ab Mon Sep 17 00:00:00 2001 From: Eugene Mironov Date: Fri, 11 Jul 2025 16:28:26 +0700 Subject: [PATCH 3/5] Fixup --- src/lerobot/scripts/server/policy_server.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/lerobot/scripts/server/policy_server.py b/src/lerobot/scripts/server/policy_server.py index 79d2be8c00..669ccc58ee 100644 --- a/src/lerobot/scripts/server/policy_server.py +++ b/src/lerobot/scripts/server/policy_server.py @@ -143,7 +143,8 @@ def SendPolicyInstructions(self, request, context): # noqa: N802 policy_class = get_policy_class(self.policy_type) start = time.perf_counter() - self.policy = policy_class.from_pretrained(policy_specs.pretrained_name_or_path, device=self.device) + self.policy = policy_class.from_pretrained(policy_specs.pretrained_name_or_path) + self.policy.to(self.device) end = time.perf_counter() self.logger.info(f"Time taken to put policy on {self.device}: {end - start:.4f} seconds") From 9eb1329e40db7a43ecf0c84cadd1d946f8bf6953 Mon Sep 17 00:00:00 2001 From: Eugene Mironov Date: Wed, 16 Jul 2025 02:53:19 +0700 Subject: [PATCH 4/5] Parametarize initial_backoff --- src/lerobot/scripts/server/robot_client.py | 6 ++-- src/lerobot/transport/utils.py | 40 ++++++++++++---------- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/src/lerobot/scripts/server/robot_client.py b/src/lerobot/scripts/server/robot_client.py index f32366efe4..5d80782f9e 100644 --- a/src/lerobot/scripts/server/robot_client.py +++ b/src/lerobot/scripts/server/robot_client.py @@ -92,7 +92,7 @@ def __init__(self, config: RobotClientConfig): # Store configuration self.config = config self.robot = make_robot_from_config(config.robot) - self.robot.connect() + # self.robot.connect() lerobot_features = map_robot_keys_to_lerobot_features(self.robot) @@ -114,7 +114,9 @@ def __init__(self, config: RobotClientConfig): config.actions_per_chunk, config.policy_device, ) - self.channel = grpc.insecure_channel(self.server_address, grpc_channel_options()) + self.channel = grpc.insecure_channel( + self.server_address, grpc_channel_options(initial_backoff=f"{config.environment_dt:.4f}s") + ) self.stub = async_inference_pb2_grpc.AsyncInferenceStub(self.channel) self.logger.info(f"Initializing client to connect to server at {self.server_address}") diff --git a/src/lerobot/transport/utils.py b/src/lerobot/transport/utils.py index ac61a5217b..bf1aab7554 100644 --- a/src/lerobot/transport/utils.py +++ b/src/lerobot/transport/utils.py @@ -147,26 +147,28 @@ def grpc_channel_options( max_receive_message_length: int = MAX_MESSAGE_SIZE, max_send_message_length: int = MAX_MESSAGE_SIZE, enable_retries: bool = True, - service_config: str | None = None, + initial_backoff: str = "0.1s", + max_attempts: int = 5, + backoff_multiplier: float = 2, + max_backoff: str = "2s", ): - if service_config is None: - service_config = { - "methodConfig": [ - { - "name": [{}], # Applies to ALL methods in ALL services - "retryPolicy": { - "maxAttempts": 5, # Max retries (total attempts = 5) - "initialBackoff": "0.1s", # First retry after 0.1s - "maxBackoff": "2s", # Max wait time between retries - "backoffMultiplier": 2, # Exponential backoff factor - "retryableStatusCodes": [ - "UNAVAILABLE", - "DEADLINE_EXCEEDED", - ], # Retries on network failures - }, - } - ] - } + service_config = { + "methodConfig": [ + { + "name": [{}], # Applies to ALL methods in ALL services + "retryPolicy": { + "maxAttempts": max_attempts, # Max 
retries (total attempts = 5) + "initialBackoff": initial_backoff, # First retry after 0.1s + "maxBackoff": max_backoff, # Max wait time between retries + "backoffMultiplier": backoff_multiplier, # Exponential backoff factor + "retryableStatusCodes": [ + "UNAVAILABLE", + "DEADLINE_EXCEEDED", + ], # Retries on network failures + }, + } + ] + } service_config_json = json.dumps(service_config) From a859c3612b0a0fe03f027d31e5baa6edaee1faa0 Mon Sep 17 00:00:00 2001 From: Eugene Mironov Date: Wed, 16 Jul 2025 02:55:36 +0700 Subject: [PATCH 5/5] Fixup --- src/lerobot/scripts/server/robot_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lerobot/scripts/server/robot_client.py b/src/lerobot/scripts/server/robot_client.py index 5d80782f9e..44d9cdf776 100644 --- a/src/lerobot/scripts/server/robot_client.py +++ b/src/lerobot/scripts/server/robot_client.py @@ -92,7 +92,7 @@ def __init__(self, config: RobotClientConfig): # Store configuration self.config = config self.robot = make_robot_from_config(config.robot) - # self.robot.connect() + self.robot.connect() lerobot_features = map_robot_keys_to_lerobot_features(self.robot)
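
The series above funnels all client-side channel construction through the new grpc_channel_options() helper in lerobot/transport/utils.py. As a minimal sketch of how a client channel ends up wired after these patches, mirroring the robot_client.py hunk: the address and the 30 Hz control rate below are assumptions for illustration, while grpc_channel_options, initial_backoff and AsyncInferenceStub come straight from the series.

import grpc

from lerobot.transport import async_inference_pb2_grpc
from lerobot.transport.utils import grpc_channel_options

server_address = "127.0.0.1:8080"  # assumption: wherever the policy server listens
environment_dt = 1 / 30            # assumption: 30 Hz control-loop period

# First retry fires roughly one control step after a failed call, matching the
# initial_backoff=f"{config.environment_dt:.4f}s" argument in robot_client.py.
channel = grpc.insecure_channel(
    server_address,
    grpc_channel_options(initial_backoff=f"{environment_dt:.4f}s"),
)
stub = async_inference_pb2_grpc.AsyncInferenceStub(channel)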
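
Under the hood the helper only builds gRPC's standard channel options plus a JSON service config for retries. Note that gRPC's maxAttempts counts the original call as well, so the default of 5 means one call plus up to four retries, with the backoff doubling from initial_backoff up to max_backoff on UNAVAILABLE or DEADLINE_EXCEEDED. A quick way to inspect the defaults introduced by the patch (the values in the comments follow directly from the code above):

import json

from lerobot.transport.utils import grpc_channel_options

opts = dict(grpc_channel_options())  # defaults from the patch: 4 MB messages, retries enabled

print(opts["grpc.max_receive_message_length"])  # 4194304 (4 MB)
print(opts["grpc.enable_retries"])              # 1

retry = json.loads(opts["grpc.service_config"])["methodConfig"][0]["retryPolicy"]
print(retry["maxAttempts"], retry["initialBackoff"], retry["maxBackoff"])  # 5 0.1s 2s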
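
On the server side, the series reuses the MAX_MESSAGE_SIZE constant that moved from learner_service.py into lerobot.transport.utils, so both directions accept up to 4 MB, as in the learner.py hunk. A rough sketch of that server setup, assuming the worker count mirrors MAX_WORKERS = 3 from learner_service.py; the bind address is illustrative only.

from concurrent.futures import ThreadPoolExecutor

import grpc

from lerobot.transport.utils import MAX_MESSAGE_SIZE

server = grpc.server(
    ThreadPoolExecutor(max_workers=3),  # assumption: same as learner_service.MAX_WORKERS
    options=[
        ("grpc.max_receive_message_length", MAX_MESSAGE_SIZE),
        ("grpc.max_send_message_length", MAX_MESSAGE_SIZE),
    ],
)
# The LearnerService servicer would be registered here before calling server.start().
server.add_insecure_port("127.0.0.1:50051")  # assumption: port for illustration only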