diff --git a/recipe/fully_async_policy/megatron_worker.py b/recipe/fully_async_policy/megatron_worker.py
index fc948ce2ea8..342d2fe267e 100644
--- a/recipe/fully_async_policy/megatron_worker.py
+++ b/recipe/fully_async_policy/megatron_worker.py
@@ -27,7 +27,7 @@
     get_device_name,
     get_torch_device,
 )
-from verl.utils.megatron_utils import load_megatron_model_to_gpu, offload_megatron_model_to_cpu, per_tensor_generator
+from verl.utils.megatron_utils import load_megatron_model_to_gpu, offload_megatron_model_to_cpu
 from verl.workers.megatron_workers import ActorRolloutRefWorker, AsyncActorRolloutRefWorker, CriticWorker
 
 logger = logging.getLogger(__file__)
@@ -113,17 +113,7 @@ def clear_cpu_model(self, n):
 class DetachActorWorker(DetachNcclSync):
     def _get_actor_params_generator(self):
         assert self._is_actor
-        if self.bridge is not None:
-            generator = self.bridge.export_weights(self.actor.actor_module)
-        else:
-            generator = per_tensor_generator(
-                self.actor.actor_module,
-                self.actor_model_config,
-                self.weight_converter,
-                self.tf_config,
-                self.layer_name_mapping,
-            )
-
+        generator = self.bridge.export_weights(self.actor.actor_module)
         return generator
 
     @register(dispatch_mode=Dispatch.ONE_TO_ALL)
diff --git a/recipe/one_step_off_policy/megatron_workers.py b/recipe/one_step_off_policy/megatron_workers.py
index c2a2407939e..0ca42f59496 100644
--- a/recipe/one_step_off_policy/megatron_workers.py
+++ b/recipe/one_step_off_policy/megatron_workers.py
@@ -123,21 +123,7 @@ class DetachActorWorker(DetachSync):
     @register(dispatch_mode=Dispatch.ONE_TO_ALL)
     def _get_actor_params_generator(self):
         assert self._is_actor
-        from verl.models.mcore import get_mcore_weight_converter
-        from verl.utils.megatron_utils import per_tensor_generator
-
-        layer_name_mapping = {
-            "qkv_layer_name": "self_attention.linear_qkv.",
-            "gate_proj_layer_name": "linear_fc1.",
-        }
-        weight_converter = get_mcore_weight_converter(self.actor_model_config, self.dtype)
-        generator = per_tensor_generator(
-            self.actor.actor_module,
-            self.actor_model_config,
-            weight_converter,
-            self.tf_config,
-            layer_name_mapping,
-        )
+        generator = self.bridge.export_weights(self.actor.actor_module)
         return generator
 
     @register(dispatch_mode=Dispatch.ONE_TO_ALL)
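Note: both recipes now assume `self.bridge` is always present and stream actor weights through the bridge's per-tensor export. A minimal sketch of how such a generator is typically consumed on the sync side, assuming `export_weights` yields one (name, tensor) pair at a time; `send_to_rollout_engine` is a hypothetical sink, not a verl API:

    # `bridge` and `actor_module` come from the worker setup above.
    # Streaming one full tensor at a time keeps peak memory low during sync.
    for name, tensor in bridge.export_weights(actor_module):
        send_to_rollout_engine(name, tensor)  # hypothetical sink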
diff --git a/tests/special_distributed/test_mcore_config_converter.py b/tests/special_distributed/test_mcore_config_converter.py
deleted file mode 100644
index d8f24c49911..00000000000
--- a/tests/special_distributed/test_mcore_config_converter.py
+++ /dev/null
@@ -1,100 +0,0 @@
-# Copyright 2024 Bytedance Ltd. and/or its affiliates
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-
-import megatron.core.parallel_state as mpu
-import torch
-from megatron.core.transformer import MLATransformerConfig, TransformerConfig
-from transformers import AutoConfig, PretrainedConfig
-
-from verl.models.mcore import hf_to_mcore_config
-from verl.utils.distributed import destroy_global_process_group, initialize_global_process_group
-
-TEST_MODELS = [
-    "Qwen/Qwen2.5-7B",  # Qwen2 dense
-    "Qwen/Qwen3-8B",  # Qwen3 dense
-    "deepseek-ai/deepseek-coder-1.3b-instruct",  # deepseek dense
-    "Qwen/Qwen2-57B-A14B",  # Qwen2 moe
-    "Qwen/Qwen3-30B-A3B",  # Qwen3 moe
-    # "mistralai/Mixtral-8x7B-v0.1",  # Mixtral # require authentication
-    "deepseek-ai/DeepSeek-V3-Base",  # Deepseek V3
-]
-
-
-def check_config_converter_results(tf_config: TransformerConfig | MLATransformerConfig, hf_config: PretrainedConfig):
-    assert tf_config.num_layers == hf_config.num_hidden_layers, (
-        f"Number of layers mismatch: {tf_config.num_layers} != {hf_config.num_hidden_layers}"
-    )
-    assert tf_config.hidden_size == hf_config.hidden_size, (
-        f"Hidden size mismatch: {tf_config.hidden_size} != {hf_config.hidden_size}"
-    )
-    assert tf_config.num_attention_heads == hf_config.num_attention_heads, (
-        f"Number of attention heads mismatch: {tf_config.num_attention_heads} != {hf_config.num_attention_heads}"
-    )
-    assert tf_config.num_query_groups == hf_config.num_key_value_heads, (
-        f"Number of query groups mismatch: {tf_config.num_query_groups} != {hf_config.num_key_value_heads}"
-    )
-    assert tf_config.ffn_hidden_size == hf_config.intermediate_size, (
-        f"FFN hidden size mismatch: {tf_config.ffn_hidden_size} != {hf_config.intermediate_size}"
-    )
-    assert tf_config.attention_dropout == hf_config.attention_dropout, (
-        f"Attention dropout mismatch: {tf_config.attention_dropout} != {hf_config.attention_dropout}"
-    )
-    assert tf_config.hidden_dropout == getattr(hf_config, "hidden_dropout", 0.0), (
-        f"Hidden dropout mismatch: {tf_config.hidden_dropout} != {getattr(hf_config, 'hidden_dropout', 0.0)}"
-    )
-    if getattr(hf_config, "head_dim", None) is not None:
-        assert tf_config.kv_channels == getattr(hf_config, "head_dim", None), (
-            f"Head dim mismatch: {tf_config.kv_channels} != {getattr(hf_config, 'head_dim', None)}"
-        )
-    assert tf_config.layernorm_epsilon == hf_config.rms_norm_eps, (
-        f"Layernorm epsilon mismatch: {tf_config.layernorm_epsilon} != {hf_config.rms_norm_eps}"
-    )
-
-
-def modify_hf_config(name: str, hf_config: PretrainedConfig):
-    if name == "deepseek-ai/DeepSeek-V3-Base":
-        hf_config.num_nextn_predict_layers = 0
-        hf_config.quantization_config = None
-    return hf_config
-
-
-def test_mcore_config_converter():
-    """
-    Test the conversion of Hugging Face model configurations to MCore configurations.
-    """
-    local_rank, rank, world_size = initialize_global_process_group()
-    mpu.initialize_model_parallel(
-        tensor_model_parallel_size=2,
-        pipeline_model_parallel_size=2,
-        virtual_pipeline_model_parallel_size=None,
-        use_sharp=False,
-        context_parallel_size=2,
-        expert_model_parallel_size=1,
-        expert_tensor_parallel_size=None,
-        nccl_communicator_config_path=None,
-    )
-    for model_name in TEST_MODELS:
-        print(f"testing {model_name}")
-        hf_config = AutoConfig.from_pretrained(os.path.expanduser(f"~/models/configs/{model_name}/config.json"))
-        hf_config = modify_hf_config(model_name, hf_config)
-        tf_config = hf_to_mcore_config(hf_config, torch.bfloat16)
-        check_config_converter_results(tf_config, hf_config)
-
-    destroy_global_process_group()
-
-
-if __name__ == "__main__":
-    test_mcore_config_converter()
- """ - local_rank, rank, world_size = initialize_global_process_group() - mpu.initialize_model_parallel( - tensor_model_parallel_size=2, - pipeline_model_parallel_size=2, - virtual_pipeline_model_parallel_size=None, - use_sharp=False, - context_parallel_size=2, - expert_model_parallel_size=1, - expert_tensor_parallel_size=None, - nccl_communicator_config_path=None, - ) - for model_name in TEST_MODELS: - print(f"testing {model_name}") - hf_config = AutoConfig.from_pretrained(os.path.expanduser(f"~/models/configs/{model_name}/config.json")) - hf_config = modify_hf_config(model_name, hf_config) - tf_config = hf_to_mcore_config(hf_config, torch.bfloat16) - check_config_converter_results(tf_config, hf_config) - - destroy_global_process_group() - - -if __name__ == "__main__": - test_mcore_config_converter() diff --git a/tests/special_e2e/run_fully_async_policy.sh b/tests/special_e2e/run_fully_async_policy.sh index 8db26aea080..272ed574dad 100644 --- a/tests/special_e2e/run_fully_async_policy.sh +++ b/tests/special_e2e/run_fully_async_policy.sh @@ -185,10 +185,12 @@ elif [ "${ACTOR_STRATEGY}" == "megatron" ]; then actor_rollout_ref.actor.megatron.grad_offload=${actor_offload} \ actor_rollout_ref.actor.megatron.pipeline_model_parallel_size=${train_pp} \ actor_rollout_ref.actor.megatron.tensor_model_parallel_size=${train_tp} \ + actor_rollout_ref.actor.megatron.use_mbridge=True \ actor_rollout_ref.rollout.tensor_model_parallel_size=${gen_tp} \ + actor_rollout_ref.ref.megatron.use_mbridge=True \ actor_rollout_ref.ref.megatron.pipeline_model_parallel_size=${train_pp} \ actor_rollout_ref.ref.megatron.tensor_model_parallel_size=${train_tp} \ - actor_rollout_ref.ref.megatron.param_offload=${ref_offload} $@ + actor_rollout_ref.ref.megatron.param_offload=${ref_offload} $@ else echo "Error: Unknown strategy ${ACTOR_STRATEGY}. 
diff --git a/tests/special_e2e/run_one_step_off_policy.sh b/tests/special_e2e/run_one_step_off_policy.sh
index bac973b2831..4de8e35325f 100755
--- a/tests/special_e2e/run_one_step_off_policy.sh
+++ b/tests/special_e2e/run_one_step_off_policy.sh
@@ -162,7 +162,9 @@ elif [ "${ACTOR_STRATEGY}" == "megatron" ]; then
     actor_rollout_ref.actor.megatron.grad_offload=${actor_offload} \
     actor_rollout_ref.actor.megatron.pipeline_model_parallel_size=${train_pp} \
     actor_rollout_ref.actor.megatron.tensor_model_parallel_size=${train_tp} \
+    actor_rollout_ref.actor.megatron.use_mbridge=True \
     actor_rollout_ref.rollout.tensor_model_parallel_size=${gen_tp} \
+    actor_rollout_ref.ref.megatron.use_mbridge=True \
     actor_rollout_ref.ref.megatron.pipeline_model_parallel_size=${train_pp} \
     actor_rollout_ref.ref.megatron.tensor_model_parallel_size=${train_tp} \
     actor_rollout_ref.ref.megatron.param_offload=${ref_offload} $@
diff --git a/tests/special_e2e/run_ppo_trainer_megatron.sh b/tests/special_e2e/run_ppo_trainer_megatron.sh
index cd8033f132e..606ad2e82be 100644
--- a/tests/special_e2e/run_ppo_trainer_megatron.sh
+++ b/tests/special_e2e/run_ppo_trainer_megatron.sh
@@ -103,7 +103,7 @@ CRITIC_PARAM_OFFLOAD=${CRITIC_PARAM_OFFLOAD:-$COMMON_PARAM_OFFLOAD}
 CRITIC_GRAD_OFFLOAD=${CRITIC_GRAD_OFFLOAD:-$COMMON_GRAD_OFFLOAD}
 CRITIC_OPTIMIZER_OFFLOAD=${CRITIC_OPTIMIZER_OFFLOAD:-$COMMON_OPTIMIZER_OFFLOAD}
 RM_PARAM_OFFLOAD=${RM_PARAM_OFFLOAD:-$COMMON_PARAM_OFFLOAD}
-USE_MBRIDGE=${USE_MBRIDGE:-False}
+USE_MBRIDGE=${USE_MBRIDGE:-True}
 VANILLA_MBRIDGE=${VANILLA_MBRIDGE:-True}
 VALUE_VANILLA_MBRIDGE=${VALUE_VANILLA_MBRIDGE:-$VANILLA_MBRIDGE}
 USE_FUSED_KERNELS=${USE_FUSED_KERNELS:-False}
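The e2e scripts now pin `use_mbridge=True` for both actor and ref, and run_ppo_trainer_megatron.sh flips its default. A usage sketch with the script's own env vars (the combination shown is an assumption about a typical run; forcing USE_MBRIDGE=False would now trip the `use_mbridge` assert added to megatron_workers.py below):

    # USE_MBRIDGE defaults to True now; VANILLA_MBRIDGE selects verl's built-in
    # mbridge over Megatron-Bridge.
    USE_MBRIDGE=True VANILLA_MBRIDGE=True bash tests/special_e2e/run_ppo_trainer_megatron.sh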
diff --git a/verl/utils/megatron_peft_utils.py b/verl/utils/megatron_peft_utils.py
index a18776c3413..3f8bee5ae09 100644
--- a/verl/utils/megatron_peft_utils.py
+++ b/verl/utils/megatron_peft_utils.py
@@ -59,7 +59,7 @@ def get_adapter_state_dict(model):
     Returns:
         Dict of adapter parameter names to tensors
     """
-    from verl.utils.megatron_utils import unwrap_model
+    from megatron.core.utils import unwrap_model
 
     # Unwrap model from DDP/Float16Module
     unwrapped = unwrap_model(model)
@@ -138,8 +138,7 @@ def load_adapter_checkpoint(
         strict: Whether to strictly enforce parameter name matching
     """
     from megatron.core import mpu
-
-    from verl.utils.megatron_utils import unwrap_model
+    from megatron.core.utils import unwrap_model
 
     # Get rank-specific path
     rank_path = _get_rank_checkpoint_path(checkpoint_path)
@@ -192,7 +191,7 @@ def count_adapter_parameters(model):
     Returns:
         Tuple of (adapter_params, total_params, percentage)
     """
-    from verl.utils.megatron_utils import unwrap_model
+    from megatron.core.utils import unwrap_model
 
     unwrapped = unwrap_model(model)
     if isinstance(unwrapped, list):
diff --git a/verl/workers/actor/megatron_actor.py b/verl/workers/actor/megatron_actor.py
index 666f26be0d0..a5f7c65fdaa 100644
--- a/verl/workers/actor/megatron_actor.py
+++ b/verl/workers/actor/megatron_actor.py
@@ -33,6 +33,7 @@
 # from megatron.core.optimizer import DistributedOptimizer
 from megatron.core.optimizer import DistributedOptimizer
 from megatron.core.pipeline_parallel import get_forward_backward_func
+from megatron.core.utils import get_model_config, unwrap_model
 from omegaconf import OmegaConf
 from torch import nn
 
@@ -49,7 +50,6 @@
     set_router_replay_data,
 )
 from verl.utils.megatron.tensor_parallel import vocab_parallel_entropy, vocab_parallel_log_probs_from_logits
-from verl.utils.megatron_utils import get_model_config, unwrap_model
 from verl.utils.profiler import GPUMemoryLogger
 from verl.utils.profiler.profile import Profiler
 from verl.utils.py_functional import append_to_dict
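With verl's re-exported helpers gone, `unwrap_model` (and `get_model_config` in the actor) now come straight from Megatron core. A minimal sketch of the call pattern these call sites depend on, assuming megatron.core.utils keeps the usual semantics of peeling DDP/Float16Module wrappers; the `"lora"` name filter is illustrative, not the PEFT utils' exact logic:

    from megatron.core.utils import unwrap_model

    # `model` may be a single module or a list of virtual-pipeline chunks;
    # unwrap_model strips the wrappers either way.
    unwrapped = unwrap_model(model)
    chunks = unwrapped if isinstance(unwrapped, list) else [unwrapped]
    adapter_params = {n: p for c in chunks for n, p in c.named_parameters() if "lora" in n}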
diff --git a/verl/workers/megatron_workers.py b/verl/workers/megatron_workers.py
index db2e3fb1b97..d0d9dcc787a 100644
--- a/verl/workers/megatron_workers.py
+++ b/verl/workers/megatron_workers.py
@@ -35,7 +35,6 @@
 from megatron.core import parallel_state as mpu
 
 from verl import DataProto
-from verl.models.mcore import get_mcore_weight_converter
 from verl.single_controller.base import Worker
 from verl.single_controller.base.decorator import Dispatch, make_nd_compute_dataproto_dispatch_fn, register
 from verl.utils import hf_tokenizer
@@ -57,11 +56,10 @@
     load_megatron_optimizer,
     offload_megatron_model_to_cpu,
     offload_megatron_optimizer,
-    per_tensor_generator,
     register_megatron_training_hooks,
 )
 from verl.utils.memory_utils import aggressive_empty_cache
-from verl.utils.model import get_hf_model_path, load_mcore_dist_weights, load_megatron_gptmodel_weights
+from verl.utils.model import get_hf_model_path, load_mcore_dist_weights
 from verl.utils.profiler import (
     DistProfiler,
     DistProfilerExtension,
@@ -113,9 +111,9 @@ def _init_hf_config_and_tf_config(
         trust_remote_code=False,
         megatron_config=None,
     ):
+        assert megatron_config.use_mbridge, "use_mbridge must be True"
         from transformers import AutoConfig
 
-        from verl.models.mcore import hf_to_mcore_config
         from verl.utils import hf_processor, hf_tokenizer
         from verl.utils.fs import copy_to_local
         from verl.utils.model import update_model_config
@@ -154,65 +152,58 @@ def _init_hf_config_and_tf_config(
         if self.rank == 0:
             print(f"Model config after override: {hf_config}")
 
-        from verl.models.mcore.config_converter import mapping_string_to_attn_backend
+        from verl.utils.megatron_utils import mapping_string_to_attn_backend
 
-        # todo: remove this line after mcore adopt mbridge 0.15, now for compatibility
         override_transformer_config = mapping_string_to_attn_backend(override_transformer_config)
 
         fp16 = dtype == torch.float16
         bf16 = dtype == torch.bfloat16
-        if fp16:
-            assert megatron_config.use_mbridge, "fp16 mode requires use_mbridge to be True"
 
         self.provider = None
         self.vanilla_bridge = megatron_config.get("vanilla_mbridge", True)
-        if megatron_config.use_mbridge:
-            if self.vanilla_bridge:
-                from verl.models.mcore.mbridge import AutoBridge
-
-                bridge = AutoBridge.from_config(hf_config, dtype=dtype)
-                bridge.set_extra_args(**override_transformer_config)
-                tf_config = bridge.config
-                tf_config.fp16 = fp16
-                tf_config.bf16 = bf16
-            else:
-                from verl.models.mcore.bridge import AutoBridge
-
-                # Use Megatron-Bridge to convert HF config to Megatron config
-                bridge = AutoBridge.from_hf_pretrained(self.local_path, trust_remote_code=trust_remote_code)
-                # Get Megatron provider and configure it
-                provider = bridge.to_megatron_provider(load_weights=False)
-
-                # In case of invalid overrides, we need to make sure some critical params are set correctly
-                provider.params_dtype = dtype
-
-                # Pass distributed info
-                provider.tensor_model_parallel_size = megatron_config.tensor_model_parallel_size
-                provider.pipeline_model_parallel_size = megatron_config.pipeline_model_parallel_size
-                provider.expert_model_parallel_size = megatron_config.expert_model_parallel_size
-                provider.expert_tensor_parallel_size = megatron_config.expert_tensor_parallel_size
-                provider.virtual_pipeline_model_parallel_size = megatron_config.virtual_pipeline_model_parallel_size
-                provider.context_parallel_size = megatron_config.context_parallel_size
-                provider.sequence_parallel = megatron_config.sequence_parallel
-
-                # Match verl implementation (need variable_seq_lengths)
-                from megatron.core.transformer.enums import AttnBackend
-
-                provider.attention_backend = AttnBackend.flash
-                provider.variable_seq_lengths = True
-                provider.moe_token_dispatcher_type = "alltoall"
-                provider.moe_router_load_balancing_type = "none"
-
-                # Apply transformer config overrides
-                for key, value in override_transformer_config.items():
-                    setattr(provider, key, value)
-
-                provider.finalize()
-                self.provider = provider
-                tf_config = None  # Will be set after model creation
-            self.bridge = bridge
+        if self.vanilla_bridge:
+            from verl.models.mcore.mbridge import AutoBridge
+
+            bridge = AutoBridge.from_config(hf_config, dtype=dtype)
+            bridge.set_extra_args(**override_transformer_config)
+            tf_config = bridge.config
+            tf_config.fp16 = fp16
+            tf_config.bf16 = bf16
         else:
-            tf_config = hf_to_mcore_config(hf_config, dtype, **override_transformer_config)
-            self.bridge = None
+            from verl.models.mcore.bridge import AutoBridge
+
+            # Use Megatron-Bridge to convert HF config to Megatron config
+            bridge = AutoBridge.from_hf_pretrained(self.local_path, trust_remote_code=trust_remote_code)
+            # Get Megatron provider and configure it
+            provider = bridge.to_megatron_provider(load_weights=False)
+
+            # In case of invalid overrides, we need to make sure some critical params are set correctly
+            provider.params_dtype = dtype
+
+            # Pass distributed info
+            provider.tensor_model_parallel_size = megatron_config.tensor_model_parallel_size
+            provider.pipeline_model_parallel_size = megatron_config.pipeline_model_parallel_size
+            provider.expert_model_parallel_size = megatron_config.expert_model_parallel_size
+            provider.expert_tensor_parallel_size = megatron_config.expert_tensor_parallel_size
+            provider.virtual_pipeline_model_parallel_size = megatron_config.virtual_pipeline_model_parallel_size
+            provider.context_parallel_size = megatron_config.context_parallel_size
+            provider.sequence_parallel = megatron_config.sequence_parallel
+
+            # Match verl implementation (need variable_seq_lengths)
+            from megatron.core.transformer.enums import AttnBackend
+
+            provider.attention_backend = AttnBackend.flash
+            provider.variable_seq_lengths = True
+            provider.moe_token_dispatcher_type = "alltoall"
+            provider.moe_router_load_balancing_type = "none"
+
+            # Apply transformer config overrides
+            for key, value in override_transformer_config.items():
+                setattr(provider, key, value)
+
+            provider.finalize()
+            self.provider = provider
+            tf_config = None  # Will be set after model creation
+        self.bridge = bridge
 
         if torch.distributed.get_rank() == 0:
             if tf_config is not None:
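With mbridge mandatory, config conversion always flows through a bridge; only the flavor differs. Condensed from the hunk above (details elided; not a drop-in replacement):

    if self.vanilla_bridge:
        # verl's built-in mbridge: tf_config is available immediately
        bridge = AutoBridge.from_config(hf_config, dtype=dtype)
        bridge.set_extra_args(**override_transformer_config)
        tf_config = bridge.config
    else:
        # Megatron-Bridge: config is carried by a finalized provider and
        # tf_config is only known after model creation
        bridge = AutoBridge.from_hf_pretrained(self.local_path)
        provider = bridge.to_megatron_provider(load_weights=False)
        provider.finalize()
        tf_config = None
    self.bridge = bridge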
@@ -407,16 +398,11 @@ def _build_model_optimizer(
                 prefix=self.config.actor.megatron.dist_checkpointing_prefix,
             )
         else:
-            if self.bridge is not None:
-                local_model_path = get_hf_model_path(self.config)
-                if self.vanilla_bridge:
-                    self.bridge.load_weights(actor_module, local_model_path)
-                else:
-                    self.bridge.load_hf_weights(actor_module, local_model_path)
+            local_model_path = get_hf_model_path(self.config)
+            if self.vanilla_bridge:
+                self.bridge.load_weights(actor_module, local_model_path)
             else:
-                load_megatron_gptmodel_weights(
-                    self.config, self.hf_config, actor_module, params_dtype=self.dtype, is_value_model=False
-                )
+                self.bridge.load_hf_weights(actor_module, local_model_path)
 
         if self.rank == 0:
             print_model_size(actor_module[0])
@@ -448,16 +434,11 @@
                 prefix=self.config.ref.megatron.dist_checkpointing_prefix,
             )
         else:
-            if self.bridge is not None:
-                local_model_path = get_hf_model_path(self.config)
-                if self.vanilla_bridge:
-                    self.bridge.load_weights(ref_module, local_model_path)
-                else:
-                    self.bridge.load_hf_weights(ref_module, local_model_path)
+            local_model_path = get_hf_model_path(self.config)
+            if self.vanilla_bridge:
+                self.bridge.load_weights(ref_module, local_model_path)
             else:
-                load_megatron_gptmodel_weights(
-                    self.config, self.hf_config, ref_module, params_dtype=self.dtype, is_value_model=False
-                )
+                self.bridge.load_hf_weights(ref_module, local_model_path)
 
         log_gpu_memory_usage("After ref module init", logger=logger)
         return ref_module, self.hf_config
@@ -658,8 +639,6 @@ def init_model(self):
             "gate_proj_layer_name": "linear_fc1.",
         }
         self.weight_converter = None
-        if not self.config.actor.megatron.use_mbridge:
-            self.weight_converter = get_mcore_weight_converter(self.actor_model_config, self.dtype)
 
         get_torch_device().empty_cache()
         log_gpu_memory_usage("After init_model finish", logger=logger)
@@ -673,19 +652,10 @@ async def rollout_mode(self):
             load_megatron_model_to_gpu(self.actor.actor_module, load_grad=False)
             log_gpu_memory_usage("After load actor params during rollout_mode", logger=logger)
 
-        if self.bridge is not None:
-            if self.vanilla_bridge:
-                per_tensor_param = self.bridge.export_weights(self.actor.actor_module)
-            else:
-                per_tensor_param = self.bridge.export_hf_weights(self.actor.actor_module)
+        if self.vanilla_bridge:
+            per_tensor_param = self.bridge.export_weights(self.actor.actor_module)
         else:
-            per_tensor_param = per_tensor_generator(
-                self.actor.actor_module,
-                self.actor_model_config,
-                self.weight_converter,
-                self.tf_config,
-                self.layer_name_mapping,
-            )
+            per_tensor_param = self.bridge.export_hf_weights(self.actor.actor_module)
 
         if self.config.rollout.free_cache_engine:
             await self.rollout.resume(tags=["weights"])
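The load and export entry points pair up by bridge flavor; the hunks above apply the same pattern to the actor, ref, and rollout sync paths. A condensed sketch (`module` stands in for the actor or ref module list):

    if self.vanilla_bridge:
        self.bridge.load_weights(module, local_model_path)      # HF -> Megatron
        per_tensor_param = self.bridge.export_weights(module)   # Megatron -> rollout
    else:
        self.bridge.load_hf_weights(module, local_model_path)
        per_tensor_param = self.bridge.export_hf_weights(module)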
@@ -1102,17 +1072,12 @@ def _build_critic_model_optimizer(
                 prefix=self.config.megatron.dist_checkpointing_prefix,
             )
         else:
-            if self.bridge is not None:
-                local_model_path = get_hf_model_path(self.config)
-                if self.vanilla_bridge:
-                    self.bridge.load_weights(critic_module, local_model_path)
-                else:
-                    self.bridge.load_hf_weights(
-                        critic_module, local_model_path, allowed_mismatched_params=["output_layer.weight"]
-                    )
+            local_model_path = get_hf_model_path(self.config)
+            if self.vanilla_bridge:
+                self.bridge.load_weights(critic_module, local_model_path)
             else:
-                load_megatron_gptmodel_weights(
-                    self.config, self.hf_config, critic_module, params_dtype=self.dtype, is_value_model=True
+                self.bridge.load_hf_weights(
+                    critic_module, local_model_path, allowed_mismatched_params=["output_layer.weight"]
                 )
         t1 = time.time()
         if torch.distributed.get_rank() == 0:
@@ -1381,19 +1346,13 @@ def _build_rm_model(self, model_path, tokenizer, override_model_config, override
                 prefix=self.config.megatron.dist_checkpointing_prefix,
             )
         else:
-            if self.bridge is not None:
-                local_model_path = get_hf_model_path(self.config)
-                if self.vanilla_bridge:
-                    self.bridge.load_weights(reward_model, local_model_path)
-                else:
-                    self.bridge.load_hf_weights(
-                        reward_model, local_model_path, allowed_mismatched_params=["output_layer.weight"]
-                    )
+            local_model_path = get_hf_model_path(self.config)
+            if self.vanilla_bridge:
+                self.bridge.load_weights(reward_model, local_model_path)
             else:
-                load_megatron_gptmodel_weights(
-                    self.config, self.hf_config, reward_model, params_dtype=self.dtype, is_value_model=True
+                self.bridge.load_hf_weights(
+                    reward_model, local_model_path, allowed_mismatched_params=["output_layer.weight"]
                 )
-
         get_torch_device().empty_cache()
         return reward_model, self.hf_config
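Only the Megatron-Bridge path passes `allowed_mismatched_params` for the critic and reward model. The likely reason (an assumption; the patch does not state it) is that a value model swaps the vocab-sized LM head for a value head, so the checkpoint's output_layer.weight cannot line up, and the vanilla bridge presumably handles that case internally:

    self.bridge.load_hf_weights(
        critic_module,
        local_model_path,
        # tolerate exactly the head whose shape differs in a value model
        allowed_mismatched_params=["output_layer.weight"],
    )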