Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file removed data/babyai/test.parquet
Binary file not shown.
Binary file modified data/babyai/train.parquet
Binary file not shown.
Binary file modified data/babyai/val.parquet
Binary file not shown.
247 changes: 133 additions & 114 deletions data/generate_train_agentgym_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import pandas as pd
from collections import defaultdict
import numpy as np
import json
import random

# Define environments to extract with correct naming
ENVIRONMENTS = [
Expand Down Expand Up @@ -49,22 +51,6 @@ def make_prefix(question, environment):
Task: {question}\n"""
return prefix

def extract_solution(solution_str):
    """
    Extract the numerical solution that follows a "#### " marker.

    Args:
        solution_str: String containing the solution, e.g. "... #### 1,234"

    Returns:
        The matched number as a string, with commas removed

    Raises:
        AssertionError: If solution_str contains no "#### <number>" marker
    """
    solution = re.search(r"#### (-?[0-9.,]+)", solution_str)
    if solution is None:
        # Raised explicitly instead of via `assert` so the check still runs
        # under `python -O`; the exception type callers see is unchanged.
        raise AssertionError(
            f"No '#### <number>' marker found in: {solution_str!r}"
        )
    # The capture group already excludes the marker, so there is no need to
    # re-split the full match on '#### '.
    return solution.group(1).replace(',', '')

def process_group_data(group_name, group_samples):
"""
Process samples for a specific environment group.
Expand All @@ -81,6 +67,7 @@ def process_group_data(group_name, group_samples):
for idx, sample in enumerate(group_samples):
item_id = sample['item_id']
conversations = sample['conversations']
conversations_text = json.dumps(conversations)

# Process each conversation
dialog_data = []
Expand Down Expand Up @@ -110,14 +97,12 @@ def process_group_data(group_name, group_samples):
"ability": "agent-reasoning",
"reward_model": {
"style": "rule",
"ground_truth": {
"environment": group_name,
"task_id": item_id
}
"ground_truth": conversations_text
},
"extra_info": {
'split': group_name,
'index': idx,
'conversations': conversations_text
}
}
processed_data.append(data)
Expand Down Expand Up @@ -157,59 +142,54 @@ def group_samples_by_environment(data, env_mapping):

return env_groups

def split_data(samples, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1, random_seed=42):
def get_environment_from_item_id(item_id, env_mapping):
"""
Split data into train, validation, and test sets.
Determine the environment name from an item_id using the mapping.

Args:
samples: List of data samples
train_ratio: Ratio of training samples (default 0.8)
val_ratio: Ratio of validation samples (default 0.1)
test_ratio: Ratio of test samples (default 0.1)
random_seed: Random seed for reproducibility

item_id: The item identifier string
env_mapping: Dictionary mapping environment names to potential ID prefixes

Returns:
Dictionary with 'train', 'validation', 'test' keys containing corresponding samples
Environment name or None if not matched
"""
assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-10, "Ratios must sum to 1"

# Set random seed for reproducibility
np.random.seed(random_seed)

# Shuffle indices
indices = np.random.permutation(len(samples))

# Calculate split sizes
n_train = int(len(samples) * train_ratio)
n_val = int(len(samples) * val_ratio)

# Split indices
train_indices = indices[:n_train]
val_indices = indices[n_train:n_train + n_val]
test_indices = indices[n_train + n_val:]

# Create splits
splits = {
'train': [samples[i] for i in train_indices],
'validation': [samples[i] for i in val_indices],
'test': [samples[i] for i in test_indices],
}
for env_name, prefixes in env_mapping.items():
for prefix in prefixes:
if prefix in item_id:
return env_name
return None

def create_id_to_sample_map(samples):
"""
Create a mapping from item_id to sample data.

return splits
Args:
samples: List of sample data

Returns:
Dictionary mapping item_id to sample data
"""
id_map = {}
for sample in samples:
id_map[sample['item_id']] = sample
return id_map

def save_environment_data(env_groups, output_base_dir):
def save_environment_data(train_env_groups, eval_item_ids, train_id_to_sample, output_base_dir):
"""
Save environment data to separate directories with train/test/validation splits.
Save environment data with train samples, random validation samples, and test samples.
Test samples are from AgentTraj-L but their item_ids come from AgentEval.

Args:
env_groups: Dictionary with environment name as key and samples as value
train_env_groups: Dictionary with environment name as key and training samples as value
eval_item_ids: Dictionary with environment name as key and list of item_ids from evaluation dataset
train_id_to_sample: Dictionary mapping item_id to sample data from training dataset
output_base_dir: Base directory where environment subdirectories will be created
"""
# Ensure base output directory exists
os.makedirs(output_base_dir, exist_ok=True)

# Process each environment group
for env_name, samples in env_groups.items():
for env_name, samples in train_env_groups.items():
if not samples:
print(f"Warning: No samples found for environment '{env_name}'. Skipping.")
continue
Expand All @@ -220,73 +200,112 @@ def save_environment_data(env_groups, output_base_dir):
env_dir = os.path.join(output_base_dir, env_name)
os.makedirs(env_dir, exist_ok=True)

# Process samples for this environment
processed_samples = process_group_data(env_name, samples)
# Process training samples for this environment
processed_train_samples = process_group_data(env_name, samples)

# Split data into train/validation/test sets
if len(processed_samples) < 3:
print(f"Warning: Only {len(processed_samples)} samples for {env_name}, using all for train")
splits = {
'train': processed_samples,
'validation': processed_samples[:1], # Use first sample for both val and test
'test': processed_samples[:1] # if there's only one or two samples
}
# Get test sample ids from eval dataset for this environment
eval_ids = eval_item_ids.get(env_name, [])

# Find matching samples in training data using eval item_ids
test_sample_data = []
matched_count = 0
for item_id in eval_ids:
if item_id in train_id_to_sample:
test_sample_data.append(train_id_to_sample[item_id])
matched_count += 1

print(f"Found {matched_count} out of {len(eval_ids)} test samples for {env_name}")

# Process test samples
processed_test_samples = process_group_data(env_name, test_sample_data)

# Get validation samples: randomly select 20 samples from training data
# Avoid using samples that are in the test set
test_item_ids = set(item['item_id'] for item in processed_test_samples)
available_train_samples = [s for s in processed_train_samples if s['item_id'] not in test_item_ids]

val_samples = []
if len(available_train_samples) > 20:
# Random sample without replacement
val_indices = random.sample(range(len(available_train_samples)), 20)
val_samples = [available_train_samples[i] for i in val_indices]
else:
# Adjust split ratios for very small datasets
if len(processed_samples) < 10:
# For small datasets, ensure at least 1 sample in each split
train_ratio = max(0.6, 1 - 2/len(processed_samples))
val_ratio = test_ratio = (1 - train_ratio) / 2
print(f"Adjusted split ratios for small dataset: train={train_ratio:.2f}, val={val_ratio:.2f}, test={test_ratio:.2f}")
else:
train_ratio, val_ratio, test_ratio = 0.8, 0.1, 0.1

splits = split_data(
processed_samples,
train_ratio=train_ratio,
val_ratio=val_ratio,
test_ratio=test_ratio
)
# If less than 20 samples, use all available samples
val_samples = available_train_samples.copy()

# Save each split
for split_name, split_samples in splits.items():
if not split_samples:
print(f"Warning: No samples in {split_name} split for {env_name}")
continue

# Convert to DataFrame
df = pd.DataFrame(split_samples)

# Define output filename based on split
if split_name == 'validation':
output_file = os.path.join(env_dir, "val.parquet")
else:
output_file = os.path.join(env_dir, f"{split_name}.parquet")

# Save to parquet
df.to_parquet(output_file)
print(f"Saved {len(split_samples)} samples to {output_file}")
# Save train samples
if processed_train_samples:
train_df = pd.DataFrame(processed_train_samples)
train_file = os.path.join(env_dir, "train.parquet")
train_df.to_parquet(train_file)
print(f"Saved {len(processed_train_samples)} training samples to {train_file}")

# Save validation samples
if val_samples:
val_df = pd.DataFrame(val_samples)
val_file = os.path.join(env_dir, "val.parquet")
val_df.to_parquet(val_file)
print(f"Saved {len(val_samples)} validation samples to {val_file}")

# Save test samples
if processed_test_samples:
test_df = pd.DataFrame(processed_test_samples)
test_file = os.path.join(env_dir, "test.parquet")
test_df.to_parquet(test_file)
print(f"Saved {len(processed_test_samples)} test samples to {test_file}")
else:
print(f"Warning: No test samples found for environment '{env_name}'")

def main():
"""
Main function to process and save AgentGym dataset by environment.
Main function to process and save AgentGym datasets by environment.
"""
# Load the dataset
print("Loading dataset...")
dataset = load_dataset("AgentGym/AgentTraj-L")
data = dataset['train']
# Set random seed for reproducibility
random.seed(42)

# Load the training dataset
print("Loading training dataset (AgentTraj-L)...")
train_dataset = load_dataset("AgentGym/AgentTraj-L")
train_data = train_dataset['train']

# Load the evaluation dataset
print("Loading evaluation dataset (AgentEval)...")
eval_dataset = load_dataset("AgentGym/AgentEval")
eval_data = eval_dataset['test']

# Group training samples by environment
print("Grouping training samples by environment...")
train_env_groups = group_samples_by_environment(train_data, ENV_ID_MAPPING)

# Create a mapping from item_id to sample data
print("Creating item_id to sample mapping...")
train_id_to_sample = create_id_to_sample_map(train_data)

# Group evaluation sample IDs by environment
print("Grouping evaluation sample IDs by environment...")
eval_env_item_ids = defaultdict(list)

for sample in eval_data:
item_id = sample['item_id']
env_name = get_environment_from_item_id(item_id, ENV_ID_MAPPING)

if env_name:
eval_env_item_ids[env_name].append(item_id)
else:
print(f"Warning: Could not match evaluation sample with item_id '{item_id}' to any environment")

# Group samples by environment using the ID mapping
print("Grouping samples by environment...")
env_groups = group_samples_by_environment(data, ENV_ID_MAPPING)
# Print statistics
print("\n--- Training Data Statistics ---")
for env, samples in train_env_groups.items():
print(f"Environment: {env}, Number of training samples: {len(samples)}")

# Print group statistics
for env, samples in env_groups.items():
print(f"Environment: {env}, Number of samples: {len(samples)}")
print("\n--- Evaluation Data Statistics ---")
for env, item_ids in eval_env_item_ids.items():
print(f"Environment: {env}, Number of evaluation item_ids: {len(item_ids)}")

# Save environment data to appropriate directories
print("Saving environment data with train/val/test splits...")
save_environment_data(env_groups, output_base_dir='./')
# Save environment data
print("\nSaving environment data...")
save_environment_data(train_env_groups, eval_env_item_ids, train_id_to_sample, output_base_dir='./')

print("Processing complete!")

Expand Down
Binary file modified data/maze/test.parquet
Binary file not shown.
Binary file modified data/maze/train.parquet
Binary file not shown.
Binary file modified data/maze/val.parquet
Binary file not shown.
Binary file removed data/movie/test.parquet
Binary file not shown.
Binary file modified data/movie/train.parquet
Binary file not shown.
Binary file modified data/movie/val.parquet
Binary file not shown.
Binary file removed data/sciworld/test.parquet
Binary file not shown.
Binary file modified data/sciworld/train.parquet
Binary file not shown.
Binary file modified data/sciworld/val.parquet
Binary file not shown.
Binary file removed data/sqlgym/test.parquet
Binary file not shown.
Binary file modified data/sqlgym/train.parquet
Binary file not shown.
Binary file modified data/sqlgym/val.parquet
Binary file not shown.
Binary file removed data/textcraft/test.parquet
Binary file not shown.
Binary file modified data/textcraft/train.parquet
Binary file not shown.
Binary file modified data/textcraft/val.parquet
Binary file not shown.
Binary file removed data/todo/test.parquet
Binary file not shown.
Binary file modified data/todo/train.parquet
Binary file not shown.
Binary file modified data/todo/val.parquet
Binary file not shown.
Binary file removed data/weather/test.parquet
Binary file not shown.
Binary file modified data/weather/train.parquet
Binary file not shown.
Binary file modified data/weather/val.parquet
Binary file not shown.
Binary file removed data/webshop/test.parquet
Binary file not shown.
Binary file modified data/webshop/train.parquet
Binary file not shown.
Binary file modified data/webshop/val.parquet
Binary file not shown.
Binary file modified data/wordle/test.parquet
Binary file not shown.
Binary file modified data/wordle/train.parquet
Binary file not shown.
Binary file modified data/wordle/val.parquet
Binary file not shown.
Empty file removed example.sh
Empty file.
2 changes: 1 addition & 1 deletion train_ppo.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# --- Configuration (defaults, can be overridden via env vars) ---
export CUDA_VISIBLE_DEVICES=${CUDA_VISIBLE_DEVICES:-1,2,3} # change your GPU ID here
WAND_PROJECT=${WAND_PROJECT:-'OpenManus-rl'}
export BASE_MODEL=${BASE_MODEL:-'Qwen/Qwen2.5-3B'}
export BASE_MODEL=${BASE_MODEL:-'../model/Qwen2.5-3B'}
AGENTGYM_HOST=${AGENTGYM_HOST:-'0.0.0.0'} # Default to 0.0.0.0 for external access
AGENTGYM_SQL_BIRD_PATH=${AGENTGYM_SQL_BIRD_PATH:-} # Used only for sqlgym
export PYTHONPATH="./openmanus_rl/agentgym/agentenv:${PYTHONPATH}"
Expand Down
41 changes: 0 additions & 41 deletions verl/trainer/ppo/ray_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
import re
from openmanus_rl.llm_agent.openmanus import OpenManusAgent, AgentConfig
from verl.utils.reward_score import SUPPORTED_REWARD_SCORE_FNS
from verl.utils.reward_score.agentgym import compute_score as agentgym_compute_score
from verl.utils.reward_score.reward_components import RewardComposer, GoalReward, LengthPenalty, FormatReward
from verl.utils.tracking import Tracking

import ray
Expand Down Expand Up @@ -518,52 +516,13 @@ def __init__(

self._create_dataloader()
self._init_logger()
self._init_reward_composer()

def _init_logger(self):
    """Create the Tracking instance for this run and store it on self.logger."""
    trainer_cfg = self.config.trainer
    self.logger = Tracking(
        project_name=trainer_cfg.project_name,
        experiment_name=trainer_cfg.experiment_name,
        default_backend=trainer_cfg.logger,
        # The whole config is passed along, converted with resolve=True.
        config=OmegaConf.to_container(self.config, resolve=True),
    )

def _init_reward_composer(self):
    """Initializes the RewardComposer based on the configuration.

    Reads self.reward_component_config, builds the list of enabled reward
    components and stores the resulting composer on self.reward_composer.
    GoalReward is enabled unless explicitly disabled; LengthPenalty and
    FormatReward are only added when their section sets 'enabled'.
    """
    components = []
    cfg = self.reward_component_config
    print(f"[Trainer._init_reward_composer] Initializing with config: {cfg}")

    # --- Build Reward Components List ---
    # Each section is looked up once with an empty-dict fallback. GoalReward
    # defaults to enabled, so a config without a 'goal_reward' section must
    # fall back to the default weight rather than raise KeyError.
    goal_cfg = cfg.get('goal_reward', {})
    if goal_cfg.get('enabled', True):
        components.append(GoalReward(weight=goal_cfg.get('weight', 1.0)))
        print(" - Added GoalReward")

    lp_cfg = cfg.get('length_penalty', {})
    if lp_cfg.get('enabled', False):
        components.append(LengthPenalty(
            weight=lp_cfg.get('weight', -0.01),
            max_length=lp_cfg.get('max_length', 500),
            min_length=lp_cfg.get('min_length', 10),
            penalty_type=lp_cfg.get('penalty_type', "linear")
        ))
        print(" - Added LengthPenalty")

    fmt_cfg = cfg.get('format_reward', {})
    if fmt_cfg.get('enabled', False):
        # Get patterns specific to the current env or use default
        patterns_by_env = fmt_cfg.get('patterns_by_env', {})
        patterns = patterns_by_env.get(
            self.config.data.env_name,  # Assumes env_name is available in self.config.data
            patterns_by_env.get('default', [])
        )
        components.append(FormatReward(
            weight=fmt_cfg.get('weight', 0.2),
            required_patterns=patterns
        ))
        print(f" - Added FormatReward with patterns: {patterns}")

    self.reward_composer = RewardComposer(components=components)
    print(f"[Trainer._init_reward_composer] Composer initialized with {len(components)} components.")

def _create_dataloader(self):
from torch.utils.data import DataLoader
# TODO: we have to make sure the batch size is divisible by the dp size
Expand Down
Loading
Loading