[Bugfix] Fix async postprocessor in case of preemption #8267

Merged (8 commits) on Sep 8, 2024
Changes from 1 commit
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixes
alexm-redhat committed Sep 7, 2024
commit 3d113e4d44ec3ca0d51f2c9912e341b22934678c
vllm/core/scheduler.py (94 changes: 43 additions & 51 deletions)
@@ -537,13 +537,6 @@ def _schedule_running(
         preempted: List[SequenceGroup] = ret.preempted
         swapped_out: List[SequenceGroup] = ret.swapped_out
 
-        # NOTE(woosuk): Preemption happens only when there is no available slot
-        # to keep all the sequence groups in the RUNNING state.
-
-        # Store original running requests for the case of async + preemption
-        # if self.use_async_output_proc:
-        #     orig_running = self.running.copy()
-
         running_queue = self.running
         assert len(self._async_stopped) == 0
         while running_queue:
@@ -552,6 +545,7 @@ def _schedule_running(
                 seq_group, SequenceStatus.RUNNING, enable_chunking, budget)
 
             if num_running_tokens == 0:
+                # No budget => Stop
                 break
 
             running_queue.popleft()
@@ -564,19 +558,9 @@ def _schedule_running(
             ) > self.scheduler_config.max_model_len:
                 self._async_stopped.append(seq_group)
                 continue
-
-            # With async postprocessor, when preemption kicks in, we need
-            # first to drain the async postprocessor, so that all async
-            # block_table freeing is applied before the preemption freeing
-            # is applied.
-            # if self.use_async_output_proc and not self._can_append_slots(
-            #         seq_group):
-            #     tmp = self.running
-            #     self.running = orig_running
-            #     assert self.output_proc_callback is not None
-            #     self.output_proc_callback()
-            #     self.running = tmp
-
+
+            # NOTE(woosuk): Preemption happens only when there is no available slot
+            # to keep all the sequence groups in the RUNNING state.
             while not self._can_append_slots(seq_group):
                 budget.subtract_num_batched_tokens(seq_group.request_id,
                                                    num_running_tokens)
@@ -588,35 +572,39 @@ def _schedule_running(
                         and seq_group.lora_int_id in curr_loras):
                     curr_loras.remove(seq_group.lora_int_id)
 
+                # Determine victim sequence
                 if running_queue:
-                    # Preempt the lowest-priority sequence groups.
+                    # Preempt the lowest-priority sequence group.
                     victim_seq_group = running_queue.pop()
-
-                    if self.use_async_output_proc:
-                        self.output_proc_callback(sync_request_id=victim_seq_group.request_id)
-                        if victim_seq_group.is_finished():
-                            continue
+
+                else:
+                    # No other sequence group can be preempted.
+                    # Preempt the current sequence group.
+                    victim_seq_group = seq_group
+
+                # With async postprocessor, before preempting a sequence
+                # we need to ensure it has no pending async postprocessor
+                do_preempt = True
+                if self.use_async_output_proc:
+                    self.output_proc_callback(
+                        request_id=victim_seq_group.request_id)
+
+                    # It may be that the async pending "victim_seq_group"
+                    # becomes finished, in which case we simply free its
+                    # memory
+                    if victim_seq_group.is_finished():
+                        self._free_finished_seq_group(victim_seq_group)
+                        do_preempt = False
+
+                # Do preemption
+                if do_preempt:
                     preempted_mode = self._preempt(victim_seq_group,
                                                    blocks_to_swap_out)
                     if preempted_mode == PreemptionMode.RECOMPUTE:
                         preempted.append(victim_seq_group)
                     else:
                         swapped_out.append(victim_seq_group)
-                else:
-                    # No other sequence groups can be preempted.
-                    # Preempt the current sequence group.
-                    if self.use_async_output_proc:
-                        self.output_proc_callback(sync_request_id=victim_seq_group.request_id)
-                        if victim_seq_group.is_finished():
-                            continue
-
-                    preempted_mode = self._preempt(seq_group,
-                                                   blocks_to_swap_out)
-                    if preempted_mode == PreemptionMode.RECOMPUTE:
-                        preempted.append(seq_group)
-                    else:
-                        swapped_out.append(seq_group)
                 if not running_queue:
                     break
             else:
                 self._append_slots(seq_group, blocks_to_copy)
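
The hunk above is the core of the fix: the victim sequence group is chosen first, any pending async output processing for it is drained through the output processor callback, and only then is it either freed (if the drain revealed it already finished) or preempted. The following is a minimal, self-contained sketch of that control flow, not vLLM code; SeqGroup, pick_and_preempt and the callback parameters are illustrative stand-ins for the scheduler's SequenceGroup, preemption loop, output_proc_callback, _preempt and _free_finished_seq_group.

from collections import deque
from typing import Callable, Deque, Optional


class SeqGroup:
    """Toy stand-in for vLLM's SequenceGroup."""

    def __init__(self, request_id: str) -> None:
        self.request_id = request_id
        self.finished = False

    def is_finished(self) -> bool:
        return self.finished


def pick_and_preempt(
    running_queue: Deque[SeqGroup],
    current: SeqGroup,
    use_async_output_proc: bool,
    output_proc_callback: Optional[Callable[[str], None]],
    preempt: Callable[[SeqGroup], None],
    free_finished: Callable[[SeqGroup], None],
) -> None:
    # Determine the victim: the lowest-priority running group if there is
    # one, otherwise the current group itself.
    victim = running_queue.pop() if running_queue else current

    do_preempt = True
    if use_async_output_proc and output_proc_callback is not None:
        # Drain pending async output processing for the victim first, so its
        # async block-table frees are applied before preemption frees memory.
        output_proc_callback(victim.request_id)

        # The drained victim may turn out to be finished already; free it
        # instead of preempting it.
        if victim.is_finished():
            free_finished(victim)
            do_preempt = False

    if do_preempt:
        preempt(victim)

Here preempt and free_finished play the roles of the scheduler's _preempt and _free_finished_seq_group, and the drain-then-check ordering is what replaces the earlier "continue on finished victim" paths removed in this commit.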
@@ -1275,22 +1263,26 @@ def _free_finished_seqs(self, seq_group: SequenceGroup) -> None:
             if seq.is_finished():
                 self.free_seq(seq)
 
+    def _free_finished_seq_group(self, seq_group: SequenceGroup) -> None:
+        if seq_group.is_finished():
+            # Free cross-attention block table, if it exists
+            self._free_seq_group_cross_attn_blocks(seq_group)
+
+            # Add the finished requests to the finished requests list.
+            # This list will be used to update the Mamba cache in the
+            # next step.
+            self._finished_requests_ids.append(seq_group.request_id)
+
+        # Free finished seqs
+        self._free_finished_seqs(seq_group)
+
     def free_finished_seq_groups(self) -> None:
         remaining: Deque[SequenceGroup] = deque()
         for seq_group in self.running:
-            if seq_group.is_finished():
-                # Free cross-attention block table, if it exists
-                self._free_seq_group_cross_attn_blocks(seq_group)
-                # Add the finished requests to the finished requests list.
-                # This list will be used to update the Mamba cache in the
-                # next step.
-                self._finished_requests_ids.append(seq_group.request_id)
-            else:
+            self._free_finished_seq_group(seq_group)
+            if not seq_group.is_finished():
                 remaining.append(seq_group)
 
-            # Free finished seqs
-            self._free_finished_seqs(seq_group)
-
         self.running = remaining
 
+        # Handle async stopped sequence groups
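
This last hunk (truncated above) factors the per-group cleanup out of free_finished_seq_groups into a _free_finished_seq_group helper, so the preemption path in _schedule_running can reuse it for a victim that turns out to be finished after its async output is drained. A rough sketch of the resulting shape, with simplified stand-in classes (Group, MiniScheduler) rather than the real scheduler:

from collections import deque
from typing import Deque, List


class Group:
    """Toy stand-in for vLLM's SequenceGroup."""

    def __init__(self, request_id: str, finished: bool = False) -> None:
        self.request_id = request_id
        self.finished = finished

    def is_finished(self) -> bool:
        return self.finished


class MiniScheduler:
    def __init__(self) -> None:
        self.running: Deque[Group] = deque()
        self.finished_request_ids: List[str] = []

    def _free_finished_seq_group(self, group: Group) -> None:
        # Per-group cleanup, callable both from the per-step sweep below and
        # from the preemption path. The real helper also frees finished
        # sequences and any cross-attention block table.
        if group.is_finished():
            self.finished_request_ids.append(group.request_id)

    def free_finished_seq_groups(self) -> None:
        # Sweep the running queue once per step: clean up every group and
        # keep only the ones that are still unfinished.
        remaining: Deque[Group] = deque()
        for group in self.running:
            self._free_finished_seq_group(group)
            if not group.is_finished():
                remaining.append(group)
        self.running = remaining

The observable behavior of the sweep is unchanged; the refactor only exposes the per-group cleanup as a reusable method and then appends handling for async-stopped sequence groups (cut off in this excerpt).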