diff --git a/requirements/test.txt b/requirements/test.txt index 6dcd4ff01460..a4c8f1b129ff 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -27,6 +27,10 @@ argcomplete==3.5.1 # via datamodel-code-generator arrow==1.3.0 # via isoduration +async-timeout==5.0.1 + # via + # aiohttp + # redis attrs==24.2.0 # via # aiohttp @@ -132,6 +136,11 @@ encodec==0.1.1 # via vocos evaluate==0.4.3 # via lm-eval +exceptiongroup==1.2.2 + # via + # anyio + # hypothesis + # pytest fastparquet==2024.11.0 # via genai-perf fastrlock==0.8.2 @@ -633,7 +642,6 @@ setuptools==75.8.0 # via # mamba-ssm # pytablewriter - # torch shellingham==1.5.4 # via typer six==1.16.0 @@ -692,8 +700,13 @@ tokenizers==0.21.1 # via # -r requirements/test.in # transformers +toml==0.10.2 + # via datamodel-code-generator tomli==2.2.1 - # via schemathesis + # via + # black + # pytest + # schemathesis tomli-w==1.2.0 # via schemathesis torch==2.6.0 @@ -765,12 +778,16 @@ types-python-dateutil==2.9.0.20241206 # via arrow typing-extensions==4.12.2 # via + # anyio + # black # huggingface-hub # librosa # mistral-common + # multidict # pqdm # pydantic # pydantic-core + # rich # torch # typer tzdata==2024.2 diff --git a/tests/v1/core/test_scheduler.py b/tests/v1/core/test_scheduler.py index c76e90d3e3b0..367a07876774 100644 --- a/tests/v1/core/test_scheduler.py +++ b/tests/v1/core/test_scheduler.py @@ -3,20 +3,14 @@ from unittest.mock import Mock import pytest -import torch -from vllm.config import (CacheConfig, KVTransferConfig, ModelConfig, - SchedulerConfig, VllmConfig) -from vllm.multimodal.inputs import MultiModalKwargs, PlaceholderRange -from vllm.sampling_params import SamplingParams +from vllm.multimodal.inputs import PlaceholderRange +from vllm.tests.v1.utils import EOS_TOKEN_ID, create_requests, create_scheduler from vllm.v1.core.sched.output import SchedulerOutput from vllm.v1.core.sched.scheduler import Scheduler -from vllm.v1.kv_cache_interface import (FullAttentionSpec, KVCacheConfig, - KVCacheGroupSpec) from vllm.v1.outputs import ModelRunnerOutput -from vllm.v1.request import Request, RequestStatus -from vllm.v1.structured_output import StructuredOutputManager -from vllm.tests.v1.utils import (create_scheduler, create_requests, EOS_TOKEN_ID) +from vllm.v1.request import RequestStatus + def test_add_requests(): scheduler = create_scheduler() diff --git a/tests/v1/kv_connector/__init__.py b/tests/v1/kv_connector/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/v1/kv_connector/test_nixl_connector.py b/tests/v1/kv_connector/test_nixl_connector.py new file mode 100644 index 000000000000..b746978907ea --- /dev/null +++ b/tests/v1/kv_connector/test_nixl_connector.py @@ -0,0 +1,42 @@ +# SPDX-License-Identifier: Apache-2.0 +import copy +from typing import Optional + +from vllm.distributed.kv_transfer.kv_connector.v1.nixl_connector import ( + NixlConnectorMetadata) +from vllm.v1.outputs import EMPTY_MODEL_RUNNER_OUTPUT, ModelRunnerOutput +from vllm.v1.request import RequestStatus, Request + +from .utils import create_request, create_scheduler, create_vllm_config + +def test_scheduler_worker_inferface(): + + vllm_config = create_vllm_config() + scheduler = create_scheduler(vllm_config) + + # 2 Full Blocks and 1 Half Block. 
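+    # With the default block_size=16 from create_vllm_config, this gives
+    # NUM_TOKENS = int(16 * 2.5) = 40 tokens: two full blocks plus half a block.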
+ BLOCK_SIZE = vllm_config.cache_config.block_size + NUM_EXTERNAL_FULL_BLOCKS = 2 + NUM_TOKENS = int(BLOCK_SIZE * (NUM_EXTERNAL_FULL_BLOCKS + 0.5)) + + request = create_request(request_id=1, + num_tokens=NUM_TOKENS, + do_remote_prefill=True) + request_id = request.request_id + + scheduler.add_request(request) + + # Remote Prefill, triggers NixlConnectorMetdata. + scheduler_output = scheduler.schedule() + kv_connector_metadata = scheduler_output.kv_connector_metadata + assert kv_connector_metadata is not None + assert isinstance(kv_connector_metadata, NixlConnectorMetadata) + + assert len(kv_connector_metadata.requests) == 1 + assert request_id in kv_connector_metadata.requests + req_meta = kv_connector_metadata.requests[request_id] + + for block_id, block in zip( + req_meta.local_block_ids, + scheduler.kv_cache_manager.req_to_blocks[request_id]): + assert block_id == block.block_id diff --git a/tests/v1/kv_connector/test_remote_decode_scheduler.py b/tests/v1/kv_connector/test_remote_decode_scheduler.py new file mode 100644 index 000000000000..f02462eae264 --- /dev/null +++ b/tests/v1/kv_connector/test_remote_decode_scheduler.py @@ -0,0 +1,259 @@ +# SPDX-License-Identifier: Apache-2.0 +import copy +from typing import Optional + +from vllm.v1.outputs import EMPTY_MODEL_RUNNER_OUTPUT, ModelRunnerOutput +from vllm.v1.request import RequestStatus, Request + +from .utils import create_request, create_scheduler, create_vllm_config + +def test_basic_remote_prefill_cycle(): + """Test Remote Prefills Lifecycle.""" + + vllm_config = create_vllm_config() + scheduler = create_scheduler(vllm_config) + + # 2 Full Blocks and 1 Half Block. + BLOCK_SIZE = vllm_config.cache_config.block_size + NUM_EXTERNAL_FULL_BLOCKS = 2 + NUM_TOKENS = int(BLOCK_SIZE * (NUM_EXTERNAL_FULL_BLOCKS + 0.5)) + START_FREE_BLOCK_QUEUE_SIZE = ( + scheduler.kv_cache_manager.block_pool.free_block_queue.num_free_blocks) + + request = create_request(request_id=1, + num_tokens=NUM_TOKENS, + do_remote_prefill=True) + + scheduler.add_request(request) + request_id = request.request_id + + # STEP (1): + # (1a): schedule() + scheduler_output = scheduler.schedule() + + # Nothing running and empty scheduler output. + assert len(scheduler.running) == 0 + assert len(scheduler_output.scheduled_new_reqs) == 0 + assert len(scheduler_output.scheduled_cached_reqs) == 0 + assert len(scheduler_output.num_scheduled_tokens) == 0 + assert scheduler_output.total_num_scheduled_tokens == 0 + + # Req waiting for KVs with no computed + # or scheduled tokens. + assert len(scheduler.waiting) == 1 + assert request in scheduler.waiting + assert (request.status == RequestStatus.WAITING_FOR_REMOTE_KVS) + assert (request.num_computed_tokens == 0) + + # ... but should have (uncached) blocks allocated to it. + block_pool = scheduler.kv_cache_manager.block_pool + assert (block_pool.free_block_queue.num_free_blocks + < START_FREE_BLOCK_QUEUE_SIZE) + assert len(block_pool.cached_block_hash_to_block) == 0 + for block in scheduler.kv_cache_manager.req_to_blocks[request_id]: + assert block._block_hash is None + + # (1b): forward() + model_runner_output = EMPTY_MODEL_RUNNER_OUTPUT + + # (1c): update_from_output() + engine_core_outputs = scheduler.update_from_output( + scheduler_output, model_runner_output) + assert len(engine_core_outputs.outputs) == 0 + + # STEP (2): + # (2a): schedule(): nothing happens! + scheduler_output = scheduler.schedule() + assert len(scheduler.waiting) == 1 + assert len(scheduler.running) == 0 + + # (2b): forward(): request finishes recv. 
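+    # On the worker side this list would come from the connector's
+    # get_finished() hook; the test emulates it by setting finished_recving
+    # on a copy of the empty model runner output.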
+ model_runner_output = copy.deepcopy( + EMPTY_MODEL_RUNNER_OUTPUT) + model_runner_output.finished_recving = [request_id] + + # (2c): update_from_output(): + engine_core_outputs = scheduler.update_from_output( + scheduler_output, model_runner_output) + assert len(scheduler.waiting) == 1 + assert (request_id in scheduler.finished_recving_KV_req_ids) + + # (3a): schedule(): this should actually schedule. + scheduler_output = scheduler.schedule() + assert len(scheduler.running) == 1 + + # Confirm the block are actually allocated. + num_hashed_blocks = 0 + for block in scheduler.kv_cache_manager.req_to_blocks[request_id]: + assert block.ref_cnt == 1 + num_hashed_blocks += (1 if block._block_hash is not None else 0) + assert num_hashed_blocks == NUM_EXTERNAL_FULL_BLOCKS + + # Confirm the rest of the prompt is scheduled in this step. + scheduled_req = scheduler_output.scheduled_new_reqs[0] + num_scheduled_tokens = scheduler_output.num_scheduled_tokens[request_id] + num_computed_tokens = scheduled_req.num_computed_tokens + total_prompt_tokens = len(scheduled_req.prompt_token_ids) + assert (num_scheduled_tokens == total_prompt_tokens - num_computed_tokens) + + +def test_interleaved_remote_prefill_cycle(): + """Test Remote Prefills Work Well With Other Requests.""" + + vllm_config = create_vllm_config() + scheduler = create_scheduler(vllm_config) + + # 2 Full Blocks and 1 Half Block. + BLOCK_SIZE = vllm_config.cache_config.block_size + NUM_EXTERNAL_FULL_BLOCKS = 2 + NUM_TOKENS = int(BLOCK_SIZE * (NUM_EXTERNAL_FULL_BLOCKS + 0.5)) + + request_remote = create_request( + request_id=1, + num_tokens=NUM_TOKENS, + do_remote_prefill=True + ) + request_local_a = create_request( + request_id=2, + num_tokens=NUM_TOKENS, + ) + request_local_b = create_request( + request_id=3, + num_tokens=NUM_TOKENS, + ) + + # STEP 1: Regular request is running. + scheduler.add_request(request_local_a) + scheduler_output = scheduler.schedule() + assert len(scheduler.running) == 1 + + model_runner_output = make_model_runner_output( + [request_local_a]) + scheduler.update_from_output(scheduler_output, + model_runner_output) + + # STEP 2: Add a local and remote request. + scheduler.add_request(request_local_b) + scheduler.add_request(request_remote) + scheduler_output = scheduler.schedule() + assert len(scheduler.running) == 2 + assert len(scheduler.waiting) == 1 + assert len(scheduler_output.scheduled_new_reqs) == 1 + assert len(scheduler_output.scheduled_cached_reqs) == 1 + + model_runner_output = make_model_runner_output( + [request_local_a, request_local_b]) + scheduler.update_from_output(scheduler_output, + model_runner_output) + + # STEP 3: continue running, KVs not arrived yet. + scheduler_output = scheduler.schedule() + assert len(scheduler.running) == 2 + assert len(scheduler.waiting) == 1 + assert len(scheduler_output.scheduled_new_reqs) == 0 + assert len(scheduler_output.scheduled_cached_reqs) == 2 + + model_runner_output = make_model_runner_output( + reqs=[request_local_a, request_local_b]) + scheduler.update_from_output(scheduler_output, + model_runner_output) + assert len(scheduler.running) == 2 + assert len(scheduler.waiting) == 1 + assert len(scheduler_output.scheduled_new_reqs) == 0 + assert len(scheduler_output.scheduled_cached_reqs) == 2 + + # STEP 4: KVs arrive. 
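+    # The remote request is still waiting; the model output for this step
+    # reports its transfer as complete via finished_recving, so it becomes
+    # schedulable on the next step.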
+ scheduler_output = scheduler.schedule() + assert len(scheduler.running) == 2 + assert len(scheduler.waiting) == 1 + assert len(scheduler_output.scheduled_new_reqs) == 0 + assert len(scheduler_output.scheduled_cached_reqs) == 2 + + model_runner_output = make_model_runner_output( + [request_local_a, request_local_b], + finished_recving=[request_remote.request_id] + ) + scheduler.update_from_output(scheduler_output, + model_runner_output) + + # STEP 5: RECVed KVs are sent to ModelRunner. + scheduler_output = scheduler.schedule() + assert len(scheduler.running) == 3 + assert len(scheduler.waiting) == 0 + assert len(scheduler_output.scheduled_new_reqs) == 1 + assert len(scheduler_output.scheduled_cached_reqs) == 2 + + +def test_remote_prefill_no_prefix_cache_uncomputed_blocks(): + """ + With P/D, blocks can be allocated but uncomputed for + multiple engine steps. This test confirms that we do + not accidentally have cache hits against uncomputed + blocks. + """ + + vllm_config = create_vllm_config() + scheduler = create_scheduler(vllm_config) + + vllm_config = create_vllm_config() + scheduler = create_scheduler(vllm_config) + + # 2 and a half full external blocks. + BLOCK_SIZE = vllm_config.cache_config.block_size + NUM_EXTERNAL_FULL_BLOCKS = 2 + NUM_TOKENS = int(BLOCK_SIZE * (NUM_EXTERNAL_FULL_BLOCKS + 0.5)) + + # Both of these requests have prompts like [1,1,1,1,1, ...] + request_remote = create_request( + request_id=1, + num_tokens=NUM_TOKENS, + do_remote_prefill=True, + use_all_1s_for_prompt_tokens=True, + ) + + request_local = create_request( + request_id=2, + num_tokens=NUM_TOKENS, + do_remote_prefill=False, + use_all_1s_for_prompt_tokens=True, + ) + + # Schedule the remote prefill request. This should not + # cause any blocks to be cached. + scheduler.add_request(request_remote) + scheduler_output = scheduler.schedule() + scheduler.update_from_output( + scheduler_output, + EMPTY_MODEL_RUNNER_OUTPUT + ) + assert len(scheduler.waiting) == 1 + + # Schedule the local prefill request. This should + # cause blocks to be cached, but separately from + scheduler.add_request(request_local) + scheduler_output = scheduler.schedule() + assert len(scheduler.running) == 1 + assert len(scheduler.waiting) == 1 + + local_blocks = scheduler.kv_cache_manager.req_to_blocks[request_local.request_id] + remote_blocks = scheduler.kv_cache_manager.req_to_blocks[request_remote.request_id] + + # Local should have cached blocks (but not all due to preallocate). + num_hashed_blocks = 0 + for block in local_blocks: + assert block.ref_cnt == 1 + num_hashed_blocks += ( + 1 if block._block_hash is not None else 0) + assert num_hashed_blocks > 0 + + # Remote blocks should not be cached. 
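+    # They were allocated with skip_cache_blocks=True, so no block hashes
+    # are computed until cache_blocks() runs once the KV transfer finishes.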
+ for block in remote_blocks: + assert block.ref_cnt == 1 + assert block._block_hash is None + + +def test_remote_prefill_no_blocks_available(): + """ + letTest whether we properly handle no blocks available + """ + pass \ No newline at end of file diff --git a/tests/v1/kv_connector/test_remote_prefill_scheduler.py b/tests/v1/kv_connector/test_remote_prefill_scheduler.py new file mode 100644 index 000000000000..98a65904e7ea --- /dev/null +++ b/tests/v1/kv_connector/test_remote_prefill_scheduler.py @@ -0,0 +1,260 @@ +# SPDX-License-Identifier: Apache-2.0 +import copy +from typing import Optional + +from vllm.v1.outputs import EMPTY_MODEL_RUNNER_OUTPUT +from vllm.v1.request import RequestStatus, Request + +from .utils import (create_request, create_scheduler, + create_vllm_config, create_model_runner_output) + +def test_basic_remote_prefill_cycle(): + """Test Remote Prefills Lifecycle.""" + + vllm_config = create_vllm_config() + scheduler = create_scheduler(vllm_config) + + # 2 Full Blocks and 1 Half Block. + BLOCK_SIZE = vllm_config.cache_config.block_size + NUM_EXTERNAL_FULL_BLOCKS = 2 + NUM_TOKENS = int(BLOCK_SIZE * (NUM_EXTERNAL_FULL_BLOCKS + 0.5)) + START_FREE_BLOCK_QUEUE_SIZE = ( + scheduler.kv_cache_manager.block_pool.free_block_queue.num_free_blocks) + + request = create_request(request_id=1, + num_tokens=NUM_TOKENS, + do_remote_prefill=True) + + scheduler.add_request(request) + request_id = request.request_id + + # STEP (1): + # (1a): schedule() + scheduler_output = scheduler.schedule() + + # Nothing running and empty scheduler output. + assert len(scheduler.running) == 0 + assert len(scheduler_output.scheduled_new_reqs) == 0 + assert len(scheduler_output.scheduled_cached_reqs) == 0 + assert len(scheduler_output.num_scheduled_tokens) == 0 + assert scheduler_output.total_num_scheduled_tokens == 0 + + # Req waiting for KVs with no computed + # or scheduled tokens. + assert len(scheduler.waiting) == 1 + assert request in scheduler.waiting + assert (request.status == RequestStatus.WAITING_FOR_REMOTE_KVS) + assert (request.num_computed_tokens == 0) + + # ... but should have (uncached) blocks allocated to it. + block_pool = scheduler.kv_cache_manager.block_pool + assert (block_pool.free_block_queue.num_free_blocks + < START_FREE_BLOCK_QUEUE_SIZE) + assert len(block_pool.cached_block_hash_to_block) == 0 + for block in scheduler.kv_cache_manager.req_to_blocks[request_id]: + assert block._block_hash is None + + # (1b): forward() + model_runner_output = EMPTY_MODEL_RUNNER_OUTPUT + + # (1c): update_from_output() + engine_core_outputs = scheduler.update_from_output( + scheduler_output, model_runner_output) + assert len(engine_core_outputs.outputs) == 0 + + # STEP (2): + # (2a): schedule(): nothing happens! + scheduler_output = scheduler.schedule() + assert len(scheduler.waiting) == 1 + assert len(scheduler.running) == 0 + + # (2b): forward(): request finishes recv. + model_runner_output = copy.deepcopy( + EMPTY_MODEL_RUNNER_OUTPUT) + model_runner_output.finished_recving = [request_id] + + # (2c): update_from_output(): + engine_core_outputs = scheduler.update_from_output( + scheduler_output, model_runner_output) + assert len(scheduler.waiting) == 1 + assert (request_id in scheduler.finished_recving_KV_req_ids) + + # (3a): schedule(): this should actually schedule. + scheduler_output = scheduler.schedule() + assert len(scheduler.running) == 1 + + # Confirm the block are actually allocated. 
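+    # Only full blocks receive a hash, so the 2.5-block prompt should yield
+    # exactly NUM_EXTERNAL_FULL_BLOCKS hashed blocks; the trailing half
+    # block stays unhashed.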
+ num_hashed_blocks = 0 + for block in scheduler.kv_cache_manager.req_to_blocks[request_id]: + assert block.ref_cnt == 1 + num_hashed_blocks += (1 if block._block_hash is not None else 0) + assert num_hashed_blocks == NUM_EXTERNAL_FULL_BLOCKS + + # Confirm the rest of the prompt is scheduled in this step. + scheduled_req = scheduler_output.scheduled_new_reqs[0] + num_scheduled_tokens = scheduler_output.num_scheduled_tokens[request_id] + num_computed_tokens = scheduled_req.num_computed_tokens + total_prompt_tokens = len(scheduled_req.prompt_token_ids) + assert (num_scheduled_tokens == total_prompt_tokens - num_computed_tokens) + + +def test_interleaved_remote_prefill_cycle(): + """Test Remote Prefills Work Well With Other Requests.""" + + vllm_config = create_vllm_config() + scheduler = create_scheduler(vllm_config) + + # 2 Full Blocks and 1 Half Block. + BLOCK_SIZE = vllm_config.cache_config.block_size + NUM_EXTERNAL_FULL_BLOCKS = 2 + NUM_TOKENS = int(BLOCK_SIZE * (NUM_EXTERNAL_FULL_BLOCKS + 0.5)) + + request_remote = create_request( + request_id=1, + num_tokens=NUM_TOKENS, + do_remote_prefill=True + ) + request_local_a = create_request( + request_id=2, + num_tokens=NUM_TOKENS, + ) + request_local_b = create_request( + request_id=3, + num_tokens=NUM_TOKENS, + ) + + # STEP 1: Regular request is running. + scheduler.add_request(request_local_a) + scheduler_output = scheduler.schedule() + assert len(scheduler.running) == 1 + + model_runner_output = create_model_runner_output( + [request_local_a]) + scheduler.update_from_output(scheduler_output, + model_runner_output) + + # STEP 2: Add a local and remote request. + scheduler.add_request(request_local_b) + scheduler.add_request(request_remote) + scheduler_output = scheduler.schedule() + assert len(scheduler.running) == 2 + assert len(scheduler.waiting) == 1 + assert len(scheduler_output.scheduled_new_reqs) == 1 + assert len(scheduler_output.scheduled_cached_reqs) == 1 + + model_runner_output = create_model_runner_output( + [request_local_a, request_local_b]) + scheduler.update_from_output(scheduler_output, + model_runner_output) + + # STEP 3: continue running, KVs not arrived yet. + scheduler_output = scheduler.schedule() + assert len(scheduler.running) == 2 + assert len(scheduler.waiting) == 1 + assert len(scheduler_output.scheduled_new_reqs) == 0 + assert len(scheduler_output.scheduled_cached_reqs) == 2 + + model_runner_output = create_model_runner_output( + reqs=[request_local_a, request_local_b]) + scheduler.update_from_output(scheduler_output, + model_runner_output) + assert len(scheduler.running) == 2 + assert len(scheduler.waiting) == 1 + assert len(scheduler_output.scheduled_new_reqs) == 0 + assert len(scheduler_output.scheduled_cached_reqs) == 2 + + # STEP 4: KVs arrive. + scheduler_output = scheduler.schedule() + assert len(scheduler.running) == 2 + assert len(scheduler.waiting) == 1 + assert len(scheduler_output.scheduled_new_reqs) == 0 + assert len(scheduler_output.scheduled_cached_reqs) == 2 + + model_runner_output = create_model_runner_output( + [request_local_a, request_local_b], + finished_recving=[request_remote.request_id] + ) + scheduler.update_from_output(scheduler_output, + model_runner_output) + + # STEP 5: RECVed KVs are sent to ModelRunner. 
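+    # The remote request leaves WAITING_FOR_REMOTE_KVS and is scheduled as
+    # a new request alongside the two already-running local requests.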
+ scheduler_output = scheduler.schedule() + assert len(scheduler.running) == 3 + assert len(scheduler.waiting) == 0 + assert len(scheduler_output.scheduled_new_reqs) == 1 + assert len(scheduler_output.scheduled_cached_reqs) == 2 + + +def test_remote_prefill_no_prefix_cache_uncomputed_blocks(): + """ + With P/D, blocks can be allocated but uncomputed for + multiple engine steps. This test confirms that we do + not accidentally have cache hits against uncomputed + blocks. + """ + + vllm_config = create_vllm_config() + scheduler = create_scheduler(vllm_config) + + vllm_config = create_vllm_config() + scheduler = create_scheduler(vllm_config) + + # 2 and a half full external blocks. + BLOCK_SIZE = vllm_config.cache_config.block_size + NUM_EXTERNAL_FULL_BLOCKS = 2 + NUM_TOKENS = int(BLOCK_SIZE * (NUM_EXTERNAL_FULL_BLOCKS + 0.5)) + + # Both of these requests have prompts like [1,1,1,1,1, ...] + request_remote = create_request( + request_id=1, + num_tokens=NUM_TOKENS, + do_remote_prefill=True, + use_all_1s_for_prompt_tokens=True, + ) + + request_local = create_request( + request_id=2, + num_tokens=NUM_TOKENS, + do_remote_prefill=False, + use_all_1s_for_prompt_tokens=True, + ) + + # Schedule the remote prefill request. This should not + # cause any blocks to be cached. + scheduler.add_request(request_remote) + scheduler_output = scheduler.schedule() + scheduler.update_from_output( + scheduler_output, + EMPTY_MODEL_RUNNER_OUTPUT + ) + assert len(scheduler.waiting) == 1 + + # Schedule the local prefill request. This should + # cause blocks to be cached, but separately from + scheduler.add_request(request_local) + scheduler_output = scheduler.schedule() + assert len(scheduler.running) == 1 + assert len(scheduler.waiting) == 1 + + local_blocks = scheduler.kv_cache_manager.req_to_blocks[request_local.request_id] + remote_blocks = scheduler.kv_cache_manager.req_to_blocks[request_remote.request_id] + + # Local should have cached blocks (but not all due to preallocate). + num_hashed_blocks = 0 + for block in local_blocks: + assert block.ref_cnt == 1 + num_hashed_blocks += ( + 1 if block._block_hash is not None else 0) + assert num_hashed_blocks > 0 + + # Remote blocks should not be cached. 
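+    # The remote request never reported finished_recving, so its blocks
+    # were never passed to cache_blocks() and remain unhashed.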
+ for block in remote_blocks: + assert block.ref_cnt == 1 + assert block._block_hash is None + + +def test_remote_prefill_no_blocks_available(): + """ + letTest whether we properly handle no blocks available + """ + pass \ No newline at end of file diff --git a/tests/v1/kv_connector/utils.py b/tests/v1/kv_connector/utils.py new file mode 100644 index 000000000000..4fdc5beef78a --- /dev/null +++ b/tests/v1/kv_connector/utils.py @@ -0,0 +1,159 @@ +# SPDX-License-Identifier: Apache-2.0 +from typing import Optional +import torch + +from vllm.config import (CacheConfig, DeviceConfig, KVTransferConfig, + ModelConfig, SchedulerConfig, VllmConfig) +from vllm.sampling_params import KVTransferParams, SamplingParams +from vllm.v1.core.sched.scheduler import Scheduler +from vllm.v1.kv_cache_interface import (FullAttentionSpec, KVCacheConfig, + KVCacheGroupSpec) +from vllm.v1.outputs import ModelRunnerOutput +from vllm.v1.request import Request +from vllm.v1.structured_output import StructuredOutputManager +from vllm.v1.worker.gpu_model_runner import GPUModelRunner + +EOS_TOKEN_ID = 50256 + +def create_vllm_config( + model: str = "facebook/opt-125m", + max_num_seqs: int = 16, + max_num_batched_tokens: int = 64, + block_size: int = 16, +) -> VllmConfig: + scheduler_config = SchedulerConfig( + max_num_seqs=max_num_seqs, + max_num_batched_tokens=max_num_batched_tokens, + max_model_len=max_num_batched_tokens, + ) + model_config = ModelConfig( + model=model, + task="auto", + tokenizer=model, + tokenizer_mode="auto", + trust_remote_code=True, + dtype="float16", + seed=42, + ) + # Cache config, optionally force APC + cache_config = CacheConfig( + block_size=block_size, + gpu_memory_utilization=0.9, + swap_space=0, + cache_dtype="auto", + enable_prefix_caching=True, + ) + kv_transfer_config = KVTransferConfig( + kv_connector="NixlConnector", + kv_role="kv_both", + ) + return VllmConfig( + scheduler_config=scheduler_config, + model_config=model_config, + cache_config=cache_config, + kv_transfer_config=kv_transfer_config, + device_config=DeviceConfig("cpu") + ) + + +def create_model_runner( + vllm_config: VllmConfig, + device: torch.device, +) -> GPUModelRunner: + return GPUModelRunner( + vllm_config=vllm_config, + device=device, + ) + + +def create_scheduler( + vllm_config: VllmConfig, + num_blocks: int = 10000, +) -> Scheduler: + block_size = vllm_config.cache_config.block_size + kv_cache_config = KVCacheConfig( + num_blocks=num_blocks, # A large number of blocks to hold all requests + tensors={}, + kv_cache_groups=[ + KVCacheGroupSpec(['layer'], + FullAttentionSpec(block_size, 1, 1, torch.float32, + False)) + ], + ) + vllm_config.cache_config.num_gpu_blocks = num_blocks + return Scheduler( + vllm_config=vllm_config, + kv_cache_config=kv_cache_config, + log_stats=True, + structured_output_manager=StructuredOutputManager(vllm_config), + ) + + +def create_request( + request_id: int, + num_tokens: int = 10, + max_tokens: int = 16, + do_remote_decode: bool = False, + do_remote_prefill: bool = False, + use_all_1s_for_prompt_tokens: bool = False, +) -> list[Request]: + if do_remote_decode: + assert not do_remote_prefill + kv_transfer_params = KVTransferParams( + do_remote_prefill=True + ) + elif do_remote_prefill: + kv_transfer_params = KVTransferParams( + do_remote_prefill=True, + remote_engine_id="abc", + remote_block_ids=[1, 2, 3], + ) + else: + kv_transfer_params = None + + sampling_params = SamplingParams( + max_tokens=max_tokens, + kv_transfer_params=kv_transfer_params, + ) + + + if 
use_all_1s_for_prompt_tokens: + prompt_token_ids = [1] * num_tokens + else: + prompt_token_ids = [ + i * request_id for i in range(num_tokens) + ] + + return Request( + request_id=f"id-{request_id}", + prompt=None, + prompt_token_ids=prompt_token_ids, + sampling_params=sampling_params, + multi_modal_inputs=None, + multi_modal_placeholders=None, + multi_modal_hashes=None, + eos_token_id=EOS_TOKEN_ID, + arrival_time=0, + ) + +def create_model_runner_output( + reqs: list[Request], + finished_sending: Optional[list[str]] = None, + finished_recving: Optional[list[str]] = None, +) -> ModelRunnerOutput: + req_ids = [req.request_id for req in reqs] + req_id_to_index = { + req_id: idx for idx, req_id in enumerate(req_ids) + } + sampled_token_ids = [[0] for _ in req_ids] + + return ModelRunnerOutput( + req_ids=req_ids, + req_id_to_index=req_id_to_index, + sampled_token_ids=sampled_token_ids, + spec_token_ids=None, + logprobs=None, + prompt_logprobs_dict={}, + finished_sending=finished_sending, + finished_recving=finished_recving, + ) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/base.py b/vllm/distributed/kv_transfer/kv_connector/v1/base.py index 3fd8c3344e2e..95d3dfb7c841 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/base.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/base.py @@ -201,6 +201,7 @@ def get_num_new_matched_tokens( @abstractmethod def update_state_after_alloc(self, request: "Request", + block_ids: list[int], num_external_tokens: int): """ Update KVConnector state after block allocation. diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index e905bc537789..28da7b1ef031 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -47,11 +47,11 @@ class ReqMeta: def __init__( self, - block_ids: list[int], + local_block_ids: list[int], remote_block_ids: list[int], remote_engine_id: list[int], ): - self.block_ids = block_ids + self.local_block_ids = local_block_ids self.remote_block_ids = remote_block_ids self.remote_engine_id = remote_engine_id @@ -63,13 +63,13 @@ def __init__(self): def add_new_req( self, - req_id: str, - block_ids: list[int], + request_id: str, + local_block_ids: list[int], kv_transfer_params: KVTransferParams, ): - assert req_id not in self.requests - self.requests[req_id] = ReqMeta( - block_ids, + assert request_id not in self.requests + self.requests[request_id] = ReqMeta( + local_block_ids=local_block_ids, remote_block_ids=kv_transfer_params.remote_block_ids, remote_engine_id=kv_transfer_params.remote_engine_id) @@ -97,10 +97,11 @@ def get_num_new_matched_tokens(self, request: "Request", request, num_computed_tokens) def update_state_after_alloc(self, request: "Request", + block_ids: list[int], num_external_tokens: int): assert self.connector_scheduler is not None return self.connector_scheduler.update_state_after_alloc( - request, num_external_tokens) + request, block_ids, num_external_tokens) def build_connector_meta( self, @@ -146,6 +147,7 @@ class NixlConnectorScheduler: def __init__(self, vllm_config: VllmConfig, engine_id: str): self.vllm_config = vllm_config + self.block_size = vllm_config.cache_config.block_size self.engine_id = engine_id # Requests that need to start recv. 
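+        # As used in update_state_after_alloc() below, this maps
+        # request_id -> (Request, allocated local block ids) for requests
+        # whose remote KVs still need to be pulled.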
@@ -156,15 +158,31 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str): def get_num_new_matched_tokens(self, request: "Request", num_computed_tokens: int) -> int: """For remote prefill, allocate for all tokens.""" + + # NOTE: this function is called in the WAITING loop. + # So we should only have full blocks of computed tokens. + assert num_computed_tokens % self.block_size == 0 + if request.do_remote_prefill: - return len(request.prompt_token_ids) - num_computed_tokens + # NOTE: subtract 1 since we compute the last token + # here so that we can sample the first token. + num_prompt_tokens = len(request.prompt_token_ids) - 1 + + # Round down to a full block shape. + num_external_blocks = num_prompt_tokens // self.block_size + rounded_num_prompt_tokens = num_external_blocks * self.block_size + return max(rounded_num_prompt_tokens - num_computed_tokens, 0) + else: + return 0 def update_state_after_alloc(self, request: "Request", + block_ids: list[int], num_external_tokens: int): if request.do_remote_decode: pass if request.do_remote_prefill and num_external_tokens > 0: - self._reqs_need_recv[request.request_id] = request + self._reqs_need_recv[request.request_id] = ( + request, block_ids) def build_connector_meta( self, @@ -173,18 +191,12 @@ def build_connector_meta( meta = NixlConnectorMetadata() # Loop through scheduled reqs and convert to ReqMeta. - for new_req in scheduler_output.scheduled_new_reqs: - req = self._reqs_need_recv.pop(new_req.req_id, None) - if req is not None: - meta.add_new_req( - request_id=new_req.req_id, - local_block_ids=new_req.block_ids, - kv_transfer_params=req.kv_transfer_params, - ) - - # Invariant: only new requests should need load - # and we should get all new requests each step(). - assert len(self._reqs_need_recv) == 0 + for req_id, (req, block_ids) in self._reqs_need_recv.items(): + meta.add_new_req( + request_id=req_id, + local_block_ids=block_ids, + kv_transfer_params=req.kv_transfer_params, + ) return meta @@ -198,7 +210,8 @@ def __init__(self, engine_id: str): logger.info("Initializing NIXL wrapper") # Agent. - self.nixl_wrapper = NixlWrapper(str(uuid.uuid4()), None) + # self.nixl_wrapper = NixlWrapper(str(uuid.uuid4()), None) + self.nixl_wrapper = None # Map of engine_id -> list[agent_names] (1 per rank). self._remote_agents: dict[str, list[str]] = {} @@ -260,8 +273,6 @@ def register_kv_caches(self, kv_caches: dict[str, torch.Tensor]): (key_cache.data_ptr(), value_cache.data_ptr())) self.kv_caches_base_addr[self.engine_id] = kv_caches_base_addr - print(f"{len(self.kv_caches_base_addr[self.engine_id])=}") - print(f"{self.kv_caches_base_addr[self.engine_id][0]=}") descs = self.nixl_wrapper.get_reg_descs(caches_data, "VRAM") logger.debug("Registering descs: %s", caches_data) @@ -331,7 +342,9 @@ def add_remote_agent(self, nixl_agent_meta: NixlAgentMetadata): for layer_id in range(self.num_layers): # Both K and V. 
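+            # kv_caches_base_addr stores one (K, V) address pair per layer,
+            # as registered in register_kv_caches() above.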
print(f"{len(self.kv_caches_base_addr[self.engine_id])=}") - print(f"{len(self.kv_caches_base_addr[self.engine_id][layer_id])=}") + print( + f"{len(self.kv_caches_base_addr[self.engine_id][layer_id])=}" + ) print(f"{self.kv_caches_base_addr[self.engine_id][layer_id]=}") for base_addr in self.kv_caches_base_addr[ self.engine_id][layer_id]: @@ -360,16 +373,16 @@ def add_remote_agent(self, nixl_agent_meta: NixlAgentMetadata): block_offset = block_id * dst_block_len blocks_data.append( (base_addr + block_offset, dst_block_len, - self.rank * tp_multiplier)) + self.rank * tp_multiplier)) logger.debug("Created %s blocks for dst engine %s and rank %s", - len(blocks_data), engine_id, - self.rank * tp_multiplier + i) + len(blocks_data), engine_id, + self.rank * tp_multiplier + i) # Register with NIXL. descs = self.nixl_wrapper.get_xfer_descs(blocks_data, "VRAM") self.dst_xfer_side_handles[engine_id][i] = ( self.nixl_wrapper.prep_xfer_dlist( - self._remote_agents[engine_id][self.rank * tp_multiplier + - i], descs)) + self._remote_agents[engine_id][self.rank * tp_multiplier + i], + descs)) def get_finished(self) -> tuple[set[str], set[str]]: """Get requests that are done sending or recving.""" @@ -377,16 +390,15 @@ def get_finished(self) -> tuple[set[str], set[str]]: done_recving = self._pop_done_transfers(self._recving_transfers) return done_sending, done_recving - def _get_new_notifs(self) -> set[str]: + def _get_new_notifs(self) -> list[str]: """Get req_ids which got a remote xfer message.""" - notified_req_ids: set[str] = set() + notified_req_ids: list[str] = [] # TODO: handle the TP case (N notifies for TP=N). # See: vllm/worker/worker_base.py L476 in DynamoPR. for req_ids in self.nixl_wrapper.get_new_notifs().values(): for req_id in req_ids: - assert req_id not in notified_req_ids - notified_req_ids.add(req_id) + notified_req_ids.append(req_id) return notified_req_ids def _pop_done_transfers(self, transfers: dict[str, list[str]]) -> set[str]: @@ -397,7 +409,7 @@ def _pop_done_transfers(self, transfers: dict[str, list[str]]) -> set[str]: Returns: set of req_ids that have all done xfers """ - done_req_ids: str[str] = set() + done_req_ids: list[str] = [] for req_id, handles in transfers.items(): running_reqs = [] for handle in handles: @@ -412,7 +424,7 @@ def _pop_done_transfers(self, transfers: dict[str, list[str]]) -> set[str]: raise RuntimeError("Transfer failed with state %s", xfer_state) if len(running_reqs) == 0: - done_req_ids.add(req_id) + done_req_ids.append(req_id) else: transfers[req_id] = running_reqs return done_req_ids diff --git a/vllm/platforms/cpu.py b/vllm/platforms/cpu.py index 70553354a060..47a48126ed5c 100644 --- a/vllm/platforms/cpu.py +++ b/vllm/platforms/cpu.py @@ -43,7 +43,8 @@ def get_attn_backend_cls(cls, selected_backend: _Backend, head_size: int, logger.info("Using CPU MLA backend.") return "vllm.attention.backends.cpu_mla.CPUMLABackend" logger.info("Using Torch SDPA backend.") - return "vllm.attention.backends.torch_sdpa.TorchSDPABackend" + return "vllm.v1.attention.backends.flash_attn.FlashAttentionBackend" + # return "vllm.attention.backends.torch_sdpa.TorchSDPABackend" @classmethod def get_device_total_memory(cls, device_id: int = 0) -> int: diff --git a/vllm/v1/core/kv_cache_manager.py b/vllm/v1/core/kv_cache_manager.py index 33bb825a11a7..4c0c56a3a96f 100644 --- a/vllm/v1/core/kv_cache_manager.py +++ b/vllm/v1/core/kv_cache_manager.py @@ -166,10 +166,6 @@ def get_computed_blocks( num_computed_tokens = len(computed_blocks) * self.block_size return computed_blocks, 
num_computed_tokens - def cache_blocks(self, request: Request): - # TODO: implement this. - pass - def allocate_slots( self, request: Request, @@ -284,17 +280,41 @@ def allocate_slots( req_blocks.extend(new_blocks) if not self.enable_caching or skip_cache_blocks: + # If self.enable_caching, this is true since can only + # get to this codepath when we have never been scheduled. + assert request.request_id not in self.num_cached_block return new_blocks + self.cache_blocks( + request=request, + num_tokens=num_tokens, + num_computed_tokens=num_computed_tokens, + new_computed_blocks=new_computed_blocks, + ) + return new_blocks + + def cache_blocks( + self, + request: Request, + num_tokens: int, + num_computed_tokens: int, + new_computed_blocks: Optional[list[KVCacheBlock]] = None, + ): + if new_computed_blocks is None: + new_computed_blocks = [] + + req_blocks = self.req_to_blocks[request.request_id] + # Use `new_computed_blocks` for a new request, and `num_cached_block` # for a running request. num_cached_blocks = self.num_cached_block.get(request.request_id, len(new_computed_blocks)) - # Speculated tokens might be rejected in the future, so we does + + # Speculated tokens might be rejected in the future, so we do # not cache any speculated tokens. We only cache blocks with # generated (accepted) tokens. - num_full_blocks_after_append = (num_computed_tokens + num_tokens - len( - request.spec_token_ids)) // self.block_size + num_full_blocks_after_append = ( + num_computed_tokens + num_tokens - len(request.spec_token_ids)) // self.block_size self.block_pool.cache_full_blocks( request=request, @@ -308,7 +328,6 @@ def allocate_slots( self.num_cached_block[ request.request_id] = num_full_blocks_after_append - return new_blocks def free(self, request: Request) -> None: """Free the blocks allocated for the request. diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py index 995bc7512e22..ac27a96a855a 100644 --- a/vllm/v1/core/sched/scheduler.py +++ b/vllm/v1/core/sched/scheduler.py @@ -32,7 +32,6 @@ logger = init_logger(__name__) - class Scheduler(SchedulerInterface): def __init__( @@ -100,8 +99,7 @@ def __init__( self.finished_req_ids: set[str] = set() # Requests in states for tracking KV transfers for P/D disagg - self.sending_KV_req_ids: set[str] = set() - self.recving_KV_req_ids: set[str] = set() + self.finished_recving_KV_req_ids: set[str] = set() # OPTIMIZATION: Cache the CachedRequestData objects to avoid creating # them at each scheduling step. @@ -176,11 +174,6 @@ def schedule(self) -> SchedulerOutput: req_index = 0 while req_index < len(self.running) and token_budget > 0: request = self.running[req_index] - if (request.request_id in self.recving_KV_req_ids - or request.request_id in self.sending_KV_req_ids): - # P/D: This request is still recv/sending KVs. - req_index += 1 - continue if request.request_id in self.scheduled_req_ids: # This request has already been scheduled. req_index += 1 @@ -223,11 +216,6 @@ def schedule(self) -> SchedulerOutput: # The request cannot be scheduled. # Preempt the lowest-priority request. preempted_req = self.running.pop() - # NOTE(rob): we cannot free these blocks once in flight. - # TODO(rob): understand full implications of this. 
- if preempted_req.request_id in self.recving_KV_req_ids: - pass - self.kv_cache_manager.free(preempted_req) preempted_req.status = RequestStatus.PREEMPTED preempted_req.num_computed_tokens = 0 @@ -305,6 +293,25 @@ def schedule(self) -> SchedulerOutput: request = self.waiting[0] + # Skip request if the remote KV recv is still waiting + # for the requests to arrive. + if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS: + if request.request_id in self.finished_recving_KV_req_ids: + assert self.kv_cache_manager.enable_caching + # Now that the KVs have been recved, we can cache + # them and set num_computed_tokens. + self.kv_cache_manager.cache_blocks( + request, + num_tokens=0, + num_computed_tokens=(len(request.all_token_ids) - 1) + ) + request.status = RequestStatus.WAITING + self.kv_cache_manager.free(request) + else: + self.waiting.popleft() + skipped_waiting_requests.appendleft(request) + continue + # Skip request if the structured output request is still waiting # for FSM compilation. if request.status == RequestStatus.WAITING_FOR_FSM: @@ -340,49 +347,7 @@ def schedule(self) -> SchedulerOutput: # Total computed tokens (local + external). num_computed_tokens += num_external_tokens - # TODO: how can we make this code clean? - if not request.do_remote_prefill: - - # Number of tokens to be scheduled. - # We use `request.num_tokens` instead of - # `request.num_prompt_tokens` to consider the resumed reqs, - # which have output tokens. - num_new_tokens = request.num_tokens - num_computed_tokens - if (0 < self.scheduler_config.long_prefill_token_threshold - < num_new_tokens): - num_new_tokens = ( - self.scheduler_config.long_prefill_token_threshold) - num_new_tokens = min(num_new_tokens, token_budget) - assert num_new_tokens > 0 - - # Schedule encoder inputs. - if request.has_encoder_inputs: - (encoder_inputs_to_schedule, num_new_tokens, - new_encoder_budget - ) = self._try_schedule_encoder_inputs( - request, num_computed_tokens, num_new_tokens, - encoder_budget) - if num_new_tokens == 0: - # The request cannot be scheduled. - break - else: - encoder_inputs_to_schedule = None - new_encoder_budget = encoder_budget - - new_blocks = self.kv_cache_manager.allocate_slots( - request, num_new_tokens + num_external_tokens, - computed_blocks) - if new_blocks is None: - # The request cannot be scheduled. - break - else: - # TODO: handle preempted state. - assert request.status != RequestStatus.PREEMPTED - assert self.connector is not None - - # Schedule 0 tokens until the recv is done. - num_new_tokens = 0 - + if (request.do_remote_prefill and num_external_tokens > 0): # Allocate slots for the external tokens, but skip # caching until after the KV transfer is done. new_blocks = self.kv_cache_manager.allocate_slots( @@ -391,9 +356,56 @@ def schedule(self) -> SchedulerOutput: computed_blocks, skip_cache_blocks=True) if new_blocks is None: - # Request cannot be scheduled. + # Requests cannot be scheduled + break + + self.waiting.popleft() + skipped_waiting_requests.appendleft(request) + request.status = RequestStatus.WAITING_FOR_REMOTE_KVS + + # KVConnector: update internal state after allocation. + # This information is used to determine if a load is + # needed for this request. + if self.connector is not None: + self.connector.update_state_after_alloc( + request, + [b.block_id for b in computed_blocks + new_blocks], + num_external_tokens, + ) + continue + + # Number of tokens to be scheduled. 
+ # We use `request.num_tokens` instead of + # `request.num_prompt_tokens` to consider the resumed reqs, + # which have output tokens. + num_new_tokens = request.num_tokens - num_computed_tokens + if (0 < self.scheduler_config.long_prefill_token_threshold + < num_new_tokens): + num_new_tokens = ( + self.scheduler_config.long_prefill_token_threshold) + num_new_tokens = min(num_new_tokens, token_budget) + assert num_new_tokens > 0 + + # Schedule encoder inputs. + if request.has_encoder_inputs: + (encoder_inputs_to_schedule, num_new_tokens, + new_encoder_budget + ) = self._try_schedule_encoder_inputs( + request, num_computed_tokens, num_new_tokens, + encoder_budget) + if num_new_tokens == 0: + # The request cannot be scheduled. break - self.recving_KV_req_ids.add(request.request_id) + else: + encoder_inputs_to_schedule = None + new_encoder_budget = encoder_budget + + new_blocks = self.kv_cache_manager.allocate_slots( + request, num_new_tokens + num_external_tokens, + computed_blocks) + if new_blocks is None: + # The request cannot be scheduled. + break # KVConnector: update internal state after allocation. # This information is used to determine if a load is @@ -401,6 +413,7 @@ def schedule(self) -> SchedulerOutput: if self.connector is not None: self.connector.update_state_after_alloc( request, + [b.block_id for b in computed_blocks + new_blocks], num_external_tokens, ) @@ -752,15 +765,20 @@ def update_from_output( # inside AsyncLLM. if request.do_remote_decode and not stopped: request.status = RequestStatus.FINISHED_REMOTE_DECODE - self.sending_KV_req_ids.add(req_id) + self._free_request(request, skip_free_blocks=True) + # TODO(rob): do this on a per-Connector basis. - # From POV of DWorker, this is a remote prefill. + remote_blocks = [ + block.block_id for block in + self.kv_cache_manager.req_to_blocks[request.request_id] + ] + kv_transfer_params = KVTransferParams( do_remote_prefill=True, # put the remote block ids here - remote_block_ids=[1, 2, 3], + remote_block_ids=remote_blocks, # put the enigne id here - remote_engine_id="abcdefg", + remote_engine_id=self.connector.engine_id, ) # Add EngineCoreOutput for this Request. @@ -785,13 +803,10 @@ def update_from_output( new_running.append(request) # P/D: update recv and send status from last step. - for req_id in list(model_runner_output.finished_recving): - # TODO(rob): Implement this method. - # Cache blocks for APC after KVs have been recv'ed. 
- self.kv_cache_manager.cache_blocks(req_id) - self.recving_KV_req_ids.remove(req_id) - for req_id in list(model_runner_output.finished_sending): - self._free_request(self.requests[req_id]) + for req_id in (model_runner_output.finished_recving or []): + self.finished_recving_KV_req_ids.add(req_id) + for req_id in (model_runner_output.finished_sending or []): + self._free_blocks(self.requests[req_id]) self.running = new_running engine_core_outputs = EngineCoreOutputs( @@ -841,16 +856,23 @@ def finish_requests( request.status = finished_status self._free_request(request) - def _free_request(self, request: Request) -> None: + def _free_request(self, request: Request, + skip_free_blocks: bool = False) -> None: assert request.is_finished() - self.kv_cache_manager.free(request) - self.kv_cache_manager.free_block_hashes(request) self.encoder_cache_manager.free(request) self._cached_reqs_data.pop(request.request_id, None) - del self.requests[request.request_id] - self.sending_KV_req_ids.discard(request.request_id) self.finished_req_ids.add(request.request_id) + if not skip_free_blocks: + self._free_blocks(request) + + def _free_blocks(self, request: Request): + assert request.is_finished() + assert request.request_id not in self._cached_reqs_data + self.kv_cache_manager.free(request) + self.kv_cache_manager.free_block_hashes(request) + del self.requests[request.request_id] + def get_num_unfinished_requests(self) -> int: return len(self.waiting) + len(self.running) diff --git a/vllm/v1/outputs.py b/vllm/v1/outputs.py index d1eae6a8ba7c..24052e01f006 100644 --- a/vllm/v1/outputs.py +++ b/vllm/v1/outputs.py @@ -101,8 +101,8 @@ class ModelRunnerOutput: prompt_logprobs_dict: dict[str, Optional[LogprobsTensors]] # [req_ids] - finished_sending: set[str] - finished_recving: set[str] + finished_sending: Optional[list[str]] = None + finished_recving: Optional[list[str]] = None EMPTY_MODEL_RUNNER_OUTPUT = ModelRunnerOutput(req_ids=[], @@ -111,5 +111,5 @@ class ModelRunnerOutput: spec_token_ids=None, logprobs=None, prompt_logprobs_dict={}, - finished_sending=set(), - finished_recving=set()) + finished_sending=[], + finished_recving=[]) diff --git a/vllm/v1/request.py b/vllm/v1/request.py index 60b004dc0b2d..bd0d57ee4c8e 100644 --- a/vllm/v1/request.py +++ b/vllm/v1/request.py @@ -159,6 +159,7 @@ class RequestStatus(enum.IntEnum): """Status of a request.""" WAITING = enum.auto() WAITING_FOR_FSM = enum.auto() + WAITING_FOR_REMOTE_KVS = enum.auto() RUNNING = enum.auto() PREEMPTED = enum.auto() # Note: anything after PREEMPTED will be considered diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py index c79ecfdfab5d..d463e16cc1b1 100644 --- a/vllm/v1/worker/gpu_model_runner.py +++ b/vllm/v1/worker/gpu_model_runner.py @@ -38,8 +38,8 @@ from vllm.v1.outputs import (EMPTY_MODEL_RUNNER_OUTPUT, LogprobsTensors, ModelRunnerOutput) from vllm.v1.sample.metadata import SamplingMetadata -from vllm.v1.sample.rejection_sampler import RejectionSampler -from vllm.v1.spec_decode.eagle import EagleProposer +# from vllm.v1.sample.rejection_sampler import RejectionSampler +# from vllm.v1.spec_decode.eagle import EagleProposer from vllm.v1.spec_decode.metadata import SpecDecodeMetadata from vllm.v1.spec_decode.ngram_proposer import NgramProposer from vllm.v1.spec_decode.utils import is_spec_decode_supported @@ -199,8 +199,9 @@ def __init__( self.vllm_config.compilation_config.cudagraph_capture_sizes)) # Cache the device properties. 
- self.device_properties = torch.cuda.get_device_properties(self.device) - self.num_sms = self.device_properties.multi_processor_count + # self.device_properties = torch.cuda.get_device_properties(self.device) + # self.num_sms = self.device_properties.multi_processor_count + self.num_sms = 0 # Persistent buffers for CUDA graphs. self.input_ids = torch.zeros(self.max_num_tokens, @@ -1021,13 +1022,12 @@ def maybe_wait_for_save(): kv_connector = get_kv_transfer_group() kv_connector.wait_for_save() - def maybe_get_finished() -> tuple[set[str], set[str]]: + def maybe_get_finished() -> tuple[list[str], list[str]]: if has_kv_transfer_group(): kv_connector = get_kv_transfer_group() return kv_connector.get_finished() else: - # TODO: make this optional instead. - return set(), set() + return [], [] self._update_states(scheduler_output) if not scheduler_output.total_num_scheduled_tokens: