1 change: 1 addition & 0 deletions examples/configs/dpo.yaml
@@ -54,6 +54,7 @@ policy:
tensor_parallel_size: 1
context_parallel_size: 1
custom_parallel_plan: null
clear_cache_every_n_steps: null

dynamic_batching:
enabled: false
@@ -49,6 +49,7 @@ policy:
tensor_parallel_size: 8
context_parallel_size: 1
custom_parallel_plan: null
clear_cache_every_n_steps: 1
env_vars:
PYTORCH_CUDA_ALLOC_CONF: "max_split_size_mb:64"

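For reference, the new `clear_cache_every_n_steps` key is an optional integer period: leaving it `null` (as in `dpo.yaml`) keeps the default of never clearing the cache, while an integer `N` (as in the second config) clears it on every `N`-th microbatch, starting at microbatch 0. The sketch below only illustrates that mapping; `should_clear_cache` is a hypothetical helper, not something added by this PR.

from typing import Optional


def should_clear_cache(mb_idx: int, clear_cache_steps: Optional[int]) -> bool:
    """Return True when the CUDA allocator cache should be emptied for this microbatch."""
    # None/null (or 0) disables clearing; an integer N clears on every N-th microbatch.
    return bool(clear_cache_steps) and mb_idx % clear_cache_steps == 0


assert not any(should_clear_cache(i, None) for i in range(4))   # null: never clear
assert all(should_clear_cache(i, 1) for i in range(4))          # 1: clear every microbatch
assert [should_clear_cache(i, 2) for i in range(4)] == [True, False, True, False]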
1 change: 1 addition & 0 deletions nemo_rl/models/policy/__init__.py
@@ -27,6 +27,7 @@ class DTensorConfig(TypedDict):
tensor_parallel_size: NotRequired[int]
context_parallel_size: NotRequired[int]
custom_parallel_plan: NotRequired[str]
clear_cache_every_n_steps: NotRequired[int]


class SequencePackingConfig(TypedDict):
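Because the field is declared `NotRequired`, existing configs that omit `clear_cache_every_n_steps` continue to type-check. A trimmed-down sketch of the TypedDict, limited to the fields visible in this hunk (the real `DTensorConfig` carries more), shows the intent:

from typing import NotRequired, TypedDict  # use typing_extensions on Python < 3.11


class DTensorConfigSketch(TypedDict):
    tensor_parallel_size: NotRequired[int]
    context_parallel_size: NotRequired[int]
    custom_parallel_plan: NotRequired[str]
    clear_cache_every_n_steps: NotRequired[int]


# Both dicts satisfy the type: the new key is optional.
cfg_without_key: DTensorConfigSketch = {"tensor_parallel_size": 1}
cfg_with_key: DTensorConfigSketch = {"tensor_parallel_size": 8, "clear_cache_every_n_steps": 1}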
13 changes: 12 additions & 1 deletion nemo_rl/models/policy/dtensor_policy_worker.py
@@ -16,6 +16,7 @@
import gc
import itertools
import os
import warnings
from collections import defaultdict
from contextlib import AbstractContextManager, contextmanager, nullcontext
from typing import Any, Generator, Iterable, Optional, Set, Union, cast
@@ -629,10 +630,20 @@ def train(
mb_iterator = batch.make_microbatch_iterator(mbs)
iterator_len = batch.size // mbs

clear_cache_steps = self.cfg.get("dtensor_cfg", {}).get(
"clear_cache_every_n_steps"
)
if clear_cache_steps:
warnings.warn(
f"Emptying the CUDA cache every {clear_cache_steps} microbatches; doing so unnecessarily can incur a large performance overhead."
)

for mb_idx, mb in enumerate(
itertools.chain(mb_iterator, dummy_iterator)
):
torch.cuda.empty_cache()
# Conditionally empty the CUDA cache when training is sensitive to memory fragmentation
if clear_cache_steps and mb_idx % clear_cache_steps == 0:
torch.cuda.empty_cache()

with torch.autocast(device_type="cuda", dtype=self.dtype):
if self.enable_seq_packing:
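Pulled out of the worker, the control flow added here reduces to the self-contained sketch below (simplified names, microbatch body elided); it illustrates the pattern rather than reproducing the worker's code. Clearing every microbatch (`clear_cache_every_n_steps: 1`) trades throughput for resilience to allocator fragmentation, which is why the worker warns whenever the option is set.

import warnings

import torch


def run_microbatches(num_microbatches: int, clear_cache_steps: int | None) -> None:
    if clear_cache_steps:
        warnings.warn(
            f"Emptying the CUDA cache every {clear_cache_steps} microbatches; "
            "doing so unnecessarily can incur a large performance overhead."
        )
    for mb_idx in range(num_microbatches):
        # Periodically releasing cached blocks bounds fragmentation, at the cost of
        # extra cudaMalloc traffic when memory is next requested.
        if clear_cache_steps and mb_idx % clear_cache_steps == 0 and torch.cuda.is_available():
            torch.cuda.empty_cache()
        ...  # forward/backward for this microbatch would go here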
13 changes: 12 additions & 1 deletion nemo_rl/models/policy/dtensor_policy_worker_v2.py
@@ -15,6 +15,7 @@
import gc
import itertools
import os
import warnings
from collections import defaultdict
from contextlib import AbstractContextManager, contextmanager, nullcontext
from typing import Any, Generator, Iterable, Optional, cast
@@ -573,10 +574,20 @@ def train(
mb_iterator = batch.make_microbatch_iterator(mbs)
iterator_len = batch.size // mbs

clear_cache_steps = self.cfg.get("dtensor_cfg", {}).get(
"clear_cache_every_n_steps"
)
if clear_cache_steps:
warnings.warn(
f"Emptying the CUDA cache every {clear_cache_steps} microbatches; doing so unnecessarily can incur a large performance overhead."
)

for mb_idx, mb in enumerate(
itertools.chain(mb_iterator, dummy_iterator)
):
torch.cuda.empty_cache()
# Conditionally empty the CUDA cache when training is sensitive to memory fragmentation
if clear_cache_steps and mb_idx % clear_cache_steps == 0:
torch.cuda.empty_cache()

with torch.autocast(device_type="cuda", dtype=self.dtype):
if self.enable_seq_packing: