Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 22 additions & 57 deletions tests/e2e/long_term/accuracy/accuracy_multicard.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,11 @@
#
import gc
import multiprocessing
import signal
import subprocess
import sys
import time
from multiprocessing import Queue

import lm_eval
import pytest
import requests
import torch

SERVER_HOST = "127.0.0.1"
Expand All @@ -36,7 +32,7 @@

# pre-trained model path on Hugging Face.
# Qwen/Qwen2.5-0.5B-Instruct: accuracy test for DP.
# Qwen/Qwen3-30B-A3B: accuracy test for EP.
# Qwen/Qwen3-30B-A3B: accuracy test for EP and DP.
# deepseek-ai/DeepSeek-V2-Lite: accuracy test for TP.
MODEL_NAME = ["Qwen/Qwen3-30B-A3B", "deepseek-ai/DeepSeek-V2-Lite"]

Expand Down Expand Up @@ -145,58 +141,27 @@ def test_lm_eval_accuracy(monkeypatch: pytest.MonkeyPatch, model):
f"Expected: {EXPECTED_VALUE[model]}±{RTOL} | Measured: {result}"


@pytest.mark.parametrize("max_tokens", [10])
@pytest.mark.parametrize("model", ["Qwen/Qwen2.5-0.5B-Instruct"])
def test_lm_eval_accuracy_dp(model, max_tokens):
log_file = open("accuracy_pd.log", "a+")
cmd = [
"vllm", "serve", model, "--max_model_len", "4096",
"--tensor_parallel_size", "2", "--data_parallel_size", "2"
]
server_proc = subprocess.Popen(cmd,
stdout=log_file,
stderr=subprocess.DEVNULL)
DP_DENSCE_MODEL = ["Qwen/Qwen2.5-0.5B-Instruct"]
DP_MOE_MOEDL = ["Qwen/Qwen3-30B-A3B"]

try:
for _ in range(300):
try:
r = requests.get(HEALTH_URL, timeout=1)
if r.status_code == 200:
break
except requests.exceptions.RequestException:
pass
time.sleep(1)
else:
log_file.flush()
log_file.seek(0)
log_content = log_file.read()
pytest.fail(
f"vLLM serve did not become healthy after 300s: {HEALTH_URL}\n"
f"==== vLLM Serve Log Start ===\n{log_content}\n==== vLLM Serve Log End ==="
)

prompt = "bejing is a"
payload = {
"prompt": prompt,
"max_tokens": max_tokens,
"sampling_params": {
"temperature": 0.0,
"top_p": 1.0,
"seed": 123
}
}
resp = requests.post(COMPLETIONS_URL, json=payload, timeout=30)
resp.raise_for_status()
data = resp.json()
DP_MORE_ARGS = {
"Qwen/Qwen2.5-0.5B-Instruct":
"tensor_parallel_size=2,data_parallel_size=2",
"Qwen/Qwen3-30B-A3B":
"tensor_parallel_size=2,data_parallel_size=2,enable_expert_parallel=True,max_model_len=1024,enforce_eager=True",
}

generated = data["choices"][0]["text"].strip()
expected = "city in north china, it has many famous attractions"
assert generated == expected, f"Expected `{expected}`, got `{generated}`"

finally:
server_proc.send_signal(signal.SIGINT)
try:
server_proc.wait(timeout=10)
except subprocess.TimeoutExpired:
server_proc.kill()
server_proc.wait()
@pytest.mark.parametrize("model", DP_DENSCE_MODEL)
def test_lm_eval_accuracy_dp(model):
result_queue: Queue[float] = multiprocessing.Queue()
p = multiprocessing.Process(target=run_test,
args=(result_queue, model,
MAX_MODEL_LEN[model], MODEL_TYPE[model],
DP_MORE_ARGS[model]))
p.start()
p.join()
result = result_queue.get()
print(result)
assert (EXPECTED_VALUE[model] - RTOL < result < EXPECTED_VALUE[model] + RTOL), \
Copy link
Collaborator Author

@MengqingCao MengqingCao Aug 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use the same EXPECTED_VALUE as that of llm without dp to make sure the accuracy of dp is correct

f"Expected: {EXPECTED_VALUE[model]}±{RTOL} | Measured: {result}"
4 changes: 3 additions & 1 deletion tests/e2e/multicard/test_data_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import pytest

MODELS = ["Qwen/Qwen2.5-0.5B-Instruct"]
MODELS = ["Qwen/Qwen2.5-0.5B-Instruct", "Qwen/Qwen3-30B-A3B"]


@pytest.mark.parametrize("model", MODELS)
Expand All @@ -54,6 +54,8 @@ def test_data_parallel_inference(model, max_tokens):
"--trust-remote-code",
"--enforce-eager",
]
if model == "Qwen/Qwen3-30B-A3B":
cmd.append("--enable-expert-parallel")

print(f"Running subprocess: {' '.join(cmd)}")
proc = subprocess.run(cmd,
Expand Down
21 changes: 21 additions & 0 deletions vllm_ascend/distributed/communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import torch.distributed as dist
from vllm.distributed.device_communicators.base_device_communicator import \
DeviceCommunicatorBase
from vllm.utils import logger


class NPUCommunicator(DeviceCommunicatorBase):
Expand All @@ -34,6 +35,12 @@ def __init__(self,
# init device according to rank
self.device = torch.npu.current_device()

if self.use_all2all:
from vllm.distributed.device_communicators.all2all import \
NaiveAll2AllManager
self.all2all_manager = NaiveAll2AllManager(self.cpu_group)
logger.info("Using naive all2all manager.")

def all_to_all(self,
input_: torch.Tensor,
scatter_dim: int = 0,
Expand Down Expand Up @@ -73,3 +80,17 @@ def all_to_all(self,
dist.all_to_all(output_list, input_list, group=self.device_group)
output_tensor = torch.cat(output_list, dim=gather_dim).contiguous()
return output_tensor

# TODO: Add ut for dispatch and combine
def dispatch(
self, hidden_states: torch.Tensor,
router_logits: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
assert self.all2all_manager is not None
hidden_states, router_logits = self.all2all_manager.dispatch(
hidden_states, router_logits)
return hidden_states, router_logits

def combine(self, hidden_states: torch.Tensor) -> torch.Tensor:
assert self.all2all_manager is not None
hidden_states = self.all2all_manager.combine(hidden_states)
return hidden_states
Loading