diff --git a/examples/experimental/simple_offline_bench.py b/examples/experimental/simple_offline_bench.py
new file mode 100644
index 000000000..283474f3b
--- /dev/null
+++ b/examples/experimental/simple_offline_bench.py
@@ -0,0 +1,105 @@
+# Copyright 2025 Rebellions Inc. All rights reserved.
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+os.environ["RBLN_KERNEL_MODE"] = "triton"
+os.environ["VLLM_RBLN_ENFORCE_MODEL_FP32"] = "1"
+os.environ["VLLM_RBLN_USE_VLLM_MODEL"] = "1"
+os.environ["VLLM_RBLN_COMPILE_STRICT_MODE"] = "1"
+os.environ["VLLM_DISABLE_COMPILE_CACHE"] = "1"
+os.environ["VLLM_USE_V1"] = "1"
+
+import argparse
+import time
+
+from transformers import AutoTokenizer
+from vllm import LLM, SamplingParams
+
+
+def parse_args():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--model",
+                        type=str,
+                        default="meta-llama/Llama-3.2-3B-instruct")
+    parser.add_argument("--max-num-seqs", type=int, default=32)
+    parser.add_argument("--max-num-batched-tokens", type=int, default=128)
+    parser.add_argument("--max-model-len", type=int, default=2 * 1024)
+    parser.add_argument("--tensor-parallel-size", type=int, default=1)
+    parser.add_argument("--pipeline-parallel-size", type=int, default=1)
+    parser.add_argument("--block-size", type=int, default=1024)
+    parser.add_argument("--num-requests", type=int, default=64)
+    parser.add_argument("--input-len", type=int, default=1024)
+    parser.add_argument("--output-len", type=int, default=1024)
+
+    return parser.parse_args()
+
+
+def main():
+    args = parse_args()
+
+    tokenizer = AutoTokenizer.from_pretrained(args.model)
+    vocab = iter(tokenizer.vocab)
+    single_token_string = next(vocab)
+    while True:
+        if (len(
+                tokenizer.encode(single_token_string * 2,
+                                 add_special_tokens=False)) == 2):
+            break
+        single_token_string = next(vocab)
+
+    prompts = [single_token_string * (args.input_len - 1)] * args.num_requests
+
+    llm = LLM(
+        model=args.model,
+        max_model_len=args.max_model_len,
+        max_num_seqs=args.max_num_seqs,
+        max_num_batched_tokens=args.max_num_batched_tokens,
+        tensor_parallel_size=args.tensor_parallel_size,
+        pipeline_parallel_size=args.pipeline_parallel_size,
+        block_size=args.block_size,
+        enable_chunked_prefill=True,
+        gpu_memory_utilization=1,
+        enable_prefix_caching=False,
+    )
+
+    _ = llm.generate(
+        prompts[:min(len(prompts), 3)],
+        SamplingParams(
+            temperature=0.0,
+            ignore_eos=True,
+            max_tokens=args.output_len,
+        ))
+
+    st = time.perf_counter()
+    outputs = llm.generate(
+        prompts,
+        SamplingParams(
+            temperature=0.0,
+            ignore_eos=True,
+            max_tokens=args.output_len,
+        ))
+    total_run_time = time.perf_counter() - st
+
+    print(f"total run time: {total_run_time} (sec)")
+    assert all(output.prompt_token_ids and (
+        len(output.prompt_token_ids) == args.input_len) for output in outputs)
+    assert all(
+        len(output.outputs[0].token_ids) == args.output_len
+        for output in outputs)
+    print(f"output throughput: {(args.num_requests*args.output_len)/total_run_time} (token/sec)")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/vllm_rbln/v1/core/rbln_scheduler.py b/vllm_rbln/v1/core/rbln_scheduler.py
index 6df877772..09a41eeac 100644
--- a/vllm_rbln/v1/core/rbln_scheduler.py
+++ b/vllm_rbln/v1/core/rbln_scheduler.py
@@ -67,6 +67,7 @@ def schedule(self) -> SchedulerOutput:
         # The only differences are:
         # - Disable mixed batching
         # - Limit prefill batch size to 1
+        # - Limit decode batch size to (max_num_seqs // pipeline_parallel_size)
         # Search for NOTE(RBLN) for details
 
         # NOTE(woosuk) on the scheduling algorithm:
@@ -265,6 +266,14 @@
             if self.ec_connector is not None:
                 self.ec_connector.update_state_after_alloc(request, i)
 
+            # NOTE(RBLN): We restrict the decode batch size to
+            # (max_num_seqs // pipeline_parallel_size) to prevent pipeline
+            # bubbles.
+            if len(scheduled_running_reqs) >= (
+                    self.max_num_running_reqs //
+                    self.vllm_config.parallel_config.pipeline_parallel_size):
+                break
+
         # Record the LoRAs in scheduled_running_reqs
         scheduled_loras: set[int] = set()
         if self.lora_config:
diff --git a/vllm_rbln/v1/worker/rbln_model_runner.py b/vllm_rbln/v1/worker/rbln_model_runner.py
index adb84c5bd..5001075aa 100644
--- a/vllm_rbln/v1/worker/rbln_model_runner.py
+++ b/vllm_rbln/v1/worker/rbln_model_runner.py
@@ -417,7 +417,8 @@ def __init__(
         # execute_model() and sample_tokens().
         self.execute_model_state: ExecuteModelState | None = None
 
-        self.max_batch_size = self.scheduler_config.max_num_seqs
+        self.max_batch_size = (self.scheduler_config.max_num_seqs //
+                               self.parallel_config.pipeline_parallel_size)
         self.max_num_seqs = self.scheduler_config.max_num_seqs
         self.max_prefill_batch_size = 1
         self.max_num_batched_tokens = (
@@ -626,7 +627,8 @@ def _update_states(self, scheduler_output: SchedulerOutput) -> None:
                 if new_block_ids is not None:
                     # Append the new blocks to the existing block IDs.
                     for block_ids, new_ids in zip(req_state.block_ids,
-                                                  new_block_ids):
+                                                  new_block_ids,
+                                                  strict=False):
                         block_ids.extend(new_ids)
             else:
                 assert new_block_ids is not None
@@ -1249,7 +1251,10 @@ def _pool(
 
         pooler_output: list[Optional[torch.Tensor]] = []
         for raw_output, seq_len, prompt_len in zip(
-                raw_pooler_output, seq_lens_cpu, pooling_metadata.prompt_lens):
+                raw_pooler_output,
+                seq_lens_cpu,
+                pooling_metadata.prompt_lens,
+                strict=False):
             output = raw_output.data if seq_len == prompt_len else None
             pooler_output.append(output)
 
@@ -1419,12 +1424,11 @@ def warm_up_model(self) -> None:
                                    num_kv_cache_groups)
 
         # compile decode graph
-        decode_max_batch_size = self.scheduler_config.max_num_seqs
         decode_max_seq_len = self.model_config.max_model_len
 
         dummy_decode_requests = []
         dummy_decode_num_scheduled_tokens = {}
-        for _ in range(decode_max_batch_size):
+        for _ in range(self.max_batch_size):
             self._add_dummy_requests(
                 requests=dummy_decode_requests,
                 num_scheduled_tokens=dummy_decode_num_scheduled_tokens,
@@ -1750,8 +1754,8 @@ def execute_model(
             positions = rbln_utils.pad(positions, -1, prefill_size)
         else:
             # decode batch padding
-            input_ids = rbln_utils.pad(input_ids, 0, self.max_num_seqs)
-            positions = rbln_utils.pad(positions, -2, self.max_num_seqs)
+            input_ids = rbln_utils.pad(input_ids, 0, self.max_batch_size)
+            positions = rbln_utils.pad(positions, -2, self.max_batch_size)
 
         if self.lora_config is not None:
             lora_ids = [
@@ -2629,7 +2633,8 @@ def _reshape_kv_cache_tensors(
                 state_tensors = []
                 storage_offset_bytes = 0
                 for (shape, dtype) in zip(kv_cache_spec.shapes,
-                                          kv_cache_spec.dtypes):
+                                          kv_cache_spec.dtypes,
+                                          strict=False):
                     dtype_size = get_dtype_size(dtype)
                     num_element_per_page = (
                         kv_cache_spec.page_size_bytes // dtype_size)
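
Note on the scheduler and model-runner changes above: the decode batch size is now capped at max_num_seqs // pipeline_parallel_size so that, under pipeline parallelism, each stage keeps a full micro-batch in flight instead of idling (pipeline bubbles). A minimal sketch of the arithmetic, not part of the patch; the helper name is hypothetical:

    def decode_batch_cap(max_num_seqs: int, pipeline_parallel_size: int) -> int:
        # Mirrors self.max_batch_size in rbln_model_runner.py and the
        # scheduled_running_reqs cutoff added in rbln_scheduler.py.
        return max_num_seqs // pipeline_parallel_size

    # e.g. with the benchmark defaults of 32 sequences:
    assert decode_batch_cap(32, 1) == 32  # PP disabled: full decode batch
    assert decode_batch_cap(32, 2) == 16  # PP=2: each stage runs half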
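
For context on the prompt construction in simple_offline_bench.py: the script searches for a vocabulary string that still encodes to one token per repetition, so repeating it input_len - 1 times plus the BOS token yields a prompt of exactly input_len tokens, which the assert on prompt_token_ids relies on. A standalone sketch of that check, assuming a Llama-style tokenizer that prepends a single BOS token (the model name is just the benchmark's default):

    from transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.2-3B-instruct")

    # Find a string that stays one token per repetition.
    token_str = next(
        s for s in tokenizer.vocab
        if len(tokenizer.encode(s * 2, add_special_tokens=False)) == 2)

    input_len = 1024
    prompt = token_str * (input_len - 1)
    # BOS supplies the final token, so the prompt is exactly input_len tokens.
    assert len(tokenizer.encode(prompt)) == input_len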