diff --git a/tests/unit_tests/kv_offload/offloading_connector/test_scheduler.py b/tests/unit_tests/kv_offload/offloading_connector/test_scheduler.py index e899906e87..3b75ce2394 100644 --- a/tests/unit_tests/kv_offload/offloading_connector/test_scheduler.py +++ b/tests/unit_tests/kv_offload/offloading_connector/test_scheduler.py @@ -210,9 +210,12 @@ def test_concurrent_lookups_of_the_same_prefix(request_runner, async_scheduling: # store 1 blocks runner.new_request(token_ids=[0] * offloaded_block_size) runner.manager.prepare_store.side_effect = (lambda block_hashes, req_context: generate_store_output(block_hashes)) + # With sync scheduling, all-finished flush fires within this run. + # With async scheduling, the finish is delayed so flush fires later. runner.run( decoded_tokens=[EOS_TOKEN_ID], expected_stored_gpu_block_indexes=(0, 1, 2), + expected_flushed_gpu_block_indexes=(0, 1, 2) if not async_scheduling else (), ) # start a request to load the first block, but don't complete @@ -249,6 +252,9 @@ def test_concurrent_lookups_of_the_same_prefix(request_runner, async_scheduling: # second request will use the GPU prefix cache assert transfer_jobs == list(runner.offloading_spec.handler.transfer_specs) + # Fence index drained: stores completed before request_finished ran. + assert runner.connector_scheduler._block_id_to_pending_jobs == {} + @pytest.mark.parametrize("async_scheduling", [True, False]) def test_abort_loading_requests(request_runner, async_scheduling: bool): @@ -269,6 +275,7 @@ def test_abort_loading_requests(request_runner, async_scheduling: bool): runner.run( decoded_tokens=[EOS_TOKEN_ID], expected_stored_gpu_block_indexes=(0, 1, 2), + expected_flushed_gpu_block_indexes=(0, 1, 2) if not async_scheduling else (), ) # start a request to load the first block, but don't complete @@ -295,6 +302,7 @@ def test_abort_loading_requests(request_runner, async_scheduling: bool): runner.run( decoded_tokens=[], expected_loaded_gpu_block_indexes=(0, 1, 2), + expected_flushed_gpu_block_indexes=(0, 1, 2), ) # assert request is deleted diff --git a/tests/unit_tests/kv_offload/offloading_connector/utils.py b/tests/unit_tests/kv_offload/offloading_connector/utils.py index aab2a5c64f..91c17b9912 100644 --- a/tests/unit_tests/kv_offload/offloading_connector/utils.py +++ b/tests/unit_tests/kv_offload/offloading_connector/utils.py @@ -286,10 +286,14 @@ def new_request( def _parse_transfers(self): for transfer_spec in self.offloading_spec.get_flushed_transfers(): src_spec, dst_spec = transfer_spec - assert isinstance(src_spec, GPULoadStoreSpec) - - for block_id in src_spec.block_ids: - self.flushed_gpu_block_indexes.add(self.gpu_block_index[block_id.item()]) + if isinstance(src_spec, GPULoadStoreSpec): + # store flush + for block_id in src_spec.block_ids: + self.flushed_gpu_block_indexes.add(self.gpu_block_index[block_id.item()]) + else: + # load flush + for block_id in dst_spec.block_ids: + self.flushed_gpu_block_indexes.add(self.gpu_block_index[block_id.item()]) block_size_factor = self.offloaded_block_size // self.gpu_block_size