4 changes: 2 additions & 2 deletions openmanus_rl/agentgym/agentenv-webshop/environment.yml
@@ -3,6 +3,6 @@ channels:
   - conda-forge
   - defaults
 dependencies:
-  - python=3.8.13=ha86cf86_0_cpython
+  - python=3.8.13
   - faiss-cpu=1.7.4
-  - openjdk=11.0.21=h4260e57_0
+  - openjdk=11.0.21
167 changes: 121 additions & 46 deletions openmanus_rl/llm_agent/openmanus.py
@@ -155,7 +155,7 @@ def _init_env_clients(self) -> List[Any]: # Renamed and return type changed
                 # raise ValueError(f"Task class {task_class_name} did not provide a client for port {port}.")
             except Exception as e:
                 print(f" - Client {i+1}: Error initializing Task or getting client for port {port}: {e}")
-                print(traceback.format_exc()) # Print detailed traceback
+                print(traceback.format_exc())
                 # Decide how to handle failure: raise error or skip? Skipping for now.
                 # raise

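A review note on the skip-vs-raise question in the comment above: skipping is safest when paired with a final check that at least one client survived. A minimal sketch of that pattern; `make_client` is a hypothetical stand-in for the Task/client setup this hunk truncates, not the repo's API:

```python
import traceback
from typing import Any, List

def init_clients(ports: List[int]) -> List[Any]:
    """Initialize one client per port, skipping ports whose setup fails."""
    clients = []
    for i, port in enumerate(ports):
        try:
            clients.append(make_client(port))  # hypothetical factory, not the repo's API
        except Exception as e:
            print(f" - Client {i+1}: error initializing client for port {port}: {e}")
            print(traceback.format_exc())
            continue  # skip this port, as the code above chooses to
    if not clients:
        # Fail loudly only when *no* environment client is usable.
        raise RuntimeError("No environment clients could be initialized.")
    return clients
```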
@@ -336,7 +336,8 @@ def _run_single_rollout(self, initial_prompt_ids: torch.Tensor, task_idx: int, c
             'step_rewards': step_rewards, # List of rewards from each env.step call
             'reward': final_reward, # Reward from the *last* env.step call
             'env_score': final_env_score, # Final score reported by env info
-            'turns': turns,
+            'turns': turns, # Total number of turns executed
+            'valid_actions': len([msg for msg in trajectory if msg.get("from") == "gpt"]), # Count of agent's responses
             'task_idx': task_idx,
             'done': done # Whether the episode finished naturally or via error
}
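The new `valid_actions` field counts the agent-authored messages in the trajectory. A toy check of that exact expression, assuming trajectory entries are dicts with a `from` key as this hunk implies:

```python
trajectory = [
    {"from": "human", "value": "Observation: ..."},
    {"from": "gpt", "value": "<action>search[shoes]</action>"},
    {"from": "human", "value": "Observation: ..."},
    {"from": "gpt", "value": "<action>click[buy]</action>"},
]
valid_actions = len([msg for msg in trajectory if msg.get("from") == "gpt"])
assert valid_actions == 2  # one per agent turn
```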
@@ -422,17 +423,45 @@ def run_llm_loop(self, gen_batch: DataProto, output_dir: str = None, global_step
         if not valid_results:
             print("[Agent.run_llm_loop] Error: No valid rollout results collected.")
             # Return empty DataProto but with correct structure if possible
-            return DataProto.from_dict({
+            empty_proto = DataProto.from_dict({
                 "input_ids": torch.empty((0,0), dtype=torch.long),
                 "attention_mask": torch.empty((0,0), dtype=torch.long),
                 "position_ids": torch.empty((0,0), dtype=torch.long),
                 "info_mask": torch.empty((0,0), dtype=torch.long),
                 "token_level_rewards": torch.empty((0,0), dtype=torch.float)
             })
+            # Add necessary meta_info for downstream compute_log_prob call
+            empty_proto.meta_info = {'micro_batch_size': 1}
+            return empty_proto

         # --- Format Results into DataProto ---
         processed_data = self._convert_rollout_results_to_dataproto(valid_results, gen_batch)

+        # --- CRITICAL: Add necessary meta_info parameters for compute_log_prob ---
+        # These parameters are required by DataParallelActor.compute_log_prob.
+        # Source values from the actor_rollout_wg config or AgentConfig.
+        log_prob_micro_batch_size = getattr(self.actor_rollout_wg, 'log_prob_micro_batch_size', 128)
+        if hasattr(self.config, 'actor_rollout_ref') and hasattr(self.config.actor_rollout_ref, 'rollout'):
+            # If running within the trainer, which has direct access to these configs
+            log_prob_micro_batch_size = getattr(self.config.actor_rollout_ref.rollout, 'log_prob_micro_batch_size', log_prob_micro_batch_size)
+
+        # Ensure these keys exist with reasonable defaults even if not specified in config
+        if 'micro_batch_size' not in processed_data.meta_info:
+            processed_data.meta_info['micro_batch_size'] = log_prob_micro_batch_size
+
+        if 'temperature' not in processed_data.meta_info:
+            processed_data.meta_info['temperature'] = getattr(self.config, 'temperature', 1.0)
+
+        if 'use_dynamic_bsz' not in processed_data.meta_info:
+            processed_data.meta_info['use_dynamic_bsz'] = getattr(self.config, 'log_prob_use_dynamic_bsz', False)
+
+        # If dynamic batch size is used, also set max_token_len
+        if processed_data.meta_info.get('use_dynamic_bsz', False):
+            max_token_len = getattr(self.config, 'log_prob_max_token_len_per_gpu', 2048)
+            processed_data.meta_info['max_token_len'] = max_token_len
+
+        print(f"[Agent.run_llm_loop] Added log_prob parameters to meta_info: micro_batch_size={processed_data.meta_info['micro_batch_size']}, temperature={processed_data.meta_info['temperature']}, use_dynamic_bsz={processed_data.meta_info['use_dynamic_bsz']}")
+
         print(f"[Agent.run_llm_loop] Finished processing rollout results.")
         return processed_data
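The guarded `if key not in meta_info` blocks above behave like `dict.setdefault`. A compact sketch of the same defaulting logic; key names follow this diff, default values are illustrative:

```python
def ensure_log_prob_meta(meta_info: dict, config) -> dict:
    """Fill the keys the log-prob pass expects without clobbering existing values."""
    meta_info.setdefault('micro_batch_size', 128)
    meta_info.setdefault('temperature', getattr(config, 'temperature', 1.0))
    meta_info.setdefault('use_dynamic_bsz', getattr(config, 'log_prob_use_dynamic_bsz', False))
    if meta_info['use_dynamic_bsz']:
        meta_info.setdefault('max_token_len', getattr(config, 'log_prob_max_token_len_per_gpu', 2048))
    return meta_info
```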

@@ -454,9 +483,25 @@ def _convert_rollout_results_to_dataproto(self, results: List[Dict], original_ba
         batch_position_ids = []
         batch_info_mask = []
         batch_token_level_rewards = [] # Store final token-level rewards for PPO
-        batch_meta_info = defaultdict(list)
         batch_responses = [] # Initialize batch_responses

+        # Initialize final_meta_info by copying all items from original_batch.meta_info.
+        # This ensures that any global metadata from the input batch is preserved.
+        final_meta_info = {}
+        if hasattr(original_batch, 'meta_info') and original_batch.meta_info:
+            for k, v in original_batch.meta_info.items():
+                final_meta_info[k] = v # Shallow copy; use deepcopy if mutable values are a concern
+
+        # Per-rollout stats and lists, later converted to tensors or kept as lists
+        per_rollout_task_idx = []
+        per_rollout_turns_stats = []
+        per_rollout_valid_action_stats = []
+        per_rollout_done_flags = []
+        per_rollout_valid_search_stats = [] # Placeholder
+        per_rollout_rewards = [] # Last-step reward for each rollout
+        per_rollout_env_scores = [] # Final env score for each rollout
+        per_rollout_trajectories = [] # List of trajectories

         # Get reward allocation strategy from config
         reward_allocation = "last_token" # Default
         if self.config.algorithm_config:
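For context on the `last_token` default: it credits the whole scalar episode reward to the final response token. This is a hedged sketch of that allocation under an assumed 1-D 0/1 response mask; the repo's actual allocation code sits outside this hunk:

```python
import torch

def allocate_reward_last_token(reward: float, response_mask: torch.Tensor) -> torch.Tensor:
    """Place the scalar reward on the last unmasked response token (1-D mask)."""
    token_rewards = torch.zeros_like(response_mask, dtype=torch.float32)
    idx = response_mask.nonzero(as_tuple=True)[0]
    if idx.numel() > 0:
        token_rewards[idx[-1]] = reward
    return token_rewards

# allocate_reward_last_token(0.8, torch.tensor([1, 1, 1, 0, 0]))
# -> tensor([0.0, 0.0, 0.8, 0.0, 0.0])
```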
@@ -478,12 +523,35 @@

             turns = result_dict.get('turns', 0)
             task_idx = result_dict.get('task_idx', -1)
-
-            # Get the original batch index
+            valid_actions_count = result_dict.get('valid_actions', 0)
+            done_flag = result_dict.get('done', True) # Default to True if missing, indicating completion or error
+            reward_val = result_dict.get('reward', 0.0)
+            env_score_val = result_dict.get('env_score', 0.0)
+            trajectory_val = result_dict.get('trajectory', [])
+
+            # Append to the per-rollout lists
+            per_rollout_task_idx.append(task_idx)
+            per_rollout_turns_stats.append(turns)
+            per_rollout_valid_action_stats.append(valid_actions_count)
+            per_rollout_done_flags.append(done_flag)
+            per_rollout_valid_search_stats.append(0) # Placeholder; search is not explicitly tracked here
+            per_rollout_rewards.append(reward_val)
+            per_rollout_env_scores.append(env_score_val)
+            per_rollout_trajectories.append(trajectory_val)
+
+            # Get the original batch index (used for trajectory processing below)
             original_batch_idx = original_indices_map.get(task_idx, -1)
             if original_batch_idx == -1:
-                print(f"[Agent._convert_rollout] Warning: Task idx {task_idx} not found in original batch. Skipping.")
-                continue
+                print(f"[Agent._convert_rollout] Warning: Task idx {task_idx} not found in original batch. Skipping this result for trajectory processing.")
+                # This result's stats are already in the per_rollout_ lists, so skipping only its
+                # trajectory-derived tensors can leave the stats and tensors misaligned. A robust fix
+                # is to filter unmappable results before this loop (see the sketch below); for now we
+                # fall through and let tensor construction surface any length mismatch.
+                pass

             # --- Concatenate conversation and identify agent segments ---
             conversation_ids_list = []
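The comment in the unmappable-task_idx branch names the robust alternative: drop unmappable results before any per-rollout list is built, so stats and tensors stay the same length. A hedged sketch of that pre-filter, assuming each result dict carries `task_idx` as in this diff:

```python
from typing import Dict, List

def filter_mappable(results: List[Dict], original_indices_map: Dict[int, int]) -> List[Dict]:
    """Keep only rollout results whose task_idx maps back to the original batch."""
    kept = []
    for r in results:
        if r.get('task_idx', -1) in original_indices_map:
            kept.append(r)
        else:
            print(f"[Agent._convert_rollout] Dropping unmappable task_idx={r.get('task_idx')}")
    return kept

# results = filter_mappable(results, original_indices_map)  # run before the accumulation loop
```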
@@ -675,20 +743,25 @@ def _convert_rollout_results_to_dataproto(self, results: List[Dict], original_ba
             batch_responses.append(response_only_ids_padded)

             # Add metadata
-            batch_meta_info["task_idx"].append(task_idx)
-            batch_meta_info["turns_stats"].append(turns)
-            batch_meta_info["valid_action_stats"].append(valid_actions)
-            batch_meta_info["reward"].append(result_dict.get('reward', 0.0)) # Last step reward
-            batch_meta_info["env_score"].append(result_dict.get('env_score', 0.0)) # Final env score
-            batch_meta_info["rollout_trajectory"].append(trajectory)
-            # Copy relevant metadata from original_batch
-            for key, value in original_batch.meta_info.items():
-                if key not in ['idx', 'reward', 'env_score']: # Avoid duplication
-                    if isinstance(value, list) and len(value) > original_batch_idx:
-                        batch_meta_info[key].append(value[original_batch_idx])
-                    elif not isinstance(value, list): # Keep non-list metadata
-                        if task_idx == original_indices[0]: # Add only once per batch
-                            batch_meta_info[key] = value
+            if "task_idx" not in final_meta_info:
+                final_meta_info["task_idx"] = []
+            if "turns_stats" not in final_meta_info:
+                final_meta_info["turns_stats"] = []
+            if "valid_action_stats" not in final_meta_info:
+                final_meta_info["valid_action_stats"] = []
+            if "reward" not in final_meta_info:
+                final_meta_info["reward"] = []
+            if "env_score" not in final_meta_info:
+                final_meta_info["env_score"] = []
+            if "rollout_trajectory" not in final_meta_info:
+                final_meta_info["rollout_trajectory"] = []
+
+            final_meta_info["task_idx"].append(task_idx)
+            final_meta_info["turns_stats"].append(turns)
+            final_meta_info["valid_action_stats"].append(valid_actions_count)
+            final_meta_info["reward"].append(reward_val)
+            final_meta_info["env_score"].append(env_score_val)
+            final_meta_info["rollout_trajectory"].append(trajectory_val)

         # --- Stack Tensors ---
         if not batch_input_ids:
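The six guarded list initializations above can be collapsed with `setdefault`; a compact equivalent, sketch only. Note also that these per-sample appends are superseded after the loop, where the final hunk assigns whole tensors built from the `per_rollout_` lists to the same keys:

```python
# Same effect as the six "if key not in final_meta_info" blocks above
for key in ("task_idx", "turns_stats", "valid_action_stats",
            "reward", "env_score", "rollout_trajectory"):
    final_meta_info.setdefault(key, [])
```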
@@ -707,36 +780,38 @@ def _convert_rollout_results_to_dataproto(self, results: List[Dict], original_ba
             "input_ids": torch.cat(batch_input_ids, dim=0),
             "attention_mask": torch.cat(batch_attention_mask, dim=0),
             "position_ids": torch.cat(batch_position_ids, dim=0),
-            "info_mask": torch.cat(batch_info_mask, dim=0),
+            "info_mask": torch.cat(batch_info_mask, dim=0), # plays the role of the responses_with_info_mask construction
             "token_level_rewards": torch.cat(batch_token_level_rewards, dim=0),
             "responses": torch.cat(batch_responses, dim=0)
         }

         # Create DataProto and add metadata
         data_proto = DataProto.from_dict(final_batch)
-        for key, value in batch_meta_info.items():
-            try:
-                if isinstance(value, list) and all(isinstance(item, (int, float)) for item in value):
-                    data_proto.meta_info[key] = torch.tensor(value)
-                # Handle numpy arrays if they appear
-                elif isinstance(value, np.ndarray):
-                    data_proto.meta_info[key] = torch.from_numpy(value)
-                else:
-                    # Keep as list for non-numeric types (like trajectories)
-                    data_proto.meta_info[key] = value
-            except (ValueError, TypeError, RuntimeError) as e:
-                # Fallback: keep as list if tensor conversion fails
-                print(f"[Agent._convert_rollout] Warning: Could not convert metadata '{key}' to tensor: {e}. Keeping as list.")
-                data_proto.meta_info[key] = value
-
-        # Explicitly add final env scores as a tensor if possible
-        if "env_score" in batch_meta_info:
-            try:
-                data_proto.meta_info["env_scores"] = torch.tensor(batch_meta_info["env_score"], dtype=torch.float32)
-            except (ValueError, TypeError):
-                # Fallback case
-                print("[Agent._convert_rollout] Could not convert env_scores to tensor, keeping original list.")
-                data_proto.meta_info["env_scores"] = batch_meta_info["env_score"]
+        # Add collected statistics and per-rollout lists to final_meta_info, converting to tensors
+        # where appropriate. These overwrite same-named per-sample keys inherited from
+        # original_batch.meta_info.
+        final_meta_info['task_idx'] = torch.tensor(per_rollout_task_idx, dtype=torch.long)
+        final_meta_info['turns_stats'] = torch.tensor(per_rollout_turns_stats, dtype=torch.long)
+        final_meta_info['valid_action_stats'] = torch.tensor(per_rollout_valid_action_stats, dtype=torch.long)
+        final_meta_info['valid_search_stats'] = torch.tensor(per_rollout_valid_search_stats, dtype=torch.long) # All zeros for now
+        final_meta_info['active_mask'] = torch.tensor([not done for done in per_rollout_done_flags], dtype=torch.bool)
+        final_meta_info['reward'] = torch.tensor(per_rollout_rewards, dtype=torch.float32) # Last-step reward per rollout
+        final_meta_info['env_score'] = torch.tensor(per_rollout_env_scores, dtype=torch.float32) # Final score per rollout
+        final_meta_info['rollout_trajectory'] = per_rollout_trajectories # Keep as a list of trajectories
+
+        # The new 'task_idx' tensor is authoritative for the samples in this processed batch;
+        # reconcile any 'idx' inherited from original_batch.meta_info with it.
+        if 'idx' in final_meta_info and not torch.is_tensor(final_meta_info['idx']):
+            print(f"[Agent._convert_rollout] Replacing original 'idx' with new 'task_idx' tensor.")
+            final_meta_info['idx'] = final_meta_info['task_idx']
+        elif 'idx' not in final_meta_info:
+            final_meta_info['idx'] = final_meta_info['task_idx']
+
+        # Assign the fully constructed final_meta_info to the DataProto object
+        data_proto.meta_info = final_meta_info

         print(f"[Agent._convert_rollout] Final batch shapes: input_ids={final_batch['input_ids'].shape}, token_level_rewards={final_batch['token_level_rewards'].shape}, responses={final_batch['responses'].shape}")
         return data_proto
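The `active_mask` construction above inverts the per-rollout done flags. A quick sanity check of the inversion and dtypes, using the diff's own list names:

```python
import torch

per_rollout_done_flags = [True, False, True]
per_rollout_env_scores = [1.0, 0.0, 0.5]

active_mask = torch.tensor([not d for d in per_rollout_done_flags], dtype=torch.bool)
env_score = torch.tensor(per_rollout_env_scores, dtype=torch.float32)

assert active_mask.tolist() == [False, True, False]  # only unfinished rollouts remain active
assert env_score.dtype == torch.float32
```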
17 changes: 7 additions & 10 deletions train_ppo.sh
@@ -1,16 +1,12 @@
 #!/bin/bash

 # --- Configuration (defaults, can be overridden via env vars) ---
-export CUDA_VISIBLE_DEVICES=${CUDA_VISIBLE_DEVICES:-0,5,9}
+export CUDA_VISIBLE_DEVICES=${CUDA_VISIBLE_DEVICES:-1,4,5}
 WAND_PROJECT=${WAND_PROJECT:-'OpenManus-rl'}
-export BASE_MODEL=${BASE_MODEL:-'Qwen/Qwen2.5-3B'}
+export BASE_MODEL=${BASE_MODEL:-'../model/Qwen2.5-3B'}
 AGENTGYM_HOST=${AGENTGYM_HOST:-'0.0.0.0'} # Default to 0.0.0.0 for external access
 AGENTGYM_SQL_BIRD_PATH=${AGENTGYM_SQL_BIRD_PATH:-} # Used only for sqlgym
 export NCCL_IB_DISABLE=1
 export NCCL_P2P_DISABLE=1
 export PYTHONPATH="./openmanus_rl/agentgym/agentenv:${PYTHONPATH}"
-export VLLM_ATTENTION_BACKEND=XFORMERS # vllm + qwen2-7b with flash_attn has some issues
-
-

 # --- Argument Parsing ---
 usage() {
@@ -230,6 +226,7 @@ export EXPERIMENT_NAME="OpenManus-rl-ppo-${BASE_MODEL##*/}-${AGENTGYM_ENV_NAME}$

 # --- Run PPO Training in Base Environment ---
 echo -e "\\n[Trainer] Running PPO training in base environment '$BASE_CONDA_ENV'..."
+export VLLM_ATTENTION_BACKEND=${VLLM_ATTENTION_BACKEND:-XFORMERS}

 # Construct server base URL, adding path if needed
 AGENTGYM_SERVER_BASE="http://$AGENTGYM_HOST" # Base URL without port
@@ -284,7 +281,7 @@ hydra_overrides=(
     "data.env_ports=[${AGENTGYM_PORTS_STR}]"
     "data.train_data_num=null"
     "data.val_data_num=null"
-    "data.train_batch_size=6"
+    "data.train_batch_size=3"
     "data.val_batch_size=3"
     "data.max_prompt_length=4096"
     "data.max_response_length=1000"
@@ -297,8 +294,8 @@ hydra_overrides=(
     "actor_rollout_ref.model.enable_gradient_checkpointing=true"
     "actor_rollout_ref.model.use_remove_padding=True"
     "actor_rollout_ref.actor.optim.lr_warmup_steps_ratio=0.95"
-    "actor_rollout_ref.actor.ppo_mini_batch_size=6"
-    "actor_rollout_ref.actor.ppo_micro_batch_size=6"
+    "actor_rollout_ref.actor.ppo_mini_batch_size=4"
+    "actor_rollout_ref.actor.ppo_micro_batch_size=4"
     "actor_rollout_ref.actor.fsdp_config.param_offload=true"
     "actor_rollout_ref.actor.fsdp_config.grad_offload=true"
     "actor_rollout_ref.actor.fsdp_config.optimizer_offload=true"
@@ -332,7 +329,7 @@ hydra_overrides=(
    "trainer.default_hdfs_dir=null"
     "trainer.n_gpus_per_node=3"
     "trainer.nnodes=1"
-    "trainer.save_freq=100"
+    "trainer.save_freq=1"
     "trainer.test_freq=50"
     "trainer.project_name=$WAND_PROJECT"
     "trainer.experiment_name=$EXPERIMENT_NAME"
2 changes: 1 addition & 1 deletion verl/trainer/main_ppo.py
@@ -141,7 +141,7 @@ def main(config):
         env_vars_for_main_task['CUDA_VISIBLE_DEVICES'] = original_cuda_visible

     print(f"[main] Runtime env to be passed to main_task actor: {{'env_vars': {env_vars_for_main_task}}}")
-    ray.get(main_task.options(runtime_env={'env_vars': env_vars_for_main_task}).remote(config))
+    ray.get(main_task.remote(config))
     print("[main] main_task finished.")
     ray.shutdown()
     print("[main] Ray shutdown.")
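Dropping `.options(runtime_env=...)` means `main_task` no longer receives an explicitly scoped environment; it inherits whatever the Ray worker processes were started with, so the `env_vars_for_main_task` printed above becomes informational only. A standalone toy contrast of the two call styles (not the repo's code):

```python
import os
import ray

ray.init()

@ray.remote
def show_device() -> str:
    return os.environ.get("CUDA_VISIBLE_DEVICES", "<unset>")

# Removed style: env var set explicitly for this task via runtime_env.
print(ray.get(show_device.options(runtime_env={"env_vars": {"CUDA_VISIBLE_DEVICES": "0"}}).remote()))

# New style: the task sees the worker's inherited environment.
print(ray.get(show_device.remote()))

ray.shutdown()
```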