Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions examples/experimental/simple_offline_bench.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Copyright 2025 Rebellions Inc. All rights reserved.

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

# NOTE(review): these RBLN/vLLM knobs are set via the environment *before*
# `vllm` is imported further down this file — presumably they are read at
# import/initialization time, so the ordering matters; verify against the
# vllm-rbln plugin before reordering imports.
os.environ["RBLN_KERNEL_MODE"] = "triton"
os.environ["VLLM_RBLN_ENFORCE_MODEL_FP32"] = "1"
os.environ["VLLM_RBLN_USE_VLLM_MODEL"] = "1"
os.environ["VLLM_RBLN_COMPILE_STRICT_MODE"] = "1"
os.environ["VLLM_DISABLE_COMPILE_CACHE"] = "1"
os.environ["VLLM_USE_V1"] = "1"

import argparse
import time

from transformers import AutoTokenizer
from vllm import LLM, SamplingParams


def parse_args():
    """Parse the benchmark's command-line options.

    Returns an ``argparse.Namespace`` with the model id plus the engine
    sizing and workload parameters (all integers except ``--model``).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--model",
                        type=str,
                        default="meta-llama/Llama-3.2-3B-instruct")
    # All remaining options are plain integers; register them from a table
    # so flag names and defaults are easy to scan.
    int_options = (
        ("--max-num-seqs", 32),
        ("--max-num-batched-tokens", 128),
        ("--max-model-len", 2 * 1024),
        ("--tensor-parallel-size", 1),
        ("--pipeline-parallel-size", 1),
        ("--block-size", 1024),
        ("--num-requests", 64),
        ("--input-len", 1024),
        ("--output-len", 1024),
    )
    for flag, default in int_options:
        parser.add_argument(flag, type=int, default=default)
    return parser.parse_args()


def main():
    """Run the offline throughput benchmark and print timing results."""
    args = parse_args()

    # Find a vocabulary string that encodes to exactly one token even when
    # repeated, so prompt token counts can be controlled precisely by
    # repetition.
    tokenizer = AutoTokenizer.from_pretrained(args.model)
    candidates = iter(tokenizer.vocab)
    piece = next(candidates)
    while len(tokenizer.encode(piece * 2, add_special_tokens=False)) != 2:
        piece = next(candidates)

    # input_len - 1 repetitions; presumably the tokenizer adds one special
    # (BOS) token to reach exactly input_len — TODO confirm.
    prompts = [piece * (args.input_len - 1)] * args.num_requests

    llm = LLM(
        model=args.model,
        max_model_len=args.max_model_len,
        max_num_seqs=args.max_num_seqs,
        max_num_batched_tokens=args.max_num_batched_tokens,
        tensor_parallel_size=args.tensor_parallel_size,
        pipeline_parallel_size=args.pipeline_parallel_size,
        block_size=args.block_size,
        enable_chunked_prefill=True,
        gpu_memory_utilization=1,
        enable_prefix_caching=False,
    )

    # Greedy decoding for a fixed number of tokens; EOS is ignored so every
    # request produces exactly output_len tokens.
    sampling = SamplingParams(
        temperature=0.0,
        ignore_eos=True,
        max_tokens=args.output_len,
    )

    # Warm-up pass over a few prompts (untimed) before the measured run.
    _ = llm.generate(prompts[:3], sampling)

    start = time.perf_counter()
    outputs = llm.generate(prompts, sampling)
    total_run_time = time.perf_counter() - start

    print(f"total run time: {total_run_time} (sec)")
    # Sanity-check that prompt and output lengths match the requested sizes.
    for output in outputs:
        assert output.prompt_token_ids
        assert len(output.prompt_token_ids) == args.input_len
        assert len(output.outputs[0].token_ids) == args.output_len
    throughput = (args.num_requests * args.output_len) / total_run_time
    print(f"output throughput: {throughput} (token/sec)")


if __name__ == "__main__":
main()
9 changes: 9 additions & 0 deletions vllm_rbln/v1/core/rbln_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def schedule(self) -> SchedulerOutput:
# The only differences are:
# - Disable mixed batching
# - Limit prefill batch size to 1
# - Limit decode batch size to (max_num_seqs // pipeline_parallel_size)
# Search for NOTE(RBLN) for details

# NOTE(woosuk) on the scheduling algorithm:
Expand Down Expand Up @@ -265,6 +266,14 @@ def schedule(self) -> SchedulerOutput:
if self.ec_connector is not None:
self.ec_connector.update_state_after_alloc(request, i)

# NOTE(RBLN): We restrict the decode batch size to
# (max_num_seqs // pipeline_parallel_size) to prevent pipeline
# bubbles.
if len(scheduled_running_reqs) >= (
self.max_num_running_reqs //
self.vllm_config.parallel_config.pipeline_parallel_size):
break

# Record the LoRAs in scheduled_running_reqs
scheduled_loras: set[int] = set()
if self.lora_config:
Expand Down
21 changes: 13 additions & 8 deletions vllm_rbln/v1/worker/rbln_model_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,8 @@ def __init__(
# execute_model() and sample_tokens().
self.execute_model_state: ExecuteModelState | None = None

self.max_batch_size = self.scheduler_config.max_num_seqs
self.max_batch_size = (self.scheduler_config.max_num_seqs //
self.parallel_config.pipeline_parallel_size)
self.max_num_seqs = self.scheduler_config.max_num_seqs
self.max_prefill_batch_size = 1
self.max_num_batched_tokens = (
Expand Down Expand Up @@ -626,7 +627,8 @@ def _update_states(self, scheduler_output: SchedulerOutput) -> None:
if new_block_ids is not None:
# Append the new blocks to the existing block IDs.
for block_ids, new_ids in zip(req_state.block_ids,
new_block_ids):
new_block_ids,
strict=False):
block_ids.extend(new_ids)
else:
assert new_block_ids is not None
Expand Down Expand Up @@ -1249,7 +1251,10 @@ def _pool(

pooler_output: list[Optional[torch.Tensor]] = []
for raw_output, seq_len, prompt_len in zip(
raw_pooler_output, seq_lens_cpu, pooling_metadata.prompt_lens):
raw_pooler_output,
seq_lens_cpu,
pooling_metadata.prompt_lens,
strict=False):

output = raw_output.data if seq_len == prompt_len else None
pooler_output.append(output)
Expand Down Expand Up @@ -1419,12 +1424,11 @@ def warm_up_model(self) -> None:
num_kv_cache_groups)

# compile decode graph
decode_max_batch_size = self.scheduler_config.max_num_seqs
decode_max_seq_len = self.model_config.max_model_len

dummy_decode_requests = []
dummy_decode_num_scheduled_tokens = {}
for _ in range(decode_max_batch_size):
for _ in range(self.max_batch_size):
self._add_dummy_requests(
requests=dummy_decode_requests,
num_scheduled_tokens=dummy_decode_num_scheduled_tokens,
Expand Down Expand Up @@ -1750,8 +1754,8 @@ def execute_model(
positions = rbln_utils.pad(positions, -1, prefill_size)
else:
# decode batch padding
input_ids = rbln_utils.pad(input_ids, 0, self.max_num_seqs)
positions = rbln_utils.pad(positions, -2, self.max_num_seqs)
input_ids = rbln_utils.pad(input_ids, 0, self.max_batch_size)
positions = rbln_utils.pad(positions, -2, self.max_batch_size)

if self.lora_config is not None:
lora_ids = [
Expand Down Expand Up @@ -2629,7 +2633,8 @@ def _reshape_kv_cache_tensors(
state_tensors = []
storage_offset_bytes = 0
for (shape, dtype) in zip(kv_cache_spec.shapes,
kv_cache_spec.dtypes):
kv_cache_spec.dtypes,
strict=False):
dtype_size = get_dtype_size(dtype)
num_element_per_page = (
kv_cache_spec.page_size_bytes // dtype_size)
Expand Down