diff --git a/docs/model-quirks.md b/docs/model-quirks.md index 7824e8bf78..ec37048469 100644 --- a/docs/model-quirks.md +++ b/docs/model-quirks.md @@ -31,9 +31,12 @@ NeMo-RL uses the vLLM V1 runtime for both synchronous and asynchronous inference ### Context Parallel with FSDP2 -NeMo-RL implemented this feature based on torch CP [implementation](https://github.com/pytorch/pytorch/blob/main/torch/distributed/tensor/experimental/_attention.py). And we inherit its limitations. +- NeMo-RL implemented this feature based on torch CP [implementation](https://github.com/pytorch/pytorch/blob/main/torch/distributed/tensor/experimental/_attention.py). And we inherit its limitations. Whether model level support CP only depends on arguments passed to `torch.nn.functional.scaled_dot_product_attention`. Current NeMo-RL passed all ones attention mask to `model.forward`. For Gemma-3, it won't ignore attention mask as result `attn_bias` is not None which is not supported by torch CP. Please see [assertion](https://github.com/pytorch/pytorch/blob/134179474539648ba7dee1317959529fbd0e7f89/torch/distributed/tensor/experimental/_attention.py#L262) . +- It's a known issue that context parallel can't be used together with sequence parallel. +Refer to [here](https://github.com/NVIDIA-NeMo/RL/issues/659) for more details. + ## vLLM Async Rollout Timeout vLLM async generation has a configurable timeout for waiting for individual sample results. This is particularly important for longer sequences on large models. 
diff --git a/nemo_rl/models/policy/dtensor_policy_worker.py b/nemo_rl/models/policy/dtensor_policy_worker.py index 68115dd052..f501c978c5 100644 --- a/nemo_rl/models/policy/dtensor_policy_worker.py +++ b/nemo_rl/models/policy/dtensor_policy_worker.py @@ -217,13 +217,26 @@ def __init__( tp_size = self.cfg["dtensor_cfg"]["tensor_parallel_size"] cp_size = self.cfg["dtensor_cfg"]["context_parallel_size"] dp_size = world_size // tp_size // cp_size + sequence_parallel_enabled = self.cfg["dtensor_cfg"]["sequence_parallel"] assert world_size == dp_size * tp_size * cp_size, ( f"World size({world_size}) must equal to dp_size({dp_size}) * tp_size({tp_size}) * cp_size({cp_size}) to use DTensor" ) + if sequence_parallel_enabled and tp_size == 1: + print( + "[WARNING]: sequence_parallel=True, but tp_size=1 which has no effect. Enable tp_size > 1 to use sequence parallelism." + ) + if cp_size > 1: assert not isinstance(self.model, Gemma3ForCausalLM), ( - "Context parallel is not supported for Gemma3ForCausalLM. Torch context parallel has many limitations. Please refer to https://github.com/NVIDIA/NeMo-RL/blob/main/docs/model-quirks.md#context-parallel-with-fsdp2 for more details." + "Context parallel is not supported for Gemma3ForCausalLM. Torch context parallel has many limitations. " + "Please refer to https://github.com/NVIDIA/NeMo-RL/blob/main/docs/model-quirks.md#context-parallel-with-fsdp2 for more details." + ) + + assert not (tp_size > 1 and sequence_parallel_enabled), ( + "It's a known issue that context parallel can't be used together with sequence parallel in DTensor worker. " + "Please either set cp_size = 1 or disable sequence parallel. " + "See https://github.com/NVIDIA-NeMo/RL/issues/659 for more details." 
) device_mesh = torch.distributed.device_mesh.init_device_mesh( @@ -247,7 +260,7 @@ def __init__( self.dp_cp_mesh, self.tp_mesh, param_dtype=self.dtype, - sequence_parallel=self.cfg["dtensor_cfg"]["sequence_parallel"], + sequence_parallel=sequence_parallel_enabled, cpu_offload=self.cpu_offload, activation_checkpointing=self.cfg["dtensor_cfg"][ "activation_checkpointing" diff --git a/tests/unit/models/policy/test_dtensor_worker.py b/tests/unit/models/policy/test_dtensor_worker.py index e208873353..fcd0977117 100644 --- a/tests/unit/models/policy/test_dtensor_worker.py +++ b/tests/unit/models/policy/test_dtensor_worker.py @@ -42,7 +42,7 @@ def create_test_config( sequence_parallel: bool = False, cpu_offload: bool = False, activation_checkpointing: bool = False, - custom_parallel_plan: str = None, + custom_parallel_plan: str | None = None, ) -> PolicyConfig: return { "model_name": model_name, @@ -237,7 +237,7 @@ def test_lm_policy_init(policy_setup): @pytest.fixture def training_setup(request, two_gpu_virtual_cluster): """Setup and teardown specifically for training tests.""" - model_name, tp, cp, cpu_offload, sequence_parallel, activation_checkpointing = ( + model_name, tp, cp, sequence_parallel, cpu_offload, activation_checkpointing = ( request.param ) policy = None @@ -246,7 +246,7 @@ def training_setup(request, two_gpu_virtual_cluster): try: config = create_test_config( - model_name, tp, cp, cpu_offload, sequence_parallel, activation_checkpointing + model_name, tp, cp, sequence_parallel, cpu_offload, activation_checkpointing ) tokenizer = get_tokenizer(config["tokenizer"]) print( @@ -300,8 +300,7 @@ def training_setup(request, two_gpu_virtual_cluster): @pytest.mark.parametrize( "training_setup", [ - # model_name, tp, cp, cpu_offload, sequence_parallel, activation_checkpointing - # Split grid over tp/cp/cpu/sp/act across qwen and llama + # model_name tp cp sp cpu act (TEST_ASSETS.TINY_LLAMA_MODEL_PATH, 1, 1, False, False, False), (TEST_ASSETS.TINY_LLAMA_MODEL_PATH, 
1, 1, True, False, False), (TEST_ASSETS.TINY_LLAMA_MODEL_PATH, 1, 1, False, True, False), @@ -317,7 +316,14 @@ def training_setup(request, two_gpu_virtual_cluster): (TEST_ASSETS.TINY_QWEN3_MODEL_PATH, 1, 1, False, True, True), (TEST_ASSETS.TINY_QWEN3_MODEL_PATH, 1, 1, True, True, True), (TEST_ASSETS.TINY_QWEN3_MODEL_PATH, 1, 2, False, False, False), - (TEST_ASSETS.TINY_GEMMA3_MODEL_PATH, 1, 1, True, True, False), + ( + TEST_ASSETS.TINY_GEMMA3_MODEL_PATH, + 1, + 1, + True, + True, + False, + ), # gemma3 doesn't support sdpa (TEST_ASSETS.TINY_GEMMA3_MODEL_PATH, 1, 1, True, False, True), (TEST_ASSETS.TINY_GEMMA3_MODEL_PATH, 1, 1, False, True, True), (TEST_ASSETS.TINY_GEMMA3_MODEL_PATH, 1, 1, True, True, True), @@ -363,7 +369,7 @@ def verify_loss_tensor(loss_tensor): @pytest.fixture def logprob_setup(request, two_gpu_virtual_cluster): """Setup and teardown specifically for training tests.""" - model_name, tp, cp, cpu_offload, sequence_parallel, activation_checkpointing = ( + model_name, tp, cp, sequence_parallel, cpu_offload, activation_checkpointing = ( request.param ) policy = None @@ -371,7 +377,7 @@ def logprob_setup(request, two_gpu_virtual_cluster): try: config = create_test_config( - model_name, tp, cp, cpu_offload, sequence_parallel, activation_checkpointing + model_name, tp, cp, sequence_parallel, cpu_offload, activation_checkpointing ) tokenizer = get_tokenizer(config["tokenizer"]) print( @@ -494,8 +500,9 @@ def test_dtensor_tp_and_tied_model_with_custom_parallel_plan(two_gpu_virtual_clu config = create_test_config( model_name=TEST_ASSETS.TINY_LLAMA_TIED_MODEL_PATH, tp=2, - cpu_offload=False, + cp=1, sequence_parallel=False, + cpu_offload=False, activation_checkpointing=False, custom_parallel_plan=custom_parallel_plan, )