diff --git a/dockerfiles/Dockerfile.nemo-rl b/dockerfiles/Dockerfile.nemo-rl index b339b14eea..9b66bf7f44 100644 --- a/dockerfiles/Dockerfile.nemo-rl +++ b/dockerfiles/Dockerfile.nemo-rl @@ -1,8 +1,18 @@ +# syntax=docker/dockerfile:1 # copied and edited from https://github.com/NVIDIA/NeMo-RL/blob/main/docker/Dockerfile # TODO: from next update try to re-use their dockerfile as is as they support specifying the commit ARG BASE_IMAGE=nvcr.io/nvidia/cuda-dl-base:25.05-cuda12.9-devel-ubuntu24.04 + +FROM scratch AS nemo-rl + +ARG NEMO_RL_COMMIT=${NEMO_RL_COMMIT:-e95efb912a6909b5da91ffeb197debe91fd480d8} +ADD --keep-git-dir=true https://github.com/NVIDIA-NeMo/RL.git#${NEMO_RL_COMMIT} / + + FROM ${BASE_IMAGE} AS base +# An environment variable to indicate that we are in a container. +ENV NRL_CONTAINER=1 # It is more convenient for users to run as root USER root @@ -28,13 +38,15 @@ apt-key adv --fetch-keys http://developer.download.nvidia.com/compute/cuda/repos apt update apt install -y nsight-systems-cli +# To fix CVE-2025-68973 +apt install -y --only-upgrade gnupg apt-get clean rm -rf /var/lib/apt/lists/* EOF # Install uv and python -ARG UV_VERSION=0.7.2 +ARG UV_VERSION=0.9.7 ARG PYTHON_VERSION=3.12 ENV PATH="/root/.local/bin:$PATH" RUN curl -LsSf https://astral.sh/uv/${UV_VERSION}/install.sh | sh && \ @@ -43,36 +55,44 @@ RUN curl -LsSf https://astral.sh/uv/${UV_VERSION}/install.sh | sh && \ # Disable usage stats by default for users who are sensitive to sharing usage. # Users are encouraged to enable if the wish. ENV RAY_USAGE_STATS_ENABLED=0 +# After ray>=2.47, this feature is enabled by default which creates uv venvs for any py_executable starting with `uv run`. +# There is severe contention and performance issues with this enabled considering our dependencies are so large and occasionally +# need to be compiled, so NeMo RL has an implementation in nemo_rl/utils/venv.py that does it once per node as opposed to once per task. +ENV RAY_ENABLE_UV_RUN_RUNTIME_ENV=0 ENV NEMO_RL_VENV_DIR=/opt/ray_venvs FROM base AS hermetic -ARG NEMO_RL_COMMIT -ENV NEMO_RL_COMMIT=${NEMO_RL_COMMIT:-85eeb8d059b0249cace427dd5dec9573107be224} - -RUN git clone https://github.com/NVIDIA-NeMo/RL.git /opt/NeMo-RL && cd /opt/NeMo-RL && git checkout ${NEMO_RL_COMMIT} && git submodule update --init --recursive - WORKDIR /opt/NeMo-RL # Variables to control the build of TE. If there are issues with parallelization, consider # setting these to 1. ARG MAX_JOBS ARG NVTE_BUILD_THREADS_PER_JOB +# Only use for custom vllm installs. Learn more at https://github.com/NVIDIA-NeMo/RL/blob/main/docs/guides/use-custom-vllm.md +ARG BUILD_CUSTOM_VLLM ENV UV_PROJECT_ENVIRONMENT=/opt/nemo_rl_venv ENV UV_LINK_MODE=copy -# This step is to warm the uv cache with flash-attn without invalidating it due to COPY layers -# This layer has to be manually updated -RUN <<"EOF" bash -exu -uv venv ${UV_PROJECT_ENVIRONMENT} - -VIRTUAL_ENV=$UV_PROJECT_ENVIRONMENT uv pip install --link-mode symlink setuptools torch==2.7.0 psutil ninja --torch-backend=cu128 -VIRTUAL_ENV=$UV_PROJECT_ENVIRONMENT uv pip install --link-mode symlink flash-attn==2.7.4.post1 --no-build-isolation -EOF - -RUN <<"EOF" bash -exu +# Ensure DeepEP is built for H100 and B200 (also mcore inference unified memory API now invokes a torch API that requires these to be set) +ENV TORCH_CUDA_ARCH_LIST="9.0 10.0" + +# First copy only the dependency files +COPY --from=nemo-rl pyproject.toml uv.lock ./ +# Copy in the top level __init__.py/package_info.py since build-custom-vllm.sh needs the nemo_rl package to exist. +COPY --from=nemo-rl nemo_rl/__init__.py nemo_rl/package_info.py ./nemo_rl/ +COPY --from=nemo-rl tools/build-custom-vllm.sh ./tools/build-custom-vllm.sh +COPY --from=nemo-rl --link research/ ./research/ +COPY --from=nemo-rl --link 3rdparty/ ./3rdparty/ + +RUN --mount=type=ssh <<"EOF" bash -exu +uv venv --seed +if [[ -n "${BUILD_CUSTOM_VLLM:-}" ]]; then + bash tools/build-custom-vllm.sh + source 3rdparty/vllm/nemo-rl.env +fi # uv sync has a more reliable resolver than simple uv pip install which can fail # Sync each training + inference backend one at a time (since they may conflict) @@ -83,19 +103,47 @@ RUN <<"EOF" bash -exu uv sync --link-mode symlink --locked --no-install-project uv sync --link-mode symlink --locked --extra vllm --no-install-project uv sync --link-mode symlink --locked --extra mcore --no-install-project +uv sync --link-mode symlink --locked --extra automodel --no-install-project uv sync --link-mode symlink --locked --all-groups --no-install-project + +# Remove the aiohttp in this uv cache dir to fully address CVE GHSA-mqqc-3gqh-h2x8 +# The ray install will include the older aiohttp version in its cache +find /root/.cache/uv -type d -path "*ray/_private/runtime_env/agent/thirdparty_files/aiohttp*" -exec rm -rf {} + EOF -RUN VIRTUAL_ENV=$UV_PROJECT_ENVIRONMENT uv pip install --link-mode=symlink /opt/NeMo-RL/3rdparty/Megatron-LM-workspace/Megatron-LM ENV PATH="/opt/nemo_rl_venv/bin:$PATH" ENV NEMO_RL_VENV_DIR=/opt/ray_venvs +WORKDIR /opt/NeMo-RL +FROM hermetic AS release + +ARG NVIDIA_BUILD_ID +ARG NVIDIA_BUILD_REF +ARG RC_DATE=00.00 +ARG TARGETARCH +ENV NVIDIA_BUILD_ID=${NVIDIA_BUILD_ID:-} +ENV NVIDIA_BUILD_REF=${NVIDIA_BUILD_REF:-} +LABEL com.nvidia.build.id="${NVIDIA_BUILD_ID}" +LABEL com.nvidia.build.ref="${NVIDIA_BUILD_REF}" -WORKDIR /opt/NeMo-RL ENV NEMO_RL_VENV_DIR=/opt/ray_venvs -# Copy in source and prefetch all virtual environments +# Copy in source from build context (defaults to cloned repo, can be overridden) +# Exclude pyproject.toml and uv.lock since those may be altered by build-custom-vllm.sh +COPY --from=nemo-rl --exclude=pyproject.toml --exclude=uv.lock . /opt/NeMo-RL +# Unshallow the repo to get the full history (in the case it was from the scratch layer). +# Potentially not necessary if the repo is passed in as a complete repository (w/ full git history), +# so do a quick check before trying to unshallow. +RUN git rev-parse --is-shallow-repository | grep -q true && git fetch --unshallow || true RUN UV_LINK_MODE=symlink uv run nemo_rl/utils/prefetch_venvs.py +# Generate container fingerprint for frozen environment support +# Store outside /opt/NeMo-RL to avoid being overwritten by user mounts +RUN python tools/generate_fingerprint.py > /opt/nemo_rl_container_fingerprint + +# NOTICES.txt file points to where the OSS source code is archived +RUN echo "This distribution includes open source which is archived at the following URL: https://opensource.nvidia.com/oss/teams/nvidia/nemo-rl/${RC_DATE}:linux-${TARGETARCH}/index.html" > NOTICES.txt && \ + echo "For further inquiries or assistance, contact us at oss-requests@nvidia.com" >> NOTICES.txt + RUN git clone https://github.com/NVIDIA-NeMo/Skills.git /opt/NeMo-Skills && cd /opt/NeMo-Skills && uv pip install . diff --git a/nemo_skills/pipeline/nemo_rl/grpo.py b/nemo_skills/pipeline/nemo_rl/grpo.py index 1cbd26c888..eeeff72474 100644 --- a/nemo_skills/pipeline/nemo_rl/grpo.py +++ b/nemo_skills/pipeline/nemo_rl/grpo.py @@ -192,7 +192,7 @@ def get_training_cmd( def get_checkpoint_convert_cmd(output_dir, final_hf_path, step, backend, max_position_embeddings=None): cmd = "export PYTHONPATH=$PYTHONPATH:/nemo_run/code && export UV_PROJECT=/opt/NeMo-RL && cd /nemo_run/code && " if backend == "fsdp": - cmd += "uv run --active python -m nemo_skills.training.nemo_rl.convert_dcp_to_hf " + cmd += "uv run --extra automodel python -m nemo_skills.training.nemo_rl.convert_dcp_to_hf " elif backend == "megatron": cmd += "uv run --extra mcore python -m nemo_skills.training.nemo_rl.convert_megatron_to_hf " else: diff --git a/nemo_skills/pipeline/nemo_rl/sft.py b/nemo_skills/pipeline/nemo_rl/sft.py index 87d12fbc8e..69a3e98408 100644 --- a/nemo_skills/pipeline/nemo_rl/sft.py +++ b/nemo_skills/pipeline/nemo_rl/sft.py @@ -174,7 +174,7 @@ def get_training_cmd( def get_checkpoint_convert_cmd(output_dir, final_hf_path, step, backend, max_position_embeddings=None): cmd = "export PYTHONPATH=$PYTHONPATH:/nemo_run/code && export UV_PROJECT=/opt/NeMo-RL && cd /nemo_run/code && " if backend == "fsdp": - cmd += "uv run --active python -m nemo_skills.training.nemo_rl.convert_dcp_to_hf " + cmd += "uv run --extra automodel python -m nemo_skills.training.nemo_rl.convert_dcp_to_hf " elif backend == "megatron": cmd += "uv run --extra mcore python -m nemo_skills.training.nemo_rl.convert_megatron_to_hf " else: diff --git a/nemo_skills/training/nemo_rl/configs/grpo.yaml b/nemo_skills/training/nemo_rl/configs/grpo.yaml index 53428ca873..9ec2af3358 100644 --- a/nemo_skills/training/nemo_rl/configs/grpo.yaml +++ b/nemo_skills/training/nemo_rl/configs/grpo.yaml @@ -1,5 +1,4 @@ -# copied and edited from https://github.com/NVIDIA/NeMo-RL/blob/ab1b638a499308caea022648daaf6994d390cbde/examples/configs/grpo_math_1B.yaml - +# Copied and edited from https://github.com/NVIDIA-NeMo/RL/blob/e95efb912a6909b5da91ffeb197debe91fd480d8/examples/configs/grpo_math_1B.yaml # GRPO Algorithm Configuration grpo: num_prompts_per_step: 32 @@ -9,7 +8,7 @@ grpo: max_num_steps: 1000000 normalize_rewards: true use_leave_one_out_baseline: true - val_period: 0 + val_period: 0 # disabled val_at_start: false overlong_filtering: false max_val_samples: 256 @@ -34,9 +33,16 @@ grpo: enabled: false # Set to true to enable async training mode # Max age (in training steps) for trajectories used in training max_trajectory_age_steps: 1 + in_flight_weight_updates: false # Set to true to enable in-flight weight updates + recompute_kv_cache_after_weight_updates: false # Set to true to recompute kv cache after in-flight-weight-updates loss_fn: reference_policy_kl_penalty: 0.01 + # Can be set to k1, k2, k3 + # For more details, see http://joschu.net/blog/kl-approx.html + reference_policy_kl_type: "k3" + kl_input_clamp_value: 20.0 + kl_output_clamp_value: 10.0 ratio_clip_min: 0.2 ratio_clip_max: 0.2 ratio_clip_c: null @@ -48,38 +54,42 @@ loss_fn: truncated_importance_sampling_ratio: null sequence_level_importance_ratios: false token_level_loss: true + force_on_policy_ratio: false # Set to true to force ratio=1.0 (requires train_global_batch_size == num_prompts_per_step * num_generations_per_prompt) checkpointing: enabled: true checkpoint_dir: "results/grpo" - metric_name: "val_reward" + metric_name: "val:reward" # one of "val:" or "train:" followed by the metric name higher_is_better: true keep_top_k: 50 save_period: 10 checkpoint_must_save_by: null + model_save_format: "safetensors" + save_consolidated: false policy: model_name: ??? tokenizer: name: ${policy.model_name} ## specify if you'd like to use a tokenizer different from the model's default chat_template_kwargs: null # can be used to pass kwargs to the chat template, e.g., enable_thinking=true + hf_config_overrides: {} train_global_batch_size: 512 train_micro_batch_size: 4 generation_batch_size: 32 # Only used when generating using HF backend logprob_batch_size: 4 max_total_sequence_length: 512 precision: "bfloat16" - fsdp_offload_enabled: false - activation_checkpointing_enabled: false - refit_buffer_size_gb: 4 # used for refitting inference engine, the unit is GB tensor_model_parallel_size: 1 pipeline_model_parallel_size: 1 context_parallel_size: 1 lr: 1e-6 weight_decay: 0.01 min_lr: 1e-6 + logprob_chunk_size: null + offload_optimizer_for_logprob: false # Only useful for non-colocated generation since colocated generation will always offload optimizer to cuda before refit dtensor_cfg: + _v2: true enabled: true cpu_offload: False sequence_parallel: false @@ -88,58 +98,9 @@ policy: context_parallel_size: ${policy.context_parallel_size} custom_parallel_plan: null - # dynamic_batching improves performance by ensuring logprob and training microbatches - # have a sufficent number of tokens to maximize GPU utilization. Specifically, variable length - # responses are sorted by sequence length and bucketed into microbatches with a total - # amount of tokens is approximately close to 'train_mb_tokens' and 'logprob_mb_tokens' for the - # training and logprob stages respectively. - # We disable dynamic batching for Megatron as it is incompatible with Pipeline parallelism. - # Instead, we use sequence packing. - dynamic_batching: - enabled: False - train_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.train_micro_batch_size}} - logprob_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.logprob_batch_size}} - sequence_length_round: 64 - - sequence_packing: - enabled: True - train_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.train_micro_batch_size}} - logprob_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.logprob_batch_size}} - algorithm: "modified_first_fit_decreasing" - sequence_length_round: 64 - #If this value is set to null, it will be automatically assigned in the code. - make_sequence_length_divisible_by: null - max_grad_norm: 1.0 - - optimizer: - name: "torch.optim.AdamW" - kwargs: - lr: ${policy.lr} - weight_decay: ${policy.weight_decay} - betas: [0.9, 0.999] - eps: 1e-8 - # when using Dtensor, we need to set foreach - # and fused to False - foreach: False - fused: False - - scheduler: - - name: "torch.optim.lr_scheduler.LinearLR" - kwargs: - start_factor: 1.0 - end_factor: 1.0 - total_iters: 1 # must be >=1, here it keeps LR constant - - name: "torch.optim.lr_scheduler.CosineAnnealingLR" - kwargs: - T_max: ${grpo.max_num_steps} # total training steps - eta_min: ${policy.lr} # set min_lr = initial_lr -> constant schedule - - milestones: [0] # required to avoid config errors - - - megatron_cfg: enabled: true - empty_unused_memory_level: 0 # setting it to 0 will maximize performance, but it might lead to OOMs, while setting it to 2 to minimize the peak memory usage to avoid OOMs + empty_unused_memory_level: 1 # 1 is the minimum recommendation for RL since we almost always need to offload before beginning generation. Setting to 0 is faster, but you are more likely to run out of GPU memory. activation_checkpointing: false converter_type: "Qwen2ForCausalLM" tensor_model_parallel_size: ${policy.tensor_model_parallel_size} @@ -156,9 +117,12 @@ policy: moe_router_load_balancing_type: "none" # "seq_aux_loss" causes logprob error divergence for grpo moe_router_bias_update_rate: 0.0 # by default, disable bias updates for grpo moe_permute_fusion: false - bias_activation_fusion: True #gives ~20% training perf speedup with sequence packing apply_rope_fusion: True + # gives ~25% training perf speedup with sequence packing and apply_rope_fusion + bias_activation_fusion: True + defer_fp32_logits: False + moe_per_layer_logging: False optimizer: optimizer: "adam" @@ -191,10 +155,10 @@ policy: start_weight_decay: ${policy.megatron_cfg.optimizer.weight_decay} end_weight_decay: ${policy.megatron_cfg.optimizer.weight_decay} weight_decay_incr_style: "constant" - lr_decay_style: "cosine" + lr_decay_style: "cosine" # This is equivalent to constant unless min_lr is set to smaller value lr_decay_iters: ${grpo.max_num_steps} - lr_warmup_iters: 0 - lr_warmup_init: 1.0e-6 + lr_warmup_iters: 13 + lr_warmup_init: 5.0e-7 distributed_data_parallel_config: grad_reduce_in_fp32: false @@ -204,7 +168,53 @@ policy: use_custom_fsdp: false data_parallel_sharding_strategy: "optim_grads_params" + fp8_cfg: null + + env_vars: null + + # See docs/design-docs/sequence-packing-and-dynamic-batching.md + # for more details on dynamic batching and sequence packing. + dynamic_batching: + enabled: False + train_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.train_micro_batch_size}} + logprob_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.logprob_batch_size}} + sequence_length_round: 64 + + sequence_packing: + enabled: True + train_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.train_micro_batch_size}} + logprob_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.logprob_batch_size}} + algorithm: "modified_first_fit_decreasing" + sequence_length_round: 64 + + # makes the training sequence length divisible by the tensor parallel size + # this is useful for sequence parallel training + make_sequence_length_divisible_by: ${policy.dtensor_cfg.tensor_parallel_size} + max_grad_norm: 1.0 # megatron: Zero means no clipping, FSDP: null means no clipping + optimizer: + name: "torch.optim.AdamW" + kwargs: + lr: ${policy.lr} + weight_decay: ${policy.weight_decay} + betas: [0.9, 0.999] + eps: 1e-8 + # when using Dtensor, we need to set foreach + # and fused to False + foreach: False + fused: False + + scheduler: + - name: "torch.optim.lr_scheduler.LinearLR" + kwargs: + start_factor: 0.1 + end_factor: 1.0 + total_iters: 10 + - name: "torch.optim.lr_scheduler.CosineAnnealingLR" + kwargs: + T_max: ${grpo.max_num_steps} + eta_min: ${policy.min_lr} + - milestones: [10] generation: backend: "vllm" @@ -214,19 +224,34 @@ policy: top_k: null stop_token_ids: null stop_strings: null - vllm_cfg: - async_engine: false # Only for internal testing, will be enabled by https://github.com/NVIDIA/NeMo-RL/issues/447. + mcore_generation_config: # When using megatron for generation + buffer_size_gb: 20 # Total GPU memory (in GB) allocated for KV cache buffers + buffer_guaranteed_fraction: 0.1 # Fraction of buffer reserved for guaranteed active requests + num_cuda_graphs: 16 # Number of CUDA graphs to pre-compile for different batch sizes + block_size_tokens: 256 # Size of each KV cache block in tokens (affects memory granularity) + use_cuda_graphs_for_non_decode_steps: true # Enable CUDA graphs for prefill/context processing + enable_chunked_prefill: true # Split long prefills into chunks for better memory management + unified_memory_level: 0 # Unified memory usage level (0=disabled, higher values enable more aggressive paging) + max_tokens: 16384 # Maximum number of tokens to use in a single step. Analogous to vllm's max_num_batched_tokens + vllm_cfg: # When using vllm for generation + async_engine: false precision: ${policy.precision} + kv_cache_dtype: "auto" tensor_parallel_size: 1 pipeline_parallel_size: 1 expert_parallel_size: 1 # When EP > 1, EP must be a multiple of TP since vLLM's EP = DP * TP gpu_memory_utilization: 0.6 max_model_len: ${policy.max_total_sequence_length} - enable_expert_parallel: false - enforce_eager: True # Set as True to avoid vllm bug - # For most cases, use "dummy" to load the initial weights, since they will be overwritten during refit - # For Gemma models, we need to use "auto" due to a vllm bug - load_format: dummy + # when enforce_eager is False, it is optional to set ++policy.generation.vllm_kwargs.compilation_config.use_inductor=False for better accuracy, + # with the flag, vllm will use the custom CUDA kernels instead of the Triton kernels generated by torch.compile + # for more details, see convergence issue https://github.com/NVIDIA-NeMo/RL/issues/998 + enforce_eager: False + use_deep_gemm: False + num_last_layers_in_bf16: 0 + num_first_layers_in_bf16: 0 + enable_vllm_metrics_logger: true # Set to true to enable vLLM internal metrics logger, turn off for better performance + vllm_metrics_logger_interval: 0.5 # Interval in seconds to collect vLLM logger metrics + vllm_kwargs: {} colocated: # true: generation shares training GPUs # false: uses dedicated generation resources @@ -258,12 +283,15 @@ logger: num_val_samples_to_print: 0 # Number of validation samples to pretty print on terminal wandb_enabled: false tensorboard_enabled: false - mlflow_enabled: false + mlflow_enabled: false # Disable MLflow logging swanlab_enabled: false # Disable SwanLab logging - monitor_gpus: false # If true, will monitor GPU usage and log to wandb and/or tensorboard + monitor_gpus: true # If true, will monitor GPU usage and log to wandb and/or tensorboard wandb: project: "grpo-dev" name: "grpo-dev-logger" + swanlab: + project: "grpo-dev" + name: "grpo-dev-logger" tensorboard: {} mlflow: experiment_name: "grpo-dev" @@ -275,5 +303,3 @@ logger: cluster: gpus_per_node: 1 num_nodes: 1 - -checkpoint_must_save_by: null diff --git a/nemo_skills/training/nemo_rl/configs/sft.yaml b/nemo_skills/training/nemo_rl/configs/sft.yaml index 20cc35ff8d..0c6d470e82 100644 --- a/nemo_skills/training/nemo_rl/configs/sft.yaml +++ b/nemo_skills/training/nemo_rl/configs/sft.yaml @@ -16,7 +16,7 @@ sft: checkpointing: enabled: true checkpoint_dir: "results/sft" - metric_name: "val_loss" + metric_name: "val:val_loss" # one of "val:" or "train:" followed by the metric name higher_is_better: false keep_top_k: 50 save_period: 100 @@ -33,8 +33,7 @@ policy: train_micro_batch_size: 1 max_total_sequence_length: 4096 precision: "bfloat16" - fsdp_offload_enabled: false - activation_checkpointing_enabled: false + offload_optimizer_for_logprob: false tensor_model_parallel_size: 1 pipeline_model_parallel_size: 1 context_parallel_size: 1 @@ -43,9 +42,10 @@ policy: weight_decay: 0.01 min_lr: 1e-6 - dtensor_cfg: + _v2: true enabled: true + env_vars: {} cpu_offload: False sequence_parallel: ${policy.sequence_parallel} activation_checkpointing: false @@ -53,9 +53,23 @@ policy: context_parallel_size: ${policy.context_parallel_size} custom_parallel_plan: null + # LoRA (Low-Rank Adaptation) Configuration + lora_cfg: + enabled: False # Set to True to enable LoRA fine-tuning + target_modules: [] # List of module names to apply LoRA (empty list with match_all_linear=true applies to all linear layers) + exclude_modules: [] # List of module names to exclude from LoRA + match_all_linear: true # If True, applies LoRA to all linear layers (overrides target_modules) + dim: 8 # LoRA rank (r): lower rank = fewer parameters but less capacity. Typical values: 4, 8, 16, 32, 64 + alpha: 32 # LoRA scaling factor: effective learning rate multiplier = alpha/dim. Typical values: 16, 32, 64 + dropout: 0.0 # Dropout probability applied to LoRA layers (0.0 = no dropout) + dropout_position: "post" # Where to apply dropout: "pre" (before LoRA) or "post" (after LoRA) + lora_A_init: "xavier" # Initialization method for LoRA A matrix: "xavier" or "uniform" + use_triton: true # Use Triton-optimized kernels for LoRA (faster but requires flash-attn). Disable when tensor_parallel_size > 1 megatron_cfg: enabled: false + env_vars: {} + empty_unused_memory_level: 1 activation_checkpointing: false tensor_model_parallel_size: ${policy.tensor_model_parallel_size} expert_tensor_parallel_size: 1 @@ -72,10 +86,25 @@ policy: moe_router_bias_update_rate: 1e-3 moe_permute_fusion: false #gives ~20% training perf speedup with sequence packing + apply_rope_fusion: True + # gives ~25% training perf speedup with sequence packing and apply_rope_fusion bias_activation_fusion: True - apply_rope_fusion: True # Only used if position_embedding_type=rope layernorm_epsilon: 1e-6 - empty_unused_memory_level: 0 # setting it to 0 will maximize performance, but it might lead to OOMs, while setting it to 2 to minimize the peak memory usage to avoid OOMs + defer_fp32_logits: False + moe_per_layer_logging: False + + peft: + enabled: false + target_modules: [] + exclude_modules: [] + dim: 8 + alpha: 32 + dropout: 0.0 + dropout_position: "post" + lora_A_init_method: "xavier" + lora_B_init_method: "zero" + a2a_experimental: false + lora_dtype: None optimizer: optimizer: "adam" @@ -98,7 +127,7 @@ policy: use_distributed_optimizer: true use_precision_aware_optimizer: true - # clip_grad: ${policy.max_grad_norm} + clip_grad: ${policy.max_grad_norm} # optimizer cpu offload optimizer_cpu_offload: false @@ -117,22 +146,24 @@ policy: grad_reduce_in_fp32: false overlap_grad_reduce: true overlap_param_gather: true - average_in_collective: true data_parallel_sharding_strategy: "optim_grads_params" + use_custom_fsdp: false dynamic_batching: enabled: false - + train_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.train_micro_batch_size}} + sequence_length_round: 64 sequence_packing: enabled: True train_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.train_micro_batch_size}} algorithm: "modified_first_fit_decreasing" sequence_length_round: 64 - logprob_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.train_micro_batch_size}} - #If this value is set to null, it will be automatically assigned in the code. - make_sequence_length_divisible_by: null - max_grad_norm: null + + # makes the training sequence length divisible by the tensor parallel size + # this is useful for sequence parallel training + make_sequence_length_divisible_by: ${policy.dtensor_cfg.tensor_parallel_size} + max_grad_norm: 0.0 # megatron: Zero means no clipping, FSDP: null means no clipping optimizer: name: "torch.optim.AdamW" @@ -155,7 +186,7 @@ policy: - name: "torch.optim.lr_scheduler.CosineAnnealingLR" kwargs: T_max: ${sft.max_num_steps} # total training steps - eta_min: ${policy.lr} # set min_lr = initial_lr -> constant schedule + eta_min: ${policy.min_lr} # set min_lr = initial_lr -> constant schedule - milestones: [0] # required to avoid config errors @@ -182,6 +213,9 @@ logger: wandb: project: "sft-dev" name: "sft-dev-${data.dataset_name}" + swanlab: + project: "sft-dev" + name: "sft-dev-${data.dataset_name}" tensorboard: log_dir: "tb_logs-sft-dev-${data.dataset_name}" mlflow: diff --git a/nemo_skills/training/nemo_rl/convert_dcp_to_hf.py b/nemo_skills/training/nemo_rl/convert_dcp_to_hf.py index c54e5842b9..3405bf2a39 100644 --- a/nemo_skills/training/nemo_rl/convert_dcp_to_hf.py +++ b/nemo_skills/training/nemo_rl/convert_dcp_to_hf.py @@ -16,11 +16,13 @@ # and added logic to figure out max step automatically import argparse +import json import os import re +import shutil +import subprocess import yaml -from nemo_rl.utils.native_checkpoint import convert_dcp_to_hf def parse_args(): @@ -80,6 +82,81 @@ def find_max_step_folder(training_folder, step_override=None): return os.path.join(training_folder, f"step_{chosen_step}") +def is_safetensors_checkpoint(weights_path): + """Check if checkpoint is in the new safetensors format (has model/.hf_metadata/).""" + hf_metadata_path = os.path.join(weights_path, "model", ".hf_metadata") + return os.path.isdir(hf_metadata_path) + + +def copy_tokenizer_files(tokenizer_path, hf_ckpt_path): + """Copy tokenizer files from the original model to the HF checkpoint directory. + + Args: + tokenizer_path: Path to directory containing tokenizer files + hf_ckpt_path: Path to the HF checkpoint directory + """ + tokenizer_files = [ + "tokenizer.json", + "tokenizer_config.json", + "special_tokens_map.json", + "vocab.json", + "merges.txt", + "added_tokens.json", + "chat_template.jinja", + ] + for fname in tokenizer_files: + src = os.path.join(tokenizer_path, fname) + if os.path.exists(src): + shutil.copy2(src, os.path.join(hf_ckpt_path, fname)) + print(f"Copied {fname}") + + +def convert_safetensors_to_hf(weights_path, hf_ckpt_path, model_name, tokenizer_path, hf_overrides=None): + """Convert safetensors checkpoint to HF format using offline_hf_consolidation.py.""" + model_dir = os.path.join(weights_path, "model") + + # Get the path to the consolidation script (same directory as this script) + script_dir = os.path.dirname(os.path.abspath(__file__)) + consolidation_script = os.path.join(script_dir, "offline_hf_consolidation.py") + + # Run the consolidation script using uv with the automodel extra to get nemo_automodel + # Reference: https://github.com/NVIDIA-NeMo/Automodel/blob/main/tools/offline_hf_consolidation.py + cmd = [ + "uv", + "run", + "--active", + "--extra", + "automodel", + "python", + consolidation_script, + "--model-name", + model_name, + "--input-dir", + model_dir, + "--output-dir", + hf_ckpt_path, + ] + + print(f"Running consolidation: {' '.join(cmd)}") + subprocess.run(cmd, check=True) + + # Copy tokenizer files (not handled by offline consolidation) + # TODO: this will fail if config["policy"]["model_name"] isn't a path, but that's not common and we should + # anyway remove this logic when it's properly handled in nemo-rl + copy_tokenizer_files(tokenizer_path, hf_ckpt_path) + + # Apply hf_overrides to config.json if provided + if hf_overrides: + config_path = os.path.join(hf_ckpt_path, "config.json") + with open(config_path, "r") as f: + config = json.load(f) + config.update(hf_overrides) + with open(config_path, "w") as f: + json.dump(config, f, indent=2) + + return hf_ckpt_path + + def main(): """Main entry point.""" args = parse_args() @@ -122,14 +199,28 @@ def main(): if args.max_position_embeddings: hf_overrides["max_position_embeddings"] = args.max_position_embeddings - hf_ckpt = convert_dcp_to_hf( - dcp_ckpt_path=dcp_ckpt_path, - hf_ckpt_path=args.hf_ckpt_path, - model_name_or_path=model_name_or_path, - tokenizer_name_or_path=tokenizer_name_or_path, - overwrite=True, - hf_overrides=hf_overrides, - ) + # Check if checkpoint is in the new safetensors format + if is_safetensors_checkpoint(dcp_ckpt_path): + print("Detected safetensors checkpoint format, using offline consolidation...") + hf_ckpt = convert_safetensors_to_hf( + weights_path=dcp_ckpt_path, + hf_ckpt_path=args.hf_ckpt_path, + model_name=model_name_or_path, + tokenizer_path=tokenizer_name_or_path, + hf_overrides=hf_overrides if hf_overrides else None, + ) + else: + print("Detected DCP checkpoint format, using DCP conversion...") + from nemo_rl.utils.native_checkpoint import convert_dcp_to_hf + + hf_ckpt = convert_dcp_to_hf( + dcp_ckpt_path=dcp_ckpt_path, + hf_ckpt_path=args.hf_ckpt_path, + model_name_or_path=model_name_or_path, + tokenizer_name_or_path=tokenizer_name_or_path, + overwrite=True, + hf_overrides=hf_overrides, + ) print(f"Saved HF checkpoint to: {hf_ckpt}") diff --git a/nemo_skills/training/nemo_rl/offline_hf_consolidation.py b/nemo_skills/training/nemo_rl/offline_hf_consolidation.py new file mode 100644 index 0000000000..5fc1330564 --- /dev/null +++ b/nemo_skills/training/nemo_rl/offline_hf_consolidation.py @@ -0,0 +1,146 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This script can be used to consolidate sharded HF safetensors checkpoints +# to the consolidated format. + +# Example model directory structure: +# model/ +# ├── shard-00001-model-00001-of-00001.safetensors +# └── shard-00002-model-00001-of-00001.safetensors +# ... + +# This script works on both single and multiple workers: +# Example usage on 2 GPUs: +# torchrun --nproc-per-node=2 tools/offline_hf_consolidation.py --model-name meta-llama/Llama-3.2-1B --input-dir checkpoints/epoch_0_step_19/model/ --output-dir checkpoints/epoch_0_step_19/model/consolidated/ +# +# Example usage on 1 GPU: +# python tools/offline_hf_consolidation.py --model-name meta-llama/Llama-3.2-1B --input-dir checkpoints/epoch_0_step_19/model/ --output-dir checkpoints/epoch_0_step_19/model/consolidated/ + +# copied from https://github.com/NVIDIA-NeMo/Automodel/blob/main/tools/offline_hf_consolidation.py + +import argparse +import json +import os +import shutil + +import torch +import torch.distributed as dist +from nemo_automodel.components.checkpoint._backports.consolidate_hf_safetensors import ( + consolidate_safetensors_files_on_every_rank, +) +from nemo_automodel.components.distributed.init_utils import ( + get_rank_safe, + get_world_size_safe, + initialize_distributed, +) + + +def copy_metadata_files(input_dir, output_dir): + """ + Copy the metadata files over from the input directory to the output directory. + """ + for item_name in os.listdir(input_dir): + if item_name == "fqn_to_file_index_mapping.json": + continue # this is saved by the consolidation step + src_path = os.path.join(input_dir, item_name) + dst_path = os.path.join(output_dir, item_name) + shutil.move(src_path, dst_path) + shutil.rmtree(input_dir, ignore_errors=True) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description=( + "Consolidate sharded HF safetensors checkpoints into consolidated files, " + "preserving original sharding layout where possible." + ) + ) + + parser.add_argument( + "--model-name", + "-m", + required=True, + help=( + "Hugging Face repo id (e.g. meta-llama/Llama-3.2-1B) or absolute path to a HF snapshot directory. " + "Used as reference to copy metadata and derive FQN->file index mapping." + ), + ) + parser.add_argument( + "--input-dir", + "-i", + required=True, + help="Directory containing sharded safetensors files to consolidate.", + ) + parser.add_argument( + "--output-dir", + "-o", + required=True, + help="Directory where consolidated safetensors and metadata will be written.", + ) + parser.add_argument( + "--num-threads", + type=int, + default=5, + help="Number of threads for writing consolidated data (default: 5).", + ) + parser.add_argument( + "--backend", + choices=["auto", "nccl", "gloo"], + default="auto", + help="Distributed backend to initialize (default: auto).", + ) + return parser.parse_args() + + +def main() -> None: + args = parse_args() + + backend = args.backend + if backend == "auto": + backend = "nccl" if torch.cuda.device_count() > 0 else "gloo" + initialize_distributed(backend) + + os.makedirs(args.output_dir, exist_ok=True) + + if not os.path.exists(args.input_dir): + raise FileNotFoundError("Could not locate the input directory. Pass an absolute path to the input directory.") + + hf_metadata_dir = os.path.join(args.input_dir, ".hf_metadata") + + if not os.path.exists(hf_metadata_dir) and not os.path.isdir(hf_metadata_dir): + raise FileNotFoundError("Expected to find the .hf_metadata directory in the input directory.") + + with open(os.path.join(hf_metadata_dir, "fqn_to_file_index_mapping.json"), "r") as f: + fqn_to_index_mapping = json.load(f) + + consolidate_safetensors_files_on_every_rank( + args.input_dir, + args.output_dir, + fqn_to_index_mapping, + num_threads=args.num_threads, + ) + + if get_world_size_safe() > 1: + dist.barrier() + + if get_rank_safe() == 0: + copy_metadata_files(hf_metadata_dir, args.output_dir) + + if get_world_size_safe() > 1: + dist.barrier() + + +if __name__ == "__main__": + main() diff --git a/nemo_skills/training/nemo_rl/start_grpo.py b/nemo_skills/training/nemo_rl/start_grpo.py index afe7d3f7be..82e34b2dda 100644 --- a/nemo_skills/training/nemo_rl/start_grpo.py +++ b/nemo_skills/training/nemo_rl/start_grpo.py @@ -328,20 +328,65 @@ def main() -> None: master_config, ) = setup(config, tokenizer, dataset, val_dataset) - grpo_train( - policy, - policy_generation, - dataloader, - val_dataloader, - tokenizer, - loss_fn, - task_to_env, - val_task_to_env, - logger, - checkpointer, - grpo_state, - master_config, - ) + # Check if async mode is enabled + if "async_grpo" in config["grpo"] and config["grpo"]["async_grpo"]["enabled"]: + # Async GRPO does not support dynamic sampling, reward scaling, or reward shaping (DAPO features) + unsupported_features = [ + "use_dynamic_sampling", + "reward_scaling", + "reward_shaping", + ] + + for feature in unsupported_features: + if feature not in config["grpo"]: + continue + + if feature == "use_dynamic_sampling": + if config["grpo"][feature]: + raise NotImplementedError(f"{feature} is not supported with async GRPO") + else: + if config["grpo"][feature]["enabled"]: + raise NotImplementedError(f"{feature} is not supported with async GRPO") + + from nemo_rl.algorithms.grpo import async_grpo_train + + print("🚀 Running async GRPO training") + + async_config = config["grpo"]["async_grpo"] + # Run async GRPO training + async_grpo_train( + policy=policy, + policy_generation=policy_generation, + dataloader=dataloader, + val_dataloader=val_dataloader, + tokenizer=tokenizer, + loss_fn=loss_fn, + task_to_env=task_to_env, + val_task_to_env=val_task_to_env, + logger=logger, + checkpointer=checkpointer, + grpo_save_state=grpo_state, + master_config=master_config, + max_trajectory_age_steps=async_config["max_trajectory_age_steps"], + ) + else: + print("🚀 Running synchronous GRPO training") + + # Run standard GRPO training + grpo_train( + policy, + policy_generation, + dataloader, + val_dataloader, + tokenizer, + loss_fn, + task_to_env, + val_task_to_env, + logger, + checkpointer, + grpo_state, + master_config, + ) if __name__ == "__main__": diff --git a/nemo_skills/utils.py b/nemo_skills/utils.py index 18e6f63c73..db1956e9a0 100644 --- a/nemo_skills/utils.py +++ b/nemo_skills/utils.py @@ -25,8 +25,6 @@ from pathlib import Path from typing import Any, Callable, List, Optional, Union -import fire -from fire import decorators as fire_decorators from rich.logging import RichHandler # isort: off @@ -507,6 +505,11 @@ def check_no_extra_args_fire(): RuntimeError: If the function name is not found in the calling context. ValueError: If extra arguments are found that are not accepted by the function. """ + + # Need to import here since nemo-rl async GRPO data processing imports this file and does not have fire installed on its VLLM uv venv. + import fire + from fire import decorators as fire_decorators + args = sys.argv[1:] # Extract the function name and its arguments from the command-line arguments function_name = args[0] diff --git a/tests/gpu-tests/test_eval.py b/tests/gpu-tests/test_eval.py index 1988dd961e..15e5789f2a 100644 --- a/tests/gpu-tests/test_eval.py +++ b/tests/gpu-tests/test_eval.py @@ -179,7 +179,8 @@ def test_aaa_prepare_and_eval_all_datasets(): # It also needs a special eval arg # TODO: after summarize results works natively with eval groups, we can merge these # TODO: enable bfcl_v4 after figuring out why it's broken in this setup - bfcl_eval_args = "++eval_config.partial_eval=true ++model_name=Qwen/Qwen3-1.7B-FC" + # setting 10 samples as bfcl is brittle when using only 2 + bfcl_eval_args = "++eval_config.partial_eval=true ++model_name=Qwen/Qwen3-1.7B-FC ++max_samples=10" eval( ctx=wrap_arguments(f"{common_ctx} {bfcl_eval_args}"), output_dir=output_dir, diff --git a/tests/gpu-tests/test_train.py b/tests/gpu-tests/test_train.py index fa45c1c113..845bc11eff 100644 --- a/tests/gpu-tests/test_train.py +++ b/tests/gpu-tests/test_train.py @@ -147,10 +147,10 @@ def test_grpo_nemo_rl(backend): grpo_nemo_rl( ctx=wrap_arguments( "++data.prompt.prompt_config=qwen/math-cot " - "++grpo.max_num_steps=5 " "++grpo.num_prompts_per_step=2 " "++policy.max_total_sequence_length=256 " "++policy.dtensor_cfg.tensor_parallel_size=1 " + "++policy.megatron_cfg.scheduler.lr_warmup_iters=2 " "++checkpointing.save_period=2 " "++policy.train_global_batch_size=2 " "++policy.train_micro_batch_size=1 "