diff --git a/docs/cli/README.md b/docs/cli/README.md index 8069110b3ca..1fcfdb14eac 100644 --- a/docs/cli/README.md +++ b/docs/cli/README.md @@ -22,3 +22,21 @@ If you have custom stage configs file, launch the server with command below ```bash vllm serve Qwen/Qwen2.5-Omni-7B --omni --stage-configs-path /path/to/stage_configs_file ``` + + +## bench + +Run benchmark tests for online serving throughput. +Available Commands: + +```bash +vllm bench serve --omni \ + --model Qwen/Qwen2.5-Omni-7B \ + --host server-host \ + --port server-port \ + --random-input-len 32 \ + --random-output-len 4 \ + --num-prompts 5 +``` + +See [vllm bench serve](./bench/serve.md) for the full reference of all available arguments. diff --git a/docs/cli/bench/serve.md b/docs/cli/bench/serve.md new file mode 100644 index 00000000000..bcb1df74cd9 --- /dev/null +++ b/docs/cli/bench/serve.md @@ -0,0 +1,359 @@ +# vLLM-Omni Benchmark CLI Guide +The vllm bench command launches the vLLM-Omni benchmark to evaluate the performance of multimodal models. + +## Notes +We currently only support using the "openai-chat-omni" backend. + +## Basic Parameter Description +You can use `vllm bench serve --omni --help=all` to get descriptions of all parameters. The commonly used parameters are described below: +- `--omni` + Enable Omni (multimodal) mode, supporting multimodal inputs and outputs such as images, videos, and audio. + +- `--backend` + Specify the backend adapter as openai-chat-omni, using OpenAI Chat compatible API behavior as the protocol. Currently only openai-chat-omni is supported. + +- `--model` + The model identifier to load, filled according to the models supported by vLLM-Omni. + +- `--endpoint` + The API endpoint exposed externally, to which clients send their requests. + +- `--dataset-name` + The name of the dataset used; random-mm indicates generating random multimodal inputs (images, videos, audio). + +- `--num-prompts` + The total number of requests to send, an integer. 
+
+- `--max-concurrency`
+  Maximum number of concurrent requests. This can be used to simulate an environment where a higher-level component enforces a maximum number of concurrent requests. While `--request-rate` controls the rate at which requests are initiated, this argument controls how many are actually allowed to execute at a time. When the two are combined, the actual request rate may be lower than the one specified with `--request-rate` if the server does not process requests fast enough to keep up.
+
+- `--request-rate`
+  Number of requests per second. If this is `inf`, all requests are sent at time 0. Otherwise, a Poisson process or gamma distribution is used to synthesize the request arrival times.
+
+- `--ignore-eos`
+  Set the `ignore_eos` flag when sending the benchmark request.
+
+- `--metric-percentiles`
+  Comma-separated list of percentiles for the selected metrics. To report the 25th, 50th, and 75th percentiles, use "25,50,75". The default value is "99". Use `--percentile-metrics` to select the metrics.
+
+- `--percentile-metrics`
+  Comma-separated list of metrics for which percentiles are reported. Allowed metric names are "ttft", "tpot", "itl", "e2el", "audio_ttfp", and "audio_rtf".
+
+- `--save-result`
+  Save the benchmark results to a JSON file.
+
+- `--save-detailed`
+  When saving the results, include per-request information such as response, error, TTFTs, TPOTs, etc.
+
+- `--result-dir`
+  Directory in which to save the benchmark JSON results. If not specified, results are saved in the current directory.
+
+- `--result-filename`
+  Filename for the benchmark JSON results. If not specified, results are saved as `{label}-{args.request_rate}qps-{base_model_id}-{current_dt}.json`.
+
+- `--random-prefix-len`
+  Number of fixed prefix tokens before the random context in a request.
+  The total input length is the sum of random-prefix-len and a random
+  context length sampled from [input_len * (1 - range_ratio),
+  input_len * (1 + range_ratio)].
+  Only the random and random-mm modes support this parameter.
+
+- `--random-input-len`
+  Number of input tokens per request. Only the random and random-mm modes support this parameter.
+
+- `--random-output-len`
+  Number of output tokens per request. Only the random and random-mm modes support this parameter.
+
+- `--random-range-ratio`
+  Range ratio for sampling input/output length,
+  used only for random sampling. Must be in the range [0, 1) to define
+  a symmetric sampling range
+  [length * (1 - range_ratio), length * (1 + range_ratio)].
+  Only the random and random-mm modes support this parameter.
+
+- `--random-mm-base-items-per-request`
+  Base number of multimodal items per request for random-mm.
+  The actual per-request count is sampled around this base using
+  `--random-mm-num-mm-items-range-ratio`.
+  Only the random-mm mode supports this parameter.
+
+- `--random-mm-limit-mm-per-prompt`
+  Per-modality hard caps for items attached per request, e.g.
+  '{"image": 3, "video": 1, "audio": 1}'. The sampled per-request item
+  count is clamped to the sum of these limits. When a modality
+  reaches its cap, its buckets are excluded and probabilities are
+  renormalized.
+  Only the random-mm mode supports this parameter.
+
+- `--random-mm-num-mm-items-range-ratio`
+  Range ratio r in [0, 1] for sampling items per request.
+  The item count is sampled uniformly from the closed integer range
+  [floor(n*(1-r)), ceil(n*(1+r))],
+  where n is the base items per request.
+  r=0 keeps it fixed; r=1 allows 0 items. The maximum is clamped
+  to the sum of per-modality limits from
+  `--random-mm-limit-mm-per-prompt`.
+  An error is raised if the computed minimum exceeds the maximum.
+  Only the random-mm mode supports this parameter.
+
+- `--random-mm-bucket-config`
+  A dictionary mapping a multimodal item sampling configuration (a bucket)
+  to a probability.
+  Three modalities are currently supported: audio, images, and videos.
+  For images and videos, a bucket key is a tuple of (height, width, num_frames),
+  where num_frames is 1 for an image and greater than 1 for a video.
+  For audio, the first element is 0 and the key is read as
+  (0, duration in seconds, num_channels).
+  The value is the probability of sampling that specific item.
+  Example:
+  --random-mm-bucket-config
+  "{(256, 256, 1): 0.5, (720, 1280, 16): 0.4, (0, 1, 5): 0.10}"
+  First item: images with resolution 256x256, with probability 0.5.
+  Second item: videos with resolution 720x1280 and 16 frames, with probability 0.4.
+  Third item: audio with a 1 s duration and 5 channels, with probability 0.1.
+  Note: if the probabilities do not sum to 1, they are normalized.
+  Only the random-mm mode supports this parameter.
+
+## Usage Examples
+
+### Online Benchmark
+
+Show more + +First start serving your model: + +```bash +vllm serve Qwen/Qwen2.5-Omni-7B --omni +``` + +Then run the benchmarking for sharegpt: + +```bash +# download dataset +# wget https://huggingface.co/datasets/anon8231489123/ShareGPT_Vicuna_unfiltered/resolve/main/ShareGPT_V3_unfiltered_cleaned_split.json +vllm bench serve \ + --omni \ + --port 43845 \ + --model /home/models/Qwen/Qwen3-Omni-30B-A3B-Instruct \ + --endpoint /v1/chat/completions \ + --backend openai-chat-omni \ + --num-prompts 2 \ + --dataset-name sharegpt \ + --dataset-path ShareGPT_V3_unfiltered_cleaned_split.json \ + --percentile-metrics ttft,tpot,itl,e2el +``` +If successful, you will see the following output: +```text +============ Serving Benchmark Result ============ +Successful requests: 2 +Failed requests: 0 +Benchmark duration (s): 81.63 +Request throughput (req/s): 0.02 +Peak concurrent requests: 2.00 +----------------End-to-end Latency---------------- +Mean E2EL (ms): 56966.13 +Median E2EL (ms): 56966.13 +P99 E2EL (ms): 81016.80 +================== Text Result =================== +Total input tokens: 36 +Total generated tokens: 5926 +Output token throughput (tok/s): 72.60 +Peak output token throughput (tok/s): 103.00 +Peak concurrent requests: 2.00 +Total Token throughput (tok/s): 73.04 +---------------Time to First Token---------------- +Mean TTFT (ms): 124.76 +Median TTFT (ms): 124.76 +P99 TTFT (ms): 156.10 +-----Time per Output Token (excl. 
1st token)------ +Mean TPOT (ms): 481.30 +Median TPOT (ms): 481.30 +P99 TPOT (ms): 947.55 +---------------Inter-token Latency---------------- +Mean ITL (ms): 25.11 +Median ITL (ms): 0.33 +P99 ITL (ms): 25.17 +================== Audio Result ================== +Total audio duration generated(s): 3.95 +Total audio frames generated: 94890 +Audio throughput(audio duration/s): 0.05 +================================================== +``` + +Or run the benchmarking for random: + +```bash +vllm bench serve \ + --omni \ + --port 43845 \ + --endpoint /v1/chat/completions \ + --backend openai-chat-omni \ + --model /home/models/Qwen/Qwen3-Omni-30B-A3B-Instruct \ + --dataset-name random \ + --num-prompts 2 \ + --random-prefix-len 5 \ + --random-input-len 10 \ + --random-output-len 100 \ + --percentile-metrics ttft,tpot,itl,e2el,audio_ttfp,audio_rtf \ + --ignore-eos +``` + +If successful, you will see the following output: + +```text +============ Serving Benchmark Result ============ +Successful requests: 2 +Failed requests: 0 +Benchmark duration (s): 24.35 +Request throughput (req/s): 0.08 +Peak concurrent requests: 2.00 +----------------End-to-end Latency---------------- +Mean E2EL (ms): 22576.23 +Median E2EL (ms): 22576.23 +P99 E2EL (ms): 24205.72 +================== Text Result =================== +Total input tokens: 30 +Total generated tokens: 8973 +Output token throughput (tok/s): 368.52 +Peak output token throughput (tok/s): 81.00 +Peak concurrent requests: 2.00 +Total Token throughput (tok/s): 369.76 +---------------Time to First Token---------------- +Mean TTFT (ms): 125.16 +Median TTFT (ms): 125.16 +P99 TTFT (ms): 155.88 +-----Time per Output Token (excl. 
1st token)------ +Mean TPOT (ms): 5.01 +Median TPOT (ms): 5.01 +P99 TPOT (ms): 5.42 +---------------Inter-token Latency---------------- +Mean ITL (ms): 34.15 +Median ITL (ms): 0.01 +P99 ITL (ms): 376.19 +================== Audio Result ================== +Total audio duration generated(s): 3.95 +Total audio frames generated: 94890 +Audio throughput(audio duration/s): 0.16 +---------------Time to First Packet--------------- +Mean AUDIO_TTFP (ms): 11756.89 +Median AUDIO_TTFP (ms): 11756.89 +P99 AUDIO_TTFP (ms): 20854.25 +-----------------Real Time Factor----------------- +Mean AUDIO_RTF: 3.75 +Median AUDIO_RTF: 3.75 +P99 AUDIO_RTF: 7.39 +================================================== +``` +Notes: +We use (audio generation time - first packet latency) / audio duration to calculate RTF. + +
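The RTF note above can be made concrete with a small calculation. This is only a sketch: the helper name `audio_rtf` and its arguments are hypothetical and are not taken from the benchmark code; it simply applies the stated formula, (audio generation time - first packet latency) / audio duration, and assumes that a request with no generated audio reports 0.

```python
def audio_rtf(
    generation_time_s: float,
    first_packet_latency_s: float,
    audio_duration_s: float,
) -> float:
    """Real-time factor per the note above (hypothetical helper, not the PR's code)."""
    if audio_duration_s <= 0:
        # Assumption: with no generated audio there is nothing to divide by.
        return 0.0
    return (generation_time_s - first_packet_latency_s) / audio_duration_s


# 10 s spent generating, first packet after 2 s, 4 s of audio produced.
print(audio_rtf(10.0, 2.0, 4.0))
```

Under this definition, a value below 1 means that audio is produced faster than it plays back, and a value above 1 means generation is slower than playback.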
+ +### Multi-Modal Benchmark + +
+Show more
+
+Benchmark the performance of multi-modal requests in vLLM-Omni.
+
+Generate synthetic image, video, and audio inputs alongside random text prompts to stress-test multimodal models without external datasets.
+
+Notes:
+
+- Works only with the online benchmark via the OpenAI backend (`--backend openai-chat-omni`) and the endpoint `/v1/chat/completions`.
+
+Start the server (example):
+
+```bash
+vllm serve Qwen/Qwen2.5-Omni-7B --omni
+```
+
+It is recommended to use the flag `--ignore-eos` to simulate real responses. You can set the size of the output with `--random-output-len`.
+
+Then run the benchmarking script:
+```bash
+vllm bench serve \
+  --omni \
+  --dataset-name random-mm \
+  --port 40849 \
+  --model /home/models/Qwen/Qwen3-Omni-30B-A3B-Instruct \
+  --endpoint /v1/chat/completions \
+  --backend openai-chat-omni \
+  --request-rate 1 \
+  --num-prompts 1 \
+  --random-input-len 10 \
+  --random-range-ratio 0.0 \
+  --random-mm-base-items-per-request 2 \
+  --random-mm-num-mm-items-range-ratio 0 \
+  --random-mm-limit-mm-per-prompt '{"image":1,"video":1, "audio": 1}' \
+  --random-mm-bucket-config '{"(32, 32, 1)": 0.5, "(0, 1, 1)": 0.1, "(32, 32, 2)":0.4}' \
+  --ignore-eos \
+  --percentile-metrics ttft,tpot,itl \
+  --random-output-len 2 \
+  --extra_body '{"modalities": ["text"]}'
+```
+
+If successful, you will see the following output:
+
+```text
+============ Serving Benchmark Result ============
+Successful requests: 1
+Failed requests: 0
+Request rate configured (RPS): 1.00
+Benchmark duration (s): 1.21
+Request throughput (req/s): 0.83
+Peak concurrent requests: 1.00
+================== Text Result ===================
+Total input tokens: 10
+Total generated tokens: 3
+Output token throughput (tok/s): 2.49
+Peak output token throughput (tok/s): 3.00
+Peak concurrent requests: 1.00
+Total Token throughput (tok/s): 10.77
+---------------Time to First Token----------------
+Mean TTFT (ms): 179.74
+Median TTFT (ms): 179.74
+P99 TTFT (ms): 179.74
+-----Time per Output Token (excl. 1st token)------
+Mean TPOT (ms): 12.76
+Median TPOT (ms): 12.76
+P99 TPOT (ms): 12.76
+---------------Inter-token Latency----------------
+Mean ITL (ms): 12.76
+Median ITL (ms): 12.76
+P99 ITL (ms): 25.24
+================== Audio Result ==================
+Total audio duration generated(s): 0.00
+Total audio frames generated: 0
+Audio throughput(audio duration/s): 0.00
+==================================================
+```
+
+Behavioral notes:
+
+- If the requested base item count cannot be satisfied under the provided per-prompt limits, the tool raises an error rather than silently clamping.
+
+How sampling works:
+
+- Determine the per-request item count k by sampling uniformly from the integer range defined by `--random-mm-base-items-per-request` and `--random-mm-num-mm-items-range-ratio`, then clamp k to at most the sum of the per-modality limits.
+- For each of the k items, sample a bucket (H, W, T) according to the normalized probabilities in `--random-mm-bucket-config`, while tracking how many items of each modality have been added.
+- If a modality (e.g., image) reaches its limit from `--random-mm-limit-mm-per-prompt`, all buckets of that modality are excluded and the remaining bucket probabilities are renormalized before continuing.
+  This should be seen as an edge case; it can be avoided by setting `--random-mm-limit-mm-per-prompt` to a large number. Note that this might result in errors due to the engine config `--limit-mm-per-prompt`.
+- The resulting request contains synthetic multimodal data in `multi_modal_data` (OpenAI Chat format). When `random-mm` is used with the OpenAI Chat backend, prompts remain text and MM content is attached via `multi_modal_data`.
+
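The sampling steps described under "How sampling works" can be sketched in a few lines of Python. This is a minimal illustration under stated assumptions, not the implementation in `RandomMultiModalDataset`: the function names `modality_of` and `sample_items` are hypothetical, the bucket-to-modality rule follows the (height, width, num_frames) convention from the parameter description (height 0 for audio, one frame for an image, more than one frame for a video), and `random.choices` stands in for the renormalization step because it treats weights as relative.

```python
import math
import random


def modality_of(bucket):
    """Map a (height, width, num_frames) bucket to a modality name."""
    height, _, num_frames = bucket
    if height == 0:
        return "audio"
    if num_frames == 1:
        return "image"
    if num_frames > 1:
        return "video"
    raise ValueError(f"Invalid bucket: {bucket}")


def sample_items(base_items, range_ratio, limits, bucket_config, rng):
    """Return the buckets sampled for one request (hypothetical helper)."""
    # Item count k: closed integer range around the base, with the maximum
    # clamped to the sum of the per-modality limits.
    low = math.floor(base_items * (1 - range_ratio))
    high = math.ceil(base_items * (1 + range_ratio))
    high = min(high, sum(limits.values()))
    if low > high:
        raise ValueError("base item count cannot be satisfied under the limits")
    k = rng.randint(low, high)

    counts = {modality: 0 for modality in limits}
    items = []
    for _ in range(k):
        # Exclude buckets whose modality has already reached its cap.
        allowed = {
            bucket: prob
            for bucket, prob in bucket_config.items()
            if prob > 0
            and counts.get(modality_of(bucket), 0) < limits.get(modality_of(bucket), 0)
        }
        if not allowed:
            break
        # Relative weights: the remaining probabilities are renormalized.
        bucket = rng.choices(list(allowed), weights=list(allowed.values()), k=1)[0]
        counts[modality_of(bucket)] += 1
        items.append(bucket)
    return items


rng = random.Random(0)
items = sample_items(
    base_items=2,
    range_ratio=0.0,
    limits={"image": 1, "video": 1, "audio": 1},
    bucket_config={(32, 32, 1): 0.5, (0, 1, 1): 0.1, (32, 32, 2): 0.4},
    rng=rng,
)
print(items)
```

With the limits set to one item per modality, as in the example command, the two sampled items always belong to two different modalities, because the modality of the first item is excluded before the second draw.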
diff --git a/vllm_omni/benchmarks/data_modules/__init__.py b/vllm_omni/benchmarks/data_modules/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/vllm_omni/benchmarks/data_modules/random_multi_modal_dataset.py b/vllm_omni/benchmarks/data_modules/random_multi_modal_dataset.py
new file mode 100644
index 00000000000..14ba86cc4cb
--- /dev/null
+++ b/vllm_omni/benchmarks/data_modules/random_multi_modal_dataset.py
@@ -0,0 +1,152 @@
+import base64
+import io
+import logging
+from collections.abc import Mapping
+from typing import Any
+
+import numpy as np
+import soundfile as sf
+import torch
+from vllm.benchmarks.datasets import RandomMultiModalDataset, process_image, process_video
+
+logger = logging.getLogger(__name__)
+
+
+def process_audio(audio: Any) -> Mapping[str, Any]:
+    """
+    Process a single audio input and return a multimedia content dictionary.
+
+    Supports the following input types:
+
+    1. Dictionary with raw audio bytes:
+       - Expects a dict with a 'bytes' key containing raw audio data.
+       - Returns a dictionary with the audio embedded as a base64 data URL.
+
+    2. String input:
+       - Treats the string as a URL or local file path.
+       - Prepends "file://" if the string doesn't start with "http://",
+         "https://", or "file://".
+       - Returns a dictionary with the audio URL.
+
+    Raises:
+        ValueError: If the input is not a supported type.
+    """
+    if isinstance(audio, dict) and "bytes" in audio:
+        audio_bytes = audio["bytes"]
+        audio_base64 = base64.b64encode(audio_bytes).decode("utf-8")
+        # NOTE: the data URL is always labeled audio/mpeg, whatever the
+        # actual encoding of the raw bytes.
+        return {
+            "type": "audio_url",
+            "audio_url": {"url": f"data:audio/mpeg;base64,{audio_base64}"},
+        }
+    if isinstance(audio, str):
+        audio_url = audio if audio.startswith(("http://", "https://", "file://")) else f"file://{audio}"
+        return {"type": "audio_url", "audio_url": {"url": audio_url}}
+
+    raise ValueError(
+        f"Invalid audio input {audio}. Must be a string of local path/remote url, "
+        f"or a dictionary with raw audio bytes in the form of `{{'bytes': raw_audio_bytes}}`."
+ ) + + +# ----------------------------------------------------------------------------- +# MultiModalDataset Implementation +# ----------------------------------------------------------------------------- +class OmniRandomMultiModalDataset(RandomMultiModalDataset): + def __init__(self, **kwargs): + super().__init__(**kwargs) + + def generate_synthetic_audio( + self, + duration: int, # seconds + num_channels: int, # 1:Mono,2:Stereo 5:5.1 surround sound + ) -> dict[str, Any]: + """Generate synthetic audio with random values. + Default use 48000Hz. + """ + sample_rate = 48000 + num_samples = int(sample_rate * duration) + audio_data = self._rng.uniform(-0.5, 0.5, (num_samples, num_channels)) + audio_data = np.clip(audio_data, -1.0, 1.0) + audio_tensor = torch.FloatTensor(audio_data.T) + audio_np = audio_tensor.numpy() + + buffer = io.BytesIO() + + sf.write(buffer, audio_np.T, sample_rate, format="wav") + + buffer.seek(0) + audio_bytes = buffer.read() + buffer.close() + return { + "bytes": audio_bytes, + } + + def generate_mm_item( + self, + mm_item_config: tuple[int, int, int], + ) -> Mapping[str, Any]: + """ + Create synthetic images and videos and + apply process_image/process_video respectively. 
+ This follows the OpenAI API chat completions + https://github.com/openai/openai-python + """ + + if self.map_config_to_modality(mm_item_config) == "image": + return process_image(self.generate_synthetic_image(mm_item_config[1], mm_item_config[0])) + elif self.map_config_to_modality(mm_item_config) == "video": + return process_video(self.generate_synthetic_video(mm_item_config[1], mm_item_config[0], mm_item_config[2])) + elif self.map_config_to_modality(mm_item_config) == "audio": + return process_audio(self.generate_synthetic_audio(mm_item_config[1], mm_item_config[2])) + else: + raise ValueError(f"Invalid multimodal item configuration: {mm_item_config}") + + def generate_synthetic_video(self, width: int, height: int, num_frames: int) -> Any: + """Generate synthetic video with random values.""" + import imageio + + video_data = self._rng.integers( + 0, + 256, + (num_frames, height, width, 3), + dtype=np.uint8, + ) + buffer = io.BytesIO() + writer_kwargs = { + "format": "mp4", + "fps": 30, + "codec": "libx264", + "quality": 7, + "pixelformat": "yuv420p", + "macro_block_size": 16, + "ffmpeg_params": [ + "-preset", + "medium", + "-crf", + "23", + "-movflags", + "+faststart", + "-pix_fmt", + "yuv420p", + "-vf", + f"scale={width}:{height}", + ], + } + + with imageio.get_writer(buffer, **writer_kwargs) as writer: + for frame_idx in range(num_frames): + writer.append_data(video_data[frame_idx]) + buffer.seek(0) + video_bytes = buffer.read() + + return { + "bytes": video_bytes, + } + + def map_config_to_modality(self, config: tuple[int, int, int]) -> str: + """Map the configuration to the modality.""" + if config[0] == 0: + return "audio" + elif config[-1] == 1: + return "image" + elif config[-1] > 1: + return "video" + else: + raise ValueError(f"Invalid multimodal item configuration: {config}") diff --git a/vllm_omni/benchmarks/metrics/__init__.py b/vllm_omni/benchmarks/metrics/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git 
a/vllm_omni/benchmarks/metrics/metrics.py b/vllm_omni/benchmarks/metrics/metrics.py new file mode 100644 index 00000000000..f404a12f8e6 --- /dev/null +++ b/vllm_omni/benchmarks/metrics/metrics.py @@ -0,0 +1,330 @@ +import warnings +from dataclasses import dataclass + +import numpy as np +from transformers import PreTrainedTokenizerBase +from vllm.benchmarks.datasets import SampleRequest +from vllm.benchmarks.lib.endpoint_request_func import RequestFuncOutput +from vllm.benchmarks.serve import MILLISECONDS_TO_SECONDS_CONVERSION, TERM_PLOTLIB_AVAILABLE, BenchmarkMetrics, TaskType + + +@dataclass +class MultiModalsBenchmarkMetrics(BenchmarkMetrics): + mean_audio_ttfp_ms: float = 0.0 + median_audio_ttfp_ms: float = 0.0 + std_audio_ttfp_ms: float = 0.0 + percentiles_audio_ttfp_ms: list[tuple[float, float]] = None + total_audio_duration_ms: float = 0.0 + total_audio_frames: int = 0 + audio_throughput: float = 0.0 + mean_audio_rtf: float = 0.0 + median_audio_rtf: float = 0.0 + std_audio_rtf: float = 0.0 + percentiles_audio_rtf: list[tuple[float, float]] = None + + +def print_metrics( + task_type, + selected_percentile_metrics, + max_concurrency, + request_rate, + benchmark_duration, + goodput_config_dict, + metrics: MultiModalsBenchmarkMetrics, +): + print("{s:{c}^{n}}".format(s=" Serving Benchmark Result ", n=50, c="=")) + print("{:<40} {:<10}".format("Successful requests:", metrics.completed)) + print("{:<40} {:<10}".format("Failed requests:", metrics.failed)) + if max_concurrency is not None: + print("{:<40} {:<10}".format("Maximum request concurrency:", max_concurrency)) + if request_rate != float("inf"): + print("{:<40} {:<10.2f}".format("Request rate configured (RPS):", request_rate)) + print("{:<40} {:<10.2f}".format("Benchmark duration (s):", benchmark_duration)) + print("{:<40} {:<10.2f}".format("Request throughput (req/s):", metrics.request_throughput)) + if goodput_config_dict: + print("{:<40} {:<10.2f}".format("Request goodput (req/s):", 
metrics.request_goodput)) + if isinstance(metrics, MultiModalsBenchmarkMetrics): + print("{:<40} {:<10.2f}".format("Peak concurrent requests:", metrics.max_concurrent_requests)) + if task_type != TaskType.GENERATION or "e2el" in selected_percentile_metrics: + process_one_metric("e2el", metrics) + print_text_metrics(task_type, selected_percentile_metrics, metrics) + if task_type == TaskType.GENERATION: + print_audio_metrics(selected_percentile_metrics, metrics) + print("=" * 50) + + +def print_text_metrics(task_type, selected_percentile_metrics, metrics: MultiModalsBenchmarkMetrics): + print("{s:{c}^{n}}".format(s=" Text Result ", n=50, c="=")) + print("{:<40} {:<10}".format("Total input tokens:", metrics.total_input)) + if isinstance(metrics, MultiModalsBenchmarkMetrics): + print("{:<40} {:<10}".format("Total generated tokens:", metrics.total_output)) + print("{:<40} {:<10.2f}".format("Output token throughput (tok/s):", metrics.output_throughput)) + print("{:<40} {:<10.2f}".format("Peak output token throughput (tok/s):", metrics.max_output_tokens_per_s)) + print("{:<40} {:<10.2f}".format("Peak concurrent requests:", metrics.max_concurrent_requests)) + print("{:<40} {:<10.2f}".format("Total Token throughput (tok/s):", metrics.total_token_throughput)) + + if task_type == TaskType.GENERATION: + for metric in selected_percentile_metrics: + if metric == "e2el": + continue + if not metric.startswith("audio"): + process_one_metric(metric, metrics) + + +def print_audio_metrics(selected_percentile_metrics, metrics: MultiModalsBenchmarkMetrics): + print("{s:{c}^{n}}".format(s=" Audio Result ", n=50, c="=")) + print("{:<40} {:<10.2f}".format("Total audio duration generated(s):", metrics.total_audio_duration_ms)) + print("{:<40} {:<10}".format("Total audio frames generated:", metrics.total_audio_frames)) + print("{:<40} {:<10.2f}".format("Audio throughput(audio duration/s):", metrics.audio_throughput)) + for metric in selected_percentile_metrics: + if 
metric.startswith("audio"): + process_one_metric(metric, metrics) + + +def process_one_metric( + metric_attribute_name: str, + metrics: MultiModalsBenchmarkMetrics, +): + metric_header_map = { + "ttft": "Time to First Token", + "tpot": "Time per Output Token (excl. 1st token)", + "itl": "Inter-token Latency", + "e2el": "End-to-end Latency", + "audio_ttfp": "Time to First Packet", + "audio_rtf": "Real Time Factor", + } + + header = metric_header_map.get(metric_attribute_name, metric_attribute_name) + print("{s:{c}^{n}}".format(s=header, n=50, c="-")) + + is_audio_rtf = metric_attribute_name == "audio_rtf" + + suffix = "" if is_audio_rtf else "_ms" + unit_suffix = "" if is_audio_rtf else " (ms)" + + mean_attr_name = f"mean_{metric_attribute_name}{suffix}" + mean_value = getattr(metrics, mean_attr_name, 0.0) + print(f"{f'Mean {metric_attribute_name.upper()}{unit_suffix}:':<40} {mean_value:<10.2f}") + + median_attr_name = f"median_{metric_attribute_name}{suffix}" + median_value = getattr(metrics, median_attr_name, 0.0) + print(f"{f'Median {metric_attribute_name.upper()}{unit_suffix}:':<40} {median_value:<10.2f}") + + percentiles_attr_name = f"percentiles_{metric_attribute_name}{suffix}" + percentiles = getattr(metrics, percentiles_attr_name, []) + + for percentile, value in percentiles: + p_str = str(int(percentile)) if percentile.is_integer() else str(percentile) + label = f"P{p_str} {metric_attribute_name.upper()}{unit_suffix}:" + print(f"{label:<40} {value:<10.2f}") + + +def calculate_metrics( + input_requests: list[SampleRequest], + outputs: list[RequestFuncOutput], + dur_s: float, + tokenizer: PreTrainedTokenizerBase, + selected_percentiles: list[float], + goodput_config_dict: dict[str, float], + task_type, + selected_percentile_metrics, + max_concurrency, + request_rate, + benchmark_duration, +) -> tuple[BenchmarkMetrics, list[int]]: + """Calculate the metrics for the benchmark. + + Args: + input_requests: The input requests. 
+        outputs: The outputs of the requests.
+        dur_s: The duration of the benchmark.
+        tokenizer: The tokenizer to use.
+        selected_percentiles: The percentiles to select.
+        goodput_config_dict: The goodput configuration.
+        task_type: The task type, forwarded to print_metrics.
+        selected_percentile_metrics: The metrics for which percentiles are printed.
+        max_concurrency: The configured maximum concurrency, if any.
+        request_rate: The configured request rate.
+        benchmark_duration: The benchmark duration in seconds, as printed in the report.
+
+    Returns:
+        A tuple of the benchmark metrics and the actual output lengths.
+    """
+    actual_output_lens: list[int] = []
+    total_input = 0
+    completed = 0
+    good_completed = 0
+    itls: list[float] = []
+    tpots: list[float] = []
+    all_tpots: list[float] = []
+    ttfts: list[float] = []
+    e2els: list[float] = []
+    audio_ttfps: list[float] = []
+    audio_rtfs: list[float] = []
+    audio_duration: list[float] = []
+    audio_frames: list[int] = []
+    for i in range(len(outputs)):
+        if outputs[i].success:
+            output_len = outputs[i].output_tokens
+
+            if not output_len:
+                # We use the tokenizer to count the number of output tokens
+                # for some serving backends instead of looking at
+                # len(outputs[i].itl) since multiple output tokens may be
+                # bundled together
+                # Note : this may inflate the output token count slightly
+                output_len = len(tokenizer(outputs[i].generated_text, add_special_tokens=False).input_ids)
+            actual_output_lens.append(output_len)
+            total_input += input_requests[i].prompt_len
+            tpot = 0
+            if output_len > 1:
+                latency_minus_ttft = outputs[i].latency - outputs[i].ttft
+                tpot = latency_minus_ttft / (output_len - 1)
+                tpots.append(tpot)
+            # Note: if output_len <= 1, we regard tpot as 0 for goodput
+            all_tpots.append(tpot)
+            itls += outputs[i].itl
+            ttfts.append(outputs[i].ttft)
+            audio_ttfps.append(getattr(outputs[i], "audio_ttfp", 0.0))
+            audio_rtfs.append(getattr(outputs[i], "audio_rtf", 0.0))
+            audio_duration.append(getattr(outputs[i], "audio_duration", 0.0))
+            # audio_frames is an integer count, so default to 0 rather than 0.0
+            audio_frames.append(getattr(outputs[i], "audio_frames", 0))
+            e2els.append(outputs[i].latency)
+            completed += 1
+        else:
+            actual_output_lens.append(0)
+
+    if goodput_config_dict:
+        valid_metrics = []
+        slo_values = []
+
+        if "ttft" in goodput_config_dict:
+            valid_metrics.append(ttfts)
slo_values.append(goodput_config_dict["ttft"] / MILLISECONDS_TO_SECONDS_CONVERSION) + if "audio_ttft" in goodput_config_dict: + valid_metrics.append(audio_ttfps) + slo_values.append(goodput_config_dict["audio_ttft"] / MILLISECONDS_TO_SECONDS_CONVERSION) + if "tpot" in goodput_config_dict: + valid_metrics.append(all_tpots) + slo_values.append(goodput_config_dict["tpot"] / MILLISECONDS_TO_SECONDS_CONVERSION) + if "e2el" in goodput_config_dict: + valid_metrics.append(e2els) + slo_values.append(goodput_config_dict["e2el"] / MILLISECONDS_TO_SECONDS_CONVERSION) + + for req_metric in zip(*valid_metrics): + is_good_req = all([s >= r for s, r in zip(slo_values, req_metric)]) + if is_good_req: + good_completed += 1 + + if completed == 0: + warnings.warn( + "All requests failed. This is likely due to a misconfiguration on the benchmark arguments.", + stacklevel=2, + ) + + # Calculate max output tokens per second metric + max_output_tokens_per_s = 0.0 + max_concurrent_requests = 0 + + # Find the time range across all successful requests + successful_outputs = [output for output in outputs if output.success] + failed_outputs = [output for output in outputs if not output.success] + if successful_outputs: + min_start_time = min(output.start_time for output in successful_outputs) + max_end_time = max(output.start_time + output.latency for output in successful_outputs) + + # Create second buckets (ceiling to ensure we capture all time) + duration_seconds = int(np.ceil(max_end_time - min_start_time)) + 1 + tokens_per_second = np.zeros(duration_seconds) + concurrent_requests_per_second = np.zeros(duration_seconds) + + for i, output in enumerate(successful_outputs): + # Calculate token generation timestamp using + # start_time, ttft, and itl + token_times = [output.start_time + output.ttft] + current_time = token_times[0] + for itl_value in output.itl: + current_time += itl_value + token_times.append(current_time) + + # Add tokens to second buckets + for token_time in token_times: + 
second_bucket = int(token_time - min_start_time) + if 0 <= second_bucket < duration_seconds: + tokens_per_second[second_bucket] += 1 + + # Track concurrent requests for each second this request was active + request_start_second = int(output.start_time - min_start_time) + request_end_second = int((output.start_time + output.latency) - min_start_time) + + for second in range(request_start_second, request_end_second + 1): + concurrent_requests_per_second[second] += 1 + + # Find the maximum tokens per second and corresponding + # concurrent requests + if len(tokens_per_second) > 0: + max_output_tokens_per_s = float(np.max(tokens_per_second)) + max_concurrent_requests = int(np.max(concurrent_requests_per_second)) + + if TERM_PLOTLIB_AVAILABLE: + import termplotlib as tpl + + fig = tpl.figure() + fig.plot( + np.arange(len(tokens_per_second)), + tokens_per_second, + title="Output tokens per second", + ) + fig.plot( + np.arange(len(concurrent_requests_per_second)), + concurrent_requests_per_second, + title="Concurrent requests per second", + ) + fig.show() + else: + print("tip: install termplotlib and gnuplot to plot the metrics") + + metrics = MultiModalsBenchmarkMetrics( + completed=completed, + failed=len(failed_outputs), + total_input=total_input, + total_output=sum(actual_output_lens), + request_throughput=completed / dur_s, + request_goodput=good_completed / dur_s, + output_throughput=sum(actual_output_lens) / dur_s, + total_token_throughput=(total_input + sum(actual_output_lens)) / dur_s, + mean_ttft_ms=np.mean(ttfts or 0) * 1000, # ttfts is empty if streaming is not supported by the endpoint + std_ttft_ms=np.std(ttfts or 0) * 1000, + median_ttft_ms=np.median(ttfts or 0) * 1000, + percentiles_ttft_ms=[(p, np.percentile(ttfts or 0, p) * 1000) for p in selected_percentiles], + mean_audio_ttfp_ms=np.mean(audio_ttfps or 0) * 1000, + std_audio_ttfp_ms=np.std(audio_ttfps or 0) * 1000, + median_audio_ttfp_ms=np.median(audio_ttfps or 0) * 1000, + 
percentiles_audio_ttfp_ms=[(p, np.percentile(audio_ttfps or 0, p) * 1000) for p in selected_percentiles], + total_audio_duration_ms=sum(audio_duration), + total_audio_frames=sum(audio_frames), + audio_throughput=sum(audio_duration) / dur_s, + mean_audio_rtf=np.mean(audio_rtfs or 0), + std_audio_rtf=np.std(audio_rtfs or 0), + median_audio_rtf=np.median(audio_rtfs or 0), + percentiles_audio_rtf=[(p, np.percentile(audio_rtfs or 0, p)) for p in selected_percentiles], + mean_tpot_ms=np.mean(tpots or 0) * 1000, + std_tpot_ms=np.std(tpots or 0) * 1000, + median_tpot_ms=np.median(tpots or 0) * 1000, + percentiles_tpot_ms=[(p, np.percentile(tpots or 0, p) * 1000) for p in selected_percentiles], + mean_itl_ms=np.mean(itls or 0) * 1000, + std_itl_ms=np.std(itls or 0) * 1000, + median_itl_ms=np.median(itls or 0) * 1000, + percentiles_itl_ms=[(p, np.percentile(itls or 0, p) * 1000) for p in selected_percentiles], + mean_e2el_ms=np.mean(e2els or 0) * 1000, + std_e2el_ms=np.std(e2els or 0) * 1000, + median_e2el_ms=np.median(e2els or 0) * 1000, + percentiles_e2el_ms=[(p, np.percentile(e2els or 0, p) * 1000) for p in selected_percentiles], + max_output_tokens_per_s=max_output_tokens_per_s, + max_concurrent_requests=max_concurrent_requests, + ) + print_metrics( + task_type, + selected_percentile_metrics, + max_concurrency, + request_rate, + benchmark_duration, + goodput_config_dict, + metrics, + ) + return metrics, actual_output_lens diff --git a/vllm_omni/benchmarks/patch/__init__.py b/vllm_omni/benchmarks/patch/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/vllm_omni/benchmarks/patch/patch.py b/vllm_omni/benchmarks/patch/patch.py new file mode 100644 index 00000000000..6591af0148f --- /dev/null +++ b/vllm_omni/benchmarks/patch/patch.py @@ -0,0 +1,538 @@ +import asyncio +import base64 +import contextlib +import io +import json +import os +import random +import sys +import time +import traceback +from collections.abc import Iterable +from dataclasses 
import dataclass +from datetime import datetime +from typing import Literal + +import aiohttp +from pydub import AudioSegment +from tqdm.asyncio import tqdm +from transformers import PreTrainedTokenizerBase +from vllm.benchmarks import datasets +from vllm.benchmarks.datasets import SampleRequest +from vllm.benchmarks.lib.endpoint_request_func import ( + ASYNC_REQUEST_FUNCS, + OPENAI_COMPATIBLE_BACKENDS, + RequestFuncInput, + RequestFuncOutput, + StreamedResponseHandler, + _get_chat_content, + _update_headers_common, + _update_payload_common, + _validate_api_url, +) +from vllm.logger import init_logger + +logger = init_logger(__name__) +from vllm_omni.benchmarks.data_modules.random_multi_modal_dataset import OmniRandomMultiModalDataset + +get_samples_old = datasets.get_samples + + +def get_samples(args, tokenizer): + if args.backend not in ["openai-chat-omni"]: + raise ValueError("benchmark is only supported on 'openai-chat-omni' backend.") + if args.dataset_name == "random-mm": + dataset = OmniRandomMultiModalDataset(random_seed=args.seed, dataset_path=args.dataset_path) + input_requests = dataset.sample( + tokenizer=tokenizer, + num_requests=args.num_prompts, + prefix_len=args.random_prefix_len, + range_ratio=args.random_range_ratio, + input_len=args.random_input_len, + output_len=args.random_output_len, + base_items_per_request=args.random_mm_base_items_per_request, + limit_mm_per_prompt=args.random_mm_limit_mm_per_prompt, + num_mm_items_range_ratio=args.random_mm_num_mm_items_range_ratio, + bucket_config=args.random_mm_bucket_config, + request_id_prefix=args.request_id_prefix, + no_oversample=args.no_oversample, + ) + return input_requests + else: + return get_samples_old(args, tokenizer) + + +datasets.get_samples = get_samples + + +@dataclass +class MixRequestFuncOutput(RequestFuncOutput): + audio_ttfp: float = 0.0 + audio_duration: float = 0.0 + audio_frames: int = 0 + audio_rtf: float = 0.0 + + +async def async_request_openai_chat_omni_completions( + 
request_func_input: RequestFuncInput, + session: aiohttp.ClientSession, + pbar: tqdm | None = None, + mm_position: Literal["first", "last"] = "last", +) -> MixRequestFuncOutput: + api_url = request_func_input.api_url + _validate_api_url(api_url, "OpenAI Chat Completions API", "chat/completions") + + content = _get_chat_content(request_func_input, mm_position=mm_position) + + payload = { + "model": request_func_input.model_name if request_func_input.model_name else request_func_input.model, + "messages": [ + {"role": "user", "content": content}, + ], + "temperature": 0.0, + "max_tokens": request_func_input.output_len, + "stream": True, + "stream_options": { + "include_usage": True, + }, + } + _update_payload_common(payload, request_func_input) + + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}", + } + _update_headers_common(headers, request_func_input) + + output = MixRequestFuncOutput() + output.prompt_len = request_func_input.prompt_len + + generated_text = "" + generated_audio = None  # AudioSegment accumulated from streamed audio chunks + ttft = 0.0 + st = time.perf_counter() + output.start_time = st + most_recent_timestamp = st + audio_generate_time = 0.0 + audio_first_timestamp = st + try: + async with session.post(url=api_url, json=payload, headers=headers) as response: + if response.status == 200: + handler = StreamedResponseHandler() + async for chunk_bytes in response.content.iter_any(): + chunk_bytes = chunk_bytes.strip() + if not chunk_bytes: + continue + + messages = handler.add_chunk(chunk_bytes) + for message in messages: + # NOTE: SSE comments (often used as pings) start with + # a colon. These are not JSON data payload and should + # be skipped. 
+ if message.startswith(":"): + continue + + chunk = message.removeprefix("data: ") + + if chunk != "[DONE]": + timestamp = time.perf_counter() + data = json.loads(chunk) + if choices := data.get("choices"): + modality = data.get("modality") + content = choices[0]["delta"].get("content") + if modality == "text": + # First token + if ttft == 0.0: + ttft = timestamp - st + output.ttft = ttft + else: + output.itl.append(timestamp - most_recent_timestamp) + generated_text += content or "" + elif modality == "audio": + if output.audio_ttfp == 0.0: + audio_first_timestamp = timestamp + output.audio_ttfp = timestamp - st + audio_generate_time = timestamp - audio_first_timestamp + if content: + audio_bytes = base64.b64decode(content) + seg = AudioSegment.from_file(io.BytesIO(audio_bytes)) + if seg is not None: + if generated_audio is None: + generated_audio = seg + else: + generated_audio = generated_audio + seg + + elif usage := data.get("usage"): + output.output_tokens = usage.get("completion_tokens") + most_recent_timestamp = timestamp + + output.generated_text = generated_text + if generated_audio is not None: + output.audio_duration = len(generated_audio) / 1000.0 + frame_width = generated_audio.frame_width + if frame_width > 0: + output.audio_frames = len(generated_audio.raw_data) // frame_width + else: + output.audio_frames = 0 + logger.warning("Audio frame width is zero") + audio_duration = output.audio_duration + if audio_duration > 0: + output.audio_rtf = audio_generate_time / output.audio_duration + else: + output.audio_rtf = 0 + logger.warning("Audio duration is zero") + + output.success = True + output.latency = most_recent_timestamp - st + else: + output.error = response.reason or "" + output.success = False + except Exception: + output.success = False + exc_info = sys.exc_info() + output.error = "".join(traceback.format_exception(*exc_info)) + logger.error(f"ERROR: send request failed, reason is: {output.error}") + + if pbar: + pbar.update(1) + return 
output + + +ASYNC_REQUEST_FUNCS["openai-chat-omni"] = async_request_openai_chat_omni_completions +if "openai-chat-omni" not in OPENAI_COMPATIBLE_BACKENDS: + OPENAI_COMPATIBLE_BACKENDS.append("openai-chat-omni") + +# ruff: noqa: E402 +# Prevent import order from causing patch failures +from vllm.benchmarks import serve +from vllm.benchmarks.serve import TaskType, calculate_metrics_for_embeddings, get_request, wait_for_endpoint + +from vllm_omni.benchmarks.metrics.metrics import MultiModalsBenchmarkMetrics, calculate_metrics + +# ruff: noqa: E402 + +benchmark_old = serve.benchmark + + +async def benchmark( + task_type: TaskType, + endpoint_type: str, + api_url: str, + base_url: str, + model_id: str, + model_name: str, + tokenizer: PreTrainedTokenizerBase, + input_requests: list[SampleRequest], + logprobs: int | None, + request_rate: float, + burstiness: float, + disable_tqdm: bool, + num_warmups: int, + profile: bool, + selected_percentile_metrics: list[str], + selected_percentiles: list[float], + ignore_eos: bool, + goodput_config_dict: dict[str, float], + max_concurrency: int | None, + lora_modules: Iterable[str] | None, + extra_headers: dict | None, + extra_body: dict | None, + ramp_up_strategy: Literal["linear", "exponential"] | None = None, + ramp_up_start_rps: int | None = None, + ramp_up_end_rps: int | None = None, + ready_check_timeout_sec: int = 600, +): + try: + request_func = ASYNC_REQUEST_FUNCS[endpoint_type] + except KeyError: + raise ValueError(f"Unknown backend: {endpoint_type}") from None + + # Reuses connections across requests to reduce TLS handshake overhead. 
+ connector = aiohttp.TCPConnector( + limit=max_concurrency or 0, + limit_per_host=max_concurrency or 0, + ttl_dns_cache=300, + use_dns_cache=True, + keepalive_timeout=60, + enable_cleanup_closed=True, + force_close=False, + ssl=("https://" in api_url), + ) + + session = aiohttp.ClientSession( + connector=connector, + trust_env=True, + timeout=aiohttp.ClientTimeout(total=6 * 60 * 60), + ) + + print("Starting initial single prompt test run...") + test_prompt, test_prompt_len, test_output_len, test_mm_content = ( + input_requests[0].prompt, + input_requests[0].prompt_len, + input_requests[0].expected_output_len, + input_requests[0].multi_modal_data, + ) + + assert ( + test_mm_content is None + or isinstance(test_mm_content, dict) + or (isinstance(test_mm_content, list) and all(isinstance(item, dict) for item in test_mm_content)) + ), "multi_modal_data must be a dict or list[dict]" + test_input = RequestFuncInput( + model=model_id, + model_name=model_name, + prompt=test_prompt, + api_url=api_url, + prompt_len=test_prompt_len, + output_len=test_output_len, + logprobs=logprobs, + multi_modal_content=test_mm_content, + ignore_eos=ignore_eos, + extra_headers=extra_headers, + extra_body=extra_body, + ) + + if ready_check_timeout_sec > 0: + test_output = await wait_for_endpoint( + request_func, + test_input, + session, + timeout_seconds=ready_check_timeout_sec, + ) + if not test_output.success: + raise ValueError( + "Initial test run failed - Please make sure benchmark " + "arguments are correctly specified. 
" + f"Error: {test_output.error}" + ) + else: + print("Initial test run completed.") + else: + print("Skipping endpoint ready check.") + + if num_warmups > 0: + print(f"Warming up with {num_warmups} requests...") + warmup_pbar = None if disable_tqdm else tqdm(total=num_warmups) + warmup_semaphore = asyncio.Semaphore(max_concurrency) if max_concurrency else contextlib.nullcontext() + warmup_tasks = [] + + async def warmup_limited_request_func(): + async with warmup_semaphore: + return await request_func(request_func_input=test_input, session=session, pbar=warmup_pbar) + + for _ in range(num_warmups): + request_task = asyncio.create_task(warmup_limited_request_func()) + warmup_tasks.append(request_task) + _ = await asyncio.gather(*warmup_tasks) + + if warmup_pbar is not None: + warmup_pbar.close() + print("Warmup run completed.") + + print("Starting main benchmark run...") + + if lora_modules: + # For each input request, choose a LoRA module at random. + lora_modules = iter([random.choice(lora_modules) for _ in range(len(input_requests))]) + + if profile: + print("Starting profiler...") + profile_input = RequestFuncInput( + model=model_id, + model_name=model_name, + prompt=test_prompt, + api_url=base_url + "/start_profile", + prompt_len=test_prompt_len, + output_len=test_output_len, + logprobs=logprobs, + multi_modal_content=test_mm_content, + ignore_eos=ignore_eos, + extra_headers=extra_headers, + extra_body=extra_body, + ) + profile_output = await request_func(request_func_input=profile_input, session=session) + if profile_output.success: + print("Profiler started") + + distribution = "Poisson process" if burstiness == 1.0 else "Gamma distribution" + + if ramp_up_strategy is not None: + print(f"Traffic ramp-up strategy: {ramp_up_strategy}.") + print( + f"Will increase RPS from {ramp_up_start_rps} to {ramp_up_end_rps} RPS over the duration of the benchmark." 
+ ) + else: + print(f"Traffic request rate: {request_rate}") + + print(f"Burstiness factor: {burstiness} ({distribution})") + print(f"Maximum request concurrency: {max_concurrency}") + + pbar = None if disable_tqdm else tqdm(total=len(input_requests)) + + semaphore = asyncio.Semaphore(max_concurrency) if max_concurrency else contextlib.nullcontext() + + async def limited_request_func(request_func_input, session, pbar): + async with semaphore: + return await request_func(request_func_input=request_func_input, session=session, pbar=pbar) + + benchmark_start_time = time.perf_counter() + tasks: list[asyncio.Task] = [] + + rps_change_events = [] + last_int_rps = -1 + if ramp_up_strategy is not None and ramp_up_start_rps is not None: + last_int_rps = ramp_up_start_rps + rps_change_events.append( + { + "rps": last_int_rps, + "timestamp": datetime.now().isoformat(), + } + ) + + async for request, current_request_rate in get_request( + input_requests, + request_rate, + burstiness, + ramp_up_strategy, + ramp_up_start_rps, + ramp_up_end_rps, + ): + if ramp_up_strategy is not None: + current_int_rps = int(current_request_rate) + if current_int_rps > last_int_rps: + timestamp = datetime.now().isoformat() + for rps_val in range(last_int_rps + 1, current_int_rps + 1): + rps_change_events.append({"rps": rps_val, "timestamp": timestamp}) + last_int_rps = current_int_rps + prompt, prompt_len, output_len, mm_content, request_id = ( + request.prompt, + request.prompt_len, + request.expected_output_len, + request.multi_modal_data, + request.request_id, + ) + req_model_id, req_model_name = model_id, model_name + if lora_modules: + req_lora_module = next(lora_modules) + req_model_id, req_model_name = req_lora_module, req_lora_module + + request_func_input = RequestFuncInput( + model=req_model_id, + model_name=req_model_name, + prompt=prompt, + api_url=api_url, + prompt_len=prompt_len, + output_len=output_len, + logprobs=logprobs, + multi_modal_content=mm_content, + ignore_eos=ignore_eos, 
+ extra_headers=extra_headers, + extra_body=extra_body, + request_id=request_id, + ) + tasks.append( + asyncio.create_task(limited_request_func(request_func_input=request_func_input, session=session, pbar=pbar)) + ) + outputs: list[MixRequestFuncOutput] = await asyncio.gather(*tasks) + + if pbar is not None: + pbar.close() + + benchmark_duration = time.perf_counter() - benchmark_start_time + + if task_type == TaskType.GENERATION: + metrics, actual_output_lens = calculate_metrics( + input_requests=input_requests, + outputs=outputs, + dur_s=benchmark_duration, + tokenizer=tokenizer, + selected_percentiles=selected_percentiles, + goodput_config_dict=goodput_config_dict, + task_type=task_type, + selected_percentile_metrics=selected_percentile_metrics, + max_concurrency=max_concurrency, + request_rate=request_rate, + benchmark_duration=benchmark_duration, + ) + else: + metrics = calculate_metrics_for_embeddings( + outputs=outputs, + dur_s=benchmark_duration, + selected_percentiles=selected_percentiles, + ) + actual_output_lens = 0 + + if isinstance(metrics, MultiModalsBenchmarkMetrics): + result = { + "duration": benchmark_duration, + "completed": metrics.completed, + "failed": metrics.failed, + "total_input_tokens": metrics.total_input, + "total_output_tokens": metrics.total_output, + "request_throughput": metrics.request_throughput, + "request_goodput": metrics.request_goodput if goodput_config_dict else None, + "output_throughput": metrics.output_throughput, + "total_token_throughput": metrics.total_token_throughput, + "input_lens": [output.prompt_len for output in outputs], + "output_lens": actual_output_lens, + "ttfts": [output.ttft for output in outputs], + "itls": [output.itl for output in outputs], + "generated_texts": [output.generated_text for output in outputs], + "errors": [output.error for output in outputs], + "max_output_tokens_per_s": metrics.max_output_tokens_per_s, + "max_concurrent_requests": metrics.max_concurrent_requests, + } + else: + result = { + 
"duration": benchmark_duration, + "completed": metrics.completed, + "total_input_tokens": metrics.total_input, + "request_throughput": metrics.request_throughput, + "total_token_throughput": metrics.total_token_throughput, + "input_lens": [output.prompt_len for output in outputs], + "errors": [output.error for output in outputs], + } + + if rps_change_events: + result["rps_change_events"] = rps_change_events + + def process_one_metric( + # E.g., "ttft" + metric_attribute_name: str, + ): + # This function prints and adds statistics of the specified + # metric. + if metric_attribute_name not in selected_percentile_metrics: + return + is_audio_rtf = metric_attribute_name == "audio_rtf" + + suffix = "" if is_audio_rtf else "_ms" + for p, value in getattr(metrics, f"percentiles_{metric_attribute_name}{suffix}"): + p_word = str(int(p)) if int(p) == p else str(p) + result[f"p{p_word}_{metric_attribute_name}{suffix}"] = value + + if task_type == TaskType.GENERATION: + for metric in selected_percentile_metrics: + process_one_metric(metric) + else: + process_one_metric("e2el") + + if profile: + print("Stopping profiler...") + profile_input = RequestFuncInput( + model=model_id, + prompt=test_prompt, + api_url=base_url + "/stop_profile", + prompt_len=test_prompt_len, + output_len=test_output_len, + logprobs=logprobs, + ) + profile_output = await request_func(request_func_input=profile_input, session=session) + if profile_output.success: + print("Profiler stopped") + + await session.close() + return result + + +serve.benchmark = benchmark diff --git a/vllm_omni/benchmarks/serve.py b/vllm_omni/benchmarks/serve.py new file mode 100644 index 00000000000..fe946036931 --- /dev/null +++ b/vllm_omni/benchmarks/serve.py @@ -0,0 +1,9 @@ +import argparse +import asyncio +from typing import Any + +from vllm.benchmarks.serve import main_async + + +def main(args: argparse.Namespace) -> dict[str, Any]: + return asyncio.run(main_async(args)) diff --git a/vllm_omni/entrypoints/cli/__init__.py 
b/vllm_omni/entrypoints/cli/__init__.py index b233a71e6d2..2ffba613055 100644 --- a/vllm_omni/entrypoints/cli/__init__.py +++ b/vllm_omni/entrypoints/cli/__init__.py @@ -1,5 +1,13 @@ """CLI helpers for vLLM-Omni entrypoints.""" +# To ensure patch imports work properly, disable unused import checks +# ruff: noqa: E402, F401 +# isort: off +from vllm_omni.benchmarks.patch import patch +# isort: on + +from vllm_omni.entrypoints.cli.benchmark.serve import OmniBenchmarkServingSubcommand + from .serve import OmniServeCommand -__all__ = ["OmniServeCommand"] +__all__ = ["OmniServeCommand", "OmniBenchmarkServingSubcommand"] diff --git a/vllm_omni/entrypoints/cli/benchmark/__init__.py b/vllm_omni/entrypoints/cli/benchmark/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/vllm_omni/entrypoints/cli/benchmark/base.py b/vllm_omni/entrypoints/cli/benchmark/base.py new file mode 100644 index 00000000000..6a6f97eb1e8 --- /dev/null +++ b/vllm_omni/entrypoints/cli/benchmark/base.py @@ -0,0 +1,23 @@ +import argparse + +from vllm.entrypoints.cli.types import CLISubcommand + + +class OmniBenchmarkSubcommandBase(CLISubcommand): + """The base class of subcommands for vllm bench.""" + + help: str + + @classmethod + def add_cli_args(cls, parser: argparse.ArgumentParser) -> None: + """Add the CLI arguments to the parser.""" + raise NotImplementedError + + @staticmethod + def cmd(args: argparse.Namespace) -> None: + """Run the benchmark. + + Args: + args: The arguments to the command. 
+ """ + raise NotImplementedError diff --git a/vllm_omni/entrypoints/cli/benchmark/main.py b/vllm_omni/entrypoints/cli/benchmark/main.py new file mode 100644 index 00000000000..8880e35c7cf --- /dev/null +++ b/vllm_omni/entrypoints/cli/benchmark/main.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +import argparse +import typing + +from vllm.entrypoints.cli.types import CLISubcommand +from vllm.entrypoints.utils import VLLM_SUBCMD_PARSER_EPILOG + +from vllm_omni.entrypoints.cli.benchmark.base import OmniBenchmarkSubcommandBase + +if typing.TYPE_CHECKING: + from vllm.utils import FlexibleArgumentParser + + +class OmniBenchmarkSubcommand(CLISubcommand): + """The `bench` subcommand for the vLLM CLI.""" + + name = "bench" + help = "vLLM-omni bench subcommand." + + @staticmethod + def cmd(args: argparse.Namespace) -> None: + args.dispatch_function(args) + + def validate(self, args: argparse.Namespace) -> None: + pass + + def subparser_init(self, subparsers: argparse._SubParsersAction) -> FlexibleArgumentParser: + bench_parser = subparsers.add_parser( + self.name, description=self.help, usage=f"vllm {self.name} [options]" + ) + bench_subparsers = bench_parser.add_subparsers(required=True, dest="bench_type") + + for cmd_cls in OmniBenchmarkSubcommandBase.__subclasses__(): + cmd_subparser = bench_subparsers.add_parser( + cmd_cls.name, + help=cmd_cls.help, + description=cmd_cls.help, + usage=f"vllm {self.name} {cmd_cls.name} [--omni] [options]", + ) + cmd_subparser.add_argument( + "--omni", + action="store_true", + help="Enable benchmark-Omni mode (always enabled for omni commands)", + ) + cmd_subparser.set_defaults(dispatch_function=cmd_cls.cmd) + cmd_cls.add_cli_args(cmd_subparser) + + cmd_subparser.epilog = VLLM_SUBCMD_PARSER_EPILOG.format(subcmd=f"{self.name} {cmd_cls.name}") + + return bench_parser + + +def cmd_init() -> list[CLISubcommand]: + return [OmniBenchmarkSubcommand()] diff --git a/vllm_omni/entrypoints/cli/benchmark/serve.py 
b/vllm_omni/entrypoints/cli/benchmark/serve.py new file mode 100644 index 00000000000..906e8851a4a --- /dev/null +++ b/vllm_omni/entrypoints/cli/benchmark/serve.py @@ -0,0 +1,51 @@ +import argparse + +from vllm.benchmarks.serve import add_cli_args + +from vllm_omni.benchmarks.serve import main +from vllm_omni.entrypoints.cli.benchmark.base import OmniBenchmarkSubcommandBase + + +class OmniBenchmarkServingSubcommand(OmniBenchmarkSubcommandBase): + """The `serve` subcommand for vllm bench.""" + + name = "serve" + help = "Benchmark the online serving throughput." + + @classmethod + def add_cli_args(cls, parser: argparse.ArgumentParser) -> None: + add_cli_args(parser) + for action in parser._actions: + if action.dest == "percentile_metrics": + action.help = ( + "Comma-separated list of selected metrics to report percentiles. " + "This argument specifies the metrics to report percentiles. " + 'Allowed metric names are "ttft", "tpot", "itl", "e2el", "audio_ttfp", "audio_rtf". ' + ) + if action.dest == "random_mm_limit_mm_per_prompt": + action.help = ( + "Per-modality hard caps for items attached per request, e.g. " + '\'{"image": 3, "video": 0, "audio": 1}\'. The sampled per-request item ' + "count is clamped to the sum of these limits. When a modality " + "reaches its cap, its buckets are excluded and probabilities are " + "renormalized." + ) + if action.dest == "random_mm_bucket_config": + action.help = ( + "The bucket config is a dictionary mapping a multimodal item " + "sampling configuration to a probability. " + "Currently allows for 3 modalities: audio, images and videos. " + "A bucket key is a tuple of (height, width, num_frames). " + "The value is the probability of sampling that specific item. " + "Example: " + "--random-mm-bucket-config " + "{(256, 256, 1): 0.5, (720, 1280, 16): 0.4, (0, 1, 5): 0.10} " + "First item: images with resolution 256x256 w.p. 
0.5. " + "Second item: videos with resolution 720x1280 and 16 frames w.p. 0.4. " + "Third item: audios with 1s duration and 5 channels w.p. 0.1. " + "OBS.: If the probabilities do not sum to 1, they are normalized." + ) + + @staticmethod + def cmd(args: argparse.Namespace) -> None: + main(args) diff --git a/vllm_omni/entrypoints/cli/main.py b/vllm_omni/entrypoints/cli/main.py index 6a65d9d6cde..629a4641cce 100644 --- a/vllm_omni/entrypoints/cli/main.py +++ b/vllm_omni/entrypoints/cli/main.py @@ -18,10 +18,12 @@ def main(): from vllm.entrypoints.utils import VLLM_SUBCMD_PARSER_EPILOG, cli_env_setup from vllm.utils.argparse_utils import FlexibleArgumentParser + import vllm_omni.entrypoints.cli.benchmark.main import vllm_omni.entrypoints.cli.serve CMD_MODULES = [ vllm_omni.entrypoints.cli.serve, + vllm_omni.entrypoints.cli.benchmark.main, ] cli_env_setup()
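
Note on the audio metrics: the `audio_ttfp` and `audio_rtf` values that `async_request_openai_chat_omni_completions` records in `patch.py` come down to arithmetic on chunk arrival times. The sketch below restates that arithmetic in isolation. The helper name `audio_metrics` and the sample timestamps are made up for illustration and are not part of this PR; the real function tracks these values incrementally while streaming.

```python
def audio_metrics(start, audio_chunk_times, audio_duration_s):
    """Hypothetical helper: derive (audio_ttfp, audio_rtf) from arrival times.

    start: time the request was sent, in seconds
    audio_chunk_times: arrival times of the audio chunks, in order
    audio_duration_s: length of the decoded audio, in seconds
    """
    if not audio_chunk_times:
        # No audio was streamed, so both metrics keep their 0.0 defaults.
        return 0.0, 0.0
    first, last = audio_chunk_times[0], audio_chunk_times[-1]
    # Time to first audio packet: first audio chunk relative to request start.
    audio_ttfp = first - start
    # Generation time spans the first to the last audio chunk.
    generate_time = last - first
    # Real-time factor: generation time divided by audio length;
    # reported as 0 when the audio is empty, mirroring the patched code.
    rtf = generate_time / audio_duration_s if audio_duration_s > 0 else 0.0
    return audio_ttfp, rtf


ttfp, rtf = audio_metrics(10.0, [10.5, 11.0, 12.5], 4.0)
print(ttfp, rtf)  # 0.5 0.5
```

With this definition, an RTF below 1 means the audio was generated faster than it plays back.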