Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 60 additions & 28 deletions vllm/v1/structured_output/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import itertools
import multiprocessing
import threading
from collections.abc import Iterable
from concurrent.futures import Future, ThreadPoolExecutor
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -63,39 +64,65 @@ def __init__(self, vllm_config: VllmConfig):
max_workers = max(1, min(multiprocessing.cpu_count() // 2, 8))
self.executor_for_fillmask = ThreadPoolExecutor(max_workers=max_workers)

if not self.vllm_config.model_config.skip_tokenizer_init:
# The default max_workers if not specified is the number of
# CPUs * 5, which is way too high since these tasks are CPU-bound,
# not I/O bound. We also know we would never dominate CPU usage
# with just grammar compilation, so we set it to half the number
# of CPUs.
max_workers = max(1, (multiprocessing.cpu_count() + 1) // 2)
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.tokenizer = cached_tokenizer_from_config(
model_config=self.vllm_config.model_config
)
reasoning_parser = (
self.vllm_config.structured_outputs_config.reasoning_parser
)
reasoning_parser_plugin = (
self.vllm_config.structured_outputs_config.reasoning_parser_plugin
)
if reasoning_parser_plugin and len(reasoning_parser_plugin) > 3:
ReasoningParserManager.import_reasoning_parser(reasoning_parser_plugin)

reasoning_parser = (
self.vllm_config.structured_outputs_config.reasoning_parser
)
if reasoning_parser:
reasoner_cls = ReasoningParserManager.get_reasoning_parser(
reasoning_parser
)
self.reasoner = reasoner_cls(tokenizer=self.tokenizer)
# Tokenizer is loaded lazily to avoid duplicate tokenizer initialization
# in multiprocess mode. For GGUF models, this prevents a semaphore leak
# that causes server hangs (tokenizer builds merges on the fly, which
# uses multiprocessing primitives that don't clean up in subprocesses).
self._tokenizer = None
self._tokenizer_initialized = False
self._tokenizer_init_lock = threading.Lock()
self.executor: ThreadPoolExecutor | None = None

self.enable_in_reasoning = (
self.vllm_config.structured_outputs_config.enable_in_reasoning
)

@property
def tokenizer(self):
"""Lazily initialize tokenizer when first accessed (thread-safe)."""
# Double-checked locking pattern for thread-safe lazy initialization
if not self._tokenizer_initialized:
with self._tokenizer_init_lock:
if not self._tokenizer_initialized:
self._init_tokenizer()
return self._tokenizer

def _init_tokenizer(self):
"""Initialize tokenizer and related components on first use."""
if self._tokenizer_initialized:
return

if self.vllm_config.model_config.skip_tokenizer_init:
raise RuntimeError(
"Structured output requires a tokenizer, but "
"skip_tokenizer_init is enabled. Either disable "
"skip_tokenizer_init or avoid using structured output."
)

# The default max_workers if not specified is the number of
# CPUs * 5, which is way too high since these tasks are CPU-bound,
# not I/O bound. We also know we would never dominate CPU usage
# with just grammar compilation, so we set it to half the number
# of CPUs.
max_workers = max(1, (multiprocessing.cpu_count() + 1) // 2)
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self._tokenizer = cached_tokenizer_from_config(
model_config=self.vllm_config.model_config
)

reasoning_parser = self.vllm_config.structured_outputs_config.reasoning_parser
reasoning_parser_plugin = (
self.vllm_config.structured_outputs_config.reasoning_parser_plugin
)
if reasoning_parser_plugin and len(reasoning_parser_plugin) > 3:
ReasoningParserManager.import_reasoning_parser(reasoning_parser_plugin)

if reasoning_parser:
reasoner_cls = ReasoningParserManager.get_reasoning_parser(reasoning_parser)
self.reasoner = reasoner_cls(tokenizer=self._tokenizer)

self._tokenizer_initialized = True
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ThreadPoolExecutor leaked on initialization failure

Low Severity

In _init_tokenizer, the ThreadPoolExecutor is created at line 108 before calling cached_tokenizer_from_config and subsequent initialization steps. If any of these steps fail, _tokenizer_initialized remains False but self.executor holds a reference to the created executor. On retry (next access to self.tokenizer), a new ThreadPoolExecutor is created and assigned to self.executor, orphaning the previous one without proper shutdown. This can leak thread resources in scenarios with transient initialization failures.

Fix in Cursor Fix in Web


def grammar_init(self, request: "Request") -> None:
if request.structured_output_request is None:
return
Expand Down Expand Up @@ -149,6 +176,11 @@ def grammar_init(self, request: "Request") -> None:
raise ValueError(f"Unsupported structured output backend: {backend}")

if self._use_async_grammar_compilation:
# Ensure tokenizer (and executor) is initialized
_ = self.tokenizer
assert self.executor is not None, (
"Executor should be initialized with tokenizer"
)
grammar = self.executor.submit(self._create_grammar, request)
else:
grammar = self._create_grammar(request) # type: ignore[assignment]
Expand Down