diff --git a/verl/third_party/sglang/__init__.py b/verl/third_party/sglang/__init__.py deleted file mode 100644 index 15593caaf36..00000000000 --- a/verl/third_party/sglang/__init__.py +++ /dev/null @@ -1,26 +0,0 @@ -# Copyright 2023-2024 SGLang Team -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================== -# Copyright 2024 Bytedance Ltd. and/or its affiliates -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/verl/third_party/sglang/parallel_state.py b/verl/third_party/sglang/parallel_state.py deleted file mode 100644 index cdec743d13f..00000000000 --- a/verl/third_party/sglang/parallel_state.py +++ /dev/null @@ -1,328 +0,0 @@ -# Copyright 2024 Bytedance Ltd. and/or its affiliates -# Copyright 2023 The SGlang team. -# Adapted from -# https://github.com/NVIDIA/Megatron-LM/blob/main/megatron/core/parallel_state.py -# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved. -"""Model and data parallel groups.""" - -import os -from typing import Optional - -import sglang.srt.distributed.parallel_state as ps -import torch -import torch.distributed -from sglang.srt.distributed.parallel_state import ( - get_pp_group, - get_world_group, - init_distributed_environment, - init_model_parallel_group, -) - -""" -This version is strongly tied with Megatron to implement HybridEngine and weight sharing between vllm and Megatron. -- We assume the Megatron tp+dp+pp world is already established before calling this function. - -""" - -# Device mesh for using DTensor -_DEVICE_MESH = None - -# Tensor model parallel group that the current rank belongs to. -_TP = None -# Pipeline model parallel group that the current rank belongs to. -_PP = None - - -# This method is for initializing the ParallelGroup when using HybridEngine -# NOTE(linjunrong): this function is for megatron -def initialize_parallel_state( - distributed_init_method: str = "env://", - backend: str = "nccl", - tensor_model_parallel_size: int = 1, - num_tp_per_train_tp: int = 1, - pipeline_model_parallel_size: int = 1, -): - # torch.distributed.all_reduce does not free the input tensor until - # the synchronization point. This causes the memory usage to grow - # as the number of all_reduce calls increases. This env var disables - # this behavior. - # Related issue: - # https://discuss.pytorch.org/t/cuda-allocation-lifetime-for-inputs-to-distributed-all-reduce/191573 - os.environ["TORCH_NCCL_AVOID_RECORD_STREAMS"] = "1" - - # NOTE(sgm): Modify for verl, Env vars will be set by TORCHRUN. - rank = int(os.getenv("RANK", "-1")) - local_rank = int(os.getenv("LOCAL_RANK", "0")) - - # Use the world_size set by TORCHRUN - world_size = int(os.getenv("WORLD_SIZE", "-1")) - assert world_size != -1, "The world_size is set to -1, not initialized by TORCHRUN" - init_distributed_environment(world_size, rank, distributed_init_method, local_rank, backend) - if torch.distributed.get_world_size() > 1: - # NOTE: build a separate inference group with infer tp & micro dp - initialize_model_parallel_for_sglang( - tensor_model_parallel_size=tensor_model_parallel_size, - num_tensor_model_parallel_groups_per_train_tp=num_tp_per_train_tp, - ) - else: - initialize_model_parallel(tensor_model_parallel_size, pipeline_model_parallel_size, backend) - - -# NOTE(linjunrong): After init SGLang rollout using class EngineFragment, user should always remember to call -# this function to sync the _TP, _PP define at the beginning of this file. Otherwise, only the conterparts -# inside sglang.srt.distributed are init as ProcessGroup, the symbols defined in this file remain as None. -# It could be weird to maintain two _TP and _PP, I follow the same way to maintain an extra ones for -# verl itself as how it was done in verl.third_party.vllm.parallel_state. Note that the process is a little -# bit different -def ensure_model_parallel_initialized( - tensor_model_parallel_size: int, - pipeline_model_parallel_size: int = 1, - backend: Optional[str] = None, -) -> None: - """Helper to initialize model parallel groups if they are not initialized, - or ensure tensor-parallel and pipeline-parallel sizes are equal to expected - values if the model parallel groups are initialized. - """ - # get the backend of _DEVICE_WORLD_GROUP - backend = backend or torch.distributed.get_backend(get_world_group().device_group) - if not model_parallel_is_initialized(): - initialize_model_parallel(tensor_model_parallel_size, pipeline_model_parallel_size, backend) - return - - assert get_tensor_model_parallel_world_size() == tensor_model_parallel_size, ( - f"tensor parallel group already initialized, but of unexpected size: " - f"{get_tensor_model_parallel_world_size()=} vs. {tensor_model_parallel_size=}" - ) - pp_world_size = get_pp_group().world_size - assert pp_world_size == pipeline_model_parallel_size, ( - f"pipeline parallel group already initialized, but of unexpected size: {pp_world_size=} vs. " - f"{pipeline_model_parallel_size=}" - ) - - -# TODO(sgm): deviate from the v0.5.4, not pp now -# NOTE(linjunrong): the SGLang version using _TP instead of ps._TP -def model_parallel_is_initialized(): - """Check if tensor and pipeline parallel groups are initialized.""" - return _TP is not None - # and _PIPELINE_MODEL_PARALLEL_GROUP is not None) - - -def initialize_model_parallel_for_sglang( - tensor_model_parallel_size: int, - num_tensor_model_parallel_groups_per_train_tp: int = 1, - pipeline_model_parallel_size: int = 1, -) -> None: - pass - - # Get world size and rank. Ensure some consistencies. - assert torch.distributed.is_initialized() - - assert isinstance(tensor_model_parallel_size, int) - - # assert num_tensor_model_parallel_groups_per_train_tp == 1 and not different_tp_group - # assert num_tensor_model_parallel_groups_per_train_tp > 1 and different_tp_group - - # Build the tensor model-parallel groups. - assert ps._TP is None, "tensor model parallel group is already initialized" - - global _TP - - world_size: int = torch.distributed.get_world_size() - - backend = torch.distributed.get_backend() - - num_tensor_model_parallel_groups = world_size // tensor_model_parallel_size - - if num_tensor_model_parallel_groups_per_train_tp == 1: - # if tensor_model_parallel_size == train_tensor_parallel_size: - # using the same tp group as Megatron/vllm - assert _TP is None, "tensor model parallel group is already initialized" - group_ranks = [] - for i in range(num_tensor_model_parallel_groups): - ranks = range(i * tensor_model_parallel_size, (i + 1) * tensor_model_parallel_size) - group_ranks.append(ranks) - _TP = init_model_parallel_group( - group_ranks=group_ranks, - local_rank=get_world_group().local_rank, - backend=backend, - use_custom_allreduce=False, # TODO: check why True is not work in Ray trainer - use_message_queue_broadcaster=True, - ) - ps._TP = _TP - # _MICRO_DATA_PARALLEL_GROUP is move to hybrid engine - else: - # initialize a micro_dp group and a tp group - # assume training tp=4, infer tp=2, then, weight is partitioned as - # [1], [2], [3], [4] for training and [1,2], [1,2], [3,4], [3,4] for inference - - # Build the inference tp groups - # train_tp = train_tensor_parallel_size - train_tp = num_tensor_model_parallel_groups_per_train_tp * tensor_model_parallel_size - # num_tensor_model_parallel_groups_per_train_tp = train_tp // tensor_model_parallel_size - assert _TP is None, "tensor model parallel group is already initialized" - group_ranks = [] - for i in range(num_tensor_model_parallel_groups // num_tensor_model_parallel_groups_per_train_tp): - start = train_tp * i - end = train_tp * (i + 1) - for j in range(num_tensor_model_parallel_groups_per_train_tp): - ranks = list(range(start, end, num_tensor_model_parallel_groups_per_train_tp)) - for i in range(len(ranks)): - ranks[i] += j - group_ranks.append(ranks) - _TP = init_model_parallel_group( - group_ranks=group_ranks, - local_rank=get_world_group().local_rank, - backend=backend, - use_custom_allreduce=False, # TODO: check why True is not work in Ray trainer - use_message_queue_broadcaster=True, - ) - ps._TP = _TP - - # Build the pipeline model-parallel groups. - # global _PIPELINE_MODEL_PARALLEL_GROUP - # global _PIPELINE_GLOBAL_RANKS - # assert ps._PIPELINE_MODEL_PARALLEL_GROUP is None, ("pipeline model parallel group is already initialized") - - # ps._PIPELINE_MODEL_PARALLEL_GROUP = mpu.get_pipeline_model_parallel_group() - # ps._PIPELINE_GLOBAL_RANKS = mpu.get_pipeline_model_parallel_ranks() - - # TODO: init using device mesh (not support hybrid engine now) - # Build the pipeline model-parallel groups. - num_pipeline_model_parallel_groups: int = world_size // pipeline_model_parallel_size - global _PP - assert _PP is None, "pipeline model parallel group is already initialized" - group_ranks = [] - for i in range(num_pipeline_model_parallel_groups): - ranks = list(range(i, world_size, num_pipeline_model_parallel_groups)) - group_ranks.append(ranks) - # pipeline parallel does not need custom allreduce - _PP = init_model_parallel_group(group_ranks, get_world_group().local_rank, backend, use_custom_allreduce=False) - ps._PP = _PP # for verl - - -def initialize_model_parallel( - tensor_model_parallel_size: int = 1, - pipeline_model_parallel_size: int = 1, - backend: Optional[str] = None, -) -> None: - """ - NOTE: This method is a hack from the open-sourced version without - asertion of world_size = tp * pp - - Initialize model parallel groups. - - Arguments: - tensor_model_parallel_size: number of GPUs used for tensor model - parallelism. - pipeline_model_parallel_size: number of GPUs used for pipeline model - parallelism. - - Let's say we have a total of 8 GPUs denoted by g0 ... g7 and we - use 2 GPUs to parallelize the model tensor, and 4 GPUs to parallelize - the model pipeline. The present function will - create 4 tensor model-parallel groups and 2 pipeline model-parallel groups: - 4 tensor model-parallel groups: - [g0, g1], [g2, g3], [g4, g5], [g6, g7] - 2 pipeline model-parallel groups: - [g0, g2, g4, g6], [g1, g3, g5, g7] - Note that for efficiency, the caller should make sure adjacent ranks - are on the same DGX box. For example if we are using 2 DGX-1 boxes - with a total of 16 GPUs, rank 0 to 7 belong to the first box and - ranks 8 to 15 belong to the second box. - """ - # Get world size and rank. Ensure some consistencies. - assert torch.distributed.is_initialized() - world_size: int = torch.distributed.get_world_size() - backend = backend or torch.distributed.get_backend(ps.get_world_group().device_group) - - # NOTE(sgm) we don't assert world_size == tp * pp - # DP is not managed by vllm but by the VeRL WorkerGroup - # if (world_size != - # tensor_model_parallel_size * pipeline_model_parallel_size): - # raise RuntimeError( - # f"world_size ({world_size}) is not equal to " - # f"tensor_model_parallel_size ({tensor_model_parallel_size}) x " - # f"pipeline_model_parallel_size ({pipeline_model_parallel_size})") - - num_tensor_model_parallel_groups: int = world_size // tensor_model_parallel_size - - global _TP - assert _TP is None, "tensor model parallel group is already initialized" - group_ranks = [] - for i in range(num_tensor_model_parallel_groups): - ranks = list(range(i * tensor_model_parallel_size, (i + 1) * tensor_model_parallel_size)) - group_ranks.append(ranks) - - # message queue broadcaster is only used in tensor model parallel group - if ps._TP is not None: - _TP = ps._TP - else: - _TP = init_model_parallel_group( - group_ranks, - get_world_group().local_rank, - backend, - use_custom_allreduce=False, # TODO: check why True is not work in Ray trainer - use_message_queue_broadcaster=True, - ) - ps._TP = _TP - - # TODO: init using device mesh (not support hybrid engine now) - # Build the pipeline model-parallel groups. - num_pipeline_model_parallel_groups: int = world_size // pipeline_model_parallel_size - global _PP - assert _PP is None, "pipeline model parallel group is already initialized" - group_ranks = [] - for i in range(num_pipeline_model_parallel_groups): - ranks = list(range(i, world_size, num_pipeline_model_parallel_groups)) - group_ranks.append(ranks) - # pipeline parallel does not need custom allreduce - if ps._TP is not None: - _PP = ps._TP - else: - _PP = init_model_parallel_group(group_ranks, get_world_group().local_rank, backend, use_custom_allreduce=False) - ps._PP = _PP - - -""" -Device mesh utilities -""" - - -def get_device_mesh(): - assert _DEVICE_MESH is not None, "device mesh is not initialized" - return _DEVICE_MESH - - -""" -Tensor model parallel utilities -""" - - -# NOTE(linjunrong): In the vllm version parallel_state.py. verl created its own _TP and _PP as verl want to use -# the process group for some extra purpose. Under the hood, there is no difference between them and the original -# one in vllm.distributed.parallel_state. However, the implementation need to hack the init process of inference -# engine, as we do not maintain another SGLang here, I just use the original _TP and _PP directly. -def get_tensor_model_parallel_group(): - """Get the tensor model parallel group the caller rank belongs to.""" - - assert _TP is not None, "tensor model parallel group is not initialized" - return _TP.device_group - - -def get_tensor_model_parallel_world_size(): - """Return world size for the tensor model parallel group.""" - return torch.distributed.get_world_size(group=get_tensor_model_parallel_group()) - - -def get_tensor_model_parallel_rank(): - """Return my rank for the tensor model parallel group.""" - return torch.distributed.get_rank(group=get_tensor_model_parallel_group()) - - -def get_tensor_model_parallel_src_rank(): - """Calculate the global rank corresponding to the first local rank - in the tensor model parallel group.""" - global_rank = torch.distributed.get_rank() - local_world_size = get_tensor_model_parallel_world_size() - return (global_rank // local_world_size) * local_world_size