Revert "[PerfFix] Avoid separate thread for MP executor shm spin (#28012)" #28289
Conversation
Code Review
This pull request reverts a performance-related change (#28012) that was breaking deployments with tensor parallelism greater than one. The revert reintroduces a dedicated I/O thread in the multiprocessing executor to handle worker responses, removing the custom FutureWrapper and its associated logic. The changes touch several components, including the executors, KV connector utilities, and tests, to align them with the restored asynchronous execution model. My review identified two problems: a critical incorrect type hint that could lead to runtime errors, and a high-severity thread-safety concern in the asynchronous aggregation logic that, while not a bug under the current configuration, is fragile and should be hardened against future regressions.
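For context, a minimal self-contained sketch of the dedicated I/O-thread pattern the revert restores; the names (io_thread_pool, worker_responses, execute_model_async) are illustrative stand-ins, not the vLLM source:

```python
# Illustrative sketch only: one background thread blocks on the worker
# response queue and hands results back through a Future, so the caller
# can keep scheduling without spinning on shared memory itself.
from concurrent.futures import Future, ThreadPoolExecutor
from queue import Queue

io_thread_pool = ThreadPoolExecutor(max_workers=1, thread_name_prefix="mp_exec_io")
worker_responses: Queue = Queue()

def get_response():
    # Runs on the dedicated I/O thread; blocks until a worker replies.
    return worker_responses.get()

def execute_model_async() -> Future:
    # Returns immediately; the result is filled in by the I/O thread.
    return io_thread_pool.submit(get_response)

# A worker (here simulated inline) posts a response; result() then unblocks.
worker_responses.put({"sampled_token_ids": [[42]]})
print(execute_model_async().result())
```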
```diff
 @torch.inference_mode()
 def sample_tokens(
-    self, grammar_output: "GrammarOutput | None"
+    self, grammar_output: "GrammarOutput"
```
The type hint for grammar_output has been changed to GrammarOutput, but callers of this method (e.g., in vllm/v1/executor/abstract.py) can pass None. This creates a discrepancy between the type hint and the actual usage, and can lead to a runtime AttributeError if None is passed and its attributes are accessed downstream. The type hint should be reverted to GrammarOutput | None to accurately reflect that None is a valid value.
```diff
-    self, grammar_output: "GrammarOutput"
+    self, grammar_output: "GrammarOutput | None"
```
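To make the hazard concrete, here is a minimal sketch; the GrammarOutput body and field name are hypothetical, only the annotation question comes from the diff:

```python
# Hypothetical stand-in for the real class; only the `| None` hint matters.
from dataclasses import dataclass, field

@dataclass
class GrammarOutput:
    structured_output_request_ids: list[str] = field(default_factory=list)

def sample_tokens(grammar_output: "GrammarOutput | None") -> int:
    # Under the narrower hint "GrammarOutput", a type checker would not
    # require this guard, yet passing None without it raises AttributeError.
    if grammar_output is None:
        return 0
    return len(grammar_output.structured_output_request_ids)

print(sample_tokens(None))  # 0: callers are allowed to pass None
```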
```python
outputs: list[ModelRunnerOutput | None] = [None] * len(output_futures)
remaining = len(output_futures)

def make_callback(idx):
    def callback(fut):
        if result_future.done():
            return

        try:
            outputs[idx] = fut.result()
        except CancelledError:
            result_future.cancel()
        except Exception as e:
            result_future.set_exception(e)

        # this check assumes io_thread_pool uses a single thread
        nonlocal remaining
        remaining -= 1
        if not remaining:
            result_future.set_result(self.aggregate(outputs, output_rank))

    return callback

for i, output_future in enumerate(output_futures):
    output_future.add_done_callback(make_callback(i))
```
The current implementation of async_aggregate is not thread-safe. The remaining counter is accessed and modified without a lock. While the comment on line 246 correctly points out the assumption of a single-threaded I/O pool, this design is fragile. If the ThreadPoolExecutor in MultiprocExecutor is ever configured with more than one worker, this will introduce a race condition, leading to incorrect behavior. To make this implementation robust and thread-safe, a lock should be used to protect the shared remaining counter.
```python
from threading import Lock

outputs: list[ModelRunnerOutput | None] = [None] * len(output_futures)
remaining = len(output_futures)
lock = Lock()

def make_callback(idx):
    def callback(fut):
        if result_future.done():
            return
        try:
            outputs[idx] = fut.result()
        except CancelledError:
            result_future.cancel()
        except Exception as e:
            result_future.set_exception(e)
        with lock:
            # This check is now thread-safe.
            nonlocal remaining
            remaining -= 1
            if not remaining:
                if not result_future.done():
                    result_future.set_result(self.aggregate(outputs, output_rank))
    return callback

for i, output_future in enumerate(output_futures):
    output_future.add_done_callback(make_callback(i))
```
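To see why the unguarded counter is called fragile, here is a standalone demonstration (not from the PR) of the lost-update race; with a single-threaded pool it never fires, which is exactly the assumption the original comment encodes:

```python
# `remaining -= 1` is a non-atomic read-modify-write: two threads can read
# the same value, both subtract, and one decrement is lost.
from concurrent.futures import ThreadPoolExecutor

remaining = 200_000

def decrement(_):
    global remaining
    remaining -= 1

with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(decrement, range(200_000)))

# With max_workers=1 this always prints 0; with 8 it may print a positive
# number, meaning the "all done" condition could fire twice or never.
print("lost decrements:", remaining)
```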
💡 Codex Review
Here are some automated review suggestions for this pull request.
```diff
     def __init__(self, refs, aggregator: KVOutputAggregator | None = None):
         super().__init__()
-        self.ref_or_refs = ref_or_refs
+        self.refs = refs
         self.aggregator = aggregator

     def result(self, timeout=None):
         if timeout is not None:
             raise NotImplementedError("timeout is not supported")

-        outputs = ray.get(self.ref_or_refs, timeout=timeout)
         if self.aggregator is None:
-            return outputs
+            return self.refs[0].get()

+        outputs = [ref.get() for ref in self.refs]
```
Ray futures retrieved via nonexistent ObjectRef.get
The new FutureWrapper.result() calls self.refs[0].get() and [ref.get() for ref in self.refs]. Ray object references don’t expose a .get() method; they must be resolved with ray.get(ref) or ray.get(refs). This means any non-blocking path that returns a FutureWrapper will immediately raise AttributeError when the scheduler awaits the result, breaking Ray execution entirely.
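For reference, a short sketch of the API shape the reviewer describes (assumes a local Ray install; the remote function is made up for illustration):

```python
import ray

ray.init()

@ray.remote
def double(x: int) -> int:
    return x * 2

refs = [double.remote(i) for i in range(4)]  # list of ObjectRefs
print(ray.get(refs))     # [0, 2, 4, 6] -- resolve a list with ray.get
print(ray.get(refs[0]))  # 0 -- a single ObjectRef also goes through ray.get
# refs[0].get()          # AttributeError: ObjectRef has no .get() method
```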
```diff
         # When PP is used, we return a FutureWrapper immediately so that
         # the scheduler can yield to the next batch.
-            return FutureWrapper(refs[0])
+            return FutureWrapper(refs)

         # Get output from all workers when connector is present
         assert self.kv_output_aggregator is not None
         if not non_block:
             # Block and get results from all workers
-            return self.kv_output_aggregator.aggregate(ray.get(refs))
+            outputs = [ref.get() for ref in refs]
+            return self.kv_output_aggregator.aggregate(outputs)
```
Blocking Ray sampling uses .get() on ObjectRef
RayDistributedExecutor.sample_tokens now resolves Ray outputs with refs[0].get() and [ref.get() for ref in refs]. ObjectRef does not provide a get() API, so any synchronous sampling (with or without a KV connector) will fail with AttributeError before a response is returned. Use ray.get(refs) to retrieve the values.
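A hedged sketch of how the wrapper could resolve references correctly; the class shape mirrors the diff above, but the aggregator handling is simplified and assumed:

```python
import ray
from concurrent.futures import Future

class FutureWrapper(Future):
    """Future-like wrapper over Ray ObjectRefs, resolved via ray.get()."""

    def __init__(self, refs, aggregator=None):
        super().__init__()
        self.refs = refs
        self.aggregator = aggregator

    def result(self, timeout=None):
        if timeout is not None:
            raise NotImplementedError("timeout is not supported")
        outputs = ray.get(self.refs)  # ray.get, never ref.get()
        if self.aggregator is None:
            return outputs[0]
        return self.aggregator.aggregate(outputs)
```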
…m-project#28012)" This reverts commit c9f66da. Signed-off-by: NickLucche <[email protected]>
Force-pushed 4dc2fee to 8a1d25a
Here is the change again with a fix: #28319. Thanks @NickLucche and @DarkLight1337
…m-project#28012)" (vllm-project#28289) Signed-off-by: NickLucche <[email protected]>
…m-project#28012)" (vllm-project#28289) Signed-off-by: NickLucche <[email protected]> Signed-off-by: xuebwang-amd <[email protected]>
…m-project#28012)" (vllm-project#28289) Signed-off-by: NickLucche <[email protected]>
PR #28012 breaks PD (prefill/decode disaggregation) deployments with TP>1.