Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions verl/experimental/agent_loop/tool_agent_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
register,
)
from verl.experimental.agent_loop.tool_parser import FunctionCall, ToolParser
from verl.experimental.agent_loop.utils import add_generation_prompt_for_gpt_oss, format_gpt_oss_tool_response_manually
from verl.experimental.agent_loop.utils import build_gpt_oss_tool_response_text
from verl.interactions.base import BaseInteraction
from verl.interactions.utils.interaction_registry import initialize_interactions_from_config
from verl.tools.schemas import ToolResponse
Expand Down Expand Up @@ -360,14 +360,7 @@ async def _handle_processing_tools_state(self, agent_data: AgentData) -> AgentSt
else:
if self.tool_parser_name == "gpt-oss":
logger.info("manually format tool responses for gpt-oss")
# Format tool responses manually
tool_response_texts = []
for i, tool_msg in enumerate(add_messages):
actual_tool_name = tool_call_names[i]
formatted = format_gpt_oss_tool_response_manually(tool_msg["content"], actual_tool_name)
tool_response_texts.append(formatted)

tool_response_text = add_generation_prompt_for_gpt_oss("".join(tool_response_texts))
tool_response_text = build_gpt_oss_tool_response_text(add_messages, tool_call_names)
response_ids = await self.loop.run_in_executor(
None, lambda: self.tokenizer.encode(tool_response_text, add_special_tokens=False)
)
Expand Down
11 changes: 11 additions & 0 deletions verl/experimental/agent_loop/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import os
from typing import Any


def resolve_config_path(config_path: str) -> str:
Expand Down Expand Up @@ -95,3 +96,13 @@ def add_generation_prompt_for_gpt_oss(message_content: str) -> str:
Message content string with generation prompt
"""
return message_content + "<|start|>assistant"


def build_gpt_oss_tool_response_text(messages: list[dict[str, Any]], tool_call_names: list[str]) -> str:
"""Build gpt-oss tool response text (manual formatting + generation prompt)."""
tool_response_texts: list[str] = []
for i, tool_msg in enumerate(messages):
actual_tool_name = tool_call_names[i]
formatted = format_gpt_oss_tool_response_manually(tool_msg["content"], actual_tool_name)
tool_response_texts.append(formatted)
return add_generation_prompt_for_gpt_oss("".join(tool_response_texts))
8 changes: 7 additions & 1 deletion verl/trainer/ppo/ray_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
from verl.utils.checkpoint.checkpoint_manager import find_latest_ckpt_path, should_save_ckpt_esi
from verl.utils.config import omega_conf_to_dataclass
from verl.utils.debug import marked_timer
from verl.utils.import_utils import load_class_from_fqn
from verl.utils.metric import reduce_metrics
from verl.utils.py_functional import rename_dict
from verl.utils.rollout_skip import RolloutSkip
Expand Down Expand Up @@ -847,7 +848,12 @@ def init_workers(self):
# create async rollout manager and request scheduler
self.async_rollout_mode = False
if self.config.actor_rollout_ref.rollout.mode == "async":
from verl.experimental.agent_loop import AgentLoopManager
# Support custom AgentLoopManager via config
manager_class_fqn = self.config.actor_rollout_ref.rollout.get("agent", {}).get("agent_loop_manager_class")
if manager_class_fqn:
AgentLoopManager = load_class_from_fqn(manager_class_fqn, "AgentLoopManager")
else:
from verl.experimental.agent_loop import AgentLoopManager

self.async_rollout_mode = True
if self.config.reward_model.enable and self.config.reward_model.enable_resource_pool:
Expand Down
33 changes: 33 additions & 0 deletions verl/utils/import_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,39 @@ def load_extern_object(module_path: str, object_name: str) -> object:
return getattr(module, object_name)


def load_class_from_fqn(fqn: str, description: str = "class") -> type:
"""Load a class from its fully qualified name.

Args:
fqn: Fully qualified class name (e.g., 'mypackage.module.ClassName').
description: Description for error messages (e.g., 'AgentLoopManager').

Returns:
The loaded class.

Raises:
ValueError: If fqn format is invalid (missing dot separator).
ImportError: If the module cannot be imported.
AttributeError: If the class is not found in the module.

Example:
>>> cls = load_class_from_fqn("verl.experimental.agent_loop.AgentLoopManager")
>>> instance = cls(config=config, ...)
"""
if "." not in fqn:
raise ValueError(
f"Invalid {description} '{fqn}'. Expected fully qualified class name (e.g., 'mypackage.module.ClassName')."
)
try:
module_path, class_name = fqn.rsplit(".", 1)
module = importlib.import_module(module_path)
return getattr(module, class_name)
except ImportError as e:
raise ImportError(f"Failed to import module '{module_path}' for {description}: {e}") from e
except AttributeError as e:
raise AttributeError(f"Class '{class_name}' not found in module '{module_path}': {e}") from e


@deprecated(replacement="load_module(file_path); getattr(module, type_name)")
def load_extern_type(file_path: str, type_name: str) -> type:
"""DEPRECATED. Directly use `load_extern_object` instead."""
Expand Down
6 changes: 6 additions & 0 deletions verl/workers/config/rollout.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ class AgentLoopConfig(BaseConfig):
default_agent_loop: str = "single_turn_agent"
agent_loop_config_path: Optional[str] = None
custom_async_server: CustomAsyncServerConfig = field(default_factory=CustomAsyncServerConfig)
# Fully qualified class name for custom AgentLoopManager (e.g., "mypackage.module.MyManager").
# Security: This class will be dynamically imported via importlib. Only use trusted class paths.
agent_loop_manager_class: Optional[str] = None


@dataclass
Expand Down Expand Up @@ -179,6 +182,9 @@ class RolloutConfig(BaseConfig):
# Use Prometheus to collect and monitor rollout statistics
prometheus: PrometheusConfig = field(default_factory=PrometheusConfig)

# Extension point for custom configurations
custom: Optional[dict] = None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field seems not used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field is designed as an extension point for downstream projects to pass custom configurations without modifying core config classes.

For example, a project implementing remote rollout could use:

actor_rollout_ref:
  rollout:
    custom:
      remote_rollout:
        server_url: "https://..."
        callback_url: "http://..."
        timeout_seconds: 300

Then access it via config.actor_rollout_ref.rollout.get("custom", {}) in their custom AgentLoopManager.

Happy to add a docstring clarifying this is an extension point!


update_weights_bucket_megabytes: int = 512

skip_rollout: bool = False
Expand Down
Loading