diff --git a/.buildkite/scripts/hardware_ci/run-cpu-distributed-smoke-test.sh b/.buildkite/scripts/hardware_ci/run-cpu-distributed-smoke-test.sh index 3caa49832c3f..f289a43c6be4 100644 --- a/.buildkite/scripts/hardware_ci/run-cpu-distributed-smoke-test.sh +++ b/.buildkite/scripts/hardware_ci/run-cpu-distributed-smoke-test.sh @@ -1,26 +1,43 @@ #!/bin/bash set -euox pipefail +export VLLM_CPU_CI_ENV=0 echo "--- PP+TP" vllm serve meta-llama/Llama-3.2-3B-Instruct -tp=2 -pp=2 & server_pid=$! -timeout 600 bash -c "until curl localhost:8000/v1/models; do sleep 1; done" || exit 1 +timeout 600 bash -c "until curl localhost:8000/v1/models > /dev/null 2>&1; do sleep 1; done" || exit 1 vllm bench serve \ --backend vllm \ --dataset-name random \ --model meta-llama/Llama-3.2-3B-Instruct \ --num-prompts 20 \ + --result-dir ./test_results \ + --result-filename tp_pp.json \ + --save-result \ --endpoint /v1/completions -kill -s SIGTERM $server_pid & +kill -s SIGTERM $server_pid; wait $server_pid || true +failed_req=$(jq '.failed' ./test_results/tp_pp.json) +if [ "$failed_req" -ne 0 ]; then + echo "Some requests were failed!" + exit 1 +fi echo "--- DP+TP" vllm serve meta-llama/Llama-3.2-3B-Instruct -tp=2 -dp=2 & server_pid=$! -timeout 600 bash -c "until curl localhost:8000/v1/models; do sleep 1; done" || exit 1 +timeout 600 bash -c "until curl localhost:8000/v1/models > /dev/null 2>&1; do sleep 1; done" || exit 1 vllm bench serve \ --backend vllm \ --dataset-name random \ --model meta-llama/Llama-3.2-3B-Instruct \ --num-prompts 20 \ + --result-dir ./test_results \ + --result-filename dp_pp.json \ + --save-result \ --endpoint /v1/completions -kill -s SIGTERM $server_pid & +kill -s SIGTERM $server_pid; wait $server_pid || true +failed_req=$(jq '.failed' ./test_results/dp_pp.json) +if [ "$failed_req" -ne 0 ]; then + echo "Some requests were failed!" + exit 1 +fi diff --git a/vllm/distributed/parallel_state.py b/vllm/distributed/parallel_state.py index 40b797a1a8d9..fc554bd75694 100644 --- a/vllm/distributed/parallel_state.py +++ b/vllm/distributed/parallel_state.py @@ -851,6 +851,10 @@ def isend_tensor_dict( if self.world_size <= 1: return [] + if dst is None: + dst = (self.rank_in_group + 1) % self.world_size + assert dst < self.world_size, f"Invalid dst rank ({dst})" + if self.use_cpu_custom_send_recv: if self.device_communicator is None: raise ValueError("No device communicator found") @@ -868,10 +872,6 @@ def isend_tensor_dict( group = self.device_group metadata_group = self.cpu_group - if dst is None: - dst = (self.rank_in_group + 1) % self.world_size - assert dst < self.world_size, f"Invalid dst rank ({dst})" - metadata_list, tensor_list = _split_tensor_dict(tensor_dict) self.send_object(metadata_list, dst=dst) @@ -948,6 +948,11 @@ def irecv_tensor_dict( ]: if not torch.distributed.is_initialized() or self.world_size == 1: return None, [], [] + + if src is None: + src = (self.rank_in_group - 1) % self.world_size + assert src < self.world_size, f"Invalid src rank ({src})" + if self.use_cpu_custom_send_recv: if self.device_communicator is None: raise ValueError("No device communicator found") @@ -965,10 +970,6 @@ def irecv_tensor_dict( group = self.device_group metadata_group = self.cpu_group - if src is None: - src = (self.rank_in_group - 1) % self.world_size - assert src < self.world_size, f"Invalid src rank ({src})" - recv_metadata_list = self.recv_object(src=src) tensor_dict: dict[str, Any] = {} handles: list[Handle] = []