Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/model-quirks.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@ NeMo-RL uses the vLLM V1 runtime for both synchronous and asynchronous inference

### Context Parallel with FSDP2

NeMo-RL implemented this feature based on torch CP [implementation](https://github.com/pytorch/pytorch/blob/main/torch/distributed/tensor/experimental/_attention.py). And we inherit its limitations.
- NeMo-RL implemented this feature based on torch CP [implementation](https://github.com/pytorch/pytorch/blob/main/torch/distributed/tensor/experimental/_attention.py). And we inherit its limitations.
Whether model level support CP only depends on arguments passed to `torch.nn.functional.scaled_dot_product_attention`. Current NeMo-RL passed all ones attention mask to `model.forward`. For Gemma-3, it won't ignore attention mask as result `attn_bias` is not None which is not supported by torch CP. Please see [assertion](https://github.com/pytorch/pytorch/blob/134179474539648ba7dee1317959529fbd0e7f89/torch/distributed/tensor/experimental/_attention.py#L262) .

- It's a known issue that context parallel can't be used together with sequence parallel.
Refer to [here](https://github.com/NVIDIA-NeMo/RL/issues/659) for more details.

## vLLM Async Rollout Timeout

vLLM async generation has a configurable timeout for waiting for individual sample results. This is particularly important for longer sequences on large models.
Expand Down
17 changes: 15 additions & 2 deletions nemo_rl/models/policy/dtensor_policy_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,26 @@ def __init__(
tp_size = self.cfg["dtensor_cfg"]["tensor_parallel_size"]
cp_size = self.cfg["dtensor_cfg"]["context_parallel_size"]
dp_size = world_size // tp_size // cp_size
sequence_parallel_enabled = self.cfg["dtensor_cfg"]["sequence_parallel"]
assert world_size == dp_size * tp_size * cp_size, (
f"World size({world_size}) must equal to dp_size({dp_size}) * tp_size({tp_size}) * cp_size({cp_size}) to use DTensor"
)

if sequence_parallel_enabled and tp_size == 1:
print(
"[WARNING]: sequence_parallel=True, but tp_size=1 which has no effect. Enable tp_size > 1 to use sequence parallelism."
)

if cp_size > 1:
assert not isinstance(self.model, Gemma3ForCausalLM), (
"Context parallel is not supported for Gemma3ForCausalLM. Torch context parallel has many limitations. Please refer to https://github.com/NVIDIA/NeMo-RL/blob/main/docs/model-quirks.md#context-parallel-with-fsdp2 for more details."
"Context parallel is not supported for Gemma3ForCausalLM. Torch context parallel has many limitations. "
"Please refer to https://github.com/NVIDIA/NeMo-RL/blob/main/docs/model-quirks.md#context-parallel-with-fsdp2 for more details."
)

assert not (tp_size > 1 and sequence_parallel_enabled), (
"It's a known issue that context parallel can't be used together with sequence parallel in DTensor worker. "
"Please either set cp_size = 1 or disable sequence parallel. "
"See https://github.com/NVIDIA-NeMo/RL/issues/659 for more details."
)

device_mesh = torch.distributed.device_mesh.init_device_mesh(
Expand All @@ -247,7 +260,7 @@ def __init__(
self.dp_cp_mesh,
self.tp_mesh,
param_dtype=self.dtype,
sequence_parallel=self.cfg["dtensor_cfg"]["sequence_parallel"],
sequence_parallel=sequence_parallel_enabled,
cpu_offload=self.cpu_offload,
activation_checkpointing=self.cfg["dtensor_cfg"][
"activation_checkpointing"
Expand Down
25 changes: 16 additions & 9 deletions tests/unit/models/policy/test_dtensor_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def create_test_config(
sequence_parallel: bool = False,
cpu_offload: bool = False,
activation_checkpointing: bool = False,
custom_parallel_plan: str = None,
custom_parallel_plan: str | None = None,
) -> PolicyConfig:
return {
"model_name": model_name,
Expand Down Expand Up @@ -237,7 +237,7 @@ def test_lm_policy_init(policy_setup):
@pytest.fixture
def training_setup(request, two_gpu_virtual_cluster):
"""Setup and teardown specifically for training tests."""
model_name, tp, cp, cpu_offload, sequence_parallel, activation_checkpointing = (
model_name, tp, cp, sequence_parallel, cpu_offload, activation_checkpointing = (
request.param
)
policy = None
Expand All @@ -246,7 +246,7 @@ def training_setup(request, two_gpu_virtual_cluster):

try:
config = create_test_config(
model_name, tp, cp, cpu_offload, sequence_parallel, activation_checkpointing
model_name, tp, cp, sequence_parallel, cpu_offload, activation_checkpointing
)
tokenizer = get_tokenizer(config["tokenizer"])
print(
Expand Down Expand Up @@ -300,8 +300,7 @@ def training_setup(request, two_gpu_virtual_cluster):
@pytest.mark.parametrize(
"training_setup",
[
# model_name, tp, cp, cpu_offload, sequence_parallel, activation_checkpointing
# Split grid over tp/cp/cpu/sp/act across qwen and llama
# model_name tp cp sp cpu act
(TEST_ASSETS.TINY_LLAMA_MODEL_PATH, 1, 1, False, False, False),
(TEST_ASSETS.TINY_LLAMA_MODEL_PATH, 1, 1, True, False, False),
(TEST_ASSETS.TINY_LLAMA_MODEL_PATH, 1, 1, False, True, False),
Expand All @@ -317,7 +316,14 @@ def training_setup(request, two_gpu_virtual_cluster):
(TEST_ASSETS.TINY_QWEN3_MODEL_PATH, 1, 1, False, True, True),
(TEST_ASSETS.TINY_QWEN3_MODEL_PATH, 1, 1, True, True, True),
(TEST_ASSETS.TINY_QWEN3_MODEL_PATH, 1, 2, False, False, False),
(TEST_ASSETS.TINY_GEMMA3_MODEL_PATH, 1, 1, True, True, False),
(
TEST_ASSETS.TINY_GEMMA3_MODEL_PATH,
1,
1,
True,
True,
False,
), # gemma3 doesn't support spda
(TEST_ASSETS.TINY_GEMMA3_MODEL_PATH, 1, 1, True, False, True),
(TEST_ASSETS.TINY_GEMMA3_MODEL_PATH, 1, 1, False, True, True),
(TEST_ASSETS.TINY_GEMMA3_MODEL_PATH, 1, 1, True, True, True),
Expand Down Expand Up @@ -363,15 +369,15 @@ def verify_loss_tensor(loss_tensor):
@pytest.fixture
def logprob_setup(request, two_gpu_virtual_cluster):
"""Setup and teardown specifically for training tests."""
model_name, tp, cp, cpu_offload, sequence_parallel, activation_checkpointing = (
model_name, tp, cp, sequence_parallel, cpu_offload, activation_checkpointing = (
request.param
)
policy = None
data = None

try:
config = create_test_config(
model_name, tp, cp, cpu_offload, sequence_parallel, activation_checkpointing
model_name, tp, cp, sequence_parallel, cpu_offload, activation_checkpointing
)
tokenizer = get_tokenizer(config["tokenizer"])
print(
Expand Down Expand Up @@ -494,8 +500,9 @@ def test_dtensor_tp_and_tied_model_with_custom_parallel_plan(two_gpu_virtual_clu
config = create_test_config(
model_name=TEST_ASSETS.TINY_LLAMA_TIED_MODEL_PATH,
tp=2,
cpu_offload=False,
cp=1,
sequence_parallel=False,
cpu_offload=False,
activation_checkpointing=False,
custom_parallel_plan=custom_parallel_plan,
)
Expand Down
Loading