Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions .github/workflows/e2e_ppo_trainer.yml
Original file line number Diff line number Diff line change
Expand Up @@ -309,3 +309,74 @@ jobs:
ENGINE=sglang GPU_MEMORY_UTILIZATION=0.6 ACTOR_FSDP_PARAM_OFFLOAD=True \
ACTOR_FSDP_OPTIMIZER_OFFLOAD=True REF_FSDP_PARAM_OFFLOAD=True \
bash tests/e2e/ppo_trainer/run_function_reward.sh

# E2E PPO-trainer job exercising the fused-kernel path (FUSED_KERNELS=True)
# with the vLLM rollout engine on a Qwen2.5-VL vision-language model,
# using the Geo3k dataset and sequence packing (RM_PAD=True).
# NOTE(review): indentation restored to standard workflow nesting; the
# source view had lost all leading whitespace.
e2e_ppo_trainer_fused_kernels_vllm:
  runs-on: [L20x8]
  needs: pre_commit_for_ppo
  timeout-minutes: 40 # Increase this timeout value as needed
  env:
    # Proxy settings come from repo secrets; hf-mirror hosts are excluded
    # so Hugging Face downloads go through the mirror directly.
    HTTP_PROXY: ${{ secrets.PROXY_HTTP }}
    HTTPS_PROXY: ${{ secrets.PROXY_HTTPS }}
    NO_PROXY: "localhost,127.0.0.1,hf-mirror.com"
    HF_ENDPOINT: "https://hf-mirror.com"
    HF_HUB_ENABLE_HF_TRANSFER: "0" # This is more stable
  container:
    # vLLM 0.8.3 image (contrast: the sglang variant of this job uses a
    # different container image and install extras).
    image: hiyouga/verl:ngc-th2.6.0-cu126-vllm0.8.3-flashinfer0.2.2-cxx11abi0
    options: --gpus all --shm-size=50g # Visual dataloader requires large memory
  steps:
    - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
      with:
        fetch-depth: 0
    - name: Install the current repository
      run: |
        pip3 install -e .[test,geo,vllm]
    # Geo3k
    - name: Prepare Geo3k dataset
      run: |
        ray stop --force
        python3 examples/data_preprocess/geo3k.py
    - name: Running Geo3k VLM E2E with rmpad using fused kernel (Qwen2.5-VL)
      run: |
        ray stop --force
        # FUSED_KERNELS=True is the point of this job; the script under
        # test defaults it to False, so it must be set explicitly here.
        FUSED_KERNELS=True TRAIN_FILES=$HOME/data/geo3k/train.parquet VAL_FILES=$HOME/data/geo3k/test.parquet \
        MAX_PROMPT_LEN=1536 MAX_RESPONSE_LEN=1536 \
        MODEL_ID=Qwen/Qwen2.5-VL-3B-Instruct \
        ADV_ESTIMATOR=grpo RM_PAD=True USE_KL=True ENABLE_CHUNKED_PREFILL=False \
        GPU_MEMORY_UTILIZATION=0.6 ACTOR_FSDP_PARAM_OFFLOAD=True \
        ACTOR_FSDP_OPTIMIZER_OFFLOAD=True REF_FSDP_PARAM_OFFLOAD=True \
        bash tests/e2e/ppo_trainer/run_function_reward.sh

# Same fused-kernel E2E run as the vLLM job above, but on the SGLang
# rollout engine (ENGINE=sglang) with its dedicated container image and
# install extras. Dataset, model, and trainer settings are otherwise
# identical.
# NOTE(review): indentation restored to standard workflow nesting; the
# source view had lost all leading whitespace.
e2e_ppo_trainer_fused_kernels_sglang:
  runs-on: [L20x8]
  needs: pre_commit_for_ppo
  timeout-minutes: 40 # Increase this timeout value as needed
  env:
    HTTP_PROXY: ${{ secrets.PROXY_HTTP }}
    HTTPS_PROXY: ${{ secrets.PROXY_HTTPS }}
    NO_PROXY: "localhost,127.0.0.1,hf-mirror.com"
    HF_ENDPOINT: "https://hf-mirror.com"
    HF_HUB_ENABLE_HF_TRANSFER: "0" # This is more stable
  container:
    image: ocss884/verl-sglang:ngc-th2.6.0-cu126-sglang0.4.6.post4
    options: --gpus all --shm-size=50g # Visual dataloader requires large memory
  steps:
    - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
      with:
        fetch-depth: 0
    - name: Install the current repository
      run: |
        # NOTE(review): installs [test,geo,gpu,sglang] while the vLLM job
        # installs [test,geo,vllm] — presumably intentional per-engine
        # extras; confirm against setup.cfg/pyproject.
        pip3 install -e .[test,geo,gpu,sglang]
    - name: Prepare Geo3k dataset
      run: |
        ray stop --force
        python3 examples/data_preprocess/geo3k.py
    - name: Running Geo3k VLM E2E with rmpad using fused kernel (Qwen2.5-VL)
      run: |
        ray stop --force
        # ENGINE=sglang selects the SGLang rollout backend in the shared
        # run_function_reward.sh harness.
        FUSED_KERNELS=True TRAIN_FILES=$HOME/data/geo3k/train.parquet VAL_FILES=$HOME/data/geo3k/test.parquet \
        MAX_PROMPT_LEN=1536 MAX_RESPONSE_LEN=1536 \
        MODEL_ID=Qwen/Qwen2.5-VL-3B-Instruct \
        ADV_ESTIMATOR=grpo RM_PAD=True USE_KL=True ENABLE_CHUNKED_PREFILL=False \
        ENGINE=sglang GPU_MEMORY_UTILIZATION=0.6 ACTOR_FSDP_PARAM_OFFLOAD=True \
        ACTOR_FSDP_OPTIMIZER_OFFLOAD=True REF_FSDP_PARAM_OFFLOAD=True \
        bash tests/e2e/ppo_trainer/run_function_reward.sh
2 changes: 1 addition & 1 deletion tests/e2e/ppo_trainer/run_function_reward.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ ACTOR_FSDP_PARAM_OFFLOAD=${ACTOR_FSDP_PARAM_OFFLOAD:-False}
ACTOR_FSDP_OPTIMIZER_OFFLOAD=${ACTOR_FSDP_OPTIMIZER_OFFLOAD:-False}
REF_FSDP_PARAM_OFFLOAD=${REF_FSDP_PARAM_OFFLOAD:-True}
RM_PAD=${RM_PAD:-True}
FUSED_KERNELS=${FUSED_KERNELS:-True}
FUSED_KERNELS=${FUSED_KERNELS:-False}
ADV_ESTIMATOR=${ADV_ESTIMATOR:-gae}
USE_KL=${USE_KL:-False}
CUSTOM_REWARD_FN=${CUSTOM_REWARD_FN:-False}
Expand Down
17 changes: 12 additions & 5 deletions verl/workers/actor/dp_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,18 @@
from verl import DataProto
from verl.trainer.ppo.core_algos import agg_loss, compute_policy_loss, kl_penalty
from verl.utils.debug import GPUMemoryLogger
from verl.utils.device import get_device_name, get_torch_device, is_cuda_available, is_npu_available
from verl.utils.fsdp_utils import FSDPModule, fsdp2_clip_grad_norm_
from verl.utils.py_functional import append_to_dict
from verl.utils.seqlen_balancing import get_reverse_idx, rearrange_micro_batches
from verl.utils.torch_functional import logprobs_from_logits
from verl.utils.ulysses import gather_outpus_and_unpad, ulysses_pad_and_slice_inputs
from verl.workers.actor import BasePPOActor
from verl.utils.device import get_device_name, get_torch_device, is_cuda_available, is_npu_available

if is_cuda_available:
from flash_attn.bert_padding import pad_input, unpad_input, rearrange, index_first_axis
from flash_attn.bert_padding import index_first_axis, pad_input, rearrange, unpad_input
elif is_npu_available:
from transformers.integrations.npu_flash_attention import pad_input, unpad_input, rearrange, index_first_axis
from transformers.integrations.npu_flash_attention import index_first_axis, pad_input, rearrange, unpad_input


__all__ = ["DataParallelPPOActor"]
Expand Down Expand Up @@ -122,13 +122,17 @@ def _forward_micro_batch(self, micro_batch, temperature, calculate_entropy=False
input_ids_rmpad_rolled = input_ids_rmpad_rolled.squeeze(0) # ((total_nnz / sp) + pad)

# only pass input_ids and position_ids to enable flash_attn_varlen
extra_args = {}
if self.use_fused_kernels:
extra_args["temperature"] = temperature

output = self.actor_module(
input_ids=input_ids_rmpad,
attention_mask=None,
position_ids=position_ids_rmpad,
**multi_modal_inputs,
use_cache=False,
temperature=temperature,
**extra_args,
) # prevent model thinks we are generating

if self.use_fused_kernels:
Expand Down Expand Up @@ -190,13 +194,16 @@ def _forward_micro_batch(self, micro_batch, temperature, calculate_entropy=False
log_probs = full_log_probs.squeeze(-1)[:, -response_length - 1 : -1] # (bsz, response_length)

else: # not using rmpad and no ulysses sp
extra_args = {}
if self.use_fused_kernels:
extra_args["temperature"] = temperature
output = self.actor_module(
input_ids=input_ids,
attention_mask=attention_mask,
position_ids=position_ids,
**multi_modal_inputs,
use_cache=False,
temperature=temperature,
**extra_args,
) # prevent model thinks we are generating

if self.use_fused_kernels:
Expand Down
Loading