From d3d965f15584bf8c947dd52e651d2f2ed3ed82f5 Mon Sep 17 00:00:00 2001 From: Mingyuan Ma Date: Wed, 13 May 2026 21:40:42 +0000 Subject: [PATCH] [Perf] Skip blocking GPU->CPU sync of num_accepted_tokens in hybrid+align mode when no mamba block boundary can be crossed MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In `_update_states_after_model_execute`, the per-step `.cpu().numpy()` on `num_accepted_tokens.gpu[:num_reqs]` (line ~1486) blocks the EngineCore CPU for the time it takes the GPU to finish the target forward + sampler + sum kernel. For hybrid models running MTP/EAGLE with `mamba_cache_mode == "align"`, this stall happens every decode step and feeds `mamba_utils.postprocess_mamba`. `postprocess_mamba` only does work when a request crosses a mamba block boundary in this iteration, i.e. when `aligned_new_computed_tokens >= num_tokens_running_state`. `num_accepted_tokens` is bounded by `num_speculative_tokens + 1` (the shape of `output_token_ids`). For typical hybrid configs (`mamba_block_size = 4336`, `num_speculative_tokens = 3`), the worst case adds at most 4 tokens per cycle, so the boundary is provably uncrossable in ~98% of decode steps. In that regime we can: - Issue an async (non-blocking) device-to-host copy of `num_accepted_tokens` into the existing pinned buffer. - Record `num_accepted_tokens_event`. - Skip the `postprocess_mamba` call entirely (it would be a no-op). - Let the existing `event.synchronize()` in `_prepare_inputs` (which fires after the draft forwards in the next iteration) absorb the wait. By that point the GPU has long since finished the copy, so the synchronize is essentially free. When the skip condition cannot be proven (boundary may be crossed), we fall back to the original blocking `.cpu()` + `postprocess_mamba` path, so this is purely a per-step "skip when provably redundant" optimization. 
Benchmark (Nemotron-Super-120B-A12B-NVFP4, MTP=3, GB300 single GPU, aiperf 480-req, synthetic_acceptance_length=3): overall TPS: 65,945 -> 77,411 (+17.4%) decode TPS: 2,153 -> 2,495 (+15.9%) inter-token latency: 7.43ms -> 6.41ms (-13.7%) avg req latency: 8,700ms -> 7,412ms (-14.8%) bench wall: 265s -> 226s (-14.8%) mean_acceptance_length: 3.0 -> 3.0 (unchanged, correctness) nsys (32-req short profile) confirms the mechanism is sync-deferral, not GPU-work reduction: slow cudaMemcpyAsync count (>1ms): 1,081 -> 80 (-92.6%) cudaMemcpyAsync host time total: 15.99s -> 6.98s cudaEventSynchronize host time: 0.041s -> 4.106s (wait deferred here) GPU kernel time: 47.46s -> 47.47s (UNCHANGED) GPU kernel count: 1,588,009 -> 1,592,716 (+0.3%, noise) Validation: - Unit tests for the skip condition (math): 14/14 pass. - In-engine assertion run (P3 + blocking .cpu() + assert `aligned < n_running` for every skip): 0 failures across the full 480-req bench, confirming the math holds on real workload. - GSM8K accuracy (real rejection sampler, all 1319 problems, greedy): P3 90.22% — to be cross-checked against baseline. - `postprocess_mamba` side-effect audit: all mutations are gated by the inner `if aligned >= n_running:` block, which is provably unreachable when the skip condition returns True. The patch reads the mamba block size from `copy_bufs.mamba_spec.block_size` and derives the per-request draft-token count from `scheduler_output.scheduled_spec_decode_tokens` (no hardcoded MTP=3, no hardcoded block size). It is generic for any hybrid + `mamba_cache_mode == "align"` spec-decode configuration. 
Signed-off-by: Mingyuan Ma --- vllm/v1/worker/gpu_model_runner.py | 23 ++++++++++++++++++-- vllm/v1/worker/mamba_utils.py | 35 ++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py index 61e705ceb7b8..6d96f92a4fa2 100644 --- a/vllm/v1/worker/gpu_model_runner.py +++ b/vllm/v1/worker/gpu_model_runner.py @@ -1500,9 +1500,28 @@ def _update_states_after_model_execute( is_align = self.cache_config.mamba_cache_mode == "align" if is_align: - for i, num_tokens in enumerate( - self.num_accepted_tokens.gpu[:num_reqs].cpu().numpy() + # PR #42574: skip the postprocess_mamba call entirely when no + # request can cross a mamba block boundary this step. In that + # regime we only need a non-blocking copy of num_accepted_tokens; + # the event.synchronize() in `_prepare_inputs` next iter absorbs + # the deferred wait. + copy_bufs = self._get_mamba_copy_bufs() + if mamba_utils.can_skip_mamba_postprocess( + scheduler_output, + self.input_batch, + self.requests, + copy_bufs.mamba_spec.block_size, + num_reqs, ): + self.input_batch.num_accepted_tokens_cpu_tensor[:num_reqs].copy_( + self.num_accepted_tokens.gpu[:num_reqs], non_blocking=True + ) + assert self.num_accepted_tokens_event is not None + self.num_accepted_tokens_event.record() + return + # Fallthrough: blocking sync, then upstream's per-request populate + np_arr = self.num_accepted_tokens.gpu[:num_reqs].cpu().numpy() + for i, num_tokens in enumerate(np_arr): self.input_batch.num_accepted_tokens_cpu[i] = num_tokens else: self.input_batch.num_accepted_tokens_cpu_tensor[:num_reqs].copy_( diff --git a/vllm/v1/worker/mamba_utils.py b/vllm/v1/worker/mamba_utils.py index b33080cb094d..a7cf4be4eb40 100644 --- a/vllm/v1/worker/mamba_utils.py +++ b/vllm/v1/worker/mamba_utils.py @@ -228,6 +228,41 @@ def preprocess_mamba( do_mamba_copy_block(copy_bufs) +def can_skip_mamba_postprocess( + scheduler_output: SchedulerOutput, + 
input_batch: GPUInputBatch, + requests: dict[str, CachedRequestState], + mamba_block_size: int, + num_reqs: int, +) -> bool: + """Return True iff `postprocess_mamba` is provably a no-op this step. + + Bounded by ``n_draft + 1`` accepted tokens, we can decide on CPU + whether any request can cross a mamba block boundary. If not, the + caller can defer the device-to-host sync of ``num_accepted_tokens``. + + Must stay in lockstep with the inner conditional in + :func:`postprocess_mamba` below. + """ + if not mamba_block_size or mamba_block_size <= 0: + return False + num_scheduled = scheduler_output.num_scheduled_tokens + spec_decode = scheduler_output.scheduled_spec_decode_tokens + req_ids = input_batch.req_ids + for i in range(num_reqs): + req_id = req_ids[i] + n_draft = len(spec_decode.get(req_id, ())) + n_running = ( + requests[req_id].num_computed_tokens + + num_scheduled[req_id] + - n_draft + ) + max_new = n_running + n_draft + if (max_new // mamba_block_size) * mamba_block_size >= n_running: + return False + return True + + def postprocess_mamba( scheduler_output: SchedulerOutput, kv_cache_config: KVCacheConfig,