support trajectory-based agents and OpenHands (#1184)#1548
support trajectory-based agents and OpenHands (#1184)#1548CharlieFRuan merged 5 commits intoNovaSky-AI:mainfrom
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a ChatHistoryExtractor class to support trajectory-based agents, along with a configuration and execution script for OpenHands. The review feedback highlights that the new extractor is currently unused in the main logic and that several required modules are not imported in harbor_generator.py.
| MAX_NUM_RETRIES_PER_TRIAL = 2 | ||
|
|
||
|
|
||
| class ChatHistoryExtractor: |
There was a problem hiding this comment.
Several required modules (json, pathlib.Path, and urllib.parse.urlparse) are used within the new ChatHistoryExtractor class but have not been imported in this file. This will result in a NameError at runtime when these methods are called.
import json
from pathlib import Path
from urllib.parse import urlparse
class ChatHistoryExtractor:| class ChatHistoryExtractor: | ||
| """Extracts a (chat_history, summarization_count, num_turns) tuple from Harbor trial results. | ||
|
|
||
| Supports two extraction strategies, tried in order: | ||
| 1. all_messages agents (terminus-2, terminus-1, terminus): metadata["all_messages"] | ||
| 2. Trajectory-based agents (mini-swe-agent, swe-agent, openhands): | ||
| trajectory.json converted to user/assistant messages | ||
| """ | ||
|
|
||
| # Agents that write trajectory.json (ATIF format) instead of metadata["all_messages"]. | ||
| # OpenHands uses condensation (off-policy) - use reject_summarization=false to allow. | ||
| TRAJECTORY_BASED_AGENTS = frozenset( | ||
| {"mini-swe-agent", "swe-agent", "openhands", "openhands-host"}) | ||
|
|
||
| @classmethod | ||
| def extract(cls, results) -> Optional[tuple]: | ||
| """Return (chat_history, summarization_count, num_turns) or None on failure.""" | ||
| agent_result = results.agent_result | ||
| if agent_result is None: | ||
| return None | ||
|
|
||
| metadata = agent_result.metadata or {} | ||
| chat_history = metadata.get("all_messages") | ||
| if chat_history is not None: | ||
| return chat_history, metadata.get("summarization_count", 0), metadata.get("n_episodes", 0) | ||
|
|
||
| # Fallback: load from trajectory.json or completions for trajectory-based agents | ||
| agent_name = (getattr(results.config.agent, | ||
| "name", None) or "").lower() | ||
| if agent_name not in cls.TRAJECTORY_BASED_AGENTS: | ||
| return None | ||
|
|
||
| trial_path = cls._trial_path_from_uri( | ||
| getattr(results, "trial_uri", None) or "") | ||
| if trial_path is None: | ||
| return None | ||
|
|
||
| trajectory_path = trial_path / "agent" / "trajectory.json" | ||
| chat_history = cls._from_atif_trajectory(trajectory_path) | ||
| if chat_history is None: | ||
| return None | ||
|
|
||
| # Trajectory-based agents don't track summarization; use 0 for strictly appending | ||
| return chat_history, 0, cls._count_turns(chat_history) | ||
|
|
||
| # ------------------------------------------------------------------ | ||
| # Private helpers | ||
| # ------------------------------------------------------------------ | ||
|
|
||
| @staticmethod | ||
| def _count_turns(messages: List[dict]) -> int: | ||
| return sum(1 for m in messages if m["role"] == "assistant") | ||
|
|
||
| @staticmethod | ||
| def _trial_path_from_uri(trial_uri: str) -> Optional[Path]: | ||
| """Extract local filesystem path from trial_uri (e.g. file:///path/to/trial).""" | ||
| if not trial_uri: | ||
| return None | ||
| try: | ||
| parsed = urlparse(trial_uri) | ||
| if parsed.scheme == "file" and parsed.path: | ||
| return Path(parsed.path) | ||
| except Exception: | ||
| pass | ||
| return None | ||
|
|
||
| @classmethod | ||
| def _from_atif_trajectory(cls, trajectory_path: Path) -> Optional[List[dict]]: | ||
| """Convert ATIF trajectory JSON to user/assistant chat messages for SkyRL training. | ||
|
|
||
| Handles system steps (prepended to first user message), agent observations | ||
| (converted to user messages for alternating user/assistant pattern), and | ||
| tool_calls (serialized into assistant content). | ||
| """ | ||
| if not trajectory_path.exists(): | ||
| return None | ||
| try: | ||
| with open(trajectory_path) as f: | ||
| data = json.load(f) | ||
| except Exception as e: | ||
| logger.warning( | ||
| f"Failed to load trajectory from {trajectory_path}: {e}") | ||
| return None | ||
|
|
||
| messages: List[dict] = [] | ||
| pending_system: List[str] = [] | ||
|
|
||
| for step in data.get("steps", []): | ||
| source = step.get("source", "") | ||
| message = step.get("message", "") | ||
| observation = step.get("observation") | ||
|
|
||
| if source == "system": | ||
| if message: | ||
| pending_system.append(message) | ||
| continue | ||
|
|
||
| if source == "user": | ||
| content = message or "" | ||
| if pending_system: | ||
| content = "\n\n".join(pending_system) + "\n\n" + content | ||
| pending_system = [] | ||
| messages.append({"role": "user", "content": content}) | ||
|
|
||
| elif source == "agent": | ||
| content = message or "" | ||
| if step.get("tool_calls"): | ||
| content = content + "\n" + \ | ||
| json.dumps({"tool_calls": step["tool_calls"]}) | ||
| if not content: | ||
| continue | ||
| messages.append({"role": "assistant", "content": content}) | ||
|
|
||
| # Observations represent environment feedback; emit as user message | ||
| # to maintain the alternating user/assistant pattern required for RL. | ||
| if observation and observation.get("results"): | ||
| obs_parts = [r.get("content", "") | ||
| for r in observation["results"] if r.get("content")] | ||
| if obs_parts: | ||
| messages.append( | ||
| {"role": "user", "content": "\n".join(obs_parts)}) | ||
|
|
||
| return messages if messages else None |
There was a problem hiding this comment.
🔴 ChatHistoryExtractor is defined but never called, making OpenHands integration non-functional
The PR adds ChatHistoryExtractor to handle trajectory-based agents like OpenHands (which write trajectory.json instead of populating metadata["all_messages"]), but harbor_agent_loop at examples/train_integrations/harbor/harbor_generator.py:396-398 still directly accesses results.agent_result.metadata["all_messages"]. The new openhands.yaml config omits store_all_messages: true (which the docs at docs/content/docs/harbor/index.mdx:224 state is required), because OpenHands doesn't support it. When running OpenHands via the new config, line 396 raises KeyError, the except Exception at line 407 catches it, retries exhaust, and every trajectory is marked as failed. The ChatHistoryExtractor.extract() method was written to gracefully fall back to trajectory.json for these agents but is never invoked.
Prompt for agents
The ChatHistoryExtractor class is defined at line 31 but never used. The harbor_agent_loop method at lines 396-398 still hardcodes metadata['all_messages'], metadata['summarization_count'], and metadata['n_episodes'] access, which will fail for OpenHands (trajectory-based) agents. To fix: replace lines 396-398 in harbor_agent_loop with a call to ChatHistoryExtractor.extract(results). Something like:
extracted = ChatHistoryExtractor.extract(results)
if extracted is None:
logger.warning(f'{prefix} failed: Could not extract chat history')
continue
chat_history, summarization_count, num_turns = extracted
Additionally, ChatHistoryExtractor uses Path, json, and urlparse which are not imported at the top of the file. Add: import json, from pathlib import Path, from urllib.parse import urlparse.
Was this helpful? React with 👍 or 👎 to provide feedback.
|
Thank you! |
Notes from Charlie:
rollout_detailsfor book-keepings likeprompt_token_ids,response_token_idsandlogprobs, which is only supported by Terminus 2 in Harbor.PR Description
This PR implements the Harbor side of issue #1184: support for agents beyond terminus-2 (for example OpenHands), including strictly-appending chat extraction.
What changed
examples/train_integrations/harbor/harbor_generator.pymetadata["all_messages"]when present (unchanged for terminus-style agents).openhands,mini-swe-agent,swe-agent), fallback: load{trial_uri}/agent/trajectory.json, parse ATIF steps, and emit an alternatinguser/assistantmessage list (system text folded into the first user turn; tool calls serialized into assistant content; observations mapped to user messages where needed).reject_summarization(SkyRL-side extension on the Harbor trial config dict): whentrue(default), drop rollouts that report summarization for theall_messagespath (mirrors the “RL only with summarization off” story used for terminus-2).api_baseinjection: only set hosted-modelapi_baseif the trial template has not already set it (supports Docker /host.docker.internal-style overrides).summarization_count/num_turnswhen aggregating rollout stats.examples/train_integrations/harbor/harbor_trial_config/openhands.yamlexamples/train_integrations/harbor/run_codecontest_openhands.shOPENHANDS_*env vars to disable condensation and history truncation (cleaner context-window failure, closer to strictly-appending chat for RL), sets a defaultVLLM_API_BASEfor containers reaching host vLLM, and invokes the Harbor entrypoint with OpenHands-oriented settings.Attached trajectories + debugging logs for a sample code contest task.
code_contests-4316__FmrT9kw_training.log
code_contests-4316__FmrT9kw.trajectory.json