From 5dbe0166fbd441f6d1d62078c77b895ff4ff135a Mon Sep 17 00:00:00 2001 From: 0x404 <871206929@qq.com> Date: Mon, 28 Apr 2025 17:30:11 +0800 Subject: [PATCH 1/3] [ci] test: add FSDP checkpoint merging test into CI --- .github/workflows/e2e_ppo_trainer.yml | 4 +++ scripts/model_merger.py | 49 ++++++++++++++++++++++----- 2 files changed, 44 insertions(+), 9 deletions(-) diff --git a/.github/workflows/e2e_ppo_trainer.yml b/.github/workflows/e2e_ppo_trainer.yml index 140d73a0528..9ee364d5466 100644 --- a/.github/workflows/e2e_ppo_trainer.yml +++ b/.github/workflows/e2e_ppo_trainer.yml @@ -86,6 +86,10 @@ jobs: run: | ray stop --force RESUME_MODE=auto bash tests/e2e/ppo_trainer/run_function_reward.sh + - name: Test FSDP checkpoints merging function (Qwen Actor) + run: | + exp_name="qwen2.5-0.5b-function-reward-minimal" + python scripts/model_merger.py --backend fsdp --hf_model_path Qwen/Qwen2.5-0.5B --local_dir checkpoints/verl-test/${exp_name}/global_step_1/actor --test --test_hf_dir checkpoints/verl-test/${exp_name}/global_step_1/actor/huggingface - name: Running GSM8K E2E without rmpad using function rm run: | ray stop --force diff --git a/scripts/model_merger.py b/scripts/model_merger.py index 40750678ba1..d4c81f8b6f7 100644 --- a/scripts/model_merger.py +++ b/scripts/model_merger.py @@ -45,10 +45,7 @@ "--local_dir", type=str, required=True, - help=( - "The path for your saved model. For megatron, point to the base dir of model, rng, optimizer checkpoints, " - "commonly be `config.default_local_dir/global_step_\{global_step\}`." - ), + help=("The path for your saved model. For megatron, point to the base dir of model, rng, optimizer checkpoints, commonly be `config.default_local_dir/global_step_\{global_step\}`."), ) parser.add_argument("--target_dir", required=False, default="tmp", type=str, help="The path for the target model") parser.add_argument("--hf_upload_path", default=False, type=str, help="The path of the huggingface repo to upload") @@ -85,6 +82,40 @@ def upload_model_to_huggingface(hf_path): api.upload_folder(folder_path=hf_path, repo_id=args.hf_upload_path, repo_type="model") +def test_fsdp_state_dict( + auto_model_class, + original_hf_model_path: str, + collected_state_dict: Dict[str, torch.Tensor], +) -> bool: + # load original model using bf16 since we collected state_dict with bf16 + original_model = auto_model_class.from_pretrained(original_hf_model_path, torch_dtype=torch.bfloat16) + original_state_dict = original_model.state_dict() + del original_model # Free memory + + original_keys = set(original_state_dict.keys()) + collected_keys = set(collected_state_dict.keys()) + + missing_keys = original_keys - collected_keys + assert len(missing_keys) == 0, f"Missing keys in collected state dict: {list(sorted(missing_keys))}" + + extra_keys = collected_keys - original_keys + assert len(extra_keys) == 0, f"Extra keys in collected state dict: {list(sorted(extra_keys))}" + + for key in original_keys: + original_shape = original_state_dict[key].shape + collected_shape = collected_state_dict[key].shape + assert original_shape == collected_shape, f"Shape mismatch for key '{key}': original {original_shape} vs collected {collected_shape}" + + original_dtype = original_state_dict[key].dtype + collected_dtype = collected_state_dict[key].dtype + assert original_dtype == collected_dtype, f"Dtype mismatch for key '{key}': original {original_dtype} vs collected {collected_dtype}" + + torch.testing.assert_close(original_state_dict[key], collected_state_dict[key], atol=1e-4, rtol=1e-4) + + print("FSDP checks passed: The collected state dict matches the hf model saved by FSDPCheckpointManager.") + return True + + def patch_model_generation_config(model, hf_model_path): """ The generation_config created from model config may be different to the pretrained model, @@ -96,10 +127,7 @@ def patch_model_generation_config(model, hf_model_path): try: model.generation_config = GenerationConfig.from_pretrained(args.hf_model_path) except OSError: - print( - f"Warning: Generation config file not found in {args.hf_model_path}, " - "using a generation config created from the model config." - ) + print(f"Warning: Generation config file not found in {args.hf_model_path}, using a generation config created from the model config.") pass return model @@ -203,7 +231,6 @@ def process_one_shard(rank, model_state_dict_lst): else: state_dict[key] = torch.cat(state_dict[key], dim=0) - print("Writing to local disk") hf_path = os.path.join(local_dir, "huggingface") if args.target_dir is None else args.target_dir config = AutoConfig.from_pretrained(args.hf_model_path) @@ -216,6 +243,10 @@ def process_one_shard(rank, model_state_dict_lst): else: raise NotImplementedError(f"Unknown architecture {config['architectures']}") + if args.test: + print("Running compatibility test") + test_fsdp_state_dict(auto_model, args.hf_model_path, state_dict) + with torch.device("meta"): model = auto_model.from_config(config, torch_dtype=torch.bfloat16) model.to_empty(device="cpu") From 9a82beb41f1d3c5785ad6acb103b92f3c6184da0 Mon Sep 17 00:00:00 2001 From: 0x404 <871206929@qq.com> Date: Mon, 28 Apr 2025 17:12:22 +0800 Subject: [PATCH 2/3] adapt run_function_reward.sh to have a SAVE_HF_MODEL option --- .github/workflows/e2e_ppo_trainer.yml | 2 +- tests/e2e/ppo_trainer/run_function_reward.sh | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/.github/workflows/e2e_ppo_trainer.yml b/.github/workflows/e2e_ppo_trainer.yml index 9ee364d5466..98cd8bdabc6 100644 --- a/.github/workflows/e2e_ppo_trainer.yml +++ b/.github/workflows/e2e_ppo_trainer.yml @@ -81,7 +81,7 @@ jobs: - name: Running GSM8K E2E training tests on 8 L20 GPUs with rmpad using function rm with validation and saving run: | ray stop --force - VAL_BEFORE_TRAIN=True TEST_FREQ=1 SAVE_FREQ=1 bash tests/e2e/ppo_trainer/run_function_reward.sh + VAL_BEFORE_TRAIN=True TEST_FREQ=1 SAVE_FREQ=1 SAVE_HF_MODEL=True bash tests/e2e/ppo_trainer/run_function_reward.sh - name: Running GSM8K E2E training tests on 8 L20 GPUs with rmpad using function rm after resuming run: | ray stop --force diff --git a/tests/e2e/ppo_trainer/run_function_reward.sh b/tests/e2e/ppo_trainer/run_function_reward.sh index 89bae9eee11..0c842e86ffe 100644 --- a/tests/e2e/ppo_trainer/run_function_reward.sh +++ b/tests/e2e/ppo_trainer/run_function_reward.sh @@ -30,6 +30,15 @@ RESUME_MODE=${RESUME_MODE:-disable} SAVE_FREQ=${SAVE_FREQ:--1} TOT_TRAIN_STEPS=${TOT_TRAIN_STEPS:-1} +# whether to save hf_model +SAVE_HF_MODEL=${SAVE_HF_MODEL:-False} + +if [ "${SAVE_HF_MODEL}" = "True" ]; then + CHECKPOINT_CONTENTS="['model','hf_model','optimizer','extra']" +else + CHECKPOINT_CONTENTS="['model','optimizer','extra']" +fi + train_traj_micro_bsz_per_gpu=2 # b n_resp_per_prompt=4 # g @@ -70,6 +79,7 @@ python3 -m verl.trainer.main_ppo \ actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=${train_traj_micro_bsz_per_gpu} \ actor_rollout_ref.actor.fsdp_config.param_offload=${ACTOR_FSDP_PARAM_OFFLOAD} \ actor_rollout_ref.actor.fsdp_config.optimizer_offload=${ACTOR_FSDP_OPTIMIZER_OFFLOAD} \ + actor_rollout_ref.actor.checkpoint.contents=${CHECKPOINT_CONTENTS} \ actor_rollout_ref.actor.use_kl_loss="${USE_KL}" \ actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=${train_traj_micro_bsz_per_gpu} \ actor_rollout_ref.rollout.tensor_model_parallel_size=2 \ From 77b6827f099aefdc6fa26369f4d2b198c44d9d10 Mon Sep 17 00:00:00 2001 From: 0x404 <871206929@qq.com> Date: Mon, 28 Apr 2025 17:20:15 +0800 Subject: [PATCH 3/3] fix typo --- scripts/model_merger.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/model_merger.py b/scripts/model_merger.py index d4c81f8b6f7..0566f2a81a2 100644 --- a/scripts/model_merger.py +++ b/scripts/model_merger.py @@ -112,7 +112,7 @@ def test_fsdp_state_dict( torch.testing.assert_close(original_state_dict[key], collected_state_dict[key], atol=1e-4, rtol=1e-4) - print("FSDP checks passed: The collected state dict matches the hf model saved by FSDPCheckpointManager.") + print("FSDP checks passed: The merged state_dict matches the hf model saved by FSDPCheckpointManager.") return True