diff --git a/recipe/puffin/run_puffin_qwen2.5_7b_test.sh b/recipe/puffin/run_puffin_7b_test_ray.sh
similarity index 66%
rename from recipe/puffin/run_puffin_qwen2.5_7b_test.sh
rename to recipe/puffin/run_puffin_7b_test_ray.sh
index dc9e23c527c..77c9c8de894 100644
--- a/recipe/puffin/run_puffin_qwen2.5_7b_test.sh
+++ b/recipe/puffin/run_puffin_7b_test_ray.sh
@@ -2,20 +2,26 @@
 set -euxo pipefail
 
 project_name='puffin'
-exp_name='Qwen2.5-7B-Puffin-Test'
+exp_name='Qwen2.5-7B-Math-Puffin-Test'
 
+# Ray
+export RAY_ADDRESS=${RAY_ADDRESS:-"http://localhost:8265"}
+export RUNTIME_ENV=${RUNTIME_ENV:-"./verl/trainer/runtime_env.yaml"}
+export NNODES=${NNODES:-4}
 # Paths
-MODEL_PATH=${MODEL_PATH:-"${HOME}/verl/models/Qwen2.5-7B"}
-CKPTS_DIR=${CKPTS_DIR:-"${HOME}/verl/ckpts/${project_name}/${exp_name}"}
-TRAIN_FILE=${TRAIN_FILE:-"${HOME}/verl/data/puffin_train.parquet"}
-TEST_FILE=${TEST_FILE:-"${HOME}/verl/data/puffin_test.parquet"}
+export RAY_DATA_HOME=${RAY_DATA_HOME:-"${HOME}/verl"}
+export MODEL_PATH=${MODEL_PATH:-"${RAY_DATA_HOME}/models/Qwen2.5-Math-7B"}
+export CKPTS_DIR=${CKPTS_DIR:-"${RAY_DATA_HOME}/ckpts/${project_name}/${exp_name}"}
+export TRAIN_FILE=${TRAIN_FILE:-"${RAY_DATA_HOME}/data/puffin_train.parquet"}
+export TEST_FILE=${TEST_FILE:-"${RAY_DATA_HOME}/data/puffin_test.parquet"}
 
 # Algorithm
 ## Train
-max_prompt_length=$((1024 * 2))
-max_response_length=$((1024 * 8))
-# TODO
-# force_append_eos=True
+max_prompt_length=$((1024 * 1))
+max_response_length=$((1024 * 3))
+gen_prompt_bsz=512
+train_prompt_bsz=512
+train_prompt_mini_bsz=32
 
 ## Validation
 val_top_k=-1 # 0 for HF rollout, -1 for vLLM rollout
@@ -24,24 +30,28 @@ val_top_k=-1 # 0 for HF rollout, -1 for vLLM rollout
 use_dynamic_bsz=True
 infer_micro_batch_size=null
 train_micro_batch_size=null
-offload=True
+offload=False
 
-python3 -m verl.trainer.main_ppo \
+ray job submit --no-wait --runtime-env="${RUNTIME_ENV}" \
+    --working-dir "${PWD}" \
+    -- python3 -m verl.trainer.main_ppo \
     data.train_files="${TRAIN_FILE}" \
     data.val_files="${TEST_FILE}" \
     data.prompt_key=prompt \
     data.truncation='left' \
    data.max_prompt_length=${max_prompt_length} \
     data.max_response_length=${max_response_length} \
-    data.train_batch_size=512 \
+    data.gen_batch_size=${gen_prompt_bsz} \
+    data.train_batch_size=${train_prompt_bsz} \
-    data.truncation='left' \
     actor_rollout_ref.rollout.n=16 \
     actor_rollout_ref.actor.kl_loss_coef=0 \
-    actor_rollout_ref.actor.clip_ratio=0.2 \
+    actor_rollout_ref.actor.clip_ratio_low=0.2 \
+    actor_rollout_ref.actor.clip_ratio_high=0.25 \
     algorithm.adv_estimator=grpo \
     algorithm.kl_ctrl.kl_coef=0.0 \
-    algorithm.gamma=1.0 \
-    algorithm.lam=0.95 \
+    algorithm.filter_groups.enable=True \
+    algorithm.filter_groups.fill_train_batch=True \
+    algorithm.filter_groups.drop_last_mini_batch=True \
     actor_rollout_ref.model.use_remove_padding=True \
     actor_rollout_ref.actor.use_dynamic_bsz=${use_dynamic_bsz} \
     actor_rollout_ref.ref.log_prob_use_dynamic_bsz=${use_dynamic_bsz} \
@@ -57,7 +68,7 @@ python3 -m verl.trainer.main_ppo \
     actor_rollout_ref.actor.optim.lr=1e-6 \
     actor_rollout_ref.actor.optim.lr_warmup_steps=10 \
     actor_rollout_ref.actor.optim.weight_decay=0.1 \
-    actor_rollout_ref.actor.ppo_mini_batch_size=512 \
+    actor_rollout_ref.actor.ppo_mini_batch_size=${train_prompt_mini_bsz} \
     actor_rollout_ref.actor.ppo_micro_batch_size=${train_micro_batch_size} \
     actor_rollout_ref.actor.fsdp_config.param_offload=${offload} \
     actor_rollout_ref.actor.fsdp_config.optimizer_offload=${offload} \
@@ -67,8 +78,10 @@ python3 -m verl.trainer.main_ppo \
     actor_rollout_ref.rollout.gpu_memory_utilization=0.85 \
     actor_rollout_ref.rollout.log_prob_micro_batch_size=${infer_micro_batch_size} \
     actor_rollout_ref.rollout.tensor_model_parallel_size=1 \
+    actor_rollout_ref.rollout.enable_chunked_prefill=True \
+    actor_rollout_ref.rollout.max_num_batched_tokens=$((max_prompt_length + max_response_length)) \
     actor_rollout_ref.rollout.val_kwargs.top_k="${val_top_k}" \
-    actor_rollout_ref.rollout.val_kwargs.top_p=0.7 \
+    actor_rollout_ref.rollout.val_kwargs.top_p=1.0 \
     actor_rollout_ref.rollout.val_kwargs.temperature=1.0 \
     actor_rollout_ref.rollout.val_kwargs.n=1 \
     actor_rollout_ref.rollout.val_kwargs.do_sample=True \
@@ -76,11 +89,13 @@ python3 -m verl.trainer.main_ppo \
     actor_rollout_ref.ref.fsdp_config.param_offload=${offload} \
     actor_rollout_ref.ref.ulysses_sequence_parallel_size=1 \
     actor_rollout_ref.actor.fsdp_config.fsdp_size=-1 \
+    custom_reward_function.overlong_buffer.len=512 \
+    custom_reward_function.overlong_buffer.penalty_factor=1.0 \
     trainer.logger=['console','wandb'] \
     trainer.project_name="${project_name}" \
     trainer.experiment_name="${exp_name}" \
     trainer.n_gpus_per_node=8 \
-    trainer.nnodes=1 \
+    trainer.nnodes="${NNODES}" \
+    trainer.val_before_train=True \
     trainer.test_freq=2 \
     trainer.save_freq=2 \
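Note on the `clip_ratio_low`/`clip_ratio_high` split above: it replaces the single symmetric PPO clip with decoupled bounds (the "clip-higher" trick). A minimal sketch of the resulting objective, with illustrative names rather than verl's exact `core_algos` implementation:

```python
import torch

def pg_loss_clip_higher(log_prob, old_log_prob, advantages,
                        clip_ratio_low=0.2, clip_ratio_high=0.25):
    """Policy-gradient loss with asymmetric clip bounds (sketch only)."""
    ratio = torch.exp(log_prob - old_log_prob)
    # A larger upper bound lets positive-advantage tokens keep growing
    # before clipping kicks in, while the lower bound stays conservative.
    clipped = torch.clamp(ratio, 1.0 - clip_ratio_low, 1.0 + clip_ratio_high)
    return torch.max(-advantages * ratio, -advantages * clipped).mean()
```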
diff --git a/verl/protocol.py b/verl/protocol.py
index 94d7410f687..a525feb9bb2 100644
--- a/verl/protocol.py
+++ b/verl/protocol.py
@@ -370,6 +370,41 @@ def select(self, batch_keys=None, non_tensor_batch_keys=None, meta_info_keys=Non
 
         return DataProto(batch=sub_batch, non_tensor_batch=non_tensor_batch, meta_info=sub_meta_info)
 
+    def sel_idxs(self, idxs):
+        """
+        Select specific indices from the DataProto.
+
+        Args:
+            idxs (torch.Tensor or numpy.ndarray or list): Indices to select
+
+        Returns:
+            DataProto: A new DataProto containing only the selected indices
+        """
+        if isinstance(idxs, list):
+            idxs = torch.tensor(idxs, dtype=torch.int32)
+
+        if isinstance(idxs, np.ndarray):
+            idxs_np = idxs
+            idxs_torch = torch.from_numpy(idxs)
+        else:  # torch.Tensor
+            idxs_torch = idxs
+            idxs_np = idxs.detach().cpu().numpy()
+
+        if self.batch is not None:
+            # Use TensorDict's built-in indexing capabilities
+            selected_batch = TensorDict(source={key: tensor[idxs_torch] for key, tensor in self.batch.items()},
+                                        batch_size=(idxs_torch.shape[0],))
+        else:
+            selected_batch = None
+
+        selected_non_tensor = {}
+        for key, val in self.non_tensor_batch.items():
+            selected_non_tensor[key] = val[idxs_np]
+
+        return DataProto(batch=selected_batch, non_tensor_batch=selected_non_tensor, meta_info=self.meta_info)
+
     def pop(self, batch_keys=None, non_tensor_batch_keys=None, meta_info_keys=None) -> 'DataProto':
         """Pop a subset of the DataProto via `batch_keys` and `meta_info_keys`
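A quick usage sketch for the new helper; `DataProto.from_dict` is the existing constructor in `verl.protocol`, and the field name is made up for the example:

```python
import torch
from verl import DataProto

# Toy batch of 4 trajectories with a single tensor field.
proto = DataProto.from_dict(tensors={"reward": torch.tensor([1.0, 0.0, 1.0, 0.0])})

kept = proto.sel_idxs([0, 2])  # plain lists are promoted to index tensors
assert kept.batch["reward"].tolist() == [1.0, 1.0]
```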
diff --git a/verl/trainer/config/ppo_trainer.yaml b/verl/trainer/config/ppo_trainer.yaml
index 40dd2e6fe94..54dfb619ca6 100644
--- a/verl/trainer/config/ppo_trainer.yaml
+++ b/verl/trainer/config/ppo_trainer.yaml
@@ -6,7 +6,8 @@ data:
   reward_fn_key: data_source
   max_prompt_length: 512
   max_response_length: 512
-  train_batch_size: 1024
+  gen_batch_size: 1024
+  train_batch_size: ${data.gen_batch_size}
   val_batch_size: null # DEPRECATED: Validation datasets are sent to inference engines as a whole batch, which will schedule the memory themselves
   return_raw_input_ids: False # This should be set to true when the tokenizer between policy and rm differs
   return_raw_chat: False
@@ -167,6 +168,10 @@ reward_model:
 custom_reward_function:
   path: null
   name: compute_score
+  overlong_buffer:
+    len: 0
+    penalty_factor: 1.0
+    log: False
 
 algorithm:
   gamma: 1.0
@@ -176,6 +181,10 @@ algorithm:
   kl_ctrl:
     type: fixed
     kl_coef: 0.001
+  filter_groups:
+    enable: False
+    fill_train_batch: True
+    drop_last_mini_batch: True
 
 trainer:
   balance_batch: True
diff --git a/verl/trainer/main_ppo.py b/verl/trainer/main_ppo.py
index 65a7865c92e..31623ad7b30 100644
--- a/verl/trainer/main_ppo.py
+++ b/verl/trainer/main_ppo.py
@@ -146,10 +146,14 @@ def main_task(config):
         raise NotImplementedError
     compute_score = get_custom_reward_fn(config)
-    reward_fn = reward_manager_cls(tokenizer=tokenizer, num_examine=0, compute_score=compute_score, reward_fn_key=config.data.reward_fn_key)
+    reward_fn = reward_manager_cls(tokenizer=tokenizer, num_examine=0, compute_score=compute_score, reward_fn_key=config.data.reward_fn_key,
+                                   max_resp_len=config.data.max_response_length,
+                                   overlong_buffer_cfg=config.custom_reward_function.overlong_buffer)
 
     # Note that we always use function-based RM for validation
-    val_reward_fn = reward_manager_cls(tokenizer=tokenizer, num_examine=1, compute_score=compute_score, reward_fn_key=config.data.reward_fn_key)
+    val_reward_fn = reward_manager_cls(tokenizer=tokenizer, num_examine=1, compute_score=compute_score, reward_fn_key=config.data.reward_fn_key,
+                                       max_resp_len=config.data.max_response_length,
+                                       overlong_buffer_cfg=config.custom_reward_function.overlong_buffer)
 
     resource_pool_manager = ResourcePoolManager(resource_pool_spec=resource_pool_spec, mapping=mapping)
diff --git a/verl/trainer/ppo/metric_utils.py b/verl/trainer/ppo/metric_utils.py
index 4f0859d0058..49f54d1a614 100644
--- a/verl/trainer/ppo/metric_utils.py
+++ b/verl/trainer/ppo/metric_utils.py
@@ -16,9 +16,10 @@
 """
 import torch
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Callable
 import numpy as np
 from verl import DataProto
+from collections import defaultdict
 
 
 def reduce_metrics(metrics: Dict[str, List[Any]]) -> Dict[str, Any]:
@@ -166,3 +167,37 @@ def compute_throughout_metrics(batch: DataProto, timing_raw: Dict[str, float], n
         'perf/time_per_step': time,
         'perf/throughput': total_num_tokens / (time * n_gpus),
     }
+
+
+def bootstrap_metric(data: list[dict[str, Any]],
+                     subset_size: int,
+                     reduce_fns: list[Callable[[np.ndarray], float]],
+                     n_bootstrap: int = 1000,
+                     seed: int = 42) -> list[tuple[float, float]]:
+    """
+    Bootstrap the metric to get the confidence interval
+    """
+    np.random.seed(seed)
+
+    bootstrap_metric_lsts = [[] for _ in range(len(reduce_fns))]
+    for _ in range(n_bootstrap):
+        bootstrap_idxs = np.random.choice(len(data), size=subset_size, replace=True)
+        bootstrap_data = [data[i] for i in bootstrap_idxs]
+        for i, reduce_fn in enumerate(reduce_fns):
+            bootstrap_metric_lsts[i].append(reduce_fn(bootstrap_data))
+    return [(np.mean(lst), np.std(lst)) for lst in bootstrap_metric_lsts]
+
+
+def calc_maj_val(data: list[dict[str, Any]], vote_key: str, val_key: str) -> float:
+    """
+    Calculate the majority voting metric
+    """
+    vote2vals = defaultdict(list)
+    for d in data:
+        vote2vals[d[vote_key]].append(d[val_key])
+
+    vote2cnt = {k: len(v) for k, v in vote2vals.items()}
+    maj_vote = max(vote2cnt, key=vote2cnt.get)
+
+    maj_val = vote2vals[maj_vote][0]
+
+    return maj_val
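How the two helpers combine in practice, mirroring the call in `_validate` below (toy data, hypothetical values):

```python
import numpy as np
from functools import partial
from verl.trainer.ppo.metric_utils import bootstrap_metric, calc_maj_val

# Four sampled responses to one prompt: per-sample accuracy plus extracted answer.
data = [{"val": 1.0, "pred": "42"},
        {"val": 0.0, "pred": "41"},
        {"val": 1.0, "pred": "42"},
        {"val": 1.0, "pred": "42"}]

(best2_mean, best2_std), (maj2_mean, maj2_std) = bootstrap_metric(
    data,
    subset_size=2,
    reduce_fns=[lambda arr: np.max([d["val"] for d in arr]),   # best-of-2
                partial(calc_maj_val, vote_key="pred", val_key="val")],  # maj@2
)
print(f"best@2: {best2_mean:.3f}±{best2_std:.3f}, maj@2: {maj2_mean:.3f}±{maj2_std:.3f}")
```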
diff --git a/verl/trainer/ppo/ray_trainer.py b/verl/trainer/ppo/ray_trainer.py
index d8228b1514b..e091d23c45f 100644
--- a/verl/trainer/ppo/ray_trainer.py
+++ b/verl/trainer/ppo/ray_trainer.py
@@ -24,6 +24,8 @@
 from pprint import pprint
 from typing import Type, Dict
 from copy import deepcopy
+from collections import defaultdict
+from functools import partial
 
 import ray
 import numpy as np
@@ -35,7 +37,7 @@
 from verl.single_controller.ray import RayResourcePool, RayWorkerGroup, RayClassWithInitArgs
 from verl.single_controller.ray.base import create_colocated_worker_cls
 from verl.trainer.ppo import core_algos
-from verl.trainer.ppo.metric_utils import compute_data_metrics, compute_throughout_metrics, compute_timing_metrics, reduce_metrics
+from verl.trainer.ppo.metric_utils import compute_data_metrics, compute_throughout_metrics, compute_timing_metrics, reduce_metrics, bootstrap_metric, calc_maj_val
 from verl.utils.seqlen_balancing import get_seqlen_balanced_partitions, log_seqlen_unbalance
 from verl.utils.checkpoint.checkpoint_manager import find_latest_ckpt_path
 from verl.utils.dataset.rl_dataset import RLHFDataset, collate_fn
@@ -314,6 +316,10 @@ def _validate_config(self):
         # number of GPUs total
         n_gpus = config.trainer.n_gpus_per_node * config.trainer.nnodes
 
+        if not config.algorithm.filter_groups.enable:
+            assert config.data.train_batch_size == config.data.gen_batch_size, \
+                f"train_batch_size must be equal to gen_batch_size when filter_groups.enable is False, but got {config.data.train_batch_size=} and {config.data.gen_batch_size=}"
+
         # 1. Check total batch size for data correctness
         real_train_batch_size = config.data.train_batch_size * config.actor_rollout_ref.rollout.n
         assert real_train_batch_size % n_gpus == 0, \
@@ -422,7 +428,7 @@ def _create_dataloader(self):
             sampler = SequentialSampler(data_source=self.train_dataset)
 
         self.train_dataloader = StatefulDataLoader(dataset=self.train_dataset,
-                                                   batch_size=self.config.data.train_batch_size,
+                                                   batch_size=self.config.data.gen_batch_size,
                                                    num_workers=8,
                                                    drop_last=True,
                                                    collate_fn=collate_fn,
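For orientation, the batch bookkeeping these two changes establish (numbers taken from the recipe script; note this recipe keeps the two sizes equal, so the distinction only pays off once they diverge):

```python
# data.gen_batch_size prompts are drawn per step, each rolled out n times:
gen_prompt_bsz = 512      # prompts fed to the rollout engine per step
rollout_n = 16            # responses sampled per prompt
gen_traj = gen_prompt_bsz * rollout_n        # 8192 generated trajectories

# With filter_groups.enable=False the dataloader batch must equal the train
# batch, hence the assert above. With it enabled, prompts can be filtered
# after generation and (optionally) refilled up to data.train_batch_size.
train_prompt_bsz = 512
train_traj = train_prompt_bsz * rollout_n    # 8192 trajectories trained on
```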
@@ -494,8 +500,8 @@ def _maybe_log_val_generations(self, inputs, outputs, scores):
         self.validation_generations_logger.log(self.config.trainer.logger, samples, self.global_steps)
 
     def _validate(self):
-        reward_tensor_lst = []
         data_source_lst = []
+        reward_extra_infos_dict: dict[str, list] = defaultdict(list)
 
         # Lists to collect samples for the table
         sample_inputs = []
@@ -511,6 +517,7 @@ def _validate(self):
 
             # Store original inputs
             input_ids = test_batch.batch['input_ids']
+            # TODO: Can we keep special tokens except for padding tokens?
             input_texts = [self.tokenizer.decode(ids, skip_special_tokens=True) for ids in input_ids]
             sample_inputs.extend(input_texts)
 
@@ -549,33 +556,84 @@ def _validate(self):
             test_batch = test_batch.union(test_output_gen_batch)
 
             # evaluate using reward_function
-            reward_tensor = self.val_reward_fn(test_batch)
+            result = self.val_reward_fn(test_batch, return_dict=True)
+            reward_tensor = result["reward_tensor"]
+            reward_extra_info = result["reward_extra_info"]
+            for key, lst in reward_extra_info.items():
+                reward_extra_infos_dict[key].extend(lst)
 
             # Store scores
             scores = reward_tensor.sum(-1).cpu().tolist()
             sample_scores.extend(scores)
 
-            reward_tensor_lst.append(reward_tensor)
             data_source_lst.append(test_batch.non_tensor_batch.get('data_source', ['unknown'] * reward_tensor.shape[0]))
 
         self._maybe_log_val_generations(inputs=sample_inputs, outputs=sample_outputs, scores=sample_scores)
 
-        reward_tensor = torch.cat(reward_tensor_lst, dim=0).sum(-1).cpu()  # (batch_size,)
         data_sources = np.concatenate(data_source_lst, axis=0)
 
-        # evaluate test_score based on data source
-        data_source_reward = {}
-        for i in range(reward_tensor.shape[0]):
-            data_source = data_sources[i]
-            if data_source not in data_source_reward:
-                data_source_reward[data_source] = []
-            data_source_reward[data_source].append(reward_tensor[i].item())
+        # Group per-response values by data source and prompt
+        data_src2prompt2var2vals = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
+        for sample_idx, data_source in enumerate(data_sources):
+            prompt = sample_inputs[sample_idx]
+
+            var2vals = data_src2prompt2var2vals[data_source][prompt]
+            var2vals["final_reward"].append(sample_scores[sample_idx])
+            for metric_name, metric_vals in reward_extra_infos_dict.items():
+                var2vals[metric_name].append(metric_vals[sample_idx])
+
+        # For each prompt, compute mean/std plus bootstrapped best/worst/maj@n
+        data_src2prompt2var2metric = defaultdict(lambda: defaultdict(lambda: defaultdict(dict)))
+        for data_source, prompt2var2vals in data_src2prompt2var2vals.items():
+            for prompt, var2vals in prompt2var2vals.items():
+                n_resps = len(var2vals["final_reward"])
+                preds = var2vals["pred"]
+                for var_name, var_vals in var2vals.items():
+                    if var_name in ["pred", "final_reward"]:
+                        continue
+                    metric = {}
+
+                    metric[f"mean@{n_resps}"] = np.mean(var_vals)
+                    metric[f"std@{n_resps}"] = np.std(var_vals)
+
+                    # Evaluate at powers of two up to n_resps
+                    ns = []
+                    n = 2
+                    while n < n_resps:
+                        ns.append(n)
+                        n *= 2
+                    ns.append(n_resps)
+
+                    data = [{"val": val, "pred": pred} for val, pred in zip(var_vals, preds)]
+                    for n in ns:
+                        (bon_mean, bon_std), (won_mean, won_std), (maj_n_mean, maj_n_std) = bootstrap_metric(
+                            data,
+                            subset_size=n,
+                            reduce_fns=[
+                                lambda arr: np.max([d["val"] for d in arr]),
+                                lambda arr: np.min([d["val"] for d in arr]),
+                                partial(calc_maj_val, vote_key="pred", val_key="val")
+                            ])
+                        metric[f"best@{n}/mean"], metric[f"best@{n}/std"] = bon_mean, bon_std
+                        metric[f"worst@{n}/mean"], metric[f"worst@{n}/std"] = won_mean, won_std
+                        metric[f"maj@{n}/mean"], metric[f"maj@{n}/std"] = maj_n_mean, maj_n_std
+
+                    data_src2prompt2var2metric[data_source][prompt][var_name] = metric
+
+        # Average each metric over prompts within a data source
+        data_src2var2metric2prompt_vals = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
+        for data_source, prompt2var2metric in data_src2prompt2var2metric.items():
+            for prompt, var2metric in prompt2var2metric.items():
+                for var_name, metric in var2metric.items():
+                    for metric_name, metric_val in metric.items():
+                        data_src2var2metric2prompt_vals[data_source][var_name][metric_name].append(metric_val)
 
         metric_dict = {}
-        for data_source, rewards in data_source_reward.items():
-            metric_dict[f'val/test_score/{data_source}'] = np.mean(rewards)
+        for data_source, var2metric2prompt_vals in data_src2var2metric2prompt_vals.items():
+            for var_name, metric2prompt_vals in var2metric2prompt_vals.items():
+                for metric_name, prompt_vals in metric2prompt_vals.items():
+                    pfx = f"{data_source}/{var_name}/{metric_name}"
+                    metric_dict[pfx] = np.mean(prompt_vals)
 
-        return metric_dict
+        val_metric_dict = {f"val/{key}": value for key, value in metric_dict.items()}
+        return val_metric_dict
 
     def init_workers(self):
         """Init resource pool and worker group"""
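The nesting above flattens into per-data-source metric keys. Purely illustrative (hypothetical values) of what lands in the logger for a run sampling 16 responses per validation prompt; this recipe sets `val_kwargs.n=1`, so here the suffixes would be `@1`:

```python
val_metric_dict = {
    "val/math_puffin/acc/mean@16": 0.71,      # average accuracy over prompts
    "val/math_puffin/acc/best@8/mean": 0.93,  # bootstrapped best-of-8
    "val/math_puffin/acc/maj@16/mean": 0.82,  # bootstrapped majority vote
}
```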
@@ -827,6 +885,77 @@ def fit(self):
                 batch = batch.repeat(repeat_times=self.config.actor_rollout_ref.rollout.n, interleave=True)
                 batch = batch.union(gen_batch_output)
 
+                with _timer('reward', timing_raw):
+                    # compute scores. Support both model and function-based.
+                    # We first compute the scores using reward model. Then, we call reward_fn to combine
+                    # the results from reward model and rule-based results.
+                    if self.use_rm:
+                        # we first compute reward model score
+                        reward_tensor = self.rm_wg.compute_rm_score(batch)
+                        batch = batch.union(reward_tensor)
+
+                    # we combine with rule-based rm
+                    reward_tensor = self.reward_fn(batch)
+                    batch.batch['token_level_scores'] = reward_tensor
+
+                    # compute rewards. apply_kl_penalty if available
+                    if not self.config.actor_rollout_ref.actor.get('use_kl_loss', False):
+                        batch, kl_metrics = apply_kl_penalty(batch,
+                                                             kl_ctrl=self.kl_ctrl,
+                                                             kl_penalty=self.config.algorithm.kl_penalty)
+                        metrics.update(kl_metrics)
+                    else:
+                        batch.batch['token_level_rewards'] = batch.batch['token_level_scores']
+
+                if self.config.algorithm.filter_groups.enable:
+                    filter_metric_dict = {}
+
+                    # Group sequence rewards by prompt uid
+                    uid2seq_rewards = defaultdict(list)
+                    for uid, tok_rewards in zip(batch.non_tensor_batch['uid'], batch.batch['token_level_rewards']):
+                        seq_reward = torch.sum(tok_rewards).item()
+                        uid2seq_rewards[uid].append(seq_reward)
+
+                    uid2seq_reward_std = {}
+                    for uid, seq_rewards in uid2seq_rewards.items():
+                        uid2seq_reward_std[uid] = np.std(seq_rewards)
+
+                    # Keep prompts whose reward groups are non-degenerate (std > 0)
+                    kept_uids = [uid for uid, std in uid2seq_reward_std.items() if std > 0]
+                    filter_metric_dict["non_uni_rew_prompt_ratio"] = len(kept_uids) / len(uid2seq_rewards)
+                    filter_metric_dict["non_uni_rew_prompt_bsz"] = len(kept_uids)
+
+                    kept_idxs = []
+
+                    train_prompt_bsz = self.config.data.train_batch_size
+                    fill_train_batch = self.config.algorithm.filter_groups.fill_train_batch
+                    if len(kept_uids) > train_prompt_bsz or not fill_train_batch:
+                        kept_uids = kept_uids[:train_prompt_bsz]
+                    else:
+                        # Refill the train batch with filtered-out prompts
+                        for uid in uid2seq_reward_std.keys():
+                            if uid not in kept_uids:
+                                kept_uids.append(uid)
+                            if len(kept_uids) == train_prompt_bsz:
+                                break
+
+                    for idx, uid in enumerate(batch.non_tensor_batch['uid']):
+                        if uid in kept_uids:
+                            kept_idxs.append(idx)
+                    filter_metric_dict["non_uni_rew_traj_bsz"] = len(kept_idxs)
+
+                    # Align the kept trajectories to the data-parallel world size
+                    world_size = self.actor_rollout_wg.world_size
+                    kept_idxs = kept_idxs[:len(kept_idxs) // world_size * world_size]
+                    if self.config.algorithm.filter_groups.drop_last_mini_batch:
+                        train_traj_mini_bsz = self.config.actor_rollout_ref.actor.ppo_mini_batch_size * self.config.actor_rollout_ref.rollout.n
+                        if len(kept_idxs) > train_traj_mini_bsz:
+                            kept_idxs = kept_idxs[:len(kept_idxs) // train_traj_mini_bsz * train_traj_mini_bsz]
+                        else:
+                            print(f'[WARNING] {len(kept_idxs)=} < {train_traj_mini_bsz=}')
+
+                    filter_metric_dict["final_traj_ratio"] = len(kept_idxs) / len(batch.batch)
+                    filter_metric_dict["final_traj_bsz"] = len(kept_idxs)
+                    metrics.update(filter_metric_dict)
+
+                    batch = batch.sel_idxs(kept_idxs)
+
                 # balance the number of valid tokens on each dp rank.
                 # Note that this breaks the order of data inside the batch.
                 # Please take care when you implement group based adv computation such as GRPO and rloo
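The heart of the filter is the std > 0 test: a prompt whose rollouts all score -1.0 (or all 1.0) contributes zero GRPO advantage, so it is dropped. A standalone restatement for illustration (the trainer additionally refills the batch and aligns it to world size and mini-batch size):

```python
import numpy as np
from collections import defaultdict

def non_degenerate_uids(uids, seq_rewards):
    """Return prompt uids whose reward groups have non-zero variance (sketch)."""
    uid2rewards = defaultdict(list)
    for uid, reward in zip(uids, seq_rewards):
        uid2rewards[uid].append(reward)
    return [uid for uid, rs in uid2rewards.items() if np.std(rs) > 0]

uids = ["a", "a", "b", "b"]
rewards = [1.0, -1.0, -1.0, -1.0]
assert non_degenerate_uids(uids, rewards) == ["a"]  # "b" is all -1.0: no signal
```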
@@ -854,27 +983,6 @@ def fit(self):
                         batch = batch.union(values)
 
                 with _timer('adv', timing_raw):
-                    # compute scores. Support both model and function-based.
-                    # We first compute the scores using reward model. Then, we call reward_fn to combine
-                    # the results from reward model and rule-based results.
-                    if self.use_rm:
-                        # we first compute reward model score
-                        reward_tensor = self.rm_wg.compute_rm_score(batch)
-                        batch = batch.union(reward_tensor)
-
-                    # we combine with rule-based rm
-                    reward_tensor = self.reward_fn(batch)
-                    batch.batch['token_level_scores'] = reward_tensor
-
-                    # compute rewards. apply_kl_penalty if available
-                    if not self.config.actor_rollout_ref.actor.get('use_kl_loss', False):
-                        batch, kl_metrics = apply_kl_penalty(batch,
-                                                             kl_ctrl=self.kl_ctrl,
-                                                             kl_penalty=self.config.algorithm.kl_penalty)
-                        metrics.update(kl_metrics)
-                    else:
-                        batch.batch['token_level_rewards'] = batch.batch['token_level_scores']
-
                     # compute advantages, executed on the driver process
                     batch = compute_advantage(batch,
                                               adv_estimator=self.config.algorithm.adv_estimator,
diff --git a/verl/utils/reward_score/__init__.py b/verl/utils/reward_score/__init__.py
index d72005da500..5946dbf2c41 100644
--- a/verl/utils/reward_score/__init__.py
+++ b/verl/utils/reward_score/__init__.py
@@ -27,7 +27,7 @@ def _default_compute_score(data_source, solution_str, ground_truth, extra_info=N
         res = math_verify.compute_score(solution_str, ground_truth)
     elif data_source == 'math_puffin':
         from . import math_puffin
-        res = math_puffin.compute_score
+        res = math_puffin.compute_score(solution_str, ground_truth)
     elif data_source in [
             'numina_aops_forum', 'numina_synthetic_math', 'numina_amc_aime', 'numina_synthetic_amc',
             'numina_cn_k12', 'numina_olympiads'
@@ -43,7 +43,9 @@ def _default_compute_score(data_source, solution_str, ground_truth, extra_info=N
     else:
         raise NotImplementedError
 
-    if isinstance(res, (int, float, bool)):
+    if isinstance(res, dict):
+        return res
+    elif isinstance(res, (int, float, bool)):
         return float(res)
     else:
         return float(res[0])
diff --git a/verl/utils/reward_score/math_puffin.py b/verl/utils/reward_score/math_puffin.py
index e1a4863872f..752eafe201c 100644
--- a/verl/utils/reward_score/math_puffin.py
+++ b/verl/utils/reward_score/math_puffin.py
@@ -17,9 +17,6 @@
 import signal
 from typing import Optional
 
-import sympy
-from sympy.parsing.latex import parse_latex
-
 
 def last_boxed_only_string(string: str) -> Optional[str]:
     """Extract the last LaTeX boxed expression from a string.
@@ -37,7 +34,7 @@ def last_boxed_only_string(string: str) -> Optional[str]:
     i = idx
     right_brace_idx = None
     num_left_braces_open = 0
-    
+
     while i < len(string):
         if string[i] == "{":
             num_left_braces_open += 1
@@ -82,45 +79,6 @@ def __enter__(self):
     def __exit__(self, type, value, traceback):
         signal.alarm(0)
 
-
-def is_equiv(x1: str, x2: str) -> bool:
-    """
-    Args:
-        x1, x2: normalized LaTeX string
-    """
-    try:
-        with timeout(seconds=10):
-            try:
-                parsed_x1 = parse_latex(x1)
-                parsed_x2 = parse_latex(x2)
-            except (
-                    sympy.parsing.latex.errors.LaTeXParsingError,
-                    sympy.SympifyError,
-                    TypeError,
-            ):
-                return False
-
-            try:
-                diff = parsed_x1 - parsed_x2
-            except TypeError:
-                return False
-
-            try:
-                if sympy.simplify(diff) == 0:
-                    return True
-                else:
-                    return False
-            except ValueError:
-                return False
-
-    except TimeoutError:
-        return False
-    except ImportError as e:
-        raise
-    except Exception as e:
-        return False
-
-
 # Constants for normalization
 SUBSTITUTIONS = [
     ("an ", ""), ("a ", ""), (".$", "$"), ("\\$", ""),
@@ -205,7 +163,7 @@ def is_correct_minerva(solution_str: str, gt: str, gt_need_extract: bool = False
     else:
         gt = normalize_final_answer(gt)
 
-    return (pred == gt or is_equiv(pred, gt)), pred
+    return (pred == gt), pred
 
 
 def is_correct_strict_box(pred: str, gt: str, pause_tokens_index: Optional[list[int]] = None) -> tuple[int, Optional[str]]:
@@ -247,14 +205,16 @@ def verify(solution_str: str, answer: str, strict_box_verify: bool = False,
         True if the solution is correct, False otherwise
     """
     if strict_box_verify:
-        corr, _ = is_correct_strict_box(solution_str, answer, pause_tokens_index)
-        return corr == 1
-
-    corr, _ = is_correct_minerva(solution_str, answer)
-    return corr
+        correct, pred = is_correct_strict_box(solution_str, answer, pause_tokens_index)
+        return correct == 1, pred
 
+    correct, pred = is_correct_minerva(solution_str, answer)
+    return correct, pred
 
-def compute_score(solution_str: str, ground_truth: str, config, pause_tokens_index: Optional[list[int]] = None) -> float:
+
+def compute_score(solution_str: str,
+                  ground_truth: str,
+                  strict_box_verify: bool = False,
+                  pause_tokens_index: Optional[list[int]] = None) -> dict:
     """Compute the reward score for a solution.
 
     Args:
@@ -268,9 +228,15 @@ def compute_score(solution_str: str, ground_truth: str, config, pause_tokens_ind
     """
     # Limit solution length for efficiency
    solution_str = solution_str[-300:]  # The longest answer in MATH-500 has 159 characters
-    
+
     # Verify the solution
-    strict_box_verify = config.reward_model.strict_box_verify
-    correct = verify(solution_str, ground_truth, strict_box_verify, pause_tokens_index)
-
-    return 1.0 if correct else -1.0
+    correct, pred = verify(solution_str, ground_truth, strict_box_verify, pause_tokens_index)
+
+    reward = 1.0 if correct else -1.0
+    acc = correct
+
+    return {
+        "reward": reward,
+        "acc": acc,
+        "pred": pred,
+    }
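Sketch of the new dict contract, assuming a response whose final boxed answer matches the ground truth (the exact `pred` string depends on `normalize_final_answer`):

```python
from verl.utils.reward_score import math_puffin

result = math_puffin.compute_score(
    solution_str=r"... so the final answer is $\boxed{42}$.",
    ground_truth="42",
)
# Expected shape, e.g.: {"reward": 1.0, "acc": True, "pred": "42"}
print(result["reward"], result["acc"], result["pred"])
```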
diff --git a/verl/workers/actor/dp_actor.py b/verl/workers/actor/dp_actor.py
index 1057c6d08a6..7116b6545b9 100644
--- a/verl/workers/actor/dp_actor.py
+++ b/verl/workers/actor/dp_actor.py
@@ -322,7 +322,7 @@ def update_policy(self, data: DataProto):
                 loss.backward()
 
                 data = {
-                    'actor/entropy_loss': entropy_loss.detach().item(),
+                    'actor/entropy': entropy_loss.detach().item(),
                     'actor/pg_loss': pg_loss.detach().item(),
                     'actor/pg_clipfrac': pg_clipfrac.detach().item(),
                     'actor/ppo_kl': ppo_kl.detach().item(),
diff --git a/verl/workers/reward_manager/naive.py b/verl/workers/reward_manager/naive.py
index 5206fb48d52..23e42b79175 100644
--- a/verl/workers/reward_manager/naive.py
+++ b/verl/workers/reward_manager/naive.py
@@ -15,18 +15,25 @@
 from verl import DataProto
 from verl.utils.reward_score import _default_compute_score
 import torch
+from collections import defaultdict
 
 
 class NaiveRewardManager:
     """The reward manager.
     """
 
-    def __init__(self, tokenizer, num_examine, compute_score=None, reward_fn_key='data_source') -> None:
+    def __init__(self, tokenizer, num_examine, compute_score=None, reward_fn_key='data_source', max_resp_len=None,
+                 overlong_buffer_cfg=None) -> None:
         self.tokenizer = tokenizer
         self.num_examine = num_examine  # the number of batches of decoded responses to print to the console
         self.compute_score = compute_score or _default_compute_score
         self.reward_fn_key = reward_fn_key
+        self.overlong_buffer_cfg = overlong_buffer_cfg
+        self.max_resp_len = max_resp_len
+        if self.overlong_buffer_cfg is not None:
+            assert self.max_resp_len is not None, f"max_resp_len must be provided if {overlong_buffer_cfg=}, but got None"
 
+    # TODO: Is this still necessary in algorithms other than PRIME?
     def verify(self, data):
         scores = []
         for i in range(len(data)):
@@ -63,14 +70,20 @@ def verify(self, data):
         data.batch['acc'] = torch.tensor(scores, dtype=torch.float32, device=prompt_ids.device)
         return scores
 
-    def __call__(self, data: DataProto):
+    def __call__(self, data: DataProto, return_dict: bool = False):
         """We will expand this function gradually based on the available datasets"""
 
         # If there is rm score, we directly return rm score. Otherwise, we compute via rm_score_fn
         if 'rm_scores' in data.batch.keys():
-            return data.batch['rm_scores']
+            if return_dict:
+                return {
+                    "reward_tensor": data.batch['rm_scores'],
+                    "reward_extra_info": {},
+                }
+            else:
+                return data.batch['rm_scores']
 
         reward_tensor = torch.zeros_like(data.batch['responses'], dtype=torch.float32)
+        reward_extra_info = defaultdict(list)
 
         already_print_data_sources = {}
 
@@ -91,6 +104,9 @@ def __call__(self, data: DataProto):
             # decode
             prompt_str = self.tokenizer.decode(valid_prompt_ids)
             response_str = self.tokenizer.decode(valid_response_ids)
+            eos_token = self.tokenizer.eos_token
+            if response_str.endswith(eos_token):
+                response_str = response_str[:-len(eos_token)]
 
             ground_truth = data_item.non_tensor_batch['reward_model']['ground_truth']
 
@@ -98,13 +114,38 @@ def __call__(self, data: DataProto):
 
             extra_info = data_item.non_tensor_batch.get('extra_info', None)
 
-            score = self.compute_score(
+            result = self.compute_score(
                 data_source=data_source,
                 solution_str=response_str,
                 ground_truth=ground_truth,
                 extra_info=extra_info,
             )
-            reward_tensor[i, valid_response_length - 1] = score
+
+            final_reward = 0
+
+            reward: float
+            if isinstance(result, dict):
+                assert "reward" in result
+                reward = result["reward"]
+                # Store all returned fields (including the raw reward) for logging
+                for key, value in result.items():
+                    reward_extra_info[key].append(value)
+            else:
+                reward = result
+
+            final_reward += reward
+
+            if self.overlong_buffer_cfg is not None and self.overlong_buffer_cfg.len > 0:
+                overlong_buffer_len = self.overlong_buffer_cfg.len
+                overlong_penalty_factor = self.overlong_buffer_cfg.penalty_factor
+                exceed_len = valid_response_length - (self.max_resp_len - overlong_buffer_len)
+                overlong_reward = min(-exceed_len / overlong_buffer_len * overlong_penalty_factor, 0)
+                final_reward += overlong_reward
+                if self.overlong_buffer_cfg.log:
+                    reward_extra_info["overlong_reward"].append(overlong_reward)
+                    reward_extra_info["overlong"].append(overlong_reward < 0)
+
+            reward_tensor[i, valid_response_length - 1] = final_reward
 
             if data_source not in already_print_data_sources:
                 already_print_data_sources[data_source] = 0
@@ -114,6 +155,13 @@ def __call__(self, data: DataProto):
                 print("[prompt]", prompt_str)
                 print("[response]", response_str)
                 print("[ground_truth]", ground_truth)
-                print("[score]", score)
-
-        return reward_tensor
+                if isinstance(result, dict):
+                    for key, value in result.items():
+                        print(f"[{key}]", value)
+                else:
+                    print("[score]", result)
+
+        if return_dict:
+            return {
+                "reward_tensor": reward_tensor,
+                "reward_extra_info": reward_extra_info,
+            }
+        else:
+            return reward_tensor
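Worked example of the soft overlong penalty above, using the recipe's settings (`max_response_length=3072`, `overlong_buffer.len=512`, `penalty_factor=1.0`): responses shorter than 2560 tokens are untouched, and the penalty ramps linearly down to -1.0 at the length cap.

```python
max_resp_len, buffer_len, penalty_factor = 3072, 512, 1.0

for valid_response_length in (2400, 2816, 3072):
    exceed_len = valid_response_length - (max_resp_len - buffer_len)
    overlong_reward = min(-exceed_len / buffer_len * penalty_factor, 0)
    print(valid_response_length, overlong_reward)
# 2400 -> 0 (inside the safe zone), 2816 -> -0.5, 3072 -> -1.0
```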