From fdd52d5308c98c5b9b3eedd89038ec50223875fa Mon Sep 17 00:00:00 2001 From: chaunceyjiang Date: Tue, 8 Jul 2025 05:51:47 +0000 Subject: [PATCH 1/3] [Core] feat: Add aging factor support to priority request queue for fairer scheduling Signed-off-by: chaunceyjiang --- vllm/v1/core/sched/request_queue.py | 46 ++++++++++++++++++++++------- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/vllm/v1/core/sched/request_queue.py b/vllm/v1/core/sched/request_queue.py index fc2bc30b9a5f..f919b0709964 100644 --- a/vllm/v1/core/sched/request_queue.py +++ b/vllm/v1/core/sched/request_queue.py @@ -4,6 +4,7 @@ from __future__ import annotations import heapq +import time from abc import ABC, abstractmethod from collections import deque from collections.abc import Iterable, Iterator @@ -136,6 +137,25 @@ def __reversed__(self) -> Iterator[Request]: return super().__reversed__() +class PrioritizedItem: + + def __init__(self, request: Request, aging_factor: float = 0.1): + self.request = request + self.aging_factor = aging_factor + self.insert_time = request.arrival_time + + def __lt__(self, other: PrioritizedItem) -> bool: + now = time.time() + eff_self = self.request.priority - self.aging_factor * ( + now - self.insert_time) + eff_other = other.request.priority - other.aging_factor * ( + now - other.insert_time) + + if eff_self != eff_other: + return eff_self < eff_other + return self.insert_time < other.insert_time + + class PriorityRequestQueue(RequestQueue): """ A priority queue that supports heap operations. @@ -143,29 +163,31 @@ class PriorityRequestQueue(RequestQueue): Requests with a smaller value of `priority` are processed first. If multiple requests have the same priority, the one with the earlier `arrival_time` is processed first. + Requests are aged over time based on the `aging_factor`, which + reduces their effective priority as time passes. 
""" - def __init__(self) -> None: - self._heap: list[tuple[int, float, Request]] = [] + def __init__(self, aging_factor: float = 0.1) -> None: + self._heap: list[PrioritizedItem] = [] + self.aging_factor = aging_factor def add_request(self, request: Request) -> None: """Add a request to the queue according to priority policy.""" - heapq.heappush(self._heap, - (request.priority, request.arrival_time, request)) + item = PrioritizedItem(request, self.aging_factor) + heapq.heappush(self._heap, item) def pop_request(self) -> Request: """Pop a request from the queue according to priority policy.""" if not self._heap: raise IndexError("pop from empty heap") - _, _, request = heapq.heappop(self._heap) + request = heapq.heappop(self._heap).request return request def peek_request(self) -> Request: """Peek at the next request in the queue without removing it.""" if not self._heap: raise IndexError("peek from empty heap") - _, _, request = self._heap[0] - return request + return self._heap[0].request def prepend_request(self, request: Request) -> None: """Add a request to the queue according to priority policy. 
@@ -184,14 +206,16 @@ def prepend_requests(self, requests: RequestQueue) -> None: def remove_request(self, request: Request) -> None: """Remove a specific request from the queue.""" - self._heap = [(p, t, r) for p, t, r in self._heap if r != request] + self._heap = [item for item in self._heap if item.request != request] heapq.heapify(self._heap) def remove_requests(self, requests: Iterable[Request]) -> None: """Remove multiple specific requests from the queue.""" requests_to_remove = set(requests) - self._heap = [(p, t, r) for p, t, r in self._heap - if r not in requests_to_remove] + self._heap = [ + item for item in self._heap + if item.request not in requests_to_remove + ] heapq.heapify(self._heap) def __bool__(self) -> bool: @@ -206,7 +230,7 @@ def __iter__(self) -> Iterator[Request]: """Iterate over the queue according to priority policy.""" heap_copy = self._heap[:] while heap_copy: - _, _, request = heapq.heappop(heap_copy) + request = heapq.heappop(heap_copy).request yield request def __reversed__(self) -> Iterator[Request]: From f6a02961ef467533fe5011015a5beac6979983da Mon Sep 17 00:00:00 2001 From: chaunceyjiang Date: Tue, 8 Jul 2025 07:17:05 +0000 Subject: [PATCH 2/3] [Core] feat: Add aging factor support to priority request queue for fairer scheduling Signed-off-by: chaunceyjiang --- tests/v1/core/test_scheduler.py | 70 +++++++++++++++++++++++++++++ vllm/v1/core/sched/request_queue.py | 21 ++++++--- 2 files changed, 84 insertions(+), 7 deletions(-) diff --git a/tests/v1/core/test_scheduler.py b/tests/v1/core/test_scheduler.py index 02d2c83ab158..f6c95d0bee56 100644 --- a/tests/v1/core/test_scheduler.py +++ b/tests/v1/core/test_scheduler.py @@ -1862,6 +1862,76 @@ def test_priority_scheduling_heap_property(): assert scheduled_priorities == expected_priorities +@pytest.mark.parametrize( + "arrival_times_spaced, trigger_aging", + [ + (0, False), # No spacing, all requests arrive at time 0 + (1000, True) # Large spacing to trigger aging + ]) +def 
test_priority_scheduling_with_aging(arrival_times_spaced: int, + trigger_aging: bool): + """Test that the waiting queue maintains heap + property for priority scheduling with aging.""" + scheduler = create_scheduler_with_priority( + max_num_seqs=1, # Only one request can run at a time + ) + + # Add requests in random priority order + priorities = [5, 1, 0, 0, 5, 0, 0, 0] + # arrival times are spaced out to simulate aging + # (requests with higher priority will age faster) + # and should be scheduled first + arrival_times = [ + float(i) * arrival_times_spaced for i in range(len(priorities)) + ] + requests = create_requests_with_priority(num_requests=len(priorities), + priorities=priorities, + arrival_times=arrival_times, + num_tokens=10) + + # Add all requests + for request in requests: + scheduler.add_request(request) + + # Schedule one request at a time and verify priority order + scheduled_priorities = [] + + while scheduler.waiting: + output = scheduler.schedule() + if output.scheduled_new_reqs: + req = output.scheduled_new_reqs[0] + scheduled_priorities.append(requests[int(req.req_id)].priority) + + # Simulate completion to make room for next request + model_output = ModelRunnerOutput( + req_ids=[req.req_id], + req_id_to_index={req.req_id: 0}, + sampled_token_ids=[[100]], + spec_token_ids=None, + logprobs=None, + prompt_logprobs_dict={}, + pooler_output=[], + ) + scheduler.update_from_output(output, model_output) + + # Finish the request to make room for the next one + scheduler.finish_requests(req.req_id, + RequestStatus.FINISHED_STOPPED) + if trigger_aging: + # If aging is enabled, the accumulated waiting time dominates + # the static priority (arrival times are spaced far apart), so + # requests are scheduled in arrival (FIFO) order. + # This is because the later requests with priority 0 wait shorter than + # the earlier requests with priority 5 and 1, due to aging. 
+ # The expected order is: + expected_priorities = [5, 1, 0, 0, 5, 0, 0, 0] + else: + # If aging is not enabled, the requests with priority 0 should be + # scheduled first since a smaller priority value is served first. + expected_priorities = [0, 0, 0, 0, 0, 1, 5, 5] + assert scheduled_priorities == expected_priorities + + def test_schedule_skip_tokenizer_init(): scheduler = create_scheduler(skip_tokenizer_init=True) requests = create_requests(num_requests=5) diff --git a/vllm/v1/core/sched/request_queue.py b/vllm/v1/core/sched/request_queue.py index f919b0709964..4276dc2cdc9f 100644 --- a/vllm/v1/core/sched/request_queue.py +++ b/vllm/v1/core/sched/request_queue.py @@ -144,15 +144,22 @@ def __init__(self, request: Request, aging_factor: float = 0.1): self.aging_factor = aging_factor self.insert_time = request.arrival_time - def __lt__(self, other: PrioritizedItem) -> bool: + @property + def priority(self) -> float: + """Calculate the effective priority of the request, factoring in aging. + The effective priority decreases over time based on the aging factor. + """ + # Aging is based on the time since the request was inserted + # into the queue and the aging factor. 
+ if self.aging_factor <= 0: + return self.request.priority now = time.time() - eff_self = self.request.priority - self.aging_factor * ( - now - self.insert_time) - eff_other = other.request.priority - other.aging_factor * ( - now - other.insert_time) + return self.request.priority - self.aging_factor * (now - + self.insert_time) - if eff_self != eff_other: - return eff_self < eff_other + def __lt__(self, other: PrioritizedItem) -> bool: + if self.priority != other.priority: + return self.priority < other.priority return self.insert_time < other.insert_time From 52a9b6f1f5afcd376ce207a9466d93e47c4015a9 Mon Sep 17 00:00:00 2001 From: chaunceyjiang Date: Tue, 8 Jul 2025 07:26:50 +0000 Subject: [PATCH 3/3] [Core] feat: Add aging factor support to priority request queue for fairer scheduling Signed-off-by: chaunceyjiang --- vllm/v1/core/sched/request_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/v1/core/sched/request_queue.py b/vllm/v1/core/sched/request_queue.py index 4276dc2cdc9f..8e0cfcbc9624 100644 --- a/vllm/v1/core/sched/request_queue.py +++ b/vllm/v1/core/sched/request_queue.py @@ -149,7 +149,7 @@ def priority(self) -> float: """Calculate the effective priority of the request, factoring in aging. The effective priority decreases over time based on the aging factor. """ - # Aging is based on the time since the request was inserted + # Aging is based on the time since the request was inserted # into the queue and the aging factor. if self.aging_factor <= 0: return self.request.priority