Merged
20 changes: 18 additions & 2 deletions vllm/entrypoints/fast_sync_llm.py
@@ -2,9 +2,12 @@
 from queue import Empty
 from typing import Union
 
+from vllm import envs
 from vllm.distributed.communication_op import broadcast_tensor_dict
 from vllm.engine.arg_utils import EngineArgs
 from vllm.engine.llm_engine import LLMEngine
+from vllm.executor.multiproc_gpu_executor import MultiprocessingGPUExecutor
+from vllm.executor.ray_gpu_executor import RayGPUExecutor
 from vllm.inputs import PromptInputs, TextTokensPrompt
 from vllm.logger import init_logger
 from vllm.pooling_params import PoolingParams
@@ -49,7 +52,9 @@ def _poll_requests(self):
 if not self.llm_engine.has_unfinished_requests():
     logger.info("No unfinished requests. Waiting...")
     (request_id, prompt, sampling_params) = self.input_queue.get()
-    if self.need_restart:
+    if self.need_restart and isinstance(
+            self.llm_engine.model_executor,
+            MultiprocessingGPUExecutor):
         logger.info("Restarting worker loops")
         for worker in self.llm_engine.model_executor.workers:
             worker.execute_method("start_worker_execution_loop")

Review thread on the new isinstance check:

Review comment: Would suggest syncing this logic with line 101, or at least adding a comment clarifying that need_restart is produced but not consumed in the single-GPU GPUExecutor case.

Collaborator (author): Not having the if isinstance where this value is set is a micro-optimization :p

Reply: Comments have no performance impact 😼 Will leave this convo up in the event some confused person noses around this part of the code.
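To make the guard concrete, here is a toy standalone sketch (not vLLM code; the executor classes and the execute_method call are simplified stand-ins) of why the restart only matters for the multiprocessing executor: it is the only executor that owns background worker loops, which stop while the engine sits idle.

# Toy stand-ins, not vLLM classes.
class ToySingleGPUExecutor:
    """Runs in-process; there are no worker loops to restart."""


class ToyMultiprocExecutor:
    def __init__(self, workers):
        self.workers = workers  # handles to remote worker processes


def maybe_restart(executor, need_restart: bool) -> None:
    # Mirrors the guarded restart above: a no-op for the single-GPU case.
    if need_restart and isinstance(executor, ToyMultiprocExecutor):
        for worker in executor.workers:
            worker.execute_method("start_worker_execution_loop")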
@@ -66,13 +71,24 @@
 def run_engine(self):
     self.llm_engine = LLMEngine.from_engine_args(
         self.engine_args, usage_context=UsageContext.LLM_CLASS)
+    assert not isinstance(
+        self.llm_engine.model_executor,
+        RayGPUExecutor), "Ray is not supported in sync openai mode"
 
     self.result_queue.put(("Ready", None, None))
     request_stats = {}
     log_interval = 100
+    poll_interval = envs.VLLM_SYNC_SERVER_ENGINE_STEPS_BETWEEN_POLLS
     try:
         while True:
-            self._poll_requests()
+            poll_interval -= 1
+            if (self.input_queue.qsize() >=
+                    envs.VLLM_SYNC_SERVER_ACCUM_REQUESTS
+                    or poll_interval <= 0
+                    or not self.llm_engine.has_unfinished_requests()):
+                self._poll_requests()
+                poll_interval = \
+                    envs.VLLM_SYNC_SERVER_ENGINE_STEPS_BETWEEN_POLLS
             step_outputs = self.llm_engine.step()
             log_interval -= 1
             if log_interval == 0:
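The reworked loop trades a little scheduling latency for larger batches: the engine keeps stepping and only drains the input queue when enough new requests have accumulated, the step budget between polls is spent, or it would otherwise go idle. Below is a self-contained sketch of that policy using toy stand-ins for the engine and queue (not vLLM code; the blocking wait in _poll_requests is simplified to a non-blocking drain).

import os
from queue import Queue


class ToyEngine:
    """Stand-in for LLMEngine: counts pending requests; one finishes per step."""

    def __init__(self):
        self.pending = 0

    def add_request(self, req) -> None:
        self.pending += 1

    def has_unfinished_requests(self) -> bool:
        return self.pending > 0

    def step(self) -> None:
        if self.pending:
            self.pending -= 1


def serve(engine: ToyEngine, input_queue: Queue, total_steps: int = 100) -> None:
    accum = int(os.getenv("VLLM_SYNC_SERVER_ACCUM_REQUESTS", "1"))
    steps_between_polls = int(
        os.getenv("VLLM_SYNC_SERVER_ENGINE_STEPS_BETWEEN_POLLS", "1"))
    poll_interval = steps_between_polls
    for _ in range(total_steps):
        poll_interval -= 1
        # Drain the queue only when enough requests piled up, the step budget
        # is spent, or the engine would otherwise sit idle.
        if (input_queue.qsize() >= accum
                or poll_interval <= 0
                or not engine.has_unfinished_requests()):
            while not input_queue.empty():
                engine.add_request(input_queue.get_nowait())
            poll_interval = steps_between_polls
        engine.step()

With both variables left at their default of 1, the sketch degenerates to the old behaviour of polling before every engine step.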
13 changes: 12 additions & 1 deletion vllm/entrypoints/sync_openai/api_server.py
@@ -8,6 +8,7 @@

 import uvicorn
 from fastapi import FastAPI, Request
+from fastapi.middleware.cors import CORSMiddleware
 from fastapi.responses import StreamingResponse
 from fastapi.routing import Mount
 from prometheus_client import make_asgi_app
@@ -178,7 +179,8 @@ async def completions(request: CompletionRequest, raw_request: Request):
         created_time = int(time.time())
         return StreamingResponse(content=completion_generator(
             request.model, result_queue, choices, created_time, ids),
-                                 media_type="text/event-stream")
+                                 media_type="text/event-stream",
+                                 headers={"Access-Control-Allow-Origin": "*"})
     while True:
         request_id, token, stats = await result_queue.get()
         choice_idx = choices[request_id]
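For reference, here is a minimal standalone FastAPI app (not the vLLM handler; the route and event generator are made up for illustration) showing the same pattern as the change above: a server-sent-events StreamingResponse carrying an explicit Access-Control-Allow-Origin header.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

demo_app = FastAPI()


async def fake_events():
    # Stand-in for completion_generator: yields a few SSE-formatted chunks.
    for i in range(3):
        yield f"data: chunk {i}\n\n"


@demo_app.get("/stream")
async def stream():
    return StreamingResponse(
        fake_events(),
        media_type="text/event-stream",
        headers={"Access-Control-Allow-Origin": "*"},
    )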
@@ -207,4 +209,13 @@ def parse_args():
     args = parse_args()
     engine_args = EngineArgs.from_cli_args(args)
     runner.set_engine_args(engine_args)
+
+    app.add_middleware(
+        CORSMiddleware,
+        allow_origins=args.allowed_origins,
+        allow_credentials=args.allow_credentials,
+        allow_methods=args.allowed_methods,
+        allow_headers=args.allowed_headers,
+    )
+
     uvicorn.run(app, port=args.port, host=args.host)
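The middleware block follows standard FastAPI/Starlette usage; here is a minimal sketch with hard-coded permissive values standing in for the parsed args.allowed_origins, args.allow_credentials, args.allowed_methods, and args.allowed_headers.

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

demo_app = FastAPI()
demo_app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],     # placeholder for args.allowed_origins
    allow_credentials=True,  # placeholder for args.allow_credentials
    allow_methods=["*"],     # placeholder for args.allowed_methods
    allow_headers=["*"],     # placeholder for args.allowed_headers
)

In the change itself the middleware is added after parse_args, so the allowed origins, methods, and headers come from the command line rather than being hard-coded.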
10 changes: 10 additions & 0 deletions vllm/envs.py
@@ -38,6 +38,8 @@
 VLLM_INSTALL_PUNICA_KERNELS: bool = False
 CMAKE_BUILD_TYPE: Optional[str] = None
 VERBOSE: bool = False
+VLLM_SYNC_SERVER_ACCUM_REQUESTS: int = 1
+VLLM_SYNC_SERVER_ENGINE_STEPS_BETWEEN_POLLS: int = 1
 
 # The begin-* and end* here are used by the documentation generator
 # to extract the used env vars.
@@ -219,6 +221,14 @@
     # Both spawn and fork work
     "VLLM_WORKER_MULTIPROC_METHOD":
     lambda: os.getenv("VLLM_WORKER_MULTIPROC_METHOD", "spawn"),
+
+    # Try to accumulate this many requests before proceeding
+    "VLLM_SYNC_SERVER_ACCUM_REQUESTS":
+    lambda: int(os.getenv("VLLM_SYNC_SERVER_ACCUM_REQUESTS", "1")),
+
+    # Poll for new requests every this many steps
+    "VLLM_SYNC_SERVER_ENGINE_STEPS_BETWEEN_POLLS":
+    lambda: int(os.getenv("VLLM_SYNC_SERVER_ENGINE_STEPS_BETWEEN_POLLS", "1")),
 }
 
 # end-env-vars-definition
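As I understand the pattern in vllm/envs.py, the dictionary maps each variable name to a lambda and a module-level __getattr__ evaluates it on access, so envs.VLLM_SYNC_SERVER_ACCUM_REQUESTS re-reads the environment at lookup time. A condensed, standalone sketch of that pattern limited to the two new knobs (the surrounding machinery is simplified):

import os

# Condensed lazy-lookup sketch; defaults mirror the diff above.
environment_variables = {
    "VLLM_SYNC_SERVER_ACCUM_REQUESTS":
    lambda: int(os.getenv("VLLM_SYNC_SERVER_ACCUM_REQUESTS", "1")),
    "VLLM_SYNC_SERVER_ENGINE_STEPS_BETWEEN_POLLS":
    lambda: int(os.getenv("VLLM_SYNC_SERVER_ENGINE_STEPS_BETWEEN_POLLS", "1")),
}


def __getattr__(name):
    # Module-level __getattr__: the environment is consulted on every access
    # rather than once at import time.
    if name in environment_variables:
        return environment_variables[name]()
    raise AttributeError(f"module has no attribute {name!r}")


if __name__ == "__main__":
    # Example: batch up to 8 new requests and poll the input queue only
    # every 4 engine steps.
    os.environ["VLLM_SYNC_SERVER_ACCUM_REQUESTS"] = "8"
    os.environ["VLLM_SYNC_SERVER_ENGINE_STEPS_BETWEEN_POLLS"] = "4"
    print(environment_variables["VLLM_SYNC_SERVER_ACCUM_REQUESTS"]())  # -> 8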