From 34c0dd50e15bdf72a080fd0a329849a57eaad6fc Mon Sep 17 00:00:00 2001 From: Benjamin Chislett Date: Mon, 28 Jul 2025 20:48:16 +0000 Subject: [PATCH 1/7] patch torch.compile edge-case for xgrammar apply bitmask Signed-off-by: Benjamin Chislett --- vllm/v1/worker/gpu_model_runner.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py index a5bf197ba161..36e3d65723d4 100644 --- a/vllm/v1/worker/gpu_model_runner.py +++ b/vllm/v1/worker/gpu_model_runner.py @@ -1210,9 +1210,14 @@ def apply_grammar_bitmask( cumulative_index += 1 + num_spec_tokens grammar_bitmask = sorted_bitmask + # If the grammar bitmask and the logits have the same shape + # we don't need to pass indices to the kernel, + # since the bitmask is already aligned with the logits. + skip_out_indices = grammar_bitmask.shape[0] == logits.shape[0] + # Serialization of np.ndarray is much more efficient than a tensor, # so we receive it in that format. - grammar_bitmask = torch.from_numpy(grammar_bitmask) + grammar_bitmask = torch.from_numpy(grammar_bitmask).contiguous() # Force use of the torch.compile implementation from xgrammar to work # around issues with the Triton kernel in concurrent structured output @@ -1220,7 +1225,7 @@ def apply_grammar_bitmask( xgr_torch_compile.apply_token_bitmask_inplace_torch_compile( logits, grammar_bitmask.to(self.device, non_blocking=True), - indices=out_indices, + indices=out_indices if not skip_out_indices else None, ) def sync_and_slice_intermediate_tensors( From d346d3ff8d0943c93f83cff5230590d47e663343 Mon Sep 17 00:00:00 2001 From: Benjamin Chislett Date: Mon, 28 Jul 2025 20:50:12 +0000 Subject: [PATCH 2/7] cache is_terminated property on xgrammar state Signed-off-by: Benjamin Chislett --- vllm/v1/structured_output/backend_xgrammar.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/vllm/v1/structured_output/backend_xgrammar.py b/vllm/v1/structured_output/backend_xgrammar.py index 88544565e544..732fc5defe43 100644 --- a/vllm/v1/structured_output/backend_xgrammar.py +++ b/vllm/v1/structured_output/backend_xgrammar.py @@ -148,6 +148,7 @@ class XgrammarGrammar(StructuredOutputGrammar): repr=False, hash=False, init=False) + _is_terminated: bool = field(default=False, repr=False, hash=False) def accept_tokens(self, request_id: str, tokens: list[int]) -> bool: """Accepts a list of tokens and advances the FSM. @@ -155,6 +156,8 @@ def accept_tokens(self, request_id: str, tokens: list[int]) -> bool: Returns True if the FSM was advanced successfully. Returns False if the FSM failed to advance. """ + if self._is_terminated: + return False for token in tokens: if not self.matcher.accept_token(token): logger.error( @@ -162,6 +165,7 @@ def accept_tokens(self, request_id: str, tokens: list[int]) -> bool: "for tokens %s. Please file an issue.", request_id, token) return False self.num_processed_tokens += 1 + self._is_terminated = self.matcher.is_terminated() return True def validate_tokens(self, tokens: list[int]) -> list[int]: @@ -189,7 +193,7 @@ def fill_bitmask(self, bitmask: torch.Tensor, idx: int) -> None: self.matcher.fill_next_token_bitmask(bitmask, idx) def is_terminated(self) -> bool: - return self.matcher.is_terminated() + return self._is_terminated def reset(self): self.num_processed_tokens = 0 From b48817239b868c5fbc6e3c464fa80822d33c7f8f Mon Sep 17 00:00:00 2001 From: Benjamin Chislett Date: Mon, 28 Jul 2025 20:52:55 +0000 Subject: [PATCH 3/7] parallelize and optimize grammar bitmask creation Signed-off-by: Benjamin Chislett --- vllm/v1/structured_output/__init__.py | 81 ++++++++++++++++++++++----- 1 file changed, 67 insertions(+), 14 deletions(-) diff --git a/vllm/v1/structured_output/__init__.py b/vllm/v1/structured_output/__init__.py index bd1dd01f9063..495cad253a3d 100644 --- a/vllm/v1/structured_output/__init__.py +++ b/vllm/v1/structured_output/__init__.py @@ -28,6 +28,8 @@ logger = init_logger(__name__) +USE_PARALLEL_FILLBITMASK = True + class StructuredOutputManager: """Engine-level manager for structured output requests.""" @@ -40,6 +42,14 @@ def __init__(self, vllm_config: VllmConfig): self._grammar_bitmask: Optional[torch.Tensor] = None self._full_mask = torch.tensor(-1, dtype=torch.int32) + self.bitmask_batch_size = 8 + if USE_PARALLEL_FILLBITMASK: + max_workers = min(multiprocessing.cpu_count(), 8) + self.executor_for_fillmask = ThreadPoolExecutor( + max_workers=max_workers) + + torch._dynamo.config.recompile_limit = 64 + if not self.vllm_config.model_config.skip_tokenizer_init: # The default max_workers if not specified is the number of # CPUs * 5, which is way too high since these tasks are CPU-bound, @@ -120,6 +130,17 @@ def _async_create_grammar( assert self.backend is not None return self.backend.compile_grammar(request_type, grammar_spec) + def _async_fill_bitmasks( + self, + batch: list[tuple[StructuredOutputGrammar, int]], + ): + for grammar, index in batch: + if grammar.is_terminated(): + self._grammar_bitmask[index].fill_(self._full_mask) + else: + grammar.fill_bitmask(self._grammar_bitmask, index) + return True + def grammar_bitmask( self, requests: dict[str, Request], @@ -158,11 +179,42 @@ def grammar_bitmask( # Note that for thinking support, we will need to # reset the relevant part of the bitmask for consequent # request here. - bitmask_tensor[:(len(ordered_seq) * (1 + max_num_spec_tokens))].fill_( - self._full_mask) + # bitmask_tensor[:(len(ordered_seq) * (1 + max_num_spec_tokens))].fill_( + # self._full_mask) + + # FILL ALL BITMASKS FOR FIRST PASS + promises = [] + batch = [] + for req_id, _ in ordered_seq: + request = requests[req_id] + structured_output_request = request.structured_output_request + + batch.append((structured_output_request.grammar, cumulative_index)) + if len(batch) == self.bitmask_batch_size: + if USE_PARALLEL_FILLBITMASK: + promises.append( + self.executor_for_fillmask.submit( + self._async_fill_bitmasks, batch)) + else: + self._async_fill_bitmasks(batch) + batch = [] + + cumulative_index += 1 + if batch: + if USE_PARALLEL_FILLBITMASK: + promises.append( + self.executor_for_fillmask.submit( + self._async_fill_bitmasks, batch)) + else: + self._async_fill_bitmasks(batch) + + # Wait for all bitmask filling tasks to complete. + for promise in promises: + promise.result() # NOTE: This outer loop can likely be parallelized to improve # performance of bitmask generation for large batches. + """ for req_id, _ in ordered_seq: request = requests[req_id] structured_output_request = request.structured_output_request @@ -180,21 +232,22 @@ def grammar_bitmask( state_advancements = 0 req_tokens = scheduled_spec_decode_tokens.get(req_id, []) + [None] for i, token in enumerate(req_tokens): - if apply_bitmask and not \ - structured_output_request.grammar.is_terminated(): - structured_output_request.grammar.fill_bitmask( - bitmask_tensor, cumulative_index) - if token is not None: - # In order to generate the correct bitmask for each - # position in the speculative sequence, we advance - # the FSM state for each speculative token and rollback - # to restore the previous state when we are finished. - assert structured_output_request.grammar.accept_tokens( - req_id, [token]) - state_advancements += 1 + # if True: + if i != 0: + if apply_bitmask and not \ + structured_output_request.grammar.is_terminated(): + structured_output_request.grammar.fill_bitmask( + bitmask_tensor, cumulative_index) + else: + bitmask_tensor[cumulative_index].fill_(self._full_mask) + if apply_bitmask and structured_output_request.grammar.is_terminated() and token is not None: + assert structured_output_request.grammar.accept_tokens( + req_id, [token]) + state_advancements += 1 cumulative_index += 1 if state_advancements > 0: structured_output_request.grammar.rollback(state_advancements) + """ if cumulative_index < bitmask_tensor.shape[0]: bitmask_tensor = bitmask_tensor[:cumulative_index] From 18b3ff415f27739c88c89d1d7ead9719389e88e2 Mon Sep 17 00:00:00 2001 From: Benjamin Chislett Date: Tue, 29 Jul 2025 18:46:11 +0000 Subject: [PATCH 4/7] refactor parallel bitmask fill code Signed-off-by: Benjamin Chislett --- vllm/v1/structured_output/__init__.py | 174 +++++++++++++------------- 1 file changed, 86 insertions(+), 88 deletions(-) diff --git a/vllm/v1/structured_output/__init__.py b/vllm/v1/structured_output/__init__.py index 495cad253a3d..c19628fa1d59 100644 --- a/vllm/v1/structured_output/__init__.py +++ b/vllm/v1/structured_output/__init__.py @@ -3,7 +3,7 @@ from __future__ import annotations import multiprocessing -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import Future, ThreadPoolExecutor from typing import TYPE_CHECKING, Optional from vllm.config import VllmConfig @@ -28,8 +28,6 @@ logger = init_logger(__name__) -USE_PARALLEL_FILLBITMASK = True - class StructuredOutputManager: """Engine-level manager for structured output requests.""" @@ -42,14 +40,14 @@ def __init__(self, vllm_config: VllmConfig): self._grammar_bitmask: Optional[torch.Tensor] = None self._full_mask = torch.tensor(-1, dtype=torch.int32) - self.bitmask_batch_size = 8 - if USE_PARALLEL_FILLBITMASK: - max_workers = min(multiprocessing.cpu_count(), 8) + max_batch_size = self.vllm_config.scheduler_config.max_num_seqs + self.fill_bitmask_parallel_threshold = 128 + if self.fill_bitmask_parallel_threshold < max_batch_size: + self.fill_bitmask_parallel_batch_size = 16 + max_workers = max(1, min(multiprocessing.cpu_count(), 8)) self.executor_for_fillmask = ThreadPoolExecutor( max_workers=max_workers) - torch._dynamo.config.recompile_limit = 64 - if not self.vllm_config.model_config.skip_tokenizer_init: # The default max_workers if not specified is the number of # CPUs * 5, which is way too high since these tasks are CPU-bound, @@ -130,16 +128,25 @@ def _async_create_grammar( assert self.backend is not None return self.backend.compile_grammar(request_type, grammar_spec) - def _async_fill_bitmasks( + def _fill_bitmasks( self, - batch: list[tuple[StructuredOutputGrammar, int]], - ): - for grammar, index in batch: - if grammar.is_terminated(): - self._grammar_bitmask[index].fill_(self._full_mask) - else: + batch: list[tuple[StructuredOutputGrammar, int, bool]], + ) -> None: + assert self._grammar_bitmask is not None + for grammar, index, apply_bitmask in batch: + if apply_bitmask and not grammar.is_terminated(): grammar.fill_bitmask(self._grammar_bitmask, index) - return True + else: + # Note that for thinking support, we will need to + # reset the relevant part of the bitmask for consequent + # requests here. + self._grammar_bitmask[index].fill_(self._full_mask) + + def _async_submit_fill_bitmask( + self, + batch: list[tuple[StructuredOutputGrammar, int, bool]], + ) -> Future: + return self.executor_for_fillmask.submit(self._fill_bitmasks, batch) def grammar_bitmask( self, @@ -167,7 +174,6 @@ def grammar_bitmask( self.backend.allocate_token_bitmask( max_batch_size * (1 + max_num_spec_tokens)) - bitmask_tensor = self._grammar_bitmask # Generate a batched bitmask for all structured output requests. # When speculative decoding is enabled, we need to include multiple # masks for each request, one for each possible bonus token position. @@ -176,79 +182,62 @@ def grammar_bitmask( ordered_seq = sorted(structured_output_request_ids.items(), key=lambda x: x[1]) - # Note that for thinking support, we will need to - # reset the relevant part of the bitmask for consequent - # request here. - # bitmask_tensor[:(len(ordered_seq) * (1 + max_num_spec_tokens))].fill_( - # self._full_mask) - - # FILL ALL BITMASKS FOR FIRST PASS - promises = [] - batch = [] - for req_id, _ in ordered_seq: - request = requests[req_id] - structured_output_request = request.structured_output_request - - batch.append((structured_output_request.grammar, cumulative_index)) - if len(batch) == self.bitmask_batch_size: - if USE_PARALLEL_FILLBITMASK: - promises.append( - self.executor_for_fillmask.submit( - self._async_fill_bitmasks, batch)) - else: - self._async_fill_bitmasks(batch) - batch = [] - - cumulative_index += 1 - if batch: - if USE_PARALLEL_FILLBITMASK: - promises.append( - self.executor_for_fillmask.submit( - self._async_fill_bitmasks, batch)) - else: - self._async_fill_bitmasks(batch) - - # Wait for all bitmask filling tasks to complete. - for promise in promises: - promise.result() - - # NOTE: This outer loop can likely be parallelized to improve - # performance of bitmask generation for large batches. - """ - for req_id, _ in ordered_seq: - request = requests[req_id] - structured_output_request = request.structured_output_request - - if TYPE_CHECKING: - assert structured_output_request is not None - assert structured_output_request.grammar is not None - apply_bitmask: bool = True - if self.reasoner is not None: - if structured_output_request.reasoning_ended is None: - structured_output_request.reasoning_ended = \ - self.reasoner.is_reasoning_end(request.prompt_token_ids) - apply_bitmask = structured_output_request.reasoning_ended - - state_advancements = 0 - req_tokens = scheduled_spec_decode_tokens.get(req_id, []) + [None] - for i, token in enumerate(req_tokens): - # if True: - if i != 0: - if apply_bitmask and not \ - structured_output_request.grammar.is_terminated(): - structured_output_request.grammar.fill_bitmask( - bitmask_tensor, cumulative_index) - else: - bitmask_tensor[cumulative_index].fill_(self._full_mask) - if apply_bitmask and structured_output_request.grammar.is_terminated() and token is not None: - assert structured_output_request.grammar.accept_tokens( - req_id, [token]) - state_advancements += 1 + # Optimized parallel filling of bitmasks for + # non-spec, large-batch-size cases + if len(ordered_seq) > self.fill_bitmask_parallel_threshold and \ + max_num_spec_tokens == 0: + promises = [] + batch = [] + for req_id, _ in ordered_seq: + request = requests[req_id] + structured_output_request = request.structured_output_request + if TYPE_CHECKING: + assert structured_output_request is not None + assert structured_output_request.grammar is not None + + apply_bitmask = self.should_fill_bitmask(request) + batch.append((structured_output_request.grammar, + cumulative_index, apply_bitmask)) + if len(batch) == self.fill_bitmask_parallel_batch_size: + promises.append(self._async_submit_fill_bitmask(batch)) + batch = [] + cumulative_index += 1 - if state_advancements > 0: - structured_output_request.grammar.rollback(state_advancements) - """ + if batch: + promises.append(self._async_submit_fill_bitmask(batch)) + # Wait for all bitmask filling tasks to complete. + for promise in promises: + promise.result() + else: + # Fallback to serial filling of bitmasks for small-batch-size cases + for req_id, _ in ordered_seq: + request = requests[req_id] + structured_output_request = request.structured_output_request + + if TYPE_CHECKING: + assert structured_output_request is not None + assert structured_output_request.grammar is not None + apply_bitmask = self.should_fill_bitmask(request) + + state_advancements = 0 + req_tokens = scheduled_spec_decode_tokens.get(req_id, []) + for i, token in enumerate(req_tokens + [None]): + self._fill_bitmasks([(structured_output_request.grammar, + cumulative_index, apply_bitmask)]) + + if apply_bitmask and \ + structured_output_request.grammar.is_terminated() and \ + token is not None: + assert structured_output_request.grammar.accept_tokens( + req_id, [token]) + state_advancements += 1 + cumulative_index += 1 + if state_advancements > 0: + structured_output_request.grammar.rollback( + state_advancements) + + bitmask_tensor = self._grammar_bitmask if cumulative_index < bitmask_tensor.shape[0]: bitmask_tensor = bitmask_tensor[:cumulative_index] @@ -257,6 +246,15 @@ def grammar_bitmask( # and deserialization when sending this to the GPU workers. return bitmask_tensor.numpy() + def should_fill_bitmask(self, request: Request) -> bool: + if self.reasoner is not None: + assert request.structured_output_request is not None + if request.structured_output_request.reasoning_ended is None: + request.structured_output_request.reasoning_ended = \ + self.reasoner.is_reasoning_end(request.prompt_token_ids) + return request.structured_output_request.reasoning_ended + return True + def should_advance(self, request: Request) -> bool: if not request.use_structured_output: return False From f559a22eb9641656d75513d5793630f83932abcd Mon Sep 17 00:00:00 2001 From: Benjamin Chislett Date: Tue, 29 Jul 2025 19:28:08 +0000 Subject: [PATCH 5/7] bugfix Signed-off-by: Benjamin Chislett --- vllm/v1/structured_output/__init__.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/vllm/v1/structured_output/__init__.py b/vllm/v1/structured_output/__init__.py index c19628fa1d59..48b3d772ce46 100644 --- a/vllm/v1/structured_output/__init__.py +++ b/vllm/v1/structured_output/__init__.py @@ -226,9 +226,8 @@ def grammar_bitmask( self._fill_bitmasks([(structured_output_request.grammar, cumulative_index, apply_bitmask)]) - if apply_bitmask and \ - structured_output_request.grammar.is_terminated() and \ - token is not None: + if apply_bitmask and token is not None and \ + not structured_output_request.grammar.is_terminated(): assert structured_output_request.grammar.accept_tokens( req_id, [token]) state_advancements += 1 From 27cce16f14f6a9a8e0c89f713fae5380608e737d Mon Sep 17 00:00:00 2001 From: Benjamin Chislett Date: Tue, 29 Jul 2025 19:36:37 +0000 Subject: [PATCH 6/7] bugfix Signed-off-by: Benjamin Chislett --- vllm/v1/structured_output/backend_xgrammar.py | 1 + 1 file changed, 1 insertion(+) diff --git a/vllm/v1/structured_output/backend_xgrammar.py b/vllm/v1/structured_output/backend_xgrammar.py index 732fc5defe43..5e00f6380416 100644 --- a/vllm/v1/structured_output/backend_xgrammar.py +++ b/vllm/v1/structured_output/backend_xgrammar.py @@ -188,6 +188,7 @@ def validate_tokens(self, tokens: list[int]) -> list[int]: def rollback(self, num_tokens: int) -> None: self.matcher.rollback(num_tokens) self.num_processed_tokens -= num_tokens + self._is_terminated = self.matcher.is_terminated() def fill_bitmask(self, bitmask: torch.Tensor, idx: int) -> None: self.matcher.fill_next_token_bitmask(bitmask, idx) From 19fd9468031ed693d4eda70601e3b3d4ce71b64d Mon Sep 17 00:00:00 2001 From: Benjamin Chislett Date: Mon, 4 Aug 2025 15:01:33 -0400 Subject: [PATCH 7/7] Update vllm/v1/structured_output/__init__.py Co-authored-by: Russell Bryant Signed-off-by: Benjamin Chislett Signed-off-by: Benjamin Chislett --- vllm/v1/structured_output/__init__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/vllm/v1/structured_output/__init__.py b/vllm/v1/structured_output/__init__.py index 48b3d772ce46..63604a335d9f 100644 --- a/vllm/v1/structured_output/__init__.py +++ b/vllm/v1/structured_output/__init__.py @@ -44,7 +44,10 @@ def __init__(self, vllm_config: VllmConfig): self.fill_bitmask_parallel_threshold = 128 if self.fill_bitmask_parallel_threshold < max_batch_size: self.fill_bitmask_parallel_batch_size = 16 - max_workers = max(1, min(multiprocessing.cpu_count(), 8)) + # Use: + # - at least 1 CPU + # - at most half the number of CPUs or 8, whichever is less + max_workers = max(1, min(multiprocessing.cpu_count() // 2, 8)) self.executor_for_fillmask = ThreadPoolExecutor( max_workers=max_workers)