diff --git a/docs/platforms/xpu.md b/docs/platforms/xpu.md index 4ffa39fe5015..1ba56b192402 100644 --- a/docs/platforms/xpu.md +++ b/docs/platforms/xpu.md @@ -90,3 +90,54 @@ python -m sglang.bench_serving -h Additionally, the requests can be formed with [OpenAI Completions API](https://docs.sglang.io/basic_usage/openai_api_completions.html) and sent via the command line (e.g. using `curl`) or via your own script. + +## Prefill-Decode (P/D) Disaggregation on Intel XPU [Experimental] + +SGLang supports prefill-decode disaggregation on Intel XPU using the [NIXL](https://github.com/ai-dynamo/nixl) KV-transfer backend. + +**Tested models:** + +| Model | Notes | +|:---:|:---:| +| [Qwen/Qwen3-0.6B](https://huggingface.co/Qwen/Qwen3-0.6B) | Used in integration tests; verified on Intel XPU with homogeneous P/D (XPU prefill + XPU decode) | +| [Qwen/Qwen2.5-7B-Instruct](https://huggingface.co/Qwen/Qwen2.5-7B-Instruct) | Verified on Intel XPU with homogeneous P/D (XPU prefill + XPU decode) | + +**Prerequisites:** `pip install nixl sglang-router` + +**Start the prefill server (GPU 0):** + +```bash +ZE_AFFINITY_MASK=0 UCX_POSIX_USE_PROC_LINK=n python -m sglang.launch_server \ + --model-path Qwen/Qwen3-0.6B --trust-remote-code --device xpu \ + --disaggregation-mode prefill --disaggregation-transfer-backend nixl \ + --disaggregation-bootstrap-port 12335 --host 0.0.0.0 --port 30000 +``` + +**Start the decode server (GPU 1):** + +```bash +ZE_AFFINITY_MASK=1 UCX_POSIX_USE_PROC_LINK=n python -m sglang.launch_server \ + --model-path Qwen/Qwen3-0.6B --trust-remote-code --device xpu \ + --disaggregation-mode decode --disaggregation-transfer-backend nixl \ + --disaggregation-bootstrap-port 12335 --host 0.0.0.0 --port 30001 +``` + +**Start the router:** + +```bash +python -m sglang_router.launch_router \ + --pd-disaggregation \ + --prefill http://127.0.0.1:30000 \ + --decode http://127.0.0.1:30001 \ + --host 0.0.0.0 --port 8000 +``` + +**Send a request:** + +```bash +curl 
http://127.0.0.1:8000/v1/completions \ + -H "Content-Type: application/json" \ + -d '{"model": "Qwen/Qwen3-0.6B", "prompt": "The capital of France is", "max_tokens": 32}' +``` + +> **Note:** `UCX_POSIX_USE_PROC_LINK=n` is required on Intel XPU to avoid UCX shared-memory transport issues. diff --git a/python/sglang/srt/disaggregation/nixl/conn.py b/python/sglang/srt/disaggregation/nixl/conn.py index 5c11b3efbbaa..ff706f33643f 100644 --- a/python/sglang/srt/disaggregation/nixl/conn.py +++ b/python/sglang/srt/disaggregation/nixl/conn.py @@ -402,6 +402,14 @@ def _send_kvcache_generic( ): """Generic KV cache transfer supporting both MHA and MLA architectures. Used by both send_kvcache and maybe_send_extra.""" + # Convert pointer and length lists to np.uint64 arrays up front. + # Intel XPU device addresses can exceed the np.int64 range (bit 63 is set, e.g. + # 0xffff81ab54e01000). Casting here prevents overflow when these values + # are later used in numpy arithmetic. + src_data_ptrs = np.array(src_data_ptrs, dtype=np.uint64) + dst_data_ptrs = np.array(dst_data_ptrs, dtype=np.uint64) + item_lens = np.array(item_lens, dtype=np.uint64) + # group by indices prefill_kv_blocks, dst_kv_blocks = group_concurrent_contiguous( prefill_data_indices, dst_data_indices @@ -449,11 +457,11 @@ def _send_kvcache_generic( # Precompute block starts/lengths to reduce Python-level loops.
prefill_starts = np.fromiter( - (block[0] for block in prefill_kv_blocks), dtype=np.int64 + (block[0] for block in prefill_kv_blocks), dtype=np.uint64 ) - dst_starts = np.fromiter((block[0] for block in dst_kv_blocks), dtype=np.int64) + dst_starts = np.fromiter((block[0] for block in dst_kv_blocks), dtype=np.uint64) block_lens = np.fromiter( - (len(block) for block in prefill_kv_blocks), dtype=np.int64 + (len(block) for block in prefill_kv_blocks), dtype=np.uint64 ) for src_ptr, dst_ptr, item_len in layers_params: @@ -465,14 +473,14 @@ def _send_kvcache_generic( def make_req_array(addr_chunks, len_chunks, gpu): if not addr_chunks: - return np.empty((0, 3), dtype=np.int64) - flat_addrs = np.concatenate(addr_chunks) - flat_lens = np.concatenate(len_chunks) + return np.empty((0, 3), dtype=np.uint64) + flat_addrs = np.concatenate(addr_chunks).astype(np.uint64, copy=False) + flat_lens = np.concatenate(len_chunks).astype(np.uint64, copy=False) return np.column_stack( ( flat_addrs, flat_lens, - np.full_like(flat_addrs, gpu), + np.full_like(flat_addrs, gpu, dtype=np.uint64), ) ) @@ -623,13 +631,13 @@ def send_kvcache_slice( def make_req_array(addr_chunks, size, gpu): if not addr_chunks: - return np.empty((0, 3), dtype=np.int64) - flat_addrs = np.concatenate(addr_chunks) + return np.empty((0, 3), dtype=np.uint64) + flat_addrs = np.concatenate(addr_chunks).astype(np.uint64, copy=False) return np.column_stack( ( flat_addrs, - np.full_like(flat_addrs, size), - np.full_like(flat_addrs, gpu), + np.full_like(flat_addrs, size, dtype=np.uint64), + np.full_like(flat_addrs, gpu, dtype=np.uint64), ) ) diff --git a/test/registered/disaggregation/test_disaggregation_xpu.py b/test/registered/disaggregation/test_disaggregation_xpu.py new file mode 100644 index 000000000000..3b42e17dcd9b --- /dev/null +++ b/test/registered/disaggregation/test_disaggregation_xpu.py @@ -0,0 +1,92 @@ +""" +Disaggregation integration test for the NIXL transfer backend on Intel XPU. 
+
+Launches a prefill server, a decode server, and a load-balancer using the
+NIXL KV-transfer backend, then verifies that basic text completion works
+end-to-end. This exercises the np.uint64 pointer-arithmetic fix in
+python/sglang/srt/disaggregation/nixl/conn.py, which is required on
+Intel XPU where device addresses have bit 63 set (e.g. 0xffff81ab54e01000)
+and would overflow np.int64.
+
+Usage:
+    python3 -m pytest test/registered/disaggregation/test_disaggregation_xpu.py -v
+"""
+
+import subprocess
+import sys
+import unittest
+
+import requests
+import torch
+
+from sglang.test.ci.ci_register import register_cuda_ci
+from sglang.test.server_fixtures.disaggregation_fixture import (
+    PDDisaggregationServerBase,
+)
+from sglang.test.test_utils import DEFAULT_SMALL_MODEL_NAME_FOR_TEST_QWEN
+
+register_cuda_ci(
+    est_time=300,
+    suite="stage-a-test-1-gpu-small",
+    disabled="Intel XPU only — not available in standard CUDA CI",
+)
+
+# torch builds without XPU support may lack torch.xpu, so probe it defensively.
+_XPU_AVAILABLE = hasattr(torch, "xpu") and torch.xpu.is_available()
+# requests has no default timeout; bound each call so a stalled server fails fast.
+_REQUEST_TIMEOUT_S = 120
+
+
+@unittest.skipUnless(
+    _XPU_AVAILABLE, "Intel XPU not available (torch.xpu.is_available() returned False)"
+)
+class TestDisaggregationNixlBasic(PDDisaggregationServerBase):
+    """Smoke-test the NIXL disaggregation backend with a small completion."""
+
+    @classmethod
+    def setUpClass(cls):
+        """Configure the fixture for NIXL on XPU, then launch the servers."""
+        super().setUpClass()
+        cls.model = DEFAULT_SMALL_MODEL_NAME_FOR_TEST_QWEN
+        # Force the NIXL backend and XPU device.
+        cls.transfer_backend = ["--disaggregation-transfer-backend", "nixl"]
+        cls.rdma_devices = []
+        cls.extra_prefill_args = ["--device", "xpu"]
+        cls.extra_decode_args = ["--device", "xpu"]
+        # Install via the running interpreter so the router lands in this environment.
+        subprocess.check_call(
+            [sys.executable, "-m", "pip", "install", "sglang-router"],
+            stdout=subprocess.DEVNULL,
+            stderr=subprocess.DEVNULL,
+        )
+        cls.launch_all()
+
+    def _generate(self, prompt, max_new_tokens):
+        """POST a temperature-0 /generate request to the load balancer; return its JSON."""
+        response = requests.post(
+            self.lb_url + "/generate",
+            json={
+                "text": prompt,
+                "sampling_params": {"temperature": 0, "max_new_tokens": max_new_tokens},
+            },
+            timeout=_REQUEST_TIMEOUT_S,
+        )
+        # Include the response body in the failure message to ease debugging.
+        self.assertEqual(response.status_code, 200, response.text)
+        return response.json()
+
+    def test_completion_returns_text(self):
+        """A simple completion must succeed and return non-empty generated text."""
+        data = self._generate("The capital of France is", max_new_tokens=16)
+        self.assertIn("text", data, f"Unexpected response shape: {data}")
+        self.assertGreater(len(data["text"]), 0, "Generated text should not be empty")
+
+    def test_completion_correct_output(self):
+        """Disaggregated NIXL output must produce the expected token for a deterministic prompt."""
+        generated = self._generate("1 + 1 =", max_new_tokens=4)["text"]
+        # The model should produce "2" somewhere in the first few tokens.
+        self.assertIn("2", generated, f"Expected '2' in output, got: {generated!r}")
+
+
+if __name__ == "__main__":
+    unittest.main()