From 1577ca502d594e18793ab2257bdbe0371efa34c0 Mon Sep 17 00:00:00 2001 From: Liran Schour Date: Thu, 14 May 2026 10:38:27 +0300 Subject: [PATCH 1/9] Flush all pendning jobs on last step. Signed-off-by: Liran Schour --- .../offloading_connector/test_scheduler.py | 32 +++++++++++++++++++ .../kv_connector/v1/offloading/scheduler.py | 5 +++ 2 files changed, 37 insertions(+) diff --git a/tests/v1/kv_connector/unit/offloading_connector/test_scheduler.py b/tests/v1/kv_connector/unit/offloading_connector/test_scheduler.py index 9a8783b084e0..dfb88dc43853 100644 --- a/tests/v1/kv_connector/unit/offloading_connector/test_scheduler.py +++ b/tests/v1/kv_connector/unit/offloading_connector/test_scheduler.py @@ -14,7 +14,9 @@ from vllm.distributed.kv_events import BlockRemoved, BlockStored from vllm.distributed.kv_transfer.kv_connector.v1.offloading.scheduler import ( OffloadingConnectorScheduler, + TransferJobStatus, ) +from vllm.v1.core.sched.output import SchedulerOutput from vllm.v1.core.kv_cache_utils import BlockHash from vllm.v1.kv_cache_interface import ( FullAttentionSpec, @@ -919,3 +921,33 @@ def test_complete_store_called_per_job(request_runner, async_scheduling: bool): # Finish: no store pending -> no further call. runner.run(decoded_tokens=[EOS_TOKEN_ID]) assert runner.manager.complete_store.call_count == 0 + + +def test_flush_all_jobs_when_no_requests_remain(request_runner): + """When _req_status is empty, build_connector_meta flushes all pending + jobs since there will be no future step to complete them.""" + runner = request_runner( + block_size=4, + num_gpu_blocks=100, + async_scheduling=False, + block_size_factor=1, + ) + sched = runner.connector_scheduler + + # Simulate: requests already finished, but store jobs still pending + sched._jobs[42] = TransferJobStatus( + req_id="done_req", + pending_count=1, + keys=set(), + is_store=True, + ) + sched._jobs[43] = TransferJobStatus( + req_id="done_req", + pending_count=1, + keys=set(), + is_store=True, + ) + assert not sched._req_status + + meta = sched.build_connector_meta(SchedulerOutput.make_empty()) + assert meta.jobs_to_flush == {42, 43} diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py b/vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py index ef016bb02ebb..eb8bc6e7435d 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py @@ -779,6 +779,11 @@ def build_connector_meta( assert self._jobs[any_jid].is_store self._current_batch_jobs_to_flush.update(req_status.transfer_jobs) + # If no requests remain, flush all pending jobs - there won't be + # a future scheduler step to trigger their completion. + if not self._req_status: + self._current_batch_jobs_to_flush.update(self._jobs) + meta = OffloadingConnectorMetadata( load_jobs=self._current_batch_load_jobs, store_jobs=self._build_store_jobs(scheduler_output), From 5e0a191076921a931c015dd81608295c936ab01c Mon Sep 17 00:00:00 2001 From: Liran Schour Date: Thu, 14 May 2026 12:03:16 +0300 Subject: [PATCH 2/9] Fix: detect last step by Check all requests are finished Signed-off-by: Liran Schour --- .../offloading_connector/test_scheduler.py | 68 +++++++++++-------- .../kv_connector/v1/offloading/scheduler.py | 13 ++-- 2 files changed, 48 insertions(+), 33 deletions(-) diff --git a/tests/v1/kv_connector/unit/offloading_connector/test_scheduler.py b/tests/v1/kv_connector/unit/offloading_connector/test_scheduler.py index dfb88dc43853..c2a2e1972da3 100644 --- a/tests/v1/kv_connector/unit/offloading_connector/test_scheduler.py +++ b/tests/v1/kv_connector/unit/offloading_connector/test_scheduler.py @@ -260,9 +260,12 @@ def test_concurrent_lookups_of_the_same_prefix(request_runner, async_scheduling: runner.manager.prepare_store.side_effect = lambda keys, req_context: ( generate_store_output(keys) ) + # With sync scheduling, all-finished flush fires within this run. + # With async scheduling, the finish is delayed so flush fires later. runner.run( decoded_tokens=[EOS_TOKEN_ID], expected_stored=(0, 1, 2), + expected_flushed=(0, 1, 2) if not async_scheduling else (), ) # start a request to load the first block, but don't complete @@ -327,6 +330,7 @@ def test_abort_loading_requests(request_runner, async_scheduling: bool): runner.run( decoded_tokens=[EOS_TOKEN_ID], expected_stored=(0, 1, 2), + expected_flushed=(0, 1, 2) if not async_scheduling else (), ) # start a request to load the first block, but don't complete @@ -768,7 +772,11 @@ def test_do_remote_decode_stores_all_blocks(request_runner, async_scheduling: bo runner.manager.prepare_store.side_effect = lambda keys, req_context: ( generate_store_output(keys) ) - runner.run(decoded_tokens=[EOS_TOKEN_ID], expected_stored=(0, 1, 2)) + runner.run( + decoded_tokens=[EOS_TOKEN_ID], + expected_stored=(0, 1, 2), + expected_flushed=(0, 1, 2) if not async_scheduling else (), + ) # Reset GPU prefix cache so the next request must load from CPU. runner.scheduler.reset_prefix_cache() @@ -833,8 +841,13 @@ def test_fence_at_update_state_after_alloc(request_runner): runner.manager.prepare_store.side_effect = lambda keys, req_context: ( generate_store_output(keys) ) - runner.run(decoded_tokens=[EOS_TOKEN_ID], complete_transfers=False) - assert runner.connector_scheduler._block_id_to_pending_jobs + runner.run( + decoded_tokens=[EOS_TOKEN_ID], + complete_transfers=False, + expected_stored=(0,), + expected_flushed=(0,), + ) + assert runner.connector_scheduler._block_id_to_pending_jobs == {} runner.scheduler.reset_prefix_cache() runner.new_request(token_ids=[0] * 4) @@ -845,8 +858,6 @@ def test_fence_at_update_state_after_alloc(request_runner): runner.run( decoded_tokens=[], complete_transfers=False, - expected_stored=(0,), - expected_flushed=(0,), ) assert runner.connector_scheduler._block_id_to_pending_jobs == {} @@ -866,8 +877,13 @@ def test_fence_at_build_store_jobs(request_runner): runner.manager.prepare_store.side_effect = lambda keys, req_context: ( generate_store_output(keys) ) - runner.run(decoded_tokens=[EOS_TOKEN_ID], complete_transfers=False) - assert runner.connector_scheduler._block_id_to_pending_jobs + runner.run( + decoded_tokens=[EOS_TOKEN_ID], + complete_transfers=False, + expected_stored=(0,), + expected_flushed=(0,), + ) + assert runner.connector_scheduler._block_id_to_pending_jobs == {} runner.scheduler.reset_prefix_cache() runner.new_request(token_ids=[1] * 4) @@ -877,8 +893,6 @@ def test_fence_at_build_store_jobs(request_runner): ) runner.run( decoded_tokens=[EOS_TOKEN_ID], - expected_stored=(0,), - expected_flushed=(0,), ) assert runner.connector_scheduler._block_id_to_pending_jobs == {} @@ -924,30 +938,26 @@ def test_complete_store_called_per_job(request_runner, async_scheduling: bool): def test_flush_all_jobs_when_no_requests_remain(request_runner): - """When _req_status is empty, build_connector_meta flushes all pending - jobs since there will be no future step to complete them.""" + """When all tracked requests are finished, build_connector_meta flushes + all pending jobs since there will be no future step to complete them.""" + block_size = 4 + block_size_factor = 1 + offloaded_block_size = block_size * block_size_factor + runner = request_runner( - block_size=4, + block_size=block_size, num_gpu_blocks=100, async_scheduling=False, - block_size_factor=1, + block_size_factor=block_size_factor, ) - sched = runner.connector_scheduler - # Simulate: requests already finished, but store jobs still pending - sched._jobs[42] = TransferJobStatus( - req_id="done_req", - pending_count=1, - keys=set(), - is_store=True, + runner.new_request(token_ids=[0] * offloaded_block_size) + runner.manager.prepare_store.side_effect = lambda keys, req_context: ( + generate_store_output(keys) ) - sched._jobs[43] = TransferJobStatus( - req_id="done_req", - pending_count=1, - keys=set(), - is_store=True, + runner.run( + decoded_tokens=[EOS_TOKEN_ID], + complete_transfers=False, + expected_stored=(0,), + expected_flushed=(0,), ) - assert not sched._req_status - - meta = sched.build_connector_meta(SchedulerOutput.make_empty()) - assert meta.jobs_to_flush == {42, 43} diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py b/vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py index eb8bc6e7435d..8c5c79df33d2 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py @@ -779,10 +779,15 @@ def build_connector_meta( assert self._jobs[any_jid].is_store self._current_batch_jobs_to_flush.update(req_status.transfer_jobs) - # If no requests remain, flush all pending jobs - there won't be - # a future scheduler step to trigger their completion. - if not self._req_status: - self._current_batch_jobs_to_flush.update(self._jobs) + # If all tracked requests are finished, flush all pending store + # jobs - there won't be a future scheduler step to trigger their + # completion. + if self._req_status and all( + rs.req.is_finished() for rs in self._req_status.values() + ): + self._current_batch_jobs_to_flush.update( + jid for jid, js in self._jobs.items() if js.is_store + ) meta = OffloadingConnectorMetadata( load_jobs=self._current_batch_load_jobs, From c05898a562210d25fd48d001edfb38e403ab101e Mon Sep 17 00:00:00 2001 From: liranschour Date: Thu, 14 May 2026 13:53:09 +0300 Subject: [PATCH 3/9] Update vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py Co-authored-by: Or Ozeri Signed-off-by: liranschour --- .../kv_transfer/kv_connector/v1/offloading/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py b/vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py index 8c5c79df33d2..dbd975f15ef7 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py @@ -780,7 +780,7 @@ def build_connector_meta( self._current_batch_jobs_to_flush.update(req_status.transfer_jobs) # If all tracked requests are finished, flush all pending store - # jobs - there won't be a future scheduler step to trigger their + # jobs - there might not be a future scheduler step to trigger their # completion. if self._req_status and all( rs.req.is_finished() for rs in self._req_status.values() From 01d7162da88fb7cc637150f4e8a9a9e57442211d Mon Sep 17 00:00:00 2001 From: liranschour Date: Thu, 14 May 2026 13:54:33 +0300 Subject: [PATCH 4/9] Update vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py Co-authored-by: Or Ozeri Signed-off-by: liranschour --- .../kv_transfer/kv_connector/v1/offloading/scheduler.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py b/vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py index dbd975f15ef7..87e9dbd2b5d7 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py @@ -785,9 +785,7 @@ def build_connector_meta( if self._req_status and all( rs.req.is_finished() for rs in self._req_status.values() ): - self._current_batch_jobs_to_flush.update( - jid for jid, js in self._jobs.items() if js.is_store - ) + self._current_batch_jobs_to_flush.update(self._jobs.keys()) meta = OffloadingConnectorMetadata( load_jobs=self._current_batch_load_jobs, From 80fa11bece65260ec0dc4ee26779acae7383a9c0 Mon Sep 17 00:00:00 2001 From: Liran Schour Date: Thu, 14 May 2026 15:49:14 +0300 Subject: [PATCH 5/9] Linter issues Signed-off-by: Liran Schour --- .../v1/kv_connector/unit/offloading_connector/test_scheduler.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/v1/kv_connector/unit/offloading_connector/test_scheduler.py b/tests/v1/kv_connector/unit/offloading_connector/test_scheduler.py index c2a2e1972da3..cf84cb4bf887 100644 --- a/tests/v1/kv_connector/unit/offloading_connector/test_scheduler.py +++ b/tests/v1/kv_connector/unit/offloading_connector/test_scheduler.py @@ -14,9 +14,7 @@ from vllm.distributed.kv_events import BlockRemoved, BlockStored from vllm.distributed.kv_transfer.kv_connector.v1.offloading.scheduler import ( OffloadingConnectorScheduler, - TransferJobStatus, ) -from vllm.v1.core.sched.output import SchedulerOutput from vllm.v1.core.kv_cache_utils import BlockHash from vllm.v1.kv_cache_interface import ( FullAttentionSpec, From ec151883579bb18fd1056adf447b5d053159db14 Mon Sep 17 00:00:00 2001 From: Liran Schour Date: Sat, 16 May 2026 19:15:36 +0300 Subject: [PATCH 6/9] Flush only store jobs Signed-off-by: Liran Schour --- .../unit/offloading_connector/test_scheduler.py | 6 +++++- .../kv_transfer/kv_connector/v1/offloading/scheduler.py | 4 +++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/v1/kv_connector/unit/offloading_connector/test_scheduler.py b/tests/v1/kv_connector/unit/offloading_connector/test_scheduler.py index e0f03a0dcb96..018ccee63125 100644 --- a/tests/v1/kv_connector/unit/offloading_connector/test_scheduler.py +++ b/tests/v1/kv_connector/unit/offloading_connector/test_scheduler.py @@ -982,7 +982,11 @@ def test_reset_cache(request_runner, async_scheduling: bool): runner.manager.prepare_store.side_effect = lambda keys, req_context: ( generate_store_output(keys) ) - runner.run(decoded_tokens=[EOS_TOKEN_ID], expected_stored=(0, 1, 2)) + runner.run( + decoded_tokens=[EOS_TOKEN_ID], + expected_stored=(0, 1, 2), + expected_flushed=(0, 1, 2) if not async_scheduling else (), + ) # Reset GPU prefix cache then start a request that loads from CPU. # Leave the load in-flight so that reset_cache must flush it. diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py b/vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py index 9114135b7391..9380ec06d803 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py @@ -788,7 +788,9 @@ def build_connector_meta( if self._req_status and all( rs.req.is_finished() for rs in self._req_status.values() ): - self._current_batch_jobs_to_flush.update(self._jobs.keys()) + self._current_batch_jobs_to_flush.update( + jid for jid, js in self._jobs.items() if js.is_store + ) meta = OffloadingConnectorMetadata( load_jobs=self._current_batch_load_jobs, From f2539b83e0dea4e23f04f3acd499f54c1074dc87 Mon Sep 17 00:00:00 2001 From: Liran Schour Date: Mon, 18 May 2026 11:08:34 +0300 Subject: [PATCH 7/9] Flush all jobs Signed-off-by: Liran Schour --- .../v1/kv_connector/unit/offloading_connector/utils.py | 9 +++++---- .../kv_connector/v1/offloading/scheduler.py | 10 ++++------ 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/tests/v1/kv_connector/unit/offloading_connector/utils.py b/tests/v1/kv_connector/unit/offloading_connector/utils.py index ec5ffef93505..910ca05b17e5 100644 --- a/tests/v1/kv_connector/unit/offloading_connector/utils.py +++ b/tests/v1/kv_connector/unit/offloading_connector/utils.py @@ -348,10 +348,11 @@ def new_request( def _parse_transfers(self): for transfer_spec in self.offloading_spec.get_flushed_transfers(): src_spec, dst_spec = transfer_spec - assert isinstance(src_spec, GPULoadStoreSpec) - - for block_id in src_spec.block_ids: - self.flushed_gpu_blocks.add(self.gpu_blocks[block_id.item()]) + if isinstance(src_spec, GPULoadStoreSpec): + # store flush + for block_id in src_spec.block_ids: + self.flushed_gpu_blocks.add(self.gpu_blocks[block_id.item()]) + # load flush: no block tracking needed block_size_factor = self.block_size_factor diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py b/vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py index 9380ec06d803..6737221e7203 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py @@ -782,15 +782,13 @@ def build_connector_meta( assert self._jobs[any_jid].is_store self._current_batch_jobs_to_flush.update(req_status.transfer_jobs) - # If all tracked requests are finished, flush all pending store - # jobs - there might not be a future scheduler step to trigger their - # completion. + # If all tracked requests are finished, flush all pending jobs + # (both store and load) - there might not be a future scheduler + # step to trigger their completion. if self._req_status and all( rs.req.is_finished() for rs in self._req_status.values() ): - self._current_batch_jobs_to_flush.update( - jid for jid, js in self._jobs.items() if js.is_store - ) + self._current_batch_jobs_to_flush.update(self._jobs.keys()) meta = OffloadingConnectorMetadata( load_jobs=self._current_batch_load_jobs, From e90eecb4fd8e18f921e2125e41f6b4c786788b47 Mon Sep 17 00:00:00 2001 From: Liran Schour Date: Mon, 18 May 2026 11:44:38 +0300 Subject: [PATCH 8/9] Track also load jobs Signed-off-by: Liran Schour --- tests/v1/kv_connector/unit/offloading_connector/utils.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/v1/kv_connector/unit/offloading_connector/utils.py b/tests/v1/kv_connector/unit/offloading_connector/utils.py index 910ca05b17e5..bac8dbdf9464 100644 --- a/tests/v1/kv_connector/unit/offloading_connector/utils.py +++ b/tests/v1/kv_connector/unit/offloading_connector/utils.py @@ -352,7 +352,10 @@ def _parse_transfers(self): # store flush for block_id in src_spec.block_ids: self.flushed_gpu_blocks.add(self.gpu_blocks[block_id.item()]) - # load flush: no block tracking needed + else: + # load flush + for block_id in dst_spec.block_ids: + self.flushed_gpu_blocks.add(self.gpu_blocks[block_id.item()]) block_size_factor = self.block_size_factor From ea9c1bbac7d80d6b606d51efacaf60e3ad10d136 Mon Sep 17 00:00:00 2001 From: Liran Schour Date: Mon, 18 May 2026 13:04:58 +0300 Subject: [PATCH 9/9] Fix unit tests Signed-off-by: Liran Schour --- .../v1/kv_connector/unit/offloading_connector/test_scheduler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/v1/kv_connector/unit/offloading_connector/test_scheduler.py b/tests/v1/kv_connector/unit/offloading_connector/test_scheduler.py index 018ccee63125..e0c2ffe104bf 100644 --- a/tests/v1/kv_connector/unit/offloading_connector/test_scheduler.py +++ b/tests/v1/kv_connector/unit/offloading_connector/test_scheduler.py @@ -355,6 +355,7 @@ def test_abort_loading_requests(request_runner, async_scheduling: bool): runner.run( decoded_tokens=[], expected_loaded=(0, 1, 2), + expected_flushed=(0, 1, 2), ) # assert request is deleted