diff --git a/.github/workflows/ci_xpu.yml b/.github/workflows/ci_xpu.yml index f99ca7d172..7398af53d3 100644 --- a/.github/workflows/ci_xpu.yml +++ b/.github/workflows/ci_xpu.yml @@ -24,7 +24,7 @@ jobs: - name: Code Checkout env: - docker_image: ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.1.0 + docker_image: ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.2.0 run: | REPO="https://github.com/${{ github.repository }}.git" FULL_REPO="${{ github.repository }}" @@ -55,7 +55,7 @@ jobs: - name: Run CI unittest env: - docker_image: ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.1.0 + docker_image: ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.2.0 run: | runner_name="${{ runner.name }}" last_char="${runner_name: -1}" diff --git a/scripts/run_ci_xpu.sh b/scripts/run_ci_xpu.sh index 870b463d91..9afd81249b 100644 --- a/scripts/run_ci_xpu.sh +++ b/scripts/run_ci_xpu.sh @@ -176,3 +176,33 @@ if [ ${kv_block_test_exit_code} -ne 0 ]; then echo "kv block相关测试失败,请检查pr代码" exit 1 fi + +echo "============================开始EP并行测试!============================" +sleep 5 +rm -rf log/* +rm -f core* +xpu-smi +export XPU_VISIBLE_DEVICES="0,1,2,3" +export BKCL_ENABLE_XDR=1 +export BKCL_RDMA_NICS=xgbe1,xgbe2,xgbe3,xgbe4 +export BKCL_TRACE_TOPO=1 +export BKCL_PCIE_RING=1 +export XSHMEM_MODE=1 +export XSHMEM_QP_NUM_PER_RANK=32 +export BKCL_RDMA_VERBS=1 + +wget -q https://paddle-qa.bj.bcebos.com/xpu_third_party/xDeepEP.tar.gz +tar -xzf xDeepEP.tar.gz +cd xDeepEP +bash build.sh +cd - + +python tests/ci_use/XPU_45T/run_ep.py +ep_exit_code=$? 
+
+if [ ${ep_exit_code} -ne 0 ]; then
+    echo "log/workerlog.0"
+    cat log/workerlog.0
+    echo "EP并行 相关测试失败,请检查pr代码"
+    exit 1
+fi
diff --git a/tests/ci_use/XPU_45T/run_ep.py b/tests/ci_use/XPU_45T/run_ep.py
new file mode 100644
index 0000000000..c82242aa39
--- /dev/null
+++ b/tests/ci_use/XPU_45T/run_ep.py
@@ -0,0 +1,79 @@
+import os
+
+import psutil
+
+from fastdeploy import LLM, SamplingParams
+
+
+def test_fd_ep():
+    """Smoke-test expert-parallel (EP) inference on XPU via the FastDeploy LLM API."""
+
+    msg1 = [
+        {"role": "system", "content": ""},
+        {"role": "user", "content": "北京天安门广场在哪里?"},
+    ]
+    messages = [msg1]
+
+    # Sampling parameters
+    sampling_params = SamplingParams(top_p=0, max_tokens=500)
+
+    # Model path and device configuration
+    model = os.getenv("model_path", "/home/ERNIE-4.5-300B-A47B-Paddle")
+    xpu_visible_devices = os.getenv("XPU_VISIBLE_DEVICES", "0")
+    xpu_device_num = len(xpu_visible_devices.split(","))
+
+    enable_expert_parallel = True
+    if enable_expert_parallel:
+        tensor_parallel_size = 1
+        data_parallel_size = xpu_device_num
+    else:
+        tensor_parallel_size = xpu_device_num
+        data_parallel_size = 1
+
+    engine_worker_queue_port = [str(8023 + i * 10) for i in range(data_parallel_size)]
+    engine_worker_queue_port = ",".join(engine_worker_queue_port)
+
+    print(f"[INFO] messages: {messages}")
+
+    llm = LLM(
+        model=model,
+        enable_expert_parallel=enable_expert_parallel,
+        tensor_parallel_size=tensor_parallel_size,
+        data_parallel_size=data_parallel_size,
+        max_model_len=8192,
+        quantization="wint4",
+        engine_worker_queue_port=engine_worker_queue_port,
+        max_num_seqs=8,
+    )
+
+    try:
+        outputs = llm.chat(messages, sampling_params)
+        assert outputs, "❌ LLM 推理返回空结果。"
+
+        for idx, output in enumerate(outputs):
+            prompt = output.prompt
+            generated_text = getattr(output.outputs, "text", "").strip()
+
+            print(f"{'-'*100}")
+            print(f"[PROMPT {idx}] {prompt}")
+            print(f"{'-'*100}")
+            print(f"[GENERATED TEXT] {generated_text}")
+            print(f"{'-'*100}")
+
+            # Core assertion: the generated text must not be empty
+            assert generated_text, f"❌ 推理结果为空 (index={idx})"
+
+    finally:
+        # Clean up child processes regardless of success or failure
+        current_process = psutil.Process(os.getpid())
+        for child in current_process.children(recursive=True):
+            try:
+                child.kill()
+                print(f"[CLEANUP] 已杀死子进程 {child.pid}")
+            except Exception as e:
+                print(f"[WARN] 无法杀死子进程 {child.pid}: {e}")
+        print("✅ 已清理所有 FastDeploy 子进程。")
+
+
+if __name__ == "__main__":
+    test_fd_ep()