From 09fa8564bbdadf7e6350091e3c90c418cba57deb Mon Sep 17 00:00:00 2001 From: FlorianJoncour Date: Sat, 6 Jan 2024 02:18:20 +0100 Subject: [PATCH 01/14] OpenAI API refactoring --- vllm/entrypoints/openai/api_server.py | 686 ++---------------- vllm/entrypoints/openai/serving_chat.py | 256 +++++++ vllm/entrypoints/openai/serving_completion.py | 288 ++++++++ vllm/entrypoints/openai/serving_engine.py | 151 ++++ 4 files changed, 738 insertions(+), 643 deletions(-) create mode 100644 vllm/entrypoints/openai/serving_chat.py create mode 100644 vllm/entrypoints/openai/serving_completion.py create mode 100644 vllm/entrypoints/openai/serving_engine.py diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index 30c55f4c01c5..89550f5f8a86 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -1,19 +1,10 @@ -# Adapted from -# https://github.com/lm-sys/FastChat/blob/168ccc29d3f7edc50823016105c024fe2282732a/fastchat/serve/openai_api_server.py - import argparse -import asyncio -import codecs import json -import time -from contextlib import asynccontextmanager -from http import HTTPStatus -from typing import AsyncGenerator, Dict, List, Optional, Tuple, Union - from aioprometheus import MetricsMiddleware from aioprometheus.asgi.starlette import metrics import fastapi import uvicorn +from http import HTTPStatus from fastapi import Request from fastapi.exceptions import RequestValidationError from fastapi.middleware.cors import CORSMiddleware @@ -22,43 +13,33 @@ from vllm.engine.arg_utils import AsyncEngineArgs from vllm.engine.async_llm_engine import AsyncLLMEngine from vllm.engine.metrics import add_global_metrics_labels -from vllm.entrypoints.openai.protocol import ( - CompletionRequest, CompletionResponse, CompletionResponseChoice, - CompletionResponseStreamChoice, CompletionStreamResponse, - ChatCompletionRequest, ChatCompletionResponse, - ChatCompletionResponseChoice, 
ChatCompletionResponseStreamChoice, - ChatCompletionStreamResponse, ChatMessage, DeltaMessage, ErrorResponse, - LogProbs, ModelCard, ModelList, ModelPermission, UsageInfo) +from vllm.entrypoints.openai.protocol import CompletionRequest, ChatCompletionRequest, ErrorResponse from vllm.logger import init_logger -from vllm.outputs import RequestOutput -from vllm.sampling_params import SamplingParams -from vllm.transformers_utils.tokenizer import get_tokenizer -from vllm.utils import random_uuid +from vllm.entrypoints.openai.serving_chat import OpenAIServingChat +from vllm.entrypoints.openai.serving_completion import OpenAIServingCompletion TIMEOUT_KEEP_ALIVE = 5 # seconds +openai_serving_chat: OpenAIServingChat = None +openai_serving_completion: OpenAIServingCompletion = None logger = init_logger(__name__) -served_model = None -engine_args = None -engine = None -response_role = None - - -@asynccontextmanager -async def lifespan(app: fastapi.FastAPI): - - async def _force_log(): - while True: - await asyncio.sleep(10) - await engine.do_log_stats() - - if not engine_args.disable_log_stats: - asyncio.create_task(_force_log()) - - yield - -app = fastapi.FastAPI(lifespan=lifespan) +# @asynccontextmanager +# async def lifespan(app: fastapi.FastAPI): +# +# async def _force_log(): +# while True: +# await asyncio.sleep(10) +# await engine.do_log_stats() +# +# if not engine_args.disable_log_stats: +# asyncio.create_task(_force_log()) +# +# yield +# +# +# app = fastapi.FastAPI(lifespan=lifespan) +app = fastapi.FastAPI() def parse_args(): @@ -115,73 +96,10 @@ def parse_args(): app.add_route("/metrics", metrics) # Exposes HTTP metrics -def create_error_response(status_code: HTTPStatus, - message: str) -> JSONResponse: - return JSONResponse(ErrorResponse(message=message, - type="invalid_request_error").dict(), - status_code=status_code.value) - - -def load_chat_template(args, tokenizer): - if args.chat_template is not None: - try: - with open(args.chat_template, "r") as f: - 
chat_template = f.read() - except OSError: - # If opening a file fails, set chat template to be args to - # ensure we decode so our escape are interpreted correctly - chat_template = codecs.decode(args.chat_template, "unicode_escape") - - tokenizer.chat_template = chat_template - logger.info( - f"Using supplied chat template:\n{tokenizer.chat_template}") - elif tokenizer.chat_template is not None: - logger.info(f"Using default chat template:\n{tokenizer.chat_template}") - else: - logger.warning("No chat template provided. Chat API will not work.") - - @app.exception_handler(RequestValidationError) async def validation_exception_handler(_, exc): - return create_error_response(HTTPStatus.BAD_REQUEST, str(exc)) - - -async def check_model(request) -> Optional[JSONResponse]: - if request.model == served_model: - return - ret = create_error_response( - HTTPStatus.NOT_FOUND, - f"The model `{request.model}` does not exist.", - ) - return ret - - -async def check_length( - request: Union[ChatCompletionRequest, CompletionRequest], - prompt: Optional[str] = None, - prompt_ids: Optional[List[int]] = None -) -> Tuple[List[int], Optional[JSONResponse]]: - assert (not (prompt is None and prompt_ids is None) - and not (prompt is not None and prompt_ids is not None) - ), "Either prompt or prompt_ids should be provided." - input_ids = prompt_ids if prompt_ids is not None else tokenizer( - prompt).input_ids - token_num = len(input_ids) - - if request.max_tokens is None: - request.max_tokens = max_model_len - token_num - if token_num + request.max_tokens > max_model_len: - return input_ids, create_error_response( - HTTPStatus.BAD_REQUEST, - f"This model's maximum context length is {max_model_len} tokens. " - f"However, you requested {request.max_tokens + token_num} tokens " - f"({token_num} in the messages, " - f"{request.max_tokens} in the completion). 
" - f"Please reduce the length of the messages or completion.", - ) - else: - return input_ids, None - + err = openai_serving_chat.create_error_response(message=str(exc)) + return JSONResponse(err.dict(), status_code=HTTPStatus.BAD_REQUEST) @app.get("/health") async def health() -> Response: @@ -191,544 +109,32 @@ async def health() -> Response: @app.get("/v1/models") async def show_available_models(): - """Show available models. Right now we only have one model.""" - model_cards = [ - ModelCard(id=served_model, - root=served_model, - permission=[ModelPermission()]) - ] - return ModelList(data=model_cards) - - -def create_logprobs( - token_ids: List[int], - top_logprobs: Optional[List[Optional[Dict[int, float]]]] = None, - num_output_top_logprobs: Optional[int] = None, - initial_text_offset: int = 0, -) -> LogProbs: - """Create OpenAI-style logprobs.""" - logprobs = LogProbs() - last_token_len = 0 - if num_output_top_logprobs: - logprobs.top_logprobs = [] - for i, token_id in enumerate(token_ids): - step_top_logprobs = top_logprobs[i] - if step_top_logprobs is not None: - token_logprob = step_top_logprobs[token_id] - else: - token_logprob = None - token = tokenizer.convert_ids_to_tokens(token_id) - logprobs.tokens.append(token) - logprobs.token_logprobs.append(token_logprob) - if len(logprobs.text_offset) == 0: - logprobs.text_offset.append(initial_text_offset) - else: - logprobs.text_offset.append(logprobs.text_offset[-1] + - last_token_len) - last_token_len = len(token) - - if num_output_top_logprobs: - logprobs.top_logprobs.append({ - tokenizer.convert_ids_to_tokens(i): p - for i, p in step_top_logprobs.items() - } if step_top_logprobs else None) - return logprobs + models = await openai_serving_chat.show_available_models() + return JSONResponse(content=models.dict()) @app.post("/v1/chat/completions") async def create_chat_completion(request: ChatCompletionRequest, raw_request: Request): - """Completion API similar to OpenAI's API. 
- - See https://platform.openai.com/docs/api-reference/chat/create - for the API specification. This API mimics the OpenAI ChatCompletion API. - - NOTE: Currently we do not support the following features: - - function_call (Users should implement this by themselves) - - logit_bias (to be supported by vLLM engine) - """ - error_check_ret = await check_model(request) - if error_check_ret is not None: - return error_check_ret - - if request.logit_bias is not None and len(request.logit_bias) > 0: - # TODO: support logit_bias in vLLM engine. - return create_error_response(HTTPStatus.BAD_REQUEST, - "logit_bias is not currently supported") - - try: - prompt = tokenizer.apply_chat_template( - conversation=request.messages, - tokenize=False, - add_generation_prompt=request.add_generation_prompt) - except Exception as e: - logger.error(f"Error in applying chat template from request: {str(e)}") - return create_error_response(HTTPStatus.BAD_REQUEST, str(e)) - - token_ids, error_check_ret = await check_length(request, prompt=prompt) - if error_check_ret is not None: - return error_check_ret - - model_name = request.model - request_id = f"cmpl-{random_uuid()}" - created_time = int(time.monotonic()) - chunk_object_type = "chat.completion.chunk" - try: - spaces_between_special_tokens = request.spaces_between_special_tokens - sampling_params = SamplingParams( - n=request.n, - presence_penalty=request.presence_penalty, - frequency_penalty=request.frequency_penalty, - repetition_penalty=request.repetition_penalty, - temperature=request.temperature, - top_p=request.top_p, - min_p=request.min_p, - stop=request.stop, - stop_token_ids=request.stop_token_ids, - max_tokens=request.max_tokens, - best_of=request.best_of, - top_k=request.top_k, - ignore_eos=request.ignore_eos, - use_beam_search=request.use_beam_search, - skip_special_tokens=request.skip_special_tokens, - spaces_between_special_tokens=spaces_between_special_tokens, - ) - except ValueError as e: - return 
create_error_response(HTTPStatus.BAD_REQUEST, str(e)) - - result_generator = engine.generate(prompt, sampling_params, request_id, - token_ids) - - def get_role() -> str: - if request.add_generation_prompt: - return response_role - else: - return request.messages[-1]["role"] - - async def completion_stream_generator() -> AsyncGenerator[str, None]: - # Send first response for each request.n (index) with the role - role = get_role() - for i in range(request.n): - choice_data = ChatCompletionResponseStreamChoice( - index=i, delta=DeltaMessage(role=role), finish_reason=None) - chunk = ChatCompletionStreamResponse(id=request_id, - object=chunk_object_type, - created=created_time, - choices=[choice_data], - model=model_name) - data = chunk.json(exclude_unset=True, ensure_ascii=False) - yield f"data: {data}\n\n" - - # Send response to echo the input portion of the last message - if request.echo: - last_msg_content = "" - if request.messages and isinstance( - request.messages, list) and request.messages[-1].get( - "content") and request.messages[-1].get( - "role") == role: - last_msg_content = request.messages[-1]["content"] - if last_msg_content: - for i in range(request.n): - choice_data = ChatCompletionResponseStreamChoice( - index=i, - delta=DeltaMessage(content=last_msg_content), - finish_reason=None) - chunk = ChatCompletionStreamResponse( - id=request_id, - object=chunk_object_type, - created=created_time, - choices=[choice_data], - model=model_name) - data = chunk.json(exclude_unset=True, ensure_ascii=False) - yield f"data: {data}\n\n" - - # Send response for each token for each request.n (index) - previous_texts = [""] * request.n - previous_num_tokens = [0] * request.n - finish_reason_sent = [False] * request.n - async for res in result_generator: - res: RequestOutput - for output in res.outputs: - i = output.index - - if finish_reason_sent[i]: - continue - - if output.finish_reason is None: - # Send token-by-token response for each request.n - delta_text = 
output.text[len(previous_texts[i]):] - previous_texts[i] = output.text - previous_num_tokens[i] = len(output.token_ids) - choice_data = ChatCompletionResponseStreamChoice( - index=i, - delta=DeltaMessage(content=delta_text), - finish_reason=None) - chunk = ChatCompletionStreamResponse( - id=request_id, - object=chunk_object_type, - created=created_time, - choices=[choice_data], - model=model_name) - data = chunk.json(exclude_unset=True, ensure_ascii=False) - yield f"data: {data}\n\n" - else: - # Send the finish response for each request.n only once - prompt_tokens = len(res.prompt_token_ids) - final_usage = UsageInfo( - prompt_tokens=prompt_tokens, - completion_tokens=previous_num_tokens[i], - total_tokens=prompt_tokens + previous_num_tokens[i], - ) - choice_data = ChatCompletionResponseStreamChoice( - index=i, delta=[], finish_reason=output.finish_reason) - chunk = ChatCompletionStreamResponse( - id=request_id, - object=chunk_object_type, - created=created_time, - choices=[choice_data], - model=model_name) - if final_usage is not None: - chunk.usage = final_usage - data = chunk.json(exclude_unset=True, - exclude_none=True, - ensure_ascii=False) - yield f"data: {data}\n\n" - finish_reason_sent[i] = True - # Send the final done message after all response.n are finished - yield "data: [DONE]\n\n" - - async def completion_full_generator(): - final_res: RequestOutput = None - async for res in result_generator: - if await raw_request.is_disconnected(): - # Abort the request if the client disconnects. 
- await engine.abort(request_id) - return create_error_response(HTTPStatus.BAD_REQUEST, - "Client disconnected") - final_res = res - assert final_res is not None - - choices = [] - role = get_role() - for output in final_res.outputs: - choice_data = ChatCompletionResponseChoice( - index=output.index, - message=ChatMessage(role=role, content=output.text), - finish_reason=output.finish_reason, - ) - choices.append(choice_data) - - if request.echo: - last_msg_content = "" - if request.messages and isinstance( - request.messages, list) and request.messages[-1].get( - "content") and request.messages[-1].get( - "role") == role: - last_msg_content = request.messages[-1]["content"] - - for choice in choices: - full_message = last_msg_content + choice.message.content - choice.message.content = full_message - - num_prompt_tokens = len(final_res.prompt_token_ids) - num_generated_tokens = sum( - len(output.token_ids) for output in final_res.outputs) - usage = UsageInfo( - prompt_tokens=num_prompt_tokens, - completion_tokens=num_generated_tokens, - total_tokens=num_prompt_tokens + num_generated_tokens, - ) - response = ChatCompletionResponse( - id=request_id, - created=created_time, - model=model_name, - choices=choices, - usage=usage, - ) - - return response - - # Streaming response - if request.stream: - return StreamingResponse(completion_stream_generator(), + generator = await openai_serving_chat.create_chat_completion( + request, raw_request) + if request.stream and not isinstance(generator, ErrorResponse): + return StreamingResponse(content=generator, media_type="text/event-stream") else: - return await completion_full_generator() + return JSONResponse(content=generator.dict()) @app.post("/v1/completions") async def create_completion(request: CompletionRequest, raw_request: Request): - """Completion API similar to OpenAI's API. - - See https://platform.openai.com/docs/api-reference/completions/create - for the API specification. This API mimics the OpenAI Completion API. 
- - NOTE: Currently we do not support the following features: - - suffix (the language models we currently support do not support - suffix) - - logit_bias (to be supported by vLLM engine) - """ - - error_check_ret = await check_model(request) - if error_check_ret is not None: - return error_check_ret - - # OpenAI API supports echoing the prompt when max_tokens is 0. - echo_without_generation = request.echo and request.max_tokens == 0 - - if request.suffix is not None: - # The language models we currently support do not support suffix. - return create_error_response(HTTPStatus.BAD_REQUEST, - "suffix is not currently supported") - - if request.logit_bias is not None and len(request.logit_bias) > 0: - # TODO: support logit_bias in vLLM engine. - return create_error_response(HTTPStatus.BAD_REQUEST, - "logit_bias is not currently supported") - - model_name = request.model - request_id = f"cmpl-{random_uuid()}" - - use_token_ids = False - if isinstance(request.prompt, list): - if len(request.prompt) == 0: - return create_error_response(HTTPStatus.BAD_REQUEST, - "please provide at least one prompt") - first_element = request.prompt[0] - if isinstance(first_element, int): - use_token_ids = True - prompt = request.prompt - elif isinstance(first_element, (str, list)): - # TODO: handles multiple prompt case in list[list[int]] - if len(request.prompt) > 1: - return create_error_response( - HTTPStatus.BAD_REQUEST, - "multiple prompts in a batch is not currently supported") - use_token_ids = not isinstance(first_element, str) - prompt = request.prompt[0] - else: - prompt = request.prompt - - if use_token_ids: - _, error_check_ret = await check_length(request, prompt_ids=prompt) - else: - token_ids, error_check_ret = await check_length(request, prompt=prompt) - if error_check_ret is not None: - return error_check_ret - - created_time = int(time.monotonic()) - try: - spaces_between_special_tokens = request.spaces_between_special_tokens - sampling_params = SamplingParams( - 
n=request.n, - best_of=request.best_of, - presence_penalty=request.presence_penalty, - frequency_penalty=request.frequency_penalty, - repetition_penalty=request.repetition_penalty, - temperature=request.temperature, - top_p=request.top_p, - top_k=request.top_k, - min_p=request.min_p, - stop=request.stop, - stop_token_ids=request.stop_token_ids, - ignore_eos=request.ignore_eos, - max_tokens=request.max_tokens - if not echo_without_generation else 1, - logprobs=request.logprobs, - use_beam_search=request.use_beam_search, - prompt_logprobs=request.logprobs if request.echo else None, - skip_special_tokens=request.skip_special_tokens, - spaces_between_special_tokens=spaces_between_special_tokens, - ) - except ValueError as e: - return create_error_response(HTTPStatus.BAD_REQUEST, str(e)) - - if use_token_ids: - result_generator = engine.generate(None, - sampling_params, - request_id, - prompt_token_ids=prompt) - else: - result_generator = engine.generate(prompt, sampling_params, request_id, - token_ids) - - # Similar to the OpenAI API, when n != best_of, we do not stream the - # results. In addition, we do not stream the results when use beam search. 
- stream = (request.stream - and (request.best_of is None or request.n == request.best_of) - and not request.use_beam_search) - - def create_stream_response_json( - index: int, - text: str, - logprobs: Optional[LogProbs] = None, - finish_reason: Optional[str] = None, - usage: Optional[UsageInfo] = None, - ) -> str: - choice_data = CompletionResponseStreamChoice( - index=index, - text=text, - logprobs=logprobs, - finish_reason=finish_reason, - ) - response = CompletionStreamResponse( - id=request_id, - created=created_time, - model=model_name, - choices=[choice_data], - ) - if usage is not None: - response.usage = usage - response_json = response.json(exclude_unset=True, ensure_ascii=False) - - return response_json - - async def completion_stream_generator() -> AsyncGenerator[str, None]: - previous_texts = [""] * request.n - previous_num_tokens = [0] * request.n - has_echoed = [False] * request.n - async for res in result_generator: - res: RequestOutput - for output in res.outputs: - i = output.index - delta_text = output.text[len(previous_texts[i]):] - token_ids = output.token_ids[previous_num_tokens[i]:] - if request.logprobs is not None: - top_logprobs = output.logprobs[previous_num_tokens[i]:] - else: - top_logprobs = None - offsets = len(previous_texts[i]) - if request.echo and not has_echoed[i]: - if not echo_without_generation: - delta_text = res.prompt + delta_text - token_ids = res.prompt_token_ids + token_ids - if top_logprobs: - top_logprobs = res.prompt_logprobs + top_logprobs - else: # only just return the prompt - delta_text = res.prompt - token_ids = res.prompt_token_ids - if top_logprobs: - top_logprobs = res.prompt_logprobs - has_echoed[i] = True - if request.logprobs is not None: - logprobs = create_logprobs( - token_ids=token_ids, - top_logprobs=top_logprobs, - num_output_top_logprobs=request.logprobs, - initial_text_offset=offsets, - ) - else: - logprobs = None - previous_texts[i] = output.text - previous_num_tokens[i] = len(output.token_ids) - 
finish_reason = output.finish_reason - response_json = create_stream_response_json( - index=i, - text=delta_text, - logprobs=logprobs, - finish_reason=finish_reason, - ) - yield f"data: {response_json}\n\n" - if output.finish_reason is not None: - logprobs = (LogProbs() - if request.logprobs is not None else None) - prompt_tokens = len(res.prompt_token_ids) - completion_tokens = len(output.token_ids) - final_usage = UsageInfo( - prompt_tokens=prompt_tokens, - completion_tokens=completion_tokens, - total_tokens=prompt_tokens + completion_tokens, - ) - response_json = create_stream_response_json( - index=i, - text="", - logprobs=logprobs, - finish_reason=output.finish_reason, - usage=final_usage, - ) - yield f"data: {response_json}\n\n" - yield "data: [DONE]\n\n" - - # Streaming response - if stream: - return StreamingResponse(completion_stream_generator(), + generator = await openai_serving_completion.create_completion( + request, raw_request) + logger.info("TYPE COMPLETION : %s" % str(type(generator))) + if request.stream and not isinstance(generator, ErrorResponse): + return StreamingResponse(content=generator, media_type="text/event-stream") - - # Non-streaming response - final_res: RequestOutput = None - async for res in result_generator: - if await raw_request.is_disconnected(): - # Abort the request if the client disconnects. 
- await engine.abort(request_id) - return create_error_response(HTTPStatus.BAD_REQUEST, - "Client disconnected") - final_res = res - assert final_res is not None - choices = [] - prompt_token_ids = final_res.prompt_token_ids - prompt_logprobs = final_res.prompt_logprobs - prompt_text = final_res.prompt - for output in final_res.outputs: - if request.logprobs is not None: - if not echo_without_generation: - token_ids = output.token_ids - top_logprobs = output.logprobs - if request.echo: - token_ids = prompt_token_ids + token_ids - top_logprobs = prompt_logprobs + top_logprobs - else: - token_ids = prompt_token_ids - top_logprobs = prompt_logprobs - logprobs = create_logprobs( - token_ids=token_ids, - top_logprobs=top_logprobs, - num_output_top_logprobs=request.logprobs, - ) - else: - logprobs = None - if not echo_without_generation: - output_text = output.text - if request.echo: - output_text = prompt_text + output_text - else: - output_text = prompt_text - choice_data = CompletionResponseChoice( - index=output.index, - text=output_text, - logprobs=logprobs, - finish_reason=output.finish_reason, - ) - choices.append(choice_data) - - num_prompt_tokens = len(final_res.prompt_token_ids) - num_generated_tokens = sum( - len(output.token_ids) for output in final_res.outputs) - usage = UsageInfo( - prompt_tokens=num_prompt_tokens, - completion_tokens=num_generated_tokens, - total_tokens=num_prompt_tokens + num_generated_tokens, - ) - response = CompletionResponse( - id=request_id, - created=created_time, - model=model_name, - choices=choices, - usage=usage, - ) - - if request.stream: - # When user requests streaming but we don't stream, we still need to - # return a streaming response with a single event. 
- response_json = response.json(ensure_ascii=False) - - async def fake_stream_generator() -> AsyncGenerator[str, None]: - yield f"data: {response_json}\n\n" - yield "data: [DONE]\n\n" - - return StreamingResponse(fake_stream_generator(), - media_type="text/event-stream") - - return response + else: + return JSONResponse(content=generator.dict()) if __name__ == "__main__": @@ -749,19 +155,13 @@ async def fake_stream_generator() -> AsyncGenerator[str, None]: else: served_model = args.model - response_role = args.response_role - engine_args = AsyncEngineArgs.from_cli_args(args) engine = AsyncLLMEngine.from_engine_args(engine_args) - engine_model_config = asyncio.run(engine.get_model_config()) - max_model_len = engine_model_config.max_model_len - - # A separate tokenizer to map token IDs to strings. - tokenizer = get_tokenizer( - engine_model_config.tokenizer, - tokenizer_mode=engine_model_config.tokenizer_mode, - trust_remote_code=engine_model_config.trust_remote_code) - load_chat_template(args, tokenizer) + openai_serving_chat = OpenAIServingChat(engine, served_model, + args.response_role, + args.chat_template) + openai_serving_completion = OpenAIServingCompletion( + engine, served_model, args.response_role, args.chat_template) # Register labels for metrics add_global_metrics_labels(model_name=engine_args.model) diff --git a/vllm/entrypoints/openai/serving_chat.py b/vllm/entrypoints/openai/serving_chat.py new file mode 100644 index 000000000000..40a632a28c7f --- /dev/null +++ b/vllm/entrypoints/openai/serving_chat.py @@ -0,0 +1,256 @@ +import time +from fastapi import Request +from typing import AsyncGenerator, AsyncIterator, Union +from vllm.logger import init_logger +from vllm.utils import random_uuid +from vllm.engine.async_llm_engine import AsyncLLMEngine +from vllm.entrypoints.openai.protocol import ( + ChatCompletionRequest, ChatCompletionResponse, + ChatCompletionResponseChoice, ChatCompletionResponseStreamChoice, + ChatCompletionStreamResponse, ChatMessage, 
DeltaMessage, ErrorResponse, + UsageInfo) +from vllm.outputs import RequestOutput +from vllm.sampling_params import SamplingParams +from vllm.entrypoints.openai.serving_engine import OpenAIServing + +logger = init_logger(__name__) + +class OpenAIServingChat(OpenAIServing): + def __init__(self, + engine: AsyncLLMEngine, + served_model: str, + response_role: str, + chat_template=None): + super().__init__(engine=engine, served_model=served_model, response_role=response_role, chat_template=chat_template) + + async def create_chat_completion(self, request: ChatCompletionRequest, + raw_request: Request + ) -> Union[ErrorResponse, AsyncGenerator[str, None], + ChatCompletionResponse]: + """Completion API similar to OpenAI's API. + + See https://platform.openai.com/docs/api-reference/chat/create + for the API specification. This API mimics the OpenAI ChatCompletion API. + + NOTE: Currently we do not support the following features: + - function_call (Users should implement this by themselves) + - logit_bias (to be supported by vLLM engine) + """ + error_check_ret = await self._check_model(request) + if error_check_ret is not None: + return error_check_ret + + if request.logit_bias is not None and len(request.logit_bias) > 0: + # TODO: support logit_bias in vLLM engine. 
+ return self.create_error_response( + "logit_bias is not currently supported") + + try: + prompt = self.tokenizer.apply_chat_template( + conversation=request.messages, + tokenize=False, + add_generation_prompt=request.add_generation_prompt) + except Exception as e: + logger.error(f"Error in applying chat template from request: {str(e)}") + return self.create_error_response(str(e)) + + token_ids, error_check_ret = await self._check_length(request, prompt=prompt) + if error_check_ret is not None: + return error_check_ret + + request_id = f"cmpl-{random_uuid()}" + try: + spaces_between_special_tokens = request.spaces_between_special_tokens + sampling_params = SamplingParams( + n=request.n, + presence_penalty=request.presence_penalty, + frequency_penalty=request.frequency_penalty, + repetition_penalty=request.repetition_penalty, + temperature=request.temperature, + top_p=request.top_p, + min_p=request.min_p, + stop=request.stop, + stop_token_ids=request.stop_token_ids, + max_tokens=request.max_tokens, + best_of=request.best_of, + top_k=request.top_k, + ignore_eos=request.ignore_eos, + use_beam_search=request.use_beam_search, + skip_special_tokens=request.skip_special_tokens, + spaces_between_special_tokens=spaces_between_special_tokens, + ) + except ValueError as e: + return self.create_error_response(str(e)) + + result_generator = self.engine.generate(prompt, sampling_params, request_id, + token_ids) + # Streaming response + if request.stream: + return self.chat_completion_stream_generator( + request, result_generator, request_id) + else: + return await self.chat_completion_full_generator( + request, raw_request, result_generator, request_id) + + def get_chat_request_role(self, request: ChatCompletionRequest) -> str: + if request.add_generation_prompt: + return self.response_role + else: + return request.messages[-1].role + + async def chat_completion_stream_generator( + self, request: ChatCompletionRequest, + result_generator: AsyncIterator[RequestOutput], 
request_id: str + ) -> Union[ErrorResponse, AsyncGenerator[str, None]]: + + model_name = request.model + created_time = int(time.monotonic()) + chunk_object_type = "chat.completion.chunk" + + # Send first response for each request.n (index) with the role + role = self.get_chat_request_role(request) + for i in range(request.n): + choice_data = ChatCompletionResponseStreamChoice( + index=i, delta=DeltaMessage(role=role), finish_reason=None) + chunk = ChatCompletionStreamResponse(id=request_id, + object=chunk_object_type, + created=created_time, + choices=[choice_data], + model=model_name) + data = chunk.json(exclude_unset=True, ensure_ascii=False) + yield f"data: {data}\n\n" + + # Send response to echo the input portion of the last message + if request.echo: + last_msg_content = "" + if request.messages and isinstance( + request.messages, list) and request.messages[-1].get( + "content") and request.messages[-1].get( + "role") == role: + last_msg_content = request.messages[-1]["content"] + if last_msg_content: + for i in range(request.n): + choice_data = ChatCompletionResponseStreamChoice( + index=i, + delta=DeltaMessage(content=last_msg_content), + finish_reason=None) + chunk = ChatCompletionStreamResponse( + id=request_id, + object=chunk_object_type, + created=created_time, + choices=[choice_data], + model=model_name) + data = chunk.json(exclude_unset=True, ensure_ascii=False) + yield f"data: {data}\n\n" + + # Send response for each token for each request.n (index) + previous_texts = [""] * request.n + previous_num_tokens = [0] * request.n + finish_reason_sent = [False] * request.n + async for res in result_generator: + res: RequestOutput + for output in res.outputs: + i = output.index + + if finish_reason_sent[i]: + continue + + if output.finish_reason is None: + # Send token-by-token response for each request.n + delta_text = output.text[len(previous_texts[i]):] + previous_texts[i] = output.text + previous_num_tokens[i] = len(output.token_ids) + choice_data = 
ChatCompletionResponseStreamChoice( + index=i, + delta=DeltaMessage(content=delta_text), + finish_reason=None) + chunk = ChatCompletionStreamResponse( + id=request_id, + object=chunk_object_type, + created=created_time, + choices=[choice_data], + model=model_name) + data = chunk.json(exclude_unset=True, ensure_ascii=False) + yield f"data: {data}\n\n" + else: + # Send the finish response for each request.n only once + prompt_tokens = len(res.prompt_token_ids) + final_usage = UsageInfo( + prompt_tokens=prompt_tokens, + completion_tokens=previous_num_tokens[i], + total_tokens=prompt_tokens + previous_num_tokens[i], + ) + choice_data = ChatCompletionResponseStreamChoice( + index=i, delta=[], finish_reason=output.finish_reason) + chunk = ChatCompletionStreamResponse( + id=request_id, + object=chunk_object_type, + created=created_time, + choices=[choice_data], + model=model_name) + if final_usage is not None: + chunk.usage = final_usage + data = chunk.json(exclude_unset=True, + exclude_none=True, + ensure_ascii=False) + yield f"data: {data}\n\n" + finish_reason_sent[i] = True + # Send the final done message after all response.n are finished + yield "data: [DONE]\n\n" + + async def chat_completion_full_generator( + self, request: ChatCompletionRequest, raw_request: Request, + result_generator: AsyncIterator[RequestOutput], + request_id: str) -> Union[ErrorResponse, ChatCompletionResponse]: + + model_name = request.model + created_time = int(time.monotonic()) + final_res: RequestOutput = None + + async for res in result_generator: + if await raw_request.is_disconnected(): + # Abort the request if the client disconnects. 
+ await self.engine.abort(request_id) + return self.create_error_response("Client disconnected") + final_res = res + assert final_res is not None + + choices = [] + role = self.get_chat_request_role(request) + for output in final_res.outputs: + choice_data = ChatCompletionResponseChoice( + index=output.index, + message=ChatMessage(role=role, content=output.text), + finish_reason=output.finish_reason, + ) + choices.append(choice_data) + + if request.echo: + last_msg_content = "" + if request.messages and isinstance( + request.messages, list) and request.messages[-1].get( + "content") and request.messages[-1].get( + "role") == role: + last_msg_content = request.messages[-1]["content"] + + for choice in choices: + full_message = last_msg_content + choice.message.content + choice.message.content = full_message + + num_prompt_tokens = len(final_res.prompt_token_ids) + num_generated_tokens = sum( + len(output.token_ids) for output in final_res.outputs) + usage = UsageInfo( + prompt_tokens=num_prompt_tokens, + completion_tokens=num_generated_tokens, + total_tokens=num_prompt_tokens + num_generated_tokens, + ) + response = ChatCompletionResponse( + id=request_id, + created=created_time, + model=model_name, + choices=choices, + usage=usage, + ) + + return response diff --git a/vllm/entrypoints/openai/serving_completion.py b/vllm/entrypoints/openai/serving_completion.py new file mode 100644 index 000000000000..bf6174da4aa4 --- /dev/null +++ b/vllm/entrypoints/openai/serving_completion.py @@ -0,0 +1,288 @@ +import time +from fastapi import Request +from typing import AsyncGenerator, Optional +from vllm.logger import init_logger +from vllm.utils import random_uuid +from vllm.engine.async_llm_engine import AsyncLLMEngine +from .protocol import ( + CompletionRequest, CompletionResponse, CompletionResponseChoice, + CompletionResponseStreamChoice, CompletionStreamResponse, + LogProbs, UsageInfo) +from vllm.outputs import RequestOutput +from vllm.sampling_params import 
SamplingParams +from vllm.entrypoints.openai.serving_engine import OpenAIServing + +logger = init_logger(__name__) + +class OpenAIServingCompletion(OpenAIServing): + def __init__(self, + engine: AsyncLLMEngine, + served_model: str, + response_role: str, + chat_template=None): + super().__init__(engine=engine, served_model=served_model, response_role=response_role, chat_template=chat_template) + + async def create_completion(self, request: CompletionRequest, raw_request: Request): + """Completion API similar to OpenAI's API. + + See https://platform.openai.com/docs/api-reference/completions/create + for the API specification. This API mimics the OpenAI Completion API. + + NOTE: Currently we do not support the following features: + - suffix (the language models we currently support do not support + suffix) + - logit_bias (to be supported by vLLM engine) + """ + + error_check_ret = await self._check_model(request) + if error_check_ret is not None: + return error_check_ret + + # OpenAI API supports echoing the prompt when max_tokens is 0. + echo_without_generation = request.echo and request.max_tokens == 0 + + if request.suffix is not None: + # The language models we currently support do not support suffix. + return self.create_error_response("suffix is not currently supported") + + if request.logit_bias is not None and len(request.logit_bias) > 0: + # TODO: support logit_bias in vLLM engine. 
+ return self.create_error_response("logit_bias is not currently supported") + + model_name = request.model + request_id = f"cmpl-{random_uuid()}" + + use_token_ids = False + if isinstance(request.prompt, list): + if len(request.prompt) == 0: + return self.create_error_response("please provide at least one prompt") + first_element = request.prompt[0] + if isinstance(first_element, int): + use_token_ids = True + prompt = request.prompt + elif isinstance(first_element, (str, list)): + # TODO: handles multiple prompt case in list[list[int]] + if len(request.prompt) > 1: + return self.create_error_response("multiple prompts in a batch is not currently supported") + use_token_ids = not isinstance(first_element, str) + prompt = request.prompt[0] + else: + prompt = request.prompt + + if use_token_ids: + _, error_check_ret = await self._check_length(request, prompt_ids=prompt) + else: + token_ids, error_check_ret = await self._check_length(request, prompt=prompt) + if error_check_ret is not None: + return error_check_ret + + created_time = int(time.monotonic()) + try: + spaces_between_special_tokens = request.spaces_between_special_tokens + sampling_params = SamplingParams( + n=request.n, + best_of=request.best_of, + presence_penalty=request.presence_penalty, + frequency_penalty=request.frequency_penalty, + repetition_penalty=request.repetition_penalty, + temperature=request.temperature, + top_p=request.top_p, + top_k=request.top_k, + min_p=request.min_p, + stop=request.stop, + stop_token_ids=request.stop_token_ids, + ignore_eos=request.ignore_eos, + max_tokens=request.max_tokens + if not echo_without_generation else 1, + logprobs=request.logprobs, + use_beam_search=request.use_beam_search, + prompt_logprobs=request.logprobs if request.echo else None, + skip_special_tokens=request.skip_special_tokens, + spaces_between_special_tokens=spaces_between_special_tokens, + ) + except ValueError as e: + return self.create_error_response(str(e)) + + if use_token_ids: + 
result_generator = self.engine.generate(None, + sampling_params, + request_id, + prompt_token_ids=prompt) + else: + result_generator = self.engine.generate(prompt, sampling_params, request_id, + token_ids) + + # Similar to the OpenAI API, when n != best_of, we do not stream the + # results. In addition, we do not stream the results when use beam search. + stream = (request.stream + and (request.best_of is None or request.n == request.best_of) + and not request.use_beam_search) + + def create_stream_response_json( + index: int, + text: str, + logprobs: Optional[LogProbs] = None, + finish_reason: Optional[str] = None, + usage: Optional[UsageInfo] = None, + ) -> str: + choice_data = CompletionResponseStreamChoice( + index=index, + text=text, + logprobs=logprobs, + finish_reason=finish_reason, + ) + response = CompletionStreamResponse( + id=request_id, + created=created_time, + model=model_name, + choices=[choice_data], + ) + if usage is not None: + response.usage = usage + response_json = response.json(exclude_unset=True, ensure_ascii=False) + + return response_json + + async def completion_stream_generator() -> AsyncGenerator[str, None]: + previous_texts = [""] * request.n + previous_num_tokens = [0] * request.n + has_echoed = [False] * request.n + async for res in result_generator: + res: RequestOutput + for output in res.outputs: + i = output.index + delta_text = output.text[len(previous_texts[i]):] + token_ids = output.token_ids[previous_num_tokens[i]:] + if request.logprobs is not None: + top_logprobs = output.logprobs[previous_num_tokens[i]:] + else: + top_logprobs = None + offsets = len(previous_texts[i]) + if request.echo and not has_echoed[i]: + if not echo_without_generation: + delta_text = res.prompt + delta_text + token_ids = res.prompt_token_ids + token_ids + if top_logprobs: + top_logprobs = res.prompt_logprobs + top_logprobs + else: # only just return the prompt + delta_text = res.prompt + token_ids = res.prompt_token_ids + if top_logprobs: + 
top_logprobs = res.prompt_logprobs + has_echoed[i] = True + if request.logprobs is not None: + logprobs = self._create_logprobs( + token_ids=token_ids, + top_logprobs=top_logprobs, + num_output_top_logprobs=request.logprobs, + initial_text_offset=offsets, + ) + else: + logprobs = None + previous_texts[i] = output.text + previous_num_tokens[i] = len(output.token_ids) + finish_reason = output.finish_reason + response_json = create_stream_response_json( + index=i, + text=delta_text, + logprobs=logprobs, + finish_reason=finish_reason, + ) + yield f"data: {response_json}\n\n" + if output.finish_reason is not None: + logprobs = (LogProbs() + if request.logprobs is not None else None) + prompt_tokens = len(res.prompt_token_ids) + completion_tokens = len(output.token_ids) + final_usage = UsageInfo( + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + total_tokens=prompt_tokens + completion_tokens, + ) + response_json = create_stream_response_json( + index=i, + text="", + logprobs=logprobs, + finish_reason=output.finish_reason, + usage=final_usage, + ) + yield f"data: {response_json}\n\n" + yield "data: [DONE]\n\n" + + # Streaming response + if stream: + return completion_stream_generator() + + # Non-streaming response + final_res: RequestOutput = None + async for res in result_generator: + if await raw_request.is_disconnected(): + # Abort the request if the client disconnects. 
+ await self.engine.abort(request_id) + return self.create_error_response("Client disconnected") + final_res = res + assert final_res is not None + choices = [] + prompt_token_ids = final_res.prompt_token_ids + prompt_logprobs = final_res.prompt_logprobs + prompt_text = final_res.prompt + for output in final_res.outputs: + if request.logprobs is not None: + if not echo_without_generation: + token_ids = output.token_ids + top_logprobs = output.logprobs + if request.echo: + token_ids = prompt_token_ids + token_ids + top_logprobs = prompt_logprobs + top_logprobs + else: + token_ids = prompt_token_ids + top_logprobs = prompt_logprobs + logprobs = self._create_logprobs( + token_ids=token_ids, + top_logprobs=top_logprobs, + num_output_top_logprobs=request.logprobs, + ) + else: + logprobs = None + if not echo_without_generation: + output_text = output.text + if request.echo: + output_text = prompt_text + output_text + else: + output_text = prompt_text + choice_data = CompletionResponseChoice( + index=output.index, + text=output_text, + logprobs=logprobs, + finish_reason=output.finish_reason, + ) + choices.append(choice_data) + + num_prompt_tokens = len(final_res.prompt_token_ids) + num_generated_tokens = sum( + len(output.token_ids) for output in final_res.outputs) + usage = UsageInfo( + prompt_tokens=num_prompt_tokens, + completion_tokens=num_generated_tokens, + total_tokens=num_prompt_tokens + num_generated_tokens, + ) + response = CompletionResponse( + id=request_id, + created=created_time, + model=model_name, + choices=choices, + usage=usage, + ) + + if request.stream: + # When user requests streaming but we don't stream, we still need to + # return a streaming response with a single event. 
+ response_json = response.json(ensure_ascii=False) + + async def fake_stream_generator() -> AsyncGenerator[str, None]: + yield f"data: {response_json}\n\n" + yield "data: [DONE]\n\n" + + return fake_stream_generator() + + return response diff --git a/vllm/entrypoints/openai/serving_engine.py b/vllm/entrypoints/openai/serving_engine.py new file mode 100644 index 000000000000..d910f484e9b0 --- /dev/null +++ b/vllm/entrypoints/openai/serving_engine.py @@ -0,0 +1,151 @@ +import asyncio +import codecs +from http import HTTPStatus +from typing import Dict, List, Optional, Tuple, Union +from vllm.logger import init_logger +from vllm.transformers_utils.tokenizer import get_tokenizer +from vllm.engine.async_llm_engine import AsyncLLMEngine +from vllm.entrypoints.openai.protocol import ( + CompletionRequest, ChatCompletionRequest, ErrorResponse, + LogProbs, ModelCard, ModelList, ModelPermission) + +logger = init_logger(__name__) + +class OpenAIServing: + def __init__(self, + engine: AsyncLLMEngine, + served_model: str, + response_role: str, + chat_template=None): + self.engine = engine + self.served_model = served_model + self.chat_template = chat_template + self.response_role = response_role + + self.max_model_len = 0 + self.tokenizer = None + + if engine.engine_use_ray: + post_init_task = asyncio.ensure_future(self._post_init()) + asyncio.get_event_loop().run_until_complete(asyncio.gather(post_init_task)) + else: + asyncio.run(self._post_init()) + + async def _post_init(self): + engine_model_config = await self.engine.get_model_config() + self.max_model_len = engine_model_config.max_model_len + + # A separate tokenizer to map token IDs to strings. + self.tokenizer = get_tokenizer( + engine_model_config.tokenizer, + tokenizer_mode=engine_model_config.tokenizer_mode, + trust_remote_code=engine_model_config.trust_remote_code) + self._load_chat_template(self.chat_template) + + async def show_available_models(self) -> ModelList: + """Show available models. 
Right now we only have one model.""" + model_cards = [ + ModelCard(id=self.served_model, + root=self.served_model, + permission=[ModelPermission()]) + ] + return ModelList(data=model_cards) + + def _create_logprobs( + self, + token_ids: List[int], + top_logprobs: Optional[List[Optional[Dict[int, float]]]] = None, + num_output_top_logprobs: Optional[int] = None, + initial_text_offset: int = 0, + ) -> LogProbs: + """Create OpenAI-style logprobs.""" + logprobs = LogProbs() + last_token_len = 0 + if num_output_top_logprobs: + logprobs.top_logprobs = [] + for i, token_id in enumerate(token_ids): + step_top_logprobs = top_logprobs[i] + if step_top_logprobs is not None: + token_logprob = step_top_logprobs[token_id] + else: + token_logprob = None + token = self.tokenizer.convert_ids_to_tokens(token_id) + logprobs.tokens.append(token) + logprobs.token_logprobs.append(token_logprob) + if len(logprobs.text_offset) == 0: + logprobs.text_offset.append(initial_text_offset) + else: + logprobs.text_offset.append(logprobs.text_offset[-1] + + last_token_len) + last_token_len = len(token) + + if num_output_top_logprobs: + logprobs.top_logprobs.append({ + self.tokenizer.convert_ids_to_tokens(i): p + for i, p in step_top_logprobs.items() + } if step_top_logprobs else None) + return logprobs + + def create_error_response( + self, + message: str, + err_type: str = "BadRequestError", + status_code: HTTPStatus = HTTPStatus.BAD_REQUEST) -> ErrorResponse: + return ErrorResponse(message=message, + type=err_type, + code=status_code.value) + + async def _check_model(self, request) -> Optional[ErrorResponse]: + if request.model == self.served_model: + return + return self.create_error_response( + message=f"The model `{request.model}` does not exist.", + err_type="NotFoundError", + status_code=HTTPStatus.NOT_FOUND) + + async def _check_length( + self, + request: Union[ChatCompletionRequest, CompletionRequest], + prompt: Optional[str] = None, + prompt_ids: Optional[List[int]] = None + ) -> 
Tuple[List[int], Optional[ErrorResponse]]: + assert (not (prompt is None and prompt_ids is None) + and not (prompt is not None and prompt_ids is not None) + ), "Either prompt or prompt_ids should be provided." + input_ids = prompt_ids if prompt_ids is not None else self.tokenizer( + prompt).input_ids + token_num = len(input_ids) + + if request.max_tokens is None: + request.max_tokens = self.max_model_len - token_num + if token_num + request.max_tokens > self.max_model_len: + return input_ids, self.create_error_response( + f"This model's maximum context length is {self.max_model_len} tokens. " + f"However, you requested {request.max_tokens + token_num} tokens " + f"({token_num} in the messages, " + f"{request.max_tokens} in the completion). " + f"Please reduce the length of the messages or completion.", ) + else: + return input_ids, None + + def _load_chat_template(self, chat_template): + if chat_template is not None: + try: + with open(chat_template, "r") as f: + self.tokenizer.chat_template = f.read() + except OSError: + # If opening a file fails, set chat template to be args to + # ensure we decode so our escape are interpreted correctly + self.tokenizer.chat_template = codecs.decode( + chat_template, "unicode_escape") + + logger.info( + f"Using supplied chat template:\n{self.tokenizer.chat_template}" + ) + elif self.tokenizer.chat_template is not None: + logger.info( + f"Using default chat template:\n{self.tokenizer.chat_template}" + ) + else: + logger.warning( + "No chat template provided. 
Chat API will not work.") From fab0edfab86e0520286c70da0f27b02e9964c640 Mon Sep 17 00:00:00 2001 From: FlorianJoncour Date: Sat, 6 Jan 2024 02:19:55 +0100 Subject: [PATCH 02/14] format --- vllm/entrypoints/openai/api_server.py | 1 + vllm/entrypoints/openai/serving_chat.py | 37 +++++++------ vllm/entrypoints/openai/serving_completion.py | 54 ++++++++++++------- vllm/entrypoints/openai/serving_engine.py | 13 +++-- 4 files changed, 66 insertions(+), 39 deletions(-) diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index 89550f5f8a86..7301cdb8ff03 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -101,6 +101,7 @@ async def validation_exception_handler(_, exc): err = openai_serving_chat.create_error_response(message=str(exc)) return JSONResponse(err.dict(), status_code=HTTPStatus.BAD_REQUEST) + @app.get("/health") async def health() -> Response: """Health check.""" diff --git a/vllm/entrypoints/openai/serving_chat.py b/vllm/entrypoints/openai/serving_chat.py index 40a632a28c7f..626a148ab261 100644 --- a/vllm/entrypoints/openai/serving_chat.py +++ b/vllm/entrypoints/openai/serving_chat.py @@ -15,17 +15,22 @@ logger = init_logger(__name__) + class OpenAIServingChat(OpenAIServing): + def __init__(self, engine: AsyncLLMEngine, served_model: str, response_role: str, chat_template=None): - super().__init__(engine=engine, served_model=served_model, response_role=response_role, chat_template=chat_template) + super().__init__(engine=engine, + served_model=served_model, + response_role=response_role, + chat_template=chat_template) - async def create_chat_completion(self, request: ChatCompletionRequest, - raw_request: Request - ) -> Union[ErrorResponse, AsyncGenerator[str, None], + async def create_chat_completion( + self, request: ChatCompletionRequest, raw_request: Request + ) -> Union[ErrorResponse, AsyncGenerator[str, None], ChatCompletionResponse]: """Completion API similar to OpenAI's 
API. @@ -43,7 +48,7 @@ async def create_chat_completion(self, request: ChatCompletionRequest, if request.logit_bias is not None and len(request.logit_bias) > 0: # TODO: support logit_bias in vLLM engine. return self.create_error_response( - "logit_bias is not currently supported") + "logit_bias is not currently supported") try: prompt = self.tokenizer.apply_chat_template( @@ -51,10 +56,12 @@ async def create_chat_completion(self, request: ChatCompletionRequest, tokenize=False, add_generation_prompt=request.add_generation_prompt) except Exception as e: - logger.error(f"Error in applying chat template from request: {str(e)}") + logger.error( + f"Error in applying chat template from request: {str(e)}") return self.create_error_response(str(e)) - token_ids, error_check_ret = await self._check_length(request, prompt=prompt) + token_ids, error_check_ret = await self._check_length(request, + prompt=prompt) if error_check_ret is not None: return error_check_ret @@ -82,8 +89,8 @@ async def create_chat_completion(self, request: ChatCompletionRequest, except ValueError as e: return self.create_error_response(str(e)) - result_generator = self.engine.generate(prompt, sampling_params, request_id, - token_ids) + result_generator = self.engine.generate(prompt, sampling_params, + request_id, token_ids) # Streaming response if request.stream: return self.chat_completion_stream_generator( @@ -113,10 +120,10 @@ async def chat_completion_stream_generator( choice_data = ChatCompletionResponseStreamChoice( index=i, delta=DeltaMessage(role=role), finish_reason=None) chunk = ChatCompletionStreamResponse(id=request_id, - object=chunk_object_type, - created=created_time, - choices=[choice_data], - model=model_name) + object=chunk_object_type, + created=created_time, + choices=[choice_data], + model=model_name) data = chunk.json(exclude_unset=True, ensure_ascii=False) yield f"data: {data}\n\n" @@ -191,8 +198,8 @@ async def chat_completion_stream_generator( if final_usage is not None: 
chunk.usage = final_usage data = chunk.json(exclude_unset=True, - exclude_none=True, - ensure_ascii=False) + exclude_none=True, + ensure_ascii=False) yield f"data: {data}\n\n" finish_reason_sent[i] = True # Send the final done message after all response.n are finished diff --git a/vllm/entrypoints/openai/serving_completion.py b/vllm/entrypoints/openai/serving_completion.py index bf6174da4aa4..dbdf42467b0d 100644 --- a/vllm/entrypoints/openai/serving_completion.py +++ b/vllm/entrypoints/openai/serving_completion.py @@ -4,25 +4,31 @@ from vllm.logger import init_logger from vllm.utils import random_uuid from vllm.engine.async_llm_engine import AsyncLLMEngine -from .protocol import ( - CompletionRequest, CompletionResponse, CompletionResponseChoice, - CompletionResponseStreamChoice, CompletionStreamResponse, - LogProbs, UsageInfo) +from .protocol import (CompletionRequest, CompletionResponse, + CompletionResponseChoice, + CompletionResponseStreamChoice, + CompletionStreamResponse, LogProbs, UsageInfo) from vllm.outputs import RequestOutput from vllm.sampling_params import SamplingParams from vllm.entrypoints.openai.serving_engine import OpenAIServing logger = init_logger(__name__) + class OpenAIServingCompletion(OpenAIServing): + def __init__(self, engine: AsyncLLMEngine, served_model: str, response_role: str, chat_template=None): - super().__init__(engine=engine, served_model=served_model, response_role=response_role, chat_template=chat_template) + super().__init__(engine=engine, + served_model=served_model, + response_role=response_role, + chat_template=chat_template) - async def create_completion(self, request: CompletionRequest, raw_request: Request): + async def create_completion(self, request: CompletionRequest, + raw_request: Request): """Completion API similar to OpenAI's API. 
See https://platform.openai.com/docs/api-reference/completions/create @@ -43,11 +49,13 @@ async def create_completion(self, request: CompletionRequest, raw_request: Reque if request.suffix is not None: # The language models we currently support do not support suffix. - return self.create_error_response("suffix is not currently supported") + return self.create_error_response( + "suffix is not currently supported") if request.logit_bias is not None and len(request.logit_bias) > 0: # TODO: support logit_bias in vLLM engine. - return self.create_error_response("logit_bias is not currently supported") + return self.create_error_response( + "logit_bias is not currently supported") model_name = request.model request_id = f"cmpl-{random_uuid()}" @@ -55,7 +63,8 @@ async def create_completion(self, request: CompletionRequest, raw_request: Reque use_token_ids = False if isinstance(request.prompt, list): if len(request.prompt) == 0: - return self.create_error_response("please provide at least one prompt") + return self.create_error_response( + "please provide at least one prompt") first_element = request.prompt[0] if isinstance(first_element, int): use_token_ids = True @@ -63,16 +72,20 @@ async def create_completion(self, request: CompletionRequest, raw_request: Reque elif isinstance(first_element, (str, list)): # TODO: handles multiple prompt case in list[list[int]] if len(request.prompt) > 1: - return self.create_error_response("multiple prompts in a batch is not currently supported") + return self.create_error_response( + "multiple prompts in a batch is not currently supported" + ) use_token_ids = not isinstance(first_element, str) prompt = request.prompt[0] else: prompt = request.prompt if use_token_ids: - _, error_check_ret = await self._check_length(request, prompt_ids=prompt) + _, error_check_ret = await self._check_length(request, + prompt_ids=prompt) else: - token_ids, error_check_ret = await self._check_length(request, prompt=prompt) + token_ids, error_check_ret = 
await self._check_length( + request, prompt=prompt) if error_check_ret is not None: return error_check_ret @@ -105,18 +118,18 @@ async def create_completion(self, request: CompletionRequest, raw_request: Reque if use_token_ids: result_generator = self.engine.generate(None, - sampling_params, - request_id, - prompt_token_ids=prompt) + sampling_params, + request_id, + prompt_token_ids=prompt) else: - result_generator = self.engine.generate(prompt, sampling_params, request_id, - token_ids) + result_generator = self.engine.generate(prompt, sampling_params, + request_id, token_ids) # Similar to the OpenAI API, when n != best_of, we do not stream the # results. In addition, we do not stream the results when use beam search. stream = (request.stream - and (request.best_of is None or request.n == request.best_of) - and not request.use_beam_search) + and (request.best_of is None or request.n == request.best_of) + and not request.use_beam_search) def create_stream_response_json( index: int, @@ -139,7 +152,8 @@ def create_stream_response_json( ) if usage is not None: response.usage = usage - response_json = response.json(exclude_unset=True, ensure_ascii=False) + response_json = response.json(exclude_unset=True, + ensure_ascii=False) return response_json diff --git a/vllm/entrypoints/openai/serving_engine.py b/vllm/entrypoints/openai/serving_engine.py index d910f484e9b0..eecf6778d681 100644 --- a/vllm/entrypoints/openai/serving_engine.py +++ b/vllm/entrypoints/openai/serving_engine.py @@ -5,13 +5,17 @@ from vllm.logger import init_logger from vllm.transformers_utils.tokenizer import get_tokenizer from vllm.engine.async_llm_engine import AsyncLLMEngine -from vllm.entrypoints.openai.protocol import ( - CompletionRequest, ChatCompletionRequest, ErrorResponse, - LogProbs, ModelCard, ModelList, ModelPermission) +from vllm.entrypoints.openai.protocol import (CompletionRequest, + ChatCompletionRequest, + ErrorResponse, LogProbs, + ModelCard, ModelList, + ModelPermission) logger = 
init_logger(__name__) + class OpenAIServing: + def __init__(self, engine: AsyncLLMEngine, served_model: str, @@ -27,7 +31,8 @@ def __init__(self, if engine.engine_use_ray: post_init_task = asyncio.ensure_future(self._post_init()) - asyncio.get_event_loop().run_until_complete(asyncio.gather(post_init_task)) + asyncio.get_event_loop().run_until_complete( + asyncio.gather(post_init_task)) else: asyncio.run(self._post_init()) From 8adb6daf3d7b23f4d9c8bf8919bbf4a9528c6326 Mon Sep 17 00:00:00 2001 From: FlorianJoncour Date: Sat, 6 Jan 2024 02:43:39 +0100 Subject: [PATCH 03/14] forget to uncomment something --- vllm/entrypoints/openai/api_server.py | 33 ++++++++++++++------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index 7301cdb8ff03..3890125ece2f 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -1,5 +1,7 @@ import argparse +import asyncio import json +from contextlib import asynccontextmanager from aioprometheus import MetricsMiddleware from aioprometheus.asgi.starlette import metrics import fastapi @@ -24,22 +26,21 @@ openai_serving_completion: OpenAIServingCompletion = None logger = init_logger(__name__) -# @asynccontextmanager -# async def lifespan(app: fastapi.FastAPI): -# -# async def _force_log(): -# while True: -# await asyncio.sleep(10) -# await engine.do_log_stats() -# -# if not engine_args.disable_log_stats: -# asyncio.create_task(_force_log()) -# -# yield -# -# -# app = fastapi.FastAPI(lifespan=lifespan) -app = fastapi.FastAPI() +@asynccontextmanager +async def lifespan(app: fastapi.FastAPI): + + async def _force_log(): + while True: + await asyncio.sleep(10) + await engine.do_log_stats() + + if not engine_args.disable_log_stats: + asyncio.create_task(_force_log()) + + yield + + +app = fastapi.FastAPI(lifespan=lifespan) def parse_args(): From 2b2d2713a22ccb83574a89506064e2b80d67a460 Mon Sep 17 00:00:00 2001 
From: FlorianJoncour <148003496+FlorianJoncour@users.noreply.github.com> Date: Sat, 6 Jan 2024 01:53:45 +0000 Subject: [PATCH 04/14] Minor typo --- vllm/entrypoints/openai/api_server.py | 1 + 1 file changed, 1 insertion(+) diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index 3890125ece2f..8b65c92405eb 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -26,6 +26,7 @@ openai_serving_completion: OpenAIServingCompletion = None logger = init_logger(__name__) + @asynccontextmanager async def lifespan(app: fastapi.FastAPI): From b60cd239e1cf79cb9134239728362ef095ec7f68 Mon Sep 17 00:00:00 2001 From: FlorianJoncour Date: Sat, 6 Jan 2024 12:59:39 +0100 Subject: [PATCH 05/14] Manage running event loop if any --- vllm/entrypoints/openai/serving_engine.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/vllm/entrypoints/openai/serving_engine.py b/vllm/entrypoints/openai/serving_engine.py index eecf6778d681..fe2323f4959f 100644 --- a/vllm/entrypoints/openai/serving_engine.py +++ b/vllm/entrypoints/openai/serving_engine.py @@ -29,11 +29,15 @@ def __init__(self, self.max_model_len = 0 self.tokenizer = None - if engine.engine_use_ray: - post_init_task = asyncio.ensure_future(self._post_init()) - asyncio.get_event_loop().run_until_complete( - asyncio.gather(post_init_task)) - else: + try: + event_loop = asyncio.get_running_loop() + except RuntimeError: + event_loop = None + + if event_loop is not None and event_loop.is_running( + ): # If the current is instanced by Ray Serve, there is already a running event loop + event_loop.create_task(self._post_init()) + else: # When using single vLLM without engine_use_ray asyncio.run(self._post_init()) async def _post_init(self): From 519aa3d3dc953f87c8055986de36e13312e04ea0 Mon Sep 17 00:00:00 2001 From: FlorianJoncour Date: Sat, 13 Jan 2024 00:21:52 +0100 Subject: [PATCH 06/14] Fixes as requested --- 
vllm/entrypoints/openai/api_server.py | 3 +- vllm/entrypoints/openai/serving_chat.py | 27 ++++++++++++++-- vllm/entrypoints/openai/serving_completion.py | 10 ++---- vllm/entrypoints/openai/serving_engine.py | 32 ++----------------- 4 files changed, 31 insertions(+), 41 deletions(-) diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index 8b65c92405eb..122fe9208dcb 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -132,7 +132,6 @@ async def create_chat_completion(request: ChatCompletionRequest, async def create_completion(request: CompletionRequest, raw_request: Request): generator = await openai_serving_completion.create_completion( request, raw_request) - logger.info("TYPE COMPLETION : %s" % str(type(generator))) if request.stream and not isinstance(generator, ErrorResponse): return StreamingResponse(content=generator, media_type="text/event-stream") @@ -164,7 +163,7 @@ async def create_completion(request: CompletionRequest, raw_request: Request): args.response_role, args.chat_template) openai_serving_completion = OpenAIServingCompletion( - engine, served_model, args.response_role, args.chat_template) + engine, served_model, args.response_role) # Register labels for metrics add_global_metrics_labels(model_name=engine_args.model) diff --git a/vllm/entrypoints/openai/serving_chat.py b/vllm/entrypoints/openai/serving_chat.py index 626a148ab261..ddebb1730ae0 100644 --- a/vllm/entrypoints/openai/serving_chat.py +++ b/vllm/entrypoints/openai/serving_chat.py @@ -1,4 +1,5 @@ import time +import codecs from fastapi import Request from typing import AsyncGenerator, AsyncIterator, Union from vllm.logger import init_logger @@ -25,8 +26,8 @@ def __init__(self, chat_template=None): super().__init__(engine=engine, served_model=served_model, - response_role=response_role, - chat_template=chat_template) + response_role=response_role) + self._load_chat_template(chat_template) async def 
create_chat_completion( self, request: ChatCompletionRequest, raw_request: Request @@ -261,3 +262,25 @@ async def chat_completion_full_generator( ) return response + + def _load_chat_template(self, chat_template): + if chat_template is not None: + try: + with open(chat_template, "r") as f: + self.tokenizer.chat_template = f.read() + except OSError: + # If opening a file fails, set chat template to be args to + # ensure we decode so our escape are interpreted correctly + self.tokenizer.chat_template = codecs.decode( + chat_template, "unicode_escape") + + logger.info( + f"Using supplied chat template:\n{self.tokenizer.chat_template}" + ) + elif self.tokenizer.chat_template is not None: + logger.info( + f"Using default chat template:\n{self.tokenizer.chat_template}" + ) + else: + logger.warning( + "No chat template provided. Chat API will not work.") diff --git a/vllm/entrypoints/openai/serving_completion.py b/vllm/entrypoints/openai/serving_completion.py index dbdf42467b0d..597f86bd4b50 100644 --- a/vllm/entrypoints/openai/serving_completion.py +++ b/vllm/entrypoints/openai/serving_completion.py @@ -17,15 +17,11 @@ class OpenAIServingCompletion(OpenAIServing): - def __init__(self, - engine: AsyncLLMEngine, - served_model: str, - response_role: str, - chat_template=None): + def __init__(self, engine: AsyncLLMEngine, served_model: str, + response_role: str): super().__init__(engine=engine, served_model=served_model, - response_role=response_role, - chat_template=chat_template) + response_role=response_role) async def create_completion(self, request: CompletionRequest, raw_request: Request): diff --git a/vllm/entrypoints/openai/serving_engine.py b/vllm/entrypoints/openai/serving_engine.py index fe2323f4959f..e4e4b313c943 100644 --- a/vllm/entrypoints/openai/serving_engine.py +++ b/vllm/entrypoints/openai/serving_engine.py @@ -1,5 +1,4 @@ import asyncio -import codecs from http import HTTPStatus from typing import Dict, List, Optional, Tuple, Union from vllm.logger 
import init_logger @@ -16,14 +15,10 @@ class OpenAIServing: - def __init__(self, - engine: AsyncLLMEngine, - served_model: str, - response_role: str, - chat_template=None): + def __init__(self, engine: AsyncLLMEngine, served_model: str, + response_role: str): self.engine = engine self.served_model = served_model - self.chat_template = chat_template self.response_role = response_role self.max_model_len = 0 @@ -49,7 +44,6 @@ async def _post_init(self): engine_model_config.tokenizer, tokenizer_mode=engine_model_config.tokenizer_mode, trust_remote_code=engine_model_config.trust_remote_code) - self._load_chat_template(self.chat_template) async def show_available_models(self) -> ModelList: """Show available models. Right now we only have one model.""" @@ -136,25 +130,3 @@ async def _check_length( f"Please reduce the length of the messages or completion.", ) else: return input_ids, None - - def _load_chat_template(self, chat_template): - if chat_template is not None: - try: - with open(chat_template, "r") as f: - self.tokenizer.chat_template = f.read() - except OSError: - # If opening a file fails, set chat template to be args to - # ensure we decode so our escape are interpreted correctly - self.tokenizer.chat_template = codecs.decode( - chat_template, "unicode_escape") - - logger.info( - f"Using supplied chat template:\n{self.tokenizer.chat_template}" - ) - elif self.tokenizer.chat_template is not None: - logger.info( - f"Using default chat template:\n{self.tokenizer.chat_template}" - ) - else: - logger.warning( - "No chat template provided. 
Chat API will not work.") From afc6231b1b9d64e3cfb38f33cd3348947cd645dc Mon Sep 17 00:00:00 2001 From: simon-mo Date: Tue, 16 Jan 2024 06:24:07 +0000 Subject: [PATCH 07/14] wip-fixes, move response role to chat only --- vllm/entrypoints/openai/api_server.py | 2 +- vllm/entrypoints/openai/serving_chat.py | 5 ++--- vllm/entrypoints/openai/serving_completion.py | 7 ++----- vllm/entrypoints/openai/serving_engine.py | 5 +---- 4 files changed, 6 insertions(+), 13 deletions(-) diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index d92cc9d5e0b5..c248b71b7862 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -168,7 +168,7 @@ async def create_completion(request: CompletionRequest, raw_request: Request): args.response_role, args.chat_template) openai_serving_completion = OpenAIServingCompletion( - engine, served_model, args.response_role) + engine, served_model) # Register labels for metrics add_global_metrics_labels(model_name=engine_args.model) diff --git a/vllm/entrypoints/openai/serving_chat.py b/vllm/entrypoints/openai/serving_chat.py index ddebb1730ae0..6429b82fbd44 100644 --- a/vllm/entrypoints/openai/serving_chat.py +++ b/vllm/entrypoints/openai/serving_chat.py @@ -18,15 +18,14 @@ class OpenAIServingChat(OpenAIServing): - def __init__(self, engine: AsyncLLMEngine, served_model: str, response_role: str, chat_template=None): super().__init__(engine=engine, - served_model=served_model, - response_role=response_role) + served_model=served_model) + self.response_role = response_role self._load_chat_template(chat_template) async def create_chat_completion( diff --git a/vllm/entrypoints/openai/serving_completion.py b/vllm/entrypoints/openai/serving_completion.py index 597f86bd4b50..14a5f38bb4dc 100644 --- a/vllm/entrypoints/openai/serving_completion.py +++ b/vllm/entrypoints/openai/serving_completion.py @@ -16,12 +16,9 @@ class OpenAIServingCompletion(OpenAIServing): - - def 
__init__(self, engine: AsyncLLMEngine, served_model: str, - response_role: str): + def __init__(self, engine: AsyncLLMEngine, served_model: str): super().__init__(engine=engine, - served_model=served_model, - response_role=response_role) + served_model=served_model) async def create_completion(self, request: CompletionRequest, raw_request: Request): diff --git a/vllm/entrypoints/openai/serving_engine.py b/vllm/entrypoints/openai/serving_engine.py index e4e4b313c943..2a52ec8f365d 100644 --- a/vllm/entrypoints/openai/serving_engine.py +++ b/vllm/entrypoints/openai/serving_engine.py @@ -14,12 +14,9 @@ class OpenAIServing: - - def __init__(self, engine: AsyncLLMEngine, served_model: str, - response_role: str): + def __init__(self, engine: AsyncLLMEngine, served_model: str): self.engine = engine self.served_model = served_model - self.response_role = response_role self.max_model_len = 0 self.tokenizer = None From 051c5505f95667fa570a65a1252fc4b7089de7f0 Mon Sep 17 00:00:00 2001 From: simon-mo Date: Tue, 16 Jan 2024 19:43:52 +0000 Subject: [PATCH 08/14] add test cases, fix chat streaming bug --- tests/entrypoints/test_openai_server.py | 195 ++++++++++++++++++ vllm/entrypoints/openai/api_server.py | 3 +- vllm/entrypoints/openai/serving_chat.py | 15 +- vllm/entrypoints/openai/serving_completion.py | 4 +- vllm/entrypoints/openai/serving_engine.py | 1 + 5 files changed, 208 insertions(+), 10 deletions(-) create mode 100644 tests/entrypoints/test_openai_server.py diff --git a/tests/entrypoints/test_openai_server.py b/tests/entrypoints/test_openai_server.py new file mode 100644 index 000000000000..7e1558c25e51 --- /dev/null +++ b/tests/entrypoints/test_openai_server.py @@ -0,0 +1,195 @@ +import time +import subprocess + +import sys +import pytest +import requests +import ray # using Ray for overall ease of process management, parallel requests, and debugging. 
+import openai # use the official client for correctness check + +MAX_SERVER_START_WAIT_S = 600 # wait for server to start for 600 seconds +MODEL_NAME = "HuggingFaceH4/zephyr-7b-beta" # any model with a chat template should work here + +pytestmark = pytest.mark.asyncio + + +@ray.remote(num_gpus=1) +class ServerRunner: + + def __init__(self, args): + self.proc = subprocess.Popen( + ["python", "-m", "vllm.entrypoints.openai.api_server"] + args, + stdout=sys.stdout, + stderr=sys.stderr, + ) + self._wait_for_server() + + def ready(self): + return True + + def _wait_for_server(self): + # run health check + start = time.time() + while True: + try: + if requests.get( + "http://localhost:8000/health").status_code == 200: + break + except Exception as err: + if self.proc.poll() is not None: + raise RuntimeError("Server exited unexpectedly.") from err + + time.sleep(0.5) + if time.time() - start > MAX_SERVER_START_WAIT_S: + raise RuntimeError( + "Server failed to start in time.") from err + + def __del__(self): + if hasattr(self, "proc"): + self.proc.terminate() + + +@pytest.fixture(scope="session") +def server(): + yield + return + ray.init() + server_runner = ServerRunner.remote([ + "--model", + MODEL_NAME, + "--dtype", + "bfloat16", # use half precision for speed and memory savings in CI environment + "--max-model-len", + "8192" + ]) + ray.get(server_runner.ready.remote()) + yield server_runner + ray.shutdown() + + +@pytest.fixture(scope="session") +def client(): + client = openai.AsyncOpenAI( + base_url="http://localhost:8000/v1", + api_key="token-abc123", + ) + yield client + + +async def test_single_completion(server, client: openai.AsyncOpenAI): + completion = await client.completions.create(model=MODEL_NAME, + prompt="Hello, my name is", + max_tokens=5, + temperature=0.0) + + assert completion.id is not None + assert completion.choices is not None and len(completion.choices) == 1 + assert completion.choices[0].text is not None and len( + completion.choices[0].text) >= 
5 + assert completion.choices[0].finish_reason == "length" + assert completion.usage == openai.types.CompletionUsage( + completion_tokens=5, prompt_tokens=6, total_tokens=11) + + +async def test_single_chat_session(server, client: openai.AsyncOpenAI): + messages = [{ + "role": "system", + "content": "you are a helpful assistant" + }, { + "role": "user", + "content": "what is 1+1?" + }] + + # test single completion + chat_completion = await client.chat.completions.create( + model=MODEL_NAME, + messages=messages, + max_tokens=10, + ) + assert chat_completion.id is not None + assert chat_completion.choices is not None and len( + chat_completion.choices) == 1 + assert chat_completion.choices[0].message is not None + message = chat_completion.choices[0].message + assert message.content is not None and len(message.content) >= 10 + assert message.role == "assistant" + messages.append({"role": "assistant", "content": message.content}) + + # test multi-turn dialogue + messages.append({"role": "user", "content": "express your result in json"}) + chat_completion = await client.chat.completions.create( + model=MODEL_NAME, + messages=messages, + max_tokens=10, + ) + message = chat_completion.choices[0].message + assert message.content is not None and len(message.content) >= 0 + + +async def test_completion_streaming(server, client: openai.AsyncOpenAI): + prompt = "What is an LLM?" 
+ + single_completion = await client.completions.create( + model=MODEL_NAME, + prompt=prompt, + max_tokens=5, + temperature=0.0, + ) + single_output = single_completion.choices[0].text + single_usage = single_completion.usage + + stream = await client.completions.create( + model=MODEL_NAME, + prompt=prompt, + max_tokens=5, + temperature=0.0, + stream=True, + ) + chunks = [] + async for chunk in stream: + chunks.append(chunk.choices[0].text) + assert chunk.choices[0].finish_reason == "length" + assert chunk.usage == single_usage + assert "".join(chunks) == single_output + + +async def test_chat_streaming(server, client: openai.AsyncOpenAI): + messages = [{ + "role": "system", + "content": "you are a helpful assistant" + }, { + "role": "user", + "content": "what is 1+1?" + }] + + # test single completion + chat_completion = await client.chat.completions.create( + model=MODEL_NAME, + messages=messages, + max_tokens=10, + temperature=0.0, + ) + output = chat_completion.choices[0].message.content + stop_reason = chat_completion.choices[0].finish_reason + + # test streaming + stream = await client.chat.completions.create( + model=MODEL_NAME, + messages=messages, + max_tokens=10, + temperature=0.0, + stream=True, + ) + chunks = [] + async for chunk in stream: + delta = chunk.choices[0].delta + if delta.role: + assert delta.role == "assistant" + if delta.content: + chunks.append(delta.content) + assert chunk.choices[0].finish_reason == stop_reason + assert "".join(chunks) == output + + +if __name__ == "__main__": + pytest.main([__file__]) diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index c248b71b7862..d652045c8ad7 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -167,8 +167,7 @@ async def create_completion(request: CompletionRequest, raw_request: Request): openai_serving_chat = OpenAIServingChat(engine, served_model, args.response_role, args.chat_template) - 
openai_serving_completion = OpenAIServingCompletion( - engine, served_model) + openai_serving_completion = OpenAIServingCompletion(engine, served_model) # Register labels for metrics add_global_metrics_labels(model_name=engine_args.model) diff --git a/vllm/entrypoints/openai/serving_chat.py b/vllm/entrypoints/openai/serving_chat.py index 6429b82fbd44..9b843a94de10 100644 --- a/vllm/entrypoints/openai/serving_chat.py +++ b/vllm/entrypoints/openai/serving_chat.py @@ -18,13 +18,13 @@ class OpenAIServingChat(OpenAIServing): + def __init__(self, engine: AsyncLLMEngine, served_model: str, response_role: str, chat_template=None): - super().__init__(engine=engine, - served_model=served_model) + super().__init__(engine=engine, served_model=served_model) self.response_role = response_role self._load_chat_template(chat_template) @@ -162,11 +162,12 @@ async def chat_completion_stream_generator( if finish_reason_sent[i]: continue + delta_text = output.text[len(previous_texts[i]):] + previous_texts[i] = output.text + previous_num_tokens[i] = len(output.token_ids) + if output.finish_reason is None: # Send token-by-token response for each request.n - delta_text = output.text[len(previous_texts[i]):] - previous_texts[i] = output.text - previous_num_tokens[i] = len(output.token_ids) choice_data = ChatCompletionResponseStreamChoice( index=i, delta=DeltaMessage(content=delta_text), @@ -188,7 +189,9 @@ async def chat_completion_stream_generator( total_tokens=prompt_tokens + previous_num_tokens[i], ) choice_data = ChatCompletionResponseStreamChoice( - index=i, delta=[], finish_reason=output.finish_reason) + index=i, + delta=DeltaMessage(content=delta_text), + finish_reason=output.finish_reason) chunk = ChatCompletionStreamResponse( id=request_id, object=chunk_object_type, diff --git a/vllm/entrypoints/openai/serving_completion.py b/vllm/entrypoints/openai/serving_completion.py index 14a5f38bb4dc..d842d1a2a919 100644 --- a/vllm/entrypoints/openai/serving_completion.py +++ 
b/vllm/entrypoints/openai/serving_completion.py @@ -16,9 +16,9 @@ class OpenAIServingCompletion(OpenAIServing): + def __init__(self, engine: AsyncLLMEngine, served_model: str): - super().__init__(engine=engine, - served_model=served_model) + super().__init__(engine=engine, served_model=served_model) async def create_completion(self, request: CompletionRequest, raw_request: Request): diff --git a/vllm/entrypoints/openai/serving_engine.py b/vllm/entrypoints/openai/serving_engine.py index 2a52ec8f365d..e77a0720e498 100644 --- a/vllm/entrypoints/openai/serving_engine.py +++ b/vllm/entrypoints/openai/serving_engine.py @@ -14,6 +14,7 @@ class OpenAIServing: + def __init__(self, engine: AsyncLLMEngine, served_model: str): self.engine = engine self.served_model = served_model From d8cb6c5f4cc7548258712bf37ac54e58c7cba8b1 Mon Sep 17 00:00:00 2001 From: simon-mo Date: Tue, 16 Jan 2024 19:48:49 +0000 Subject: [PATCH 09/14] reset debugging clause --- tests/entrypoints/test_openai_server.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/entrypoints/test_openai_server.py b/tests/entrypoints/test_openai_server.py index 7e1558c25e51..3dadc05d5924 100644 --- a/tests/entrypoints/test_openai_server.py +++ b/tests/entrypoints/test_openai_server.py @@ -51,8 +51,6 @@ def __del__(self): @pytest.fixture(scope="session") def server(): - yield - return ray.init() server_runner = ServerRunner.remote([ "--model", From 29e7d439681f19b636f6c62ba477e8486927b357 Mon Sep 17 00:00:00 2001 From: simon-mo Date: Tue, 16 Jan 2024 22:02:00 +0000 Subject: [PATCH 10/14] fix chat template --- ...openai_server.py => test_chat_template.py} | 31 ++++++++----------- 1 file changed, 13 insertions(+), 18 deletions(-) rename tests/async_engine/{test_openai_server.py => test_chat_template.py} (82%) diff --git a/tests/async_engine/test_openai_server.py b/tests/async_engine/test_chat_template.py similarity index 82% rename from tests/async_engine/test_openai_server.py rename to 
tests/async_engine/test_chat_template.py index ff1ce423c517..9829ed69845f 100644 --- a/tests/async_engine/test_openai_server.py +++ b/tests/async_engine/test_chat_template.py @@ -1,12 +1,12 @@ -from argparse import Namespace from dataclasses import dataclass import os import pathlib import pytest -from fastapi.testclient import TestClient -from vllm.entrypoints.openai.api_server import * +from vllm.transformers_utils.tokenizer import get_tokenizer +from vllm.entrypoints.openai.serving_chat import OpenAIServingChat +from vllm.entrypoints.openai.protocol import ChatCompletionRequest chatml_jinja_path = pathlib.Path(os.path.dirname(os.path.abspath( __file__))).parent.parent / "examples/template_chatml.jinja" @@ -48,21 +48,22 @@ 'content': 'What is the capital of' }, ] -client = TestClient(app) @dataclass class MockTokenizer: chat_template = None +@dataclass +class MockServingChat: + tokenizer: MockTokenizer + def test_load_chat_template(): # Testing chatml template - mock_args = Namespace(chat_template=chatml_jinja_path) tokenizer = MockTokenizer() - - # Call the function with the mocked args - load_chat_template(mock_args, tokenizer) + mock_serving_chat = MockServingChat(tokenizer) + OpenAIServingChat._load_chat_template(mock_serving_chat, chat_template=chatml_jinja_path) template_content = tokenizer.chat_template @@ -76,11 +77,10 @@ def test_load_chat_template(): def test_no_load_chat_template(): # Testing chatml template template = "../../examples/does_not_exist" - mock_args = Namespace(chat_template=template) tokenizer = MockTokenizer() - # Call the function with the mocked args - load_chat_template(mock_args, tokenizer=tokenizer) + mock_serving_chat = MockServingChat(tokenizer) + OpenAIServingChat._load_chat_template(mock_serving_chat, chat_template=template) template_content = tokenizer.chat_template # Test assertions @@ -97,9 +97,8 @@ async def test_get_gen_prompt(model, template, add_generation_prompt, expected_output): # Initialize the tokenizer tokenizer = 
get_tokenizer(tokenizer_name=model) - - mock_args = Namespace(chat_template=template) - load_chat_template(mock_args, tokenizer) + mock_serving_chat = MockServingChat(tokenizer) + OpenAIServingChat._load_chat_template(mock_serving_chat, chat_template=template) # Create a mock request object using keyword arguments mock_request = ChatCompletionRequest( @@ -116,7 +115,3 @@ async def test_get_gen_prompt(model, template, add_generation_prompt, # Test assertion assert result == expected_output, f"The generated prompt does not match the expected output for model {model} and template {template}" - -def test_health_endpoint(): - response = client.get("/health") - assert response.status_code == 200 From b5ebd47d5532bdf392301dace32f4008eaf3121c Mon Sep 17 00:00:00 2001 From: simon-mo Date: Tue, 16 Jan 2024 22:06:39 +0000 Subject: [PATCH 11/14] format --- tests/async_engine/test_chat_template.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/async_engine/test_chat_template.py b/tests/async_engine/test_chat_template.py index 9829ed69845f..32d110e0f0b4 100644 --- a/tests/async_engine/test_chat_template.py +++ b/tests/async_engine/test_chat_template.py @@ -54,6 +54,7 @@ class MockTokenizer: chat_template = None + @dataclass class MockServingChat: tokenizer: MockTokenizer @@ -63,7 +64,8 @@ def test_load_chat_template(): # Testing chatml template tokenizer = MockTokenizer() mock_serving_chat = MockServingChat(tokenizer) - OpenAIServingChat._load_chat_template(mock_serving_chat, chat_template=chatml_jinja_path) + OpenAIServingChat._load_chat_template(mock_serving_chat, + chat_template=chatml_jinja_path) template_content = tokenizer.chat_template @@ -80,7 +82,8 @@ def test_no_load_chat_template(): tokenizer = MockTokenizer() mock_serving_chat = MockServingChat(tokenizer) - OpenAIServingChat._load_chat_template(mock_serving_chat, chat_template=template) + OpenAIServingChat._load_chat_template(mock_serving_chat, + chat_template=template) 
template_content = tokenizer.chat_template # Test assertions @@ -98,7 +101,8 @@ async def test_get_gen_prompt(model, template, add_generation_prompt, # Initialize the tokenizer tokenizer = get_tokenizer(tokenizer_name=model) mock_serving_chat = MockServingChat(tokenizer) - OpenAIServingChat._load_chat_template(mock_serving_chat, chat_template=template) + OpenAIServingChat._load_chat_template(mock_serving_chat, + chat_template=template) # Create a mock request object using keyword arguments mock_request = ChatCompletionRequest( @@ -114,4 +118,3 @@ async def test_get_gen_prompt(model, template, add_generation_prompt, # Test assertion assert result == expected_output, f"The generated prompt does not match the expected output for model {model} and template {template}" - From ff80aad0e9d6e7b29cf1f82645a1a79e253df6f3 Mon Sep 17 00:00:00 2001 From: simon-mo Date: Wed, 17 Jan 2024 04:29:04 +0000 Subject: [PATCH 12/14] Add test case to build pipeline --- .buildkite/test-pipeline.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.buildkite/test-pipeline.yaml b/.buildkite/test-pipeline.yaml index 3cd1bed0e50a..a6f3a3f0a2e3 100644 --- a/.buildkite/test-pipeline.yaml +++ b/.buildkite/test-pipeline.yaml @@ -19,6 +19,9 @@ steps: - label: Engine Test command: pytest -v -s engine +- label: Entrypoints Test + command: pytest -v -s entrypoints + - label: Kernels Test command: pytest -v -s kernels soft_fail: true From b3cc26f75ece6f5e5191dba02d303854715f1517 Mon Sep 17 00:00:00 2001 From: simon-mo Date: Wed, 17 Jan 2024 04:32:21 +0000 Subject: [PATCH 13/14] add test dependencies --- requirements-dev.txt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/requirements-dev.txt b/requirements-dev.txt index 89f8b3f08dbf..f8126008d079 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -16,3 +16,6 @@ pytest-asyncio httpx einops # required for MPT flash_attn # required for HuggingFace's llama implementation +openai +requests +ray \ No newline at end of file From 
69c283f415b0e8f71f183d194a8f184c7a6a2851 Mon Sep 17 00:00:00 2001 From: simon-mo Date: Wed, 17 Jan 2024 05:11:53 +0000 Subject: [PATCH 14/14] use python3 --- tests/entrypoints/test_openai_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/entrypoints/test_openai_server.py b/tests/entrypoints/test_openai_server.py index 3dadc05d5924..707ab6d28d92 100644 --- a/tests/entrypoints/test_openai_server.py +++ b/tests/entrypoints/test_openai_server.py @@ -18,7 +18,7 @@ class ServerRunner: def __init__(self, args): self.proc = subprocess.Popen( - ["python", "-m", "vllm.entrypoints.openai.api_server"] + args, + ["python3", "-m", "vllm.entrypoints.openai.api_server"] + args, stdout=sys.stdout, stderr=sys.stderr, )