Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,14 @@ def extract_ip_port_fast(url):
)
ip, port = extract_ip_port_fast(prefill_request_url)

req_data["max_tokens"] -= 1
# OpenAI Chat Completions: when both fields are present,
# max_completion_tokens takes precedence, so decrement that one
# to keep the per-request budget consistent with what the
# backend will enforce. Fall back to max_tokens otherwise.
if "max_completion_tokens" in req_data:
req_data["max_completion_tokens"] -= 1
elif "max_tokens" in req_data:
req_data["max_tokens"] -= 1

req_data["kv_transfer_params"] = {
"do_remote_decode": False,
Expand Down Expand Up @@ -311,6 +318,17 @@ def extract_ip_port_fast(url):
)


@app.route("/health", methods=["GET"])
async def health_check():
with _list_lock:
return {
"status": "ok",
"prefill_instances": len(prefill_instances),
"decode_instances": len(decode_instances),
"transfer_type": TRANSFER_TYPE,
}


if __name__ == "__main__":
t = start_service_discovery("0.0.0.0", 36367)
app.debug = True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,24 @@ def _process_deferred_tasks(self) -> None:
self._deferred_tasks = still_deferred

def _is_remote_ready(self, task: WriteTask) -> bool:
"""Check if remote blocks are allocated for this task.
"""Check if remote blocks are allocated and populated for this task.

Returns True only when the remote allocation entry exists *and*
carries a non-None ``block_ids`` mapping. The latter check ensures
that ``_execute_write_task`` is never invoked for a task whose
remote ``block_ids`` are still being filled in by the scheduler;
without it we would either drop the task or busy-loop on it.

Args:
task: The write task

Returns:
True if remote blocks are ready
"""
return (
task.transfer_id in self.worker.moriio_wrapper.done_remote_allocate_req_dict
info = self.worker.moriio_wrapper.done_remote_allocate_req_dict.get(
task.transfer_id
)
return info is not None and info.block_ids is not None

def _get_remote_alloc_info(self, transfer_id: str) -> RemoteAllocInfo:
"""Get remote allocation info for a request.
Expand All @@ -188,21 +195,17 @@ def _get_remote_alloc_info(self, transfer_id: str) -> RemoteAllocInfo:
def _execute_write_task(self, task: WriteTask) -> None:
"""Execute a single write task.

Callers must ensure ``_is_remote_ready(task)`` returned ``True``
before invoking this method; ``_is_remote_ready`` guarantees that
``request_info.block_ids`` is non-None and the entry exists.

Args:
task: The write task to execute

"""
# Get remote allocation info
request_info = self._get_remote_alloc_info(task.transfer_id)

if request_info.block_ids is None:
logger.debug(
"Request remote block IDs not ready:request_id = %s, transfer_id = %s",
task.request_id,
task.transfer_id,
)
return

# Wait for CUDA event
# The attention computation of the current layer cannot
# overlap with the kv transfer task,
Expand Down
Loading