@@ -2,20 +2,26 @@
set -euxo pipefail

project_name='puffin'
exp_name='Qwen2.5-7B-Puffin-Test'
exp_name='Qwen2.5-7B-Math-Puffin-Test'

# Ray
export RAY_ADDRESS=${RAY_ADDRESS:-"http://localhost:8265"}
export RUNTIME_ENV=${RUNTIME_ENV:-"./verl/trainer/runtime_env.yaml"}
export NNODES=${NNODES:-4}
# Paths
MODEL_PATH=${MODEL_PATH:-"${HOME}/verl/models/Qwen2.5-7B"}
CKPTS_DIR=${CKPTS_DIR:-"${HOME}/verl/ckpts/${project_name}/${exp_name}"}
TRAIN_FILE=${TRAIN_FILE:-"${HOME}/verl/data/puffin_train.parquet"}
TEST_FILE=${TEST_FILE:-"${HOME}/verl/data/puffin_test.parquet"}
export RAY_DATA_HOME=${RAY_DATA_HOME:-"${HOME}/verl"}
export MODEL_PATH=${MODEL_PATH:-"${RAY_DATA_HOME}/models/Qwen2.5-Math-7B"}
export CKPTS_DIR=${CKPTS_DIR:-"${RAY_DATA_HOME}/ckpts/${project_name}/${exp_name}"}
export TRAIN_FILE=${TRAIN_FILE:-"${RAY_DATA_HOME}/data/puffin_train.parquet"}
export TEST_FILE=${TEST_FILE:-"${RAY_DATA_HOME}/data/puffin_test.parquet"}

# Algorithm
## Train
max_prompt_length=$((1024 * 2))
max_response_length=$((1024 * 8))
# TODO
# force_append_eos=True
max_prompt_length=$((1024 * 1))
max_response_length=$((1024 * 3))
gen_prompt_bsz=512
train_prompt_bsz=512
train_prompt_mini_bsz=32
## Validation
val_top_k=-1 # 0 for HF rollout, -1 for vLLM rollout

@@ -24,24 +30,29 @@ val_top_k=-1 # 0 for HF rollout, -1 for vLLM rollout
use_dynamic_bsz=True
infer_micro_batch_size=null
train_micro_batch_size=null
offload=True
offload=False

python3 -m verl.trainer.main_ppo \
ray job submit --no-wait --runtime-env="${RUNTIME_ENV}" \
--working-dir "${PWD}" \
-- python3 -m verl.trainer.main_ppo \
data.train_files="${TRAIN_FILE}" \
data.val_files="${TEST_FILE}" \
data.prompt_key=prompt \
data.truncation='left' \
data.max_prompt_length=${max_prompt_length} \
data.max_response_length=${max_response_length} \
data.train_batch_size=512 \
data.gen_batch_size=${gen_prompt_bsz} \
data.train_batch_size=${train_prompt_bsz} \
data.truncation='left' \
actor_rollout_ref.rollout.n=16 \
actor_rollout_ref.actor.kl_loss_coef=0 \
actor_rollout_ref.actor.clip_ratio=0.2 \
actor_rollout_ref.actor.clip_ratio_low=0.2 \
actor_rollout_ref.actor.clip_ratio_high=0.25 \
algorithm.adv_estimator=grpo \
algorithm.kl_ctrl.kl_coef=0.0 \
algorithm.gamma=1.0 \
algorithm.lam=0.95 \
algorithm.filter_groups.enable=True \
algorithm.filter_groups.fill_train_batch=True \
algorithm.filter_groups.drop_last_mini_batch=True \
actor_rollout_ref.model.use_remove_padding=True \
actor_rollout_ref.actor.use_dynamic_bsz=${use_dynamic_bsz} \
actor_rollout_ref.ref.log_prob_use_dynamic_bsz=${use_dynamic_bsz} \
@@ -57,7 +68,7 @@ python3 -m verl.trainer.main_ppo \
actor_rollout_ref.actor.optim.lr=1e-6 \
actor_rollout_ref.actor.optim.lr_warmup_steps=10 \
actor_rollout_ref.actor.optim.weight_decay=0.1 \
actor_rollout_ref.actor.ppo_mini_batch_size=512 \
actor_rollout_ref.actor.ppo_mini_batch_size=${train_prompt_mini_bsz} \
actor_rollout_ref.actor.ppo_micro_batch_size=${train_micro_batch_size} \
actor_rollout_ref.actor.fsdp_config.param_offload=${offload} \
actor_rollout_ref.actor.fsdp_config.optimizer_offload=${offload} \
@@ -67,20 +78,24 @@ python3 -m verl.trainer.main_ppo \
actor_rollout_ref.rollout.gpu_memory_utilization=0.85 \
actor_rollout_ref.rollout.log_prob_micro_batch_size=${infer_micro_batch_size} \
actor_rollout_ref.rollout.tensor_model_parallel_size=1 \
actor_rollout_ref.rollout.enable_chunked_prefill=True \
actor_rollout_ref.rollout.max_num_batched_tokens=$((max_prompt_length + max_response_length)) \
actor_rollout_ref.rollout.val_kwargs.top_k="${val_top_k}" \
actor_rollout_ref.rollout.val_kwargs.top_p=0.7 \
actor_rollout_ref.rollout.val_kwargs.top_p=1.0 \
actor_rollout_ref.rollout.val_kwargs.temperature=1.0 \
actor_rollout_ref.rollout.val_kwargs.n=1 \
actor_rollout_ref.rollout.val_kwargs.do_sample=True \
actor_rollout_ref.ref.log_prob_micro_batch_size=${infer_micro_batch_size} \
actor_rollout_ref.ref.fsdp_config.param_offload=${offload} \
actor_rollout_ref.ref.ulysses_sequence_parallel_size=1 \
actor_rollout_ref.actor.fsdp_config.fsdp_size=-1 \
custom_reward_function.overlong_buffer.len=512 \
custom_reward_function.overlong_buffer.penalty_factor=1.0 \
trainer.logger=['console','wandb'] \
trainer.project_name="${project_name}" \
trainer.experiment_name="${exp_name}" \
trainer.n_gpus_per_node=8 \
trainer.nnodes=1 \
trainer.nnodes="${NNODES}" \
+trainer.val_before_train=True \
trainer.test_freq=2 \
trainer.save_freq=2 \
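The asymmetric clipping knobs in the script above (clip_ratio_low=0.2, clip_ratio_high=0.25) suggest a "clip-higher" PPO objective. A minimal sketch of such a loss, assuming flat per-token ratio and advantage tensors — the function name and shapes below are illustrative, not verl's actual implementation:

import torch

def clip_higher_pg_loss(ratio: torch.Tensor, adv: torch.Tensor,
                        clip_low: float = 0.2, clip_high: float = 0.25) -> torch.Tensor:
    # Vanilla PPO clips the importance ratio to [1 - eps, 1 + eps]; widening
    # only the upper bound lets low-probability tokens with positive advantage
    # keep raising their probability before the gradient is cut off.
    clipped = torch.clamp(ratio, 1.0 - clip_low, 1.0 + clip_high)
    return -torch.mean(torch.minimum(ratio * adv, clipped * adv))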
35 changes: 35 additions & 0 deletions verl/protocol.py
@@ -370,6 +370,41 @@ def select(self, batch_keys=None, non_tensor_batch_keys=None, meta_info_keys=None):

return DataProto(batch=sub_batch, non_tensor_batch=non_tensor_batch, meta_info=sub_meta_info)

def sel_idxs(self, idxs):
    """
    Select specific indices from the DataProto.

    Args:
        idxs (torch.Tensor, numpy.ndarray, or list): Indices to select

    Returns:
        DataProto: A new DataProto containing only the selected indices
    """
    if isinstance(idxs, list):
        # Advanced indexing requires a long index tensor, so use int64.
        idxs = torch.tensor(idxs, dtype=torch.int64)

    # Keep both views: the torch one indexes the TensorDict, the numpy one
    # indexes the non-tensor (object-array) fields.
    if isinstance(idxs, np.ndarray):
        idxs_np = idxs
        idxs_torch = torch.from_numpy(idxs)
    else:  # torch.Tensor
        idxs_torch = idxs
        idxs_np = idxs.detach().cpu().numpy()

    if self.batch is not None:
        # Use TensorDict's built-in indexing capabilities
        selected_batch = TensorDict(
            source={key: tensor[idxs_torch] for key, tensor in self.batch.items()},
            batch_size=(idxs_torch.shape[0],),
        )
    else:
        selected_batch = None

    selected_non_tensor = {}
    for key, val in self.non_tensor_batch.items():
        selected_non_tensor[key] = val[idxs_np]

    return DataProto(batch=selected_batch, non_tensor_batch=selected_non_tensor, meta_info=self.meta_info)

def pop(self, batch_keys=None, non_tensor_batch_keys=None, meta_info_keys=None) -> 'DataProto':
"""Pop a subset of the DataProto via `batch_keys` and `meta_info_keys`

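A hypothetical usage of the new sel_idxs helper, assuming the existing DataProto.from_dict constructor:

import numpy as np
import torch
from verl import DataProto

proto = DataProto.from_dict({"input_ids": torch.arange(80).reshape(8, 10)})
subset = proto.sel_idxs([0, 2, 5])            # a plain Python list ...
subset = proto.sel_idxs(np.array([0, 2, 5]))  # ... a numpy array, or a torch tensor
assert subset.batch["input_ids"].shape == (3, 10)
assert subset.meta_info is proto.meta_info    # meta_info is shared, not copied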
11 changes: 10 additions & 1 deletion verl/trainer/config/ppo_trainer.yaml
@@ -6,7 +6,8 @@ data:
  reward_fn_key: data_source
  max_prompt_length: 512
  max_response_length: 512
  train_batch_size: 1024
  gen_batch_size: 1024
  train_batch_size: ${data.gen_batch_size}
  val_batch_size: null # DEPRECATED: Validation datasets are sent to inference engines as a whole batch, which will schedule the memory themselves
  return_raw_input_ids: False # This should be set to true when the tokenizer between policy and rm differs
  return_raw_chat: False
@@ -167,6 +168,10 @@ reward_model:
custom_reward_function:
  path: null
  name: compute_score
  overlong_buffer:
    len: 0
    penalty_factor: 1.0
    log: False

algorithm:
  gamma: 1.0
@@ -176,6 +181,10 @@ algorithm:
  kl_ctrl:
    type: fixed
    kl_coef: 0.001
  filter_groups:
    enable: False
    fill_train_batch: True
    drop_last_mini_batch: True

trainer:
  balance_batch: True
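The new filter_groups block pairs with gen_batch_size/train_batch_size in the script. Judging by the flag names, rollout groups with no reward variance are dropped and the batch is refilled from fresh generations. A sketch of the assumed filtering step — the semantics here are inferred from the config names, not confirmed against verl's trainer code:

import numpy as np

def keep_informative_group_idxs(rewards: np.ndarray, group_size: int) -> np.ndarray:
    # Under GRPO, a prompt whose rollouts all score identically yields zero
    # advantage for every token, so its samples carry no learning signal.
    keep = []
    for start in range(0, len(rewards), group_size):
        group = rewards[start:start + group_size]
        if group.std() > 0:  # at least two distinct rewards in the group
            keep.extend(range(start, start + group_size))
    return np.asarray(keep, dtype=np.int64)

The surviving indices could then be handed to the DataProto.sel_idxs method added above; with fill_train_batch: True, generation would presumably repeat until train_batch_size prompts survive.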
8 changes: 6 additions & 2 deletions verl/trainer/main_ppo.py
@@ -146,10 +146,14 @@ def main_task(config):
raise NotImplementedError

compute_score = get_custom_reward_fn(config)
reward_fn = reward_manager_cls(tokenizer=tokenizer, num_examine=0, compute_score=compute_score, reward_fn_key=config.data.reward_fn_key)
reward_fn = reward_manager_cls(tokenizer=tokenizer, num_examine=0, compute_score=compute_score, reward_fn_key=config.data.reward_fn_key,
                               max_resp_len=config.data.max_response_length,
                               overlong_buffer_cfg=config.custom_reward_function.overlong_buffer)

# Note that we always use function-based RM for validation
val_reward_fn = reward_manager_cls(tokenizer=tokenizer, num_examine=1, compute_score=compute_score, reward_fn_key=config.data.reward_fn_key)
val_reward_fn = reward_manager_cls(tokenizer=tokenizer, num_examine=1, compute_score=compute_score, reward_fn_key=config.data.reward_fn_key,
                                   max_resp_len=config.data.max_response_length,
                                   overlong_buffer_cfg=config.custom_reward_function.overlong_buffer)

resource_pool_manager = ResourcePoolManager(resource_pool_spec=resource_pool_spec, mapping=mapping)

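Both reward managers now receive max_resp_len and the overlong_buffer config. A sketch of what the soft length penalty presumably computes, inferred from the len/penalty_factor names — a hypothetical helper, not the actual reward-manager code:

def overlong_penalty(response_len: int, max_resp_len: int,
                     buffer_len: int, penalty_factor: float) -> float:
    # The last `buffer_len` tokens of the response budget act as a soft zone:
    # a response ending inside it is penalized linearly, and one that uses the
    # full budget (and was likely truncated) receives the whole -penalty_factor.
    expected_max = max_resp_len - buffer_len
    exceed = response_len - expected_max
    if exceed <= 0:
        return 0.0
    return -min(exceed / buffer_len, 1.0) * penalty_factor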
37 changes: 36 additions & 1 deletion verl/trainer/ppo/metric_utils.py
@@ -16,9 +16,10 @@
"""

import torch
from typing import Any, Dict, List
from typing import Any, Dict, List, Callable
import numpy as np
from verl import DataProto
from collections import Counter, defaultdict


def reduce_metrics(metrics: Dict[str, List[Any]]) -> Dict[str, Any]:
@@ -166,3 +167,37 @@ def compute_throughout_metrics(batch: DataProto, timing_raw: Dict[str, float], n_gpus: int):
        'perf/time_per_step': time,
        'perf/throughput': total_num_tokens / (time * n_gpus),
    }

def bootstrap_metric(data: list[dict[str, Any]],
                     subset_size: int,
                     reduce_fns: list[Callable[[list[dict[str, Any]]], float]],
                     n_bootstrap: int = 1000,
                     seed: int = 42) -> list[tuple[float, float]]:
    """
    Bootstrap-resample `data` (with replacement, `subset_size` samples per draw)
    and return the (mean, std) of each metric in `reduce_fns` across the draws.
    """
    np.random.seed(seed)

    bootstrap_metric_lsts = [[] for _ in range(len(reduce_fns))]
    for _ in range(n_bootstrap):
        bootstrap_idxs = np.random.choice(len(data), size=subset_size, replace=True)
        bootstrap_data = [data[i] for i in bootstrap_idxs]
        for i, reduce_fn in enumerate(reduce_fns):
            bootstrap_metric_lsts[i].append(reduce_fn(bootstrap_data))
    return [(np.mean(lst), np.std(lst)) for lst in bootstrap_metric_lsts]

def calc_maj_val(data: list[dict[str, Any]], vote_key: str, val_key: str) -> float:
    """
    Compute the majority-voting metric: group samples by `vote_key`, take the
    most common vote, and return the `val_key` value of one of its samples.
    """
    vote2vals = defaultdict(list)
    for d in data:
        vote2vals[d[vote_key]].append(d[val_key])

    vote2cnt = {k: len(v) for k, v in vote2vals.items()}
    maj_vote = max(vote2cnt, key=vote2cnt.get)

    # Samples that cast the same vote are expected to share the same value,
    # so taking the first one suffices.
    maj_val = vote2vals[maj_vote][0]

    return maj_val
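A small usage example for the two new helpers, with hypothetical per-sample records (each holding a predicted answer and its correctness score):

from functools import partial

import numpy as np

samples = [{"pred": "42", "acc": 1.0}, {"pred": "42", "acc": 1.0},
           {"pred": "41", "acc": 0.0}, {"pred": "42", "acc": 1.0}]

mean_fn = lambda data: float(np.mean([d["acc"] for d in data]))
maj_fn = partial(calc_maj_val, vote_key="pred", val_key="acc")

# mean@4 and maj@4 accuracy, each as a (bootstrap mean, bootstrap std) pair.
(mean_mu, mean_std), (maj_mu, maj_std) = bootstrap_metric(
    samples, subset_size=4, reduce_fns=[mean_fn, maj_fn])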