Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion python/sglang/srt/layers/communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1301,8 +1301,12 @@ def _scatter_hidden_states_moe(

# DP scatter (if DP attention is enabled)
if context.attn_dp_size > 1:
if get_tensor_model_parallel_world_size() == get_attention_dp_size():
group = get_tp_group()
else:
group = get_attention_tp_group()
hidden_states_output, global_hidden_states = (
get_local_dp_buffer(),
get_local_dp_buffer(group),
hidden_states,
)
dp_scatter(hidden_states_output, global_hidden_states, forward_batch)
Expand Down
17 changes: 14 additions & 3 deletions python/sglang/srt/models/deepseek_v4.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
fused_rope_inplace,
)
from sglang.srt.configs.deepseek_v4 import DeepSeekV4Config
from sglang.srt.distributed import get_pp_group, get_tensor_model_parallel_world_size
from sglang.srt.distributed import (
get_pp_group,
get_tensor_model_parallel_world_size,
get_tp_group,
)
from sglang.srt.environ import envs
from sglang.srt.eplb.expert_location import ModelConfigForExpertLocation
from sglang.srt.layers.attention.dsv4.compressor import Compressor
Expand All @@ -38,6 +42,7 @@
get_attention_cp_rank,
get_attention_cp_size,
get_attention_dp_size,
get_attention_tp_group,
get_attention_tp_rank,
get_attention_tp_size,
get_global_dp_buffer,
Expand Down Expand Up @@ -831,7 +836,10 @@ def forward(
input_ids = input_ids[cp_rank::cp_size].contiguous()
input_ids_global = input_ids
elif _use_tp_moe_gather:
hidden_states, local_hidden_states = get_global_dp_buffer(), hidden_states
hidden_states, local_hidden_states = (
get_global_dp_buffer(get_tp_group()),
hidden_states,
)
# hidden_states here follow TP_ATTN_FULL semantics: they are replicated
# within an attention-TP group. Use replicate gather to avoid summing the
# same activations across attention-TP ranks before entering MLP/MoE.
Expand All @@ -850,7 +858,10 @@ def forward(
input_ids_global=input_ids_global,
)
if _use_tp_moe_gather:
hidden_states, global_hidden_states = get_local_dp_buffer(), hidden_states
hidden_states, global_hidden_states = (
get_local_dp_buffer(get_attention_tp_group()),
hidden_states,
)
Comment on lines +861 to +864
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For consistency with the logic in LayerCommunicator and to ensure correct symmetric memory allocation when the tensor parallel size equals the attention data parallel size, the group for the local DP buffer should be selected based on whether tp_size == dp_size. This pattern is followed in communicator.py for all dp_scatter operations.

Suggested change
hidden_states, global_hidden_states = (
get_local_dp_buffer(get_attention_tp_group()),
hidden_states,
)
if get_tensor_model_parallel_world_size() == get_attention_dp_size():
group = get_tp_group()
else:
group = get_attention_tp_group()
hidden_states, global_hidden_states = (
get_local_dp_buffer(group),
hidden_states,
)

dp_scatter(hidden_states, global_hidden_states, forward_batch)
if _use_tp_attn_a2a_scatter:
assert _a2a_scatter_chunks is not None
Expand Down
Loading