19 changes: 14 additions & 5 deletions examples/split_placement/split_monkey_patch.py
@@ -31,6 +31,7 @@
     compute_timing_metrics,
     marked_timer,
 )
+from verl.trainer.ppo.reward import compute_reward
 from verl.utils.metric import reduce_metrics
 
 
@@ -95,14 +96,22 @@ def fit(self):
         gen_baseline_output = self.actor_rollout_wg.generate_sequences(gen_baseline_batch)
 
         batch = batch.union(gen_baseline_output)
-        reward_baseline_tensor = self.reward_fn(batch)
+        # compute reward model score on batch
+        rm_scores = None
+        if self.use_rm and "rm_scores" not in batch.batch.keys():
+            rm_scores = self.rm_wg.compute_rm_score(batch)
+            batch = batch.union(rm_scores)
+        reward_baseline_tensor, _ = compute_reward(batch, self.reward_fn)
         reward_baseline_tensor = reward_baseline_tensor.sum(dim=-1)
 
-        batch.pop(batch_keys=list(gen_baseline_output.batch.keys()))
+        keys_to_pop = set(gen_baseline_output.batch.keys())
+        if rm_scores is not None:
+            keys_to_pop.update(rm_scores.batch.keys())
+        batch.pop(batch_keys=list(keys_to_pop))
 
         batch.batch["reward_baselines"] = reward_baseline_tensor
 
-        del gen_baseline_batch, gen_baseline_output
+        del rm_scores, gen_baseline_batch, gen_baseline_output
 
         batch.non_tensor_batch["uid"] = np.array(
             [str(uuid.uuid4()) for _ in range(len(batch.batch))], dtype=object
@@ -142,13 +151,13 @@ def fit(self):
         # compute scores. Support both model and function-based.
         # We first compute the scores using reward model. Then, we call reward_fn to combine
         # the results from reward model and rule-based results.
-        if self.use_rm:
+        if self.use_rm and "rm_scores" not in batch.batch.keys():
             # we first compute reward model score
             reward_tensor = self.rm_wg.compute_rm_score(batch)
             batch = batch.union(reward_tensor)
 
         # we combine with rule-based rm
-        reward_tensor = self.reward_fn(batch)
+        reward_tensor, _ = compute_reward(batch, self.reward_fn)
         batch.batch["token_level_scores"] = reward_tensor
 
         # compute rewards. apply_kl_penalty if available
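A note on the bookkeeping in the second hunk: the baseline pass may now union `rm_scores` into the batch, so those keys must be popped together with the generation outputs; otherwise they would linger and the later `"rm_scores" not in batch.batch.keys()` guard would skip a fresh reward-model pass on the real rollouts. A minimal sketch of the invariant, using plain-dict stand-ins for the `DataProto` union/pop calls (keys are hypothetical):

```python
# Plain-dict stand-in for the DataProto union/pop bookkeeping; keys are hypothetical.
batch = {"input_ids": [1, 2, 3]}

gen_baseline_output = {"responses": [4, 5], "response_mask": [1, 1]}
batch.update(gen_baseline_output)              # union with generation outputs

rm_scores = {"rm_scores": [0.7, 0.2]}          # set only when use_rm is True
batch.update(rm_scores)                        # union with reward model scores

# Pop everything the baseline pass added, in one shot.
keys_to_pop = set(gen_baseline_output)
if rm_scores is not None:
    keys_to_pop.update(rm_scores)
for key in keys_to_pop:
    batch.pop(key)

assert set(batch) == {"input_ids"}             # schema is back to its pre-baseline state
```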
27 changes: 14 additions & 13 deletions recipe/dapo/dapo_ray_trainer.py
@@ -41,6 +41,7 @@
     compute_advantage,
     compute_response_mask,
 )
+from verl.trainer.ppo.reward import compute_reward
 from verl.utils.profiler import marked_timer
 from verl.utils.rollout_skip import RolloutSkip
 
@@ -150,14 +151,22 @@ def fit(self):
         gen_baseline_output = self.actor_rollout_wg.generate_sequences(gen_baseline_batch)
 
         new_batch = new_batch.union(gen_baseline_output)
-        reward_baseline_tensor = self.reward_fn(new_batch)
+        # compute reward model score on new_batch
+        rm_scores = None
+        if self.use_rm and "rm_scores" not in new_batch.batch.keys():
+            rm_scores = self.rm_wg.compute_rm_score(new_batch)
+            new_batch = new_batch.union(rm_scores)
+        reward_baseline_tensor, _ = compute_reward(new_batch, self.reward_fn)
         reward_baseline_tensor = reward_baseline_tensor.sum(dim=-1)
 
-        new_batch.pop(batch_keys=list(gen_baseline_output.batch.keys()))
+        keys_to_pop = set(gen_baseline_output.batch.keys())
+        if rm_scores is not None:
+            keys_to_pop.update(rm_scores.batch.keys())
+        new_batch.pop(batch_keys=list(keys_to_pop))
 
         new_batch.batch["reward_baselines"] = reward_baseline_tensor
 
-        del gen_baseline_batch, gen_baseline_output
+        del rm_scores, gen_baseline_batch, gen_baseline_output
 
         new_batch.non_tensor_batch["uid"] = np.array(
             [str(uuid.uuid4()) for _ in range(len(new_batch.batch))], dtype=object
@@ -170,21 +179,13 @@ def fit(self):
         # compute scores. Support both model and function-based.
         # We first compute the scores using reward model. Then, we call reward_fn to combine
         # the results from reward model and rule-based results.
-        if self.use_rm:
+        if self.use_rm and "rm_scores" not in new_batch.batch.keys():
             # we first compute reward model score
             reward_tensor = self.rm_wg.compute_rm_score(new_batch)
             new_batch = new_batch.union(reward_tensor)
 
         # we combine with rule-based rm
-        reward_extra_infos_dict: dict[str, list]
-        try:
-            reward_result = self.reward_fn(new_batch, return_dict=True)
-            reward_tensor = reward_result["reward_tensor"]
-            reward_extra_infos_dict = reward_result.get("reward_extra_info", {})
-        except Exception as e:
-            print(f"Error in reward_fn: {e}")
-            reward_tensor = self.reward_fn(new_batch)
-            reward_extra_infos_dict = {}
+        reward_tensor, reward_extra_infos_dict = compute_reward(new_batch, self.reward_fn)
 
         new_batch.batch["token_level_scores"] = reward_tensor
 
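The single `compute_reward` call that replaces the try/except block above also removes a small inconsistency: this trainer fell back to `reward_result.get("reward_extra_info", {})`, while the entropy trainer in the next file indexes the key directly. Judging from the removed lines, the shared helper behaves roughly like this sketch (a reconstruction from the deleted code, not the actual `verl.trainer.ppo.reward.compute_reward` source):

```python
def compute_reward(data, reward_fn):
    """Sketch reconstructed from the inline block this PR removes."""
    try:
        # Preferred path: reward_fn returns a dict with the tensor and extra info.
        reward_result = reward_fn(data, return_dict=True)
        reward_tensor = reward_result["reward_tensor"]
        reward_extra_infos_dict = reward_result.get("reward_extra_info", {})
    except Exception as e:
        # Fallback for reward functions that only return a tensor.
        print(f"Error in reward_fn: {e}")
        reward_tensor = reward_fn(data)
        reward_extra_infos_dict = {}
    return reward_tensor, reward_extra_infos_dict
```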
27 changes: 14 additions & 13 deletions recipe/entropy/entropy_ray_trainer.py
@@ -39,6 +39,7 @@
     compute_advantage,
     compute_response_mask,
 )
+from verl.trainer.ppo.reward import compute_reward
 from verl.utils.profiler import simple_timer
 
 
@@ -129,14 +130,22 @@ def fit(self):
         gen_baseline_output = self.actor_rollout_wg.generate_sequences(gen_baseline_batch)
 
         new_batch = new_batch.union(gen_baseline_output)
-        reward_baseline_tensor = self.reward_fn(new_batch)
+        # compute reward model score on new_batch
+        rm_scores = None
+        if self.use_rm and "rm_scores" not in new_batch.batch.keys():
+            rm_scores = self.rm_wg.compute_rm_score(new_batch)
+            new_batch = new_batch.union(rm_scores)
+        reward_baseline_tensor, _ = compute_reward(new_batch, self.reward_fn)
         reward_baseline_tensor = reward_baseline_tensor.sum(dim=-1)
 
-        new_batch.pop(batch_keys=list(gen_baseline_output.batch.keys()))
+        keys_to_pop = set(gen_baseline_output.batch.keys())
+        if rm_scores is not None:
+            keys_to_pop.update(rm_scores.batch.keys())
+        new_batch.pop(batch_keys=list(keys_to_pop))
 
         new_batch.batch["reward_baselines"] = reward_baseline_tensor
 
-        del gen_baseline_batch, gen_baseline_output
+        del rm_scores, gen_baseline_batch, gen_baseline_output
 
         new_batch.non_tensor_batch["uid"] = np.array(
             [str(uuid.uuid4()) for _ in range(len(new_batch.batch))], dtype=object
@@ -149,21 +158,13 @@ def fit(self):
         # compute scores. Support both model and function-based.
         # We first compute the scores using reward model. Then, we call reward_fn to combine
         # the results from reward model and rule-based results.
-        if self.use_rm:
+        if self.use_rm and "rm_scores" not in new_batch.batch.keys():
             # we first compute reward model score
             reward_tensor = self.rm_wg.compute_rm_score(new_batch)
             new_batch = new_batch.union(reward_tensor)
 
         # we combine with rule-based rm
-        reward_extra_infos_dict: dict[str, list]
-        try:
-            reward_result = self.reward_fn(new_batch, return_dict=True)
-            reward_tensor = reward_result["reward_tensor"]
-            reward_extra_infos_dict = reward_result["reward_extra_info"]
-        except Exception as e:
-            print(f"Error in reward_fn: {e}")
-            reward_tensor = self.reward_fn(new_batch)
-            reward_extra_infos_dict = {}
+        reward_tensor, reward_extra_infos_dict = compute_reward(new_batch, self.reward_fn)
 
         new_batch.batch["token_level_scores"] = reward_tensor
 
93 changes: 53 additions & 40 deletions recipe/prime/prime_ray_trainer.py
@@ -329,6 +329,45 @@ def _load_checkpoint(self):
         if isinstance(self.train_dataloader.dataset, RLHFDataset):
             self.train_dataloader.dataset.resume_dataset_state()
 
+    def compute_reward(self, batch: DataProto, n_samples: int):
+        update_style = self.config.reward_model.model.get("update", "none")
+        reward_output_metrics = {}
+        if update_style == "none":  # only run forward
+            reward_output = self.rm_wg.compute_rm_score(batch)
+        elif update_style == "after":  # update and directly return the reward
+            reward_output = self.rm_wg.update_rm(batch)
+        elif update_style == "before":  # update reward model, and then run forward
+            reward_output = self.rm_wg.update_rm(batch)
+            if "metrics" in reward_output.meta_info.keys():
+                reward_output_metrics = reduce_metrics(reward_output.meta_info["metrics"])
+
+            reward_output = self.rm_wg.compute_rm_score(batch)
+        elif update_style == "reverse":  # run forward to calculate statistics, then update reward model
+            reward_output = self.rm_wg.compute_rm_score(batch)
+
+            # broadcast q and acc tensor to each result
+            bc_td = DataProto.from_dict(
+                tensors={
+                    "Q_bc": reward_output.batch["q"]
+                    .sum(dim=-1)
+                    .view(-1, n_samples)
+                    .unsqueeze(1)
+                    .expand(-1, n_samples, -1)
+                    .reshape(-1, n_samples),
+                    "acc_bc": batch.batch["acc"]
+                    .view(-1, n_samples)
+                    .unsqueeze(1)
+                    .expand(-1, n_samples, -1)
+                    .reshape(-1, n_samples),
+                }
+            )
+            batch = batch.union(bc_td)
+            reward_output = self.rm_wg.update_rm(batch)
+        else:
+            raise NotImplementedError
+
+        return reward_output, reward_output_metrics
+
     def fit(self):
         """
         The training loop of PPO.
@@ -389,10 +428,19 @@ def fit(self):
         gen_baseline_output = self.actor_rollout_wg.generate_sequences(gen_baseline_batch)
 
         batch = batch.union(gen_baseline_output)
-        reward_baseline_tensor = self.reward_fn(batch)
+        rm_scores, _ = self.compute_reward(batch, 1)
+        reward_baseline_tensor = rm_scores.batch.get(
+            "rm_scores", rm_scores.batch.get("acc_bc", None)
+        )
+        if reward_baseline_tensor is None:
+            raise ValueError(
+                "Neither 'rm_scores' nor 'acc_bc' found in reward model output for baseline."
+            )
         reward_baseline_tensor = reward_baseline_tensor.sum(dim=-1)
 
-        batch.pop(batch_keys=list(gen_baseline_output.batch.keys()))
+        keys_to_pop = set(gen_baseline_output.batch.keys())
+        keys_to_pop.update(rm_scores.batch.keys())
+        batch.pop(batch_keys=list(keys_to_pop))
 
         batch.batch["reward_baselines"] = reward_baseline_tensor
 
@@ -448,46 +496,11 @@ def fit(self):
 
         with simple_timer("adv", timing_raw):
             if self.use_rm:
-                update_style = self.config.reward_model.model.get("update", "none")
-                if update_style == "none":  # only run forward
-                    reward_output = self.rm_wg.compute_rm_score(batch)
-                elif update_style == "after":  # update and directly return the reward
-                    reward_output = self.rm_wg.update_rm(batch)
-                elif update_style == "before":  # update reward model, and then run forward
-                    reward_output = self.rm_wg.update_rm(batch)
-                    if "metrics" in reward_output.meta_info.keys():
-                        reward_output_metrics = reduce_metrics(reward_output.meta_info["metrics"])
-                        metrics.update(reward_output_metrics)
-
-                    reward_output = self.rm_wg.compute_rm_score(batch)
-                elif (
-                    update_style == "reverse"
-                ):  # run forward to calculate statistics, then update reward model
-                    reward_output = self.rm_wg.compute_rm_score(batch)
-                    # broadcast q and acc tensor to each result
-                    bc_td = DataProto.from_dict(
-                        tensors={
-                            "Q_bc": reward_output.batch["q"]
-                            .sum(dim=-1)
-                            .view(-1, n_samples)
-                            .unsqueeze(1)
-                            .expand(-1, n_samples, -1)
-                            .reshape(-1, n_samples),
-                            "acc_bc": batch.batch["acc"]
-                            .view(-1, n_samples)
-                            .unsqueeze(1)
-                            .expand(-1, n_samples, -1)
-                            .reshape(-1, n_samples),
-                        }
-                    )
-                    batch = batch.union(bc_td)
-                    reward_output = self.rm_wg.update_rm(batch)
-                else:
-                    raise NotImplementedError
+                reward_output, reward_output_metrics = self.compute_reward(batch, n_samples)
                 batch = batch.union(reward_output)
                 if "metrics" in reward_output.meta_info.keys():
-                    reward_output_metrics = reduce_metrics(reward_output.meta_info["metrics"])
-                    metrics.update(reward_output_metrics)
+                    reward_output_metrics.update(reduce_metrics(reward_output.meta_info["metrics"]))
+                metrics.update(reward_output_metrics)
 
         # compute advantages, executed on the driver process
         batch = compute_advantage(
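The densest lines in the extracted `compute_reward` method are the `view -> unsqueeze -> expand -> reshape` chains: they hand every sample a row holding the statistics of all `n_samples` rollouts of its prompt. A small PyTorch shape check with toy values:

```python
import torch

n_samples = 3
q_sum = torch.arange(6.0)        # per-sample scalars, 2 prompts x 3 samples: [0..5]

q_bc = (
    q_sum.view(-1, n_samples)    # (2, 3): one row of sample stats per prompt
    .unsqueeze(1)                # (2, 1, 3)
    .expand(-1, n_samples, -1)   # (2, 3, 3): replicate the row once per sample
    .reshape(-1, n_samples)      # (6, 3): back to flat per-sample order
)
print(q_bc)
# tensor([[0., 1., 2.],
#         [0., 1., 2.],
#         [0., 1., 2.],
#         [3., 4., 5.],
#         [3., 4., 5.],
#         [3., 4., 5.]])
```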
16 changes: 12 additions & 4 deletions recipe/sppo/sppo_ray_trainer.py
@@ -203,14 +203,22 @@ def fit(self):
         gen_baseline_output = self.actor_rollout_wg.generate_sequences(gen_baseline_batch)
 
         batch = batch.union(gen_baseline_output)
-        reward_baseline_tensor = self.reward_fn(batch)
+        # compute reward model score on batch
+        rm_scores = None
+        if self.use_rm and "rm_scores" not in batch.batch.keys():
+            rm_scores = self.rm_wg.compute_rm_score(batch)
+            batch = batch.union(rm_scores)
+        reward_baseline_tensor, _ = compute_reward(batch, self.reward_fn)
         reward_baseline_tensor = reward_baseline_tensor.sum(dim=-1)
 
-        batch.pop(batch_keys=list(gen_baseline_output.batch.keys()))
+        keys_to_pop = set(gen_baseline_output.batch.keys())
+        if rm_scores is not None:
+            keys_to_pop.update(rm_scores.batch.keys())
+        batch.pop(batch_keys=list(keys_to_pop))
 
         batch.batch["reward_baselines"] = reward_baseline_tensor
 
-        del gen_baseline_batch, gen_baseline_output
+        del rm_scores, gen_baseline_batch, gen_baseline_output
 
         batch.non_tensor_batch["uid"] = np.array(
             [str(uuid.uuid4()) for _ in range(len(batch.batch))], dtype=object
@@ -233,7 +241,7 @@ def fit(self):
 
         with simple_timer("reward", timing_raw):
             # compute reward model score
-            if self.use_rm:
+            if self.use_rm and "rm_scores" not in batch.batch.keys():
                 reward_tensor = self.rm_wg.compute_rm_score(batch)
                 batch = batch.union(reward_tensor)
 
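The second hunk changes only the guard: with `"rm_scores" not in batch.batch.keys()`, the reward-model forward pass becomes idempotent, reusing scores already present in the batch instead of recomputing them. A toy illustration of the guard, with a hypothetical counting worker standing in for `self.rm_wg`:

```python
class CountingRewardWorker:
    """Hypothetical stand-in for self.rm_wg that counts forward passes."""

    def __init__(self):
        self.calls = 0

    def compute_rm_score(self, batch):
        self.calls += 1
        return {"rm_scores": [0.5] * len(batch["responses"])}


use_rm = True
rm_wg = CountingRewardWorker()
batch = {"responses": [101, 102]}

for _ in range(2):  # a second pass over the same batch
    if use_rm and "rm_scores" not in batch.keys():
        batch.update(rm_wg.compute_rm_score(batch))

assert rm_wg.calls == 1  # the guard skipped the redundant second pass
```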
14 changes: 11 additions & 3 deletions verl/trainer/ppo/ray_trainer.py
@@ -1063,14 +1063,22 @@ def fit(self):
         else:
             gen_baseline_output = self.async_rollout_manager.generate_sequences(gen_baseline_batch)
         batch = batch.union(gen_baseline_output)
-        reward_baseline_tensor = self.reward_fn(batch)
+        # compute reward model score on batch
+        rm_scores = None
+        if self.use_rm and "rm_scores" not in batch.batch.keys():
+            rm_scores = self.rm_wg.compute_rm_score(batch)
+            batch = batch.union(rm_scores)
+        reward_baseline_tensor, _ = compute_reward(batch, self.reward_fn)
         reward_baseline_tensor = reward_baseline_tensor.sum(dim=-1)
 
-        batch.pop(batch_keys=list(gen_baseline_output.batch.keys()))
+        keys_to_pop = set(gen_baseline_output.batch.keys())
+        if rm_scores is not None:
+            keys_to_pop.update(rm_scores.batch.keys())
+        batch.pop(batch_keys=list(keys_to_pop))
 
         batch.batch["reward_baselines"] = reward_baseline_tensor
 
-        del gen_baseline_batch, gen_baseline_output
+        del rm_scores, gen_baseline_batch, gen_baseline_output
         # repeat to align with repeated responses in rollout
         batch = batch.repeat(repeat_times=self.config.actor_rollout_ref.rollout.n, interleave=True)
         batch = batch.union(gen_batch_output)
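Reading the context and added lines of this last hunk together, the REMAX baseline block in `verl/trainer/ppo/ray_trainer.py` ends up as below (a reassembly of the diff with surrounding control flow and exact nesting omitted, not a verbatim source excerpt):

```python
batch = batch.union(gen_baseline_output)

# compute reward model score on batch; the guard avoids recomputing existing scores
rm_scores = None
if self.use_rm and "rm_scores" not in batch.batch.keys():
    rm_scores = self.rm_wg.compute_rm_score(batch)
    batch = batch.union(rm_scores)
reward_baseline_tensor, _ = compute_reward(batch, self.reward_fn)
reward_baseline_tensor = reward_baseline_tensor.sum(dim=-1)  # token-level -> sequence-level

# drop everything the baseline pass added before the real rollouts are scored
keys_to_pop = set(gen_baseline_output.batch.keys())
if rm_scores is not None:
    keys_to_pop.update(rm_scores.batch.keys())
batch.pop(batch_keys=list(keys_to_pop))

batch.batch["reward_baselines"] = reward_baseline_tensor
del rm_scores, gen_baseline_batch, gen_baseline_output
```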