diff --git a/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml b/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml new file mode 100644 index 0000000000..9b9e2009c2 --- /dev/null +++ b/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml @@ -0,0 +1,270 @@ +grpo: + max_num_epochs: 1 + num_prompts_per_step: 8 + num_generations_per_prompt: 4 + max_rollout_turns: 1 # for multi-turn rollouts. Workplace assistant has 1 turn but can have up to 6 tool-calling steps + max_num_steps: 72 + normalize_rewards: true + use_leave_one_out_baseline: true + val_period: 6 + val_at_start: true + overlong_filtering: false + max_val_samples: null # inferred from size of val dataset. for multi evals, repeat val ds via `num_repeats` in `ng_prepare_data`. + val_batch_size: null + seed: 42 + use_dynamic_sampling: false + dynamic_sampling_max_gen_batches: 10 + batch_multiplier: 1 + reward_shaping: + enabled: false + overlong_buffer_length: 128 + overlong_buffer_penalty: 1 + max_response_length: ${policy.max_total_sequence_length} + reward_scaling: + enabled: false + source_min: 0.0 + source_max: 1.0 + target_min: 0.0 + target_max: 1.0 + skip_reference_policy_logprobs_calculation: true + +loss_fn: + reference_policy_kl_penalty: 0 + reference_policy_kl_type: "k3" + kl_input_clamp_value: 20.0 + kl_output_clamp_value: 10.0 + ratio_clip_min: 0.2 + ratio_clip_max: 0.2 + ratio_clip_c: null + # (default off) loss formulation improvements (docs/guides/grpo.md#loss) + use_on_policy_kl_approximation: false + truncated_importance_sampling_ratio: null + use_importance_sampling_correction: false + token_level_loss: true + +checkpointing: + enabled: true + checkpoint_dir: "results/grpo-workplace-assistant-nemotron-nano-v2-9b" + metric_name: "val:accuracy" + higher_is_better: true + keep_top_k: 3 + save_period: 6 # Save checkpoint every 6 steps (aligned with val_period) + checkpoint_must_save_by: null + +policy: + model_name: "nvidia/NVIDIA-Nemotron-Nano-9B-v2" + tokenizer: + name: ${policy.model_name} ## specify if you'd like to use a tokenizer different from the model's default + chat_template_kwargs: null # can be used to pass kwargs to the chat template, e.g., enable_thinking=true + hf_config_overrides: {} + train_global_batch_size: ${mul:${grpo.num_prompts_per_step}, ${grpo.num_generations_per_prompt}} # Match the total rollouts per step + train_micro_batch_size: 1 + logprob_batch_size: 1 + generation_batch_size: 32 # Only used when generating using HF backend + max_total_sequence_length: 28672 + precision: "bfloat16" + logprob_chunk_size: null # Disabled to allow defer_fp32_logits: false + + dtensor_cfg: + _v2: false + enabled: false + cpu_offload: False + sequence_parallel: false + activation_checkpointing: true + tensor_parallel_size: 2 + context_parallel_size: 1 + custom_parallel_plan: null + clear_cache_every_n_steps: null + + megatron_cfg: + enabled: true + bias_activation_fusion: false + tensor_model_parallel_size: 8 + # We might want to consider setting this value higher (e.g. to 1) and raising the vllm generation max mem utilization + empty_unused_memory_level: 0 + activation_checkpointing: true + # train_iters needs to be large enough to cover all training steps + train_iters: 100000 + expert_tensor_parallel_size: 1 + expert_model_parallel_size: 1 + pipeline_model_parallel_size: 1 + num_layers_in_first_pipeline_stage: null + num_layers_in_last_pipeline_stage: null + context_parallel_size: 1 + pipeline_dtype: ${policy.precision} + sequence_parallel: false + freeze_moe_router: true + moe_router_dtype: "fp64" + moe_router_load_balancing_type: "none" # "seq_aux_loss" causes logprob error divergence for grpo + moe_router_bias_update_rate: 0.0 # by default, disable bias updates for grpo + #gives ~20% training perf speedup with sequence packing + apply_rope_fusion: True + defer_fp32_logits: false + moe_permute_fusion: false + + optimizer: + optimizer: "adam" + lr: 5.0e-6 + min_lr: 5.0e-7 + weight_decay: 0.01 + bf16: true + fp16: false + params_dtype: "float32" + + #adam + adam_beta1: 0.9 + adam_beta2: 0.999 + adam_eps: 1e-8 + + #sgd + sgd_momentum: 0.9 + + #distributed optimizer + use_distributed_optimizer: true + use_precision_aware_optimizer: true + + # optimizer cpu offload + optimizer_cpu_offload: false + optimizer_offload_fraction: 0.0 + + clip_grad: ${policy.max_grad_norm} + + scheduler: + start_weight_decay: ${policy.megatron_cfg.optimizer.weight_decay} + end_weight_decay: ${policy.megatron_cfg.optimizer.weight_decay} + weight_decay_incr_style: "constant" + lr_decay_style: "constant" + lr_decay_iters: 100000 # Must be greater than lr_warmup_iters + lr_warmup_iters: 13 + lr_warmup_init: 5.0e-7 + # override_opt_param_scheduler: true + + distributed_data_parallel_config: + grad_reduce_in_fp32: false + overlap_grad_reduce: true + overlap_param_gather: true + use_custom_fsdp: false + data_parallel_sharding_strategy: "optim_grads_params" + + env_vars: null + + # See docs/design-docs/sequence-packing-and-dynamic-batching.md + # for more details on dynamic batching and sequence packing. + dynamic_batching: + enabled: False + train_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.train_micro_batch_size}} + logprob_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.logprob_batch_size}} + sequence_length_round: 64 + + sequence_packing: + enabled: false + train_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.train_micro_batch_size}} + logprob_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.logprob_batch_size}} + algorithm: "modified_first_fit_decreasing" + sequence_length_round: 64 + + # makes the training sequence length divisible by the tensor parallel size + # this is useful for sequence parallel training + make_sequence_length_divisible_by: 1 + max_grad_norm: 1.0 + offload_optimizer_for_logprob: false # Only useful for non-colocated generation since colocated generation will always offload optimizer to cuda before refit + + optimizer: null + + scheduler: + - name: "torch.optim.lr_scheduler.ConstantLR" + kwargs: + factor: 1.0 + total_iters: 10000000000 + - milestones: [] + + generation: + backend: "vllm" + max_new_tokens: ${policy.max_total_sequence_length} + temperature: 1.0 + top_p: 1.0 + top_k: null + stop_token_ids: null + stop_strings: null + vllm_cfg: + async_engine: true + precision: ${policy.precision} + # Must match megatron_cfg.tensor_model_parallel_size + tensor_parallel_size: 8 + pipeline_parallel_size: 1 + enable_expert_parallel: false + expert_parallel_size: 1 + gpu_memory_utilization: 0.7 + max_model_len: ${policy.max_total_sequence_length} + enforce_eager: false + use_deep_gemm: False + num_last_layers_in_bf16: 0 + num_first_layers_in_bf16: 0 + kv_cache_dtype: "auto" + expose_http_server: true + skip_tokenizer_init: false + http_server_serving_chat_kwargs: + # Workplace assistant uses 26 tools, so we enable auto_tools. + # For Nemotron Nano v2, we use the dedicated `nemotron_json` tool parser, + # registered via `nemotron_toolcall_parser_no_streaming.py`. + enable_auto_tools: true + tool_parser: nemotron_json + vllm_kwargs: + compilation_config: + # when enforce_eager is False, set ++policy.generation.vllm_kwargs.compilation_config.use_inductor=False for better accuracy, + # with the flag, vllm will use the custom CUDA kernels instead of the Triton kernels generated by torch.compile + # for more details, see convergence issue https://github.com/NVIDIA-NeMo/RL/issues/998 + use_inductor: False + mamba_ssm_cache_dtype: "float32" + colocated: + # true: generation shares training GPUs + # false: uses dedicated generation resources + enabled: true + # only relevant when enabled is false + resources: + gpus_per_node: null # Decides num gpus to be dedicated to generation when there is one node in the cluster i.e cluster.num_nodes == 1 + num_nodes: null # Decides number of nodes to be dedicated to generation + +data: + # Using the prepared train and validation datasets (downloaded from HuggingFace and split 90/10) + # Train: 1129 samples, Validation: 126 samples + train_jsonl_fpath: 3rdparty/Gym-workspace/Gym/resources_servers/workplace_assistant/data/train.jsonl + validation_jsonl_fpath: 3rdparty/Gym-workspace/Gym/resources_servers/workplace_assistant/data/validation.jsonl + agent_name: workplace_assistant_simple_agent + shuffle: true + num_workers: 0 + +env: + should_use_nemo_gym: true + should_log_nemo_gym_responses: true # If you have low logging storage, set this to false + nemo_gym: # This is passed into NeMo-Gym as the initial_global_config_dict + config_paths: + - responses_api_models/vllm_model/configs/vllm_model_for_training.yaml # Required! And it must be *for_training + - resources_servers/workplace_assistant/configs/workplace_assistant.yaml + workplace_assistant_simple_agent: + responses_api_agents: + simple_agent: + max_steps: 6 # Workplace assistant allows up to 6 tool-calling steps per task + +logger: + log_dir: "logs/grpo-workplace-assistant-nemotron-nano-v2-9b" # Base directory for all logs + num_val_samples_to_print: 0 # Number of validation samples to pretty print on terminal + wandb_enabled: true + tensorboard_enabled: false + mlflow_enabled: false # Disable MLflow logging + swanlab_enabled: false + monitor_gpus: true # If true, will monitor GPU usage and log to wandb and/or tensorboard + wandb: + project: "grpo-workplace-assistant" + name: "nemotron-nano-v2-9b-workplace-assistant" + tensorboard: {} + mlflow: + experiment_name: "grpo-workplace-assistant" + run_name: "nemotron-nano-v2-9b-workplace-assistant" + gpu_monitoring: + collection_interval: 10 # How often to collect GPU usage metrics (in seconds) + flush_interval: 10 # How often to flush GPU usage metrics to the loggers (in seconds) + +cluster: + gpus_per_node: 8 + num_nodes: 1 # Single node by default; set to 2+ for multi-node training diff --git a/nemo_rl/models/generation/vllm/nemotron_toolcall_parser_no_streaming.py b/nemo_rl/models/generation/vllm/nemotron_toolcall_parser_no_streaming.py new file mode 100644 index 0000000000..dfcf1a4c22 --- /dev/null +++ b/nemo_rl/models/generation/vllm/nemotron_toolcall_parser_no_streaming.py @@ -0,0 +1,140 @@ +""" +Nemotron JSON tool parser for vLLM, adapted from +`nvidia/NVIDIA-Nemotron-Nano-9B-v2` on Hugging Face: +`https://huggingface.co/nvidia/NVIDIA-Nemotron-Nano-9B-v2/blob/main/nemotron_toolcall_parser_no_streaming.py`. + +The original file is licensed under the NVIDIA Open Model License / +Apache-2.0-equivalent terms; this variant is trimmed to the pieces needed +for NeMo RL non-streaming tool calling. +""" + +from __future__ import annotations + +import json +import re +from collections.abc import Sequence +from typing import Union + +from vllm.entrypoints.openai.protocol import ( + ChatCompletionRequest, + DeltaMessage, + ExtractedToolCallInformation, + FunctionCall, + ToolCall, +) +from vllm.entrypoints.openai.tool_parsers.abstract_tool_parser import ( + ToolParser, + ToolParserManager, +) +from vllm.logger import init_logger +from vllm.transformers_utils.tokenizer import AnyTokenizer + +logger = init_logger(__name__) + + +@ToolParserManager.register_module("nemotron_json") +class NemotronJSONToolParser(ToolParser): + """ + Simple tool parser for Nemotron-Nano v2 models using ... + JSON blocks in the assistant output. + """ + + def __init__(self, tokenizer: AnyTokenizer): + super().__init__(tokenizer) + + self.current_tool_name_sent: bool = False + self.prev_tool_call_arr: list[dict] = [] + self.current_tool_id: int = -1 + self.streamed_args_for_tool: list[str] = [] + + self.tool_call_start_token: str = "" + self.tool_call_end_token: str = "" + + self.tool_call_regex = re.compile( + r"(.*?)", re.DOTALL + ) + + def extract_tool_calls( + self, + model_output: str, + request: ChatCompletionRequest, + ) -> ExtractedToolCallInformation: + """ + Non-streaming extraction: look for a single ... + block containing a JSON list of tool calls. + """ + if self.tool_call_start_token not in model_output: + return ExtractedToolCallInformation( + tools_called=False, + tool_calls=[], + content=model_output, + ) + + try: + # Grab the JSON substring inside the TOOLCALL tags. + str_tool_calls = self.tool_call_regex.findall(model_output)[0].strip() + if not str_tool_calls.startswith("["): + str_tool_calls = "[" + str_tool_calls + if not str_tool_calls.endswith("]"): + str_tool_calls = str_tool_calls + "]" + + json_tool_calls = json.loads(str_tool_calls) + tool_calls: list[ToolCall] = [] + for tool_call in json_tool_calls: + try: + args = tool_call.get("arguments") + if isinstance(args, dict): + args_str = json.dumps(args, ensure_ascii=False) + else: + args_str = args + + tool_calls.append( + ToolCall( + type="function", + function=FunctionCall( + name=tool_call["name"], + arguments=args_str, + ), + ) + ) + except Exception: + # Skip malformed tool call entries rather than failing hard. + continue + + content = model_output[: model_output.rfind(self.tool_call_start_token)] + + return ExtractedToolCallInformation( + tools_called=bool(tool_calls), + tool_calls=tool_calls, + content=content if content else None, + ) + + except Exception: + logger.exception( + "Error in extracting tool call from response. Response: %s", + model_output, + ) + return ExtractedToolCallInformation( + tools_called=False, + tool_calls=[], + content=model_output, + ) + + def extract_tool_calls_streaming( + self, + previous_text: str, + current_text: str, + delta_text: str, + previous_token_ids: Sequence[int], + current_token_ids: Sequence[int], + delta_token_ids: Sequence[int], + request: ChatCompletionRequest, + ) -> Union[DeltaMessage, None]: + """ + Streaming tool calling is not supported in this simplified parser. + """ + raise NotImplementedError( + "Tool calling is not supported in streaming mode for NemotronJSONToolParser." + ) + + diff --git a/nemo_rl/models/generation/vllm/vllm_worker_async.py b/nemo_rl/models/generation/vllm/vllm_worker_async.py index c8ab9fcf2a..6742e29661 100644 --- a/nemo_rl/models/generation/vllm/vllm_worker_async.py +++ b/nemo_rl/models/generation/vllm/vllm_worker_async.py @@ -25,6 +25,20 @@ import uvicorn from fastapi import FastAPI +# Register the Nemotron JSON tool parser with vLLM if available. +# This ensures that using `tool_parser: nemotron_json` in the NeMo RL +# config will succeed without requiring users to manually import the +# plugin in their own code. +try: # pragma: no cover - optional runtime dependency + from nemo_rl.models.generation.vllm import ( # noqa: F401 + nemotron_toolcall_parser_no_streaming, + ) +except Exception: + # If the module is missing for some reason, vLLM will simply not + # have the `nemotron_json` parser registered and will raise a + # clear error when it is requested. + pass + from nemo_rl.distributed.batched_data_dict import BatchedDataDict from nemo_rl.distributed.virtual_cluster import _get_free_port_local, _get_node_ip_local from nemo_rl.distributed.worker_group_utils import get_nsight_config_if_pattern_matches @@ -106,9 +120,13 @@ def _replace_prefix_tokens( if model_prefix_token_ids[-1] == eos_token_id: model_cut_end -= 1 - # We take everything starting with the EOS token ID. + # We take everything starting with the EOS token ID in the shared prefix + # between template_prefix_token_ids and template_token_ids. template_cut_start = -1 - for pos in reversed(range(len(template_prefix_token_ids))): + # Guard against pathological cases where template_prefix_token_ids may be + # longer than template_token_ids by only iterating over the common length. + max_pos = min(len(template_prefix_token_ids), len(template_token_ids)) + for pos in reversed(range(max_pos)): if template_token_ids[pos] == eos_token_id: template_cut_start = pos break