Expose a fused_topk_raw_logits API #2682
Conversation
Code Review
This pull request exposes a new fused_topk_raw_logits API which leverages the TRTLLM routingRenormalize kernel, including a new Python module, C++ bindings, JIT compilation updates, and tests. However, a critical thread-safety issue has been identified: a race condition in the management of the global workspace pool. This shared use of intermediate buffers across different threads and CUDA streams on the same device can lead to data corruption and information leakage between concurrent requests. My review includes a suggestion to address this race condition.
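For orientation, a purely hypothetical call sketch follows; the actual import path, signature, and return values are defined by this PR and may differ.

```python
import torch

# Hypothetical import path and signature -- consult the module added in this PR.
from flashinfer import fused_topk_raw_logits

num_tokens, num_experts, top_k = 256, 128, 8
router_logits = torch.randn(num_tokens, num_experts, device="cuda", dtype=torch.bfloat16)

# Assumed interface: raw routing logits in, per-token top-k weights and expert ids out.
topk_weights, topk_ids = fused_topk_raw_logits(router_logits, top_k)
```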
```python
from flashinfer.utils import device_support_pdl


_ROUTING_TILE_TOKENS_DIM = 128
_workspace_pool: dict[tuple[str, int | None], "_RawLogitsTopkWorkspace"] = {}
```
The _workspace_pool global dictionary and the _get_workspace function implement a shared workspace pool that is not thread-safe or stream-safe. The _RawLogitsTopkWorkspace instance is shared across all threads and CUDA streams using the same GPU device. This workspace contains pre-allocated tensors (e.g., topk_weights_bf16, topk_packed, expert_counts) used as intermediate buffers and outputs by the CUDA kernels. Concurrent calls to fused_topk_raw_logits from different threads or streams on the same device will result in multiple kernels writing to the same memory locations simultaneously. This leads to data corruption and potential information leakage between different requests or users in a multi-tenant environment (e.g., an LLM serving platform). To remediate this, consider making the workspace pool key include the CUDA stream ID, or using thread-local storage, or implementing a locking mechanism to ensure exclusive access to the workspace during kernel execution.
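One way to narrow the data race described above is to make the pool key stream-aware, so kernels launched on different CUDA streams never share intermediate buffers. Below is a minimal sketch reusing the existing `_workspace_pool` and `_RawLogitsTopkWorkspace` names; note the wider key would also require updating the pool's type annotation, and two threads sharing a single stream would still need external synchronization.

```python
import threading

import torch

_workspace_pool_guard = threading.Lock()


def _get_workspace_stream_aware(device: torch.device) -> "_RawLogitsTopkWorkspace":
    # Include the current CUDA stream in the key so concurrent streams on the
    # same device each get their own pre-allocated buffers.
    stream_id = (
        torch.cuda.current_stream(device).cuda_stream if device.type == "cuda" else None
    )
    key = (device.type, device.index, stream_id)
    with _workspace_pool_guard:
        ws = _workspace_pool.get(key)
        if ws is None:
            ws = _RawLogitsTopkWorkspace(device)
            _workspace_pool[key] = ws
    return ws
```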
```python
def _get_workspace(device: torch.device) -> _RawLogitsTopkWorkspace:
    key = (device.type, device.index)
    ws = _workspace_pool.get(key)
    if ws is None:
        ws = _RawLogitsTopkWorkspace(device)
        _workspace_pool[key] = ws
    return ws
```
The current implementation of _get_workspace is not thread-safe because it accesses the global _workspace_pool without synchronization. This can lead to a race condition where multiple threads create a workspace for the same device simultaneously, causing unpredictable behavior. To fix this, a lock should be used to protect the creation and insertion of new workspaces into the pool.
Suggested change:

```python
def _get_workspace(device: torch.device) -> _RawLogitsTopkWorkspace:
    if not hasattr(_get_workspace, "lock"):
        import threading

        _get_workspace.lock = threading.Lock()
    key = (device.type, device.index)
    ws = _workspace_pool.get(key)
    if ws is not None:
        return ws
    with _get_workspace.lock:
        ws = _workspace_pool.get(key)
        if ws is None:
            ws = _RawLogitsTopkWorkspace(device)
            _workspace_pool[key] = ws
        return ws
```
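A possible alternative to creating the lock lazily on the function object (which is itself exposed to a first-call race) is a plain module-level lock with the same double-checked pattern. A minimal sketch, assuming the module-level `_workspace_pool` shown above:

```python
import threading

_workspace_pool_lock = threading.Lock()


def _get_workspace(device: torch.device) -> "_RawLogitsTopkWorkspace":
    key = (device.type, device.index)
    ws = _workspace_pool.get(key)
    if ws is not None:
        return ws
    # Re-check under the lock so only one thread allocates per device.
    with _workspace_pool_lock:
        ws = _workspace_pool.get(key)
        if ws is None:
            ws = _RawLogitsTopkWorkspace(device)
            _workspace_pool[key] = ws
        return ws
```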
```cpp
// - This function computes top-k scores/weights from raw routing logits (mPtrScores path).
// - mPtrTopKIds is intentionally left nullptr to force score-driven routing selection.
// - topk_packed_ptr is required by routing kernels for large-token paths.
void fused_topk_raw_logits_trtllm_renormalize(
```
I have some concerns about the API name; can we make it simpler? (And why do we call it fused?)
📌 Description
Expose a raw-logits top-k API for existing code in the TRT-LLM fused MoE path.
Currently the implementation is not optimized, and performance is worse than the naive Torch implementation in sgl-project/sglang@6ed9c53.
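For reference, the naive Torch routing being compared against typically follows a softmax/top-k/renormalize pattern. The sketch below illustrates that pattern under assumed shapes; it is not the code in the linked sglang commit.

```python
import torch


def naive_topk_renormalize(router_logits: torch.Tensor, top_k: int):
    # router_logits: [num_tokens, num_experts] raw routing logits.
    scores = torch.softmax(router_logits.float(), dim=-1)
    topk_weights, topk_ids = torch.topk(scores, top_k, dim=-1)
    # Renormalize the selected weights so they sum to 1 per token.
    topk_weights = topk_weights / topk_weights.sum(dim=-1, keepdim=True)
    return topk_weights, topk_ids
```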
🔍 Related Issues
#2676
sgl-project/sglang#19537
🚀 Pull Request Checklist
Thank you for contributing to FlashInfer! Before we review your pull request, please make sure the following items are complete.
✅ Pre-commit Checks
- I have installed `pre-commit` by running `pip install pre-commit` (or used your preferred method).
- I have installed the hooks with `pre-commit install`.
- I have run the hooks with `pre-commit run --all-files` and fixed any reported issues.

🧪 Tests

- Tests have been added or updated as needed (unittest, etc.).

Reviewer Notes