Nixl async transfer#23967
Conversation
|
Warning You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again! |
|
/tag-and-rerun-ci |
|
Warning You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again! |
|
/tag-and-rerun-ci |
a50ea89 to
616ca55
Compare
Signed-off-by: Ovidiu Mara <ovidium@nvidia.com>
616ca55 to
28b6504
Compare
Signed-off-by: Ovidiu Mara <ovidium@nvidia.com>
|
@ishandhanani @iyastreb could you please help with review? It's the same PR as #20680 but with conflicts resolved (and fixing the P>D issue from main) |
Signed-off-by: Ovidiu Mara <ovidium@nvidia.com>
bf1059d to
66f674d
Compare
|
/tag-and-rerun-ci |
|
/tag-and-rerun-ci |
|
/rerun-failed-ci |
| except _NIXL_TRANSPORT_ERRORS as e: | ||
| logger.warning( | ||
| f"KVSender check_xfer_state failed for room {self.bootstrap_room}: {e}" | ||
| ) | ||
| self._send_failed = True | ||
| self._send_error = e | ||
| return KVPoll.Failed # type: ignore | ||
| if all(x == "DONE" for x in states): | ||
| if ( | ||
| self._transfer_start_time is not None | ||
| and self._transfer_metric.transfer_latency_s is None | ||
| ): | ||
| self._transfer_metric.transfer_latency_s = ( | ||
| time.perf_counter() - self._transfer_start_time | ||
| ) | ||
| return KVPoll.Success # type: ignore | ||
| if any(x == "ERR" for x in states): | ||
| self._send_failed = True | ||
| self._send_error = RuntimeError( | ||
| f"NIXL transfer error for room {self.bootstrap_room}" |
There was a problem hiding this comment.
It's a good point. I have now changed the code to catch exceptions in the worker thread, pass them to the main thread and raise from there, so that we can detect _NIXL_TRANSPORT_ERRORS as before. The worker thread still has to catch all exceptions otherwise it may die in case of other errors, which may cause hangs
ShangmingCai
left a comment
There was a problem hiding this comment.
Overall LGTM, but why remove _NIXL_TRANSPORT_ERRORS? I remember this was just added a short while ago.
…after bootstrap) Signed-off-by: Ovidiu Mara <ovidium@nvidia.com>
Signed-off-by: Ovidiu Mara <ovidium@nvidia.com>
Signed-off-by: Ovidiu Mara <ovidium@nvidia.com>
Signed-off-by: Ovidiu Mara <ovidium@nvidia.com>
Signed-off-by: Ovidiu Mara <ovidium@nvidia.com>
Bring in sgl-project#23967 (Nixl async transfer) and other main changes since the last merge. Conflicts were limited to python/sglang/srt/disaggregation/nixl/conn.py: 1. TransferInfo: kept main's `decode_prefix_len` field + `is_dummy()` method form, appended this PR's `staging` field at the tail. Updated 2 callers in this file from `req.is_dummy` to `req.is_dummy()`. 2. NixlKVManager.__init__ (PREFILL branch): kept this PR's `_init_staging_prefill_ctx()` AND main's `transfer_queues` / `transfer_worker` thread pool. Both run; staging ctx is initialized before workers spawn. 3. add_transfer_request: took main's async enqueue body (puts TransferKVChunk into transfer_queues[room % N], returns None) but kept this PR's `_prefetch_staging_reqs(bootstrap_room)` call before the enqueue. The staging dispatch (`_dispatch_kv_transfer`, `_do_staging_transfer`, `send_kvcache_staged`) is now temporarily dead code: enabling SGLANG_DISAGG_STAGING_BUFFER on NIXL has no effect until the next commit moves staging dispatch into `transfer_worker` (per the mooncake pattern). 4. update_transfer_status: kept this PR's tag-based dispatch (`_track_kv_arrival` / `_handle_stg_notification` / `_handle_aux_notification`) and merged main's "nokv" handling for decode-side radix cache hit (sgl-project#19746) into `_handle_aux_notification`. After this commit the staging buffer code path is preserved but unused; plain heterogeneous-TP transfers fall back to send_kvcache_slice via the new async worker. The next commit will wire staging into the worker (per-worker staging buffer + deferred re-enqueue on watermark not-ready, matching mooncake). Co-authored-by: Cursor <cursoragent@cursor.com>
…ke parity) After the previous merge of sgl-project#23967 (Nixl async transfer), staging buffer dispatch lived only in the now-deleted synchronous path of add_transfer_request, leaving SGLANG_DISAGG_STAGING_BUFFER a no-op on NIXL. This commit ports the staging dispatch into transfer_worker, 1:1 mirroring mooncake's per-worker staging design. 1. PREFILL __init__: build N staging buffers (one per transfer_queue) before workers spawn, and pass each worker its private buffer (NixlKVManager.__init__). Removes the lazy single-buffer creation in set_kv_buffer_tensors -- mooncake-style, staging buffers no longer depend on kv_buffer_tensors. 2. _try_create_staging_strategy(staging_buffer) replaces _get_staging_strategy. Returns a fresh PrefillStagingStrategy bound to the caller's staging buffer. The strategy MUST be a worker-local variable; never cache on self -- multiple workers would race on the same staging ring. 3. transfer_worker(queue, staging_buffer=None) now lazy-creates a per-worker staging_strategy on the first chunk it sees, then for each req in a chunk picks among: - staging (heterogeneous TP, both sides registered, watermark ready) -> _do_staging_transfer - send_kvcache (MLA / homogeneous TP) - send_kvcache_slice (heterogeneous TP, no staging or staging hard-failed for this chunk) When staging is not ready (watermark/alloc pending), _do_staging_transfer re-enqueues the chunk and signals `staging_deferred=True`; the worker breaks the per-req loop and `continue`s the main loop without advancing room status, so the chunk gets retried on the next pop. Same control-flow as mooncake.transfer_worker. 4. _do_staging_transfer reshaped to (handle, deferred) return tuple: - (None, True) -> chunk re-enqueued, caller should defer - (None, False) -> hard fallback, caller should try slice - (handle, False) -> staging RDMA posted; handle joins the per-chunk handle list and is busy-polled to DONE alongside aux/state handles. Oversized chunks (cannot ever fit) raise immediately. 5. _dispatch_kv_transfer (the old synchronous-path entry) is removed. add_transfer_request stays a thin enqueue + _prefetch_staging_reqs wrapper. Notes vs mooncake: - NIXL workers do NOT need an executor (no per-slice ThreadPoolExecutor); send_kvcache_slice posts a single bulk transfer. - NIXL workers do NOT send a separate ZMQ CHUNK_READY message: decode observes chunk arrival via the RDMA `stg_*` notification tag posted by send_kvcache_staged, which the decode-side receiver thread already handles. - Memory: staging pool grows N x (one per worker, default SGLANG_DISAGGREGATION_QUEUE_SIZE=4). Tunable via SGLANG_DISAGG_STAGING_POOL_SIZE_MB. Co-authored-by: Cursor <cursoragent@cursor.com>
Signed-off-by: Ovidiu Mara <ovidium@nvidia.com>
Taken over from #20680
Motivation
This PR improves the performance of NixlKVManager by making KV transfer asynchronous and multi-threaded on the prefill node. Previously,
add_transfer_requestperformed each chunk transfer synchronously and the caller (NixlKVSender) had to track and poll all transfer handles. With many decode instances and chunked transfers, this caused the prefill scheduler to block on transfer completion and limited throughput. This change aligns NIXL with the queue-based, multi-worker transfer design.Performance
We ran Qwen3-32B PD disaggregation with NIXL and observed a clear improvement in transfer latency via NIXL telemetry:
Async multi-worker transfer removes the synchronous bottleneck on the prefill path: chunks are processed in parallel by worker threads, and decode instances are sharded across queues for better overlap, which explains the lower mean and significantly improved tail (P95/P99) latency.
Modifications
Async transfer with queue + worker pool (PREFILL mode)
FastQueueinstances (count controlled bySGLANG_DISAGGREGATION_QUEUE_SIZE) and aThreadPoolExecutorper queue (total worker count fromSGLANG_DISAGGREGATION_THREAD_POOL_SIZE).TransferKVChunkdataclass and daemontransfer_workerthreads that consume chunks from the queues and executesend_kvcache/send_kvcache_slice,maybe_send_extra, andsend_auxin the worker.min(max(4, (0.5 * cpu_count) // 8), 12)when the env var is not set; queue size defaults to env (e.g. 4).Non-blocking
add_transfer_requestadd_transfer_requestno longer performs transfer inline; it enqueues aTransferKVChunktotransfer_queues[bootstrap_room % len(transfer_queues)]and returnsNone.request_status(e.g.Transferring,Success,Failed), so the sender no longer needs to hold or poll transfer handles.NixlKVSender simplifications
xfer_handles;poll()now relies onkv_mgr.check_status(bootstrap_room)only.clear()to removebootstrap_roomfromrequest_statuswhen appropriate.request_statusin the sender; the worker clearstransfer_infosand sets status toSuccesswhen the last chunk is done.Scheduler handling of Bootstrapping
prefill.py, requests inKVPoll.Bootstrappingare now treated as undone (together withWaitingForInputandTransferring) so the scheduler does not consider them complete before transfer progress.Testing
python3 -m sglang.test.few_shot_gsm8k --num-questions 200 --host 127.0.0.1 --port 8000: Accuracy: 0.945 with Qwen/Qwen3-8BTestDisaggregationAccuracypasses with NIXL (score 0.76, throughput 3949 token/s)Checklist
test_disaggregation_basic.py; 7 tests passed.)Review Process
/tag-run-ci-label,/rerun-failed-ci,/tag-and-rerun-ci)