21 changes: 19 additions & 2 deletions requirements/test.txt
@@ -27,6 +27,10 @@ argcomplete==3.5.1
# via datamodel-code-generator
arrow==1.3.0
# via isoduration
async-timeout==5.0.1
# via
# aiohttp
# redis
attrs==24.2.0
# via
# aiohttp
@@ -132,6 +136,11 @@ encodec==0.1.1
# via vocos
evaluate==0.4.3
# via lm-eval
exceptiongroup==1.2.2
# via
# anyio
# hypothesis
# pytest
fastparquet==2024.11.0
# via genai-perf
fastrlock==0.8.2
@@ -633,7 +642,6 @@ setuptools==75.8.0
# via
# mamba-ssm
# pytablewriter
# torch
shellingham==1.5.4
# via typer
six==1.16.0
@@ -692,8 +700,13 @@ tokenizers==0.21.1
# via
# -r requirements/test.in
# transformers
toml==0.10.2
# via datamodel-code-generator
tomli==2.2.1
# via schemathesis
# via
# black
# pytest
# schemathesis
tomli-w==1.2.0
# via schemathesis
torch==2.6.0
@@ -765,12 +778,16 @@ types-python-dateutil==2.9.0.20241206
# via arrow
typing-extensions==4.12.2
# via
# anyio
# black
# huggingface-hub
# librosa
# mistral-common
# multidict
# pqdm
# pydantic
# pydantic-core
# rich
# torch
# typer
tzdata==2024.2
14 changes: 4 additions & 10 deletions tests/v1/core/test_scheduler.py
@@ -3,20 +3,14 @@
from unittest.mock import Mock

import pytest
import torch

from vllm.config import (CacheConfig, KVTransferConfig, ModelConfig,
SchedulerConfig, VllmConfig)
from vllm.multimodal.inputs import MultiModalKwargs, PlaceholderRange
from vllm.sampling_params import SamplingParams
from vllm.multimodal.inputs import PlaceholderRange
from vllm.tests.v1.utils import EOS_TOKEN_ID, create_requests, create_scheduler
from vllm.v1.core.sched.output import SchedulerOutput
from vllm.v1.core.sched.scheduler import Scheduler
from vllm.v1.kv_cache_interface import (FullAttentionSpec, KVCacheConfig,
KVCacheGroupSpec)
from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.request import Request, RequestStatus
from vllm.v1.structured_output import StructuredOutputManager
from vllm.tests.v1.utils import (create_scheduler, create_requests, EOS_TOKEN_ID)
from vllm.v1.request import RequestStatus


def test_add_requests():
scheduler = create_scheduler()
Empty file.
42 changes: 42 additions & 0 deletions tests/v1/kv_connector/test_nixl_connector.py
@@ -0,0 +1,42 @@
# SPDX-License-Identifier: Apache-2.0
from vllm.distributed.kv_transfer.kv_connector.v1.nixl_connector import (
    NixlConnectorMetadata)

from .utils import create_request, create_scheduler, create_vllm_config


def test_scheduler_worker_interface():
vllm_config = create_vllm_config()
scheduler = create_scheduler(vllm_config)

# 2 Full Blocks and 1 Half Block.
BLOCK_SIZE = vllm_config.cache_config.block_size
NUM_EXTERNAL_FULL_BLOCKS = 2
NUM_TOKENS = int(BLOCK_SIZE * (NUM_EXTERNAL_FULL_BLOCKS + 0.5))

request = create_request(request_id=1,
num_tokens=NUM_TOKENS,
do_remote_prefill=True)
request_id = request.request_id

scheduler.add_request(request)

    # Remote Prefill triggers NixlConnectorMetadata.
scheduler_output = scheduler.schedule()
kv_connector_metadata = scheduler_output.kv_connector_metadata
assert kv_connector_metadata is not None
assert isinstance(kv_connector_metadata, NixlConnectorMetadata)

assert len(kv_connector_metadata.requests) == 1
assert request_id in kv_connector_metadata.requests
req_meta = kv_connector_metadata.requests[request_id]

for block_id, block in zip(
req_meta.local_block_ids,
scheduler.kv_cache_manager.req_to_blocks[request_id]):
assert block_id == block.block_id
259 changes: 259 additions & 0 deletions tests/v1/kv_connector/test_remote_decode_scheduler.py
@@ -0,0 +1,259 @@
# SPDX-License-Identifier: Apache-2.0
import copy
from typing import Optional

from vllm.v1.outputs import EMPTY_MODEL_RUNNER_OUTPUT, ModelRunnerOutput
from vllm.v1.request import RequestStatus, Request

from .utils import create_request, create_scheduler, create_vllm_config
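

# NOTE: the pre-commit CI run flags `make_model_runner_output` (used by the
# tests below) as undefined (Ruff F821). The following is a minimal sketch of
# such a helper so the file is self-contained; the ModelRunnerOutput fields
# (req_ids, req_id_to_index, sampled_token_ids) and the dummy token id are
# assumptions inferred from how the tests below consume the output.
def make_model_runner_output(
        reqs: list[Request],
        finished_recving: Optional[list[str]] = None) -> ModelRunnerOutput:
    """Make a ModelRunnerOutput with one dummy sampled token per request."""
    model_runner_output = copy.deepcopy(EMPTY_MODEL_RUNNER_OUTPUT)
    model_runner_output.req_ids = [req.request_id for req in reqs]
    model_runner_output.req_id_to_index = {
        req.request_id: idx
        for idx, req in enumerate(reqs)
    }
    # One sampled token per request (assumed non-EOS) keeps requests RUNNING.
    model_runner_output.sampled_token_ids = [[0] for _ in reqs]
    model_runner_output.finished_recving = finished_recving
    return model_runner_output
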

def test_basic_remote_prefill_cycle():
"""Test Remote Prefills Lifecycle."""

vllm_config = create_vllm_config()
scheduler = create_scheduler(vllm_config)

# 2 Full Blocks and 1 Half Block.
BLOCK_SIZE = vllm_config.cache_config.block_size
NUM_EXTERNAL_FULL_BLOCKS = 2
NUM_TOKENS = int(BLOCK_SIZE * (NUM_EXTERNAL_FULL_BLOCKS + 0.5))
START_FREE_BLOCK_QUEUE_SIZE = (
scheduler.kv_cache_manager.block_pool.free_block_queue.num_free_blocks)

request = create_request(request_id=1,
num_tokens=NUM_TOKENS,
do_remote_prefill=True)

scheduler.add_request(request)
request_id = request.request_id

# STEP (1):
# (1a): schedule()
scheduler_output = scheduler.schedule()

# Nothing running and empty scheduler output.
assert len(scheduler.running) == 0
assert len(scheduler_output.scheduled_new_reqs) == 0
assert len(scheduler_output.scheduled_cached_reqs) == 0
assert len(scheduler_output.num_scheduled_tokens) == 0
assert scheduler_output.total_num_scheduled_tokens == 0

# Req waiting for KVs with no computed
# or scheduled tokens.
assert len(scheduler.waiting) == 1
assert request in scheduler.waiting
assert (request.status == RequestStatus.WAITING_FOR_REMOTE_KVS)
assert (request.num_computed_tokens == 0)

# ... but should have (uncached) blocks allocated to it.
block_pool = scheduler.kv_cache_manager.block_pool
assert (block_pool.free_block_queue.num_free_blocks
< START_FREE_BLOCK_QUEUE_SIZE)
assert len(block_pool.cached_block_hash_to_block) == 0
for block in scheduler.kv_cache_manager.req_to_blocks[request_id]:
assert block._block_hash is None

# (1b): forward()
model_runner_output = EMPTY_MODEL_RUNNER_OUTPUT

# (1c): update_from_output()
engine_core_outputs = scheduler.update_from_output(
scheduler_output, model_runner_output)
assert len(engine_core_outputs.outputs) == 0

# STEP (2):
# (2a): schedule(): nothing happens!
scheduler_output = scheduler.schedule()
assert len(scheduler.waiting) == 1
assert len(scheduler.running) == 0

# (2b): forward(): request finishes recv.
model_runner_output = copy.deepcopy(
EMPTY_MODEL_RUNNER_OUTPUT)
model_runner_output.finished_recving = [request_id]

# (2c): update_from_output():
engine_core_outputs = scheduler.update_from_output(
scheduler_output, model_runner_output)
assert len(scheduler.waiting) == 1
    assert (request_id in scheduler.finished_recving_kv_req_ids)

# (3a): schedule(): this should actually schedule.
scheduler_output = scheduler.schedule()
assert len(scheduler.running) == 1

    # Confirm the blocks are actually allocated.
num_hashed_blocks = 0
for block in scheduler.kv_cache_manager.req_to_blocks[request_id]:
assert block.ref_cnt == 1
num_hashed_blocks += (1 if block._block_hash is not None else 0)
assert num_hashed_blocks == NUM_EXTERNAL_FULL_BLOCKS

# Confirm the rest of the prompt is scheduled in this step.
scheduled_req = scheduler_output.scheduled_new_reqs[0]
num_scheduled_tokens = scheduler_output.num_scheduled_tokens[request_id]
num_computed_tokens = scheduled_req.num_computed_tokens
total_prompt_tokens = len(scheduled_req.prompt_token_ids)
assert (num_scheduled_tokens == total_prompt_tokens - num_computed_tokens)


def test_interleaved_remote_prefill_cycle():
"""Test Remote Prefills Work Well With Other Requests."""

vllm_config = create_vllm_config()
scheduler = create_scheduler(vllm_config)

# 2 Full Blocks and 1 Half Block.
BLOCK_SIZE = vllm_config.cache_config.block_size
NUM_EXTERNAL_FULL_BLOCKS = 2
NUM_TOKENS = int(BLOCK_SIZE * (NUM_EXTERNAL_FULL_BLOCKS + 0.5))

request_remote = create_request(
request_id=1,
num_tokens=NUM_TOKENS,
do_remote_prefill=True
)
request_local_a = create_request(
request_id=2,
num_tokens=NUM_TOKENS,
)
request_local_b = create_request(
request_id=3,
num_tokens=NUM_TOKENS,
)

# STEP 1: Regular request is running.
scheduler.add_request(request_local_a)
scheduler_output = scheduler.schedule()

Check failure (GitHub Actions / pre-commit, Ruff F821): tests/v1/kv_connector/test_remote_decode_scheduler.py:127:27: Undefined name `make_model_runner_output`
assert len(scheduler.running) == 1

model_runner_output = make_model_runner_output(
[request_local_a])
scheduler.update_from_output(scheduler_output,
model_runner_output)

# STEP 2: Add a local and remote request.
scheduler.add_request(request_local_b)
scheduler.add_request(request_remote)
scheduler_output = scheduler.schedule()
assert len(scheduler.running) == 2

Check failure (GitHub Actions / pre-commit, Ruff F821): tests/v1/kv_connector/test_remote_decode_scheduler.py:139:27: Undefined name `make_model_runner_output`
assert len(scheduler.waiting) == 1
assert len(scheduler_output.scheduled_new_reqs) == 1
assert len(scheduler_output.scheduled_cached_reqs) == 1

model_runner_output = make_model_runner_output(
[request_local_a, request_local_b])
scheduler.update_from_output(scheduler_output,
model_runner_output)

# STEP 3: continue running, KVs not arrived yet.
scheduler_output = scheduler.schedule()

Check failure (GitHub Actions / pre-commit, Ruff F821): tests/v1/kv_connector/test_remote_decode_scheduler.py:150:27: Undefined name `make_model_runner_output`
assert len(scheduler.running) == 2
assert len(scheduler.waiting) == 1
assert len(scheduler_output.scheduled_new_reqs) == 0
assert len(scheduler_output.scheduled_cached_reqs) == 2

model_runner_output = make_model_runner_output(
reqs=[request_local_a, request_local_b])
scheduler.update_from_output(scheduler_output,
model_runner_output)
assert len(scheduler.running) == 2
assert len(scheduler.waiting) == 1
assert len(scheduler_output.scheduled_new_reqs) == 0
assert len(scheduler_output.scheduled_cached_reqs) == 2

# STEP 4: KVs arrive.

Check failure (GitHub Actions / pre-commit, Ruff F821): tests/v1/kv_connector/test_remote_decode_scheduler.py:165:27: Undefined name `make_model_runner_output`
scheduler_output = scheduler.schedule()
assert len(scheduler.running) == 2
assert len(scheduler.waiting) == 1
assert len(scheduler_output.scheduled_new_reqs) == 0
assert len(scheduler_output.scheduled_cached_reqs) == 2

model_runner_output = make_model_runner_output(
[request_local_a, request_local_b],
finished_recving=[request_remote.request_id]
)
scheduler.update_from_output(scheduler_output,
model_runner_output)

# STEP 5: RECVed KVs are sent to ModelRunner.
scheduler_output = scheduler.schedule()
assert len(scheduler.running) == 3
assert len(scheduler.waiting) == 0
assert len(scheduler_output.scheduled_new_reqs) == 1
assert len(scheduler_output.scheduled_cached_reqs) == 2


def test_remote_prefill_no_prefix_cache_uncomputed_blocks():
"""
With P/D, blocks can be allocated but uncomputed for
multiple engine steps. This test confirms that we do
not accidentally have cache hits against uncomputed
blocks.
"""

vllm_config = create_vllm_config()
    scheduler = create_scheduler(vllm_config)

# 2 and a half full external blocks.
BLOCK_SIZE = vllm_config.cache_config.block_size
NUM_EXTERNAL_FULL_BLOCKS = 2
NUM_TOKENS = int(BLOCK_SIZE * (NUM_EXTERNAL_FULL_BLOCKS + 0.5))

# Both of these requests have prompts like [1,1,1,1,1, ...]
request_remote = create_request(
request_id=1,
num_tokens=NUM_TOKENS,
do_remote_prefill=True,
use_all_1s_for_prompt_tokens=True,
)

request_local = create_request(
request_id=2,
num_tokens=NUM_TOKENS,
do_remote_prefill=False,
use_all_1s_for_prompt_tokens=True,
)

# Schedule the remote prefill request. This should not
# cause any blocks to be cached.
scheduler.add_request(request_remote)
scheduler_output = scheduler.schedule()
scheduler.update_from_output(
scheduler_output,
EMPTY_MODEL_RUNNER_OUTPUT
)
assert len(scheduler.waiting) == 1

    # Schedule the local prefill request. This should
    # cause blocks to be cached, but separately from
    # the remote request's uncomputed blocks.
scheduler.add_request(request_local)
scheduler_output = scheduler.schedule()
assert len(scheduler.running) == 1
assert len(scheduler.waiting) == 1

local_blocks = scheduler.kv_cache_manager.req_to_blocks[request_local.request_id]
remote_blocks = scheduler.kv_cache_manager.req_to_blocks[request_remote.request_id]

# Local should have cached blocks (but not all due to preallocate).
num_hashed_blocks = 0
for block in local_blocks:
assert block.ref_cnt == 1
num_hashed_blocks += (
1 if block._block_hash is not None else 0)
assert num_hashed_blocks > 0

# Remote blocks should not be cached.
for block in remote_blocks:
assert block.ref_cnt == 1
assert block._block_hash is None


def test_remote_prefill_no_blocks_available():
"""
    Test that we properly handle the case where no blocks are available.
"""
pass