diff --git a/examples/online_serving/disaggregated_serving/moriio_toy_proxy_server.py b/examples/online_serving/disaggregated_serving/moriio_toy_proxy_server.py index e2a0bfc7c9bc..458083f93c16 100644 --- a/examples/online_serving/disaggregated_serving/moriio_toy_proxy_server.py +++ b/examples/online_serving/disaggregated_serving/moriio_toy_proxy_server.py @@ -259,7 +259,14 @@ def extract_ip_port_fast(url): ) ip, port = extract_ip_port_fast(prefill_request_url) - req_data["max_tokens"] -= 1 + # OpenAI Chat Completions: when both fields are present, + # max_completion_tokens takes precedence, so decrement that one + # to keep the per-request budget consistent with what the + # backend will enforce. Fall back to max_tokens otherwise. + if "max_completion_tokens" in req_data: + req_data["max_completion_tokens"] -= 1 + elif "max_tokens" in req_data: + req_data["max_tokens"] -= 1 req_data["kv_transfer_params"] = { "do_remote_decode": False, @@ -311,6 +318,17 @@ def extract_ip_port_fast(url): ) +@app.route("/health", methods=["GET"]) +async def health_check(): + with _list_lock: + return { + "status": "ok", + "prefill_instances": len(prefill_instances), + "decode_instances": len(decode_instances), + "transfer_type": TRANSFER_TYPE, + } + + if __name__ == "__main__": t = start_service_discovery("0.0.0.0", 36367) app.debug = True diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_engine.py b/vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_engine.py index 973c0bb801c8..f5ecb58a6d17 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_engine.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_engine.py @@ -154,7 +154,13 @@ def _process_deferred_tasks(self) -> None: self._deferred_tasks = still_deferred def _is_remote_ready(self, task: WriteTask) -> bool: - """Check if remote blocks are allocated for this task. + """Check if remote blocks are allocated and populated for this task. + + Returns True only when the remote allocation entry exists *and* + carries a non-None ``block_ids`` mapping. The latter check ensures + that ``_execute_write_task`` is never invoked for a task whose + remote ``block_ids`` are still being filled in by the scheduler; + without it we would either drop the task or busy-loop on it. Args: task: The write task @@ -162,9 +168,10 @@ def _is_remote_ready(self, task: WriteTask) -> bool: Returns: True if remote blocks are ready """ - return ( - task.transfer_id in self.worker.moriio_wrapper.done_remote_allocate_req_dict + info = self.worker.moriio_wrapper.done_remote_allocate_req_dict.get( + task.transfer_id ) + return info is not None and info.block_ids is not None def _get_remote_alloc_info(self, transfer_id: str) -> RemoteAllocInfo: """Get remote allocation info for a request. @@ -188,6 +195,10 @@ def _get_remote_alloc_info(self, transfer_id: str) -> RemoteAllocInfo: def _execute_write_task(self, task: WriteTask) -> None: """Execute a single write task. + Callers must ensure ``_is_remote_ready(task)`` returned ``True`` + before invoking this method; ``_is_remote_ready`` guarantees that + ``request_info.block_ids`` is non-None and the entry exists. + Args: task: The write task to execute @@ -195,14 +206,6 @@ def _execute_write_task(self, task: WriteTask) -> None: # Get remote allocation info request_info = self._get_remote_alloc_info(task.transfer_id) - if request_info.block_ids is None: - logger.debug( - "Request remote block IDs not ready:request_id = %s, transfer_id = %s", - task.request_id, - task.transfer_id, - ) - return - # Wait for CUDA event # The attention computation of the current layer cannot # overlap with the kv transfer task,