Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/configs/evals/eval.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ generation:
precision: "bfloat16"
tensor_parallel_size: 1
pipeline_parallel_size: 1
enable_expert_parallel: false
expert_parallel_size: 1
gpu_memory_utilization: 0.9
max_model_len: 2048
enforce_eager: False
Expand Down
2 changes: 1 addition & 1 deletion examples/configs/grpo_math_1B.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ policy:
precision: ${policy.precision}
tensor_parallel_size: 1
pipeline_parallel_size: 1
enable_expert_parallel: false
expert_parallel_size: 1 # When EP > 1, EP must be a multiple of TP since vLLM's EP = DP * TP
gpu_memory_utilization: 0.6
max_model_len: ${policy.max_total_sequence_length}
# when enforce_eager is False, it is optional to set ++policy.generation.vllm_kwargs.compilation_config.use_inductor=False for better accuracy,
Expand Down
2 changes: 1 addition & 1 deletion examples/configs/grpo_sliding_puzzle.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ policy:
async_engine: false
tensor_parallel_size: 1
pipeline_parallel_size: 1
enable_expert_parallel: false
expert_parallel_size: 1
gpu_memory_utilization: 0.6
max_model_len: ${policy.max_total_sequence_length}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ policy:
precision: ${policy.precision}
tensor_parallel_size: 1
pipeline_parallel_size: 1
enable_expert_parallel: false
expert_parallel_size: 1
gpu_memory_utilization: 0.8
enforce_eager: True
max_model_len: ${policy.max_total_sequence_length}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ policy:
precision: ${policy.precision}
tensor_parallel_size: 1
pipeline_parallel_size: 1
enable_expert_parallel: false
expert_parallel_size: 1
gpu_memory_utilization: 0.6
max_model_len: ${policy.max_total_sequence_length}
enforce_eager: False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ policy:
precision: ${policy.precision}
tensor_parallel_size: 1
pipeline_parallel_size: 1
enable_expert_parallel: false
expert_parallel_size: 1
gpu_memory_utilization: 0.6
max_model_len: 512
enforce_eager: False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ policy:
precision: ${policy.precision}
tensor_parallel_size: 4
pipeline_parallel_size: 1
enable_expert_parallel: false
expert_parallel_size: 1
gpu_memory_utilization: 0.6
max_model_len: 16384
enforce_eager: False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ policy:
precision: ${policy.precision}
tensor_parallel_size: 1
pipeline_parallel_size: 1
enable_expert_parallel: false
expert_parallel_size: 1
gpu_memory_utilization: 0.6
max_model_len: ${policy.max_total_sequence_length}
enforce_eager: True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ policy:
precision: 'fp8'
tensor_parallel_size: 1
pipeline_parallel_size: 1
enable_expert_parallel: false
expert_parallel_size: 1
gpu_memory_utilization: 0.6
max_model_len: 4096
enforce_eager: False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ policy:
precision: ${policy.precision}
tensor_parallel_size: 1
pipeline_parallel_size: 1
enable_expert_parallel: false
expert_parallel_size: 1
gpu_memory_utilization: 0.6
max_model_len: 4096
enforce_eager: False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ policy:
precision: ${policy.precision}
tensor_parallel_size: 1
pipeline_parallel_size: 1
enable_expert_parallel: false
expert_parallel_size: 1
gpu_memory_utilization: 0.6
max_model_len: 4096
enforce_eager: False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ policy:
precision: ${policy.precision}
tensor_parallel_size: 1
pipeline_parallel_size: 1
enable_expert_parallel: false
expert_parallel_size: 1
gpu_memory_utilization: 0.6
max_model_len: 512
enforce_eager: False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ policy:
precision: ${policy.precision}
tensor_parallel_size: 1
pipeline_parallel_size: 1
enable_expert_parallel: false
expert_parallel_size: 1
gpu_memory_utilization: 0.6
max_model_len: 512
enforce_eager: False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ policy:
precision: ${policy.precision}
tensor_parallel_size: 4
pipeline_parallel_size: 1
enable_expert_parallel: false
expert_parallel_size: 1
gpu_memory_utilization: 0.6
max_model_len: ${policy.max_total_sequence_length}
# NB(pjin): https://github.com/NVIDIA-NeMo/RL/pull/857
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ policy:
precision: ${policy.precision}
tensor_parallel_size: 4
pipeline_parallel_size: 1
enable_expert_parallel: false
expert_parallel_size: 1
gpu_memory_utilization: 0.6
max_model_len: 16384
enforce_eager: False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ policy:
precision: ${policy.precision}
tensor_parallel_size: 4
pipeline_parallel_size: 1
enable_expert_parallel: false
expert_parallel_size: 1
gpu_memory_utilization: 0.6
max_model_len: 16384
enforce_eager: False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ policy:
precision: ${policy.precision}
tensor_parallel_size: 4
pipeline_parallel_size: 1
enable_expert_parallel: false
expert_parallel_size: 1
gpu_memory_utilization: 0.6
max_model_len: 4096
enforce_eager: False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ policy:
precision: ${policy.precision}
tensor_parallel_size: 4
pipeline_parallel_size: 1
enable_expert_parallel: false
expert_parallel_size: 1
gpu_memory_utilization: 0.6
max_model_len: 4096
enforce_eager: False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ policy:
precision: ${policy.precision}
tensor_parallel_size: 1
pipeline_parallel_size: 1
enable_expert_parallel: false
expert_parallel_size: 1
gpu_memory_utilization: 0.6
max_model_len: 512
enforce_eager: False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ policy:
precision: ${policy.precision}
tensor_parallel_size: 1
pipeline_parallel_size: 1
enable_expert_parallel: false
expert_parallel_size: 1
gpu_memory_utilization: 0.6
max_model_len: ${policy.max_total_sequence_length}
enforce_eager: False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ policy:
precision: ${policy.precision}
tensor_parallel_size: 1
pipeline_parallel_size: 1
enable_expert_parallel: false
expert_parallel_size: 1
gpu_memory_utilization: 0.6
max_model_len: ${policy.max_total_sequence_length}
enforce_eager: False
Expand Down
2 changes: 1 addition & 1 deletion examples/configs/vlm_grpo_3B.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ policy:
precision: ${policy.precision}
tensor_parallel_size: 1
pipeline_parallel_size: 1
enable_expert_parallel: false
expert_parallel_size: 1
gpu_memory_utilization: 0.6
max_model_len: ${policy.max_total_sequence_length}
enforce_eager: False
Expand Down
32 changes: 24 additions & 8 deletions nemo_rl/distributed/virtual_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,10 @@ def world_size(self) -> int:
def node_count(self) -> int:
    """Return how many nodes hold at least one bundle in this cluster."""
    occupied = [ct for ct in self._bundle_ct_per_node_list if ct > 0]
    return len(occupied)

def get_master_address_and_port(self) -> tuple[str, int]:
"""Gets the master address and port for the distributed training setup.
def get_available_address_and_port(
self, pg_idx: int, bundle_idx: int
) -> tuple[str, int]:
"""Gets an available address and port for the given placement group index and bundle index.

Returns:
Tuple of (address, port)
Expand All @@ -361,23 +363,37 @@ def get_master_address_and_port(self) -> tuple[str, int]:
if not self._node_placement_groups:
self.get_placement_groups()

# Use the first bundle of the first placement group
# This works for both unified PG and per-node PGs
pg = self.get_placement_groups()[0]
# Get the placement group
placement_groups = self.get_placement_groups()
if len(placement_groups) == 1:
pg = placement_groups[0]
else:
pg = placement_groups[pg_idx]

if pg.bundle_specs:
# Launch port finder on the first bundle of this placement group
# Launch port finder on the given bundle of this placement group
addr, port = ray.get(
_get_node_ip_and_free_port.options(
scheduling_strategy=PlacementGroupSchedulingStrategy(
placement_group=pg, placement_group_bundle_index=0
placement_group=pg, placement_group_bundle_index=bundle_idx
),
# Need to explicitly set to 0 since it's possible for this to be unschedulable if all CPUs are already in use.
num_cpus=0,
).remote()
)
return addr, port

raise RuntimeError("No valid placement groups found to get master address")
raise RuntimeError(
"No valid placement groups found to get available address and port"
)

def get_master_address_and_port(self) -> tuple[str, int]:
    """Return the master endpoint for the distributed training setup.

    Convenience wrapper that resolves the address/port on the first bundle
    of the first placement group.

    Returns:
        Tuple of (address, port)
    """
    addr, port = self.get_available_address_and_port(pg_idx=0, bundle_idx=0)
    return addr, port

def shutdown(self) -> bool:
"""Cleans up and releases all resources associated with this virtual cluster.
Expand Down
13 changes: 13 additions & 0 deletions nemo_rl/distributed/worker_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,17 @@ def _create_workers_from_bundle_indices(
# Get all placement groups
placement_groups = self.cluster.get_placement_groups()

# Get available address and port for each worker
available_addresses = []
available_ports = []
for group_idx, (pg_idx, local_bundle_indices) in enumerate(bundle_indices_list):
for local_rank, bundle_idx in enumerate(local_bundle_indices):
addr, port = self.cluster.get_available_address_and_port(
pg_idx, bundle_idx
)
available_addresses.append(addr)
available_ports.append(port)

for group_idx, (pg_idx, local_bundle_indices) in enumerate(bundle_indices_list):
current_group = []

Expand All @@ -478,6 +489,8 @@ def _create_workers_from_bundle_indices(
"MASTER_ADDR": self.master_address,
"MASTER_PORT": str(self.master_port),
"NODE_RANK": str(pg_idx),
"AVAILABLE_ADDR_LIST": str(available_addresses),
"AVAILABLE_PORT_LIST": str(available_ports),
}
)
worker_env_vars.pop("RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES", None)
Expand Down
2 changes: 1 addition & 1 deletion nemo_rl/models/generation/vllm/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
class VllmSpecificArgs(TypedDict):
tensor_parallel_size: int
pipeline_parallel_size: int
enable_expert_parallel: bool
expert_parallel_size: int
gpu_memory_utilization: float
max_model_len: int
# Additional arguments for vLLM inserted by nemo rl based on the context of when vllm is used
Expand Down
44 changes: 35 additions & 9 deletions nemo_rl/models/generation/vllm/vllm_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,37 @@ def __init__(
"""Initialize a vLLM policy with distributed workers."""
# Store config
self.cfg = config
if self.cfg["vllm_cfg"]["pipeline_parallel_size"] > 1:
self.tp_size = self.cfg["vllm_cfg"]["tensor_parallel_size"]
self.pp_size = self.cfg["vllm_cfg"]["pipeline_parallel_size"]
self.ep_size = self.cfg["vllm_cfg"]["expert_parallel_size"]
self.model_parallel_size = self.tp_size * self.pp_size

assert cluster.world_size() % self.model_parallel_size == 0, (
"World size must be a multiple of model parallel size. "
f"Got world size {cluster.world_size()} and model parallel size (TP * PP) {self.model_parallel_size}."
)
self.dp_size = cluster.world_size() // self.model_parallel_size
self.vllm_dp_size = self.ep_size // self.tp_size

if self.pp_size > 1:
assert self.cfg["vllm_cfg"]["async_engine"], (
"When pipeline_parallel_size > 1, async_engine must be set to True in the vLLM configuration. "
"You can enable it by adding `policy.generation.vllm_cfg.async_engine=true` to your command."
)

if self.ep_size > 1:
assert self.ep_size % self.tp_size == 0, (
"When EP > 1, EP must be a multiple of TP since vLLM's EP = DP * TP. "
"Please update your configuration to set expert_parallel_size to a multiple of tensor_parallel_size."
)
if self.ep_size != self.tp_size:
# vLLM's EP = DP * TP, so here we need to use DP inside vLLM.
assert not self.cfg["vllm_cfg"]["async_engine"], (
"vLLM async_engine has some issues when using DP inside vLLM. "
"Please update your configuration to set `policy.generation.vllm_cfg.async_engine=false`. "
"See https://github.com/NVIDIA-NeMo/RL/issues/1101 for more details."
)

# Validate sampling parameters early to avoid resource allocation with unsupported configs.
# The vLLM sampler patch only supports temperature scaling and does not handle top_p/top_k correctly.
# However, we allow values above certain thresholds for token filtering purposes.
Expand Down Expand Up @@ -101,15 +126,10 @@ def __init__(

self.sharding_annotations = NamedSharding(
layout=np.arange(cluster.world_size()).reshape(
-1, # DP
config["vllm_cfg"]["pipeline_parallel_size"], # PP
config["vllm_cfg"]["tensor_parallel_size"], # TP
self.dp_size, self.pp_size, self.tp_size
),
names=["data_parallel", "pipeline_parallel", "tensor_parallel"],
)
self.model_parallel_size = self.sharding_annotations.get_axis_size(
"tensor_parallel"
) * self.sharding_annotations.get_axis_size("pipeline_parallel")

# non-colocated needs to use PACK strategy to avoid uneven node_bundles
# e.g. assuming we use 3 nodes with 8GPUs, 2 nodes for train and 1 node for inference.
Expand Down Expand Up @@ -137,11 +157,15 @@ def __init__(
worker_builder = RayWorkerBuilder(worker_cls, config)

# It's necessary to set env_vars here to ensure that vllm non-leader workers also have these env_vars
env_vars = {}
# Explicitly set NCCL_CUMEM_ENABLE to 1 to avoid the P2P initialization error for PyNCCLCommunicator.
# See https://github.com/NVIDIA-NeMo/RL/issues/564 for more details.
env_vars = {}
if not self.cfg["colocated"]["enabled"]:
env_vars["NCCL_CUMEM_ENABLE"] = "1"
# We should use vLLM DP if ep_size > tp_size since EP_SIZE = DP_SIZE * TP_SIZE in vLLM.
# See details in https://github.com/vllm-project/vllm/blob/main/examples/offline_inference/data_parallel.py
if self.ep_size > self.tp_size:
env_vars["VLLM_DP_SIZE"] = str(self.vllm_dp_size)

# Check if we need parallelism-aware worker group creation
if self.model_parallel_size > 1:
Expand Down Expand Up @@ -172,7 +196,9 @@ def __init__(
self._post_init()

# Number of data parallel groups is the number of tied worker groups
self.dp_size = self.worker_group.dp_size
assert self.dp_size == self.worker_group.dp_size, (
f"Data parallel size mismatch. Expected {self.dp_size}, got {self.worker_group.dp_size}"
)

# Used to track the round-robin selection of worker groups for generate_async
self.current_generate_dp_shard_idx = 0
Expand Down
Loading