diff --git a/rllm/experimental/rollout/rollout_engine.py b/rllm/experimental/rollout/rollout_engine.py index 1660d65a2..7146be416 100644 --- a/rllm/experimental/rollout/rollout_engine.py +++ b/rllm/experimental/rollout/rollout_engine.py @@ -3,11 +3,10 @@ from dataclasses import dataclass from typing import TYPE_CHECKING -from rllm.experimental.rollout.types import TokenInput, Tokenizer, TokenOutput -from rllm.tools.tool_base import ToolCall - if TYPE_CHECKING: + from rllm.experimental.rollout.types import TokenInput, Tokenizer, TokenOutput from rllm.parser import ChatTemplateParser + from rllm.tools.tool_base import ToolCall @dataclass @@ -43,6 +42,8 @@ def to_dict(self): @classmethod def from_dict(cls, data: dict): + from rllm.tools.tool_base import ToolCall + return cls( text=data.get("text"), content=data.get("content"), diff --git a/rllm/experimental/verl/patch.py b/rllm/experimental/verl/patch.py index 81183d7b8..cc74c9f71 100644 --- a/rllm/experimental/verl/patch.py +++ b/rllm/experimental/verl/patch.py @@ -10,48 +10,10 @@ logger = logging.getLogger(__name__) -_VERL_ACTOR_PATCHED = False _VERL_DYNAMIC_BATCH_PATCHED = False _VLLM_SDK_PATCHED = False -# --------------------------------------------------------------------------- -# Verl actor: per-call policy loss mode override -# --------------------------------------------------------------------------- - - -def patch_verl_actor_for_loss_override() -> None: - """Patch ``DataParallelPPOActor.update_policy`` to support per-call loss mode. - - When ``data.meta_info`` contains ``"policy_loss_mode_override"``, the - actor temporarily uses that loss mode instead of the one baked into - ``self.config.policy_loss.loss_mode``. The original config value is - restored after the call (even on exception). - """ - global _VERL_ACTOR_PATCHED - if _VERL_ACTOR_PATCHED: - return - - from verl.workers.actor.dp_actor import DataParallelPPOActor - - _original_update_policy = DataParallelPPOActor.update_policy - - def _patched_update_policy(self, data): - override = data.meta_info.get("policy_loss_mode_override") - if override is not None: - original = self.config.policy_loss.get("loss_mode", "vanilla") - self.config.policy_loss["loss_mode"] = override - try: - return _original_update_policy(self, data) - finally: - self.config.policy_loss["loss_mode"] = original - return _original_update_policy(self, data) - - DataParallelPPOActor.update_policy = _patched_update_policy - _VERL_ACTOR_PATCHED = True - logger.info("Patched DataParallelPPOActor.update_policy for per-call loss mode override") - - # --------------------------------------------------------------------------- # Verl dynamic batch: sync micro-batch counts across DP ranks # --------------------------------------------------------------------------- diff --git a/rllm/experimental/verl/verl_advantage.py b/rllm/experimental/verl/verl_advantage.py index e6a092dc9..5c8ca2cc3 100644 --- a/rllm/experimental/verl/verl_advantage.py +++ b/rllm/experimental/verl/verl_advantage.py @@ -31,11 +31,8 @@ def compute_advantage_verl(batch: DataProto, config: DictConfig) -> tuple[DataPr is_last_step = batch.non_tensor_batch["is_last_step"] last_step_indices = np.where(is_last_step)[0] not_last_step_indices = np.where(~is_last_step)[0] - non_last_step_batch = batch.select_idxs(not_last_step_indices) - batch = batch.select_idxs(last_step_indices) - batch = compute_advantage( - batch, + adv_kwargs = dict( adv_estimator=config.algorithm.adv_estimator, gamma=config.algorithm.gamma, lam=config.algorithm.lam, @@ -44,6 +41,17 
@@ def compute_advantage_verl(batch: DataProto, config: DictConfig) -> tuple[DataPr config=config.algorithm, ) + if len(not_last_step_indices) == 0: + # All steps are last steps (e.g. single-step trajectories) — compute directly, no broadcast needed + batch = compute_advantage(batch, **adv_kwargs) + return batch, metrics + + # Multi-step: split by last step, compute advantages on last steps, broadcast to earlier steps + non_last_step_batch = batch.select_idxs(not_last_step_indices) + batch = batch.select_idxs(last_step_indices) + + batch = compute_advantage(batch, **adv_kwargs) + _stepwise_advantage_broadcast(batch, non_last_step_batch, config) batch = DataProto.concat([batch, non_last_step_batch]) @@ -73,7 +81,9 @@ def _stepwise_advantage_broadcast(last_step_batch: DataProto, non_last_step_batc traj_ep_to_scalar_adv[(traj_id, eps_id)] = scalar - scalar_rows = torch.stack([torch.full_like(tgt_mask[i], fill_value=traj_ep_to_scalar_adv[(traj_id, eps_id)], dtype=torch.float32) for i, (traj_id, eps_id) in enumerate(zip(tgt_traj_ids, tgt_eps_ids, strict=False))]) + scalar_rows = torch.stack( + [torch.full_like(tgt_mask[i], fill_value=traj_ep_to_scalar_adv[(traj_id, eps_id)], dtype=torch.float32) for i, (traj_id, eps_id) in enumerate(zip(tgt_traj_ids, tgt_eps_ids, strict=False))] + ) final_advantage = scalar_rows * tgt_mask non_last_step_batch.batch["advantages"] = final_advantage diff --git a/rllm/experimental/verl/verl_backend.py b/rllm/experimental/verl/verl_backend.py index 8050974cd..6473bbca7 100644 --- a/rllm/experimental/verl/verl_backend.py +++ b/rllm/experimental/verl/verl_backend.py @@ -9,6 +9,7 @@ import math import uuid +from collections import defaultdict from collections.abc import Iterable from functools import reduce from typing import TYPE_CHECKING, Any @@ -26,7 +27,9 @@ ) from verl.trainer.ppo.ray_trainer import RayPPOTrainer, ResourcePoolManager from verl.trainer.ppo.utils import Role, WorkerType +from verl.utils import tensordict_utils as tu from verl.utils.metric import reduce_metrics +from verl.workers.utils.padding import left_right_2_no_padding, no_padding_2_padding from rllm.agents.agent import Episode from rllm.data import Dataset @@ -51,6 +54,36 @@ _VERL_KNOWN_LOSSES: set[str] | None = None +class CustomPPOLoss: + """Wraps Verl's ``ppo_loss`` to support per-call loss mode override. + + When the data TensorDict contains ``policy_loss_mode_override``, + the loss mode is temporarily overridden for that call. Instances + are serialised via cloudpickle and sent to remote workers through + Verl's ``set_loss_fn`` RPC. 
+ """ + + def __init__(self, config): + # Convert OmegaConf DictConfig → ActorConfig dataclass + from verl.utils.config import omega_conf_to_dataclass + + self.config = omega_conf_to_dataclass(config) + + def __call__(self, model_output, data, dp_group=None): + from verl.utils import tensordict_utils as _tu + from verl.workers.utils.losses import ppo_loss + + override = _tu.get(data, "policy_loss_mode_override", default=None) + if override is not None: + original = self.config.policy_loss.get("loss_mode", "vanilla") + self.config.policy_loss["loss_mode"] = override + try: + return ppo_loss(self.config, model_output, data, dp_group) + finally: + self.config.policy_loss["loss_mode"] = original + return ppo_loss(self.config, model_output, data, dp_group) + + def _get_verl_known_losses() -> set[str]: """Lazily load the set of registered Verl policy loss function names.""" global _VERL_KNOWN_LOSSES @@ -135,9 +168,8 @@ def init_rollout_engine(self, **kwargs) -> RolloutEngine: VerlEngine: The initialized rollout engine. """ # Apply Verl patches - from rllm.experimental.verl.patch import patch_verl_actor_for_loss_override, patch_verl_dynamic_batch_sync + from rllm.experimental.verl.patch import patch_verl_dynamic_batch_sync - patch_verl_actor_for_loss_override() patch_verl_dynamic_batch_sync() # If SDK is enabled, instrument vLLM replicas before creating workers @@ -150,7 +182,10 @@ def init_rollout_engine(self, **kwargs) -> RolloutEngine: assert self.async_rollout_manager is not None, "async_rollout_manager is not available. Issues with RayPPOTrainer's `init_workers()` function." - # Step 2: initialize the rollout engine + # Step 2: replace loss function on remote workers to support per-role loss override + self.actor_rollout_wg.set_loss_fn(CustomPPOLoss(self.config.actor_rollout_ref.actor)) + + # Step 3: initialize the rollout engine self.rollout_engine = VerlEngine( config=self.config, rollout_manager=self.async_rollout_manager, @@ -158,7 +193,7 @@ def init_rollout_engine(self, **kwargs) -> RolloutEngine: processor=self.processor, ) - # Step 3: store the algorithm config + # Step 4: store the algorithm config self.algorithm_config = kwargs.get("algorithm_config") return self.rollout_engine @@ -173,6 +208,14 @@ def validate_config(self) -> None: """Validate verl-specific configuration settings.""" assert self.config.actor_rollout_ref.rollout.mode == "async", "Only async rollout mode is supported for VerlBackend" assert self.use_rm is False, "Reward models are not supported. Rewards should be assigned using a reward function in the workflow or environment." + # Enforce new EngineWorker path (TensorDict + no-padding) + legacy_mode = self.config.trainer.get("use_legacy_worker_impl", "auto") + if legacy_mode != "disable": # force to disable legacy worker impl + logger.warning( + "VerlBackend forces use_legacy_worker_impl='disable' (new EngineWorker path), got '{legacy_mode}'." + "If you insist on using the legacy worker implementation, consider using the older agent workflow trainer." 
+ ) + self.config.trainer.use_legacy_worker_impl = "disable" if self.config.rllm.stepwise_advantage.mode != "broadcast": # automatically set the stepwise_advantage_mode to "broadcast", the warning is already shown in AlgorithmConfig.from_config self.config.rllm.stepwise_advantage.mode = "broadcast" @@ -290,41 +333,13 @@ def _pad_dataproto_to_world_size(self, batch: DataProto) -> DataProto: batch.non_tensor_batch["is_valid"][pad_start:pad_end] = False return batch - def _pad_dataproto_for_megatron_training(self, batch: DataProto) -> DataProto: - """Pad batch for megatron actor update using uniform random sampling. - - Megatron's make_minibatch_iterator requires per-GPU batch to be divisible - by ppo_mini_batch_size. Padded samples are randomly sampled real data that - participate in training as redundant samples from the same distribution. - """ - - world_size = self._get_dp_world_size() - if world_size is None: - return batch - - ppo_mini_batch_size = self.config.actor_rollout_ref.actor.ppo_mini_batch_size - rollout_n = self.config.actor_rollout_ref.rollout.n - divisor = math.lcm(world_size, ppo_mini_batch_size * rollout_n) - - batch = self._remove_padding(batch) - original_batch_size = batch.batch["prompts"].shape[0] - - pad_size = (-original_batch_size) % divisor - if pad_size > 0: - pad_indices = np.random.choice(original_batch_size, size=pad_size, replace=True) - pad_batch = batch.select_idxs(pad_indices) - batch = DataProto.concat([batch, pad_batch]) - # Deliberately skip setting is_pad_step/is_last_step/is_valid on padded rows. - # This method is only called in update_policy (the last pipeline stage before - # actor update), so _remove_padding is never called on this output. The padded - # rows are real duplicates that participate in training with real advantages. - return batch - async def process_backend_batch(self, trainer_state: TrainerState, **kwargs) -> None: """Compute step-level values: old_log_probs, ref_log_probs, critic values. - Reuses logic from AgentWorkflowPPOTrainer._compute_step_level_values. - Note: This is async for protocol compatibility but operations are sync (blocking) + Uses the new EngineWorker path: converts DataProto to TensorDict in + no-padding format, calls workers, converts results back to padded + DataProto. The no-padding TensorDict (batch_td) is created once and + reused across all inference worker calls. """ metrics = trainer_state.metrics timing_dict = trainer_state.timing_dict @@ -339,29 +354,42 @@ async def process_backend_batch(self, trainer_state: TrainerState, **kwargs) -> batch = self._pad_dataproto_to_world_size(batch=batch) self._balance_batch(batch, metrics=metrics) + # Set meta_info needed by workers batch.meta_info["global_token_num"] = torch.sum(batch.batch["attention_mask"], dim=-1).tolist() - # get images_seqlens - if "multi_modal_inputs" in batch.non_tensor_batch.keys(): + batch.meta_info["temperature"] = self.config.actor_rollout_ref.rollout.temperature + if "multi_modal_inputs" in batch.non_tensor_batch: images_seqlens_all = [] for multi_modal_input in batch.non_tensor_batch["multi_modal_inputs"]: - if "image_grid_thw" not in multi_modal_input.keys(): + if "image_grid_thw" not in multi_modal_input: continue images_seqlens_all.extend(multi_modal_input["images_seqlens"].tolist()) batch.meta_info["images_seqlens"] = images_seqlens_all + # Convert to TensorDict + no-padding ONCE — reused for all inference calls. + # to_tensordict() does NOT mutate the original DataProto. + # left_right_2_no_padding mutates batch_td in-place. 
+ batch_td = batch.to_tensordict() + batch_td = left_right_2_no_padding(batch_td) + + # --- Compute old_log_probs --- with simple_timer("old_log_probs", timing_dict): - # Compute old_log_probs from actor - old_log_prob = self.actor_rollout_wg.compute_log_prob(batch) - entropys = old_log_prob.batch["entropys"] + tu.assign_non_tensor(batch_td, calculate_entropy=True, compute_loss=False) + output = self.actor_rollout_wg.compute_log_prob(batch_td) + log_probs = no_padding_2_padding(tu.get(output, "log_probs"), batch_td) + entropy = no_padding_2_padding(tu.get(output, "entropy"), batch_td) + + # Entropy metric (for logging only) response_masks = batch.batch["response_mask"] loss_agg_mode = self.config.actor_rollout_ref.actor.loss_agg_mode - entropy_agg = agg_loss(loss_mat=entropys, loss_mask=response_masks, loss_agg_mode=loss_agg_mode) + entropy_agg = agg_loss(loss_mat=entropy, loss_mask=response_masks, loss_agg_mode=loss_agg_mode) metrics["actor/entropy"] = entropy_agg.detach().item() - old_log_prob.batch.pop("entropys") + + # Merge old_log_probs back into the padded DataProto + old_log_prob = DataProto.from_tensordict(tu.get_tensordict({"old_log_probs": log_probs.float()})) batch = batch.union(old_log_prob) # Compute rollout log prob diff if available - if "rollout_log_probs" in batch.batch.keys(): + if "rollout_log_probs" in batch.batch: rollout_old_log_probs = batch.batch["rollout_log_probs"] actor_old_log_probs = batch.batch["old_log_probs"] attention_mask = batch.batch["attention_mask"] @@ -381,19 +409,27 @@ async def process_backend_batch(self, trainer_state: TrainerState, **kwargs) -> } metrics.update(rollout_probs_diff_metrics) - # Compute reference log_probs if using reference policy + # --- Compute reference log_probs (reuse batch_td) --- if self.use_reference_policy: with simple_timer("ref", timing_dict): + tu.assign_non_tensor(batch_td, calculate_entropy=False, compute_loss=False) if not self.ref_in_actor: - ref_log_prob = self.ref_policy_wg.compute_ref_log_prob(batch) + ref_output = self.ref_policy_wg.compute_ref_log_prob(batch_td) else: - ref_log_prob = self.actor_rollout_wg.compute_ref_log_prob(batch) + tu.assign_non_tensor(batch_td, no_lora_adapter=True) + ref_output = self.actor_rollout_wg.compute_log_prob(batch_td) + ref_lp = no_padding_2_padding(tu.get(ref_output, "log_probs"), batch_td) + ref_log_prob = DataProto.from_tensordict(tu.get_tensordict({"ref_log_prob": ref_lp.float()})) batch = batch.union(ref_log_prob) - # Compute critic values if using critic + # --- Compute critic values --- if self.use_critic: with simple_timer("values", timing_dict): - values = self.critic_wg.compute_values(batch) + tu.assign_non_tensor(batch_td, compute_loss=False) + values_output = self.critic_wg.infer_batch(batch_td) + values_output = values_output.get() # blocking await on future + values_tensor = no_padding_2_padding(tu.get(values_output, "values"), batch_td) + values = DataProto.from_tensordict(tu.get_tensordict({"values": values_tensor.float()})) batch = batch.union(values) # Mask truncated samples if configured @@ -426,55 +462,77 @@ async def compute_advantages(self, trainer_state: TrainerState, algorithm_config async def update_policy(self, trainer_state: TrainerState, **kwargs) -> None: """Update actor and critic policies. - Note: This is async for protocol compatibility but operations are sync (blocking) + Uses the new EngineWorker path: converts DataProto to TensorDict in + no-padding format with training metadata, then calls workers. 
The new + workers handle micro-batching internally, so no manual re-padding is + needed before the update. """ global_steps = trainer_state.global_step - batch = trainer_state.backend_batch - - # Re-pad batch before gradient updates. For megatron, use the larger - # divisor (world_size * ppo_mini_batch_size) with uniform random sampling. - actor_strategy = self.config.actor_rollout_ref.actor.get("strategy", None) - if actor_strategy == "megatron": - batch = self._pad_dataproto_for_megatron_training(batch) - else: - batch = self._pad_dataproto_to_world_size(batch) - trainer_state.backend_batch = batch + batch: DataProto = trainer_state.backend_batch # type: ignore[assignment] # Update critic - # NOTE: The megatron-padded batch (with duplicated samples) is also used for - # the critic update. This is acceptable because: (1) GRPO disables the critic, - # and (2) if a megatron critic is used, it needs the same divisibility and the - # duplicated samples are real data from the same distribution. If the critic has - # a different ppo_mini_batch_size, the divisor may need to account for it. if self.use_critic: with simple_timer("update_critic", trainer_state.timing_dict): - critic_output = self.critic_wg.update_critic(batch) - critic_output_metrics = reduce_metrics(critic_output.meta_info["metrics"]) - trainer_state.metrics.update(critic_output_metrics) + critic_td = batch.to_tensordict() + critic_td = left_right_2_no_padding(critic_td) + ppo_mbs_critic = self.config.critic.ppo_mini_batch_size * self.config.actor_rollout_ref.rollout.n + tu.assign_non_tensor( + critic_td, + global_batch_size=ppo_mbs_critic, + mini_batch_size=ppo_mbs_critic, + epochs=self.config.critic.ppo_epochs, + seed=self.config.critic.data_loader_seed, + dataloader_kwargs={"shuffle": self.config.critic.shuffle}, + ) + critic_output = self.critic_wg.train_mini_batch(critic_td) + critic_output = critic_output.get() + critic_output_metrics = tu.get(critic_output, "metrics") + trainer_state.metrics.update(reduce_metrics(critic_output_metrics)) # Update actor (after critic warmup) if self.config.trainer.get("critic_warmup", 0) <= global_steps: with simple_timer("update_actor", trainer_state.timing_dict): self._update_actor_with_loss_routing(batch, trainer_state) - def _update_actor_with_loss_routing(self, batch, trainer_state: TrainerState) -> None: + def _update_actor_with_loss_routing(self, batch: DataProto, trainer_state: TrainerState) -> None: """Update actor with per-loss-group splitting when ``loss_fn_map`` is set. Roles that share the same policy loss function are grouped together into a single ``update_actor`` call, minimising the number of - optimiser steps. + optimiser steps. Each (sub-)batch is converted to TensorDict + + no-padding format with training metadata before being sent to the + worker. 
""" - from collections import defaultdict - - import numpy as np - loss_fn_map = self.algorithm_config.loss_fn_map if self.algorithm_config is not None else {} group_roles = batch.non_tensor_batch.get("group_roles") if hasattr(batch, "non_tensor_batch") and batch.non_tensor_batch is not None else None + # Common training metadata + rollout_n = self.config.actor_rollout_ref.rollout.n + actor_cfg = self.config.actor_rollout_ref.actor + ppo_mbs = actor_cfg.ppo_mini_batch_size * rollout_n + + def _send_actor_update(sub_batch: DataProto, loss_override: str | None = None) -> None: + """Convert DataProto to TensorDict, inject metadata, send to worker.""" + batch_td = sub_batch.to_tensordict() + batch_td = left_right_2_no_padding(batch_td) + metadata: dict[str, Any] = dict( + calculate_entropy=(actor_cfg.entropy_coeff != 0.0), + global_batch_size=ppo_mbs, + mini_batch_size=ppo_mbs, + epochs=actor_cfg.ppo_epochs, + seed=actor_cfg.data_loader_seed, + dataloader_kwargs={"shuffle": actor_cfg.shuffle}, + ) + if loss_override is not None: + metadata["policy_loss_mode_override"] = loss_override + tu.assign_non_tensor(batch_td, **metadata) + actor_output = self.actor_rollout_wg.update_actor(batch_td) + actor_metrics = tu.get(actor_output, "metrics") + trainer_state.metrics.update(reduce_metrics(actor_metrics)) + # Fast path: no per-role loss overrides or no role annotations. if not loss_fn_map or group_roles is None: - actor_output = self.actor_rollout_wg.update_actor(batch) - trainer_state.metrics.update(reduce_metrics(actor_output.meta_info["metrics"])) + _send_actor_update(batch) return # Resolve each role to a Verl loss name with validation + fallback. @@ -494,10 +552,7 @@ def _update_actor_with_loss_routing(self, batch, trainer_state: TrainerState) -> if len(loss_to_roles) <= 1: # All roles share the same loss — single update. - loss_name = next(iter(loss_to_roles)) - batch.meta_info["policy_loss_mode_override"] = loss_name - actor_output = self.actor_rollout_wg.update_actor(batch) - trainer_state.metrics.update(reduce_metrics(actor_output.meta_info["metrics"])) + _send_actor_update(batch, next(iter(loss_to_roles))) return # Multiple distinct losses: split batch by loss group, update each. 
@@ -506,9 +561,7 @@ def _update_actor_with_loss_routing(self, batch, trainer_state: TrainerState) -> mask = np.array([r in role_set for r in group_roles]) indices = np.where(mask)[0] sub_batch = batch[indices] - sub_batch.meta_info["policy_loss_mode_override"] = loss_name - actor_output = self.actor_rollout_wg.update_actor(sub_batch) - trainer_state.metrics.update(reduce_metrics(actor_output.meta_info["metrics"])) + _send_actor_update(sub_batch, loss_name) def shutdown(self) -> None: """Placeholder, just use the BackendProtocol's default shutdown method.""" diff --git a/rllm/rewards/code_reward.py b/rllm/rewards/code_reward.py index 4d9845a62..28e4807f5 100644 --- a/rllm/rewards/code_reward.py +++ b/rllm/rewards/code_reward.py @@ -19,7 +19,6 @@ # from rllm.rewards.code_utils.code_contests import run_test as code_contests_run_test from rllm.rewards.code_utils.livecodebench import run_test as lcb_run_test -from rllm.rewards.code_utils.taco import run_test as taco_run_test from rllm.rewards.reward_types import RewardConfig, RewardOutput, RewardType from rllm.tools.code_tools.code_tool import CodeTool from rllm.tools.code_tools.together_tool import TogetherCodeTool @@ -175,6 +174,8 @@ def postprocess_lcb_sample(sample): # https://huggingface.co/datasets/PrimeIntellect/verifiable-coding-problems def primeintellect_check_correctness(tests, code, use_tci=False): + from rllm.rewards.code_utils.taco import run_test as taco_run_test + if isinstance(tests, str): try: tests = ast.literal_eval(tests) @@ -247,7 +248,17 @@ def lcb_check_correctness_v2(sample, generation, timeout=6, debug=False): # Create detailed test results in_outs = json.loads(sample["input_output"]) detailed_results["total_tests"] = len(result[0]) - detailed_results["test_results"] = [{"input": inp, "expected": out, "passed": res == True, "error": metadata_list[0].get("error", None), "error_message": metadata_list[0].get("error_message", None), "output": metadata_list[0].get("output", None)} for inp, out, res in zip(in_outs["inputs"], in_outs["outputs"], result[0], strict=False)] + detailed_results["test_results"] = [ + { + "input": inp, + "expected": out, + "passed": res == True, + "error": metadata_list[0].get("error", None), + "error_message": metadata_list[0].get("error_message", None), + "output": metadata_list[0].get("output", None), + } + for inp, out, res in zip(in_outs["inputs"], in_outs["outputs"], result[0], strict=False) + ] detailed_results["passed_tests"] = sum(1 for r in result[0] if r == True) detailed_results["all_passed"] = all(r == True for r in result[0]) diff --git a/rllm/tools/__init__.py b/rllm/tools/__init__.py index 193687d71..b647f3cf7 100644 --- a/rllm/tools/__init__.py +++ b/rllm/tools/__init__.py @@ -22,4 +22,4 @@ tool_registry = ToolRegistry() tool_registry.register_all(DEFAULT_TOOLS) -__all__ = ["PythonInterpreter", "LocalRetrievalTool", "GoogleSearchTool", "FirecrawlTool", "TavilyExtractTool", "TavilySearchTool", "ToolRegistry", "tool_registry"] +__all__ = ["PythonInterpreter", "GoogleSearchTool", "FirecrawlTool", "TavilyExtractTool", "TavilySearchTool", "ToolRegistry", "tool_registry"] diff --git a/rllm/tools/code_tools/__init__.py b/rllm/tools/code_tools/__init__.py index b45a44e50..a7fb5f3f1 100644 --- a/rllm/tools/code_tools/__init__.py +++ b/rllm/tools/code_tools/__init__.py @@ -6,7 +6,6 @@ __all__ = [ "PythonInterpreter", # New unified interpreter "E2BPythonInterpreter", # Legacy interpreters for backward compatibility - "LocalPythonInterpreter", "LCBPythonInterpreter", "TogetherCodeTool", ] diff 
--git a/rllm/trainer/deprecated/tinker_workflow_trainer.py b/rllm/trainer/deprecated/tinker_workflow_trainer.py index e7229c95d..cecffffd7 100644 --- a/rllm/trainer/deprecated/tinker_workflow_trainer.py +++ b/rllm/trainer/deprecated/tinker_workflow_trainer.py @@ -14,7 +14,7 @@ import tinker import torch -from transformers import AutoProcessor, AutoTokenizer +from transformers import AutoTokenizer from rllm.agents.agent import Episode from rllm.engine.agent_workflow_engine import AgentWorkflowEngine @@ -91,6 +91,8 @@ def __init__( model_name_lower = self.config.model.name.lower() if "vl" in model_name_lower or "vision" in model_name_lower: try: + from transformers import AutoProcessor + processor = AutoProcessor.from_pretrained(self.config.model.name, trust_remote_code=True) if hasattr(processor, "image_processor") and processor.image_processor is not None: image_processor = processor.image_processor diff --git a/rllm/trainer/tinker/tinker_backend.py b/rllm/trainer/tinker/tinker_backend.py index 14d1727c4..29ea2793e 100644 --- a/rllm/trainer/tinker/tinker_backend.py +++ b/rllm/trainer/tinker/tinker_backend.py @@ -19,7 +19,7 @@ import tinker import torch from omegaconf import DictConfig -from transformers import AutoProcessor, AutoTokenizer +from transformers import AutoTokenizer from rllm.agents.agent import Episode from rllm.data import Dataset @@ -132,6 +132,8 @@ def init_rollout_engine(self, **kwargs) -> RolloutEngine: model_name_lower = self.full_config.model.name.lower() if "vl" in model_name_lower or "vision" in model_name_lower: try: + from transformers import AutoProcessor + processor = AutoProcessor.from_pretrained(self.full_config.model.name, trust_remote_code=True) if hasattr(processor, "image_processor") and processor.image_processor is not None: image_processor = processor.image_processor @@ -139,7 +141,18 @@ def init_rollout_engine(self, **kwargs) -> RolloutEngine: except Exception as e: logger.warning(f"Failed to load image_processor for VLM model: {e}") - self.rollout_engine = TinkerEngine(base_url=self.full_config.tinker_base_url, model_name=self.full_config.model.name, service_client=self.service_client, tokenizer=self.tokenizer, max_prompt_length=self.full_config.data.max_prompt_length, max_response_length=self.full_config.data.max_response_length, max_model_length=self.full_config.training.max_length, sampling_params=self.full_config.sampling, **self.full_config.rollout_engine, image_processor=image_processor) + self.rollout_engine = TinkerEngine( + base_url=self.full_config.tinker_base_url, + model_name=self.full_config.model.name, + service_client=self.service_client, + tokenizer=self.tokenizer, + max_prompt_length=self.full_config.data.max_prompt_length, + max_response_length=self.full_config.data.max_response_length, + max_model_length=self.full_config.training.max_length, + sampling_params=self.full_config.sampling, + **self.full_config.rollout_engine, + image_processor=image_processor, + ) return self.rollout_engine def validate_config(self) -> None: @@ -147,7 +160,10 @@ def validate_config(self) -> None: # Check for recommended sampling parameters sampling_params = self.full_config.sampling if sampling_params.get("temperature", 1.0) != 1.0 or sampling_params.get("top_p", 1.0) != 1.0: - logger.warning("Temperature and top_p are set away from 1.0, this is not recommended by Tinker and can cause mysterious issues with logprobs. 
See https://github.com/thinking-machines-lab/tinker-cookbook/pull/86 for discussion.") + logger.warning( + "Temperature and top_p are set away from 1.0, this is not recommended by Tinker and can cause mysterious issues with logprobs. " + "See https://github.com/thinking-machines-lab/tinker-cookbook/pull/86 for discussion." + ) # Validate num_minibatches (currently only support 1) if self.full_config.training.get("num_minibatches", 1) != 1: