Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions tests/v1/core/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3349,3 +3349,28 @@ def test_ec_connector_allocate_encoder_tokens_with_external_load(use_kv_connecto
# ==============================================================================
# EPD (Encoder-Prefill-Decode) Encoder-cache-specific tests end
# ==============================================================================


def test_prepend_skipped_requests_order():
    """Check that skipped waiting requests keep their relative order.

    Four requests are queued on a scheduler limited to one running
    sequence. The first two are marked as waiting for remote KVs, so a
    scheduling step is expected to skip them, run the third, and leave the
    fourth waiting. The waiting queue must afterwards list the remaining
    requests in their original order.
    """
    scheduler = create_scheduler(max_num_seqs=1, use_kv_connector=True)
    queued = create_requests(num_requests=4)
    for req in queued:
        scheduler.add_request(req)

    # Snapshot the waiting queue (4 requests) before any scheduling happens.
    expected_waiting_reqs = list(scheduler.waiting)

    # Pretend the two requests at the head of the queue are still waiting
    # for their remote KVs.
    head_of_queue = expected_waiting_reqs[:2]
    for blocked in head_of_queue:
        blocked.status = RequestStatus.WAITING_FOR_REMOTE_KVS

    # Run a single scheduling step. Expectation: the first two waiting
    # requests are skipped, the third starts running, and the fourth
    # stays in the waiting queue.
    scheduler.schedule()

    # The third request is expected to be running now, so it must no
    # longer appear among the waiting ones.
    del expected_waiting_reqs[2]

    # The remaining waiting requests must appear in their original order.
    remaining_waiting = list(scheduler.waiting)
    assert remaining_waiting == expected_waiting_reqs
21 changes: 6 additions & 15 deletions vllm/v1/core/sched/request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ def __iter__(self) -> Iterator[Request]:
"""Iterate over the queue according to the policy."""
pass

@abstractmethod
def __reversed__(self) -> Iterator[Request]:
"""Iterate over the queue in reverse order."""
pass


class FCFSRequestQueue(deque[Request], RequestQueue):
"""A first-come-first-served queue that supports deque operations."""
Expand All @@ -100,8 +95,12 @@ def prepend_request(self, request: Request) -> None:

def prepend_requests(self, requests: RequestQueue) -> None:
"""Prepend all requests from another queue to the front of this
queue."""
self.extendleft(reversed(requests))
queue.

Note: The requests will be prepended in reverse order of their
appearance in the `requests` queue.
"""
self.extendleft(requests)

def remove_request(self, request: Request) -> None:
"""Remove a specific request from the queue."""
Expand All @@ -128,10 +127,6 @@ def __iter__(self) -> Iterator[Request]:
"""Iterate over the queue according to FCFS policy."""
return super().__iter__()

def __reversed__(self) -> Iterator[Request]:
"""Iterate over the queue in reverse order."""
return super().__reversed__()


class PriorityRequestQueue(RequestQueue):
"""
Expand Down Expand Up @@ -202,10 +197,6 @@ def __iter__(self) -> Iterator[Request]:
while heap_copy:
yield heapq.heappop(heap_copy)

def __reversed__(self) -> Iterator[Request]:
"""Iterate over the queue in reverse priority order."""
return reversed(list(self))


def create_request_queue(policy: SchedulingPolicy) -> RequestQueue:
"""Create request queue based on scheduling policy."""
Expand Down