Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions tests/v1/core/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1862,6 +1862,76 @@ def test_priority_scheduling_heap_property():
assert scheduled_priorities == expected_priorities


@pytest.mark.parametrize(
    "arrival_times_spaced, trigger_aging",
    [
        (0, False),  # no spacing: every request ages by the same amount
        (1000, True)  # 1000 s spacing: aging dominates base priorities
    ])
def test_priority_scheduling_with_aging(arrival_times_spaced: int,
                                        trigger_aging: bool):
    """Verify the scheduling order produced by priority scheduling with aging.

    With zero spacing between arrival times, all requests age identically,
    so scheduling order reduces to (priority, arrival_time).  With large
    spacing, the aging term (aging_factor 0.1 * 1000 s = 100 priority units
    per slot) dwarfs the base-priority spread (0..5), so requests are
    scheduled strictly in arrival order.
    """
    scheduler = create_scheduler_with_priority(
        max_num_seqs=1,  # Only one request can run at a time
    )

    # Add requests in mixed priority order (smaller value = higher priority).
    priorities = [5, 1, 0, 0, 5, 0, 0, 0]
    # Arrival times are spaced out to simulate aging: requests that arrived
    # earlier have waited longer and thus have a lower effective priority.
    arrival_times = [
        float(i) * arrival_times_spaced for i in range(len(priorities))
    ]
    requests = create_requests_with_priority(num_requests=len(priorities),
                                             priorities=priorities,
                                             arrival_times=arrival_times,
                                             num_tokens=10)

    # Add all requests
    for request in requests:
        scheduler.add_request(request)

    # Schedule one request at a time and verify priority order
    scheduled_priorities = []

    while scheduler.waiting:
        output = scheduler.schedule()
        if output.scheduled_new_reqs:
            req = output.scheduled_new_reqs[0]
            scheduled_priorities.append(requests[int(req.req_id)].priority)

            # Simulate completion to make room for next request
            model_output = ModelRunnerOutput(
                req_ids=[req.req_id],
                req_id_to_index={req.req_id: 0},
                sampled_token_ids=[[100]],
                spec_token_ids=None,
                logprobs=None,
                prompt_logprobs_dict={},
                pooler_output=[],
            )
            scheduler.update_from_output(output, model_output)

            # Finish the request to make room for the next one
            scheduler.finish_requests(req.req_id,
                                      RequestStatus.FINISHED_STOPPED)
    if trigger_aging:
        # With aging triggered, each extra 1000 s of waiting lowers the
        # effective priority by 100, far more than any base-priority
        # difference, so earlier arrivals always win and requests are
        # scheduled in arrival order.  Reading off their base priorities
        # therefore reproduces the original `priorities` list:
        expected_priorities = [5, 1, 0, 0, 5, 0, 0, 0]
    else:
        # Without spacing, aging cancels out: the priority-0 requests
        # (smallest value = processed first) are scheduled first, FIFO
        # within equal priority, and the priority-5 requests last.
        expected_priorities = [0, 0, 0, 0, 0, 1, 5, 5]
    assert scheduled_priorities == expected_priorities


def test_schedule_skip_tokenizer_init():
scheduler = create_scheduler(skip_tokenizer_init=True)
requests = create_requests(num_requests=5)
Expand Down
53 changes: 42 additions & 11 deletions vllm/v1/core/sched/request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from __future__ import annotations

import heapq
import time
from abc import ABC, abstractmethod
from collections import deque
from collections.abc import Iterable, Iterator
Expand Down Expand Up @@ -136,36 +137,64 @@ def __reversed__(self) -> Iterator[Request]:
return super().__reversed__()


class PrioritizedItem:
    """Heap entry that orders a request by its aging-adjusted priority.

    The effective priority decays linearly over time at ``aging_factor``
    units per second, so a long-waiting low-priority request eventually
    overtakes a newly arrived high-priority one (smaller values are
    scheduled first).  Note that for two items sharing the same aging
    factor the relative order is time-invariant, since the common
    ``now`` term cancels out of the comparison.
    """

    def __init__(self, request: "Request", aging_factor: float = 0.1):
        self.request = request
        # Decay rate in priority units per second; <= 0 disables aging.
        self.aging_factor = aging_factor
        # NOTE(review): aging is measured from the request's arrival time,
        # not from the moment it was pushed onto the heap — confirm this
        # is intended for requests that are re-queued later.
        self.insert_time = request.arrival_time

    def effective_priority(self, now: float) -> float:
        """Return the aging-adjusted priority evaluated at time ``now``."""
        if self.aging_factor <= 0:
            return self.request.priority
        return self.request.priority - self.aging_factor * (
            now - self.insert_time)

    @property
    def priority(self) -> float:
        """Effective priority at the current wall-clock time."""
        return self.effective_priority(time.time())

    def __lt__(self, other: "PrioritizedItem") -> bool:
        # Sample the clock once and evaluate both priorities against the
        # same instant.  The previous implementation read the ``priority``
        # property up to four times, each taking its own ``time.time()``
        # sample, which could compare two otherwise-equal items
        # inconsistently within a single call.
        now = time.time()
        mine = self.effective_priority(now)
        theirs = other.effective_priority(now)
        if mine != theirs:
            return mine < theirs
        # Tie-break: earlier arrival wins (FIFO).
        return self.insert_time < other.insert_time


class PriorityRequestQueue(RequestQueue):
"""
A priority queue that supports heap operations.

Requests with a smaller value of `priority` are processed first.
If multiple requests have the same priority, the one with the earlier
`arrival_time` is processed first.
Requests are aged over time based on the `aging_factor`, which
reduces their effective priority as time passes.
"""

def __init__(self) -> None:
self._heap: list[tuple[int, float, Request]] = []
def __init__(self, aging_factor: float = 0.1) -> None:
self._heap: list[PrioritizedItem] = []
self.aging_factor = aging_factor

def add_request(self, request: Request) -> None:
"""Add a request to the queue according to priority policy."""
heapq.heappush(self._heap,
(request.priority, request.arrival_time, request))
item = PrioritizedItem(request, self.aging_factor)
heapq.heappush(self._heap, item)

def pop_request(self) -> Request:
"""Pop a request from the queue according to priority policy."""
if not self._heap:
raise IndexError("pop from empty heap")
_, _, request = heapq.heappop(self._heap)
request = heapq.heappop(self._heap).request
return request

def peek_request(self) -> Request:
"""Peek at the next request in the queue without removing it."""
if not self._heap:
raise IndexError("peek from empty heap")
_, _, request = self._heap[0]
return request
return self._heap[0].request

def prepend_request(self, request: Request) -> None:
"""Add a request to the queue according to priority policy.
Expand All @@ -184,14 +213,16 @@ def prepend_requests(self, requests: RequestQueue) -> None:

def remove_request(self, request: Request) -> None:
"""Remove a specific request from the queue."""
self._heap = [(p, t, r) for p, t, r in self._heap if r != request]
self._heap = [item for item in self._heap if item.request != request]
heapq.heapify(self._heap)

def remove_requests(self, requests: Iterable[Request]) -> None:
"""Remove multiple specific requests from the queue."""
requests_to_remove = set(requests)
self._heap = [(p, t, r) for p, t, r in self._heap
if r not in requests_to_remove]
self._heap = [
item for item in self._heap
if item.request not in requests_to_remove
]
heapq.heapify(self._heap)

def __bool__(self) -> bool:
Expand All @@ -206,7 +237,7 @@ def __iter__(self) -> Iterator[Request]:
"""Iterate over the queue according to priority policy."""
heap_copy = self._heap[:]
while heap_copy:
_, _, request = heapq.heappop(heap_copy)
request = heapq.heappop(heap_copy).request
yield request

def __reversed__(self) -> Iterator[Request]:
Expand Down