[core] refactor communication layer: PR1 (Added Refactor Infra Only) #1555
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 5cd54ca33f
lishunyang12
left a comment
Left a few comments. This is a big refactor -- worth getting the threading right before it goes in.
if self.chunk_transfer_adapter:
    self.chunk_transfer_adapter.process_pending_chunks(self.waiting, self.running)
if self.chunk_coordinator:
    oco = self._latest_omni_connector_output
the variable names are too vague
# Core scheduling methods
# ------------------------------------------------------------------ #

def process_pending_chunks(
Is it possible to put the processing of Async Chunk and non-async chunk code blocks in the same function? Currently, some processing seems replicated in process_pending_chunks & process_pending_batch_inputs. Could we use a single queue, waiting_for_input_waiting, to handle both waiting_for_input and waiting_for_chunk_waiting simultaneously, and only hold one request status (e.g., WAITING_FOR_INPUT) for both asynchronous and non-asynchronous chunk modes?
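One way to picture the suggestion above is a single queue with a single waiting status that serves both modes. The sketch below is only an illustration under assumptions: `UnifiedInputQueue`, `Request`, `RequestStatus.READY`, and `process_pending` are hypothetical names, not code from this PR, and the real scheduler carries more state than this.

```python
from collections import deque
from dataclasses import dataclass, field
from enum import Enum, auto


class RequestStatus(Enum):
    # Hypothetical: one waiting status shared by async-chunk and non-async modes.
    WAITING_FOR_INPUT = auto()
    READY = auto()


@dataclass
class Request:
    request_id: str
    status: RequestStatus = RequestStatus.WAITING_FOR_INPUT


@dataclass
class UnifiedInputQueue:
    """Hypothetical single queue for requests blocked on upstream input."""

    waiting_for_input: deque = field(default_factory=deque)

    def add(self, request: Request) -> None:
        request.status = RequestStatus.WAITING_FOR_INPUT
        self.waiting_for_input.append(request)

    def process_pending(self, arrived_ids: set) -> list:
        """Promote requests whose input (a chunk or a full batch) has arrived."""
        ready = []
        still_waiting = deque()
        for request in self.waiting_for_input:
            if request.request_id in arrived_ids:
                request.status = RequestStatus.READY
                ready.append(request)
            else:
                still_waiting.append(request)
        self.waiting_for_input = still_waiting
        return ready
```

With this shape, the caller only decides which request IDs have input available; whether that input arrived as a chunk or as a whole batch does not change the queue or the status.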
finished_requests_needing_kv_transfer: dict[str, dict] = field(default_factory=dict)
# Requests that need to be registered for chunk recv by the Model Runner's
# background thread. Populated by ChunkSchedulingCoordinator.
pending_chunk_registrations: list = field(default_factory=list)
same as above. can we keep only one for registration?
seed_input["prompt_token_ids"] = [0] * next_prompt_len
seed_input["multi_modal_data"] = seed_input["mm_processor_kwargs"] = None
for ds_stage_id in range(1, len(self.stage_list)):
    sp_ds = sampling_params_list[ds_stage_id]
the variable name is too vague
Hey @natureofnature, I see two new commits since my review, but my 4 inline comments from 2/28 still seem open (busy-spin in recv_loop, shallow copy, dead _finished_save_reqs, duplicate MockQueue in e2e test). Could you take a look when you get a chance?
I'm still working on it and will fix them once the basic workflow functions well. @lishunyang12
@codex review
04a89ed to 3d3e6e8
@tzhouam @divyanshsinghvi PTAL
391544a to bda4aac
4b8fd82 to e9601a4
hsliuustc0106
left a comment
Gate Status: ❌ BLOCKED
| Check | Status |
|---|---|
| DCO | ✅ |
| Docs | ✅ |
| Mergeable | ❌ CONFLICTING |
Prior Feedback Unaddressed (2/28)
- Dead state: `_finished_save_reqs` initialized but never read
- Silent connector failure: Returns `None` instead of failing fast
- Busy-spin: `time.sleep(0.001)` placement still suboptimal
- Duplicate MockQueue: Same class in two test files
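For the busy-spin and silent-failure items, a common alternative to polling with a short sleep is to block on the queue with a timeout, and to raise when a required connector is missing. The following is a minimal sketch under assumptions: `recv_loop` here is a generic stand-in, not the PR's function, and `require_connector`, `inbox`, `handle`, and `poll_timeout` are hypothetical names.

```python
import queue
import threading


def recv_loop(inbox, stop_event, handle, poll_timeout=0.1):
    """Drain `inbox` until `stop_event` is set, blocking while it is empty.

    `inbox.get(timeout=...)` parks the thread instead of spinning; the
    timeout only exists so the loop can notice `stop_event` periodically.
    """
    while not stop_event.is_set():
        try:
            item = inbox.get(timeout=poll_timeout)
        except queue.Empty:
            continue
        handle(item)


def require_connector(connector):
    """Fail fast instead of silently returning None when no connector exists."""
    if connector is None:
        raise RuntimeError("connector is not configured")
    return connector
```

The trade-off is shutdown latency: the loop may take up to `poll_timeout` seconds to observe the stop event, in exchange for not waking up a thousand times a second while idle.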
Test Evidence Missing
Claims "Qwen2.5, Qwen 3 omni, Bagel works" but provides no test commands or output. Please add reproduction steps.
Minor: MRO Inconsistency
# gpu_ar_model_runner.py:154
class GPUARModelRunner(OmniGPUModelRunner, OmniConnectorModelRunnerMixin): # ✓
# gpu_generation_model_runner.py:45
class GPUGenerationModelRunner(OmniConnectorModelRunnerMixin, OmniGPUModelRunner): # ✗

Please resolve conflicts and address the 4 prior items.
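To see why the base-class order matters, here is a small self-contained illustration. `BaseRunner` and `ConnectorMixin` are hypothetical stand-ins for the real classes, and the shared `describe` method is invented for the example; whether the real classes actually define overlapping methods is not something this thread confirms.

```python
class BaseRunner:
    def describe(self):
        return "base runner"


class ConnectorMixin:
    def describe(self):
        return "connector mixin"


class RunnerFirst(BaseRunner, ConnectorMixin):
    """Bases in the same order as the first class declaration above."""


class MixinFirst(ConnectorMixin, BaseRunner):
    """Bases in the same order as the second class declaration above."""


def mro_names(cls):
    """Return the method resolution order as a list of class names."""
    return [c.__name__ for c in cls.__mro__]


print(mro_names(RunnerFirst))  # ['RunnerFirst', 'BaseRunner', 'ConnectorMixin', 'object']
print(mro_names(MixinFirst))   # ['MixinFirst', 'ConnectorMixin', 'BaseRunner', 'object']
```

When both bases define the same attribute, the leftmost base wins, so two runners that list the bases in different orders can resolve the same method name to different implementations.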
2b2b419 to 776d5ad
776d5ad to 5b24a97
hsliuustc0106
left a comment
This PR is too large. Can we test it intensively on one model before moving to all models?
e6d6219 to eb7de9b
@@ -0,0 +1,380 @@
# SPDX-License-Identifier: Apache-2.0
Merge CI passed, but nightly CI failed.
Signed-off-by: natureofnature <wzliu@connect.hku.hk>
eddf5d1 to 590a7d0
Signed-off-by: natureofnature <wzliu@connect.hku.hk>
I fixed one issue that was caused by this PR. The current nightly CI problems (https://buildkite.com/vllm/vllm-omni/builds/6413/steps/canvas) seem to be unrelated to this PR. @hsliuustc0106
…llm-project#1555) Signed-off-by: natureofnature <wzliu@connect.hku.hk> Co-authored-by: Hongsheng Liu <liuhongsheng4@huawei.com>
Purpose
Refactor the vLLM-Omni communication layer by moving all data-plane connector.put()/connector.get() calls from Orchestrator, OmniStage, and Scheduler into a unified OmniConnectorModelRunnerMixin at the Model Runner level, while keeping scheduling coordination logic (e.g., WAITING_FOR_CHUNK state) in the Scheduler.
This is the first PR of the communication refactoring; it does not change the existing workflow. The core files are the Omni connector mixin and the scheduler coordinator.
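The idea of keeping data-plane `put()`/`get()` calls inside a mixin at the Model Runner level can be sketched roughly as follows. This is an illustration under assumptions, not the PR's implementation: `InMemoryConnector`, `ConnectorMixinSketch`, `ToyModelRunner`, and the method names and signatures are all hypothetical, and the real connector API may differ.

```python
class InMemoryConnector:
    """Hypothetical stand-in for a connector; keeps payloads in a dict."""

    def __init__(self):
        self._store = {}

    def put(self, key, payload):
        self._store[key] = payload

    def get(self, key):
        # Returns None when nothing has been stored under `key`.
        return self._store.pop(key, None)


class ConnectorMixinSketch:
    """Hypothetical mixin that is the only place put()/get() are called."""

    def init_connector(self, connector):
        self._connector = connector

    def send_stage_output(self, request_id, payload):
        self._connector.put(request_id, payload)

    def recv_stage_input(self, request_id):
        return self._connector.get(request_id)


class ToyModelRunner(ConnectorMixinSketch):
    """Toy runner: only the mixin methods touch the connector."""

    def __init__(self, connector):
        self.init_connector(connector)


shared = InMemoryConnector()
upstream = ToyModelRunner(shared)
downstream = ToyModelRunner(shared)
upstream.send_stage_output("req-0", {"tokens": [1, 2, 3]})
received = downstream.recv_stage_input("req-0")
```

In this arrangement the scheduler never touches the connector; it only needs to know whether input for a request has arrived, which matches the split between data plane and scheduling coordination described above.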
Architecture
Status Summary
`OmniConnectorModelRunnerMixin` as the main communication hub for the refactored path.
Test Plan
Test Result