diff --git a/docs/source/models/engine_args.rst b/docs/source/models/engine_args.rst index bdf566d3ebbd..15c0399c3090 100644 --- a/docs/source/models/engine_args.rst +++ b/docs/source/models/engine_args.rst @@ -16,8 +16,110 @@ Async Engine Arguments Below are the additional arguments related to the asynchronous engine: -.. argparse:: - :module: vllm.engine.arg_utils - :func: _async_engine_args_parser - :prog: -m vllm.entrypoints.openai.api_server - :nodefaultconst: \ No newline at end of file + Name or path of the huggingface tokenizer to use. + +.. option:: --revision + + The specific model version to use. It can be a branch name, a tag name, or a commit id. If unspecified, will use the default version. + +.. option:: --tokenizer-revision + + The specific tokenizer version to use. It can be a branch name, a tag name, or a commit id. If unspecified, will use the default version. + +.. option:: --tokenizer-mode {auto,slow} + + The tokenizer mode. + + * "auto" will use the fast tokenizer if available. + * "slow" will always use the slow tokenizer. + +.. option:: --trust-remote-code + + Trust remote code from huggingface. + +.. option:: --download-dir + + Directory to download and load the weights, default to the default cache dir of huggingface. + +.. option:: --load-format {auto,pt,safetensors,npcache,dummy} + + The format of the model weights to load. + + * "auto" will try to load the weights in the safetensors format and fall back to the pytorch bin format if safetensors format is not available. + * "pt" will load the weights in the pytorch bin format. + * "safetensors" will load the weights in the safetensors format. + * "npcache" will load the weights in pytorch format and store a numpy cache to speed up the loading. + * "dummy" will initialize the weights with random values, mainly for profiling. + +.. option:: --dtype {auto,half,float16,bfloat16,float,float32} + + Data type for model weights and activations. 
+ + * "auto" will use FP16 precision for FP32 and FP16 models, and BF16 precision for BF16 models. + * "half" for FP16. Recommended for AWQ quantization. + * "float16" is the same as "half". + * "bfloat16" for a balance between precision and range. + * "float" is shorthand for FP32 precision. + * "float32" for FP32 precision. + +.. option:: --max-model-len + + Model context length. If unspecified, will be automatically derived from the model config. + +.. option:: --worker-use-ray + + Use Ray for distributed serving, will be automatically set when using more than 1 GPU. + +.. option:: --pipeline-parallel-size (-pp) + + Number of pipeline stages. + +.. option:: --tensor-parallel-size (-tp) + + Number of tensor parallel replicas. + +.. option:: --max-parallel-loading-workers + + Load model sequentially in multiple batches, to avoid RAM OOM when using tensor parallel and large models. + +.. option:: --block-size {8,16,32} + + Token block size for contiguous chunks of tokens. + +.. option:: --seed + + Random seed for operations. + +.. option:: --swap-space + + CPU swap space size (GiB) per GPU. + +.. option:: --gpu-memory-utilization + + The fraction of GPU memory to be used for the model executor, which can range from 0 to 1. + For example, a value of 0.5 would imply 50% GPU memory utilization. + If unspecified, will use the default value of 0.9. + +.. option:: --max-num-batched-tokens + + Maximum number of batched tokens per iteration. + +.. option:: --max-num-seqs + + Maximum number of sequences per iteration. + +.. option:: --max-paddings + + Maximum number of paddings in a batch. + +.. option:: --max-queue-length + + Maximum number of requests that can be present across all queues. + +.. option:: --disable-log-stats + + Disable logging statistics. + +.. option:: --quantization (-q) {awq,squeezellm,None} + + Method used to quantize the weights. 
"""Integration test for the ``--max-queue-length`` engine argument.

Runs the engine with ``max_num_seqs=1`` so requests pile up in the
waiting queue, then checks that ``QueueOverflowError`` is raised exactly
when the configured queue limit is too small for the backlog.

This consolidates the former scratch script ``tests/engine/tmql.py``
(which duplicated this test and declared its own conflicting
``QueueOverflowError``) into a single pytest module.
"""
import argparse
from typing import List, Tuple

import pytest

from vllm import EngineArgs, LLMEngine, RequestOutput, SamplingParams
from vllm.engine.llm_engine import QueueOverflowError
from vllm.logger import init_logger

logger = init_logger(__name__)

# Every prompt uses the same long-running generation settings:
# ignore_eos + large max_tokens keep requests resident so the waiting
# queue actually fills up.
_SAMPLING_KWARGS = dict(temperature=0.8,
                        top_k=5,
                        presence_penalty=0.2,
                        ignore_eos=True,
                        max_tokens=1000)


@pytest.fixture
def test_prompts() -> List[Tuple[str, SamplingParams]]:
    """Create a list of test prompts with their sampling parameters."""
    prompts = [
        "A robot may not injure a human being",
        "To be or not to be,",
        "What is the meaning of life?",
        "It is only with the heart that one can see rightly",
    ]
    return [(p, SamplingParams(**_SAMPLING_KWARGS)) for p in prompts]


def process_requests(engine: LLMEngine,
                     test_prompts: List[Tuple[str, SamplingParams]]):
    """Continuously process a list of prompts and handle the outputs.

    Raises:
        QueueOverflowError: if the engine rejects a request because the
            waiting queue is full. All previously submitted requests are
            aborted before re-raising so the engine is left clean.
    """
    request_id = 0
    while test_prompts or engine.has_unfinished_requests():
        if test_prompts:
            prompt, sampling_params = test_prompts.pop(0)
            try:
                engine.add_request(str(request_id), prompt, sampling_params)
            except ValueError as e:
                # Log, abort everything submitted so far, then surface
                # the overflow to the caller as a typed error.
                logger.info("%s", e)
                for i in range(request_id):
                    engine.abort_request(str(i))
                raise QueueOverflowError(
                    f"Queue exceeded max length: {e}") from e
            request_id += 1

        request_outputs: List[RequestOutput] = engine.step()
        for request_output in request_outputs:
            if request_output.finished:
                print(request_output)


@pytest.mark.parametrize(
    "max_wait_q_len, expect_error",
    [
        (1, True),   # queue smaller than the backlog -> overflow
        (2, True),
        (3, False),  # queue can absorb the whole backlog -> no error
        (4, False),
    ])
def test_max_queue_length(max_wait_q_len, expect_error, test_prompts):
    """End-to-end check that --max-queue-length is enforced."""
    parser = argparse.ArgumentParser(
        description='Demo on using the LLMEngine class directly')
    parser = EngineArgs.add_cli_args(parser)
    args = parser.parse_args([
        '--max-num-seqs', '1',
        '--max-queue-length', str(max_wait_q_len),
        '--max-num-batched-tokens', '2048',
        '--gpu-memory-utilization', '0.7',
        '--max-model-len', '1024',
    ])
    engine = LLMEngine.from_engine_args(EngineArgs.from_cli_args(args))

    if expect_error:
        # pytest.raises replaces the previous try/except + assert dance
        # (which also contained a stray no-op string-literal statement).
        with pytest.raises(QueueOverflowError,
                           match="Queue exceeded max length"):
            process_requests(engine, test_prompts)
    else:
        process_requests(engine, test_prompts)
# Must match the value passed to the server fixture via
# --max-queue-length (NOTE(review): the fixture flags are currently
# commented out — they must be re-enabled for this test to pass).
MAX_QUEUE_LEN = 1


@pytest.mark.asyncio
@pytest.mark.parametrize("model_name", [MODEL_NAME])
async def test_max_queue_length(client: openai.AsyncOpenAI, model_name: str):
    """Fire more concurrent completions than the server can queue and
    check that exactly the surplus requests are rejected with HTTP 503.

    Replaces two earlier same-named test functions (the second silently
    shadowed the first, so the first never ran; the first also lacked
    the asyncio mark and read ``.status_code`` off successful
    completion objects).
    """
    sample_prompts = [
        "Who won the world series in 2020?",
        "Where was the 2020 world series played?",
        "How long did the 2020 world series last?",
        "What were some television viewership statistics?",
        "Why was the 2020 world series so popular?",
    ]

    coroutines = [
        asyncio.create_task(
            client.completions.create(
                prompt=prompt,
                model=model_name,
                temperature=0.8,
                presence_penalty=0.2,
                max_tokens=400,
            )) for prompt in sample_prompts
    ]
    # return_exceptions=True: rejected requests come back as
    # openai.APIStatusError instances instead of propagating.
    responses = await asyncio.gather(*coroutines, return_exceptions=True)

    # Count HTTP 503 rejections via the exception's status_code
    # attribute (more robust than poking at response.__dict__).
    err_cnt = sum(1 for r in responses
                  if getattr(r, "status_code", None) == 503)

    # One request runs (max_num_seqs == 1), MAX_QUEUE_LEN may wait;
    # everything beyond that must have been rejected.
    assert err_cnt == len(sample_prompts) - MAX_QUEUE_LEN - 1
16374098b23d..4ad88647239e 100644 --- a/vllm/engine/arg_utils.py +++ b/vllm/engine/arg_utils.py @@ -50,6 +50,7 @@ class EngineArgs: max_num_batched_tokens: Optional[int] = None max_num_seqs: int = 256 max_logprobs: int = 20 # Default value for OpenAI Chat Completions API + max_queue_length: int = -1 disable_log_stats: bool = False revision: Optional[str] = None code_revision: Optional[str] = None @@ -384,6 +385,12 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: default=EngineArgs.max_logprobs, help=('Max number of log probs to return logprobs is specified in' ' SamplingParams.')) + parser.add_argument( + '--max-queue-length', + type=int, + default=EngineArgs.max_queue_length, + help='The maximum number of requests allowed in the waiting queue.' + ) parser.add_argument('--disable-log-stats', action='store_true', help='Disable logging statistics.') @@ -709,6 +716,7 @@ def create_engine_config(self, ) -> EngineConfig: scheduler_config = SchedulerConfig( max_num_batched_tokens=self.max_num_batched_tokens, max_num_seqs=self.max_num_seqs, + max_queue_length=self.max_queue_length, max_model_len=model_config.max_model_len, use_v2_block_manager=self.use_v2_block_manager, num_lookahead_slots=(self.num_lookahead_slots diff --git a/vllm/engine/async_llm_engine.py b/vllm/engine/async_llm_engine.py index 7994b873fe9b..4b201af9b5f7 100644 --- a/vllm/engine/async_llm_engine.py +++ b/vllm/engine/async_llm_engine.py @@ -1,5 +1,6 @@ import asyncio import time +from http import HTTPStatus from functools import partial from typing import (AsyncIterator, Callable, Dict, Iterable, List, Optional, Set, Tuple, Type, Union) @@ -11,7 +12,7 @@ from vllm.core.scheduler import SchedulerOutputs from vllm.engine.arg_utils import AsyncEngineArgs from vllm.engine.async_timeout import asyncio_timeout -from vllm.engine.llm_engine import LLMEngine +from vllm.engine.llm_engine import LLMEngine, QueueOverflowError from vllm.executor.ray_utils import initialize_ray_cluster, 
ray from vllm.inputs import LLMInputs, PromptInputs from vllm.logger import init_logger @@ -495,19 +496,26 @@ async def engine_step(self) -> bool: """Kick the engine to process the waiting requests. Returns True if there are in-progress requests.""" - new_requests, finished_requests = ( self._request_tracker.get_new_and_finished_requests()) for new_request in new_requests: # Add the request into the vLLM engine's waiting queue. # TODO: Maybe add add_request_batch to reduce Ray overhead + # curr_queue_len = len(self.engine.scheduler.waiting) + # max_queue_len = self.engine.scheduler.scheduler_config.get_max_queue_length( + # ) + # if max_queue_len > -1 and curr_queue_len >= max_queue_len: + # raise QueueOverflowError( + # message= + # "Request would exceed the indicated maximum queue length.", + # status_code=HTTPStatus.SERVICE_UNAVAILABLE) try: if self.engine_use_ray: await self.engine.add_request.remote( # type: ignore **new_request) else: - await self.engine.add_request_async(**new_request) + result = await self.engine.add_request_async(**new_request) except ValueError as e: # TODO: use a vLLM specific error for failed validation self._request_tracker.process_exception( @@ -566,6 +574,14 @@ async def add_request( lora_request: Optional[LoRARequest] = None, trace_headers: Optional[Dict[str, str]] = None, ) -> AsyncStream: + # curr_queue_len = len(self.engine.scheduler.waiting) + # max_queue_len = self.engine.scheduler.scheduler_config.get_max_queue_length( + # ) + # if max_queue_len > -1 and curr_queue_len >= max_queue_len: + # raise QueueOverflowError( + # message= + # "Request would exceed the indicated maximum queue length.", + # status_code=HTTPStatus.SERVICE_UNAVAILABLE) if self.log_requests: if isinstance(inputs, str): shortened_prompt = inputs diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py index 0ad957ef9f95..70353cab2d80 100644 --- a/vllm/engine/llm_engine.py +++ b/vllm/engine/llm_engine.py @@ -3,6 +3,7 @@ from typing import 
class QueueOverflowError(ValueError):
    """Raised when adding a request would exceed ``max_queue_length``.

    Subclasses ``ValueError`` so existing ``except ValueError`` handlers
    in the serving layer keep catching it. ``args`` stays
    ``(message, status_code)`` because callers unpack it as
    ``msg, status_code = e.args``.
    """

    def __init__(self,
                 message="Current request would exceed the max queue length.",
                 status_code=503):
        # Keep both values in ``args`` for backward compatibility with
        # callers that unpack the tuple.
        super().__init__(message, status_code)
        # Expose named attributes so handlers don't have to unpack args.
        self.message = message
        self.status_code = status_code

    def __str__(self) -> str:
        # Without this override, str(e) renders the whole args tuple,
        # e.g. "('msg', 503)", which would leak into API error bodies.
        return self.message
block_size = self.cache_config.block_size seq_id = next(self.seq_counter) diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index a708176c254e..09546b07b1a5 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -144,6 +144,7 @@ async def create_chat_completion(request: ChatCompletionRequest, @app.post("/v1/completions") async def create_completion(request: CompletionRequest, raw_request: Request): + generator = await openai_serving_completion.create_completion( request, raw_request) if isinstance(generator, ErrorResponse): diff --git a/vllm/entrypoints/openai/serving_chat.py b/vllm/entrypoints/openai/serving_chat.py index 744e1d94511b..b1d1d7104b90 100644 --- a/vllm/entrypoints/openai/serving_chat.py +++ b/vllm/entrypoints/openai/serving_chat.py @@ -295,7 +295,8 @@ async def create_chat_completion( conversation) except ValueError as e: # TODO: Use a vllm-specific Validation Error - return self.create_error_response(str(e)) + msg, status_code = e.args + return self.create_error_response(msg, status_code=status_code) def get_chat_request_role(self, request: ChatCompletionRequest) -> str: if request.add_generation_prompt: diff --git a/vllm/entrypoints/openai/serving_completion.py b/vllm/entrypoints/openai/serving_completion.py index 8741893c9271..4308c60c1ee0 100644 --- a/vllm/entrypoints/openai/serving_completion.py +++ b/vllm/entrypoints/openai/serving_completion.py @@ -153,7 +153,8 @@ async def create_completion(self, request: CompletionRequest, generators.append(generator) except ValueError as e: # TODO: Use a vllm-specific Validation Error - return self.create_error_response(str(e)) + msg, status_code = e.args + return self.create_error_response(msg, status_code=status_code) result_generator: AsyncIterator[Tuple[ int, RequestOutput]] = merge_async_iterators(*generators) @@ -188,7 +189,8 @@ async def create_completion(self, request: CompletionRequest, final_res_batch, request, 
request_id, created_time, model_name) except ValueError as e: # TODO: Use a vllm-specific Validation Error - return self.create_error_response(str(e)) + msg, status_code = e.args + return self.create_error_response(msg, status_code=status_code) # When user requests streaming but we don't stream, we still need to # return a streaming response with a single event.